読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

怒涛のAkka: Cluster Usage

この記事はAkka2.3.9なので注意.
Documentの通りにやる.

A Simple Cluster Example

Cluster extension を有効にするには次のapplication.confをclass path rootにおく.
src/main/resources/においておけばいい

applicatoin.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
 
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
 
    auto-down-unreachable-after = 10s
  }
}
  • seed-nodeはactorがclusterに入るときのcontactポイント
  • 別のマシンでノードを起動させたいなら,localhostを別のhostname or ipaddrにする
SimpleClusterListener.scala
package sample.cluster.simple

import scala.annotation.varargs

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.InitialStateAsEvents
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.UnreachableMember

class SimpleClusterListener extends Actor with ActorLogging {

	val cluster = Cluster(context.system);

	// subscribe to cluster changes, re-subscribe when restart 
	override def preStart(): Unit = {
			//#subscribe
			cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
					classOf[MemberEvent], classOf[UnreachableMember])

					//#subscribe
	};

	override def postStop(): Unit = cluster.unsubscribe(self);

	def receive = {

	case MemberUp(member) =>
	log.info("Member is Up: {}", member.address)

	case UnreachableMember(member) =>
	log.info("Member detected as unreachable: {}", member)

	case MemberRemoved(member, previousStatus) =>
	log.info("Member is Removed: {} after {}",
			member.address, previousStatus);

	case _: MemberEvent => // ignore
	}
}

Run A Simple Cluster Example

上記のサンプルをどうやって動かすかというと,

  • activatorをDLして,
  • [www.typesafe.com/activator/template/akka-sample-cluster-scala?_ga=1.263330700.1035464810.1442845000:title=tutorial]に従え

らしい.akkaのサンプルは結構不親切だと思う.以下全体的にそう.
Activatorに付随しているTutorialを見ろということなのか.

確かに,上記だけだと,Appクラスがないが,
ここのPreview the tutorialをみると

Open SimpleClusterApp.scala.

というのがあり,その中身が,

SimpleClusterApp.scala.
package sample.cluster.simple

import com.typesafe.config.ConfigFactory

import akka.actor.ActorSystem
import akka.actor.Props

object SimpleClusterApp {
	def main(args: Array[String]): Unit = {
			if (args.isEmpty)
				startup(Seq("2551", "2552", "0"));
			else
				startup(args)
	}

	def startup(ports: Seq[String]): Unit = {
			ports foreach { port =>
        
			// Override the configuration of the port
			val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
			withFallback(ConfigFactory.load());

			// Create an Akka system
			val system = ActorSystem("ClusterSystem", config);

			// Create an actor that handles cluster domain events
			system.actorOf(Props[SimpleClusterListener], name = "clusterListener");
			}
	}

}

となっている.

SimpleClusterApp.scala, SimpleClusterListener.scala, and applicatoin.confを使って,EclipseからScala Applicationを実行すると,

stdout
[INFO] [09/23/2015 22:40:43.532] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Starting up...
[INFO] [09/23/2015 22:40:43.545] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Started up successfully
[INFO] [09/23/2015 22:40:43.553] [ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [09/23/2015 22:40:43.564] [ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Metrics collection has started successfully
[INFO] [09/23/2015 22:40:43.714] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#708058940] to Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#708058940] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/23/2015 22:40:43.719] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [09/23/2015 22:40:44.423] [ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [09/23/2015 22:40:44.427] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] [09/23/2015 22:40:48.794] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:47614] is JOINING, roles []
[INFO] [09/23/2015 22:40:48.795] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[INFO] [09/23/2015 22:40:48.891] [ClusterSystem-akka.actor.default-dispatcher-19] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [09/23/2015 22:40:48.893] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:47614/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] [09/23/2015 22:40:48.891] [ClusterSystem-akka.actor.default-dispatcher-22] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [09/23/2015 22:40:48.894] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] [09/23/2015 22:40:49.415] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
[INFO] [09/23/2015 22:40:49.415] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:47614] to [Up]
...

な感じのログがconsoleに出力されて,状態遷移図のようにleader がnodeをupの状態にしていることがわかる.

Acvitorを入れると

<path to activator dir>/activator 
  "runMain sample.cluster.simple.SimpleClusterApp 2552"	

