読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

怒涛のAkka: FSM

基本

Finate State Machineとは

  • 状態
  • イベント
  • アクション

のtripletで構成される.

ある状態の時にあるイベントが起こるとあるアクションをとって別状態 に移動するというマシン.ただし,とり得る状態は有限.

APIと設計

Event/State/Dataを定義する.

Event

  • case class Xxx(x, y): 引数ありの場合
  • case object: 引数なしの場合

State

  • sealed trait State
  • case object Xxx extnds State
  • case object Yyy extnds State

Data

  • sealed trait Data
  • case object Xxx extnds Data
  • case class Yyy(x, y) extnds Data

これはStateに付随するData.
Eventは

case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded

のように定義されているので,EventクラスでevnetとDataが紐付けられているよう.

FSM

FSM[State, Data] というActorを継承したAPIが用意されている.
なので,FSMを継承したクラスを作り,次のメソッドを追加する.

startWith(State, Data)

初期状態とその時のデータを入れる.

when(State, FiniteDuration)(StateFunction)

StateFunctionがPartialFunctionになっているので,Stateの時に,他のStateが来た時の挙動をPartialFunctionで書く.

基本は,goto(State) or stayを書いてusing Dataと使いたいDataを書く.
e.g.,

goto(Xxx) using Yyy(x, y)
whenUnhandled(StateFunction)

whenで挙動な記述されてない時にこれが呼ばれる.
使い方は基本whenと同じ.

onTransition(transitionHandler)

ここにActionを書く.
transitionHandlerはPartialFunctionになっていて,ある状態->ある状態の変数と,actionを書く.

initialize()

initial stateへのtransition.

注意するのは,FSM ActorはSingle Threadで動くので,Eventのオーダーを気にする必要はないということ.

Sample

環境

Gradle

...
dependencies {
    compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
	compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9'
	compile group: 'com.typesafe.akka', name: 'akka-testkit_2.11', version: '2.3.9'
	compile group: 'org.scalatest', name: 'scalatest_2.11', version: '2.2.4'

    testCompile group: 'junit', name: 'junit', version: '4.+'
}
...

State.scala

package edu.kzk.actor.fsm

import akka.actor.ActorRef
import scala.collection.immutable.Seq

// states
sealed trait State
case object Idle extends State
case object Active extends State

sealed trait Data
case object Uninitialized extends Data
case class Todo(target: ActorRef, queue: Seq[Any]) extends Data

Event.scala

package edu.kzk.actor.fsm

import akka.actor.ActorRef
import scala.collection.immutable.Seq

// received events
case class SetTarget(ref: ActorRef)
case class Queue(obj: Any)
case object Flush

// sent events
case class Batch(obj: Seq[Any])

Buncher.scala

package edu.kzk.actor.fsm

import akka.actor.FSM
import akka.actor.FSM.Event
import scala.concurrent.duration._

class Buncher extends FSM[State, Data] {

  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }

  // transition elided ...
  when(Active, stateTimeout = 1 second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }

  // unhandled elided ...
  whenUnhandled {
    // common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) =>
      goto(Active) using t.copy(queue = v :+ obj)

    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      stay
  }

  // transition
  onTransition {
    case Active -> Idle =>
      stateData match {
        //stateData; nextStateData; Both old state data and new state data can be accessed to
        // Batch(queue) is sent to caller as an action
        case Todo(ref, queue) => ref ! Batch(queue);
      }
  }

  initialize()
}

FSMDocSpec

package edu.kzk.actor.fsm

import akka.actor.Props
import scala.collection.immutable
import akka.testkit.TestKit
import akka.testkit.ImplicitSender
import org.scalatest.BeforeAndAfterAll
import akka.testkit.DefaultTimeout
import org.junit.runner.RunWith
import org.scalatest.Matchers
import akka.actor.FSM
import org.scalatest.WordSpecLike
import akka.actor.ActorSystem
import org.scalatest.junit.JUnitRunner
import scala.collection.immutable.Seq

import edu.kzk.actor.fsm.Buncher

@RunWith(classOf[JUnitRunner])
class FSMDocSpec extends TestKit(ActorSystem("TestKitUsageSpec")) with DefaultTimeout with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {

  // fsm code elided ...

  "simple finite state machine" must {

    "demonstrate NullFunction" in {
      class A extends FSM[Int, Null] {
        val SomeState = 0
        when(SomeState)(FSM.NullFunction)
      }
    }

    "batch correctly" in {
      val buncher = system.actorOf(Props[Buncher])
      buncher ! SetTarget(testActor)
      buncher ! Queue(42)
      buncher ! Queue(43)
      println(expectMsg(Batch(Seq(42, 43))))
      buncher ! Queue(44)
      buncher ! Flush
      buncher ! Queue(45)
      println(expectMsg(Batch(Seq(44))))
      println(expectMsg(Batch(Seq(45))))
    }

    "not batch if uninitialized" in {
      val buncher = system.actorOf(Props[Buncher])
      buncher ! Queue(42)
      println(expectNoMsg)
    }
  }
}

Akka Docにある例って,そのままだと動かない場合が多いので注意.
これは,環境を合わせれば動く.
古いversionの書き方のままなのか?