Storm实现单词统计的流程:
步骤:
1.创建普通Java工程
2.将Storm依赖包导入
3.编写各组件代码
WordCountSpout代码:
public class WordCountSpout extends BaseRichSpout{ private String[] data = new String[]{ "hello Storm", "hello world", "hello hadoop", "hello world" }; private SpoutOutputCollector collector; private static int i = 0; @Override public void nextTuple() { Values line = new Values(data[i]); collector.emit(line); //防止越界 if(i == data.length-1){ i = 0; }else{ i++; } } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } //确认Tuple成功的方法 @Override public void ack(Object msgId) { super.ack(msgId); } //处理tuple传输失败的方法 //父类中的fail方法,会根据tuple-id,重新发动tuple @Override public void fail(Object msgId) { super.fail(msgId); } }SplitBolt代码:
public class SplitBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void execute(Tuple input) { try { //获取行数据 String line = input.getStringByField("line"); String[] words = line.split(" "); for(String word : words){ collector.emit(input,new Values(word)); } //通过ack确认机制来确保数据传输的可靠性 //若对可靠性要求不高,也可以不写 collector.ack(input); } catch (Exception e) { // 通知tuple接收失败 collector.fail(input); } } @Override public void prepare(Map StormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }WordCountBolt代码:
public class WordCountBolt extends BaseRichBolt{ private OutputCollector collector; private Map<String,Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); //单词频率统计 if(map.containsKey(word)){ map.put(word, map.get(word)+1); }else{ map.put(word, 1); } collector.emit(new Values(word,map.get(word))); collector.ack(input); } @Override public void prepare(Map StormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","count")); } }ReportBolt代码:
public class ReportBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void execute(Tuple input) { String word = input.getStringByField("word"); int count = input.getIntegerByField("count"); //打印输出 System.out.println("单词:"+ word +"的当前的频率为" + count); collector.ack(input); } @Override public void prepare(Map StormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }WordCountTopology代码:
public class WordCountTopology { public static void main(String[] args) { Config config = new Config(); TopologyBuilder builder = new TopologyBuilder(); WordCountSpout spout = new WordCountSpout(); SplitBolt splitBolt = new SplitBolt(); WordCountBolt wordCountBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt(); // setSpout(id,bean,线程并发度) // setNumTasks()如果不设定,默认就是一个线程处理一个task,即task的并发度=线程并发度 builder.setSpout("WordCount_Spout", spout, 2); builder.setBolt("Split_Bolt", splitBolt).shuffleGrouping("WordCount_Spout"); // fieldsGrouping的作用是保证 让指定field字段的相同的 value值,发往同一个Bolt // 底层的实现方式是:Value.hashCode % NumBolt builder.setBolt("WordCount_Bolt", wordCountBolt).fieldsGrouping("Split_Bolt", new Fields("word")); // globalGrouping相当于是一个数据流汇总 builder.setBolt("Report_Bolt", reportBolt).globalGrouping("WordCount_Bolt"); StormTopology topology = builder.createTopology(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordcount_topology", config, topology); // 3s后停止Topology try { Thread.sleep(3000); cluster.killTopology("wordcount_topology"); cluster.shutdown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }结果如下图: