怒涛のAkka: Futures
基本
- Futureはakka pgkではなくてscala pkgのしたにある.
- concurrent操作結果を受け取るときに使用されるがblockingでも使える.
- Execution Contextが必要
- Actor内ならimport context.dispatcherでExecutionContextを使うのも良い
- Timeoutも必要
- Await.resultでblocking
- future pipeTo actorで他のActorに結果をパイプ可能
- monadが定義されているのでfuture.map.foreachのように流れる操作が可能
- for-comprehensionも可能.だがfor-comprehensionの中身はsequentialなので注意
複雑なメッセージパッシング
複雑なメッセージパッシングではfor-comprehensionでfutrueをまとめる.
M1 to actor1 and M2 to actor2, then (R1 + R2) to actor3
さらに結果は非同期で受け取る例.
val f1 = ask(actor1, msg1) val f2 = ask(actor2, msg2) val f3 = for { a <- f1.mapTo[Int] b <- f2.mapTo[Int] c <- ask(actor3, (a + b)).mapTo[Int] } yield c f3 foreach println
便利メソッド
Future.sequence
Travserseを実装しているTに関して,
T[Future[A]]をFuture[T[A]]に変換.
Future.traverse
Travserseを実装しているTに関して,
T[A]とA => Future[B]をとって,Future[T[A]]に変換.
traverseのほうがsequenceよりoverheadが少ない.
Future.fold
start-valueをとって,FutureのListを畳み込んでいく.
Future.reduce
start-valueをとらずに,FutureのListを畳み込んでいく.
これらのサンプル.
- FutureUsefulMethod.scala
package edu.kzk.actor.future import scala.concurrent.Future import scala.concurrent.duration._ object FutureUsefulMethod extends App { import scala.concurrent.ExecutionContext; import ExecutionContext.Implicits.global val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1)) val oddSum = futureList.map(_.sum) oddSum foreach println val futureList1 = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1))) val oddSum1 = futureList1.map(_.sum) oddSum1 foreach println // Create a sequence of Futures val futures = for (i <- 1 to 1000) yield Future(i * 2) val futureSum = Future.fold(futures)(0)(_ + _) futureSum foreach println // Create a sequence of Futures val futures1 = for (i <- 1 to 1000) yield Future(i * 2) val futureSum1 = Future.fold(futures1)(0)(_ + _) futureSum1 foreach println }
Callback/Ordering/Auxiliary
Callback
Future.
- onComplete
- onSuccuess
- onFailure
という一般的なcallback methodsは用意されている.
Ordering
実行順序が必要ならandThenを使う.
future andThen {} andThen {} ...
Auxiliary
Fallbackメソッドも用意されている.
future1 fallbackTo future2 fallbackTo future3
future1 が失敗したら,future2が失敗したら,future3という具合.
成功と成功の場合も用意されていて,
future1 zip future2 zip future3
という具合.
Exception
Exceptionが起ったら,Execptionはfutureが内包している.
future recover {
case e: ArithmeticException => {}
}
な感じで使う.
After
非同期で何か返してくれるタイマーみたいな機能.