読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

Scala akka 導入

基本

http://www.slideshare.net/sifue/akka-39611889

が超わかりやすい.
並列処理の基本とJavaのconcurrent packageとを引き合いにして,
Akka Documentをかなり完結にまとめている.

Akkaとは

状態とアドレス,MailBoxをもった1つのThreadであるActorを提供,管理してくれる並列分散処理のフレームワーク

シングルノードの環境だったらJavaのExecutorServiceのような感じだが,
マルチノードでも行えるし,状態とアドレス,MailBoxを1つのThreadが持っているイメージ.

何ができる

同期/非同期,ブロッキング/ノンブロッキング,イベントドリブンで対障害性を持つ分散/並列プログラミングが可能

  • 完全非同期メッセージパッシング
  • 準非同期メッセージパッシング ~= GraphX
  • ブローカーシステム ~= Kafka, RabbitMQ
  • ビッグデータ解析 ~= Spark
  • 大規模機械学習 ~= MLLib
  • 大規模分散ストリーム処理 ~= Spark Streaming, Storm
  • Transactionも可能らしい

なんでもできると言いたいが,基本は

で,タスクを小さくしたメッセージパッシングを行うのが妥当だと思う.

Javaのマルチスレッドではデータをメモリで共有していいるので,TaskをThreadに投げるけれど,メッセージパッシングでは,小さいメッセージ(命令)を投げるのがいいと思う.というのも,基本はマルチノードでデータがネットワーク上をトラバースするので大きいメッセージは投げないほうがいい.そして,ワーカーとなるActorでメッセージによって違うタスクを実行する.

専門用語

Concurrency vs. Parallelism
Asynchronous vs. Synchronous
Non-blocking vs. Blocking
Deadlock vs. Starvation vs. Live-lock

http://doc.akka.io/docs/akka/2.3.8/general/terminology.html

Hello Akka

環境

  • Eclipse Kepler
  • Scala IDE 3.0 (Scala 2.10)
  • compile group: 'com.typesafe.akka', name: 'akka-actor_2.10', version: '2.3.8'
SampleActor
package akka_sample.edu.kzk

import akka.actor.Actor

class SampleActor extends Actor {
    def receive = {
      case i: Int => {
        println(s"Message ${i} is received.");
        sender() ! "Reply Mesage!";
      } 
    }
}
HelloActor
package akka_sample.edu.kzk

import java.util.concurrent.TimeUnit

// import scala.concurrent.ExecutionContext.Implicits.global // onCompleteで必要
import scala.util.Success

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask // ?で必要
import akka.util.Timeout

object HelloActor extends App {

    // Create an actor system
    val actorSystem = ActorSystem("hello");

    // Create an actor
    val actor = actorSystem.actorOf(Props[SampleActor]);


    // Tell asynchronouly
    actor ! 1;

    // Ask asynchronouly
    implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS); // ?で必要
    val f = actor ? 2;

    import actorSystem.dispatcher // ExecutionContextは,これでもいい.

    f.onComplete {
    case Success(msg) => println(msg)
    }
    
    actorSystem.shutdown
}

http://doc.akka.io/docs/akka/2.0/intro/getting-started-first-scala.html

も参考になる.

やっていることは

  • Master/Worker/ListenerをActorを継承して定義
  • ApplicationでMaster/Listenerを作成
  • MasterはApplicationからメッセージをもらうとWokersを作成
  • Wokersはタスクを実行して,Masterへメッセージを送る
  • MasterはWokersからメッセージを集め終わったら,タスクを実行し,Listenerへメッセージを送る
  • ListenerはMasterからメッセージを受け取るとタスクを実行


あまりまとめてないがとりあえずここまで.

これからはJavaではなく,Scala + Akkaの時代だと思う.