spark-streaming
spark-streaming
Spark Streaming
Spark Streaming介绍
http://spark.apache.org/docs/latest/streaming-programming-guide.html
一个简单例子：使用 mapWithState 实现带状态的单词计数（从 localhost:9999 的 socket 读取文本）
package com.didi;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * Stateful streaming word count.
 *
 * <p>Reads lines of text from a TCP socket, splits each line into words and keeps a
 * running total per word across batches with {@code mapWithState}. The totals are seeded
 * from a small initial-state RDD, and the updated {@code (word, total)} pairs produced in
 * each batch are printed.
 *
 * <p>To try it locally, start a text server first (for example {@code nc -lk 9999}) and
 * then run this class.
 */
public class StreamingWordCount {

    /** Host of the socket text source. */
    private static final String SOURCE_HOST = "localhost";

    /** Port of the socket text source. */
    private static final int SOURCE_PORT = 9999;

    /** Directory used for streaming checkpoints (the current working directory). */
    private static final String CHECKPOINT_DIR = ".";

    /**
     * Builds the streaming job, starts it and blocks until it terminates.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the streaming context fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // A socket receiver occupies one local thread, so at least two are needed
        // for batches to actually be processed.
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("StreamingWordCount");

        // One-second batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Stateful transformations such as mapWithState need a checkpoint directory.
        ssc.checkpoint(CHECKPOINT_DIR);

        // Initial state: counts that exist before any streamed data arrives.
        List<Tuple2<String, Integer>> tuples = Arrays.asList(
                new Tuple2<>("hello", 100),
                new Tuple2<>("word", 100));
        JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);

        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(SOURCE_HOST, SOURCE_PORT);

        // Split on runs of whitespace and drop empty tokens. A plain split(" ") turns an
        // empty line, or leading/repeated spaces, into "" entries that would then be
        // counted as a word.
        JavaDStream<String> words = lines.flatMap(
                line -> Arrays.stream(line.split("\\s+"))
                        .filter(token -> !token.isEmpty())
                        .iterator());

        JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

        // State update: add the incoming count to the stored total (0 when the word has
        // no state yet), store the new total and emit it as (word, total).
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> updateFunc =
                (word, count, state) -> {
                    int previous = state.exists() ? state.get() : 0;
                    int sum = count.orElse(0) + previous;
                    state.update(sum);
                    return new Tuple2<>(word, sum);
                };

        // Apply the update function in every batch, starting from the initial state.
        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                pairs.mapWithState(StateSpec.function(updateFunc).initialState(initialRDD));

        // Print the emitted (word, total) pairs of each batch.
        stateDstream.print();

        // Start the computation and block until it is stopped or fails.
        ssc.start();
        ssc.awaitTermination();
    }
}