1.什么是Apache Storm? Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。 2.Apache Storm核心概念 Apache Storm从一端读取实时数据的原始流,并将其传递通过一系列小处理单元,并在另一端输出处理/有用的信息。 现在让我们仔细看看Apache Storm的组件 1.Tuple------Tuple是Storm中的主要数据结构。它是有序元素的列表。默认情况下,Tuple支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到Storm集群。 2.Stream----流是元组的无序序列。 3.Spouts----流的源。通常,Storm从原始数据源(如Twitter Streaming API,Apache Kafka队列,Kestrel队列等)接受输入数据。否则,您可以编写spouts以从数据源读取数据。“ISpout”是实现spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。 4.Bolts —Bolts是逻辑处理单元。Spouts将数据传递到Bolts和Bolts过程,并产生新的输出流。Bolts可以执行过滤,聚合,加入,与数据源和数据库交互的操作。Bolts接收数据并发射到一个或多个Bolts。 “IBolt”是实现Bolts的核心接口。一些常见的接口是IRichBolt,IBasicBolt等。
3.Storm 入门word-count实例 本例的实现逻辑 1.创建一个maven工程 storm-demo ,引入相关依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.1</version> </dependency>2.常见数据源
package com.hzm; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Map; /** * @Author:huangzhimin * @Date:2020/7/1 * @描述:storm的数据源 **/ public class SentenceSpout extends BaseRichSpout { //BaseRichSpout是ISpout接口和IComponent接口的简单实现,接口对用不到的方法提供了默认的实现 private SpoutOutputCollector collector; //模拟数据源 public String [] sentenceArray = { "my name is huangzhimi", "i am a IT worker", "i like this job", "i am working in nanjin" }; //每次只发布一个句话,故设置索引 public int index = 0; /** * sout 初始化是被调用 * @param map 为此spout提供storm配置。 * @param topologyContext 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息 * @param spoutOutputCollector 使我们能够发出将由bolts处理的元组。 * 在本实例中无需初始化实例,只需保存spoutOutputCollector 实例即可 */ @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; } /** * 通过收集器发出生成的数据 * nextTuple 是spout 输出数据的核心 */ @Override public void nextTuple() { //发射出第一句话 this.collector.emit(new Values(sentenceArray[index])); //发射出下一句话 index ++; //避免出数据越界 if(index >= sentenceArray.length){ index=0; } Utils.sleep(1); } /** *通过该方法告诉storm 集群 spout 抛出那些字段 * @param outputFieldsDeclarer */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } }2.创建第一个bolt 用于拆分语句
package com.hzm; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * @Author:huangzhimin * @Date:2020/7/1 * @描述:流处理逻辑1,拆分数据 **/ public class SplitBolt extends BaseRichBolt { private OutputCollector collector; //BaseRichBolt是IComponent和IBolt接口的实现 /**为bolt提供要执行的环境。执行器将运行此方法来初始化spout。 * prepare()方法类似于ISpout 的open()方法。 * 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接 * 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化 * @param map * @param topologyContext * @param outputCollector */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } /** * 处理单个元组的输入 * @param tuple */ @Override public void execute(Tuple tuple) { String value = tuple.getStringByField("sentence"); System.out.println("value=======>"+value); String [] wordArray = value.split(" "); for (int i = 0; i < wordArray.length; i++) { System.out.println(wordArray[i]); this.collector.emit(new Values(wordArray[i])); } } /** * 声明元组的输出模式。 * @param outputFieldsDeclarer */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }3.创建第2个bolt 进行统计
package com.hzm; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * @Author:huangzhimin * @Date:2020/7/1 * @描述:流处理逻辑,数据求和 **/ public class CountBolt extends BaseRichBolt { private OutputCollector collector; private Map<String,Integer> wordCoutMap ; /** * 和splitBolt 同理 * @param map * @param topologyContext * @param outputCollector */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.wordCoutMap = new HashMap<>(); } /** * 处理数据,对数据进行求和 * @param tuple */ @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Integer count = this.wordCoutMap.get(word); if(count == null){ //初始化 count = 0; } count ++; wordCoutMap.put(word,count); this.collector.emit(new Values(word,count)); } /** * 将数据发射出去 * @param outputFieldsDeclarer */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word","count")); } }4.创建最终的处理逻辑
package com.hzm; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import java.util.*; /** * @Author:huangzhimin * @Date:2020/7/1 * @描述:逻辑处理3,形成统计报告 **/ public class ReportBolt extends BaseRichBolt { private Map<String,Integer> wordCoutMap ; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.wordCoutMap = new HashMap<>(); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Integer count = tuple.getIntegerByField("count"); wordCoutMap.put(word,count); System.out.println("当前数据为====>"+wordCoutMap); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //这是最后一个处理逻辑,不需要在抛出数据了 } /** * cleanup是IBolt接口中定义 * Storm在终止一个bolt之前会调用这个方法 * 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果 * 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接 * 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。 */ @Override public void cleanup(){ System.out.println("---------- FINAL COUNTS -----------"); //从map中取值 Iterator<String> iter = wordCoutMap.keySet().iterator(); while(iter.hasNext()){ String key=iter.next(); //存放同一类型的元素 Integer num = wordCoutMap.get(key); System.out.println("key:"+key+" num ="+num); } System.out.println("----------------------------"); } }5.形成拓扑结构
package com.hzm; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; /** * @Author:huangzhimin * @Date:2020/7/1 * @描述:用于本地测试流的逻辑 **/ public class Topology { private static final String SENTENCE_SPOUT_ID = "sentence-spout"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String REPORT_BOLT_ID = "report-bolt"; private static final String TOPOLOGY_NAME = "word-count-topology"; public static void main(String[] args) { //System.out.println( "Hello World!" ); //实例化spout和bolt SentenceSpout spout = new SentenceSpout(); SplitBolt SplitBolt = new SplitBolt(); CountBolt countBolt = new CountBolt(); ReportBolt reportBolt = new ReportBolt(); TopologyBuilder builder = new TopologyBuilder();//创建了一个TopologyBuilder实例 //TopologyBuilder提供流式风格的API来定义topology组件之间的数据流 //builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout //设置两个Executeor(线程),默认一个 builder.setSpout(SENTENCE_SPOUT_ID, spout,2); // SentenceSpout --> SplitSentenceBolt //注册一个bolt并订阅sentence发射出的数据流,shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例 //builder.setBolt(SPLIT_BOLT_ID, SplitBolt).shuffleGrouping(SENTENCE_SPOUT_ID); //SplitSentenceBolt单词分割器设置4个Task,2个Executeor(线程) builder.setBolt(SPLIT_BOLT_ID, SplitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID); // SplitSentenceBolt --> WordCountBolt //fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中 //这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中 //WordCountBolt单词计数器设置4个Executeor(线程) builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word")); // WordCountBolt --> ReportBolt //globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBolt builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID); Config config = new Config();//Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为 //设置worker数量 //config.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); //本地提交 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); Utils.sleep(10000); cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } }注:本文演示的为storm本地运行模式,集群模式带后续文章。