Hadoop Source Code Analysis 36: Reduce in Child
Published: 2019-06-09


Analyzing task reduce_1

args =[127.0.0.1, 42767, attempt_201405060431_0003_r_000001_0,/opt/hadoop-1.0.0/logs/userlogs/job_201405060431_0003/attempt_201405060431_0003_r_000001_0,1844231936]

myTask = JvmTask{ shouldDie=false, t=ReduceTask{ jobFile="/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405060431_0003/job.xml", taskId=attempt_201405020918_0003_r_000001_0, taskProgress=reduce, taskStatus=ReduceTaskStatus{UNASSIGNED} } }

job = JobConf{ Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405060431_0003/job.xml }

outputFormat = TextOutputFormat@51386c70

committer = FileOutputCommitter{ outputFileSystem=DFSClient, outputPath=/user/admin/out/123, workPath=hdfs://server1:9000/user/admin/out/123/_temporary/_attempt_201405060431_0003_r_000001_0 }

workDir=/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405020918_0003/_attempt_201405060431_0003_r_000001_0

jar=/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405020918_0003/jars/job.jar

jobCacheDir=/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405020918_0003/jars

numCopiers = 5

maxInFlight = 20
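Both values come from the job configuration. A minimal sketch, assuming the Hadoop 1.x property name, of how ReduceTask.ReduceCopier sizes its copier pool:

```java
import org.apache.hadoop.mapred.JobConf;

// Sketch (not the Hadoop source verbatim) of how the copier pool is sized.
class CopierPoolSizing {
    static void size(JobConf conf) {
        // "mapred.reduce.parallel.copies" defaults to 5
        int numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
        // at most 4 fetches may be in flight per copier: 5 copiers -> 20, as in this trace
        int maxInFlight = 4 * numCopiers;
        System.out.println("numCopiers=" + numCopiers + ", maxInFlight=" + maxInFlight);
    }
}
```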

combinerRunner = CombinerRunner{ job=Configuration{ core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405020918_0003/job.xml }, committer=null, keyClass=org.apache.hadoop.io.Text, valueClass=org.apache.hadoop.io.IntWritable }

combineCollector = Task$CombineOutputCollector@72447399{ progressBar=10000 }

ioSortFactor = 10

maxInMemOutputs = 1000

maxInMemCopyPer = 0.66

maxRedPer = 0.0

ramManager = ReduceTask$ReduceCopier$ShuffleRamManager@46e9d255{ maxSize=141937872, maxSingleShuffleLimit=35484468 }
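These two numbers follow from the shuffle buffer settings. A hedged sketch of the arithmetic, assuming the Hadoop 1.x defaults (mapred.job.shuffle.input.buffer.percent = 0.70, and a single segment capped at 25% of the buffer):

```java
import org.apache.hadoop.mapred.JobConf;

// Sketch of ShuffleRamManager's sizing; the 0.25 fraction mirrors
// MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION in Hadoop 1.x ReduceTask.
class ShuffleBufferSizing {
    static void size(JobConf conf) {
        float pct = conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
        long maxSize = (long) (Runtime.getRuntime().maxMemory() * pct);
        long maxSingleShuffleLimit = (long) (maxSize * 0.25f);
        // In this trace: 141937872 * 0.25 = 35484468, matching maxSingleShuffleLimit.
        System.out.println(maxSize + " " + maxSingleShuffleLimit);
    }
}
```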

Copier threads: copiers = {

[Thread[MapOutputCopier attempt_201405020918_0003_r_000000_1.0,5,main],

Thread[MapOutputCopier attempt_201405020918_0003_r_000000_1.1,5,main],

Thread[MapOutputCopier attempt_201405020918_0003_r_000000_1.2,5,main],

Thread[MapOutputCopier attempt_201405020918_0003_r_000000_1.3,5,main],

Thread[MapOutputCopier attempt_201405020918_0003_r_000000_1.4,5,main]]}

Merge thread: localFSMergerThread = Thread[Thread for merging on-disk files,5,main]

Merge thread: inMemFSMergeThread = Thread[Thread for merging in memory files,5,main]

Event thread: getMapEventsThread = Thread[Thread for polling Map Completion Events,5,main]

RPC request from getMapEventsThread: getMapCompletionEvents(JobID=job_201405060431_0003, fromEventId=2, MAX_EVENTS_TO_FETCH=10000, TaskID=attempt_201405060431_0003_r_000001_0, jvmContext={ jvmId=jvm_201405060431_0003_r_1844231936, pid=10727 })

RPC response to getMapEventsThread:

[Task Id : attempt_201405060431_0003_m_000001_0, Status : SUCCEEDED,

Task Id : attempt_201405060431_0003_m_000000_0, Status : SUCCEEDED]
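The getMapEventsThread simply polls this RPC in a loop, advancing fromEventId by the number of events received so each call fetches only new completions. A hedged sketch, assuming the TaskUmbilicalProtocol signature shown in the trace above:

```java
import org.apache.hadoop.mapred.*;

// Hypothetical polling loop modeled on getMapEventsThread; the umbilical proxy
// and IDs are the ones the Child already holds.
class MapEventsPoller {
    volatile boolean running = true;

    void poll(TaskUmbilicalProtocol umbilical, JobID jobId,
              TaskAttemptID reduceId, JvmContext jvmContext) throws Exception {
        int fromEventId = 0;
        while (running) {
            MapTaskCompletionEventsUpdate update = umbilical.getMapCompletionEvents(
                    jobId, fromEventId, 10000 /* MAX_EVENTS_TO_FETCH */, reduceId, jvmContext);
            TaskCompletionEvent[] events = update.getMapTaskCompletionEvents();
            fromEventId += events.length;      // next poll starts after what we have seen
            for (TaskCompletionEvent ev : events) {
                if (ev.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
                    // record the map's output URL under its host in mapLocations
                }
            }
            Thread.sleep(1000);                // illustrative poll interval
        }
    }
}
```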

The successful map outputs are recorded in mapLocations = {

server2=[ReduceTask$ReduceCopier$MapOutputLocation{

taskAttemptId=attempt_201405060431_0003_m_000000_0,taskId=task_201405060431_0003_m_000000,taskOutput=http://server2:50060/mapOutput?job=job_201405060431_0003&map=attempt_201405060431_0003_m_000000_0&reduce=0}],

server3=[ReduceTask$ReduceCopier$MapOutputLocation{

taskAttemptId=attempt_201405060431_0003_m_000001_0,taskId=task_201405060431_0003_m_000001,taskOutput=http://server3:50060/mapOutput?job=job_201405060431_0003&map=attempt_201405060431_0003_m_000001_0&reduce=0}]

}

Shuffle the hosts, i.e. randomize the server order:

hostList.addAll(mapLocations.keySet());

Collections.shuffle(hostList, this.random);

Then add each host and its output location to the containers one by one:

uniqueHosts.add(host);

scheduledCopies.add(loc);

The main thread then wakes the MapOutputCopier threads via scheduledCopies.notifyAll(); the handoff pattern is sketched below.
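Scheduling and copying communicate through the shared scheduledCopies list in a classic wait/notify handoff. A simplified, self-contained sketch of the pattern (not the Hadoop code verbatim; T stands in for ReduceCopier's MapOutputLocation):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified handoff between the scheduler (main reduce thread) and the
// MapOutputCopier threads.
class CopyScheduler<T> {
    private final List<T> scheduledCopies = new ArrayList<T>();

    // Scheduler side: queue one location and wake any idle copier.
    void schedule(T loc) {
        synchronized (scheduledCopies) {
            scheduledCopies.add(loc);
            scheduledCopies.notifyAll();
        }
    }

    // Copier side (start of MapOutputCopier.run): block until work arrives.
    T take() throws InterruptedException {
        synchronized (scheduledCopies) {
            while (scheduledCopies.isEmpty()) {
                scheduledCopies.wait();
            }
            return scheduledCopies.remove(0);
        }
    }
}
```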

HTTP request from a MapOutputCopier thread: http://server3:50060/mapOutput?job=job_201405060431_0003&map=attempt_201405060431_0003_m_000000_0&reduce=1

Because the output is small, it is written to memory (shuffleData -> mapOutput.data -> mapOutputsFilesInMemory).

HTTP request from a MapOutputCopier thread: http://server2:50060/mapOutput?job=job_201405060431_0003&map=attempt_201405060431_0003_m_000001_0&reduce=1

Again the output is small, so it is also written to memory (shuffleData -> mapOutput.data -> mapOutputsFilesInMemory).
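Whether a fetched segment lands in RAM or on local disk is decided per copy. A self-contained illustration of the decision rule only (the real logic lives in ReduceTask.MapOutputCopier and ShuffleRamManager):

```java
// Illustration of the shuffle-destination rule: a segment is kept in RAM only
// if it fits under the per-segment cap (25% of the shuffle buffer); otherwise
// it is shuffled to a local file.
class ShuffleDestination {
    static boolean shuffleInMemory(long decompressedLength, long maxSingleShuffleLimit) {
        return decompressedLength < maxSingleShuffleLimit;
    }
}
```

With maxSingleShuffleLimit = 35484468 from this trace, both map outputs here are far below the cap, hence the two in-memory writes.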

Update copyResults = {

CopyResult{ MapOutputLocation=http://server3:50060/mapOutput?job=job_201405060431_0003&map=attempt_201405060431_0003_m_000000_0&reduce=1 },

CopyResult{ MapOutputLocation=http://server2:50060/mapOutput?job=job_201405060431_0003&map=attempt_201405060431_0003_m_000001_0&reduce=1 } }

Merge the copied outputs: reduceCopier.createKVIterator(job, rfs, reporter)

The two segments are merged into: /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405060431_0003/attempt_201405060431_0003_r_000001_0/output/map_0.out

This merge also uses a priority queue (a min-heap); a self-contained illustration of the idea follows.
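Each sorted segment contributes its current head element to the heap; the heap always yields the globally smallest key, and the winning segment is re-inserted after advancing. A minimal illustration (not Hadoop's Merger itself):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Minimal k-way merge over sorted segments using a min-heap.
class KWayMerge {
    static List<String> merge(List<Iterator<String>> segments) {
        PriorityQueue<Head> heap = new PriorityQueue<Head>();
        for (Iterator<String> it : segments) {
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<String> out = new ArrayList<String>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();                 // smallest current key across segments
            out.add(h.key);
            if (h.src.hasNext()) heap.add(new Head(h.src.next(), h.src));
        }
        return out;
    }

    static class Head implements Comparable<Head> {
        final String key;
        final Iterator<String> src;
        Head(String key, Iterator<String> src) { this.key = key; this.src = src; }
        public int compareTo(Head o) { return key.compareTo(o.key); }
    }
}
```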

The merged file is added to mapOutputFilesOnDisk = { /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405060431_0003/attempt_201405060431_0003_r_000001_0/output/map_0.out }

Since there is now only a single file on disk, no further merge rounds are needed.

Run the reduce phase: reducer.run(reducerContext)
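In the new API, this driver loop is essentially the body of org.apache.hadoop.mapreduce.Reducer.run: one reduce() call per distinct key, with the grouped values exposed as an Iterable.

```java
// Driver loop of org.apache.hadoop.mapreduce.Reducer (Hadoop 1.x).
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
}
```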

RPC request: commitPending(taskId=attempt_201405060431_0003_r_000001_0, taskStatus=COMMIT_PENDING, jvmContext)

RPC response: none

RPC request: canCommit(taskId=attempt_201405060431_0003_r_000001_0, jvmContext)

RPC response: true
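These two RPCs form the commit handshake: the child announces COMMIT_PENDING and then must get an explicit go-ahead before touching the final output, so only one attempt per task can ever commit. A hedged sketch of the child-side sequence (cf. Task.commit in Hadoop 1.x):

```java
import org.apache.hadoop.mapred.*;

// Sketch of the child-side commit handshake; parameters are the ones this
// trace shows the Child already holding.
class CommitHandshake {
    static void commit(TaskUmbilicalProtocol umbilical, TaskAttemptID taskId,
                       TaskStatus taskStatus, JvmContext jvmContext,
                       OutputCommitter committer, TaskAttemptContext ctx) throws Exception {
        umbilical.commitPending(taskId, taskStatus, jvmContext);  // response: none
        while (!umbilical.canCommit(taskId, jvmContext)) {
            Thread.sleep(1000);   // the JobTracker has not granted the commit yet
        }
        committer.commitTask(ctx);  // safe to promote this attempt's output
    }
}
```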

Commit the task:

copy hdfs://server1:9000/user/admin/out/123/_temporary/_attempt_201405060431_0003_r_000001_0/part-r-00001

to /user/admin/out/123/part-r-00001
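For this attempt, commitTask boils down to renaming the file under the _temporary work path into the job output directory and removing the work path. An illustration of the effect, using the paths from this trace (not FileOutputCommitter's code verbatim):

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustration of what FileOutputCommitter.commitTask achieves here, given a
// FileSystem handle for the cluster.
class CommitEffect {
    static void promote(FileSystem fs) throws Exception {
        Path work = new Path("hdfs://server1:9000/user/admin/out/123/_temporary/"
                + "_attempt_201405060431_0003_r_000001_0");
        // the attempt's part-r-00001 becomes the job-level part-r-00001
        fs.rename(new Path(work, "part-r-00001"), new Path("/user/admin/out/123/part-r-00001"));
        fs.delete(work, true);
    }
}
```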

 

The final cleanup task

[127.0.0.1, 42767,attempt_201405060431_0003_m_000002_0,/opt/hadoop-1.0.0/logs/userlogs/job_201405060431_0003/attempt_201405060431_0003_m_000002_0,47579841]

JvmTask = { shouldDie=false, t=MapTask{ jobCleanup=true, jobFile="/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201405060431_0003/job.xml" } }

Delete file: /user/admin/out/123/_temporary

Create file: /user/admin/out/123/_SUCCESS

Delete file: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201405060431_0003
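The cleanup MapTask (jobCleanup=true) delegates to the committer's job cleanup plus staging-directory removal; the _SUCCESS marker is written because, if I recall the Hadoop 1.x default correctly, mapreduce.fileoutputcommitter.marksuccessfuljobs is enabled. An illustration of the three operations above:

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustration of the cleanup task's effect: drop the job's _temporary tree,
// leave an empty _SUCCESS marker, and remove the staging directory.
class CleanupEffect {
    static void cleanup(FileSystem fs) throws Exception {
        fs.delete(new Path("/user/admin/out/123/_temporary"), true);
        fs.create(new Path("/user/admin/out/123/_SUCCESS")).close();
        fs.delete(new Path("hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/"
                + "admin/.staging/job_201405060431_0003"), true);
    }
}
```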


Reposted from: https://www.cnblogs.com/leeeee/p/7276476.html
