読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

Spark Streaming 2

基本的な話のまとめ.

一連のフロー

  • Conf作ってStreamingContextに渡す
  • インプットソースを決める
  • Transformation(s)をして,DStreamに対する出力を決める
  • start, awaitTermination, stopの順で呼ぶ

DStream (Discretized Stream)

入ってきたデータをDStream (Discretized Stream)でバッチに分割してバッチ毎の処理結果を返す.DStreamの実体はRDDのシークエンス

Input DStreams and Receivers

すべてのDStream (file stream以外)はReceiverに関連づけられている.
Receiverはデータソースからデータをとってメモリに載せる.
Multiple DataSourceも設定可能で,その場合はMultiple Receiverになる.

ローカルで動かすときは,“local[n]” as the master URL where n > number of receivers to runしろ.サーバで動かすときも同じ.

Basic Datasource

  • socketStream
    • socketからbyteで受け取る
  • socketTextStream
    • socketからtextで受け取る
  • fileStream
    • HDFS, S3, NFS, etcから受け取る
  • actorStream
    • akka actorから受け取る

Advanced Datasource

依存関係のコンフリクトを避けるために,他のlibで提供になっている.

Custom Sources

自作のReceiverを作れる.

## Receiver Reliability

  • Reliable Receiver
    • 受け取ったデータをリプリケーションしてから,データを受け取ったという承認ができるデータソース(e.g., Flume, Kafka)からデータを受け取るReceiver
  • Unreliable Receiver
    • データを受け取ったという承認ができないデータソースからデータを受け取るReceiver

重要なTransformation

transform

既存のデータ(RDD)とDstream上のbatch(RDD)を結合したいときに使う.
例えば,

  • 既存のデータ(RDD) = span DB
  • batch(RDD) = streamで入ってきたデータ

のときにパターンマッチしてフィルタする.

  • イメージ
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

Window Operations

いわゆるSliding Windowの操作ができる.

f:id:KZKY:20141227230949p:plain

  • window length (= window size)
  • sliding interval

を指定する.個々のデータのオーバーラップがあってもいい.
たとえば,window length = 3でslindnig interval = 2なら,1つのデータはオーバーラップする.ただし,両パラメータともDStreamのbatch interval の倍数でないといけない.

Output Operations on DStreams

実際の実行は,この操作でトリガーされる.RDDのaction相当.

  • saveAsTextFiles
  • saveAsObjectFiles
  • saveAsHadoopFiles

のような基本的てきなメソッドは用意されている.

好きなアウトプットデータソースにデータを吐けばいい.
foreachRDD(func)のfuncを自分で定義して,データソースに吐けばいい.

Docではありがちなミス/非効率な処理を挙げて,それを避けるように以下のようにしろと言っている.ちなみにありがちなミス/非効率な処理は,

  • Driverでコネクションを作成してしまう.コネクションオブジェクトはSerializeできないエラーがでたり,できたとしても,コネクションはDriver 2 DB間ではられるので,Workerで処理したデータがDBに吐かれることはない.
  • partition単位でなく,record単位でコネクションを作ってしまうのでオーバーヘッドが大きい.

正しいサンプルコード

  • 例1
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
  • 例2
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

dstream.foreachRDDの中で何かしらactionを呼ばないと実際のデータ処理をしないので注意.

Caching / Persistence

RDDと同様に,stream.persist()でデータをメモリキャッシュできる.
window-based/state-based operationはデフォルトでメモリキャッシュする.

ネットワークがデータソースの場合 (e.g., Kafka, Flume, sockets, etc.)の場合は,2ノードにデータをリプリケーションするようにしている.

RDDと違って,デフォルトの永続化レベルはSeriazlied In-Memoryなので注意.

Checkpointing

ストーリーミングアプリケーションは24時間356日動いていなければならないので,Failureからのリカバリーは非常に重要.そこでFailureからの早期復旧をおこなうためにチェックポイントを正しく設けなければならない.

2つのタイプのチェックポイントがある.

  • Metadata
    • Configuration: Spark Configのこと(だろう)
    • DStream Operation: Transformationのこと(だろう)
    • Incomplete Batches: キューに入っているが終わってないBatches
  • Data: 生成されたRDDの保存.状態を持つTransformationなら必要.なぜならそのような操作では,RDDの依存チェーンがあるので.

次の2タイプのアプリケーションではチェックポントを設けること.

  • updateStateByKey/reduceByKeyAndWindowが使用される状態を持つTransformationがあるとき.定期的なRDDのチェックポイントを設ける
  • DriverのFailure RecoveryにおけるMetadataのチェックポイント
状態を持つアプリケーションのチェックポイントを設ける例 (getOrCreateを使う)
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

Deploying Applications

アプリケーションのディプロイには次のもの/ことが必要

  • Cluster with a cluster manager
  • Package the application JAR
  • Configuring sufficient memory for the executors
  • Configuring checkpointing
  • Configuring automatic restart of the application
  • [Experimental in Spark 1.2] Configuring write ahead logs

それぞれ見ていく.

Cluster with a cluster manager

Spark ClusterかYARN Cluster,もしくはMesos

Package the application JAR

spark-submitでアプリケーションを起動するならSpark/Spark StreamingのライブラリはJARに詰めなくていいけれど,他のライブラリに依存するならそれをJARに入れること.

Configuring sufficient memory for the executors

入ってくるデータはメモリに保存なれてなければならない.メモリサイズの要件はアプリケーションによるけれど,アプリが10min windowなら10minはIn-Memoryでデータを処理しなければならない.

Configuring checkpointing

上記のCheckpointing参考にすること.

Configuring automatic restart of the application

Driver Failureに関してはモニタリングしないといけない.

  • Standalone Modeならば,spark-submitで--superviseオプションをつける
  • YARNも似たような仕組みあるのでそれを参考にする
  • MesosはMarathonという仕組みがある

[Experimental in Spark 1.2] Configuring write ahead logs

Recieverが受け取ったデータをWALに書き込む仕様ができた.これがあるとDriver Failureからの復帰でゼロデータロスを保証する.WALをオンにするにはspark.streaming.receiver.writeAheadLogs.enable=trueにする.ログの2重保持を防ぐため,Recieverが受け取ったデータをのreplicationをオフにする.それはstorage levelをStorageLevel.MEMORY_AND_DISK_SERにすると可能.ただ,WALをオンにするとスループットが悪化するかも.

Upgrading Application Code

リアルタイムで動いているアプリケーションを止めずに,アプリケーションをアプグレードするには2つの方法がある.

  • 新しいアプリケーションと既存アプリケーションを同時に走らせておいて,ある程度たったら,既存アプリケーションを止める.
  • 既存アプリケーションをstreamingContext.stopでとれめると,受け取ったデータを処理し終わってから止めてくれる.そして新しいアプリケーションをスタートさせる.この方法だとデータソース側でデータのバッファリングが必要.なのでKafka/Flume等で行える.