な感じで動かせるらしいが,これはmainを指定しているだけなので,jarにしてからmainとargsを指定でも同じことができる.projectからdistjarを作ってやって

gradle.build
apply plugin: 'java'
apply plugin: 'scala'
apply plugin: 'eclipse'
apply plugin: 'application'

sourceCompatibility = 1.7
version = '1.0'
mainClassName = "edu.kzk.akka.cluster.simple.SimpleClusterApp"

jar {
    manifest {
        attributes 'Implementation-Title': 'Gradle Quickstart', 'Implementation-Version': version
    }
}

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
	compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9'
	compile group: 'com.typesafe.akka', name: 'akka-testkit_2.11', version: '2.3.9'
	compile group: 'com.typesafe.akka', name: 'akka-cluster_2.11', version: '2.3.9'
	compile group: 'org.scalatest', name: 'scalatest_2.11', version: '2.2.4'

    testCompile group: 'junit', name: 'junit', version: '4.+'
}

test {
    systemProperties 'property': 'value'
}

uploadArchives {
    repositories {
       flatDir {
           dirs 'repos'
       }
    }
}

project 右クリ->Gradle Build... -> clean, build, distZipと設定してGradle Buildを実行.

  • build/distributions/akka_cluster_sample-1.0.zip

ができるので,

cd build/distributions
unzip akka_cluster_sample-1.0.zip
cd akka_cluster_sample-1.0.zip
bash bin/akka_cluster_sample

or

cd build/distributions
unzip akka_cluster_sample-1.0.zip
cd ../..
java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp

ここにはportを引数として指定するやり方も書いてあって,terminal を3つ開いてそれぞれで,

* first terminal as a seed node
$ java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp 2551

* second terminal as a seed node
$ java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp 2552

* third terminal as a node
$ java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp 0

順にやっていくと初めのターミナルでは

...
[INFO] [09/24/2015 00:05:04.844] [ClusterSystem-akka.actor.default-dispatcher-17] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [09/24/2015 00:05:05.839] [ClusterSystem-akka.actor.default-dispatcher-18] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [09/24/2015 00:05:05.854] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] [09/24/2015 00:05:22.947] [ClusterSystem-akka.actor.default-dispatcher-5] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[INFO] [09/24/2015 00:05:23.825] [ClusterSystem-akka.actor.default-dispatcher-5] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
[INFO] [09/24/2015 00:05:23.826] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2552
[INFO] [09/24/2015 00:06:26.079] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:39986] is JOINING, roles []
[INFO] [09/24/2015 00:06:26.825] [ClusterSystem-akka.actor.default-dispatcher-22] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:39986] to [Up]
[INFO] [09/24/2015 00:06:26.826] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:39986

こんな感じのログがでていることがわかる.初めのノードがLeader nodeになっていると思われる.他のterminalでは"Leader is moving node [...] to [Up]"のログがでないので.


あるterminalでCtrl+cでプロセスを落とすと,Learder Nodeでの次のようなログがでる.

[WARN] [09/24/2015 00:10:17.622] [ClusterSystem-akka.remote.default-remote-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
[WARN] [09/24/2015 00:10:21.825] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)]
[INFO] [09/24/2015 00:10:21.826] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member detected as unreachable: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)
[WARN] [09/24/2015 00:10:22.863] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]].
[WARN] [09/24/2015 00:10:28.859] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]].
[INFO] [09/24/2015 00:10:31.844] [ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [09/24/2015 00:10:31.844] [ClusterSystem-akka.actor.default-dispatcher-21] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552] as [Down]
[INFO] [09/24/2015 00:10:32.825] [ClusterSystem-akka.actor.default-dispatcher-22] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [09/24/2015 00:10:32.826] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Removed: akka.tcp://ClusterSystem@127.0.0.1:2552 after Down

状態遷移図の通りCtrl+cで落ちたノードの状態は,Unreachable -> Down -> Removedとなっている.

Joining to Seed Nodes

  • クラスターに参加するかは手動/自動どちらもコンフィグによるべき
  • 新しいノードは,すべてのシードノードにmessageを送る
  • 一番初めに答えたノードに,join commandを送る
  • だれも答えない場合は,成功するまでか自分が落ちる(shutdowned)まで,ずっと繰り返す
  • seed node のconfigはapplication.confでもJava system propertiesでも設定可能
  • applicaton.conf
