大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce;
大家都明白 map接受一个参数,经过map处理后,将处理结果作为reduce的入参分发给reduce,然后在reduce中统计了word 的数量,最终输出到输出结果;
但是初看遇到的问题:
一、map的输入参数是个 text之类的 对象,并不是 file对象
二、reduce中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在reduce的时候才判断的
三、map过程到底做了什么,reduce过程到底做了什么?为什么它能够做到多个map多个reduce?
一、
1. 怎么将 文件参数 传递 到 job中呢?
在 client 我们调用了fileinputformat.addinputpath(job, new path(otherargs[0]));
实际上 addinputpath 做了以下的事情(将文件路径加载到了conf中)
public static void addinputpath(job job,
path path) throws ioexception {
configuration conf = job.getconfiguration();
path = path.getfilesystem(conf).makequalified(path);
string dirstr = stringutils.escapestring(path.tostring());
string dirs = conf.get(input_dir);
conf.set(input_dir, dirs == null ? dirstr : dirs "," dirstr);
}
我们再来看看 fileinputformat 是做什么用的, fileinputformat 实现了 inputformat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于inputformat,这是一个抽象类,其子类有专门用于读取普通文件的fileinputformat,用来读取数据库的dbinputformat等等。
我们会看到 在 inputformat 接口中 有getsplits方法,也就是说分片操作实际上实在 map之前 就已经做好了
list
generate the list of files and make them into filesplits.
具体实现参考 fileinputformat getsplits 方法:
上面是fileinputformat的getsplits()方法,它首先得到分片的最小值minsize和最大值maxsize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由liststatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computesplitsize计算出分片大小splitsize,计算方法是:math.max(minsize, math.min(maxsize, blocksize));也就是保证在minsize和maxsize之间,且如果minsize<=blocksize<=maxsize,则设为blocksize。然后我们根据这个splitsize计算出每个文件的inputsplits集合,然后加入分片列表splits中。注意到我们生成inputsplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:mapreduce.input.num.files。
二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?
我们使用了 就是inputformat中的另一个方法createrecordreader() 这个方法:
recordreader:
recordreader是用来从一个输入分片中读取一个一个的k -v 对的抽象类,我们可以将其看作是在inputsplit上的迭代器。我们从api接口中可以看到它的一些方法,最主要的方法就是nextkeyvalue()方法,由它获取分片上的下一个k-v 对。
可以看到接口中有:
public abstract boolean nextkeyvalue() throws ioexception, interruptedexception;
public abstract keyin getcurrentkey() throws ioexception, interruptedexception;
public abstract valuein getcurrentvalue() throws ioexception, interruptedexception;
public abstract float getprogress() throws ioexception, interruptedexception;
public abstract void close() throws ioexception;
fileinputformat
direct known subclasses:
combinefileinputformat, keyvaluetextinputformat, nlineinputformat, sequencefileinputformat, textinputformat
对于 wordcount 测试用了 nlineinputformat和 textinputformat 实现类
在 inputformat 构建一个 recordreader 出来,然后调用recordreader initialize 的方法,初始化recordreader 对象
那么 到底 map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了recordreader中,
下面继续看看这些recordreader是如何被mapreduce框架使用的
终于 说道 map了 ,我么如果要实现map 那么 一定要继承 mapper这个类
public abstract class context
implements mapcontext
}
protected void setup(context context) throws ioexception, interruptedexception
protected void map(keyin key, valuein value, context context) throws ioexception,interruptedexception { }
protected void cleanup(context context ) throws ioexception, interruptedexception { }
public void run(context context) throws ioexception, interruptedexception { }
我们写mapreduce程序的时候,我们写的mapper都要继承这个mapper.class,通常我们会重写map()方法,map()每次接受一个k-v对,然后我们对这个k-v对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个list之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出k-v对。举个例子就是:inputsplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个k-v对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);
最后我们看看mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextkeyvalue()获取的k-v对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextkeyvalue()就是使用了相应的recordreader来获取k-v对的。
我们看看mapper.class中的context类,它继承与mapcontext,使用了一个recordreader进行构造。下面我们再看这个mapcontext。
public mapcontextimpl(configuration conf, taskattemptid taskid,
recordreader
recordwriter
outputcommitter committer,
statusreporter reporter,
inputsplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
recordreader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 recordreader 传过来了 ?
我们可以想象 这里 应该被框架调用的可能性比较大了,那么mapreduce 框架是怎么分别来调用map和reduce呢?
还以为分析完map就完事了,才发现这里仅仅是做了mapreduce 框架调用前的一些准备工作,
还是继续分析 下 mapreduce 框架调用吧:
1.在 job提交 任务之后 首先由jobtrack 分发任务,
在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runnewmapper
在这个方法中调用了 mapcontextimpl, 至此 这个map 和框架就可以联系起来了。
我的名字刚好二十个字不信你数数