废寝忘食整理出来的知识梳理,希望能对大家有所帮助
大数据是需要新处理模式才能具有更强的决策力 、洞察发现力和流程优化能力来适应海量、高增长率和多样化的信息资产
4V Volume 数据量 Velocity 时效 Variety 多样性 Value 价值
B-KB-MB-GB-TB-PB-EB-ZB…
各种云存储解决方案,百度云、腾讯微云、OneDriver等、现有的硬件资源能够支撑足够大的数据。
大数据在短时间内迅速产生,即要求收集者在短时间内收集并且存储数据,且近期(合理时间内)分析完成数据。
结构化的是数据:SQL 文本
非结构化的数据:视频、音频、图片
地理位置:来自上海、北京…
设备信息:PC 手机 手表 手环
个人喜好:美女 面膜 口红 显卡 数码
社交网络: A可能认识B C ,B 可以认识C
电话号码:100 10086
网络身份证 :设备MAC+IP+地理位置+电话
警察叔叔:只关注有没有违规
大数据开发者:只关注数据本身(不关注无用数据)
AI研究:阿尔法Go (只要围棋棋谱)
所以海量数据中提取有用的数据最为关键,这就是数据分析第一步 数据降噪(数据预处理|数据清洗)
根据用户喜好 达到帮用户想的这个目的
大数据实时流处理 根据用户行为模型的支撑 判断操作是否正常
根据当代的数据信息,分析出来往年甚至古代的气候异常,预测未来的气象信息
无人汽车:百度 Google 特斯拉
语音助手:小爱 Siri
为了解决存储问题和分析问题,实际上一台机器是不能够完成这个任务,所以解决方案是使用多台机器所组成的集群上运行,进行存储和计算。
硬件资源有了,软件上怎么实现?
Hadoop: 适合大数据的分布式存储和计算平台 Hadoop不是指具体一个框架或者组件,它是Apache软件基金会下用Java语言开发的一个开源分布式计算平台。实现在大量计算机组成的集群中对海量数据进行分布式计算。适合大数据的分布式存储和计算平台。 Hadoop1.x中包括两个核心组件:MapReduce和Hadoop Distributed File System(HDFS) 其中HDFS负责将海量数据进行分布式存储,而MapReduce负责提供对数据的计算结果的汇总
HDFS:Hadoop Distribute File Sysytem
Map Reduce:Hadoop 中的分布式计算框架 实现对海量数据并行分析和计算
HDFS:Hadoop Distribute File Sysytem
Map Reduce:Hadoop 中的分布式计算框架 实现对海量数据并行分析和计算
HBase:是一款基于列式存储的NOSql
Hive:是一款sql解释引擎,可以将SQL语句翻译成MR 代码,并在集群中运行
Flume:分布式日志收集系统
Kafka: 消息队列 ,分布式的消息系统
Zookeeper:分布式协调服务 用于 注册中心 配置中心 集群选举 状态监测 分布式锁
Map Reduce:代表基于磁盘的离线静态大数据批处理
Spark : 代表基于内存的离线静态大数据批处理
Strom、Spark Streaming、Flink、Kafka Stram:实时流处理 达到对记录级别数据的毫秒级处理
解决存储问题
SSH 为 [Secure Shell](https://baike.baidu.com/item/Secure Shell) 的缩写,由 IETF 的网络小组(Network Working Group)所制定;SSH 为建立在应用层基础上的安全协议。
基于口令的验证:基于用户名和密码
基于密钥的安全验证:需要依靠密钥,在进行连接之前,需要自己创建一对密钥,并且将公钥放在需要访问的服务器上。
[root@HadoopNode00 .ssh]# ssh-keygen -t rsa # 生成公私玥 Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: 39:e3:a3:d6:5a:19:4f:c5:02:01:e0:00:a9:f0:5c:4a root@HadoopNode00 The key's randomart image is: +--[ RSA 2048]----+ |.o. ....o. | |o Eo. . . | |o+ o. . o | |. + . o | | S . | | . B | | .= . | | .o.. | | .o. | +-----------------+ [root@HadoopNode00 .ssh]# ssh-copy-id HadoopNode00 # 复制Hadoopnode00的公钥 The authenticity of host 'hadoopnode00 (192.168.126.10)' can't be established. RSA key fingerprint is 4d:18:40:3d:24:1a:85:ce:ea:3c:a2:76:85:47:e8:12. Are you sure you want to continue connecting (yes/no)? yes Warning: Permanently added 'hadoopnode00,192.168.126.10' (RSA) to the list of known hosts. root@hadoopnode00's password: Now try logging into the machine, with "ssh 'HadoopNode00'", and check in: .ssh/authorized_keys to make sure we haven't added extra keys that you weren't expecting. [root@HadoopNode00 .ssh]# ssh hadoopnode00 # 免密登陆 hadoopnode00 Last login: Thu Oct 24 19:30:44 2019 from 192.168.126.1 [root@HadoopNode00 ~]# exit; logout Connection to hadoopnode00 closed.HADOOP_HOME环境变量被第三方所依赖, 如hbase、hive、flume、spark 在集成Hadoop的时候,是通过读取HADOOP_HOME环境确定hadoop的位置
在hadoop 安装根目录etc/hadoop/ 下
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://HadoopNode00:9000</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/home/hadoop/hadoop-2.6.0/hadoop-${user.name}</value> </property> </configuration>在hadoop 安装根目录etc/hadoop/ 下
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>如果安装启动成功,可以使用WEB界面看到当前节点一些信息
hostname|IP:50070
在windows下记得配置 域名和IP的映射关系
C:\Windows\System32\drivers\etc\ HOSTS
192.168.126.10 HadoopNode00
core-site.xml
<property> <name>fs.trash.interval</name> <value>1</value> </property>记得重启
[root@HadoopNode00 ~]# hadoop fs -rm -r -f /1.txt # 删除文件 一分钟后删除 19/10/25 00:16:59 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1 minutes, Emptier interval = 0 minutes. Moved: 'hdfs://HadoopNode00:9000/1.txt' to trash at: hdfs://HadoopNode00:9000/user/root/.Trash/Current [root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/191025001700 # 能够在对应文件夹中看到文件 Found 1 items -rw-r--r-- 1 root supergroup 8917 2019-10-25 00:16 /user/root/.Trash/191025001700/1.txt [root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/191025001700 # 一分钟后就无法看到文件了 ls: `/user/root/.Trash/191025001700': No such file or directory此时使用的Hadoop需要与当前安装的版本相对应
关闭权限检查
hdfs-site.xml
<property> <name>dfs.permissions.enabled</name> <value>false</value> <description> If "true", enable permission checking in HDFS. If "false", permission checking is turned off, but all other behavior is unchanged. Switching from one parameter value to the other does not change the mode, owner or group of files or directories. </description> </property>http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.
namenode:存储系统的元数据(用于描述数据的数据),例如 文件命名空间、block到DataNode的映射,负责管理DataNode
datanode: 用于存储数据的节点 负责相应客户端读写请求 向NameNode 汇报块信息
block:数据块 是对文件拆分的最小单元 表示 一个默认为128MB的切分 尺度,每个数据块副本,默认的副本因子为3,通过dfs.replication进行配置,另外用户还可以通过 dfs.blcoksize 设置块的大小
rack:机架 使用机架对存储节点做物理编排 用于优化存储和计算
# 查看机架配置 [root@HadoopNode00 ~]# hdfs dfsadmin -printTopology Rack: /default-rack 192.168.126.10:50010 (HadoopNode00)在Hadoop 1 .x 版本 BlockSize 为64MB (因工业限制)
硬件限制:廉价PC 机械硬盘速度慢
软件优化:通常认为最佳状态为 : 寻址时间为传输时间的100分之一
答案是不行,如果Block块设置过小,集群中几百万个小文件造成寻址时间的增加,效率低下
如果太大,会造成空间的浪费,会造成存取时间过长,效率还是低,
适合的才是最好的。
FSImage:元数据信息的备份,会被加载到内存中
https://hadoop.apache.org/docs/r2.7.7/hadoop-project-dist/hadoop-hdfs/HdfsImageViewer.html
Edits:Edits文件帮助记录文件增加和更新的操作 提高效率
https://hadoop.apache.org/docs/r2.7.7/hadoop-project-dist/hadoop-hdfs/HdfsEditsViewer.html
NameNode在启动的时候 需要加载edits文件和fsimage文件,所以在第一次启动的时候需要格式化NameNode。
当用户上传文件或者下载文件的时候,会将记录写入edits文件中,这样edits文件和fsimage文件加起来永远是最新的元数据。
当用户一直进行操作,势必会导致edits文件过大,这就导致了集群在下次启动的时候时间过长(会加载两个文件)
为了解决这个问题,可以将edits文件和fsimage文件进行合并。
在这里NameNode自己不能合并元数据,合并元数据的任务交由SecondaryNameNode。将当前NameNode的edits文件和fsimage上传到自己的节点中,利用自身节点的计算资源进行合并,合并完成后会将最新的fsimage上传到namenode节点中,此时namenode加载最新的元数据。
在进行合并的期间,如果出现外部客户端有新的操作请求,会更改edits文件,但是操作edits文件可能会导致文件数据紊乱。如果和解决这个问题?那就是将新的操作记录写入一个叫做edits-inprogress文件中,等到合并完成,edits-inprogress会更名为当前系统的edits文件。
dfs.namenode.checkpoint.period(默认设置为1小时)指定两个连续检查点之间的最大延迟
<property> <name>dfs.namenode.checkpoint.period</name> <value>3600</value> <description>The number of seconds between two periodic checkpoints. </description> </property>dfs.namenode.checkpoint.txns(默认设置为100万)定义了NameNode上的非检查点事务数,即使尚未达到检查点期限,该事务也会强制执行紧急检查点。
<property> <name>dfs.namenode.checkpoint.txns</name> <value>1000000</value> <description>The Secondary NameNode or CheckpointNode will create a checkpoint of the namespace every 'dfs.namenode.checkpoint.txns' transactions, regardless of whether 'dfs.namenode.checkpoint.period' has expired. </description> </property>HDFS在启动的时候会默认开启安全模式,当等待发现绝大多数的Block块可用的时候,会自动退出安全模式。
安全模式是HDFS集群的只读模式,但是显式的将HDFS至于安全模式,使用hdfs dfsadmin -safemode命令即可。
[root@HadoopNode00 ~]# hadoop fs -put 1.txt / [root@HadoopNode00 ~]# hadoop fs -ls / Found 6 items -rw-r--r-- 1 root supergroup 8917 2019-10-25 23:27 /1.txt -rw-r--r-- 1 root supergroup 16946 2019-10-25 18:01 /2.md -rw-r--r-- 1 Administrator supergroup 18279 2019-10-25 18:07 /3.md -rw-r--r-- 1 Administrator supergroup 18279 2019-10-25 18:16 /4.md drwxr-xr-x - Administrator supergroup 0 2019-10-25 18:37 /baizhi drwx------ - root supergroup 0 2019-10-25 00:15 /user [root@HadoopNode00 ~]# hdfs dfsadmin -safemode enter # 开启安全模式 Safe mode is ON [root@HadoopNode00 ~]# hadoop fs -put 1.txt /2.txt put: Cannot create file/2.txt._COPYING_. Name node is in safe mode. [root@HadoopNode00 ~]# hdfs dfsadmin -safemode leave # 关闭安全模式 Safe mode is OFF [root@HadoopNode00 ~]# hadoop fs -put 1.txt /2.txt因为NameNode使用的是单机存储元数据,如果存储过多的小文件,会导致内存紧张
解决小文件存储的问题?
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)“和"Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
MapReduce是一个并行计算框架,将一个任务(Job)拆分成两个阶段一个是Map一个是Reudce,MapReduce充分利用了存储节点的计算资源所在物理主机(CPU/内存/网络/少许的硬盘)进行并行运算。MapReduce在需要在集群中启动Yarn,启动会出现相应的进程,在一个节点启动一个NodeMmanger对当前节点的计算资源的管理和使用,默认情况下NodeManager会将的当前节点的物理主机抽象为8个计算单元,每一个计算单元称之为一个Container,这些NodeManager都必须听从ResourceManager调度。
MapRduce 擅长处理大数据 ,Map的思想就是“分而治之”
Map 负责“分”,即把复杂的任务分解成若干小任务 数据或者计算的规模要比原任务大大缩小是就近计算,即任务会分配到所需节点上进行计算可以并行计算,彼此之间没有干扰和依赖。 Reduce 负责对map阶段的结果进行汇总上述代码是对日志进行简单的处理,在数据量小的时候,是不会出现相关问题的,但是数据量一大,势必会导致运行的时间过长,无法在合理的期间内分析出来数据,提高计算能力有几种方案:(1)给当前的计算机纵向上的硬件升级,显然成本比较大,而且在节点出现故障的时候无法容错(2)使用多台机器组成集群,横向推展,有良好的计算能力,且可以实施对应的容错机制。
NodeManager:管理主机上计算资源,是每台机器的框架代理,向RM 汇报自身的状态信息。
ResourceManager:负责集群计算资源的统筹规划,拥有着集群资源的最终决策权。
ApplicationMaster:计算任务的Master,负责申请资源,协调计算任务。
YARNChild:负责最实际的计算任务(MapTask|ReduceTask)
Container:是计算资源的抽象,代表着一组内存、CPU、网络的占用,无论是ApplicationMaster和YARNChild都需要消耗一个Container。
etc/hadoop/yarn-site.xml
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.hostname</name> <value>HadoopNode00</value> </property>etc/hadoop/mapred-site.xml
<property> <name>mapreduce.framework.name</name> <value>yarn</value> </property>访问: http://hadoopnode00:8088/cluster
http://hostname:8088
log4j
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>log4j.properties
log4j.rootLogger = info,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n 本地进行提交任然不需要修改之前的写代码 但是需要指定文件路径为本地路径 有可能出现将本地文件路径识别成HDFS 上的文件,请在文件前面加上 file:///5b.连接到对应的NM启动一个MRAppMaster
6 . MRAppMaster 在启动的时候会初始化Job
初始化Job 后,会去共享文件系统中回去切片MRAppMaster 向RM 请求计算资源连接到对应的NM ,消耗Container 启动YARNChild获取完整的Job资源计算[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4Bic1x3f-1593835203956)(assets/1572320702758.png)]
在staging路径下创建以JobID 为名字的文件夹
files 其他文件 libjar 依赖的文件 archives 归档文件 jobJar 代码的Jar包 切片文件 conf.xml
通过WC案例编写,我们不难发现,其实是按照一定的规则执行程序的输入和输出,最终将作业提交到Hadoop的集群中来。
Hadoop是将数据切分成若干个输入切片(Input Split),并将每一个Split交给一个MapTask处理;MapTask 不断的从对应的Split中解析出一个一个的key、value,并调用map()函数处理,处理完成后根据Redduce Task个数将结果分解成若干个分片(partition),写到磁盘中。
同时,每个Reduce Task 从每个MapTask的节点上读取属于自己的那个分区(partition)的数据,然后使用基于排序的方法将key相同的数据聚集在一起,调用Reduce函数,并将结果输出到文件中。
通过上面的描述,上面还缺少三个组件:
(1)指定文件格式,将输入数据切分为若干个Split,,且将每个Spalit的数据解析成一个个map()函数要求的key,value对象
(2)确定Map()函数产生的新的keyvalue对交给那个ReduceTask 函数处理
(3)指定输出文件格式,即每个keyvalue对 以何种形式保存到输出文件中。
所以在MR中,这个三个组件分别就是InputFormat,Partitioner,OutPutFormat,他们均需要用户根据自己业务进行配置,但是对于WC来说,使用的默认的即可。
但是最终,Hadoop其实为我们 提供了5个可以进行编程的组件。
InputFormat,Mapper,Partitioner,Reducer,OutPutFormat
另外还有组件叫做Conbiner 这个组件通常用于优化MR程序性能,但是需要根据具体业务场景而定。
InputFormat主要用于描述数据输入的格式,它提供了两个功能:
数据切分:按照某种策略将输入数据切分为若干个Split,以确定MapTask的个数和Spilt为Mapper 提供数据本地SplitSize 32MB 集群默认为1128MB
public List<InputSplit> getSplits(JobContext job) throws IOException { // 不用管 Stopwatch sw = (new Stopwatch()).start(); // 最小大小 为计算splitSize 做准备 long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); // 最大 大小 为计算splitSize 做准备 long maxSize = getMaxSplitSize(job); // 准备存储切片的集合 List<InputSplit> splits = new ArrayList(); // 获取所有文件的集合 List<FileStatus> files = this.listStatus(job); // 迭代 Iterator i$ = files.iterator(); while(true) { while(true) { while(i$.hasNext()) { // 获取文件状态 (到底是hdfs 上的文件还是本地文件) FileStatus file = (FileStatus)i$.next(); // 获取文件路径 Path path = file.getPath(); // 获取文件长度 long length = file.getLen(); // 如果文件不为空 则 继续执行 if (length != 0L) { //准备存储BlockLocation的数组 BlockLocation[] blkLocations; // 判断文件是否为本地文件 if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus)file).getBlockLocations(); } else { //获取到fs 对象 FileSystem fs = path.getFileSystem(job.getConfiguration()); //通过fs 对象获取到文件所有的block的地址(位置) blkLocations = fs.getFileBlockLocations(file, 0L, length); } // isSplitable 方法默认可以切分 if (this.isSplitable(job, path)) { // i获取到当前的blcoksize大小 一般为128MB long blockSize = file.getBlockSize(); /* protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } 上述代码是计算切片策略的大小 */ long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); // 准备剩余 的字节数的对象 long bytesRemaining; // blockIndex int blkIndex; // for 循环 剩余大小等于当前的长度 使用剩余大小除切片策略的大小 如果大于1.1 则向下执行 否则退出循环 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { // 剩余大小除切片策略的大小 大于1.1 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); // 给定文件路径 ,文件开始的位置 文件长度 如果剩余长度除以splitSize 大于1.1 则进入到次循环 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } // 如果剩余长度除以splitSize 小于1.1 则进入到判断语句 if (bytesRemaining != 0L) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); // 给定文件路径 ,文件开始的位置 文件长度 如果剩余长度除以splitSize 大于1.1 则进入到次循环 splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, new String[0])); } } job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits; } } }TextInputFormat使用RecordReader中org.apache.hadoop.mapreduce.lib.input下的LineRecordReader。这个类中方法,首先会调用initializ方法获取切片的初始化位置和结束位置,以及使用fs对象打开文件的输入流,mapper 的key、value是通过LineRecordReader.nextKeyValue,在此间期间,key被设置成当前文本的偏移量,value被设置成使用RedaLine的readDefaultLine方法读取到每一行文本:
public boolean nextKeyValue() throws IOException { if (this.key == null) { this.key = new LongWritable(); } this.key.set(this.pos); if (this.value == null) { this.value = new Text(); } int newSize = 0; while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) { if (this.pos == 0L) { newSize = this.skipUtfByteOrderMark(); } else { newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos)); this.pos += (long)newSize; } if (newSize == 0 || newSize < this.maxLineLength) { break; } LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize)); } if (newSize == 0) { this.key = null; this.value = null; return false; } else { return true; } } private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { str.clear(); int txtLength = 0; int newlineLength = 0; boolean prevCharCR = false; long bytesConsumed = 0L; do { int startPosn = this.bufferPosn; if (this.bufferPosn >= this.bufferLength) { startPosn = this.bufferPosn = 0; if (prevCharCR) { ++bytesConsumed; } this.bufferLength = this.fillBuffer(this.in, this.buffer, prevCharCR); if (this.bufferLength <= 0) { break; } } while(this.bufferPosn < this.bufferLength) { if (this.buffer[this.bufferPosn] == 10) { newlineLength = prevCharCR ? 2 : 1; ++this.bufferPosn; break; } if (prevCharCR) { newlineLength = 1; break; } prevCharCR = this.buffer[this.bufferPosn] == 13; ++this.bufferPosn; } int readLength = this.bufferPosn - startPosn; if (prevCharCR && newlineLength == 0) { --readLength; } bytesConsumed += (long)readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { str.append(this.buffer, startPosn, appendLength); txtLength += appendLength; } } while(newlineLength == 0 && bytesConsumed < (long)maxBytesToConsume); if (bytesConsumed > 2147483647L) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } else { return (int)bytesConsumed; } }MapTask数量由切片数量决定,ReduceTask的数量可以手动设置,默认为1
FileInputFormat (读取HDFS 文件)
TextInputFormat
key LongWritable 行字节偏移量value Text 当前行文本切片:以文件为单位 按照 splitsize 切分
NLineInputFormat
key LongWritable 行字节偏移量value Text 当前行文本切片:以文件为单位,n行为一个切片 ;默认一行一个切片 可以设置
CombineTextInputFormat
key LongWritable 行字节偏移量value Text 当前行文本切片:按照splitsize 进行切分,一个切片可能对应多个文件
DBInputFormat(读取RDBMS)
TableInputFormat(读取Hbase)(重点)
本地运行模式
需要加上mysql的依赖到pom文件中
jar包提交
需要将 mysql jar 包拷贝至/home/hadoop/hadoop-2.6.0/share/hadoop/yarn/ 中
远程提交
和之前保持一致即可
解决问题:HDFS中小文件存储
解决小文件存储的问题,将多个文件合并成一个SequenceFile(SequenceFile特指hadoop中一种特殊的文件,这种文件里面存在多个文件,是Hadoop用来存储二进制形式的key-value对的文件格式,SequenceFile 中有路径+文件名 为key 文件内容和为Value) package com.rechen.mr.test06; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class OwnInputFormat extends FileInputFormat<Text, BytesWritable> { @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { OwnRecordReader recordReader = new OwnRecordReader(); recordReader.initialize(inputSplit, taskAttemptContext); return recordReader; } } package com.rechen.mr.test06; import com.rechen.mr.test01.WCJob; import com.rechen.mr.test01.WCMapper; import com.rechen.mr.test01.WCReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class OwnJob { public static void main(String[] args) throws Exception { /* * 1 封装Job 对象 * */ // System.setProperty("HADOOP_USER_NAME", "root"); Configuration conf = new Configuration(); /* * 设置跨平台提交 * */ /* conf.set("mapreduce.app-submission.cross-platform", "true"); conf.addResource("conf2/core-site.xml"); conf.addResource("conf2/hdfs-site.xml"); conf.addResource("conf2/mapred-site.xml"); conf.addResource("conf2/yarn-site.xml"); conf.set(MRJobConfig.JAR,"D:\\大数据\\Code\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar"); */ Job job = Job.getInstance(conf, "WC-JOB01"); job.setJarByClass(OwnJob.class); /* * 2 设置数据的写入和写出格式 * */ job.setInputFormatClass(OwnInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); /* * * 3 设置数据读取和写出路径 * */ OwnInputFormat.addInputPath(job,new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\littlewenjian")); //TextInputFormat.setInputPaths(job, new Path("/access.tmp2019-05-19-10-28.log"),new Path("/4.md")); //TextInputFormat.setInputPaths(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\littlewenjian")); //CombineTextInputFormat.setMinInputSplitSize(job,1024000); //CombineTextInputFormat.setInputPaths(job, new Path("D:\\大数据训\\Note\\Day02-Hadoop\\数据文件\\littlewenjian")); //NLineInputFormat.setInputPaths(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\flow.dat")); //NLineInputFormat.setNumLinesPerSplit(job,3); /* * 此时需要注意 指定的文件夹 不能存在 * */ //TextOutputFormat.setOutputPath(job, new Path("/outtes1t1112111311")); //TextOutputFormat.setOutputPath(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\out2")); SequenceFileOutputFormat.setOutputPath(job,new Path("D:\\大数据\\Code\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\mr\\test06\\out4")); /* * * 4 设置数据的计算逻辑 * */ job.setMapperClass(OwnMapper.class); job.setReducerClass(OwnReducer.class); /* * 5 设置Mapper 和Reducer 的输出泛型 * */ job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); /* * 6 任务提交 * */ //job.submit(); job.waitForCompletion(true); } } package com.rechen.mr.test06; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class OwnMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> { @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(key, value); } } package com.rechen.mr.test06; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class OwnRecordReader extends RecordReader<Text, BytesWritable> { private FileSplit fileSplit; private Configuration conf; private Text key = new Text(); private BytesWritable value = new BytesWritable(); boolean isProgress = true; public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { /* *初始化 * */ // 获得到文件切片对象 this.fileSplit = (FileSplit) inputSplit; // 获取到 Configuration 对象 conf = taskAttemptContext.getConfiguration(); } public boolean nextKeyValue() throws IOException, InterruptedException { if (isProgress) { // 设置 存储 文件 内容的二进制数组 byte[] bytes = new byte[(int) fileSplit.getLength()]; // 获取文件路径 Path path = fileSplit.getPath(); FileSystem fileSystem = path.getFileSystem(conf); FSDataInputStream fsDataInputStream = fileSystem.open(path); IOUtils.readFully(fsDataInputStream, bytes, 0, bytes.length); /* * 设置 key 的值为文件路径 * */ key.set(path.toString()); /* * 设置 value 的值为文件内容 * */ value.set(bytes, 0, bytes.length); isProgress = false; return true; } return false; } public Text getCurrentKey() throws IOException, InterruptedException { return this.key; } public BytesWritable getCurrentValue() throws IOException, InterruptedException { return this.value; } public float getProgress() throws IOException, InterruptedException { return 0; } public void close() throws IOException { } } package com.rechen.mr.test06; import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class OwnReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> { @Override protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { for (BytesWritable value : values) { context.write(key, value); } } }默认的HashPartitiner使用哈希值去计算key的分区
需求:
将流量数据输出到不同的文件中 package com.rechen.mr.test07; public class App { public static void main(String[] args) { String hn = "hn"; String zz = "zz"; String xy = "xy"; String kf = "kf"; String bj = "bj"; System.out.println((hn.hashCode()& 2147483647) % 1); System.out.println((zz.hashCode()& 2147483647) % 1); System.out.println((xy.hashCode()& 2147483647) % 1); System.out.println((kf.hashCode()& 2147483647) % 1); System.out.println((bj.hashCode()& 2147483647) % 1); } } package com.rechen.mr.test07; import com.baizhi.mr.test04.FlowJob; import com.baizhi.mr.test04.FlowMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class AreaJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(AreaJob.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setPartitionerClass(OwnPartatiner.class); TextInputFormat.setInputPaths(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\flow.dat")); //TextInputFormat.setInputPaths(job, new Path("/flow.dat")); TextOutputFormat.setOutputPath(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\out112311111")); //TextOutputFormat.setOutputPath(job, new Path("/out312313")); job.setMapperClass(AreaMapper.class); job.setReducerClass(AreaReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); job.setNumReduceTasks(100); job.waitForCompletion(true); } } package com.rechen.mr.test07; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class AreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] line = value.toString().split(" "); String area = line[3]; String phone = line[0]; Long up = Long.valueOf(line[1]); Long down = Long.valueOf(line[2]); Long sum = up + down; context.write(new Text(area), new FlowBean(phone, up, down, sum)); } } package com.rechen.mr.test07; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class AreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { for (FlowBean value : values) { context.write(key, value); } } } package com.rechen.mr.test07; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements Writable { private String phone; private Long upFlow; private Long downFlow; private Long sumFlow; public FlowBean() { } public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) { this.phone = phone; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } public Long getUpFlow() { return upFlow; } public void setUpFlow(Long upFlow) { this.upFlow = upFlow; } public Long getDownFlow() { return downFlow; } public void setDownFlow(Long downFlow) { this.downFlow = downFlow; } public Long getSumFlow() { return sumFlow; } public void setSumFlow(Long sumFlow) { this.sumFlow = sumFlow; } @Override public String toString() { return "FlowBean{" + "phone='" + phone + '\'' + ", baizhi upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + '}' ; } /* * 序列化 编码 * */ public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.phone); dataOutput.writeLong(this.upFlow); dataOutput.writeLong(this.downFlow); dataOutput.writeLong(this.sumFlow); } /* * 反序列化 解码 * */ public void readFields(DataInput dataInput) throws IOException { this.phone = dataInput.readUTF(); this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); } } package com.rechen.mr.test07; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import java.util.HashMap; public class OwnPartatiner extends Partitioner<Text, FlowBean> { private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>(); static { areaMap.put("hn", 0); areaMap.put("xy", 1); areaMap.put("kf", 2); areaMap.put("bj", 3); areaMap.put("zz", 4); } public int getPartition(Text key, FlowBean value, int i) { /*if (areaMap.get(key.toString()) != null) { Integer integer = areaMap.get(key.toString()); return integer; } else { return 0; } */ return areaMap.get(key.toString())==null ? 0 :areaMap.get(key.toString()); } }自定义OutputFormat
package com.rechen.mr.test08; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class MyJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyJob.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(OwnOutputFormat.class); TextInputFormat.setInputPaths(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\flow.dat")); //TextInputFormat.setInputPaths(job, new Path("/flow.dat")); OwnOutputFormat.setOutputPath(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\asdjk1ha1")); //TextOutputFormat.setOutputPath(job, new Path("/out312313")); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); } } package com.rechen.mr.test08; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key, value); } } package com.baizhi.mr.test08; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> { @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(key, value); } } } package com.rechen.mr.test08; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class OenRecordWriter extends RecordWriter<LongWritable, Text> { private FSDataOutputStream outputStream; public OenRecordWriter(TaskAttemptContext context) throws Exception { FileSystem fileSystem = FileSystem.get(context.getConfiguration()); outputStream = fileSystem.create(new Path("D:\\大数据\\Code\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\mr\\test08\\out1\\1.txt")); } public void write(LongWritable longWritable, Text text) throws IOException, InterruptedException { outputStream.write((text.toString()+"\n").getBytes()); } public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { IOUtils.closeStream(outputStream); } } package com.rechen.mr.test08; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class OwnOutputFormat extends FileOutputFormat<LongWritable, Text> { public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { try { return new OenRecordWriter(taskAttemptContext); } catch (Exception e) { e.printStackTrace(); } return null; } }(1)Combiner是MR程序中Mapper和Reducer之外的一种组件
(2)Combiner 组件的父类就是Reducer
(3)Combiner和Reducer 的区别在于运行位置
Combiner 是在每一个Task所在的节点上运行 Reducer 是在接收全局所有的Mapper的输出结果(4)Combiner的意义在于对与每一个MapTask的输出做局部汇总,以减少网络使用量
(5)Combiner的使用不能影响最终的业务结果
进行累加 可以 求平均值 0 20 10 25 15 的平均数 14 (0,10,20) 10 (25,15)20 (10,20)15 适合累加 不适合求平均数(6)Combiner输出的KV 要与Reducer 的KV相对应
(7)使用
新建CombinerClass 继承Reducer直接使用Redcuer(8)配置
job.setCombinerClass(WCReducer.class);Shuffle过程是MR的核心,描述着MapTask输出和ReduceTask输入的这段过程。在大数据处理过程中,由于使用的是集群的计算模式,并且节点与节点之前采取并行处理的模式,而且也有可能运行着多个Job,此时节点与节点传输数据的效率会影响最终计算的效率。由于MR框架采取磁盘作为溢写文件夹的存储介质,所以也会影响最终计算的效率。
所以,从以上分析,Shuffle的过程基本要求:
完整的从MapTask节点拉取数据到ReduceTask在拉取数据的过程中,尽可能的减少网络资源的消耗尽可能较少磁盘IO的对task执行效率的影响总结:Shuffle是对Map输出结果进行分区,排序,合并等处理并且交由Reduce的过程。分为Map端和Reduce端。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iOpafhXr-1593835203999)(assets/1572491798829.png)]
Map端的Shuffle输出首先是将结果缓存到内存中(环状缓冲区),默认情况下如果达到80mb ,则溢写到磁盘中。在溢写到磁盘之前,要对内存中的数据进行分区和排序,之后再写入磁盘,每次溢写操作会生成新的磁盘文件,随着MapTask的执行,会产生很多小文件,最终当Map端计算结束之后,这些小的文件会被合并成大小的文件,之后通知相应的ReduceTask领取属于自己的数据。
map输入结果写入缓冲区缓冲区达到阈值 溢写到磁盘分区内排序合并成大文件(key,value[])(1)小文件计算优化-CombineTextInputFormat-干预切片计算逻辑
(2)实现Partitiner策略,防止数据倾斜
(3)适当调整YarnChild内存参数,可以参照YARN 参数配置手册
(4)适当调整溢写缓冲区的大小阈值
(5)适当调整合并文件并行度mapreduce.task.io.sort.factor
(6)对Map端输出溢写文件使用GZIP压缩,节省网络带宽
<property> <name>mapreduce.map.output.compress</name> <value>false</value> <description>Should the outputs of the maps be compressed before being sent across the network. Uses SequenceFile compression. </description> </property> <property> <name>mapreduce.map.output.compress.codec</name> <value>org.apache.hadoop.io.compress.DefaultCodec</value> <description>If the map outputs are compressed, how should they be compressed? </description> </property> conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class); conf.set("mapreduce.map.output.compress","true");略
学生信息表
gjf 00001 gzy 00002 jzz 00003 zkf 00004学生课程信息表
00001 yuwen 00001 shuxue 00002 yinyue 00002 yuwen 00003 tiyu 00003 shengwu 00004 tiyu 00004 wuli 00001 gjf yuwen shuxue 00002 gzy yinyue yuwen package com.rechen.mr.test11; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class StuJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(StuJob.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.setInputPaths(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\stu")); //TextInputFormat.setInputPaths(job, new Path("/flow.dat")); TextOutputFormat.setOutputPath(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\stu\\out1")); //TextOutputFormat.setOutputPath(job, new Path("/out312313")); job.setMapperClass(StuMapper.class); job.setReducerClass(StuReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); } } package com.rechen.mr.test11; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class StuMapper extends Mapper<LongWritable, Text, Text, Text> { /* * * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String stuNum = ""; FileSplit inputSplit = (FileSplit) context.getInputSplit(); /* * 获取到文件的名字 * */ Path name = inputSplit.getPath(); String[] line = value.toString().split(" "); if (name.toString().contains("student_info_class.txt")) { // 学号 stuNum = line[0]; // 学科名字 String classaName = line[1]; context.write(new Text(stuNum), new Text(classaName + " a")); } else if (name.toString().contains("student_info.txt")) { // 学号 stuNum = line[1]; // 学生名字 String stuName = line[0]; context.write(new Text(stuNum), new Text(stuName + " b")); } } } package com.rechen.mr.test11; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class StuReduce extends Reducer<Text, Text, NullWritable, Text> { /* * * 00001 gjf b * 00001 yuwen a * 00001 shuxue a * * * (0001,[shuxue a,gjf b,yuwen a ]) * * 00001 gjf shuxue yuwen * */ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String stuName = ""; String className = ""; for (Text value : values) { String[] s = value.toString().split(" "); if (s[1].equals("a")) { className += s[0] + " "; } if (s[1].equals("b")) { stuName = s[0]; } } context.write(NullWritable.get(), new Text((key.toString() + " " + stuName + " " + className).trim())); } }004 tiyu 00004 wuli
00001 gjf yuwen shuxue 00002 gzy yinyue yuwen
package com.rechen.mr.test11;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class StuJob { public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(StuJob.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.setInputPaths(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\stu")); //TextInputFormat.setInputPaths(job, new Path("/flow.dat")); TextOutputFormat.setOutputPath(job, new Path("D:\\大数据\\Note\\Day02-Hadoop\\数据文件\\stu\\out1")); //TextOutputFormat.setOutputPath(job, new Path("/out312313")); job.setMapperClass(StuMapper.class); job.setReducerClass(StuReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); }}
package com.rechen.mr.test11;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class StuMapper extends Mapper<LongWritable, Text, Text, Text> { /* * * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String stuNum = ""; FileSplit inputSplit = (FileSplit) context.getInputSplit(); /* * 获取到文件的名字 * */ Path name = inputSplit.getPath(); String[] line = value.toString().split(" "); if (name.toString().contains("student_info_class.txt")) { // 学号 stuNum = line[0]; // 学科名字 String classaName = line[1]; context.write(new Text(stuNum), new Text(classaName + " a")); } else if (name.toString().contains("student_info.txt")) { // 学号 stuNum = line[1]; // 学生名字 String stuName = line[0]; context.write(new Text(stuNum), new Text(stuName + " b")); } }}
~~~ package com.rechen.mr.test11; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class StuReduce extends Reducer<Text, Text, NullWritable, Text> { /* * * 00001 gjf b * 00001 yuwen a * 00001 shuxue a * * * (0001,[shuxue a,gjf b,yuwen a ]) * * 00001 gjf shuxue yuwen * */ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String stuName = ""; String className = ""; for (Text value : values) { String[] s = value.toString().split(" "); if (s[1].equals("a")) { className += s[0] + " "; } if (s[1].equals("b")) { stuName = s[0]; } } context.write(NullWritable.get(), new Text((key.toString() + " " + stuName + " " + className).trim())); } } 给个三连吧~