akka {
   ...
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
 
    auto-down-unreachable-after = 10s
  }
  ...
}

URIは{akka.tcp|akka.udp}://{actor system name}@{ip}:{port}になる.

  • system property
-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
-Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
  • seed nodesはどの順でスタートしてもいい
  • すべてのseed nodesがスタートしなくてもいい
  • ただし,seed-nodes configuration listの初めのノードはスタートしないといない
  • そうでないと,他のseed nodesが初期化されず,他のノードはクラスターにジョインできない
  • この特別な初期ノードはからクラスターがスタートする際の,separated islands (特に説明がないが,leaderが2以上になることか?)を避けるため
  • 2つ以上のノードが起動してからは,特別な初期ノードを落としてもいい
  • seed nodesをスタートするのは非常に早い,失敗したらseed-node-timeoutだけ次のリクエストを待つ
  • seed nodesの設定を指定していないと programmatically or manuallyにnodeをクラスターに参加させる必要がある
  • Cluster.get(system).join
  • Cluster(system).joinSeedNodes (外部API/toolのときに良いらしい)
  • joinが失敗したら,retry-unsuccessful-join-afterだけ待ってリトライ
  • retryはoffにもできる
  • actor systemが一度clusterに入ったら,リスタートしないと再度,同じまたは別のクラスターには入れないので注意

Automatic vs. Manual Downing

Uncreachable -> Downの状態変化の話

  • デフォルトではUncreachable -> Downは自動ではならない
  • Cluster(system).down(address)を自分で呼ぶ
  • applicatoin.confで設定するとleaderが状態変化をおこなう
akka {
  ...
  cluster {
    akka.cluster.auto-down-unreachable-after = 120s
  }
  ...
}
  • auto-downはネットワーク分断が合った時に2つの切断されたクラスタができてしまうことを暗示しているので,気をつけること(それで良い場合も,良くない場合もあってそれは各自アプリケーション特有の問題)

Leaving

clusterからメンバを取り除く話.2通りのやり方がある.

  • JVMのプロセスを止めて,auto-down or manually downを行う
  • Cluster(system).leave(address)

Subscribe to Cluster Event

Cluster Eventを購読する話

クラスタメンバの変更通知を購読するには,Cluster(system).subscribeをつかう

cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
  • akka.cluster.ClusterEvent.CurrentClusterStateが一番初めのメッセージになる
  • CurrentClusterStateはクラスタ状態のsnapshot
case class CurrentClusterState(
    members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
    unreachable: Set[Member] = Set.empty,
    seenBy: Set[Address] = Set.empty,
    leader: Option[Address] = None,
    roleLeaderMap: Map[String, Option[Address]] = Map.empty) {
...
  • 参加が完了しない前に購読を始めると,空の,どのメンバも含んでいない,CurrentClusterStateが通知される
  • これは期待される振る舞い
  • nodeがclusterにjoinするのが許可されると,MemberUpメッセージを受け取る
  • CurrentClusterStateをハンドルしたくないなら,ClusterEvent.InitialStateAsEventsをCluster(system).subscribeのパラメータに入れる
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
  classOf[MemberEvent], classOf[UnreachableMember])
  • メンバライフサイクルをトラックするイベント("xxx"は状態)
    • ClusterEvent.MemberUp: 新しいメンバがクラスタにジョインして"Up"になった
    • ClusterEvent.MemberExited : メンバがクラスタから去った, そのメンバの状態は"Exiting". そのノードは,すでに落ちているかもしれない
    • ClusterEvent.MemberRemoved: メンバがクラスタから完全に除去された"Removed"
    • ClusterEvent.UnreachableMember: 少なくとの1つのFailure Detectorから,メンバが"Unreachable"とされた.
    • ClusterEvent.ReachableMember: メンバが"Unreachable"から"Reachable"になった
    • その他: akka.cluster.ClusterEvent.ClusterDomainEventに書いてある
  • 購読でなくて,クラスタのイベントを取得したいならCluster(system).stateを使うが,他のEventsとsyncしているわけではないので注意

