java8实现spark wordcount并且按照value排序输出

最近在学习spark,本来应该是使用scala编程,但是无奈scala没接触过,还得学,就先使用java的spark api练练手,其实发现java8的函数式编程跟scala很多地方异曲同工啊,搞定spark的java api后面学scala应该事半功倍!
最开始当然是万年不变的wordcount,加了个排序输出,具体看注释(^o^)/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SparkConf conf = new SparkConf().setMaster("local").setAppName("word count");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("xxx.txt");
//使用java8提供的lambda表达式
//一句可以完成wordcount
JavaPairRDD<String, Integer> counts =
lines
//根据'\001'分割来源行
.flatMap(line -> Arrays.asList(line.split("\\001")).iterator())
//map输出 (单词,1)
.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
//通过reduce相同key,value值相加
.reduceByKey((x, y) -> x + y);

//根据得到的value值排序而不是key,默认只提供了根据key排序
//那么想到的思路是,交换key、value,进行key排序,交换回来,完成value排序
counts
//交换key-value,注意类型
.mapToPair(s -> new Tuple2<Integer, String>(s._2, s._1))
//倒序
.sortByKey(false)
//交换key-value,注意类型
.mapToPair(s -> new Tuple2<String, Integer>(s._2, s._1))
//转成集合
.collect()
//输出
.forEach(tuple -> System.out.println(tuple._1() + ": " + tuple._2()));

java8实现spark wordcount并且按照value排序输出
https://www.920929.xyz/posts/f705c1e1.html
作者
DELIN
发布于
2016年12月14日
许可协议