什么是MapReduce
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)“和"Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。 map task通过InputFormat接口中的TextInputFormat来读取文件
MapReduce的优缺点
优点: 1.Mapreduce易于编程. 它简单的实现一些接口,就可以完成一个分布式程序,这个程序可以分布到大量的廉价的pc机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特性使的Mapreduce编程变得非常流行。 2.良好的扩展性. 项目当你的计算资源得不到满足的时候,你可以通过简单的通过增加机器来扩展它的计算能力 3.高容错性 Mapreduce的设计初衷就是使程序能够部署在廉价的pc机器上,这就要求它具有很高的容错性。比如一个机器挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由hadoop内部完成的。 适合PB级以上海量数据的离线处理
缺点: 1.无法进行实时计算 2.无法进行流式计算 3.DAG(有向图)计算。多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
MapReduce的流程
MapReduce的流程图
MapReduce的源码解析
InputFormat的源码
public abstract class InputFormat
<K
, V
> {
public abstract List
<InputSplit
> getSplits(JobContext context
) throws IOException
, InterruptedException
;
public abstract RecordReader
<K
,V
> createRecordReader(InputSplit split
, TaskAttemptContext context
) throws IOException
, InterruptedException
;
}
}
RecordReader的源码
public class TextInputFormat extends FileInputFormat
<LongWritable
, Text
> implements
public RecordReader
<LongWritable
, Text
> getRecordReader(InputSplit genericSplit
, JobConf job
, Reporter reporter
) throws IOException
{
reporter
.setStatus(genericSplit
.toString());
String delimiter
= job
.get("textinputformat.record.delimiter");
byte
[] recordDelimiterBytes
= null
;
if (null
!= delimiter
) {
recordDelimiterBytes
= delimiter
.getBytes(Charsets
.UTF_8
);
}
return new
LineRecordReader(job
, (FileSplit
)genericSplit
, recordDelimiterBytes
);
}
}
InputSplit的源码
public InputSplit
[] getSplits(JobConf job
, int numSplits
) throws IOException
{
Stopwatch sw
= (new
Stopwatch()).start();
FileStatus
[] files
= this
.listStatus(job
);
job
.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files
.length
);
long totalSize
= 0L;
FileStatus
[] arr$
= files
;
int len$
= files
.length
;
for(int i$
= 0; i$
< len$
; ++i$
) {
FileStatus file
= arr$
[i$
];
if (file
.isDirectory()) {
throw new
IOException("Not a file: " + file
.getPath());
}
totalSize
+= file
.getLen();
}
long goalSize
= totalSize
/ (long)(numSplits
== 0 ? 1 : numSplits
);
long minSize
= Math
.max(job
.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this
.minSplitSize
);
ArrayList
<FileSplit
> splits
= new
ArrayList(numSplits
);
NetworkTopology clusterMap
= new
NetworkTopology();
FileStatus
[] arr$
= files
;
int len$
= files
.length
;
for(int i$
= 0; i$
< len$
; ++i$
) {
FileStatus file
= arr$
[i$
];
Path path
= file
.getPath();
long length
= file
.getLen();
if (length
== 0L) {
splits
.add(this
.makeSplit(path
, 0L, length
, new String
[0]));
} else {
FileSystem fs
= path
.getFileSystem(job
);
BlockLocation
[] blkLocations
;
if (file instanceof LocatedFileStatus
) {
blkLocations
= ((LocatedFileStatus
)file
).getBlockLocations();
} else {
blkLocations
= fs
.getFileBlockLocations(file
, 0L, length
);
}
if (!this
.isSplitable(fs
, path
)) {
String
[][] splitHosts
= this
.getSplitHostsAndCachedHosts(blkLocations
, 0L, length
, clusterMap
);
splits
.add(this
.makeSplit(path
, 0L, length
, splitHosts
[0], splitHosts
[1]));
} else {
long blockSize
= file
.getBlockSize();
long splitSize
= this
.computeSplitSize(goalSize
, minSize
, blockSize
);
long bytesRemaining
;
String
[][] splitHosts
;
for(bytesRemaining
= length
; (double)bytesRemaining
/ (double)splitSize
> 1.1D
; bytesRemaining
-= splitSize
) {
splitHosts
= this
.getSplitHostsAndCachedHosts(blkLocations
, length
- bytesRemaining
, splitSize
, clusterMap
);
splits
.add(this
.makeSplit(path
, length
- bytesRemaining
, splitSize
, splitHosts
[0], splitHosts
[1]));
}
if (bytesRemaining
!= 0L) {
splitHosts
= this
.getSplitHostsAndCachedHosts(blkLocations
, length
- bytesRemaining
, bytesRemaining
, clusterMap
);
splits
.add(this
.makeSplit(path
, length
- bytesRemaining
, bytesRemaining
, splitHosts
[0], splitHosts
[1]));
}
}
}
}
sw
.stop();
if (LOG
.isDebugEnabled()) {
LOG
.debug("Total # of splits generated by getSplits: " + splits
.size() + ", TimeTaken: " + sw
.elapsedMillis());
}
return (InputSplit
[])splits
.toArray(new FileSplit
[splits
.size()]);
}
InputSplit的源码 目的是区分分片
public abstract class InputSplit
{
public
InputSplit() {
}
public abstract
long getLength() throws IOException
, InterruptedException
;
public abstract String
[] getLocations() throws IOException
, InterruptedException
;
@Evolving
public SplitLocationInfo
[] getLocationInfo() throws IOException
{
return null
;
}
}
我们在写Mapper代码的时候设置的变量(LongWritable,Text)就是map的key和value。LongWritable表示偏移量可以理解为记录的行号,Text表示一行的语句
public class Mapper extends org
.apache
.hadoop
.mapreduce
.Mapper
<LongWritable
,Text
,Text
,IntWritable
>
经过分片后获得的键值对key和value放到Mapper中的map方法进行逻辑处理形成新的键值对,在通过context.write方法输出到OutputCollector收集器中.OutputCollector把收集到的(k,v)键值对写入到环形缓冲区中(环形的优势:输入的时候会先清除,清除完了再输入.并且内存利用率高),环形缓冲区默认的大小为100M,只写80%(因为填充因子,剩下的20%要记录环形缓冲区的信息).然后将环形缓冲区的内容拿出来根据HashPartitioner来进行分区(因为要散列,就是将内容平均的分开所以用Hash来分区)然后要根据key进行快排分好区,接着用Combiner来进行归并将相同的归类,形成一个大文件;reduce task根据自己的分区号,去各个map task节点上copy相同的partition的数据到reduce task 本地磁盘工作目录,并且reduce task 会吧同一分区的来自不同的map task的结果文件,在进行merge合并成一个大文件(归并排序),大文件内容按照k有序