Spark Streaming 2
基本的な話のまとめ.
一連のフロー
- Conf作ってStreamingContextに渡す
- インプットソースを決める
- Transformation(s)をして,DStreamに対する出力を決める
- start, awaitTermination, stopの順で呼ぶ
DStream (Discretized Stream)
入ってきたデータをDStream (Discretized Stream)でバッチに分割してバッチ毎の処理結果を返す.DStreamの実体はRDDのシークエンス
Input DStreams and Receivers
すべてのDStream (file stream以外)はReceiverに関連づけられている.
Receiverはデータソースからデータをとってメモリに載せる.
Multiple DataSourceも設定可能で,その場合はMultiple Receiverになる.
ローカルで動かすときは,“local[n]” as the master URL where n > number of receivers to runしろ.サーバで動かすときも同じ.
Basic Datasource
Custom Sources
自作のReceiverを作れる.
## Receiver Reliability
- Reliable Receiver
- 受け取ったデータをリプリケーションしてから,データを受け取ったという承認ができるデータソース(e.g., Flume, Kafka)からデータを受け取るReceiver
- Unreliable Receiver
- データを受け取ったという承認ができないデータソースからデータを受け取るReceiver
重要なTransformation
transform
既存のデータ(RDD)とDstream上のbatch(RDD)を結合したいときに使う.
例えば,
のときにパターンマッチしてフィルタする.
- イメージ
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })
Window Operations
いわゆるSliding Windowの操作ができる.
- window length (= window size)
- sliding interval
を指定する.個々のデータのオーバーラップがあってもいい.
たとえば,window length = 3でslindnig interval = 2なら,1つのデータはオーバーラップする.ただし,両パラメータともDStreamのbatch interval の倍数でないといけない.
Output Operations on DStreams
実際の実行は,この操作でトリガーされる.RDDのaction相当.
- saveAsTextFiles
- saveAsObjectFiles
- saveAsHadoopFiles
のような基本的てきなメソッドは用意されている.
好きなアウトプットデータソースにデータを吐けばいい.
foreachRDD(func)のfuncを自分で定義して,データソースに吐けばいい.
Docではありがちなミス/非効率な処理を挙げて,それを避けるように以下のようにしろと言っている.ちなみにありがちなミス/非効率な処理は,
- Driverでコネクションを作成してしまう.コネクションオブジェクトはSerializeできないエラーがでたり,できたとしても,コネクションはDriver 2 DB間ではられるので,Workerで処理したデータがDBに吐かれることはない.
- partition単位でなく,record単位でコネクションを作ってしまうのでオーバーヘッドが大きい.
正しいサンプルコード
- 例1
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
- 例2
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
dstream.foreachRDDの中で何かしらactionを呼ばないと実際のデータ処理をしないので注意.
Caching / Persistence
RDDと同様に,stream.persist()でデータをメモリキャッシュできる.
window-based/state-based operationはデフォルトでメモリキャッシュする.
ネットワークがデータソースの場合 (e.g., Kafka, Flume, sockets, etc.)の場合は,2ノードにデータをリプリケーションするようにしている.
RDDと違って,デフォルトの永続化レベルはSeriazlied In-Memoryなので注意.
Checkpointing
ストーリーミングアプリケーションは24時間356日動いていなければならないので,Failureからのリカバリーは非常に重要.そこでFailureからの早期復旧をおこなうためにチェックポイントを正しく設けなければならない.
2つのタイプのチェックポイントがある.
- Metadata
- Configuration: Spark Configのこと(だろう)
- DStream Operation: Transformationのこと(だろう)
- Incomplete Batches: キューに入っているが終わってないBatches
- Data: 生成されたRDDの保存.状態を持つTransformationなら必要.なぜならそのような操作では,RDDの依存チェーンがあるので.
次の2タイプのアプリケーションではチェックポントを設けること.
- updateStateByKey/reduceByKeyAndWindowが使用される状態を持つTransformationがあるとき.定期的なRDDのチェックポイントを設ける
- DriverのFailure RecoveryにおけるMetadataのチェックポイント
状態を持つアプリケーションのチェックポイントを設ける例 (getOrCreateを使う)
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
Deploying Applications
アプリケーションのディプロイには次のもの/ことが必要
- Cluster with a cluster manager
- Package the application JAR
- Configuring sufficient memory for the executors
- Configuring checkpointing
- Configuring automatic restart of the application
- [Experimental in Spark 1.2] Configuring write ahead logs
それぞれ見ていく.
Cluster with a cluster manager
Spark ClusterかYARN Cluster,もしくはMesos
Package the application JAR
spark-submitでアプリケーションを起動するならSpark/Spark StreamingのライブラリはJARに詰めなくていいけれど,他のライブラリに依存するならそれをJARに入れること.
Configuring sufficient memory for the executors
入ってくるデータはメモリに保存なれてなければならない.メモリサイズの要件はアプリケーションによるけれど,アプリが10min windowなら10minはIn-Memoryでデータを処理しなければならない.
Configuring checkpointing
上記のCheckpointing参考にすること.
Configuring automatic restart of the application
Driver Failureに関してはモニタリングしないといけない.
- Standalone Modeならば,spark-submitで--superviseオプションをつける
- YARNも似たような仕組みあるのでそれを参考にする
- MesosはMarathonという仕組みがある
[Experimental in Spark 1.2] Configuring write ahead logs
Recieverが受け取ったデータをWALに書き込む仕様ができた.これがあるとDriver Failureからの復帰でゼロデータロスを保証する.WALをオンにするにはspark.streaming.receiver.writeAheadLogs.enable=trueにする.ログの2重保持を防ぐため,Recieverが受け取ったデータをのreplicationをオフにする.それはstorage levelをStorageLevel.MEMORY_AND_DISK_SERにすると可能.ただ,WALをオンにするとスループットが悪化するかも.
Upgrading Application Code
リアルタイムで動いているアプリケーションを止めずに,アプリケーションをアプグレードするには2つの方法がある.
- 新しいアプリケーションと既存アプリケーションを同時に走らせておいて,ある程度たったら,既存アプリケーションを止める.
- 既存アプリケーションをstreamingContext.stopでとれめると,受け取ったデータを処理し終わってから止めてくれる.そして新しいアプリケーションをスタートさせる.この方法だとデータソース側でデータのバッファリングが必要.なのでKafka/Flume等で行える.