当Spark从1.x升级到2.x时,如果使用SparkStreaming加载Kafka的数据,即使Kafka版本没有变化【一般会有所升级】,对应的spark-streaming-kafka也必须升级到对应版本,访问方式也会有所变化。 此处是从Spark1.6.0升级到Spark2.4.3,Kafka略有升级【从2.1.0升级到2.2.1】,初始使用的是:
import org.apache.spark.streaming.kafka.KafkaUtils val dframe = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)来加载数据的,当升级后,就算是更换对应的spark-streaming-kafka-0-10_2.11-2.4.3还是会报错,报错信息如下:
Spark Streaming: java.lang.NoClassDefFoundError:kafka/api/TopicMetadataRequest或者是各种类找不到!
此时就需要更换新的访问方式:
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.spark.streaming.kafka010.LocaltionStrategies.PreferConsistent val dframe = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topic, kafkaParams))1.spark1.6.0使用方式:
dframe.foreachRDD(rdd =>{ rdd.foreachPartition(partition =>{ partition.foreach(record => { val key = record._1 val value = record._2 } } }2.spark2.4.x使用方式:
dframe.foreachRDD(rdd =>{ rdd.foreachPartition(partition =>{ partition.foreach(record => { val key = record.key() val value = record.value() } } }备注:kafka里面的每条数据都是按照(key, value)存储的。