読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

怒涛のAkka: Routing

基本

Router Actorは別のActorにラップして使用する.
理由は,Router ActorはRouter Actorをextendして使うわけではなく,実装も普通のActorとは違うため,別のActorにラップしてsupervised strategyを適用できるようにしたいから.

Routerの基本的使い方

  • Worker extends Actor
    • recieveで何か処理
  • Mater extends Actor
    • router作る
      • context.actorOf(RoundRobinPool(5).props(Props[Worker]), "routername")
    • recieveでrouter.forward(${message})

routeeはmessageを送った人にメッセージを返すのでforwardにしている.
そうしないと,Router ActorをラップしているMasterにmessageが返される.
Masterにmessageを返したいなら,router.forward(message)は特に必要なし.

動くサンプル

  • Worker
package edu.kzk.actor.router

import akka.actor.Actor
import akka.pattern._

class Worker extends Actor {
  def receive = {
    case x: String => sender ! x
  }
}
  • Master
package edu.kzk.actor.router

import akka.actor.Actor
import akka.actor.Props
import akka.routing.RoundRobinPool
import akka.pattern._

class Master extends Actor {
  val nWorkers = 5;
  val workerRouterPool =
    context.actorOf(Props[Worker].withRouter(RoundRobinPool(nWorkers)), name = "workerRouter")

  def receive = {
    case x: String => workerRouterPool.forward(x)
  }

}
  • RouterApp
package edu.kzk.actor.router

import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern._
import scala.concurrent.Future
import akka.util.Timeout

object RouterApp extends App {

  implicit val system = ActorSystem("routing-actor-system");
  val master = system.actorOf(Props[Master])

  import system.dispatcher;
  implicit val timeout = Timeout(100000)

  val futures = for (i <- 0 to 100) yield {
    val future: Future[Any] = master ? s"message ${i}";
    future.onComplete { x => println(x.get) }
  }

  Thread.sleep(5000);

  system.shutdown()
}

他の例は単なるスニペットなので注意.

Router Actor

Pool/Group

  • Pool
    • routerの子供としてroutee actorができる.routeeが止まるとrouteeを取り除く.
  • Group
    • routerの子供でないroutee actorができる.routeeの監視しない.

Pool

こんな感じで作る

val router2: ActorRef =
    context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")

configからでも作れる.

akka.actor.deployment {
    /parent/router1 {
        router = round-robin-pool
        nr-of-instances = 5
    }
}

val router1: ActorRef =
    context.actorOf(FromConfig.props(Props[Worker]), "router1")

Senders

routeeで

sender() ! x

はsender()が自分.

router actorに返事をしたいときもあるのでそれをやる場合は

sender().!("返事の内容")(context.parent)

Supervision

RouterPoolを使うときは,supervised strategyをちゃんとセットする.
でないと,

  • routee error -> router error -> top-level actor
  • router restart
  • routees restart

で1つrouteeeが止まったら,みんな止まるので,

val escalator = OneForOneStrategy() {
    case e => testActor ! e; SupervisorStrategy.Escalate
}
val router = system.actorOf(RoundRobinPool(1, supervisorStrategy = escalator).props(
routeeProps = Props[TestActor]))

のようにしたほうがいい.

注意点

  • routeeが止まっても,routerは再起動させない
  • all routeesが止まると,routerも止まる (dynamic routerはその限りでない)

Group

RouterGroupにActorsのパスを渡す.

Configの場合

akka.actor.deployment {
    /parent/router3 {
        router = round-robin-group
        routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
    }
}

val router3: ActorRef =
     context.actorOf(FromConfig.props(), "router3")

コードの場合は

val router4: ActorRef =
    context.actorOf(RoundRobinGroup(paths).props(), "router4")

な感じ.

そして,routeeは別に作る.例えば,

class Workers extends Actor {
     context.actorOf(Props[Worker], name = "w1")
     context.actorOf(Props[Worker], name = "w2")
     context.actorOf(Props[Worker], name = "w3")
     // ...
system.actorOf(Props[Workers], "workers")

な感じ.

リモートアクターの場合も可能.

Router Usage

簡単にまとめる.

  • RoundRobinPool and RoundRobinGroup
    • RoudRobin-fashionでrouting
  • RandomPool and RandomGroup
    • ランダムにrouting
  • BalancingPool
    • 忙しいrouteeからidleなrouteeに再度メッセージを配布
    • すべてのrouteeはmailboxをシェアしている
  • SmallestMailboxPool
    • idleでmailboxが空ののrouteeに優先してrouting
  • BroadcastPool and BroadcastGroup
    • すべてのrouteeにメッセージを送る
  • ScatterGatherFirstCompletedPool and ScatterGatherFirstCompletedGroup
    • すべてのrouteeにメッセージを送る
    • 一番はじめに帰ってきたリスポンスを元の送り主に送る
    • それ以外のリスポスは捨てる
  • TailChoppingPool and TailChoppingGroup
    • routeeを1つランダムに選択してメッセージを送る
    • routeeが次のrouteeを1つランダムに選択(前の以外)してメッセージを送る
    • 続ける
    • 一番はじめに帰ってきたリスポンスを元の送り主に送る
  • ConsistentHashingPool and ConsistentHashingGroup
    • ConsistentHashingでroueeを選択

詳しくはDoc参考.

Specially Handled Messages

通常はメッセージはrouteeに送られるが,特別にハンドリングされるメッセージがある.

Broadcast Messages

Broadcast(payload)のpalyloadがrouteeに送られる.

PoisonPill Messages

routeeに送られない.送りたかったら,Broadcastでラップ.

Kill Messages

routeeに送られない.送りたかったら,Broadcastでラップ.

Management Messages

これらはrouterで処理されると思う (明確に書いてない)

  • akka.routing.GetRoutees
  • akka.routing.AddRoutee
  • akka.routing.RemoveRoutee
  • akka.routing.AdjustPoolSize

Dynamically Resizable Pool

Routeeのリサイジングが可能.

例えば

  • Config
akka.actor.deployment {
    /parent/router29 {
        router = round-robin-pool
        resizer {
            lower-bound = 2
            upper-bound = 15
            messages-per-resize = 100
        }
    }
}
  • Code
val router29: ActorRef =
    context.actorOf(FromConfig.props(Props[Worker]), "router29")

Custom Router

Doc参考.用意されているRouterで事足ることがほとんどだと思う.

Configuring Dispatchers

routeeのdispathcerの設定がconfigで可能.

  • Config
akka.actor.deployment {
    /poolWithDispatcher {
        router = random-pool
        nr-of-instances = 5
        pool-dispatcher {
            fork-join-executor.parallelism-min = 5
            fork-join-executor.parallelism-max = 5
        }
    }
}
  • Code
val router: ActorRef = system.actorOf(
    // “head” router actor will run on "router-dispatcher" dispatcher
    // Worker routees will run on "pool-dispatcher" dispatcher
    RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]),
        name = "poolWithDispatcher")
    ...