许多公司使用诸如Apache Hadoop的分布式文件系统来存储和分析数据。 通过离线Hadoop使用流分析,可以存储大量的大数据并实时分析这些数据。 本文向您展示了如何使用Spark Streaming启用实时关键字检测的示例。
Spark Streaming是Spark API的扩展,可实现实时数据流的可伸缩,容错处理。 Spark Streaming具有大量的适配器,允许应用程序开发人员从各种来源读写数据,包括Hadoop分布式文件系统(HDFS),Kafka,Twitter等。
Spark Streaming应用程序由一个或多个互连的离散化流(DStreams)组成。 每个DStream由一系列弹性分布式数据集(RDD)组成,它们是不可变分布式数据集的抽象。 Spark支持不同的应用程序开发语言,包括Java,Scala和Python。 在本文中,我们将使用Java语言向您展示开发关键字检测应用程序的分步方法。
图1显示了关键字检测应用程序的高级视图。
弹性分布式数据集(RDD)是对象的不可变集合。 每个对象都在群集的各个节点上进行分区和并行执行。
每个Spark Streaming应用程序都以流上下文开始,如以下代码片段所示。 “上下文”要求您传递定义批次的工期参数。
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));socketTextStream绑定到指定的主机接口和给定的端口号,并生成DStreams。
JavaReceiverInputDStream lines = ssc.socketTextStream( hostname, port, StorageLevels.MEMORY_AND_DISK_SER);textFileStream使用textFile从Hadoop读取并行关键字词典文件。 处理文件,并在内部列表中更新关键字。
JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest"); JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String,String>() { @Override public Iterable<String> call(String x) { final Pattern SPACE = Pattern.compile(" "); String[] vec=SPACE.split(x); List<String> ls=Arrays.asList(vec); return ls; } }); updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void> (){ public Void call(JavaRDD<String> rdd) { rdd.foreach(new VoidFunction<String>(){ @Override public void call(String x){ if(x!=null) keywords.add(x); }}); return null;从SocketStream读取的SocketStream用于与关键字列表进行比较,如以下代码所示。 当您使用命令wordPresent.print();时,结果将显示在控制台上wordPresent.print(); 。
JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() { @Override public Boolean call(String x) { return keywords.contains(x); } }); wordPresent.print();下面的清单显示了本文中使用的示例的完整代码清单。
public final class KeywordDetect { private static final Pattern SPACE = Pattern.compile(" "); public static List<String> keywords=new ArrayList<String>(); public static void main(String[] args) { if (args.length < 2) { System.err.println("Usage: KeywordDetect <hostname> <port> <words>"); System.exit(1); } SparkConf sparkConf = new SparkConf().setAppName("KeywordDetect"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5)); JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest"); JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); keywords.add("initial"); //Initialize keyword list JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String,String>() { @Override public Iterable<String> call(String x) { final Pattern SPACE = Pattern.compile(" "); String[] vec=SPACE.split(x); List<String> ls=Arrays.asList(vec); return ls; } }); updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void> (){ public Void call(JavaRDD<String> rdd) { rdd.foreach(new VoidFunction<String>(){ @Override public void call(String x){ //x=x+1; if(x!=null) keywords.add(x); //add newly read tokens to keyword list }}); return null; } }); JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() { @Override public Boolean call(String x) { return keywords.contains(x); //compare token received from socket against keywords list } }); JavaDStream<String> inputWords = lines.map(new Function<String, String>() { @Override public String call(String x) { return x; } }); wordPresent.print(); ssc.start(); ssc.awaitTermination(); } }对于本文中的示例,我们使用Maven安装和构建应用程序。 如果使用的是Maven,请确保在pom.xml中添加适当的依赖项。 依赖关系将主要是spark-core和spark-streaming库。
以下代码显示了我们的应用程序中使用的pom依赖关系的摘要:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.2.1</version> </dependency>编译应用程序并创建jar文件之后,使用以下命令将应用程序提交给Spark调度程序:
spark-submit --class "org.apache.spark.examples.streaming.KeywordDetect" --master local[4] target/KeyWord-1.0.jar rvm.svl.ibm.com 9212由于Spark实例在具有四个内核的单个主机上运行,因此我们在–master参数中使用local [4]值。 我们的应用程序接受两个参数:主机名和端口。
该应用程序假定在端口9212上运行着一个服务器进程并发布数据。 为了在测试环境中模拟服务器,我们使用nc(netcat)Linux命令: nc -l 9212 。
nc命令绑定到9212。我们在终端中传递的任何输入都将转发给所有正在侦听端口9212的客户端。
一切设置正确之后,提交的作业开始工作并监听端口9212。您应该在终端上收到以下确认消息:
15/09/06 01:43:31 INFO dstream.SocketReceiver: Connecting to rvm.svl.ibm.com:9121 15/09/06 01:43:31 INFO dstream.SocketReceiver: Connected to rvm.svl.ibm.com:9121现在,让我们更新程序使用的内部字典。 1.2节中的代码正在监听Hadoop目录/ tmp / Streamtest中的change事件。 如果尚未创建目录,请先创建目录,然后使用以下命令上载关键字文件:
hadoop fs -mkdir /tmp/Streamtest hadoop fs -put keywords /tmp/Streamtest当检测到新文件时,将执行后续的RDD。
15/09/06 01:54:25 INFO dstream.FileInputDStream: New files at time 1441529665000 ms: hdfs://rvm.svl.ibm.com:8020/tmp/Streamtest/keyword 15/09/06 01:54:25 INFO storage.MemoryStore: ensureFreeSpace(272214) called with curMem=109298, maxMem=278302556关键字之一是“风险”。 现在,我在nc中提交关键字,如以下清单所示
[root@rvm Desktop]# nc -l 9121 risk然后,Spark检测到该关键字,并在控制台上将其标记为true。
------------------------------------------- Time: 1441529995000 ms ------------------------------------------- true可以进一步增强此应用程序,以处理完整的字符串而不是单个令牌。
关键字检测状态可以写入文件或写入UI呈现服务的端口。
如果出现“拒绝连接错误”,可能是因为:
HDFS和另一个资源协商器(YARN)未运行 服务器进程未在源端口上运行。 在我们的示例中,源端口为9121。本文演示了如何使用Spark Streaming构建实时应用程序。 我们还重点介绍了Spark Streaming应用程序的构建基块。 使用此信息作为起点将帮助您使用Spark Streaming创建更复杂的应用程序。
翻译自: https://www.ibm.com/developerworks/data/library/techarticle/dm-1511-keyword-detection-spark-streaming-trs/index.html