Kafka+SparkStreaming-实战入门(Java版)

    技术2023-09-22  55

    Producer.class

    package com.fromjoker; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class MyKafka { public static Map<String, String> kafkaConsumerParams = new HashMap<>(); public static Properties kafkaProducerParams = new Properties(); public static String SERVER = "kafka-0.kafka.default.svc.cluster.local:9092,kafka-1.kafka.default.svc.cluster.local:9092,kafka-2.kafka.default.svc.cluster.local:9092"; public static Map<String, String> consumerDefaultConf(){ kafkaConsumerParams.put("metadata.broker.list",SERVER); kafkaConsumerParams.put("group.id", "apsnew2"); kafkaConsumerParams.put("fetch.message.max.bytes", "52428800"); return kafkaConsumerParams; } public static void producerDefaultConf(){ kafkaProducerParams.put("bootstrap.servers",SERVER); kafkaProducerParams.put("acks", "all"); kafkaProducerParams.put("retries", 0); kafkaProducerParams.put("batch.size", 16384); kafkaProducerParams.put("linger.ms", 1); kafkaProducerParams.put("buffer.memory", 33554432); kafkaProducerParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProducerParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); } public static void producer(String topic,String key,String value){ producerDefaultConf(); Producer<String, String> producer = new KafkaProducer<>(kafkaProducerParams); String key = "who am i"; String value = "i am joker"; try { producer.send(new ProducerRecord<String, String>(topic,key,value)); }catch(Exception e){ e.printStackTrace(); }finally { producer.close(); } } }

    Consumer.class

    package com.fromjoker; import kafka.serializer.StringDecoder; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import java.util.HashSet; import java.util.Set; //标准工时看板 public class MyConsumer { private static Logger logger = Logger.getLogger(MyConsumer.class); public static SparkSession spark = null; public static void main(String[] args)throws Exception{ logger.getLogger("org").setLevel(Level.ERROR); spark = new MySpark().init(); JavaStreamingContext streamingContext = new JavaStreamingContext(JavaSparkContext.fromSparkContext(spark.sparkContext()), new Duration(5000)); Set<String> topicsSet = new HashSet<>(); topicsSet.add("test"); JavaPairInputDStream<String, String> message = KafkaUtils.createDirectStream( streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, MyKafka.consumerDefaultConf(), topicsSet ); JavaDStream<String> valueDStream = message.map(new Function<Tuple2<String, String>, String>() { public String call(Tuple2<String, String> v1) throws Exception { return v1._1()+"_"+v1._2; } }); valueDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { @Override public void call(JavaRDD<String> json) throws Exception { if(!json.isEmpty()) { json.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); } } }); streamingContext.start(); streamingContext.awaitTermination(); } }

    MySpark.class

    package com.fromjoker; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; public class MySpark { public SparkSession spark = null; public SparkConf conf = null; public SparkSession init(){ SparkSession.clearDefaultSession(); conf = new SparkConf(); defaultconf(); spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate(); return spark; } public SparkSession getSpark() { return spark; } public void setSpark(SparkSession spark) { this.spark = spark; } public SparkConf getConf() { return conf; } public void setConf(SparkConf conf) { this.conf = conf; } //spark默认通用参数 public SparkConf defaultconf(){ //conf.setMaster("yarn"); conf.setMaster("local[4]"); conf.setAppName("aps"); conf.set("spark.port.maxRetries", "1280"); conf.set("spark.sql.warehouse.dir", "hdfs://hadoop-master-0.hadoop-master.default.svc.cluster.local:8020/user/hive/warehouse"); //忽略损坏的分区数据 conf.set("spark.sql.hive.verifyPartitionPath", "true"); conf.set("spark.io.compression.codec", "snappy"); conf.set("spark.testing.memory", "2147480000");//后面的值大于512m即可 // conf.set("spark.shuffle.service.enabled", "true"); conf.set("spark.dynamicAllocation.enabled","false"); conf.set("spark.streaming.stopGracefullyOnShutdown","true");//优雅的关闭 conf.set("spark.rpc.askTimeout", "600s"); conf.set("spark.sql.autoBroadcastJoinThreshold", "-1"); // 任务运行核心数 conf.set("spark.executor.cores", "4"); conf.set("spark.cores.max", "21"); // executor运行内存 conf.set("spark.executor.memory", "6g"); conf.set("spark.driver.memory", "15g"); conf.set("spark.sql.shuffle.partitions", "400"); //机器之间的可以重用的网络连接,主要用于在大型集群中减小网络连接的建立开销,如果一个集群的机器并不多,可以考虑增加这个值 conf.set("spark.shuffle.io.numConnectionsPerPeer", "10"); // 关闭自动推测分区字段的类型 conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false"); // 动态分区 conf.set("hive.exec.dynamic.partition","true"); conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false"); conf.set("hive.exec.dynamic.partition.mode", "nonstrict"); conf.set("dfs.client.socket-timeout","300000"); //通过主机名访问(master设置datanode的内网ip时,客户端获取的是内网地址,无法访问) conf.set("dfs.client.use.datanode.hostname", "true"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); return conf; } //手动设置spark参数 public void setsparkconf(String parameter,String size){ conf.set(parameter,size); } }
    Processed: 0.022, SQL: 9