
KZKY memo

Personal notes.

Spark Streaming 1

This is essentially a summary of
https://spark.apache.org/docs/latest/streaming-programming-guide.html

Data Sources

Processing Flow

Incoming data is split into batches as a DStream (Discretized Stream), and results are produced per batch. Under the hood, a DStream is a sequence of RDDs.

(Figure: a DStream discretizes the input stream into micro-batches, each backed by an RDD)

As of Spark 1.2, the Python API also supports Spark Streaming, but its input sources are still limited to text files and TCP sockets.
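Because a DStream is just a sequence of RDDs, each batch can be handled with ordinary RDD operations. A minimal Scala sketch (host/port and app name are placeholders) that exposes the per-batch RDDs via foreachRDD:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsRdds {
    def main(args: Array[String]) {
        val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamAsRdds")
        val ssc = new StreamingContext(conf, Seconds(1))

        val lines = ssc.socketTextStream("localhost", 9999)

        // Each 1-second batch interval yields one RDD; foreachRDD hands it to us directly
        lines.foreachRDD { rdd =>
            println(s"batch size: ${rdd.count()}")
        }

        ssc.start()
        ssc.awaitTermination()
    }
}
```

This is only to illustrate the DStream-as-RDD-sequence model; for the actual word count, see the NetworkWordCount example below.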

Quick Example

build.gradle

Pull the dependencies from Cloudera's Maven repo.

...
dependencies {
    compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
    compile 'org.scala-lang:scala-library:2.11.2'
//    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.5.0-cdh5.3.0'
//    compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.2.0-cdh5.3.0'
    compile group: 'org.apache.spark', name: 'spark-graphx_2.10', version: '1.2.0-cdh5.3.0'
    compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.2.0-cdh5.3.0'
    compile group: 'org.apache.spark', name: 'spark-yarn_2.10', version: '1.1.0-cdh5.2.0-SNAPSHOT'
    testCompile group: 'junit', name: 'junit', version: '4.+'
}
...
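The snippet above only lists dependencies; to actually resolve the CDH artifacts, the build also needs Cloudera's Maven repository declared. A sketch of the repositories block (the URL is an assumption; check Cloudera's documentation for the current one):

```groovy
repositories {
    mavenCentral()
    // Cloudera's Maven repository for the *-cdh artifacts (assumed URL)
    maven {
        url 'https://repository.cloudera.com/artifactory/cloudera-repos/'
    }
}
```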

NetworkWordCount

package edu.kzk.streaming

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


object NetworkWordCount {
    def main(args: Array[String]) {
        // Create a local StreamingContext with two worker threads and a batch interval of 1 second.
        // The master needs at least 2 cores to prevent a starvation scenario.
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1)) // the underlying SparkContext is accessible as ssc.sparkContext

        // Create a DStream that will connect to hostname:port, like localhost:9999
        val lines = ssc.socketTextStream("localhost", 9999)

        // Split each line into words
        val words = lines.flatMap(_.split(" "))

        // Count each word in each batch
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print()

        // To start the processing after all the transformations have been set up, we finally call:
        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate

    }
}

netcat

Feed the stream by running netcat as a simple text server (-l listens on the port, -k keeps listening after each client disconnects):

nc -lk 9999

Output

...
(foo,18)
(hello,27)
(world,27)
(hoge,36)
...
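Note that reduceByKey counts words only within each 1-second batch, so the numbers above reset every batch rather than accumulate. For a running total across batches, the streaming guide's updateStateByKey can be used. A minimal sketch, reusing pairs and ssc from the example above (the checkpoint directory is a placeholder):

```scala
// Running word count across batches: merge each batch's counts into per-key state
val updateFunc = (newCounts: Seq[Int], state: Option[Int]) =>
    Some(newCounts.sum + state.getOrElse(0))

ssc.checkpoint("/tmp/spark-checkpoint") // checkpointing is required for stateful operations

val runningCounts = pairs.updateStateByKey[Int](updateFunc)
runningCounts.print()
```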