KZKY memo

自分用メモ.

怒涛のAkka: Actors

Actor API

extends Actorしたときに使える APIの説明

  • context.actorOfでchild actorつくる
  • start/stopのhook

etc.

Actor Lifecycle

登場人物
  • ActorPath
  • ActorSelection
    • pathの表現
    • actor refの名前解決
  • ActorRef
    • incarnationの表現
    • pathを持つ
    • uidを持つ
  • Actor Incarnation
    • pathに所属
    • uidを持つ
    • mailboxを持つ
依存関係

Actorの構成要素の依存関係

Actor -> context=ActorContext -> self = ActorRef -> path=ActorPath

Lifecycle Monitoring aka DeathWatch

あるActorの子供をwatch対象にする

import akka.actor.{ Actor, Props, Terminated }
 
class WatchActor extends Actor {
      val child = context.actorOf(Props.empty, "child")
      context.watch(child) // <-- this is the only call needed for registration
      var lastSender = system.deadLetters

      def receive = {
          case "kill" =>
          context.stop(child); lastSender = sender()
          case Terminated(`child`) => lastSender ! "finished"
      }
}

Identifying Actors via Actor Selection

pathを使って,actorのlook-upをする.

import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated }

class Follower extends Actor {
  val identifyId = 1
  context.actorSelection("/user/another") ! Identify(identifyId)

  def receive = {
    case ActorIdentity(`identifyId`, Some(ref)) =>
      context.watch(ref)
      context.become(active(ref))
    case ActorIdentity(`identifyId`, None) => context.stop(self)

  }

  def active(another: ActorRef): Actor.Receive = {
    case Terminated(`another`) => context.stop(self)
  }
}

Messages and immutability

case classにしてimmutableにすること.
そうするとreceiver側でmatchがうまく使える.

Send messages

  • tell: fire-and-forget
  • ask: send-and-receive-future

お決まりの,"!"と"?". "?"を使うには,import akka.pattern._をする.

askのcomman-pattern

  • いくつかのメッセージを別のアクターに投げる.
  • for-comprehensionでまとめて,Futureを作る
  • pipeToで別のActorに渡す (future.onComplete扱いなのでnon-blocking)
import akka.pattern.{ ask, pipe }
import system.dispatcher // The ExecutionContext that will be used
case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout = Timeout(5 seconds) // needed for `?` below

val f: Future[Result] =
    for {
        x <- ask(actorA, Request).mapTo[Int] // call pattern directly
        s <- (actorB ask Request).mapTo[String] // call by implicit conversion
        d <- (actorC ? Request).mapTo[Double] // call by symbolic name
    } yield Result(x, s, d)

f pipeTo actorD // .. or ..
pipe(f) to actorD
いくつかの注意点
  • askは内部で別のActorを生成して,replyを受け取る.
  • exceptionが起こっても,Failureメッセージは自動で送られないので,自分で処理
try {
    val result = operation()
    sender() ! result
} catch {
    case e: Exception =>
    sender() ! akka.actor.Status.Failure(e)
    throw e
}

Forward message

targetはcontext.actorSelectionで持ってくればいいのか?

    target forward message

Receive messages

def receive = {
    case xxx => processing
    case yyy => processing
}

Reply to messages

sender() ! result

Receive timeout

in Actor class

context.setReceiveTimeout(30 milliseconds)
context.setReceiveTimeout(Duration.Undefined)

で設定できる.

Stopping actors

処理中のメッセージは処理するけれど,残りのmailboxにあるメッセージは処理しないで,deadlettersに送られる.(それは,mailboxのインプリに依存らしい.)

PoisonPill

このメッセージより前にあるメッセージが処理されてから,Actorが止まる.

Graceful Stop

Graceful stopのサンプルが載っている.
PoisonPillをworkerに送って,cleanupのtaskはpostStopに書く.

Become/Unbecome

actorの振る舞いhotswapを行うための機能.
becomeがどういう動作をするのか知りたかったのでコードを書いた.
becomeに渡されたメソッドが,今後のメッセージハンドラーとして使用される.

  • BecomeActor
