KZKY memo

自分用メモ.

怒涛のAkka: Fault Tolerance

Fault Handling in Practice

ここに書いてあるダイアグラムとコードを読むべき.Fault-Toleranceなsystemの設計指針がわかる.

Creating a Supervisor Strategy

こんな感じでStrategryを定義できる

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
 
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException => Resume
    case _: NullPointerException => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception => Escalate
}

Default Supervisor Strategy

いつもうまいことやってくれるわけではない.
デフォルトがある.

  • ActorInitializationException: 子アクターを止める
  • ActorKilledException: 子アクターを止める
  • Exception: 子アクターをリスタート
  • Other types of Throwable: 親アクターにエスカレート

こんな感じでデフォルトストラテジに新しい定義を係合可能

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
 
override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException => Resume
    case t =>
    super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
}

root guardianまで例外がエスカレートされると,Default Supervisor Strategyで対応.

Stopping Supervisor Strategy

StoppingSupervisorStrategyというものがあって,エラーで子アクター止めるて,DeathWatchで親は矯正的な動作をとるらしい.よくわからん.

Logging of Actor Failures

  • エラーがエスカレーションされない限りはFailureログが残る.
  • SupervisorStrategyはabstractになっている.
  • SupervisorStrategy.loggingEnabledをfalseでロギングしない設定が可能.
  • SupervisorStrategy#logFailureのoverrideでFailure時のログのカスタマイズ可能.

Supervision of Top-Level Actors

Top-Level Actorsはsystem.actorOf()で作られたActorのことでuser guardian (ルートActorの下にぶら下がっているアクターの1つ)にぶら下がる.

Test Application

akka-testkit_2.11, scalatest_2.11をdependencyに入れておくこと.
docのサンプルは動かない(AkkaSpecがないため)が,下記サンプルは動く.

TestクラスでTestKit(ActorSystem("TestKitUsageSpec"))とほかのtraitをいろいろwithすること.
tell msg_0; sender() ! msg_1; expectMsg(msg_2) のtripletでテストが基本のよう.
junitで実行するので,@RunWith(classOf[JUnitRunner])をつけておく.

  • Actors
package edu.kzk.actor.strategy

import akka.actor.Props
import akka.actor.Actor

class Supervisor extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException => Resume
      case _: NullPointerException => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception => Escalate
    }

  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
}

class Supervisor2 extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException => Resume
      case _: NullPointerException => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception => Escalate
    }

  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
  // override default to kill all children during restart
  override def preRestart(cause: Throwable, msg: Option[Any]) {}
}

class Child extends Actor {
  var state = 0
  def receive = {
    case ex: Exception => throw ex
    case x: Int => state = x
    case "get" => sender() ! state
  }
}
  • FaultHandlingDocSpec
package edu.kzk.actor.strategy

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Terminated
import akka.actor.actorRef2Scala
import akka.testkit.DefaultTimeout
import akka.testkit.EventFilter
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory

@RunWith(classOf[JUnitRunner])
class FaultHandlingDocSpec extends TestKit(ActorSystem("TestKitUsageSpec",
  ConfigFactory.parseString("""
          akka.loggers = ["akka.testkit.TestEventListener"]
          """))) with DefaultTimeout with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {
  //import FaultHandlingDocSpec._

  "A supervisor" must {

    "apply the chosen strategy for its child" in {

      val supervisor = system.actorOf(Props[Supervisor], "supervisor")

      supervisor ! Props[Child]
      val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor

      EventFilter.warning(occurrences = 1) intercept {
        println("EventFilter.warning(occurrences = 1)")
        child ! 42 // set state to 42
        child ! "get"
        println(expectMsg(42))

        child ! new ArithmeticException // crash it
        child ! "get"
        println(expectMsg(42))
      }

      EventFilter[NullPointerException](occurrences = 1) intercept {
        println(EventFilter[NullPointerException](occurrences = 1))
        child ! new NullPointerException // crash it harder
        child ! "get"
        println(expectMsg(0))
      }

      EventFilter[IllegalArgumentException](occurrences = 1) intercept {
        println(EventFilter[IllegalArgumentException](occurrences = 1))
        watch(child) // have testActor watch “child”
        child ! new IllegalArgumentException // break it
        expectMsgPF() { case Terminated(`child`) => () }
      }

      EventFilter[Exception]("CRASH", occurrences = 2) intercept {
        println(EventFilter[Exception]("CRASH", occurrences = 2))
        supervisor ! Props[Child] // create new child
        val child2 = expectMsgType[ActorRef]

        watch(child2)
        child2 ! "get" // verify it is alive
        println(expectMsg(0))

        child2 ! new Exception("CRASH") // escalate failure
        expectMsgPF() {
          case t @ Terminated(`child2`) if t.existenceConfirmed => ()
        }

        // As default policy, top-level actor restarts childs, which is not desired in some cases.
        // so as to override preStart in Supervisor2
        val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")

        supervisor2 ! Props[Child]
        val child3 = expectMsgType[ActorRef]

        child3 ! 23
        child3 ! "get"
        println(expectMsg(23))

        child3 ! new Exception("CRASH")
        child3 ! "get"
        println(expectMsg(0))
      }

    }
  }
}

EventFilterを使うときは
akka.loggers = ["akka.testkit.TestEventListener"]
をハードコードもしくはConfigに書くことを忘れないこと.
でないと,occurence>=1のときにinterceptでblockしてそこから,テストが進まない.

Resume/Restartの違いは,Actorの内部状態が残る/残らないなのか?

余談

公式Docで,明らかに動きそうなコードなのに,動かないコードをはりつけるのはやめてほしい.ナイトメア.