使用Spark Streaming进行关键字检测

    技术2024-04-22  250

    许多公司使用诸如Apache Hadoop的分布式文件系统来存储和分析数据。 通过离线Hadoop使用流分析,可以存储大量的大数据并实时分析这些数据。 本文向您展示了如何使用Spark Streaming启用实时关键字检测的示例。

    Spark Streaming是Spark API的扩展,可实现实时数据流的可伸缩,容错处理。 Spark Streaming具有大量的适配器,允许应用程序开发人员从各种来源读写数据,包括Hadoop分布式文件系统(HDFS),Kafka,Twitter等。

    先决条件

    软件先决条件: IBMInfoSphere®BigInsights 4.0或更高版本以及Apache Maven。 业务先决条件:中级Java™开发技能以及Hadoop和Spark的入门知识。

    解决方案概述

    Spark Streaming应用程序由一个或多个互连的离散化流(DStreams)组成。 每个DStream由一系列弹性分布式数据集(RDD)组成,它们是不可变分布式数据集的抽象。 Spark支持不同的应用程序开发语言,包括Java,Scala和Python。 在本文中,我们将使用Java语言向您展示开发关键字检测应用程序的分步方法。

    图1显示了关键字检测应用程序的高级视图。

    什么是弹性分布式数据集(RDD)?

    弹性分布式数据集(RDD)是对象的不可变集合。 每个对象都在群集的各个节点上进行分区和并行执行。

    图1.关键字检测应用程序图

    图1中的组件说明

    SocketTextStream允许您在传输控制协议(TCP)套接字上绑定和侦听消息。 SocketTextStream的输出被馈送到使用当前关键字列表查找匹配令牌的自定义流中。 TextFileStream用于监视Hadoop目录。 只要检测到一个新的文件,它会读取文件,并将其由读转换成DStreams.The值TextFileStream用于更新内部关键字列表中使用自定义逻辑。 关键字检测逻辑使用更新的关键字列表,因此该图使用虚线表示这种关系。

    每个组件的实现细节

    每个Spark Streaming应用程序都以流上下文开始,如以下代码片段所示。 “上下文”要求您传递定义批次的工期参数。

    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

    socketTextStream绑定到指定的主机接口和给定的端口号,并生成DStreams。

    JavaReceiverInputDStream lines = ssc.socketTextStream( hostname, port, StorageLevels.MEMORY_AND_DISK_SER);

    textFileStream使用textFile从Hadoop读取并行关键字词典文件。 处理文件,并在内部列表中更新关键字。

    JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest"); JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String,String>() { @Override public Iterable<String> call(String x) { final Pattern SPACE = Pattern.compile(" "); String[] vec=SPACE.split(x); List<String> ls=Arrays.asList(vec); return ls; } }); updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void> (){ public Void call(JavaRDD<String> rdd) { rdd.foreach(new VoidFunction<String>(){ @Override public void call(String x){ if(x!=null) keywords.add(x); }}); return null;

    从SocketStream读取的SocketStream用于与关键字列表进行比较,如以下代码所示。 当您使用命令wordPresent.print();时,结果将显示在控制台上wordPresent.print(); 。

    JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() { @Override public Boolean call(String x) { return keywords.contains(x); } }); wordPresent.print();

    下面的清单显示了本文中使用的示例的完整代码清单。

    public final class KeywordDetect { private static final Pattern SPACE = Pattern.compile(" "); public static List<String> keywords=new ArrayList<String>(); public static void main(String[] args) { if (args.length < 2) { System.err.println("Usage: KeywordDetect <hostname> <port> <words>"); System.exit(1); } SparkConf sparkConf = new SparkConf().setAppName("KeywordDetect"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5)); JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest"); JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); keywords.add("initial"); //Initialize keyword list JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String,String>() { @Override public Iterable<String> call(String x) { final Pattern SPACE = Pattern.compile(" "); String[] vec=SPACE.split(x); List<String> ls=Arrays.asList(vec); return ls; } }); updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void> (){ public Void call(JavaRDD<String> rdd) { rdd.foreach(new VoidFunction<String>(){ @Override public void call(String x){ //x=x+1; if(x!=null) keywords.add(x); //add newly read tokens to keyword list }}); return null; } }); JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() { @Override public Boolean call(String x) { return keywords.contains(x); //compare token received from socket against keywords list } }); JavaDStream<String> inputWords = lines.map(new Function<String, String>() { @Override public String call(String x) { return x; } }); wordPresent.print(); ssc.start(); ssc.awaitTermination(); } }

    编译并启动程序

    对于本文中的示例,我们使用Maven安装和构建应用程序。 如果使用的是Maven,请确保在pom.xml中添加适当的依赖项。 依赖关系将主要是spark-core和spark-streaming库。

    以下代码显示了我们的应用程序中使用的pom依赖关系的摘要:

    <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.2.1</version> </dependency>

    编译应用程序并创建jar文件之后,使用以下命令将应用程序提交给Spark调度程序:

    spark-submit --class "org.apache.spark.examples.streaming.KeywordDetect" --master local[4] target/KeyWord-1.0.jar rvm.svl.ibm.com 9212

    由于Spark实例在具有四个内核的单个主机上运行,​​因此我们在–master参数中使用local [4]值。 我们的应用程序接受两个参数:主机名和端口。

    该应用程序假定在端口9212上运行着一个服务器进程并发布数据。 为了在测试环境中模拟服务器,我们使用nc(netcat)Linux命令: nc -l 9212 。

    nc命令绑定到9212。我们在终端中传递的任何输入都将转发给所有正在侦听端口9212的客户端。

    一切设置正确之后,提交的作业开始工作并监听端口9212。您应该在终端上收到以下确认消息:

    15/09/06 01:43:31 INFO dstream.SocketReceiver: Connecting to rvm.svl.ibm.com:9121 15/09/06 01:43:31 INFO dstream.SocketReceiver: Connected to rvm.svl.ibm.com:9121

    现在,让我们更新程序使用的内部字典。 1.2节中的代码正在监听Hadoop目录/ tmp / Streamtest中的change事件。 如果尚未创建目录,请先创建目录,然后使用以下命令上载关键字文件:

    hadoop fs -mkdir /tmp/Streamtest hadoop fs -put keywords /tmp/Streamtest

    当检测到新文件时,将执行后续的RDD。

    15/09/06 01:54:25 INFO dstream.FileInputDStream: New files at time 1441529665000 ms: hdfs://rvm.svl.ibm.com:8020/tmp/Streamtest/keyword 15/09/06 01:54:25 INFO storage.MemoryStore: ensureFreeSpace(272214) called with curMem=109298, maxMem=278302556

    关键字之一是“风险”。 现在,我在nc中提交关键字,如以下清单所示

    [root@rvm Desktop]# nc -l 9121 risk

    然后,Spark检测到该关键字,并在控制台上将其标记为true。

    ------------------------------------------- Time: 1441529995000 ms ------------------------------------------- true

    未来的增强

    可以进一步增强此应用程序,以处理完整的字符串而不是单个令牌。

    关键字检测状态可以写入文件或写入UI呈现服务的端口。

    例外条件

    如果出现“拒绝连接错误”,可能是因为:

    HDFS和另一个资源协商器(YARN)未运行 服务器进程未在源端口上运行。 在我们的示例中,源端口为9121。

    结论

    本文演示了如何使用Spark Streaming构建实时应用程序。 我们还重点介绍了Spark Streaming应用程序的构建基块。 使用此信息作为起点将帮助您使用Spark Streaming创建更复杂的应用程序。


    翻译自: https://www.ibm.com/developerworks/data/library/techarticle/dm-1511-keyword-detection-spark-streaming-trs/index.html

    Processed: 0.033, SQL: 9