Earlier posts walked through a simple Spark Streaming demo and a working Kafka example. This post combines the two, which is one of the most common ways to use them together.
1. Component versions
First, confirm the versions, since they differ from those in the earlier posts and are worth recording. As before, no Scala is involved: the stack is Java 8, Spark 2.0.0, and Kafka 0.10.
2. Maven dependency
I found a few integration examples online, but they target different versions and simply do not work here, so after some digging, this is the dependency to add.
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
```
The newest Spark Streaming Kafka artifact without a Kafka version in its name is 1.6.3; I tried it and it no longer runs on Spark 2, so the artifact targeting Kafka 0.10 is the one to use. Also note that Spark 2.0 is built against Scala 2.11, so the artifactId must carry the _2.11 suffix indicating the Scala version.
3. The SparkSteamingKafka class
Note that the classes are imported from org.apache.spark.streaming.kafka010.xxx, which is why the imports are included below. The rest is explained in the comments.
```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";

        // Local streaming context with a 1-second batch interval
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        // Topics to subscribe to (comma-separated list is supported)
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Start reading partition 0 of topic1 from offset 2
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L);

        // Create the direct stream against the Kafka 0.10 consumer API
        JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets));

        // Classic word count on the message values
        JavaPairDStream<String, Integer> counts = lines
                .flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);
        counts.print();

        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}
```
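If you do not need to pin a starting offset, the Subscribe overload without the offsets map is enough. Below is a minimal sketch of that variant (reusing the same ssc, topicsSet, and kafkaParams as above; this is an illustration, not part of the original class), which also gives a typed ConsumerRecord<String, String> instead of the raw Object types:

```java
// Sketch: typed direct stream without explicit starting offsets.
// Assumes the same ssc, topicsSet and kafkaParams as in the class above.
JavaInputDStream<ConsumerRecord<String, String>> typedLines = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

// Values are already Strings, so no toString() is needed before splitting
typedLines.map(record -> record.value()).print();
```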
4. Running the test
Here I reuse the producer class from the earlier "first look at Kafka" post to push data to the Kafka broker. In my setup, Kafka is deployed on the master2 node, and Spark 2 runs locally for this test.
```java
UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();
```
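For readers who do not have the earlier post at hand, here is a minimal sketch of what such a producer thread could look like, assuming the standard Kafka 0.10 producer API; the actual UserKafkaProducer class from that post may differ.

```java
// Hypothetical minimal producer thread; the real UserKafkaProducer may differ.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserKafkaProducer extends Thread {
    private final String topic;
    private final KafkaProducer<String, String> producer;

    public UserKafkaProducer(String topic) {
        this.topic = topic;
        Properties props = new Properties();
        props.put("bootstrap.servers", "master2:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    @Override
    public void run() {
        int messageNo = 0;
        while (true) {
            // Send space-separated words so the streaming word count has something to split
            producer.send(new ProducerRecord<>(topic, "message " + messageNo++));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                producer.close();
                return;
            }
        }
    }
}
```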
Then run the SparkSteamingKafka class from section 3; the word counts printed on the console show that the integration works.


