Relentless Akka: Fault Tolerance
Fault Handling in Practice
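Read the diagrams and code in this part of the docs (the fault-tolerance sample page listed in the references). They show how to design a fault-tolerant system.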
Creating a Supervisor Strategy
A strategy can be defined like this:
```scala
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

// inside an Actor
override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }
```
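The block passed to OneForOneStrategy is a Decider, i.e. a `PartialFunction[Throwable, Directive]`, so the mapping can be pulled out and checked without starting an ActorSystem. A minimal sketch (the object name `DeciderSketch` is made up for illustration). The cases are tried top to bottom, so the catch-all `Exception` case has to come last, and a subclass such as `NumberFormatException` is caught by its parent's `IllegalArgumentException` case.

```scala
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy._

object DeciderSketch {
  // Same mapping as the strategy above, as a standalone Decider
  // (SupervisorStrategy.Decider = PartialFunction[Throwable, Directive]).
  val decider: Decider = {
    case _: ArithmeticException      => Resume   // keep the child as-is
    case _: NullPointerException     => Restart  // replace the child instance
    case _: IllegalArgumentException => Stop     // terminate the child
    case _: Exception                => Escalate // fail the supervisor itself
  }
}
```

It can be plugged back in with `OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute)(DeciderSketch.decider)`. The two numbers are the restart budget: if the child needs more than 10 restarts within one minute, it is stopped instead.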
Default Supervisor Strategy
If you do not define a strategy, a default one is used. It will not always do what you want:
- ActorInitializationException: stops the child actor
- ActorKilledException: stops the child actor
- Exception: restarts the child actor
- Other types of Throwable: escalated to the parent actor
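`SupervisorStrategy.defaultDecider` encodes the list above. It is only defined for `Exception`s, so anything else falls through to `Escalate`. A small sketch that makes this visible (`DefaultsSketch` and `resolve` are hypothetical names):

```scala
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy._

object DefaultsSketch {
  // Resolve a failure cause the way the default strategy does:
  // use defaultDecider where it is defined, otherwise escalate.
  def resolve(cause: Throwable): Directive =
    SupervisorStrategy.defaultDecider.applyOrElse(cause, (_: Throwable) => Escalate)
}
```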
You can combine new cases with the default strategy like this:
```scala
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

// inside an Actor
override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ArithmeticException => Resume
    // everything else: defer to the default decider, escalate if it has no rule
    case t => super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
  }
```
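The same combination can also be written as standalone Deciders chained with `orElse`. A sketch, assuming you build on `SupervisorStrategy.defaultDecider` directly instead of `super.supervisorStrategy` (the two coincide unless a superclass has overridden `supervisorStrategy`); `CombinedSketch` and its value names are made up.

```scala
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy._

object CombinedSketch {
  // Custom rules are tried first.
  val custom: Decider = { case _: ArithmeticException => Resume }
  // Catch-all for causes neither decider covers (non-Exception Throwables).
  val fallback: Decider = { case _ => Escalate }
  // custom, then the built-in defaults, then escalate.
  val combined: Decider = custom orElse SupervisorStrategy.defaultDecider orElse fallback
}
```

Usage: `override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute)(CombinedSketch.combined)`.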
If an exception is escalated all the way up to the root guardian, it is handled the same way as by the Default Supervisor Strategy.
Stopping Supervisor Strategy
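There is also a stopping strategy (SupervisorStrategy.stoppingStrategy, plus the StoppingSupervisorStrategy configurator for applying it to the /user guardian): failing children are simply stopped, and the parent, notified of the loss through DeathWatch, takes corrective action. Honestly, I don't fully get this one yet.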
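A minimal sketch of that pattern, assuming a parent that owns a single worker: `SupervisorStrategy.stoppingStrategy` stops the failing child, `context.watch` turns its death into a `Terminated` message, and the parent's "corrective action" here is simply to start a replacement and count it. `Worker`, `StoppingParent`, and the `"ping"`/`"count"` messages are hypothetical.

```scala
import akka.actor.{Actor, ActorRef, Props, SupervisorStrategy, Terminated}

// Hypothetical worker: crashes on any Exception it receives, answers "ping".
class Worker extends Actor {
  def receive = {
    case ex: Exception => throw ex
    case "ping"        => sender() ! "pong"
  }
}

// Hypothetical parent: failing children are stopped, then replaced via DeathWatch.
class StoppingParent extends Actor {
  // Any failure of a child stops that child.
  override val supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

  private var replacements = 0
  private var child: ActorRef = spawn()

  // Create a worker and watch it, so its death arrives as Terminated.
  private def spawn(): ActorRef = context.watch(context.actorOf(Props[Worker]))

  def receive = {
    case Terminated(ref) if ref == child =>
      replacements += 1 // corrective action: start a fresh worker
      child = spawn()
    case "count" => sender() ! replacements
    case msg     => child forward msg
  }
}
```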
Logging of Actor Failures
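- Failures are logged unless they are escalated (escalated failures are expected to be handled, and possibly logged, higher up the hierarchy).
- SupervisorStrategy is abstract.
- Logging can be turned off by setting loggingEnabled to false when instantiating the SupervisorStrategy.
- Overriding SupervisorStrategy#logFailure lets you customize how failures are logged.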
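Both logging knobs in one sketch, assuming the Akka 2.3-era OneForOneStrategy signature: `loggingEnabled` is a constructor parameter, and `logFailure(context, child, cause, decision)` is overridden in an anonymous subclass. `LoggingSketch` and the `log` callback are hypothetical. For reference, the default implementation logs a Resume as a warning and other non-escalated decisions as errors, which is why the spec further down catches the ArithmeticException with `EventFilter.warning`.

```scala
import akka.actor.{ActorContext, ActorRef, OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

object LoggingSketch {
  private val decider: Decider = {
    case _: ArithmeticException => Resume
    case _: Exception           => Restart
  }

  // Failures handled by this strategy are not logged at all.
  val quiet: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute, loggingEnabled = false)(decider)

  // Failures are reported through `log` (a hypothetical sink) instead of
  // Akka's default warning/error events.
  def custom(log: String => Unit): SupervisorStrategy =
    new OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute)(decider) {
      override def logFailure(context: ActorContext, child: ActorRef,
                              cause: Throwable, decision: Directive): Unit =
        log(s"${child.path} failed with $cause; decision: $decision")
    }
}
```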
Supervision of Top-Level Actors
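Top-level actors are the actors created with system.actorOf(). They hang under the user guardian (one of the actors directly below the root actor), which simply applies its configured strategy to them; no special rules apply.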
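Since the user guardian just applies its configured strategy, switching that strategy changes what happens to top-level actors. A sketch, assuming the `akka.actor.guardian-supervisor-strategy` setting with the `StoppingSupervisorStrategy` configurator: a top-level actor that throws is stopped instead of restarted. `Fragile` and `GuardianSketch` are hypothetical names.

```scala
import akka.actor.{Actor, ActorSystem}
import com.typesafe.config.ConfigFactory

// Hypothetical top-level actor: crashes on any Exception it receives.
class Fragile extends Actor {
  def receive = {
    case ex: Exception => throw ex
    case "ping"        => sender() ! "pong"
  }
}

object GuardianSketch {
  // The user guardian's strategy is switched from the default
  // (restart on Exception) to stopping the failed top-level actor.
  def stoppingSystem(name: String): ActorSystem =
    ActorSystem(name, ConfigFactory.parseString(
      "akka.actor.guardian-supervisor-strategy = \"akka.actor.StoppingSupervisorStrategy\""
    ).withFallback(ConfigFactory.load()))
}
```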
Test Application
Add akka-testkit_2.11 and scalatest_2.11 as dependencies.
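With sbt, the dependencies could look roughly like this (`%%` appends the `_2.11` suffix). The version numbers are assumptions picked to match the 2.3.9 docs in the references, so adjust them to your setup; junit is included because the spec below runs through JUnitRunner.

```scala
// build.sbt (versions are assumptions matching Akka 2.3.9 / Scala 2.11)
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"   % "2.3.9",
  "com.typesafe.akka" %% "akka-testkit" % "2.3.9" % "test",
  "org.scalatest"     %% "scalatest"    % "2.2.4" % "test",
  "junit"              % "junit"        % "4.12"  % "test"
)
```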
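The sample in the docs does not run as-is (AkkaSpec is not available), but the sample below does.
Make the test class extend TestKit(ActorSystem("TestKitUsageSpec")) and mix in the other traits it needs with `with`.
The basic pattern seems to be a triplet: send the actor a message (tell msg_0), the actor replies with sender() ! msg_1, and the test checks the reply with expectMsg(msg_2).
I run the tests with JUnit, so annotate the class with @RunWith(classOf[JUnitRunner]).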
- Actors
```scala
package edu.kzk.actor.strategy

import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class Supervisor extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  // Create a child from the received Props and send its ActorRef back.
  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
}

class Supervisor2 extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }

  // The default preRestart stops all children; a no-op override keeps them
  // alive when this supervisor itself is restarted.
  override def preRestart(cause: Throwable, msg: Option[Any]): Unit = ()
}

class Child extends Actor {
  var state = 0
  def receive = {
    case ex: Exception => throw ex   // crash with the received exception
    case x: Int        => state = x  // set the state
    case "get"         => sender() ! state
  }
}
```
- FaultHandlingDocSpec
```scala
package edu.kzk.actor.strategy

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import akka.actor.{ActorRef, ActorSystem, Props, Terminated}
import akka.testkit.{DefaultTimeout, EventFilter, ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory

@RunWith(classOf[JUnitRunner])
class FaultHandlingDocSpec extends TestKit(ActorSystem("TestKitUsageSpec",
    ConfigFactory.parseString("""
      akka.loggers = ["akka.testkit.TestEventListener"]
    """)))
  with DefaultTimeout with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {

  // Shut the actor system down after all tests have run.
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "A supervisor" must {
    "apply the chosen strategy for its child" in {
      val supervisor = system.actorOf(Props[Supervisor], "supervisor")
      supervisor ! Props[Child]
      val child = expectMsgType[ActorRef] // retrieve answer from TestKit's testActor

      // ArithmeticException => Resume (logged as a warning): state is kept
      EventFilter.warning(occurrences = 1) intercept {
        child ! 42 // set state to 42
        child ! "get"
        expectMsg(42)
        child ! new ArithmeticException // crash it
        child ! "get"
        expectMsg(42)
      }

      // NullPointerException => Restart: state is reset
      EventFilter[NullPointerException](occurrences = 1) intercept {
        child ! new NullPointerException // crash it harder
        child ! "get"
        expectMsg(0)
      }

      // IllegalArgumentException => Stop
      EventFilter[IllegalArgumentException](occurrences = 1) intercept {
        watch(child) // have testActor watch "child"
        child ! new IllegalArgumentException // break it
        expectMsgPF() { case Terminated(`child`) => () }
      }

      // Exception => Escalate: the supervisor fails and is restarted by its parent
      EventFilter[Exception]("CRASH", occurrences = 2) intercept {
        supervisor ! Props[Child] // create new child
        val child2 = expectMsgType[ActorRef]
        watch(child2)
        child2 ! "get" // verify it is alive
        expectMsg(0)
        child2 ! new Exception("CRASH") // escalate failure
        expectMsgPF() {
          case t @ Terminated(`child2`) if t.existenceConfirmed => ()
        }

        // By default, restarting the supervisor stops its children (in preRestart),
        // which is not desired in some cases, so Supervisor2 overrides preRestart.
        val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")
        supervisor2 ! Props[Child]
        val child3 = expectMsgType[ActorRef]
        child3 ! 23
        child3 ! "get"
        expectMsg(23)
        child3 ! new Exception("CRASH")
        child3 ! "get"
        expectMsg(0) // child3 survived, but was restarted along with its parent
      }
    }
  }
}
```
When using EventFilter, don't forget to set
akka.loggers = ["akka.testkit.TestEventListener"]
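either hardcoded (as in the spec above) or in your config, e.g. application.conf.
Otherwise TestEventListener never sees the log events, so with occurrences >= 1 the intercept block keeps waiting for events that never arrive: the test stalls there and eventually fails with a timeout.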
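If you would rather not hardcode the setting in every spec, one option is to build the test config once and keep everything else from application.conf. A sketch; `TestConfigSketch` is a hypothetical helper, and akka-testkit must be on the classpath because the logger class lives there. Putting the same `akka.loggers` line into `src/test/resources/application.conf` works too.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object TestConfigSketch {
  // Override only akka.loggers; every other setting still comes from
  // application.conf / reference.conf via withFallback.
  val config: Config = ConfigFactory.parseString(
    """akka.loggers = ["akka.testkit.TestEventListener"]"""
  ).withFallback(ConfigFactory.load())

  def newTestSystem(name: String): ActorSystem = ActorSystem(name, config)
}
```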
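Is the difference between Resume and Restart whether the actor's internal state survives? Judging from the spec (42 survives the ArithmeticException/Resume, but the state is 0 after the NullPointerException/Restart), yes: Resume keeps the same actor instance and its state, while Restart replaces the instance with a fresh one.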
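To see the difference directly, a sketch that counts constructor calls alongside the state: Resume keeps the same `Counter` instance, Restart builds a new one. `Instances`, `Counter`, and `ResumeOrRestart` are hypothetical, and the global counter exists only for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._

object Instances {
  // Number of Counter instances constructed so far (illustration only).
  val created = new AtomicInteger(0)
}

class Counter extends Actor {
  Instances.created.incrementAndGet()
  private var state = 0
  def receive = {
    case ex: Exception => throw ex
    case n: Int        => state = n
    case "get"         => sender() ! state
  }
}

class ResumeOrRestart extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: ArithmeticException  => Resume  // same instance, state kept
    case _: NullPointerException => Restart // new instance, state reset
  }
  private val child: ActorRef = context.actorOf(Props[Counter], "counter")
  def receive = { case msg => child forward msg }
}
```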
Aside
Please stop pasting code into the official docs that looks like it should obviously run but doesn't. A nightmare.
References
http://doc.akka.io/docs/akka/2.3.9/scala/fault-tolerance.html
http://doc.akka.io/docs/akka/2.3.9/scala/fault-tolerance-sample.html
http://doc.akka.io/docs/akka/snapshot/scala/testkit-example.html
https://github.com/akkadotnet/akka.net/issues/323
http://techblog.net-a-porter.com/2014/01/safer-testing-with-akka-eventfilter/