怒涛のAkka: Cluster Usage
この記事はAkka2.3.9なので注意.
Documentの通りにやる.
A Simple Cluster Example
Cluster extension を有効にするには次のapplication.confをclass path rootにおく.
src/main/resources/においておけばいい
applicatoin.conf
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:2551", "akka.tcp://ClusterSystem@127.0.0.1:2552"] auto-down-unreachable-after = 10s } }
- seed-nodeはactorがclusterに入るときのcontactポイント
- 別のマシンでノードを起動させたいなら,localhostを別のhostname or ipaddrにする
SimpleClusterListener.scala
package sample.cluster.simple import scala.annotation.varargs import akka.actor.Actor import akka.actor.ActorLogging import akka.cluster.Cluster import akka.cluster.ClusterEvent.InitialStateAsEvents import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.UnreachableMember class SimpleClusterListener extends Actor with ActorLogging { val cluster = Cluster(context.system); // subscribe to cluster changes, re-subscribe when restart override def preStart(): Unit = { //#subscribe cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) //#subscribe }; override def postStop(): Unit = cluster.unsubscribe(self); def receive = { case MemberUp(member) => log.info("Member is Up: {}", member.address) case UnreachableMember(member) => log.info("Member detected as unreachable: {}", member) case MemberRemoved(member, previousStatus) => log.info("Member is Removed: {} after {}", member.address, previousStatus); case _: MemberEvent => // ignore } }
Run A Simple Cluster Example
上記のサンプルをどうやって動かすかというと,
- activatorをDLして,
- [www.typesafe.com/activator/template/akka-sample-cluster-scala?_ga=1.263330700.1035464810.1442845000:title=tutorial]に従え
らしい.akkaのサンプルは結構不親切だと思う.以下全体的にそう.
Activatorに付随しているTutorialを見ろということなのか.
確かに,上記だけだと,Appクラスがないが,
ここのPreview the tutorialをみると
Open SimpleClusterApp.scala.
というのがあり,その中身が,
SimpleClusterApp.scala.
package sample.cluster.simple import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem import akka.actor.Props object SimpleClusterApp { def main(args: Array[String]): Unit = { if (args.isEmpty) startup(Seq("2551", "2552", "0")); else startup(args) } def startup(ports: Seq[String]): Unit = { ports foreach { port => // Override the configuration of the port val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port). withFallback(ConfigFactory.load()); // Create an Akka system val system = ActorSystem("ClusterSystem", config); // Create an actor that handles cluster domain events system.actorOf(Props[SimpleClusterListener], name = "clusterListener"); } } }
となっている.
SimpleClusterApp.scala, SimpleClusterListener.scala, and applicatoin.confを使って,EclipseからScala Applicationを実行すると,
stdout
[INFO] [09/23/2015 22:40:43.532] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Starting up... [INFO] [09/23/2015 22:40:43.545] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Started up successfully [INFO] [09/23/2015 22:40:43.553] [ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar [INFO] [09/23/2015 22:40:43.564] [ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Metrics collection has started successfully [INFO] [09/23/2015 22:40:43.714] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#708058940] to Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#708058940] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [09/23/2015 22:40:43.719] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [] [INFO] [09/23/2015 22:40:44.423] [ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up] [INFO] [09/23/2015 22:40:44.427] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551 [INFO] [09/23/2015 22:40:48.794] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:47614] is JOINING, roles [] [INFO] [09/23/2015 22:40:48.795] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [] [INFO] [09/23/2015 22:40:48.891] [ClusterSystem-akka.actor.default-dispatcher-19] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:47614] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551] [INFO] [09/23/2015 22:40:48.893] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:47614/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551 [INFO] [09/23/2015 22:40:48.891] [ClusterSystem-akka.actor.default-dispatcher-22] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551] [INFO] [09/23/2015 22:40:48.894] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551 [INFO] [09/23/2015 22:40:49.415] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up] [INFO] [09/23/2015 22:40:49.415] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:47614] to [Up] ...
な感じのログがconsoleに出力されて,状態遷移図のようにleader がnodeをupの状態にしていることがわかる.
Acvitorを入れると
<path to activator dir>/activator "runMain sample.cluster.simple.SimpleClusterApp 2552"
な感じで動かせるらしいが,これはmainを指定しているだけなので,jarにしてからmainとargsを指定でも同じことができる.projectからdistjarを作ってやって
gradle.build
apply plugin: 'java' apply plugin: 'scala' apply plugin: 'eclipse' apply plugin: 'application' sourceCompatibility = 1.7 version = '1.0' mainClassName = "edu.kzk.akka.cluster.simple.SimpleClusterApp" jar { manifest { attributes 'Implementation-Title': 'Gradle Quickstart', 'Implementation-Version': version } } repositories { mavenCentral() } dependencies { compile group: 'commons-collections', name: 'commons-collections', version: '3.2' compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9' compile group: 'com.typesafe.akka', name: 'akka-testkit_2.11', version: '2.3.9' compile group: 'com.typesafe.akka', name: 'akka-cluster_2.11', version: '2.3.9' compile group: 'org.scalatest', name: 'scalatest_2.11', version: '2.2.4' testCompile group: 'junit', name: 'junit', version: '4.+' } test { systemProperties 'property': 'value' } uploadArchives { repositories { flatDir { dirs 'repos' } } }
project 右クリ->Gradle Build... -> clean, build, distZipと設定してGradle Buildを実行.
- build/distributions/akka_cluster_sample-1.0.zip
ができるので,
cd build/distributions unzip akka_cluster_sample-1.0.zip cd akka_cluster_sample-1.0.zip bash bin/akka_cluster_sample
or
cd build/distributions unzip akka_cluster_sample-1.0.zip cd ../.. java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp
ここにはportを引数として指定するやり方も書いてあって,terminal を3つ開いてそれぞれで,
* first terminal as a seed node $ java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp 2551 * second terminal as a seed node $ java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp 2552 * third terminal as a node $ java -classpath "./build/distributions/akka_cluster_sample-1.0/lib/*" edu.kzk.akka.cluster.simple.SimpleClusterApp 0
順にやっていくと初めのターミナルでは
... [INFO] [09/24/2015 00:05:04.844] [ClusterSystem-akka.actor.default-dispatcher-17] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [] [INFO] [09/24/2015 00:05:05.839] [ClusterSystem-akka.actor.default-dispatcher-18] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up] [INFO] [09/24/2015 00:05:05.854] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551 [INFO] [09/24/2015 00:05:22.947] [ClusterSystem-akka.actor.default-dispatcher-5] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [] [INFO] [09/24/2015 00:05:23.825] [ClusterSystem-akka.actor.default-dispatcher-5] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up] [INFO] [09/24/2015 00:05:23.826] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2552 [INFO] [09/24/2015 00:06:26.079] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:39986] is JOINING, roles [] [INFO] [09/24/2015 00:06:26.825] [ClusterSystem-akka.actor.default-dispatcher-22] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:39986] to [Up] [INFO] [09/24/2015 00:06:26.826] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:39986
こんな感じのログがでていることがわかる.初めのノードがLeader nodeになっていると思われる.他のterminalでは"Leader is moving node [...] to [Up]"のログがでないので.
あるterminalでCtrl+cでプロセスを落とすと,Learder Nodeでの次のようなログがでる.
[WARN] [09/24/2015 00:10:17.622] [ClusterSystem-akka.remote.default-remote-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. [WARN] [09/24/2015 00:10:21.825] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)] [INFO] [09/24/2015 00:10:21.826] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member detected as unreachable: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up) [WARN] [09/24/2015 00:10:22.863] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]]. [WARN] [09/24/2015 00:10:28.859] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]]. [INFO] [09/24/2015 00:10:31.844] [ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552] [INFO] [09/24/2015 00:10:31.844] [ClusterSystem-akka.actor.default-dispatcher-21] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552] as [Down] [INFO] [09/24/2015 00:10:32.825] [ClusterSystem-akka.actor.default-dispatcher-22] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552] [INFO] [09/24/2015 00:10:32.826] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Removed: akka.tcp://ClusterSystem@127.0.0.1:2552 after Down
状態遷移図の通りCtrl+cで落ちたノードの状態は,Unreachable -> Down -> Removedとなっている.
Joining to Seed Nodes
- クラスターに参加するかは手動/自動どちらもコンフィグによるべき
- 新しいノードは,すべてのシードノードにmessageを送る
- 一番初めに答えたノードに,join commandを送る
- だれも答えない場合は,成功するまでか自分が落ちる(shutdowned)まで,ずっと繰り返す
- seed node のconfigはapplication.confでもJava system propertiesでも設定可能
- applicaton.conf
akka { ... cluster { seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:2551", "akka.tcp://ClusterSystem@127.0.0.1:2552"] auto-down-unreachable-after = 10s } ... }
URIは{akka.tcp|akka.udp}://{actor system name}@{ip}:{port}になる.
- system property
-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552 -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
- seed nodesはどの順でスタートしてもいい
- すべてのseed nodesがスタートしなくてもいい
- ただし,seed-nodes configuration listの初めのノードはスタートしないといない
- そうでないと,他のseed nodesが初期化されず,他のノードはクラスターにジョインできない
- この特別な初期ノードはからクラスターがスタートする際の,separated islands (特に説明がないが,leaderが2以上になることか?)を避けるため
- 2つ以上のノードが起動してからは,特別な初期ノードを落としてもいい
- seed nodesをスタートするのは非常に早い,失敗したらseed-node-timeoutだけ次のリクエストを待つ
- seed nodesの設定を指定していないと programmatically or manuallyにnodeをクラスターに参加させる必要がある
- Cluster.get(system).join
- Cluster(system).joinSeedNodes (外部API/toolのときに良いらしい)
- joinが失敗したら,retry-unsuccessful-join-afterだけ待ってリトライ
- retryはoffにもできる
- actor systemが一度clusterに入ったら,リスタートしないと再度,同じまたは別のクラスターには入れないので注意
Automatic vs. Manual Downing
Uncreachable -> Downの状態変化の話
- デフォルトではUncreachable -> Downは自動ではならない
- Cluster(system).down(address)を自分で呼ぶ
- applicatoin.confで設定するとleaderが状態変化をおこなう
akka { ... cluster { akka.cluster.auto-down-unreachable-after = 120s } ... }
- auto-downはネットワーク分断が合った時に2つの切断されたクラスタができてしまうことを暗示しているので,気をつけること(それで良い場合も,良くない場合もあってそれは各自アプリケーション特有の問題)
Leaving
clusterからメンバを取り除く話.2通りのやり方がある.
- JVMのプロセスを止めて,auto-down or manually downを行う
- Cluster(system).leave(address)
Subscribe to Cluster Event
Cluster Eventを購読する話
クラスタメンバの変更通知を購読するには,Cluster(system).subscribeをつかう
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
- akka.cluster.ClusterEvent.CurrentClusterStateが一番初めのメッセージになる
- CurrentClusterStateはクラスタ状態のsnapshot
case class CurrentClusterState( members: immutable.SortedSet[Member] = immutable.SortedSet.empty, unreachable: Set[Member] = Set.empty, seenBy: Set[Address] = Set.empty, leader: Option[Address] = None, roleLeaderMap: Map[String, Option[Address]] = Map.empty) { ...
- 参加が完了しない前に購読を始めると,空の,どのメンバも含んでいない,CurrentClusterStateが通知される
- これは期待される振る舞い
- nodeがclusterにjoinするのが許可されると,MemberUpメッセージを受け取る
- CurrentClusterStateをハンドルしたくないなら,ClusterEvent.InitialStateAsEventsをCluster(system).subscribeのパラメータに入れる
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
- メンバライフサイクルをトラックするイベント("xxx"は状態)
- ClusterEvent.MemberUp: 新しいメンバがクラスタにジョインして"Up"になった
- ClusterEvent.MemberExited : メンバがクラスタから去った, そのメンバの状態は"Exiting". そのノードは,すでに落ちているかもしれない
- ClusterEvent.MemberRemoved: メンバがクラスタから完全に除去された"Removed"
- ClusterEvent.UnreachableMember: 少なくとの1つのFailure Detectorから,メンバが"Unreachable"とされた.
- ClusterEvent.ReachableMember: メンバが"Unreachable"から"Reachable"になった
- その他: akka.cluster.ClusterEvent.ClusterDomainEventに書いてある
- 購読でなくて,クラスタのイベントを取得したいならCluster(system).stateを使うが,他のEventsとsyncしているわけではないので注意
Worker Dial-in Example
- worker: backend, masterを見つけて,masterに登録する
- master: frontend
想定するのは,一般的なロードバランシング.text transformation jobをmasterが受け取ってそのjobをworkerに委譲する.workerはjobを実行してoriginal clientに結果を返す.新しいworker, およびmasterは動的にクラスタに追加,削除される.
以下サンプル.Documentに書いてあるコードだけだと不十分なので,activatorを入れた方がいい.理解が進むので入れた方がいい.ただし必ずしも必要ではない.
使うのであれば,
- ここからactivatorをDLしてきて,untar
- ./activator ui で起動,
- akka-sample-cluster-scala のtutrialを開いて
- Code -> src -> mainと辿って行くとsampleがある
ここに書いてあるサンプルをDL/C&Pするでこと足りるよう.
コード
ざっくり何をやっているかというと
- TransformationApp
- seed nodesとしてFrontend, Backendを1つづつ作成
- Backend*2つくって,Frontend*1を作成
- TransformationFrontend
- Jobが来て,backends: IndexedSeqが空なら失敗
- Jobが来たら,jobcounter++して,backendに委譲
- BackendRegistrationが来て,backends: IndexedSeqにsender(=backend)が入っていない場合は登録
- Terminateメッセージが来た場合は,backendsからsender(=backend)を除く
- main
- port, config, actor systemして
- system.actorOf()でfrontend actorを作る
- system.schduler.scheduleで定期的にjobを発行
- TransformationBackend
- cluseter objectを持っているのはこの人
- TransformationJobが来たら,textをupper caseにする(=job)
- CurrentClusterStateが来たら,クラスタメンバを見て,UP状態だったら, BackendRegistration messagをFrontend Actorに投げる
- MemberUpメッセージが来たら,register.このメッセージを購読していないと,backendが登録されない場合がある
- registerはメンバのロールがfrontedだった場合にbackendを登録するメッセージをfrontendに投げる
- main
- port, config, actor systemして
- system.actorOf()でbackend actorを作る
実行
A Simple Cluster Exampleと同じでAppクラスを実行することもできるし,別のJVMで起動することも可能.
Node Roles
- すべてのノードがロールを持つ必要はない
- ロールの例
- web front-end
- data access
- 複雑な計算
- cluster-aware routersはロールを考慮することが可能
- akka.cluster.rolesがあるので,設定ファイルで設定するか,actor systemを作るときにconfig objectに渡す
- Member.roles: Set[String])で入っているので,member objectからアクセスできる
How To Startup when Cluster Size Reached
クラスターがある特定条件を満たしたら,アクターをスタートさせたい場合がある
例えば,
- cluster has been initialized
- members have joined
- the cluster has reached a certain size
になった場合など.
leaderがメンバーの状態を"Joining" -> "Up"にするまでに必要なメンバの数はconfigで指定可能
akka.cluster.min-nr-of-members = 3
ロール別にも可能
akka.cluster.role { frontend.min-nr-of-members = 1 # role-name=frontend backend.min-nr-of-members = 2 # role-name=backend #<role-name>.min-nr-of-members = 1 }
典型例ではCluster.registerOnMemberUpのコールバックと一緒に使ってこのコールバックの中でアクターを作る
Cluster.registerOnMemberUpコールバックの例
Cluster(system) registerOnMemberUp { system.actorOf(Props(classOf[FactorialFrontend], upToN, true), name = "factorialFrontend") }
Contribution
この変からContribution Modulesが入るが,
If a contributions turns out to not “take off” it may be removed again at a later time.
らしいので,実際に使用する場合は気をつける
Cluster Singleton
Singleton Actorを作りたい場合がある.
member eventsを購読して,特別なケースを考慮すればいいのだが,
Cluster Singletonを使った方がいい.
Cluster Sharding
Shardingの話.
Cluster Shardingによると,
Cluster sharding is typically used when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors it might be easier to run them on a Cluster Singleton node.
らしい.
Distributed Publish Subscribe
一般的なpub/subを実現するDistributed Publish Subscribe.
メッセージを送る側(publisher)は,アクターが動いている(最終的な)目的地を意識しなくていい.
Cluster Client
Cluster Clientモジュールがある.
自分でclientを作るには,TransformationBackend#registerを見る感じだと,エントリポイントがわかっていれば,
context.actorSelection(RootActorPath(member.address) / "user" / "frontend")
で取得したActorSelectionにメセージを送ればいいと思われる.
Cluster Clientモジュールを見ている感じだと,同じようなことをやっているはず.
StatsSampleClient.scalaをみるとなんとなくクライアントの書き方がわかる.
Failure Detector
説明
- clusterではノードは他のノード(default maxで5)にmoniteringされている
- あるノードがunreachableとなったらgossip protocolで広がっていく
- 1つのノードがunreachable nodeをunreachableとするのみで十分ということ
- system messagesがノードに届かない場合に,ノードが隔離される
- そのsystem messagesは,watch, Terminated, remote actor deployment, failures of actors supervised by remote parentなど
- その後,down or removedになる
- nodeはheartbeatを送り合って,互いにモニタリングしている
- heartbeat到達時間はPhi Accrual Failure Detectorによって解釈される
- suspicion level of failureはphi値によって与えられる
phi = -log10(1 - F(timeSinceLastHeartbeat))
Thresholdの参考値
- defaultは8
- 基本はこれでOK
- low thresholdは,false-positiveになるけど,detectionは早くて本当にcrashしている時に早くわかる
- low thresholdは,false-positiveを小さくするけれど,本当のcrashを発見するのに時間がかかる
- Cloud環境 (EC2)などはthreshold=12くらいにしたほうがいい.networkの問題がたまに起こるので.
akka.cluster.failure-detector.acceptable-heartbeat-pause
Cluster Aware Routers
Routerはクラスターノードawareで作られている(defaultはoffのよう).ノードがunchreable or leave cluster担ったら,そのノードのRouteeもそうなるし,unreachable -> reachableでも同じ.また,新しいノードがクラスターに参加したら,そのノードのrouteeもルーターに追加される.
ルータには次の2種類がある
- Group
- メッセージはactor selctionを使って,特定のパスに送る
- routeeは別ノードのrouterでシェアできる
- backend nodeで動いているサービス向きでfrontendにいるroutersに使われる
- Pool
Router with Group of Routees
- routeeアクターはクラスタメンバとしてスタートさせないとならない
- routerで行うのではない
- routee actorsはactor systemを起動したらすぐにスタートすること
- routeeのlookupをuse-roleでfilterすることも可能
- routeeの設定は,configでもコードでもできる (以下,全部そうだが,コードのサンプルは割愛する.ドキュメントには載っている)
Router Example with Group of Routees
サンプルは,text中に出てくるwordsの平均文字数を数えるタスク
コード (一度読んだほうが理解が深まる)
- Actors(ここにある)
- StatsService: routerを持っていて,jobが来たら,aggregatorを作って,jobと一緒に,routerに渡す
- StatsAggregator: word lenghtのカウント, word数 in textの数のタスクが集まるまではresultを返さない
- StatsWorker: routeeになる,キャッシュから引いてきているけれど,このタスクなら,word.lengthしたほうが早いかも
- StatsSampleClient: クライアント,2秒に1回 tick taskを送っている.tickタスクでStatsJobがキックされる
- config (ここにある)
- application.conf
- stat1.conf
- main
- StatsSample: StatsWorker, StatsServiceを含むactor systemを作って起動し,StatsSampleClientを含むactor systemを作る(StatsSampleClientはobjectもあるので注意)
ConsistentHashableEnvelope(word, word)をメッセージにworkerに送っているが,ソースをみると,ConsistentHashableEnvelope(message: Any, hashKey: Any)になっているので,2nd paramでどのルータに振るか決めて,1ast paramをメッセージにするんだと思う
ConfigFactory.load("stats1")でstats1.confを読んで,stats1.confの中でinclude "aplication"している.すなわち,confはapplication.conf, reference.confでなくてもいい.ConfigFactory.loadで指定すればOK.
Router with Pool of Remote Deployed Routees
どうもContribution Moduleの Cluster Singletonを使っているよう.single master nodeを作るのに必要のよう.pool routerだからというわけではなく,single master node上でやって見ようという感じ.
基本的にRouter Example with Group of Routeesと同じだけれども
StatsSampleOneMaster
def startup(ports: Seq[String]): Unit = { ports foreach { port => ... //#create-singleton-manager system.actorOf(ClusterSingletonManager.props( singletonProps = Props[StatsService], singletonName = "statsService", terminationMessage = PoisonPill, role = Some("compute")), name = "singleton") //#create-singleton-manager //#singleton-proxy system.actorOf(ClusterSingletonProxy.props(singletonPath = "/user/singleton/statsService", role = Some("compute")), name = "statsServiceProxy") //#singleton-proxy ...
ClusterSingletonManagerはsingleton actorを作って,その子として,statsServiceを作っている感じ.
ClusterSingletonProxyは,statsServiceProxyを作って,/user/singleton/statsServiceへのproxyとしているよう.
なので,StatsSampleOneMasterClientはStatsSampleClientを作って,"/user/statsServiceProxy"を引数に渡している.
Cluster Metrics
metricsもわかる.基本的には,load-balancing routerに使用される.
Hyperic Sigar
built-in metricsはJMX MBeansで集計される様だけど,Hyperic Sigar (VMwareのプロダクトの様)をつかってもいいらしい,SigarはOSのライブラリを使っている.
Adaptive Load Balancing
AdaptiveLoadBalancingPool / AdaptiveLoadBalancingGroupが,cluster metrics dataに基づいて,メッセージのload balancingをする
ロードバランシングの基準があって,
- heap: JVMのheap使用量に基づく
- load: topで見えるload averageに基づく
- cpu: User + Sys + Nice + Waitの使用時間に基づく
- mix: 上記すべてに基づく
集計された値は,exponential weighted averagされる.
サンプルは割愛
Subscribe to Metrics Events/Custom Metrics Collector
MetricsListenerのサンプルが載っているが,割愛.
自作も可能ということ.
- MetricsListenerのサンプル
- akka.cluster.SigarMetricsCollector
- akka.cluster.JmxMetricsCollector
のインプリみてインパイアーされてくれと書いてある.
How to Test
Multi Node TestingとMulti JVM Testingを使う.
Multi JVM Testingはsbtのpluginなのでsbtを使っていない人にはつかない感じ.しかし,まともに使おうとしたら,テストはできないと困る.なので,Multi Node Testingだけで頑張ってみる.
TransformationSampleSpecのみ行う
Original TransformationSampleSpec.scalaとの差分
MultiJvmNodeXがいらない
//class TransformationSampleSpecMultiJvmNode1 extends TransformationSampleSpec //class TransformationSampleSpecMultiJvmNode2 extends TransformationSampleSpec //class TransformationSampleSpecMultiJvmNode3 extends TransformationSampleSpec //class TransformationSampleSpecMultiJvmNode4 extends TransformationSampleSpec //class TransformationSampleSpecMultiJvmNode5 extends TransformationSampleSpec
JUnitで実行
@RunWith(classOf[JUnitRunner]) class TransformationSampleSpec extends MultiNodeSpec(TransformationSampleSpecConfig) with WordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender {
system propertyの設定が必要なよう
- multinode.max-nodes': '10'
- multinode.host': 'localhost'
- multinode.port': '0'
- multinode.server-host': 'localhost'
- multinode.index': '0'
タイムアウトなどが出たらmulti-node configを設定する.
それでもエラー
Caused by: java.lang.RuntimeException: unexpected transition: Transition(Actor[akka://TransformationSampleSpec/user/TestConductorClient#-484425751],Connecting,Failed)
ここによると,sbtMultiJvm pluginによって,multinode.indexは,自動的に付加されるらしく,sbtMultiJvm pluginを使わずにbuildしたいときには,自分でスクリプトを書かないと行けないらしい.なので,sbtテストしたほうがいい.
activator経由で生成したサンプルプロジェクトで
sbt multiple-jvm:test
で実行され,stdoutに
... [JVM-5] All tests passed. [JVM-4] Run completed in 33 seconds, 279 milliseconds. [JVM-4] Total number of tests run: 3 [JVM-4] Suites: completed 1, aborted 0 [JVM-4] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0 [JVM-4] All tests passed. [info] Passed: Total 5, Failed 0, Errors 0, Passed 5 [success] Total time: 37 s, completed 2015/10/04 2:05:12
な感じのログが出る.
akka clusterをやるなら,sbtを使ったほうがいいということ.
Command Line Management
Akka distributionのbin/akka-clusterがCommand Line Managementツールらしい.
ツールを使う前に,ActorSystemを起動する際に,jmxremote.portを指定しないといけない
java -Dcom.sun.management.jmxremote.port=9999 \ -Dcom.sun.management.jmxremote.authenticate=false \ -Dcom.sun.management.jmxremote.ssl=false
Configuration
いろいろconfigがあるので,これをみろという話.
Especially the heartbeating actors that is used for failure detection can generate false positives if they are not given a chance to run at regular intervals. For this purpose you can define a separate dispatcher to be used for the cluster actors:
ハートビートアクターはfailure detectionに使われるので,アクターシステムに負荷がかかって,実行されずにいると,failure detectionでfalse positiveが起こる可能性がある.そうしないために別のdispatcherを割り当てられる
- conf 設定例
akka.cluster.use-dispatcher = cluster-dispatcher cluster-dispatcher { type = "Dispatcher" executor = "fork-join-executor" fork-join-executor { parallelism-min = 5 parallelism-max = 5 } }
Dispatcherのthread数を5より小さくしないこと.Deadlockが起きる可能性がある.2.4からは大丈夫.
全体所感
- クラスターはP2Pなのに,どうやって,IPが異なるノードがクラスタに参加できるのかと疑問に思っていたが,seed-nodesさえ指定すれば,それがクラスタへのエントリポイントになるので,seed-nodesが立ち上がっていたら,自動的にクラスタに参加できるはず.なので,seed-nodesは落ちないようにしないと行けないと思う.
- プロダクションでは,基本的に1Actor Sytem/1JVM/1Node/1BMor1VMを動かすべき.
- configを使ったrouterは,confggとコードでcouplingしやすい気がする.(すくなくともサンプルはcouplingしている)
- akkaのドキュメントは,全体的に説明が不十分な気がする.
- テストがわかりづらい.
これで,クラスタを使えるようになったつもりになれた.ちゃんと読めばそこまで難しくない.
Note
Akka 2.4.0がstableになっていた.
以下の項目が充実されている.
Networkingに関しては,今までcontributionであったのが,distributionに加わっている.大分充実されたように感じる.
Actor
- Persistence
- Persistence - Schema Evolution
- Persistence Query
- Persistence Query for LevelDB
Networking
- Cluster Singleton
- Distributed Publish Subscribe in Cluster
- Cluster Client
- Cluster Sharding
- Cluster Metrics Extension
- Distributed Data
参考URL
- http://doc.akka.io/docs/akka/2.3.14/scala/cluster-usage.html
- http://www.typesafe.com/activator/download
- http://www.typesafe.com/activator/template/akka-sample-cluster-scala?_ga=1.263330700.1035464810.1442845000
- http://www.typesafe.com/activator/template/akka-sample-cluster-scala?_ga=1.263330700.1035464810.1442845000#code/src/main/resources/application.conf
- http://www.typesafe.com/activator/template/akka-sample-cluster-scala?_ga=1.263330700.1035464810.1442845000#code/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala