大数据集群之Hadoop Map Reduce 编程

    技术2022-07-11  116

    环境配置为:

    192.168.36.100 node1 NameNode 192.168.36.101 node2 NameNode , DataNode 192.168.36.102 node3 DataNode 192.168.36.103 node4 DataNode

    详细信息请参考:大数据集群之Hadoop集群(HA)

    任务说明与准备

    数据结构

    山西省,3,朔州市,朔城区,2013,LZW6450PF,上汽通用五菱汽车股份有限公司,五菱,小型普通客车,个人,非营运,1,L3C,8424,79,汽油,4490,1615,1900,2,3050,1386,175/70R14LT,4,2110,1275,7,上汽通用五菱汽车股份有限公司,客车,1913,男性 …

    这是一行数据,每个字段之间用’,'分割。

    任务:统计乘用车和商用车的比例

    完整数据已经上传到Github:点击跳转

    使用 xftp 或其他工具将本机数据上传至Linux系统上。

    将数据上传至hdfs上:

    hdfs dfs -mkdir -p /carSaiesAnalysis/input # 在hdfs上创建一个输入目录,输入数据放在这里 hdfs dfs -mkdir -p /carSaiesAnalysis/output # 在hdfs上创建输入目录,运算结果存放在此目录 hdfs dfs -put cars.txt /carSaiesAnalysis/input # 将文件上传至hdfs 文件系统中

    编码

    程序入口
    package com.yc.hadoop.project.carSaiesAnalysis.test1; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //hadoop jar car.jar com.yc.hadoop.project.carSaiesAnalysis.test1.App /carSaiesAnalysis/input/cars.txt /carSaiesAnalysis/output/ public class App { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if(null == args || args.length != 2){ System.out.println("<Usage> Please <input> <output>"); System.exit(1); } Configuration conf = new Configuration(); //从Hadoop hdfs文件系统读取数据 //hdfs为协议 //node1 是NameNode主机节点ip映射(此处为 192.168.36.100) //8020为hdfs连接端口 //注意是处于active状态的NameNode conf.set("fs.defaultFS", "hdfs://node1:8020"); Path input=new Path(args[0]); Path output=new Path(args[1]); FileSystem fs = FileSystem.get(conf); // 判断输入目录是否存在,若存在则删除,否则会报错 if(fs.exists(output)){ fs.delete(output, true); } // 创建任务,并指定任务入口。 Job job = Job.getInstance(conf, "统计车的销量比重"); job.setJarByClass(App.class); // 设置待读取数据路径 FileInputFormat.addInputPath(job, input ); // 设置Map阶段执行类 job.setMapperClass(CountCarMap.class); // 设置Combiner阶段执行,可不写。 // Combiner的作用是将Map的计算结果进行一次合并,减少Redece阶段的网络传输数据 job.setCombinerClass(CountCarCombiner.class); // 设置Reducer阶段执行类 job.setReducerClass(CountCarReducer.class); // 设置Map阶段输出数据的键值的数据类型,如果为设置则与输出结果类型一致 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 设置输出结果的键值的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 设置计算结果存放的目录 FileOutputFormat.setOutputPath(job, output); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
    Map
    package com.yc.hadoop.project.carSaiesAnalysis.test1; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; //Object, Text表示 Mapper的输入类型 //Text, IntWritable 输出类型 public class CountCarMap extends Mapper<Object, Text, Text, LongWritable> { // key默认为当前读取的行数 // value 为当前行的数据 // context 为当前上下文 protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{ // 截取出需要的字段 String[] strs = value.toString().trim().split(","); if (strs!=null && strs.length > 10 && strs[10] != null){ String type = strs[10]; if ("非营运".equals(type)){ context.write( new Text("乘用车辆"),new LongWritable(1)); }else { context.write(new Text("商用车辆"), new LongWritable(1)); } } } }
    Combiner

    在map结束阶段将键相同的值进行合并

    package com.yc.hadoop.project.carSaiesAnalysis.test1; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; // 在map结束阶段将键相同的值进行合并 public class CountCarCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { private LongWritable result = new LongWritable(); public void reduc(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{ Long sum = new Long(0); for (LongWritable val : values){ sum += val.get(); } result.set(sum); context.write(key , result); } }
    Reducer

    将各个map的数据进行整合

    package com.yc.hadoop.project.carSaiesAnalysis.test1; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class CountCarReducer extends Reducer<Text, LongWritable, Text, Text> { Map<String, Long> map = new HashMap<>(); double all = 0; // 将各个map中的数据拉取并进行运算 protected void reduce(Text key,Iterable<LongWritable> values, Context context){ long sum = 0; for (LongWritable val : values) { sum += val.get(); } all += sum; map.put(key.toString(), sum); } // Reducer生命周期函数 在reduce函数执行完之后调用 // 计算各个类型所占的比例,并将结果写出。 @Override protected void cleanup(Reducer<Text, LongWritable, Text, Text>.Context context) throws IOException, InterruptedException { Set <String> keySet = map.keySet(); for(String key:keySet){ long value = map.get(key); double percent = value/all; context.write(new Text(key), new Text(value+"\t"+percent)); } } }

    执行

    将程序打包成jar包,并上传至Linux系统中。

    执行:

    hadoop jar car.jar com.yc.hadoop.project.carSaiesAnalysis.test1.App /carSaiesAnalysis/input/cars.txt /carSaiesAnalysis/output/ hadoop jar 为Hadoop执行jar程序的指令car.jar 是我将上述程序打包后的jar包。com.yc.hadoop.project.carSaiesAnalysis.test1.App 为main函数所在的类名,类名为全称,即包含包名/carSaiesAnalysis/input/cars.txt 为待读取数据的路径(hdfs系统中的路径)/carSaiesAnalysis/output/ 结果输出的路径(hdfs系统中的路径)

    成功执行后会打印出任务清单 查看计算结果

    hdfs dfs -cat /carSaiesAnalysis/output/part-r-00000

    可以在Hadoop web 端查看结果

    其他几个类似的案例也已上传 Github

    Processed: 0.012, SQL: 9