GraphX 4
GraphXのPregelAPI関連のまとめ.
はじめにGoogleのPregel Paperの概要をまとめてから,GraphXのPregelAPIをまとめる.
Pregel Paper 概要
Balk Synchronous Parallelの実装
- superstepを1単位としたMessage Passing
- source verticesからメッセージをdestination verticesに送る
- 1イテレーション毎にSynchする
- すべてのvertexがinactiveになって,メッセージがなくなったら終了
- インプットソースはGFS, Bigtable
- 基本はグラフをパーティションに分ける
- パーティションは点と外に向かう辺を持っている
- Combiner/Aggregatorの様にMapReduce/Sparkに登場する概念もある
- いくつかPregelに向いているアルゴリズムを提示
- PageRank
- Shortest Path
- single-source shortest paths
- s-p shortest path
- all-pairs shortest paths
- Bipartite Matching
- Semi-Clustering
- 論文ではSingle Source Shortest Path (SSSP)で実験結果を示している
Super Stepsの簡単な例.
例では4ノードで濃い線がエッジ,ドット線がメッセージ,上から下へSuperStepsが進むみ,
白丸がActive状態で,灰色の丸がInactive状態.
Pregel API
GraphXのPregel APIインターフェイス.
GraphXのPregelAPIはPregelというよりもGraphLabのメッセージに近い.エッジトライプレットの関数としてメッセージが計算され,メッセージの計算はソース/ディスト点の両方の属性にアクセスできる.
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
- A: メッセージのタイプ
- initialMsg: メッセージの初期値
- maxIter: 最大イテレーション数
- activeDir: デフォルトはアウトで,前イテレーションでメッセージを受け取った点でvprogが動く
- vprog: メッセージを受け取った点がどう振る舞うかの関数,Pregelのcompute相当.
- sendMsg: 今回のイテレーションでメッセージが送られる辺上で作用する関数,PregelのsendMessageTo相当.
- mergeMsg: メッセージをマージする関数(交換可能で結合律でないといけない),PregelのCombiner相当.
GraphXのPregel API実体.
- 初期値化
- はじめにinitialMsgを受け取って点ごとにvprogを実行し,グラフをcache
- 次にメッセージを計算して,activeMsgの個数をカウント
- ループに入る
- ループはactiveMsgが正かつカウンタがマックスイテレーションより小さい間は続く
- Msgを受け取った点でvprogを実行し,新しい点集合をつくり,それをcache
- もとのグラフと新しい点集合をJoinし,そのグラフをcache
- 次のメッセージを計算して,activeMsgの個数をカウント
- カウンタをインクリメント
- グラフを返す
class GraphOps[VD, ED] { def pregel[A] (initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Receive the initial message at each vertex var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() // compute the messages var messages = g.mapReduceTriplets(sendMsg, mergeMsg) var activeMessages = messages.count() // Loop until no messages remain or maxIterations is achieved var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages: ----------------------------------------------------------------------- // Run the vertex program on all vertices that receive messages val newVerts = g.vertices.innerJoin(messages)(vprog).cache() // Merge the new vertex values back into the graph g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() // Send Messages: ------------------------------------------------------------------------------ // Vertices that didn't receive a message above don't appear in newVerts and therefore don't // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked // on edges in the activeDir of vertices in newVerts messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() activeMessages = messages.count() i += 1 } g } }
SingleShortestPathApp
Single Shortest Pathのサンプルコードが次.
package edu.kzk.graphx.pregel import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.util.GraphGenerators import org.apache.spark.graphx.Graph import org.apache.spark.graphx._ object SingleShortestPathApp { def main(args: Array[String]) { // Spark context val conf = new SparkConf(); conf.setAppName("Single Shortest Path App"); conf.setMaster("local"); val sc = new SparkContext(conf); // A graph with edge attributes containing distances val graph: Graph[Long, Double] = GraphGenerators .logNormalGraph(sc, numVertices = 100) .mapEdges(e => e.attr.toDouble); val sourceId: VertexId = 42; // The ultimate source // Initialize the graph such that all vertices except the root have distance infinity. val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity); val sssp = initialGraph.pregel(Double.PositiveInfinity)( (id, dist, newDist) => math.min(dist, newDist), // Vertex Program triplet => { // Send Message if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a,b) => math.min(a,b) // Merge Message ); println(sssp.vertices.collect.mkString("\n")) } }
ドキュメントの例のように,
- val graph: Graph[Int Double]
だと,missing parameter typeで,Eclipseに怒られたので注意.
- val graph: Graph[Long, Double]
に変えること.
これで導入はおわり.
PregelはSuper Steps毎にSynchすると思うので,
Fully Asynchronous Message PassingならたぶんAkka使うべきだろう.
参考URLs
- https://spark.apache.org/p9graphx/
- https://spark.apache.org/docs/0.9.0/graphx-programming-guide.html
- http://kowshik.github.io/JPregel/pregel_paper.pdf
- http://www.cloudera.co.jp/jpevents/cwt2014/static/pdf/B-5.pdf
- http://en.wikipedia.org/wiki/Multigraph
- http://en.wikipedia.org/wiki/Multiple_edges
- http://en.wikipedia.org/wiki/Bipartite_graph
- http://www.ne.jp/asahi/hishidama/home/tech/scala/def.html#h_context_bound
- http://www.ne.jp/asahi/hishidama/home/tech/scala/classtag.html