MapReduce is a distributed computing framework
It originated at Google
It decomposes a large data-processing job into individual tasks that can run in parallel across a server cluster
It is suited to large-scale data processing
Each node processes the data stored on that node (data locality)
Every job consists of a Map part and a Reduce part
Divide and conquer
A programming model that simplifies parallel computing
It is built on two abstractions: Map and Reduce
Developers focus on implementing the Mapper and Reducer functions (a minimal in-memory sketch follows below)
System-level details are hidden from them
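To make the abstraction concrete, here is a tiny stand-alone sketch that imitates the Map/Reduce flow in memory for a word count. It is an illustration only, not Hadoop code, and every name in it is made up; the real Hadoop classes are used in the examples later in this post.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal in-memory imitation of the Map/Reduce idea (illustration only, not Hadoop code).
public class MiniMapReduce {
    public static void main(String[] args) {
        List<String> input = Arrays.asList("hello world", "hello mapreduce");

        // "Map": every line is broken into (word, 1) pairs
        List<String[]> pairs = new ArrayList<>();
        for (String line : input) {
            for (String word : line.split("\\s+")) {
                pairs.add(new String[]{word, "1"});
            }
        }

        // "Shuffle" + "Reduce": pairs with the same key are grouped and their values summed
        Map<String, Integer> counts = new HashMap<>();
        for (String[] pair : pairs) {
            counts.merge(pair[0], Integer.parseInt(pair[1]), Integer::sum);
        }

        System.out.println(counts); // e.g. {world=1, hello=2, mapreduce=1} (order not guaranteed)
    }
}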
Advantages:
Easy to program: implementing a few simple interfaces is enough to get a distributed computation running
Good scalability
High fault tolerance
High throughput: it can process very large volumes of data
Where it does not fit:
Real-time computation: MapReduce is batch (offline) processing and cannot return results in real time
Stream processing: it can only work on static data sets, not on continuously arriving data
DAG (directed acyclic graph) computation: each stage writes its output back to storage, so chaining many dependent jobs is inefficient
Key and value types
1. They must be serializable
Purpose: network transfer and persistent storage
Built-in types include IntWritable, LongWritable, FloatWritable, DoubleWritable, BooleanWritable, Text, NullWritable, etc.
2. They all implement the Writable interface
and provide the write() and readFields() methods
3. Keys must additionally implement the WritableComparable interface
because the Reduce stage sorts by key,
so keys need to be comparable (a sketch of such a key follows this list)
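For illustration only, here is a minimal sketch of a custom key type; the name WordKey and its field are invented. It implements WritableComparable, so it can be serialized with write()/readFields() and sorted with compareTo():

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical custom key: serializable (Writable) and sortable (Comparable).
public class WordKey implements WritableComparable<WordKey> {

    private String word = "";

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);                // serialize for network transfer / storage
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();               // deserialize in the same order as written
    }

    @Override
    public int compareTo(WordKey other) {
        return word.compareTo(other.word); // used by the sort in the shuffle phase
    }

    public void setWord(String word) { this.word = word; }
    public String getWord() { return word; }
}

In practice such a key should also override hashCode() and equals(), because the default HashPartitioner chooses a reducer based on the key's hashCode().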
MapReduce programming model
input: reads the input and feeds records to the mappers
map: receives a key and a value and emits intermediate key/value pairs
combine: optional map-side aggregation used as an optimization (see the sketch after this list)
partitioner: decides which reducer each intermediate key is sent to
reducer: receives the grouped input and turns it into the final output
output: writes the results
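The combine and partition steps are wired into the Job rather than called directly. As a rough sketch (not from the original post): the combiner can often reuse the reducer class, and a custom partitioner only needs to implement getPartition(). The FirstLetterPartitioner below is a made-up example; the commented driver calls assume the WCReducer class built later in this post.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: words starting with a-m go to reducer 0, the rest to reducer 1.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions < 2) {
            return 0;                      // with a single reducer there is nothing to decide
        }
        String word = key.toString();
        char first = word.isEmpty() ? 'a' : Character.toLowerCase(word.charAt(0));
        return (first <= 'm') ? 0 : 1;
    }
}

// In the driver:
// job.setCombinerClass(WCReducer.class);                 // local aggregation on the map side
// job.setPartitionerClass(FirstLetterPartitioner.class); // custom routing of keys to reducers
// job.setNumReduceTasks(2);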
MapReduce working process: a WordCount walkthrough
1. Create a Maven project and add the dependencies to the POM file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Mapreduce</groupId>
    <artifactId>cn.kgc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <!-- MapReduce -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
2. Create a class for the word-count Mapper (WCMapper)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Extend Mapper and override the map method.
// Note: all the imported classes come from Hadoop packages.
// The four type parameters are:
//   KEYIN    - type of the input key
//   VALUEIN  - type of the input value
//   KEYOUT   - type of the output key
//   VALUEOUT - type of the output value
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the line of text to a String
        String line = value.toString();
        // Split the line on whitespace
        String[] words = line.split("\\s+");
        // Write out every word with a count of 1
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
3. Create a class for the word-count Reducer (WCReducer)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Extend Reducer and override the reduce method.
 * KEYIN    : the key type received by the reducer, i.e. the map output key type
 * VALUEIN  : the value type received by the reducer, i.e. the map output value type
 * KEYOUT   : the key type emitted by the reducer
 * VALUEOUT : the value type emitted by the reducer
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reusable output value object
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // The reducer receives input shaped roughly like (wish, (1, 1, 1, 1, 1, 1)).
        // The sum must be reset for every key, so it is a local variable here.
        int sum = 0;
        // Accumulate the counts from the iterator
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Write out the key and its total count
        v.set(sum);
        context.write(key, v);
    }
}
4. Write the driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WCDirver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration and the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount");
        job.getConfiguration().setStrings("mapreduce.reduce.shuffle.memory.limit.percent", "0.15");

        // Tell Hadoop which jar (class) to ship
        job.setJarByClass(WCDirver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // Set the key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths.
        // Create an input folder with a text file containing some words;
        // the input path points at the files to be counted.
        FileInputFormat.setInputPaths(job, new Path("file:///D:\\Idea\\Mapreduce\\input"));
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path("file:///D:\\Idea\\Mapreduce\\output"));
        //FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
5. Run the driver class
6. Pitfalls I ran into
1) Version mismatch
The hadoop.dll under that directory and the hadoop.sh under the lib folder of the locally extracted Hadoop are not the same file, so the versions do not match
2) If the local Windows user name contains a space, two exceptions are thrown:
shuffle error in local#1....
Caused by: java.io.FileNotFoundException: D:/tmp/hadoop....
After much back and forth I learned that the file paths Hadoop uses at runtime must not contain spaces. The path is generated from the current Windows user name, so I created a new local user whose name has no space, and the problem was solved. (Do not rename the existing user's home folder; I tried that and it nearly finished me off. Creating a new user is the best fix.)
3) If the code is identical to the example but still shows errors (red underlines), the wrong packages were imported; check the import statements.
Continuing from the above: a phone-traffic statistics (FlowCount) case
The original data file is as follows
Create a new flowCount package
1. Create a new class. Since the input and output of both map and reduce are key/value pairs, the phone's traffic information must be wrapped in a bean class, and that class is used as the value.
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // Deserialization.
    // The read order must not be changed: fields are read back
    // in exactly the same order they were written.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "Flowcount{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    // Convenience setter used by the Mapper and Reducer
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
}
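As a quick sanity check (not part of the original walkthrough), a small JUnit 4 test can confirm that write() and readFields() stay in sync, i.e. that a FlowBean deserializes back to the same values it was serialized with:

import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static org.junit.Assert.assertEquals;

// Hypothetical test class: round-trips a FlowBean through its own Writable methods.
public class FlowBeanTest {

    @Test
    public void writeThenReadRestoresTheSameValues() throws IOException {
        FlowBean original = new FlowBean();
        original.set(1116, 954);                       // sumFlow becomes 2070

        // Serialize
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh bean
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        assertEquals(1116, restored.getUpFlow());
        assertEquals(954, restored.getDownFlow());
        assertEquals(2070, restored.getSumFlow());
    }
}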
2. FlowMapper: extend Mapper and override the map method

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] files = line.split("\\s+");
        // The phone number is the second field
        String phoneNumber = files[1];
        // Take the upstream and downstream traffic (third- and second-to-last fields)
        long upFlow = Long.parseLong(files[files.length - 3]);
        long downFlow = Long.parseLong(files[files.length - 2]);
        k.set(phoneNumber);
        v.set(upFlow, downFlow);
        context.write(k, v);
    }
}

3. FlowReducer: extend Reducer and override the reduce method
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // The reducer input looks roughly like
        // ("13560439658", (FlowBean(198, 4938), FlowBean(1116, 954)))
        // Two running totals, reset for every key
        long sum_upFlow = 0;
        long sum_downFlow = 0;
        // Accumulate upstream and downstream traffic
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }
        v.set(sum_upFlow, sum_downFlow);
        context.write(key, v);
    }
}

4. The driver class, essentially the same as in the previous example
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration and the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flowCount");

        // Tell Hadoop which jar (class) to ship
        job.setJarByClass(FlowDriver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Set the key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("file:///D:\\Idea\\Mapreduce\\input"));
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path("file:///D:\\Idea\\Mapreduce\\output\\out2"));
        //FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5. Run it: the job completes successfully