使用MapReduce完成员工信息和部门信息的关联查询

    技术2022-07-17  116

    需求说明

    有以下两个分别存储员工信息和部门信息的txt文件。现在要根据员工表中的部门id进行关联查询,显示每个员工所在部门的名称。

    代码实现

    这里使用MapReduce完成,需要编写4个类:Mapper、Reducer、Driver,以及员工表的实体类EMP(实现Writable接口)。

    EMP类

    import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class EMP implements Writable { private String name; private String sex; private String age; private String depId; private String depName; private String flag; //序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(name); dataOutput.writeUTF(sex); dataOutput.writeUTF(age); dataOutput.writeUTF(depId); dataOutput.writeUTF(depName); dataOutput.writeUTF(flag); } //反序列化 @Override public void readFields(DataInput dataInput) throws IOException { this.name=dataInput.readUTF(); this.sex=dataInput.readUTF(); this.age=dataInput.readUTF(); this.depId=dataInput.readUTF(); this.depName=dataInput.readUTF(); this.flag=dataInput.readUTF(); } public EMP() { } public EMP(String name, String sex, String age, String depId, String depName, String flag) { this.name = name; this.sex = sex; this.age = age; this.depId = depId; this.depName = depName; this.flag = flag; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public String getAge() { return age; } public void setAge(String age) { this.age = age; } public String getDepId() { return depId; } public void setDepId(String depId) { this.depId = depId; } public String getDepName() { return depName; } public void setDepName(String depName) { this.depName = depName; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } @Override public String toString() { return name + ',' + sex + ',' + age + ',' + depId + ',' + depName; } }

    EMPMapper 这里要注意:EMP.txt里有空行。空行经 split("\\s+") 切割后得到的是只含一个空字符串的数组(长度为1),如果不判断这种情况就直接访问 fields[1],会报数组越界错误: java.lang.ArrayIndexOutOfBoundsException: 1

    import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class EMPMapper extends Mapper<LongWritable,Text,Text,EMP> { EMP emp=new EMP(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将字段进行切割,返回字段数组 String[] fields = value.toString().split("\\s+"); //这里的字段数组的长度有三种:0,2,4 //长度为0是因为文件中间有空行 //长度为2是部门表 //长度为4是员工表 if (fields.length == 4) {//对于员工表中可以赋值的字段进行赋值 emp.setName(fields[0]); emp.setSex(fields[1]); emp.setAge(fields[2]); emp.setDepId(fields[3]); emp.setDepName(""); emp.setFlag("1"); } else if (fields.length==2){//对于部门表中可以赋值的字段进行赋值 emp.setName(""); emp.setSex(""); emp.setAge(""); emp.setDepId(fields[0]); emp.setDepName(fields[1]); emp.setFlag("0"); }else{//对于空行,把所有的字段都赋为空字符串,这样reducer在处理时就会忽略它 emp.setName(""); emp.setSex(""); emp.setAge(""); emp.setDepId(""); emp.setDepName(""); emp.setFlag(""); } context.write(new Text(emp.getDepId()),emp); } }

    EMPReducer

    import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; public class EMPReducer extends Reducer<Text,EMP,EMP, NullWritable> { @Override protected void reduce(Text key, Iterable<EMP> values, Context context) throws IOException, InterruptedException { //1、准备员工记录集合 ArrayList<EMP> emps=new ArrayList<>(); //准备1个部门bean对象 EMP eBean=new EMP(); //2、把数据放入到集合中,准备合并bean对象 for (EMP bean : values) { if ("1".equals(bean.getFlag())){//原始的员工表 EMP empBean=new EMP(); try { BeanUtils.copyProperties(empBean,bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } emps.add(empBean); } else{//部门表 try { BeanUtils.copyProperties(eBean,bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } //3、遍历表,进行拼接 for (EMP emp : emps) { emp.setDepName(eBean.getDepName()); //4、调写出方法,进行写出 context.write(emp,NullWritable.get()); } } }

    EMPDriver

    import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class EMPDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1、创建配置文件 Configuration conf=new Configuration(); Job job=Job.getInstance(conf,"DepNamejoin"); //2、设置jar的位置 job.setJarByClass(EMPDriver.class); //3、设置Map和Reduce的位置、 job.setMapperClass(EMPMapper.class); job.setReducerClass(EMPReducer.class); //4、设置map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(EMP.class); //5、设置reduce输出的key,value类型 job.setOutputKeyClass(EMP.class); job.setOutputValueClass(NullWritable.class); //设置reduce数量(可不设) //job.setNumReduceTasks(4); //6、设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("data/practice2")); FileOutputFormat.setOutputPath(job, new Path("data/OutputPractice")); //7、提交程序运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); } }

    结果展示:

    运行Driver后生成文件

    Processed: 0.009, SQL: 9