Worker Dial-in Example

  • worker: backend, masterを見つけて,masterに登録する
  • master: frontend

想定するのは,一般的なロードバランシング.text transformation jobをmasterが受け取ってそのjobをworkerに委譲する.workerはjobを実行してoriginal clientに結果を返す.新しいworker, およびmasterは動的にクラスタに追加,削除される.

以下サンプル.Documentに書いてあるコードだけだと不十分なので,activatorを入れた方がいい.理解が進むので入れた方がいい.ただし必ずしも必要ではない.
使うのであれば,

  • ここからactivatorをDLしてきて,untar
  • ./activator ui で起動,
  • akka-sample-cluster-scala のtutrialを開いて
  • Code -> src -> mainと辿って行くとsampleがある

ここに書いてあるサンプルをDL/C&Pするでこと足りるよう.

コード

ざっくり何をやっているかというと

  • TransformationApp
    • seed nodesとしてFrontend, Backendを1つづつ作成
    • Backend*2つくって,Frontend*1を作成
  • TransformationFrontend
    • Jobが来て,backends: IndexedSeqが空なら失敗
    • Jobが来たら,jobcounter++して,backendに委譲
    • BackendRegistrationが来て,backends: IndexedSeqにsender(=backend)が入っていない場合は登録
    • Terminateメッセージが来た場合は,backendsからsender(=backend)を除く
    • main
      • port, config, actor systemして
      • system.actorOf()でfrontend actorを作る
      • system.schduler.scheduleで定期的にjobを発行
  • TransformationBackend
    • cluseter objectを持っているのはこの人
    • TransformationJobが来たら,textをupper caseにする(=job)
    • CurrentClusterStateが来たら,クラスタメンバを見て,UP状態だったら, BackendRegistration messagをFrontend Actorに投げる
    • MemberUpメッセージが来たら,register.このメッセージを購読していないと,backendが登録されない場合がある
    • registerはメンバのロールがfrontedだった場合にbackendを登録するメッセージをfrontendに投げる
    • main
      • port, config, actor systemして
      • system.actorOf()でbackend actorを作る
実行

A Simple Cluster Exampleと同じでAppクラスを実行することもできるし,別のJVMで起動することも可能.

注意
  • Death watchはcluster failure detectorを使う
  • 監視されているActorのgraceful terminationに加えて,Death watchがnetwork failures and JVM crashesを発見する (よくわからん)
  • UnreachableクラスタノードがDownしてremotedになったら,Death watchがwatchしているactorにTerminated messageを送る

Node Roles

  • すべてのノードがロールを持つ必要はない
  • ロールの例
    • web front-end
    • data access
    • 複雑な計算
  • cluster-aware routersはロールを考慮することが可能
  • akka.cluster.rolesがあるので,設定ファイルで設定するか,actor systemを作るときにconfig objectに渡す
  • Member.roles: Set[String])で入っているので,member objectからアクセスできる

How To Startup when Cluster Size Reached

クラスターがある特定条件を満たしたら,アクターをスタートさせたい場合がある
例えば,

  • cluster has been initialized
  • members have joined
  • the cluster has reached a certain size

になった場合など.

leaderがメンバーの状態を"Joining" -> "Up"にするまでに必要なメンバの数はconfigで指定可能

    akka.cluster.min-nr-of-members = 3
ロール別にも可能
akka.cluster.role {
  frontend.min-nr-of-members = 1 # role-name=frontend
  backend.min-nr-of-members = 2 # role-name=backend
  #<role-name>.min-nr-of-members = 1
}

典型例ではCluster.registerOnMemberUpのコールバックと一緒に使ってこのコールバックの中でアクターを作る

Cluster.registerOnMemberUpコールバックの例
 Cluster(system) registerOnMemberUp {
   system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
     name = "factorialFrontend")
 }

Contribution

この変からContribution Modulesが入るが,

If a contributions turns out to not “take off” it may be removed again at a later time.

らしいので,実際に使用する場合は気をつける

Cluster Singleton

Singleton Actorを作りたい場合がある.
member eventsを購読して,特別なケースを考慮すればいいのだが,
Cluster Singletonを使った方がいい.

Cluster Sharding

Shardingの話.
Cluster Shardingによると,

Cluster sharding is typically used when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors it might be easier to run them on a Cluster Singleton node.

らしい.

Distributed Publish Subscribe

一般的なpub/subを実現するDistributed Publish Subscribe

メッセージを送る側(publisher)は,アクターが動いている(最終的な)目的地を意識しなくていい.

Cluster Client

Cluster Clientモジュールがある.

自分でclientを作るには,TransformationBackend#registerを見る感じだと,エントリポイントがわかっていれば,

context.actorSelection(RootActorPath(member.address) / "user" / "frontend")

で取得したActorSelectionにメセージを送ればいいと思われる.

Cluster Clientモジュールを見ている感じだと,同じようなことをやっているはず.

StatsSampleClient.scalaをみるとなんとなくクライアントの書き方がわかる.

Failure Detector

説明

  • clusterではノードは他のノード(default maxで5)にmoniteringされている
  • あるノードがunreachableとなったらgossip protocolで広がっていく
  • 1つのノードがunreachable nodeをunreachableとするのみで十分ということ
  • system messagesがノードに届かない場合に,ノードが隔離される
  • そのsystem messagesは,watch, Terminated, remote actor deployment, failures of actors supervised by remote parentなど
  • その後,down or removedになる
  • nodeはheartbeatを送り合って,互いにモニタリングしている
  • heartbeat到達時間はPhi Accrual Failure Detectorによって解釈される
  • suspicion level of failureはphi値によって与えられる
phi = -log10(1 - F(timeSinceLastHeartbeat))
  • Fは累積正規分布で平均と分散は過去のheartbeat間隔から計算される
  • phi値 がthresholdを超えた時にfailureとなるのをakka.cluster.failure-detector.thresholdで指定可能
  • GCとかnetwork failureとかのためにマージンを設けることも可能,akka.cluster.failure-detector.acceptable-heartbeat-pauseで設定可能
  • システムオーバーロードでないのに,false positivesになったら, クラスターのための別のdispatcherを定義すべき

Thresholdの参考値

  • defaultは8
  • 基本はこれでOK
  • low thresholdは,false-positiveになるけど,detectionは早くて本当にcrashしている時に早くわかる
  • low thresholdは,false-positiveを小さくするけれど,本当のcrashを発見するのに時間がかかる
  • Cloud環境 (EC2)などはthreshold=12くらいにしたほうがいい.networkの問題がたまに起こるので.

akka.cluster.failure-detector.acceptable-heartbeat-pause

Cluster Aware Routers

Routerはクラスターノードawareで作られている(defaultはoffのよう).ノードがunchreable or leave cluster担ったら,そのノードのRouteeもそうなるし,unreachable -> reachableでも同じ.また,新しいノードがクラスターに参加したら,そのノードのrouteeもルーターに追加される.

ルータには次の2種類がある

  • Group
    • メッセージはactor selctionを使って,特定のパスに送る
    • routeeは別ノードのrouterでシェアできる
    • backend nodeで動いているサービス向きでfrontendにいるroutersに使われる
  • Pool
    • routeeを子アクターとして作り,リモートノードに配置する
    • routeeインスタンスを持っているということ
    • 3 nodes in 10 nodes clusterでrouterを作成すると(configによるが)30 routee作られる. 書くルーターが1routee/1node作るということ
    • 異なるrouterで作成されたrouteeはrouter間でシェアされない
    • single master向き.masterはjobを別ノード上のrouteeに委譲する

Router with Group of Routees

  • routeeアクターはクラスタメンバとしてスタートさせないとならない
  • routerで行うのではない
  • routee actorsはactor systemを起動したらすぐにスタートすること
  • routeeのlookupをuse-roleでfilterすることも可能
  • routeeの設定は,configでもコードでもできる (以下,全部そうだが,コードのサンプルは割愛する.ドキュメントには載っている)

Router Example with Group of Routees

サンプルは,text中に出てくるwordsの平均文字数を数えるタスク

コード (一度読んだほうが理解が深まる)
  • Actors(ここにある)
    • StatsService: routerを持っていて,jobが来たら,aggregatorを作って,jobと一緒に,routerに渡す
    • StatsAggregator: word lenghtのカウント, word数 in textの数のタスクが集まるまではresultを返さない
    • StatsWorker: routeeになる,キャッシュから引いてきているけれど,このタスクなら,word.lengthしたほうが早いかも
    • StatsSampleClient: クライアント,2秒に1回 tick taskを送っている.tickタスクでStatsJobがキックされる
  • config (ここにある)
    • application.conf
    • stat1.conf
  • main
    • StatsSample: StatsWorker, StatsServiceを含むactor systemを作って起動し,StatsSampleClientを含むactor systemを作る(StatsSampleClientはobjectもあるので注意)

ConsistentHashableEnvelope(word, word)をメッセージにworkerに送っているが,ソースをみると,ConsistentHashableEnvelope(message: Any, hashKey: Any)になっているので,2nd paramでどのルータに振るか決めて,1ast paramをメッセージにするんだと思う

ConfigFactory.load("stats1")でstats1.confを読んで,stats1.confの中でinclude "aplication"している.すなわち,confはapplication.conf, reference.confでなくてもいい.ConfigFactory.loadで指定すればOK.

Router with Pool of Remote Deployed Routees

どうもContribution Moduleの Cluster Singletonを使っているよう.single master nodeを作るのに必要のよう.pool routerだからというわけではなく,single master node上でやって見ようという感じ.

基本的にRouter Example with Group of Routeesと同じだけれども

コード

が異なる.dependencyは,build.gradleを参考に.

StatsSampleOneMaster
  def startup(ports: Seq[String]): Unit = {
    ports foreach { port =>
     ...
     //#create-singleton-manager
      system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props[StatsService], singletonName = "statsService",
        terminationMessage = PoisonPill, role = Some("compute")),
        name = "singleton")
      //#create-singleton-manager

      //#singleton-proxy
      system.actorOf(ClusterSingletonProxy.props(singletonPath = "/user/singleton/statsService",
        role = Some("compute")), name = "statsServiceProxy")
      //#singleton-proxy
    ...

ClusterSingletonManagerはsingleton actorを作って,その子として,statsServiceを作っている感じ.
ClusterSingletonProxyは,statsServiceProxyを作って,/user/singleton/statsServiceへのproxyとしているよう.
なので,StatsSampleOneMasterClientはStatsSampleClientを作って,"/user/statsServiceProxy"を引数に渡している.

Cluster Metrics

metricsもわかる.基本的には,load-balancing routerに使用される.

Hyperic Sigar

built-in metricsはJMX MBeansで集計される様だけど,Hyperic Sigar (VMwareのプロダクトの様)をつかってもいいらしい,SigarはOSのライブラリを使っている.

Adaptive Load Balancing

AdaptiveLoadBalancingPool / AdaptiveLoadBalancingGroupが,cluster metrics dataに基づいて,メッセージのload balancingをする

ロードバランシングの基準があって,

  • heap: JVMのheap使用量に基づく
  • load: topで見えるload averageに基づく
  • cpu: User + Sys + Nice + Waitの使用時間に基づく
  • mix: 上記すべてに基づく

集計された値は,exponential weighted averagされる.

サンプルは割愛

Subscribe to Metrics Events/Custom Metrics Collector

MetricsListenerのサンプルが載っているが,割愛.

自作も可能ということ.

  • MetricsListenerのサンプル
  • akka.cluster.SigarMetricsCollector
  • akka.cluster.JmxMetricsCollector

のインプリみてインパイアーされてくれと書いてある.

How to Test

Multi Node TestingMulti JVM Testingを使う.

Multi JVM Testingはsbtのpluginなのでsbtを使っていない人にはつかない感じ.しかし,まともに使おうとしたら,テストはできないと困る.なので,Multi Node Testingだけで頑張ってみる.

TransformationSampleSpecのみ行う

Original TransformationSampleSpec.scalaとの差分

MultiJvmNodeXがいらない

//class TransformationSampleSpecMultiJvmNode1 extends TransformationSampleSpec
//class TransformationSampleSpecMultiJvmNode2 extends TransformationSampleSpec
//class TransformationSampleSpecMultiJvmNode3 extends TransformationSampleSpec
//class TransformationSampleSpecMultiJvmNode4 extends TransformationSampleSpec
//class TransformationSampleSpecMultiJvmNode5 extends TransformationSampleSpec

JUnitで実行

@RunWith(classOf[JUnitRunner])
class TransformationSampleSpec extends MultiNodeSpec(TransformationSampleSpecConfig) 
    with WordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender {

system propertyの設定が必要なよう

  • multinode.max-nodes': '10'
  • multinode.host': 'localhost'
  • multinode.port': '0'
  • multinode.server-host': 'localhost'
  • multinode.index': '0'

タイムアウトなどが出たらmulti-node configを設定する.

それでもエラー

Caused by: java.lang.RuntimeException: unexpected transition: Transition(Actor[akka://TransformationSampleSpec/user/TestConductorClient#-484425751],Connecting,Failed)

ここによると,sbtMultiJvm pluginによって,multinode.indexは,自動的に付加されるらしく,sbtMultiJvm pluginを使わずにbuildしたいときには,自分でスクリプトを書かないと行けないらしい.なので,sbtテストしたほうがいい.

activator経由で生成したサンプルプロジェクトで

sbt multiple-jvm:test

で実行され,stdoutに

...
[JVM-5] All tests passed.
[JVM-4] Run completed in 33 seconds, 279 milliseconds.
[JVM-4] Total number of tests run: 3
[JVM-4] Suites: completed 1, aborted 0
[JVM-4] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[JVM-4] All tests passed.
[info] Passed: Total 5, Failed 0, Errors 0, Passed 5
[success] Total time: 37 s, completed 2015/10/04 2:05:12

な感じのログが出る.
akka clusterをやるなら,sbtを使ったほうがいいということ.

JMX

JConsole or JVisualVMなどを使ってクラスターの情報がわかるらしい.
root名はakka.Cluster. (未検証)

Command Line Management

Akka distributionのbin/akka-clusterがCommand Line Managementツールらしい.

ツールを使う前に,ActorSystemを起動する際に,jmxremote.portを指定しないといけない

 java -Dcom.sun.management.jmxremote.port=9999 \
    -Dcom.sun.management.jmxremote.authenticate=false \
    -Dcom.sun.management.jmxremote.ssl=false

Configuration

いろいろconfigがあるので,これをみろという話.

Especially the heartbeating actors that is used for failure detection can generate false positives if they are not given a chance to run at regular intervals. For this purpose you can define a separate dispatcher to be used for the cluster actors:

ハートビートアクターはfailure detectionに使われるので,アクターシステムに負荷がかかって,実行されずにいると,failure detectionでfalse positiveが起こる可能性がある.そうしないために別のdispatcherを割り当てられる

  • conf 設定例
akka.cluster.use-dispatcher = cluster-dispatcher
 
cluster-dispatcher {
  type = "Dispatcher"
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 5
    parallelism-max = 5
  }
}

Dispatcherのthread数を5より小さくしないこと.Deadlockが起きる可能性がある.2.4からは大丈夫.

全体所感

  • クラスターはP2Pなのに,どうやって,IPが異なるノードがクラスタに参加できるのかと疑問に思っていたが,seed-nodesさえ指定すれば,それがクラスタへのエントリポイントになるので,seed-nodesが立ち上がっていたら,自動的にクラスタに参加できるはず.なので,seed-nodesは落ちないようにしないと行けないと思う.
  • プロダクションでは,基本的に1Actor Sytem/1JVM/1Node/1BMor1VMを動かすべき.
  • configを使ったrouterは,confggとコードでcouplingしやすい気がする.(すくなくともサンプルはcouplingしている)
  • akkaのドキュメントは,全体的に説明が不十分な気がする.
  • テストがわかりづらい.

これで,クラスタを使えるようになったつもりになれた.ちゃんと読めばそこまで難しくない.

Note

Akka 2.4.0がstableになっていた.

以下の項目が充実されている.
Networkingに関しては,今までcontributionであったのが,distributionに加わっている.大分充実されたように感じる.

Actor
  • Persistence
  • Persistence - Schema Evolution
  • Persistence Query
  • Persistence Query for LevelDB
Networking
  • Cluster Singleton
  • Distributed Publish Subscribe in Cluster
  • Cluster Client
  • Cluster Sharding
  • Cluster Metrics Extension
  • Distributed Data