Storm学习笔记(三)——Storm组件详解之Bolt、Topology

    技术2022-07-16  68

    目录

    Bolt消息处理者

    生命周期

    开发Bolt组件

    Topology拓扑

    结构

    运行模式

    示例


    Bolt消息处理者

    Bolt在Storm中是一个被动的角色,它把元组作为输入,然后产生新的元组作为输出。Bolt可以执行过滤、函数操作、合并、写数据库等操作(还可以简单地传递消息流,复杂的消息流往往需要很多步骤,因此需要很多Bolt来处理)。

    生命周期

    首先,客户端创建Bolt,然后将其序列化为拓扑,并提交给集群中的主机。

    之后,集群启动Worker进程,反序列化Bolt,调用prepare方法开始处理元组。

    接下来,Bolt处理Tuple,Bolt处理一个输入Tuple,发射0个或者多个Tuple,然后调用ack通知Storm自己已经处理过这个Tuple了(Storm提供了一个IBasicBolt自动调用ack)。Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。

    在创建Bolt对象时,通过构造方法初始化成员变量,当Bolt被提交到集群时,这些成员变量也会被序列化,所以通过反序列化,可以获取到这些成员变量。

    开发Bolt组件

    开发Bolt组件的一个简单的例子:

    package storm; import java.util.Map; import java.util.stream.Collector; import org.apache.storm.netty.util.internal.SystemPropertyUtil; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; /* * 此消息处理者用于接收Spout发来的Tuple,并将Tuple的值做打印输出 */ public class NumberBolt extends BaseRichBolt{ private OutputCollector collector; /* * Bolt的主要方法是execute,它以一个Tuple作为输入 * Bolt使用OutputCollector来发射Tuple * Bolt必须为它处理的每一个Tuple调用OutputCollector的ack方法 * 以通知Storm该Tuple被处理完成了,从而通知该Tuple的发射者Spout */ @Override public void execute(Tuple input) { int randomNum = input.getIntegerByField("number"); System.out.println(randomNum); } /* * prepare方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。 * OutputCollector是线程安全的,并且随时都可以调用它 * 在Bolt中,Tuple的发送可以再Prepare、execute、cleanup等方法中进行 * 但是一般都是在execute中进行 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } /* * 用于声名当前Bolt发送的Tuple中包含的字段,和Spout中的类似 * Bolt可以发射多条消息流,使用OutputFieldsDeclarer.declareStream方法来定义流 * 之后使用OutputCollector.emit来选择要发射的流 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("moreThan", new Fields("It")); declarer.declareStream("lessThan", new Fields("It")); } }

    Topology拓扑

    Storm的Topology是指类似于网络拓扑图的一种虚拟结构。Storm拓扑类似于MapReduce任务,一个关键的区别是MapReduce任务运行一段时间后最终会完成,而Storm拓扑一直运行(知道杀死它)。

    结构

    一个拓扑是由Spout和Bolt组成的图,Spout和Bolt之间通过刘分组连接起来,见下图。

    Topology是由Spout、Bolt、数据载体Tuple等构成的一定规则的网络拓扑图。Storm提供了TopologyBuilder类来创建Topology。

    运行模式

    Topology运行模式有两种:

    本地模式,分布式模式

    这两种模式的接口区别很大,使用场景不同

    示例

    package storm; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; public class NumberTopology { public static void main(String[] args){ //用于设置Topology相关的环境参数 Config config = new Config(); //Storm的运行有两种模式:本地模式和集群模式 //本地模式:LocalCluster //集群模式:StormSubmitter submitter = new StormSubmitter(); LocalCluster cluster = new LocalCluster(); //实例化Spout、bolt和Topologybuilder TopologyBuilder builder = new TopologyBuilder(); NumberSpout numberSpout = new NumberSpout(); NumberBolt numberBolt = new NumberBolt(); builder.setSpout("number_spout", numberSpout); builder.setBolt("number_bolt", numberBolt).shuffleGrouping("number_spout"); StormTopology topology = builder.createTopology(); cluster.submitTopology("topology", config, topology); } }

    结果输出如下图:

    Processed: 0.010, SQL: 9