
KZKY memo

Personal notes.

GraphX 3

A summary of the Graph Operators.

There are many graph operators. For the finer details, the guide says to read the API docs.

Property Operators

The GraphX counterpart of RDD.map.
These are better optimized than graph.vertices.map{} because the structural indices of the graph are preserved and reused.

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
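
As a minimal sketch of the point above, the following contrasts the two ways of producing a graph with transformed vertex attributes. The object name PropertyOperatorsSketch, the helper names and the three-vertex toy graph are made up for illustration and are not from the GraphX guide.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object PropertyOperatorsSketch {
  // Hypothetical toy graph: 3 vertices with Int attributes, 2 edges with Int attributes
  def buildGraph(sc: SparkContext): Graph[Int, Int] = {
    val vertices: RDD[(VertexId, Int)] =
      sc.parallelize(Seq((1L, 10), (2L, 20), (3L, 30)))
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 2)))
    Graph(vertices, edges)
  }

  // Goes through a plain RDD and builds a new graph from scratch,
  // so the structural indices have to be built again
  def viaRddMap(graph: Graph[Int, Int]): Graph[Int, Int] = {
    val newVertices = graph.vertices.map { case (id, attr) => (id, attr * 2) }
    Graph(newVertices, graph.edges)
  }

  // Same vertex attributes, but the graph structure is left untouched
  def viaMapVertices(graph: Graph[Int, Int]): Graph[Int, Int] =
    graph.mapVertices((id, attr) => attr * 2)
}
```

Both functions yield the same vertex attributes; only the second keeps the existing graph structure in place.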

Structural Operators

These reverse the direction of edges, build a subgraph from a graph, mask one graph with another, and merge parallel (duplicate) edges.

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
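
A minimal sketch of the four operators on a toy graph. The object name StructuralOperatorsSketch, the helper names and the data are hypothetical. The sketch calls partitionBy before groupEdges so that parallel edges end up in the same partition before they are merged.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}
import org.apache.spark.rdd.RDD

object StructuralOperatorsSketch {
  // Hypothetical toy graph with a duplicated edge 1 -> 2
  def buildGraph(sc: SparkContext): Graph[String, Int] = {
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 2L, 5), Edge(2L, 3L, 2)))
    Graph(vertices, edges)
  }

  // Flip the direction of every edge
  def reversed(g: Graph[String, Int]): Graph[String, Int] = g.reverse

  // Drop vertex "c"; edges touching a dropped vertex disappear as well
  def withoutC(g: Graph[String, Int]): Graph[String, Int] =
    g.subgraph(vpred = (id, attr) => attr != "c")

  // Keep only the part of g that also exists in other
  def restrictedTo(g: Graph[String, Int], other: Graph[String, Int]): Graph[String, Int] =
    g.mask(other)

  // Merge parallel edges by summing their attributes
  def merged(g: Graph[String, Int]): Graph[String, Int] =
    g.partitionBy(PartitionStrategy.RandomVertexCut).groupEdges((a, b) => a + b)
}
```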

Join Operators

These join data from an external RDD into an existing graph, for example to add extra properties to its vertices.

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}
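
A small sketch of how the two joins differ, assuming a toy graph whose vertices all start with attribute 0. The object name JoinOperatorsSketch, the helpers addBonus and label, and the input tables are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object JoinOperatorsSketch {
  // Hypothetical toy graph 1 -> 2 -> 3; every vertex gets the default attribute 0
  def buildGraph(sc: SparkContext): Graph[Int, Int] = {
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
    Graph.fromEdges(edges, defaultValue = 0)
  }

  // joinVertices: vertices without a matching row keep their old attribute,
  // and the attribute type stays the same
  def addBonus(g: Graph[Int, Int], bonus: RDD[(VertexId, Int)]): Graph[Int, Int] =
    g.joinVertices(bonus)((id, old, extra) => old + extra)

  // outerJoinVertices: the function runs for every vertex, the row arrives
  // as an Option, and the attribute type may change (here Int -> String)
  def label(g: Graph[Int, Int], names: RDD[(VertexId, String)]): Graph[String, Int] =
    g.outerJoinVertices(names)((id, old, nameOpt) => nameOpt.getOrElse("unknown"))
}
```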

Neighborhood Aggregation

Map Reduce Triplets (deprecated)

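Reportedly it is heavily optimized.

This method takes two arguments:

  • a map function that takes a triplet and returns an Iterator of (vertex ID (= Long), message) pairs; in the sample below the message is a tuple
  • a reduce function that takes two messages and returns one merged message.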
class Graph[VD, ED] {
  def mapReduceTriplets[A](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      reduce: (A, A) => A)
    : VertexRDD[A]
}

MapReduceTripletSampleApp.scala (deprecated)

package edu.kzk.graphx.neighborhood_aggregation

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.Graph

object MapReduceTripletSampleApp {
    def main(args: Array[String]) {
        // spark context
        val conf = new SparkConf();
        conf.setAppName("Simple Test Application");
        conf.setMaster("local")
        val sc = new SparkContext(conf);


        // Graph auto-generation
        val graph: Graph[Double, Int] =
                GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble );

        // Compute the number of older followers and their total age
        val olderFollowers: VertexRDD[(Int, Double)] = 
                graph.mapReduceTriplets[(Int, Double)](
                        triplet => { // Map Function
                            if (triplet.srcAttr > triplet.dstAttr) {
                                // Send message to destination vertex containing counter and age
                                Iterator((triplet.dstId, (1, triplet.srcAttr)))
                            } else {
                                // Don't send a message for this triplet
                                Iterator.empty
                            }
                        },

                        // Add counter (= the number of follower) and age
                        (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
                        );


        // Divide total age by number of older followers to get average age of older followers
        val avgAgeOfOlderFollowers: VertexRDD[Double] =
                olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } );

        // Display the results
        avgAgeOfOlderFollowers.collect.foreach(println(_))

    }
}

Aggregate Messages

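Since Spark 1.2, mapReduceTriplets is deprecated and the guide says to use aggregateMessages instead. mapReduceTriplets forced the API user to return an Iterator, which was not user-friendly at all. The Pregel API, however, still seems to use mapReduceTriplets. The difference from mapReduceTriplets is as follows.

  • msgFun receives a triplet (= EdgeContext) and sends messages through it instead of returning an Iterator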
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

AggregateMessages.scala

package edu.kzk.graphx.neighborhood_aggregation

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.Graph

object AggregateMessages {
    def main(args: Array[String]) {
      
        // spark context
        val conf = new SparkConf();
        conf.setAppName("Simple Test Application");
        conf.setMaster("local")
        val sc = new SparkContext(conf);
      
        // Create a graph with "age" as the vertex property.  Here we use a random graph for simplicity.
        val graph: Graph[Double, Int] =
        GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
        // Compute the number of older followers and their total age
        val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
                triplet => { // Map Function
                    if (triplet.srcAttr > triplet.dstAttr) {
                        // Send message to destination vertex containing counter and age
                        triplet.sendToDst((1, triplet.srcAttr))
                    }
                },
                // Add counter and age
                (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
                );

        // Divide total age by number of older followers to get average age of older followers
        val avgAgeOfOlderFollowers: VertexRDD[Double] =
                olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } );
        // Display the results
        avgAgeOfOlderFollowers.collect.foreach(println(_))
    }
}

Computing Degree Information

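The degree of a vertex is the number of edges connected to it. In a directed graph there are an out-degree and an in-degree (outgoing edges and incoming edges).

The GraphOps class provides the degree-related methods.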

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)
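
The snippet above assumes an existing graph. A self-contained sketch on a toy graph follows; the object name DegreeSketch, the helper inDegreesWithZeros and the data are hypothetical. It also shows one way to fill in a degree of 0, since a vertex with no incoming edges does not appear in inDegrees at all.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object DegreeSketch {
  // Hypothetical toy graph: 1 -> 2, 1 -> 3, 2 -> 3
  def buildGraph(sc: SparkContext): Graph[Int, Int] = {
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
    Graph.fromEdges(edges, defaultValue = 0)
  }

  // Same reduce operation as above: keep the entry with the higher degree
  def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
    if (a._2 > b._2) a else b

  // Store the in-degree as the vertex attribute, using 0 for vertices
  // that are missing from inDegrees
  def inDegreesWithZeros(g: Graph[Int, Int]): Graph[Int, Int] =
    g.outerJoinVertices(g.inDegrees)((id, _, degOpt) => degOpt.getOrElse(0))
}
```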

Collecting Neighbors

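These collect the neighboring vertices of each vertex, together with their properties.
The guide says to prefer mapReduceTriplets (aggregateMessages from Spark 1.2 on) over these operators where possible.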

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
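
As a rough sketch of that advice, the following collects out-neighbor IDs once with collectNeighborIds and once with aggregateMessages. The object name NeighborSketch, the helper names and the toy graph are hypothetical. Using a Set as the message type is just one possible choice; in real code the message would usually be a smaller aggregate such as a count.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD

object NeighborSketch {
  // Hypothetical toy graph: 1 -> 2, 1 -> 3, 2 -> 3
  def buildGraph(sc: SparkContext): Graph[Int, Int] = {
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
    Graph.fromEdges(edges, defaultValue = 0)
  }

  // Out-neighbor IDs via the dedicated operator
  def viaCollect(g: Graph[Int, Int]): Map[VertexId, Set[VertexId]] =
    g.collectNeighborIds(EdgeDirection.Out)
      .map { case (id, nbrs) => (id, nbrs.toSet) }
      .collect()
      .toMap

  // The same information expressed directly with aggregateMessages:
  // every edge tells its source vertex about its destination.
  // Vertices that receive no message are absent from the result.
  def viaAggregate(g: Graph[Int, Int]): Map[VertexId, Set[VertexId]] =
    g.aggregateMessages[Set[VertexId]](
      ctx => ctx.sendToSrc(Set(ctx.dstId)),
      (a, b) => a ++ b
    ).collect().toMap
}
```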

Caching and Uncaching

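For iterative computations that use the same graph many times, call Graph.cache(). Uncaching matters as well, though. Cached RDDs are evicted in LRU (Least Recently Used) order, but as long as old data that is no longer needed stays in memory, it slows down garbage collection.
For iterative computation, use the Pregel API, which takes care of unpersisting intermediate results.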
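
To make the cache/uncache pattern concrete, here is a hand-written loop as a sketch only. The object name CachingSketch and the propagateMax computation (each vertex takes the maximum of its own value and the values of its in-neighbors) are made up for illustration. This is not how the Pregel API is implemented.

```scala
import org.apache.spark.graphx.Graph

object CachingSketch {
  // Hypothetical iterative update: in every round each vertex takes the max
  // of its own value and the values sent along its incoming edges
  def propagateMax(graph: Graph[Int, Int], rounds: Int): Graph[Int, Int] = {
    var g: Graph[Int, Int] = graph
    for (_ <- 1 to rounds) {
      val prev = g
      val msgs = g.aggregateMessages[Int](
        ctx => ctx.sendToDst(ctx.srcAttr),
        (a, b) => math.max(a, b)
      )
      // Cache the new graph because the next round reads it again
      g = g.outerJoinVertices(msgs)((id, old, m) => math.max(old, m.getOrElse(old))).cache()
      // Materialize the new vertices before dropping the previous round
      g.vertices.count()
      // Uncache the previous intermediate graph, but leave the caller's input alone
      if (prev ne graph) prev.unpersist(blocking = false)
    }
    g
  }
}
```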

Next up: the Pregel API.