怒涛のAkka: FSM
基本
Finate State Machineとは
- 状態
- イベント
- アクション
のtripletで構成される.
ある状態の時にあるイベントが起こるとあるアクションをとって別状態 に移動するというマシン.ただし,とり得る状態は有限.
APIと設計
Event/State/Dataを定義する.
Event
- case class Xxx(x, y): 引数ありの場合
- case object: 引数なしの場合
State
- sealed trait State
- case object Xxx extnds State
- case object Yyy extnds State
Data
- sealed trait Data
- case object Xxx extnds Data
- case class Yyy(x, y) extnds Data
これはStateに付随するData.
Eventは
case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded
のように定義されているので,EventクラスでevnetとDataが紐付けられているよう.
FSM
FSM[State, Data] というActorを継承したAPIが用意されている.
なので,FSMを継承したクラスを作り,次のメソッドを追加する.
startWith(State, Data)
初期状態とその時のデータを入れる.
when(State, FiniteDuration)(StateFunction)
StateFunctionがPartialFunctionになっているので,Stateの時に,他のStateが来た時の挙動をPartialFunctionで書く.
基本は,goto(State) or stayを書いてusing Dataと使いたいDataを書く.
e.g.,
goto(Xxx) using Yyy(x, y)
whenUnhandled(StateFunction)
whenで挙動な記述されてない時にこれが呼ばれる.
使い方は基本whenと同じ.
onTransition(transitionHandler)
ここにActionを書く.
transitionHandlerはPartialFunctionになっていて,ある状態->ある状態の変数と,actionを書く.
initialize()
initial stateへのtransition.
注意するのは,FSM ActorはSingle Threadで動くので,Eventのオーダーを気にする必要はないということ.
Sample
Gradle
... dependencies { compile group: 'commons-collections', name: 'commons-collections', version: '3.2' compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9' compile group: 'com.typesafe.akka', name: 'akka-testkit_2.11', version: '2.3.9' compile group: 'org.scalatest', name: 'scalatest_2.11', version: '2.2.4' testCompile group: 'junit', name: 'junit', version: '4.+' } ...
State.scala
package edu.kzk.actor.fsm import akka.actor.ActorRef import scala.collection.immutable.Seq // states sealed trait State case object Idle extends State case object Active extends State sealed trait Data case object Uninitialized extends Data case class Todo(target: ActorRef, queue: Seq[Any]) extends Data
Event.scala
package edu.kzk.actor.fsm import akka.actor.ActorRef import scala.collection.immutable.Seq // received events case class SetTarget(ref: ActorRef) case class Queue(obj: Any) case object Flush // sent events case class Batch(obj: Seq[Any])
Buncher.scala
package edu.kzk.actor.fsm import akka.actor.FSM import akka.actor.FSM.Event import scala.concurrent.duration._ class Buncher extends FSM[State, Data] { startWith(Idle, Uninitialized) when(Idle) { case Event(SetTarget(ref), Uninitialized) => stay using Todo(ref, Vector.empty) } // transition elided ... when(Active, stateTimeout = 1 second) { case Event(Flush | StateTimeout, t: Todo) => goto(Idle) using t.copy(queue = Vector.empty) } // unhandled elided ... whenUnhandled { // common code for both states case Event(Queue(obj), t @ Todo(_, v)) => goto(Active) using t.copy(queue = v :+ obj) case Event(e, s) => log.warning("received unhandled request {} in state {}/{}", e, stateName, s) stay } // transition onTransition { case Active -> Idle => stateData match { //stateData; nextStateData; Both old state data and new state data can be accessed to // Batch(queue) is sent to caller as an action case Todo(ref, queue) => ref ! Batch(queue); } } initialize() }
FSMDocSpec
package edu.kzk.actor.fsm import akka.actor.Props import scala.collection.immutable import akka.testkit.TestKit import akka.testkit.ImplicitSender import org.scalatest.BeforeAndAfterAll import akka.testkit.DefaultTimeout import org.junit.runner.RunWith import org.scalatest.Matchers import akka.actor.FSM import org.scalatest.WordSpecLike import akka.actor.ActorSystem import org.scalatest.junit.JUnitRunner import scala.collection.immutable.Seq import edu.kzk.actor.fsm.Buncher @RunWith(classOf[JUnitRunner]) class FSMDocSpec extends TestKit(ActorSystem("TestKitUsageSpec")) with DefaultTimeout with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { // fsm code elided ... "simple finite state machine" must { "demonstrate NullFunction" in { class A extends FSM[Int, Null] { val SomeState = 0 when(SomeState)(FSM.NullFunction) } } "batch correctly" in { val buncher = system.actorOf(Props[Buncher]) buncher ! SetTarget(testActor) buncher ! Queue(42) buncher ! Queue(43) println(expectMsg(Batch(Seq(42, 43)))) buncher ! Queue(44) buncher ! Flush buncher ! Queue(45) println(expectMsg(Batch(Seq(44)))) println(expectMsg(Batch(Seq(45)))) } "not batch if uninitialized" in { val buncher = system.actorOf(Props[Buncher]) buncher ! Queue(42) println(expectNoMsg) } } }
Akka Docにある例って,そのままだと動かない場合が多いので注意.
これは,環境を合わせれば動く.
古いversionの書き方のままなのか?