KZKY memo

自分用メモ.

GraphX 4

GraphXのPregelAPI関連のまとめ.

はじめにGoogleのPregel Paperの概要をまとめてから,GraphXのPregelAPIをまとめる.

Pregel Paper 概要

Balk Synchronous Parallelの実装

  • superstepを1単位としたMessage Passing
  • すべてのvertexがinactiveになって,メッセージがなくなったら終了
  • インプットソースはGFS, Bigtable
  • 基本はグラフをパーティションに分ける
  • パーティションは点と外に向かう辺を持っている
  • Combiner/Aggregatorの様にMapReduce/Sparkに登場する概念もある
  • いくつかPregelに向いているアルゴリズムを提示
    • PageRank
    • Shortest Path
      • single-source shortest paths
      • s-p shortest path
      • all-pairs shortest paths
    • Bipartite Matching
    • Semi-Clustering
  • 論文ではSingle Source Shortest Path (SSSP)で実験結果を示している

Super Stepsの簡単な例.

例では4ノードで濃い線がエッジ,ドット線がメッセージ,上から下へSuperStepsが進むみ,
白丸がActive状態で,灰色の丸がInactive状態.

f:id:KZKY:20141227020315p:plain

Pregel API

Google論文のAPIはこれ.

f:id:KZKY:20141227020318p:plain

GraphXのPregel APIインターフェイス

GraphXのPregelAPIはPregelというよりもGraphLabのメッセージに近い.エッジトライプレットの関数としてメッセージが計算され,メッセージの計算はソース/ディスト点の両方の属性にアクセスできる.

def pregel[A]
     (initialMsg: A,
      maxIter: Int = Int.MaxValue,
      activeDir: EdgeDirection = EdgeDirection.Out)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
   : Graph[VD, ED]
  • A: メッセージのタイプ
  • initialMsg: メッセージの初期値
  • maxIter: 最大イテレーション
  • activeDir: デフォルトはアウトで,前イテレーションでメッセージを受け取った点でvprogが動く
  • vprog: メッセージを受け取った点がどう振る舞うかの関数,Pregelのcompute相当.
  • sendMsg: 今回のイテレーションでメッセージが送られる辺上で作用する関数,PregelのsendMessageTo相当.
  • mergeMsg: メッセージをマージする関数(交換可能で結合律でないといけない),PregelのCombiner相当.

GraphXのPregel API実体.

  • 初期値化
    • はじめにinitialMsgを受け取って点ごとにvprogを実行し,グラフをcache
    • 次にメッセージを計算して,activeMsgの個数をカウント
  • ループに入る
    • ループはactiveMsgが正かつカウンタがマックスイテレーションより小さい間は続く
    • Msgを受け取った点でvprogを実行し,新しい点集合をつくり,それをcache
    • もとのグラフと新しい点集合をJoinし,そのグラフをcache
    • 次のメッセージを計算して,activeMsgの個数をカウント
    • カウンタをインクリメント
  • グラフを返す
class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    // compute the messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages: -----------------------------------------------------------------------
      // Run the vertex program on all vertices that receive messages
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      // Merge the new vertex values back into the graph
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      // Send Messages: ------------------------------------------------------------------------------
      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
      // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
      // on edges in the activeDir of vertices in newVerts
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

SingleShortestPathApp

Single Shortest Pathのサンプルコードが次.

package edu.kzk.graphx.pregel

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.graphx.Graph

import org.apache.spark.graphx._

object SingleShortestPathApp {
    def main(args: Array[String]) {

        // Spark context
        val conf = new SparkConf();
        conf.setAppName("Single Shortest Path App");
        conf.setMaster("local");
        val sc = new SparkContext(conf);

        // A graph with edge attributes containing distances
        val graph: Graph[Long, Double] =
                GraphGenerators
                .logNormalGraph(sc, numVertices = 100)
                .mapEdges(e => e.attr.toDouble);
        
        val sourceId: VertexId = 42; // The ultimate source

        // Initialize the graph such that all vertices except the root have distance infinity.
        val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity);
        val sssp = initialGraph.pregel(Double.PositiveInfinity)(
                (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
                triplet => {  // Send Message
                    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
                        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
                    } else {
                        Iterator.empty
                    }
                },
                (a,b) => math.min(a,b) // Merge Message
                );
        println(sssp.vertices.collect.mkString("\n"))
    }
}

ドキュメントの例のように,

  • val graph: Graph[Int Double]

だと,missing parameter typeで,Eclipseに怒られたので注意.

  • val graph: Graph[Long, Double]

に変えること.

これで導入はおわり.

PregelはSuper Steps毎にSynchすると思うので,
Fully Asynchronous Message PassingならたぶんAkka使うべきだろう.