怒涛のAkka: Actors
Actor API
extends Actorしたときに使える APIの説明
- context.actorOfでchild actorつくる
- start/stopのhook
etc.
Actor Lifecycle
- 図を見る
登場人物
- ActorPath
- ActorSelection
- pathの表現
- actor refの名前解決
- ActorRef
- incarnationの表現
- pathを持つ
- uidを持つ
- Actor Incarnation
- pathに所属
- uidを持つ
- mailboxを持つ
Lifecycle Monitoring aka DeathWatch
あるActorの子供をwatch対象にする
import akka.actor.{ Actor, Props, Terminated } class WatchActor extends Actor { val child = context.actorOf(Props.empty, "child") context.watch(child) // <-- this is the only call needed for registration var lastSender = system.deadLetters def receive = { case "kill" => context.stop(child); lastSender = sender() case Terminated(`child`) => lastSender ! "finished" } }
Identifying Actors via Actor Selection
pathを使って,actorのlook-upをする.
import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated } class Follower extends Actor { val identifyId = 1 context.actorSelection("/user/another") ! Identify(identifyId) def receive = { case ActorIdentity(`identifyId`, Some(ref)) => context.watch(ref) context.become(active(ref)) case ActorIdentity(`identifyId`, None) => context.stop(self) } def active(another: ActorRef): Actor.Receive = { case Terminated(`another`) => context.stop(self) } }
Messages and immutability
case classにしてimmutableにすること.
そうするとreceiver側でmatchがうまく使える.
Send messages
- tell: fire-and-forget
- ask: send-and-receive-future
お決まりの,"!"と"?". "?"を使うには,import akka.pattern._をする.
askのcomman-pattern
- いくつかのメッセージを別のアクターに投げる.
- for-comprehensionでまとめて,Futureを作る
- pipeToで別のActorに渡す (future.onComplete扱いなのでnon-blocking)
import akka.pattern.{ ask, pipe } import system.dispatcher // The ExecutionContext that will be used case class Result(x: Int, s: String, d: Double) case object Request implicit val timeout = Timeout(5 seconds) // needed for `?` below val f: Future[Result] = for { x <- ask(actorA, Request).mapTo[Int] // call pattern directly s <- (actorB ask Request).mapTo[String] // call by implicit conversion d <- (actorC ? Request).mapTo[Double] // call by symbolic name } yield Result(x, s, d) f pipeTo actorD // .. or .. pipe(f) to actorD
いくつかの注意点
- askは内部で別のActorを生成して,replyを受け取る.
- exceptionが起こっても,Failureメッセージは自動で送られないので,自分で処理
try { val result = operation() sender() ! result } catch { case e: Exception => sender() ! akka.actor.Status.Failure(e) throw e }
Forward message
targetはcontext.actorSelectionで持ってくればいいのか?
target forward message
Receive messages
def receive = { case xxx => processing case yyy => processing }
Reply to messages
sender() ! result
Receive timeout
in Actor class
context.setReceiveTimeout(30 milliseconds)
context.setReceiveTimeout(Duration.Undefined)
で設定できる.
Stopping actors
処理中のメッセージは処理するけれど,残りのmailboxにあるメッセージは処理しないで,deadlettersに送られる.(それは,mailboxのインプリに依存らしい.)
PoisonPill
このメッセージより前にあるメッセージが処理されてから,Actorが止まる.
Graceful Stop
Graceful stopのサンプルが載っている.
PoisonPillをworkerに送って,cleanupのtaskはpostStopに書く.
Become/Unbecome
actorの振る舞いhotswapを行うための機能.
becomeがどういう動作をするのか知りたかったのでコードを書いた.
becomeに渡されたメソッドが,今後のメッセージハンドラーとして使用される.
- BecomeActor
package edu.kzk.actor.become import akka.actor.Actor import akka.actor.actorRef2Scala import scala.concurrent.duration.Duration class BecomeActor extends Actor { import context._ def angry: Receive = { case "angry" => sender() ! "I am already angry" case "happy" => become(happy) } def happy: Receive = { case "happy" => sender() ! "I am already happy" case "angry" => become(angry) } def receive = { case "angry" => { println("original angry") become(angry); } case "happy" => { println("original happy") become(happy); } } }
- BecomeApp
package edu.kzk.actor.become import scala.util.Success import java.util.concurrent.TimeUnit import akka.actor.ActorSystem import akka.actor.Props import akka.actor.actorRef2Scala import akka.pattern.ask import akka.util.Timeout import scala.concurrent.duration.Duration object BecomeApp extends App { // Create an actor system val actorSystem = ActorSystem("become"); // Create an actor val actor = actorSystem.actorOf(Props[BecomeActor]); // Ask asynchronously implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS); // ?で必要 import actorSystem.dispatcher val f1 = actor ! "angry"; Thread.sleep(1000); val f2 = actor ? "angry"; Thread.sleep(1000); val f3 = actor ! "happy"; Thread.sleep(1000); val f4 = actor ? "happy"; Thread.sleep(1000); val f5 = actor ! "angry"; Thread.sleep(1000); val f6 = actor ? "angry"; f2.onComplete { case Success(msg) => println(msg) } f4.onComplete { case Success(msg) => println(msg) } f6.onComplete { case Success(msg) => println(msg) } actorSystem.shutdown //actorSystem.awaitTermination(Duration(5000, TimeUnit.MILLISECONDS)) }
Stash
mailboxにあるメッセージを隠す.消すわけではないので,隠したメッセージはunstashしたら通常通り処理される.
become/unbecomeと一緒に使用されるケースが多いと思われる.
使う場合は
class XxxService extends Actor with Stach
これだとstashがboundedらしいので,unboundedならUnboundedStashを使う.
class XxxService extends Actor with UnboundedStash
Killing an Actor
アクターをkillできる.
// kill the 'victim' actor
victim ! Kill
ActorKilledExceptionが発生するので,このハンドルはsupervised strategyに依存する.
Actors and exceptions
What happens to the Message
メッセージ処理中のエラーでその処理中のメッセージは消えるので注意.
メッセージ,DB接続などのエラー処理は自分でやってね.
What happens to the mailbox
メッセージ処理中のエラーでもActorのリスタートでもメッセージボックス本体に影響はない.同じメールボックスが使用される.
What happens to the actor
Actor内で例外が起こると,Actorは一時停止して,監視プロセスが動く.
どうなるか,リジューム,リスタート,止めるなのどの動作はスーパーバイザー依存.
Extending Actors using PartialFunction chaining
Actor.ReceiveはPartialFunction[Any, Unit]のエイリアスなので,チェーンができる.method#orElseを使う.
例えばproduer/consumerを両方効きのクラスを作りたい場合
- producer/consumerをtraitで作って,Actor.Receiveを返すメソッドを定義.
- これらをwithするクラスを作る
- そのクラスの中で,producer#method orElse consumer#method
object compositionでもいい.
Initialization patterns
Actorのいろんなinitパターンがある.
Actorはresume/restartすることに注意する.
Initialization via constructor
基本はこれでいいかもしれないが,リスタート時に子Actorをとっておくのが良い場合もある.次の2つはこのケースのパターンの説明.
Initialization via preStart
- preStartをオーバーライドして子Actorのinitをする.
- postRestartをオーバーライドして何もしない.本来preStartが呼ばれる.
- preRestartをオーバーライドしてpostStopを呼ぶ.
Initialization via message passing
サイクリックな依存関係がある場合はコンストラクタでinitするのは無理なので,メッセージ経由でinitする.init中に他のメッセージが来る可能性があるので気をつける.