怒涛のAkka: Routing
基本
Router Actorは別のActorにラップして使用する.
理由は,Router ActorはRouter Actorをextendして使うわけではなく,実装も普通のActorとは違うため,別のActorにラップしてsupervised strategyを適用できるようにしたいから.
Routerの基本的使い方
- Worker extends Actor
- recieveで何か処理
- Mater extends Actor
- router作る
- context.actorOf(RoundRobinPool(5).props(Props[Worker]), "routername")
- recieveでrouter.forward(${message})
- router作る
routeeはmessageを送った人にメッセージを返すのでforwardにしている.
そうしないと,Router ActorをラップしているMasterにmessageが返される.
Masterにmessageを返したいなら,router.forward(message)は特に必要なし.
動くサンプル
- Worker
package edu.kzk.actor.router import akka.actor.Actor import akka.pattern._ class Worker extends Actor { def receive = { case x: String => sender ! x } }
- Master
package edu.kzk.actor.router import akka.actor.Actor import akka.actor.Props import akka.routing.RoundRobinPool import akka.pattern._ class Master extends Actor { val nWorkers = 5; val workerRouterPool = context.actorOf(Props[Worker].withRouter(RoundRobinPool(nWorkers)), name = "workerRouter") def receive = { case x: String => workerRouterPool.forward(x) } }
- RouterApp
package edu.kzk.actor.router import akka.actor.ActorSystem import akka.actor.Props import akka.pattern._ import scala.concurrent.Future import akka.util.Timeout object RouterApp extends App { implicit val system = ActorSystem("routing-actor-system"); val master = system.actorOf(Props[Master]) import system.dispatcher; implicit val timeout = Timeout(100000) val futures = for (i <- 0 to 100) yield { val future: Future[Any] = master ? s"message ${i}"; future.onComplete { x => println(x.get) } } Thread.sleep(5000); system.shutdown() }
他の例は単なるスニペットなので注意.
Router Actor
Pool/Group
- Pool
- routerの子供としてroutee actorができる.routeeが止まるとrouteeを取り除く.
- Group
- routerの子供でないroutee actorができる.routeeの監視しない.
Pool
こんな感じで作る
val router2: ActorRef = context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")
configからでも作れる.
akka.actor.deployment {
/parent/router1 {
router = round-robin-pool
nr-of-instances = 5
}
}
と
val router1: ActorRef = context.actorOf(FromConfig.props(Props[Worker]), "router1")
Senders
routeeで
sender() ! x
はsender()が自分.
router actorに返事をしたいときもあるのでそれをやる場合は
sender().!("返事の内容")(context.parent)
Supervision
RouterPoolを使うときは,supervised strategyをちゃんとセットする.
でないと,
で1つrouteeeが止まったら,みんな止まるので,
val escalator = OneForOneStrategy() { case e => testActor ! e; SupervisorStrategy.Escalate } val router = system.actorOf(RoundRobinPool(1, supervisorStrategy = escalator).props( routeeProps = Props[TestActor]))
のようにしたほうがいい.
注意点
- routeeが止まっても,routerは再起動させない
- all routeesが止まると,routerも止まる (dynamic routerはその限りでない)
Group
RouterGroupにActorsのパスを渡す.
Configの場合
akka.actor.deployment { /parent/router3 { router = round-robin-group routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"] } }
と
val router3: ActorRef = context.actorOf(FromConfig.props(), "router3")
コードの場合は
val router4: ActorRef = context.actorOf(RoundRobinGroup(paths).props(), "router4")
な感じ.
そして,routeeは別に作る.例えば,
class Workers extends Actor { context.actorOf(Props[Worker], name = "w1") context.actorOf(Props[Worker], name = "w2") context.actorOf(Props[Worker], name = "w3") // ...
system.actorOf(Props[Workers], "workers")
な感じ.
リモートアクターの場合も可能.
Router Usage
簡単にまとめる.
- RoundRobinPool and RoundRobinGroup
- RoudRobin-fashionでrouting
- RandomPool and RandomGroup
- ランダムにrouting
- BalancingPool
- 忙しいrouteeからidleなrouteeに再度メッセージを配布
- すべてのrouteeはmailboxをシェアしている
- SmallestMailboxPool
- idleでmailboxが空ののrouteeに優先してrouting
- BroadcastPool and BroadcastGroup
- すべてのrouteeにメッセージを送る
- ScatterGatherFirstCompletedPool and ScatterGatherFirstCompletedGroup
- すべてのrouteeにメッセージを送る
- 一番はじめに帰ってきたリスポンスを元の送り主に送る
- それ以外のリスポスは捨てる
- TailChoppingPool and TailChoppingGroup
- routeeを1つランダムに選択してメッセージを送る
- routeeが次のrouteeを1つランダムに選択(前の以外)してメッセージを送る
- 続ける
- 一番はじめに帰ってきたリスポンスを元の送り主に送る
- ConsistentHashingPool and ConsistentHashingGroup
- ConsistentHashingでroueeを選択
詳しくはDoc参考.
Specially Handled Messages
通常はメッセージはrouteeに送られるが,特別にハンドリングされるメッセージがある.
Broadcast Messages
Broadcast(payload)のpalyloadがrouteeに送られる.
PoisonPill Messages
routeeに送られない.送りたかったら,Broadcastでラップ.
Kill Messages
routeeに送られない.送りたかったら,Broadcastでラップ.
Management Messages
これらはrouterで処理されると思う (明確に書いてない)
- akka.routing.GetRoutees
- akka.routing.AddRoutee
- akka.routing.RemoveRoutee
- akka.routing.AdjustPoolSize
Dynamically Resizable Pool
Routeeのリサイジングが可能.
例えば
- Config
akka.actor.deployment { /parent/router29 { router = round-robin-pool resizer { lower-bound = 2 upper-bound = 15 messages-per-resize = 100 } } }
- Code
val router29: ActorRef = context.actorOf(FromConfig.props(Props[Worker]), "router29")
Custom Router
Doc参考.用意されているRouterで事足ることがほとんどだと思う.
Configuring Dispatchers
routeeのdispathcerの設定がconfigで可能.
- Config
akka.actor.deployment { /poolWithDispatcher { router = random-pool nr-of-instances = 5 pool-dispatcher { fork-join-executor.parallelism-min = 5 fork-join-executor.parallelism-max = 5 } } }
- Code
val router: ActorRef = system.actorOf( // “head” router actor will run on "router-dispatcher" dispatcher // Worker routees will run on "pool-dispatcher" dispatcher RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]), name = "poolWithDispatcher") ...