読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

怒涛のAkka: Futures

基本

  • Futureはakka pgkではなくてscala pkgのしたにある.
  • concurrent操作結果を受け取るときに使用されるがblockingでも使える.
  • Execution Contextが必要
  • Actor内ならimport context.dispatcherでExecutionContextを使うのも良い
  • Timeoutも必要
  • Await.resultでblocking
  • future pipeTo actorで他のActorに結果をパイプ可能
  • monadが定義されているのでfuture.map.foreachのように流れる操作が可能
  • for-comprehensionも可能.だがfor-comprehensionの中身はsequentialなので注意

複雑なメッセージパッシング

複雑なメッセージパッシングではfor-comprehensionでfutrueをまとめる.

M1 to actor1 and M2 to actor2, then (R1 + R2) to actor3

さらに結果は非同期で受け取る例.

val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)
 
val f3 = for {
    a <- f1.mapTo[Int]
    b <- f2.mapTo[Int]
    c <- ask(actor3, (a + b)).mapTo[Int]
} yield c
 
f3 foreach println

便利メソッド

Future.sequence

Travserseを実装しているTに関して,
T[Future[A]]をFuture[T[A]]に変換.

Future.traverse

Travserseを実装しているTに関して,
T[A]とA => Future[B]をとって,Future[T[A]]に変換.

traverseのほうがsequenceよりoverheadが少ない.

Future.fold

start-valueをとって,FutureのListを畳み込んでいく.

Future.reduce

start-valueをとらずに,FutureのListを畳み込んでいく.

これらのサンプル.

  • FutureUsefulMethod.scala
package edu.kzk.actor.future

import scala.concurrent.Future
import scala.concurrent.duration._

object FutureUsefulMethod extends App {

  import scala.concurrent.ExecutionContext;
  import ExecutionContext.Implicits.global

  val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1))
  val oddSum = futureList.map(_.sum)
  oddSum foreach println

  val futureList1 = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1)))
  val oddSum1 = futureList1.map(_.sum)
  oddSum1 foreach println

  // Create a sequence of Futures
  val futures = for (i <- 1 to 1000) yield Future(i * 2)
  val futureSum = Future.fold(futures)(0)(_ + _)
  futureSum foreach println

  // Create a sequence of Futures
  val futures1 = for (i <- 1 to 1000) yield Future(i * 2)
  val futureSum1 = Future.fold(futures1)(0)(_ + _)
  futureSum1 foreach println

}

Callback/Ordering/Auxiliary

Callback

Future.

  • onComplete
  • onSuccuess
  • onFailure

という一般的なcallback methodsは用意されている.

Ordering

実行順序が必要ならandThenを使う.

future andThen {} andThen {} ...

Auxiliary

Fallbackメソッドも用意されている.

future1 fallbackTo future2 fallbackTo future3

future1 が失敗したら,future2が失敗したら,future3という具合.

成功と成功の場合も用意されていて,

future1 zip future2 zip future3

という具合.

Exception

Exceptionが起ったら,Execptionはfutureが内包している.

future recover {
case e: ArithmeticException => {}
}

な感じで使う.

After

非同期で何か返してくれるタイマーみたいな機能.