Spark Streaming 1
This is basically a summary of the Spark Streaming Programming Guide:
https://spark.apache.org/docs/latest/streaming-programming-guide.html
Processing flow
Incoming data is divided into batches as a DStream (Discretized Stream), and a processing result is returned for each batch. Internally, a DStream is a sequence of RDDs.
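To make the batching idea concrete, here is a minimal plain-Scala model of discretization. It does not use Spark at all: `DiscretizeSketch`, `Record`, `discretize` and `countPerBatch` are hypothetical names for illustration, records are assumed to carry a non-negative arrival time in milliseconds, and each `Seq[String]` batch stands in for one RDD.

```scala
// A plain-Scala model of discretization, NOT the Spark API.
// Each batch (a Seq[String]) plays the role of one RDD in a DStream.
object DiscretizeSketch {
  // Hypothetical record type: arrival time in ms (assumed non-negative) plus a text line.
  final case class Record(timeMs: Long, line: String)

  // Assign each record to batch index timeMs / intervalMs and return the
  // batches in time order, keyed by the batch start time.
  // Intervals with no records simply produce no batch in this sketch.
  def discretize(records: Seq[Record], intervalMs: Long): Seq[(Long, Seq[String])] =
    records
      .groupBy(r => r.timeMs / intervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (idx, rs) => (idx * intervalMs, rs.sortBy(_.timeMs).map(_.line)) }

  // The same function is applied to every batch independently, the way a
  // DStream transformation is applied to each underlying RDD.
  def countPerBatch(batches: Seq[(Long, Seq[String])]): Seq[(Long, Map[String, Int])] =
    batches.map { case (start, lines) =>
      start -> lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => w -> ws.size }
    }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record(100, "hello world"),
      Record(900, "hello"),
      Record(1500, "foo hello")
    )
    // With a 1000 ms interval, the first two records form one batch and the third another.
    countPerBatch(discretize(records, 1000)).foreach(println)
  }
}
```

The point of the model is that per-batch results are independent: counts from the batch starting at 0 ms are never merged with those from the batch starting at 1000 ms.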
Since Spark 1.2 the Python API also supports Spark Streaming, but its input sources are still limited to text files and TCP sockets.
Quick Example
build.gradle
The artifacts are pulled from Cloudera's Maven repository.
....
dependencies {
    compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
    // The _2.10 Spark 1.2 artifacts are built for Scala 2.10, so the Scala library must be 2.10.x too
    compile 'org.scala-lang:scala-library:2.10.4'
    // compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.5.0-cdh5.3.0'
    // compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.2.0-cdh5.3.0'
    compile group: 'org.apache.spark', name: 'spark-graphx_2.10', version: '1.2.0-cdh5.3.0'
    compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.2.0-cdh5.3.0'
    // NOTE: this version does not match the 1.2.0-cdh5.3.0 artifacts above and may need aligning
    compile group: 'org.apache.spark', name: 'spark-yarn_2.10', version: '1.1.0-cdh5.2.0-SNAPSHOT'
    testCompile group: 'junit', name: 'junit', version: '4.+'
}
...
NetworkWordCount
package edu.kzk.streaming

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // implicit conversions that provide reduceByKey on pair DStreams in Spark 1.2

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // Create a local StreamingContext with two working threads and a batch interval of 1 second.
    // The master requires 2 cores to prevent a starvation scenario:
    // one thread runs the socket receiver, and at least one more is needed to process the data.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1)) // the underlying SparkContext is accessible as ssc.sparkContext

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    // Nothing runs until start() is called, after all the transformations have been set up
    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
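What each batch computes can be checked without a cluster. The sketch below assumes plain Scala collections in place of an RDD (the object and method names are hypothetical) and mirrors the `flatMap`, `map`, `reduceByKey` chain for a single in-memory batch, using `groupBy` followed by `reduce` as a stand-in for `reduceByKey`.

```scala
// Plain Scala collections, NOT Spark: a local stand-in for one batch of NetworkWordCount.
object WordCountBatchSketch {
  // Roughly lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) for one batch.
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // split each line into words
      .map(word => (word, 1)) // pair each word with a count of 1
      .groupBy(_._1)          // gather the pairs that share a word
      .map { case (word, ps) => word -> ps.map(_._2).reduce(_ + _) } // sum the 1s per word

  def main(args: Array[String]): Unit = {
    // Similar in spirit to DStream.print(): show at most the first ten pairs.
    wordCounts(Seq("hello world", "hello foo")).take(10).foreach(println)
  }
}
```

Because `reduceByKey` is applied to each batch separately, the counts printed by `wordCounts.print()` are per 1-second batch, not running totals.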
netcat
nc -lk 9999
Output
...
(foo,18)
(hello,27)
(world,27)
(hoge,36)
...