KZKY memo

自分用メモ.

Spark Streaming 3

Performance Tuning

基本的にはこれのまとめ

Streamingで気を配るは以下2点

  • クラスタのリソースを効率的に使用してのデータバッチの処理時間
  • バッチを受け取ってからすぐ処理できるような適切なバッチサイズ

Reducing the Processing Time of each Batch

Sparkに関してのパラメータチューニングは,Tuning Guideでいろいろ議論されてる.
ここでは,Streamingに重要な点をまとめる.

Level of Parallelism in Data Receiving

データの受け取りを最適化するならMultiple Data Streamにする.
Kafkaの例なら2つのトピックを2つのData Streamに分けて受け取ればいい.
ここは2スレッドで並列化される.
そして,それらを結合して処理可能.

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

他にinputStream.repartition()で分散化する方法もある.

他の重要なパラメータは,receiverのblocking interval (spark.streaming.blockInterval).普通,receiverはデータを受け取ってcoaleaseしていくつかのblocksを作る.この1blockに対してタスクが実行される.

{ \displaystyle
\#Tasks/(Receiver \times Batch) \propto \frac{Batch Interval}{Block Interval}
}

らしいので,例えば,200ms block intervalで2 sec batchなら10 tasksになる.

Level of Parallelism in Data Processing

クラスタリソースよりもリソースを使っていない場合にちゃんと使おう.
例えば,reduceByKeyにおいて並列タスク数はspark.default.parallelismで決められるけれど,numPartitionsを引数にいれて調整可能.

Data Serialization

の2点に注意.Sparkと違って,GC関連のポーズを最小化するために,RDDデータはデフォルトでSerialized In-Memoryになっている.

Task Launching Overheads

1secで何個もタスクがローンチされる(e.g., 50+ tasks)と高オーバーヘッド.

以下2点で改善する可能性あり.

  • Task Serialization: Kryo serializationを使う
  • Execution mode: Stand-aloneで起動する.もしくはMesosの粗モードで起動

Setting the Right Batch Size

良いbatch sizeを決めるにはまず,batch interval = 5-10 secくらいにかつ,data rateを小さくしてみる.システムがdata rateに対応するようならend-to-endのディレイを見て,それが維持されているようならシステムは安定的.一時的な data rateの上昇でディレイが大きくなるのは許容.
(要するに,要件に合うように見積もり立ててトライアンドエラーしろと)

Memory Tuning

SparkアプリのメモリチューニングはTuning Guideをみること.
Spark Streamingアプリのメモリ最適化の方針としては,GC関連のポーズを小さくして一貫したバッチ処理時間を確保する.

  • DStreamの永続化レベルはSeriazlied In-Memory
    • Ser/DeのオーバーヘッドもあるけれどGC関連のポーズは小さくなる
  • デフォルトでLRUによりSpark Streamingで生成されたRDDはメモリ上から消される
    • 賢く非永続化するならpark.streaming.unpersist=trueにする.
  • concurrent GCを使う
    • concurrent GCはシステム全体のスループットを下げるというのは周知だが,Spark Streamingにおけるbatch processing time一定にするには良い.