package edu.kzk.actor.become

import akka.actor.Actor
import akka.actor.actorRef2Scala
import scala.concurrent.duration.Duration

class BecomeActor extends Actor {
  import context._

  def angry: Receive = {
    case "angry" => sender() ! "I am already angry"
    case "happy" => become(happy)
  }

  def happy: Receive = {
    case "happy" => sender() ! "I am already happy"
    case "angry" => become(angry)
  }

  def receive = {
    case "angry" => {
      println("original angry")
      become(angry);
    }
    case "happy" => {
      println("original happy")
      become(happy);
    }
  }
}
  • BecomeApp
package edu.kzk.actor.become

import scala.util.Success
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration.Duration

object BecomeApp extends App {

  // Create an actor system
  val actorSystem = ActorSystem("become");

  // Create an actor
  val actor = actorSystem.actorOf(Props[BecomeActor]);

  // Ask asynchronously
  implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS); // ?で必要

  import actorSystem.dispatcher

  val f1 = actor ! "angry";
  Thread.sleep(1000);
  val f2 = actor ? "angry";
  Thread.sleep(1000);
  val f3 = actor ! "happy";
  Thread.sleep(1000);
  val f4 = actor ? "happy";
  Thread.sleep(1000);
  val f5 = actor ! "angry";
  Thread.sleep(1000);
  val f6 = actor ? "angry";

  f2.onComplete {
    case Success(msg) => println(msg)
  }

  f4.onComplete {
    case Success(msg) => println(msg)
  }

  f6.onComplete {
    case Success(msg) => println(msg)
  }

  actorSystem.shutdown
  //actorSystem.awaitTermination(Duration(5000, TimeUnit.MILLISECONDS))
}

Stash

mailboxにあるメッセージを隠す.消すわけではないので,隠したメッセージはunstashしたら通常通り処理される.
become/unbecomeと一緒に使用されるケースが多いと思われる.

使う場合は

class XxxService extends Actor with Stach

これだとstashがboundedらしいので,unboundedならUnboundedStashを使う.

class XxxService extends Actor with UnboundedStash

Killing an Actor

アクターをkillできる.

// kill the 'victim' actor
victim ! Kill

ActorKilledExceptionが発生するので,このハンドルはsupervised strategyに依存する.

Actors and exceptions

What happens to the Message

メッセージ処理中のエラーでその処理中のメッセージは消えるので注意.
メッセージ,DB接続などのエラー処理は自分でやってね.

What happens to the mailbox

メッセージ処理中のエラーでもActorのリスタートでもメッセージボックス本体に影響はない.同じメールボックスが使用される.

What happens to the actor

Actor内で例外が起こると,Actorは一時停止して,監視プロセスが動く.
どうなるか,リジューム,リスタート,止めるなのどの動作はスーパーバイザー依存.

Extending Actors using PartialFunction chaining

Actor.ReceiveはPartialFunction[Any, Unit]のエイリアスなので,チェーンができる.method#orElseを使う.

例えばproduer/consumerを両方効きのクラスを作りたい場合

  • producer/consumerをtraitで作って,Actor.Receiveを返すメソッドを定義.
  • これらをwithするクラスを作る
  • そのクラスの中で,producer#method orElse consumer#method

object compositionでもいい.

Initialization patterns

Actorのいろんなinitパターンがある.
Actorはresume/restartすることに注意する.

Initialization via constructor

基本はこれでいいかもしれないが,リスタート時に子Actorをとっておくのが良い場合もある.次の2つはこのケースのパターンの説明.

Initialization via preStart

  • preStartをオーバーライドして子Actorのinitをする.
  • postRestartをオーバーライドして何もしない.本来preStartが呼ばれる.
  • preRestartをオーバーライドしてpostStopを呼ぶ.

Initialization via message passing

サイクリックな依存関係がある場合はコンストラクタでinitするのは無理なので,メッセージ経由でinitする.init中に他のメッセージが来る可能性があるので気をつける.