Storm 初学总结(一)

    技术2022-07-13  75

    Storm 初学总结

    参考:《Storm应用实践》 —— 肖恩 T.艾伦 (Sean T. Allen) / 马修·扬科夫斯基 (Matthew Jankowski)

    Storm简介

    大数据处理工具

    数据处理工具大致分为两个主要层级:批(batch)处理和流(stream)处理。最近又新增了一种介于两者之间的衍生层:基于流的微型批处理(micro-batch)层。 二者区别:

    流式处理批处理处理对象数据(消息)流批量数据数据长度无限有限数据时效实时历史触发方式实时历史运行特征常住服务定时任务延迟低高代表stormHadoop

    数据流向: 图1.1 数据流向和批处理器 图1.2 数据流向和流处理

    什么是storm

    Storm是一个分布式实时计算框架,适用于处理无边界的流数据,被业界称为实时版Hadoop。将Storm与你当前使用的队列和持久化技术相结合,就能实现多种处理和转换流数据的方式。

    应用场景

    推荐系统(实时推荐,根据下单或加入购物车推荐相关商品)、金融系统、预警系统、网站统计(实时销量、流量统计,如淘宝双11效果图)、交通路况实时系统等等。

    storm的一些特性

    1.适用场景广泛: storm可以实时处理消息和更新DB,对一个数据量进行持续的查询并返回客户端(持续计算),对一个耗资源的查询作实时并行化的处理(分布式方法调用,即DRPC),storm的这些基础API可以满足大量的场景。 2. 可伸缩性高: Storm的可伸缩性可以让storm每秒可以处理的消息量达到很高。扩展一个实时计算任务,你所需要做的就是加机器并且提高这个计算任务的并行度 。Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易的扩展。 3. 保证无数据丢失: 它可以确保每个输入的数据至少会被处理一次。 4. 异常健壮: 也可以称之为高容错性。Storm中有四个主要的组件,在大部分时间里,摧毁任何一个组件都不会中断数据的处理。。 5. 语言无关性: 它与使用的编程语言无关,如果你的程序能在JVM上执行,它就可以在Storm上轻松执行。

    storm 基础概念

    Nimbus:即Storm的Master,负责资源分配和任务调度。一个Storm集群只有一个Nimbus。

    Supervisor:即Storm的Slave,负责接收Nimbus分配的任务,管理所有Worker,一个Supervisor节点中包含多个Worker进程。

    Worker:工作进程,每个工作进程中都有多个Task。

    Executor:Worker进程在执行任务时,会启动多个Executor线程

    Task:任务,在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。每个任务都与一个执行线程相对应。

    Stream:数据流(Streams)是 Storm 中最核心的抽象概念。一个数据流指的是在分布式环境中并行创建、处理的一组元组(tuple)的无界序列。数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。

    Spout:数据源(Spout)是拓扑中数据流的来源。一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中。根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。一个可靠的 Spout能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。一个 Spout可以发送多个数据流。

    Bolt:拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。一个 Bolt 可以实现简单的数据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。

    Stream grouping:为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。在 Storm 中有八种内置的数据流分组方式。

    Reliability:可靠性。Storm 可以通过拓扑来确保每个发送的元组都能得到正确处理。通过跟踪由 Spout 发出的每个元组构成的元组树可以确定元组是否已经完成处理。每个拓扑都有一个“消息延时”参数,如果 Storm 在延时时间内没有检测到元组是否处理完成,就会将该元组标记为处理失败,并会在稍后重新发送该元组。

    tuple:元组是拓扑中结点之间传输数据的形式,它本身是一个有序的数值序列,其中每个数值都会被赋予一个命名。一个结点可以创建元组,然后发送(可选)至任意其他结点,这个发送元组到任意结点的过程,称作发射(emit)一个元组。

    Topology:计算拓扑,Storm 的拓扑是对实时计算应用逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还可以理解成由一系列通过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构,注意不能成环结构,否则会死循环。如下图:

    storm基本功能代码实现

    soupt

    package com.stort.start; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; /** * @Author: Cnshao * @Date: 2020/6/22 20:20 */ public class TestSpout extends BaseRichSpout { private static final long serialVersionUID = 225243592780939490L; private SpoutOutputCollector collector; private static final String field="word"; private int count=0; private String[] message = { "aaa", "bbb", "ccc", "aaa bbb", "bbb ccc eee" }; /** * open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用 * @param map Storm配置 * @param topologyContext topology中组件的信息 * @param spoutOutputCollector 发射tuple的方法 */ @Override public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { System.out.println("open:"+map.get("test")); this.collector = spoutOutputCollector; } /** * nextTuple()方法是Spout实现的核心。 * 也就是主要执行方法,用于输出信息,通过collector.emit方法发射。 */ @Override public void nextTuple() { if(count < message.length){ System.out.println("第"+(count+1) +"次开始发送数据..."); this.collector.emit(new Values(message[count],count),"maoding_1"); } count++; } /** * declareOutputFields是在IComponent接口中定义,用于声明数据格式。 * 即输出的一个Tuple中,包含几个字段。 * @param outputFieldsDeclarer */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { System.out.println("定义格式..."); outputFieldsDeclarer.declare(new Fields(field,"count")); } /** * 当一个Tuple处理成功时,会调用这个方法 */ @Override public void ack(Object obj) { System.out.println("ack:"+obj); } /** * 当Topology停止时,会调用这个方法 */ @Override public void close() { System.out.println("关闭..."); } /** * 当一个Tuple处理失败时,会调用这个方法 */ @Override public void fail(Object obj) { System.out.println("失败并重试:"+obj); ConstantClass.failNum++; } }

    Blot

    package com.stort.start; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; import java.util.Random; /** * 分割字符 * @Author: Cnshao * @Date: 2020/6/22 20:29 */ public class TestBolt extends BaseRichBolt { private static final long serialVersionUID = 4743224635827696343L; private OutputCollector collector; static int num=1; /** * 在Bolt启动前执行,提供Bolt启动环境配置的入口 * * 一般对于不可序列化的对象进行实例化。 * * 注:如果是可以序列化的对象,那么最好是使用构造函数。 * @param map * @param topologyContext * @param outputCollector */ @Override public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) { System.out.println("prepare:"+map.get("test")); this.collector=outputCollector; // Random random = new Random(); // System.err.println(random.nextInt(100)); } /** * execute()方法是Bolt实现的核心。 * 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。 * @param tuple */ @Override public void execute(Tuple tuple) { String msg = tuple.getStringByField("word"); System.out.println("开始分割单词:" + msg); String[] words = msg.toLowerCase().split(" "); for (String word : words) { this.collector.emit(new Values(word,1));//向下一个bolt发射数据 Random random = new Random(); if(num%2==0){ this.collector.emit("streamId_1",tuple,new Values(word+"streamId_1",1));//向下一个bolt发射数据 } } num++; //test fail // Random random = new Random(); // int i = random.nextInt(10); // ConstantClass.list.add(i); // if(i<5){ // ConstantClass.count++; // collector.ack(tuple); // }else{ // collector.fail(tuple); // } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("count","num")); outputFieldsDeclarer.declareStream("streamId_1",new Fields("count","num")); } /** * cleanup是IBolt接口中定义,用于释放bolt占用的资源。 * Storm在终止一个bolt之前会调用这个方法。 */ @Override public void cleanup() { System.out.println("TestBolt的资源释放"); } }

    Topology

    package com.stort.start; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; /** * @Author: Cnshao * @Date: 2020/6/22 20:19 */ public class starter { private static final String test_spout="test_spout"; private static final String test_bolt="test_bolt"; private static final String test_bolt_2="test_bolt_2"; private static final String test_bolt_3="test_bolt_3"; public static void main(String[] args) { //定义一个拓扑 TopologyBuilder topologyBuilder = new TopologyBuilder(); //设置一个Executeor(线程),默认一个 topologyBuilder.setSpout(test_spout, new TestSpout(),1); //设置一个Executeor(线程),和一个task ,shuffleGrouping:表示是随机分组 topologyBuilder.setBolt(test_bolt, new TestBolt(),1) .setNumTasks(1).shuffleGrouping(test_spout); //设置一个Executeor(线程),和一个task,fieldsGrouping:表示是按字段分组 topologyBuilder.setBolt(test_bolt_2, new TestBolt2(),1) .setNumTasks(1).fieldsGrouping(test_bolt,new Fields("count")); //设置一个Executeor(线程),和一个task,fieldsGrouping:表示是按字段分组,steamid可以只获取id为steamid的流数据 topologyBuilder.setBolt(test_bolt_3, new TestBolt3(),1) .setNumTasks(2).fieldsGrouping(test_bolt,"streamId_1",new Fields("num")); Config config = new Config(); config.put("test", "test"); //运行拓扑 try { if (args != null && args.length > 0) { //有参数时,表示向集群提交作业,并把第一个参数当做topology名称 System.out.println("运行远程模式"); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { //没有参数时,本地提交 //启动本地模式 System.out.println("运行本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Word-counts", config, topologyBuilder.createTopology()); Thread.sleep(6000); //关闭本地集群 cluster.shutdown(); System.err.println(ConstantClass.count); System.err.println(ConstantClass.list); System.err.println(ConstantClass.failNum); } }catch (Exception e) { e.printStackTrace(); } } }

    storm如何保证可靠的消息处理

    spout发射出的一个元组,下游的bolt在接收并完成处理后,可能会发射出更多种类的元组,这其实就创建了一棵元组树(tupletree),其中spout发射的元组称为根(root)元组。Storm为每个由spout发射出的元组创建一棵元组树,并持续跟踪这棵树的处理情况。当一棵元组树的所有叶结点都标记为已处理,那么Storm才会认为由spout发出的这个元组已经完整处理过了。为了确保Storm能创建并跟踪元组树的状态,你需要借助Storm的API完成两件事情:

    当bolt要发射新的元组时,你需要确保其输入元组已被锚定。如果bolt自己能说话,那它此时会对大家说:“我准备好了,我将立即发射一个新元组,并且包含初始化的输入元组,你们可以与其建立连接了”。确保你的bolt在完成对输入元组的处理之后会通知Storm,这个动作叫做应答(acking),如果用bolt说话的方式语言来表达,那就是“Storm,我已经完成了对该元组的处理,可以放心在元组树中将其标记为处理完成了”。紧接着,Storm就会开始按需求创建和跟踪元组树。

    bolt中的锚定、应答和容错

    在需要由程序来判断元组批次是否完成时(例如当执行聚合时),或者在运行中决定是否需要连接两个或更多数据流时,那么系统需要具备判断锚定、应答和容错的能力。在这些案例中,你就需要使用BaseRichBolt来代替BaseBasicBolt作为基类,如下面的列表所示,展现了继承于BaseRichBolt的一个bolt内部能实现的功能。

    锚定——为了提供显式的锚定能力,我们需要在bolt的execute()方法中将输入的元组传到outputCollector上的emit()方法里:其中outputCollector.emit(newValues(order))将变成outputCollector.emit(tuple,new Values(order))。应答——为了提供显式的应答能力,我们需要在bolt的execute()方法中调用outputCollector的ack方法:outputCollector.ack(tuple)。容错——这是在bolt中的execute()的方法内调用outputCollector的fail方法:throw new FailedException()将变成outputCollector.fail(tuple)。 代码: /** * execute()方法是Bolt实现的核心。 * 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。 * @param tuple */ @Override public void execute(Tuple tuple) { String msg=tuple.getStringByField("count"); Object msg2=tuple.getIntegerByField("num"); System.out.println("第"+count+"次统计单词出现的次数"); /** * 如果不包含该单词,说明在该map是第一次出现 * 否则进行加1 */ if (!counts.containsKey(msg)) { counts.put(msg, 1); } else { counts.put(msg, counts.get(msg)+1); } count++; //test fail Random random = new Random(); int i = random.nextInt(10); ConstantClass.list.add(i); if(i<5){ ConstantClass.count++; collector.ack(tuple); }else{ collector.fail(tuple); } }

    spout 容错职责

    storm需要实现ack方法和fail方法,代码:

    /** * 当一个Tuple处理成功时,会调用这个方法 */ @Override public void ack(Object obj) { System.out.println("ack:"+obj); } /** * 当一个Tuple处理失败时,会调用这个方法 */ @Override public void fail(Object obj) { System.out.println("失败并重试:"+obj); //可以再次调用collector.emit方法,但是需要有可以支持回放的可靠数据源 }

    Storm在基础设施中提供了大量工具,用于确保spout发出的元组可以实现完整处理。但是为了保障消息能被有效地处理,你必须使用一个可以支持元组回放的可靠数据源。另外,实现的spout还需要具备能对数据源提供的数据执行回放的能力。如果你希望在拓扑中为消息处理提供保障机制,那了解以上这几点至关重要。

    storm架构

    任务提交处理流程 Nimbus是调度中心,Supervisor是任务执行的地方。Supervisor上面有若干个Worker,每个Worker都有自己的端口,Worker可以理解为一个JVM进程。另外,每个Worker中还可以运行若干个线程(executer)。

    当客户端向storm集群提交一个Topology时,这里的提交就是在集群上通过命令storm jar xxx启动topology。如果我们是在Supervisor节点上执行storm jar xxx,那么Supervisor会将jar包拷贝到Nimbus,之后Nimbus对Topology进行调度。

    Nimbus会根据Topology所需要的Worker进行分配,将其分配到各个Supervisor的节点上执行。

    现在假设我们我们有4个Supervisor节点,每个Supervisor都配置4个Worker。这是我们提交了一个Topology,需要4个Worker,那可能的分配情况可能如下图所示:

    Storm的并发度

    在Storm中,Worker不是组件执行的最小单位。Executor才是,Executor可以理解为是一个线程。我们在创建topology的时候,可以设置执行spout的线程数和bolt的线程数。

    假设spout和bolt的线程数加起来设置了8个,然后设置了2个worker,那么这8个线程可能就会随机分配到2个worker中,可能一个worker3个,一个worker5个。也有可能各自分配4个。如下图所示:

    代码地址

    今天先写这么多…

    Processed: 0.033, SQL: 9