GraphX 3
A summary of Graph Operators.
There are many graph operators. For the details, the guide says to read the API docs.
Property Operators
The GraphX counterpart of RDD.map.
More efficient than graph.vertices.map{}, because the graph's indices are preserved and reused.
class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
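A minimal sketch of the three property operators on a toy graph. The vertex names, edge weights, and variable names are made up for illustration, and a local SparkContext is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, VertexId}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("PropertyOperatorsSketch").setMaster("local")
val sc = new SparkContext(conf)

// Hypothetical toy data: vertex attribute = name, edge attribute = weight
val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 7), Edge(2L, 3L, 4)))
val graph: Graph[String, Int] = Graph(vertices, edges)

// mapVertices: replace each vertex attribute, keep the structure
val nameLengths: Graph[Int, Int] = graph.mapVertices((id, name) => name.length)

// mapEdges: the function sees only the edge itself
val doubled: Graph[String, Int] = graph.mapEdges((e: Edge[Int]) => e.attr * 2)

// mapTriplets: the function also sees the source and destination attributes
val labelled: Graph[String, String] =
  graph.mapTriplets((t: EdgeTriplet[String, Int]) => s"${t.srcAttr}->${t.dstAttr}:${t.attr}")

// Collect the results to the driver for inspection
val lengthById: Map[VertexId, Int] = nameLengths.vertices.collect().toMap
val doubledWeights: Set[Int] = doubled.edges.map(_.attr).collect().toSet
val labels: Set[String] = labelled.edges.map(_.attr).collect().toSet
println(lengthById)
println(doubledWeights)
println(labels)
```

In all three cases only the attributes change; the vertex and edge sets stay the same.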
Structural Operators
These reverse edge directions, build a subgraph from a graph, mask one graph with another, and merge multi-edges.
class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
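A small sketch of reverse, subgraph, and groupEdges. The ages and follow edges are hypothetical toy data. The graph is repartitioned with partitionBy before groupEdges so that parallel edges end up in the same partition, which groupEdges relies on.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("StructuralOperatorsSketch").setMaster("local")
val sc = new SparkContext(conf)

// Hypothetical toy data: vertex attribute = age, edge attribute = follow count
val users: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 30), (2L, 15), (3L, 42)))
val follows: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(1L, 2L, 1), // a multi-edge 1 -> 2
  Edge(2L, 3L, 1),
  Edge(3L, 1L, 1)))
val graph: Graph[Int, Int] = Graph(users, follows)

// reverse: flip the direction of every edge
val reversedPairs: Set[(VertexId, VertexId)] =
  graph.reverse.edges.map(e => (e.srcId, e.dstId)).collect().toSet

// subgraph: keep vertices aged 18 or more; edges touching a dropped vertex go too
val adults: Graph[Int, Int] = graph.subgraph(vpred = (id, age) => age >= 18)
val adultIds: Set[VertexId] = adults.vertices.map(_._1).collect().toSet
val adultEdges: Set[(VertexId, VertexId)] =
  adults.edges.map(e => (e.srcId, e.dstId)).collect().toSet

// groupEdges: merge parallel edges by summing their attributes
val merged: Graph[Int, Int] =
  graph.partitionBy(PartitionStrategy.RandomVertexCut).groupEdges((a, b) => a + b)
val mergedEdges: Set[(VertexId, VertexId, Int)] =
  merged.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().toSet

println(reversedPairs)
println(adultIds)
println(adultEdges)
println(mergedEdges)
```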
Join Operators
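These join external data (an RDD keyed by vertex id) into an existing Graph, for example to add or update vertex properties.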
class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}
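A sketch of the difference between the two joins, on hypothetical toy data. joinVertices keeps the vertex type and leaves unmatched vertices as they are; outerJoinVertices may change the vertex type and passes an Option so that the missing case is handled explicitly.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("JoinOperatorsSketch").setMaster("local")
val sc = new SparkContext(conf)

// Hypothetical toy data: vertex attribute = name
val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
val graph: Graph[String, Int] = Graph(vertices, edges)

// joinVertices: only vertex 2 has an entry, the others keep their old value
val nicknames: RDD[(VertexId, String)] = sc.parallelize(Seq((2L, "bobby")))
val renamed: Graph[String, Int] =
  graph.joinVertices(nicknames)((id, oldName, nickname) => nickname)

// outerJoinVertices: attach the out-degree; vertices without out-edges
// are missing from outDegrees, so they arrive as None
val outDeg: VertexRDD[Int] = graph.outDegrees
val withDegree: Graph[(String, Int), Int] =
  graph.outerJoinVertices(outDeg) { (id, name, degOpt) => (name, degOpt.getOrElse(0)) }

val renamedById: Map[VertexId, String] = renamed.vertices.collect().toMap
val degreeById: Map[VertexId, (String, Int)] = withDegree.vertices.collect().toMap
println(renamedById)
println(degreeById)
```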
Neighborhood Aggregation
Map Reduce Triplets (deprecated)
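Reportedly heavily optimized.
This method takes two arguments:
- a map function that takes a triplet and returns pairs of vertex id (= Long) and message (a tuple in the sample below)
- a reduce function that takes two messages (= tuples) and returns one message (= tuple).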
class Graph[VD, ED] {
  def mapReduceTriplets[A](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      reduce: (A, A) => A)
    : VertexRDD[A]
}
MapReduceTripletSampleApp.scala (deprecated)
package edu.kzk.graphx.neighborhood_aggregation

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object MapReduceTripletSampleApp {
  def main(args: Array[String]): Unit = {
    // Spark context
    val conf = new SparkConf()
    conf.setAppName("Simple Test Application")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    // Graph auto-generation; the vertex id doubles as the "age"
    val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)

    // Compute the number of older followers and their total age
    val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send message to destination vertex containing counter and age
          Iterator((triplet.dstId, (1, triplet.srcAttr)))
        } else {
          // Don't send a message for this triplet
          Iterator.empty
        }
      },
      // Add counter (= the number of followers) and age
      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
    )

    // Divide total age by number of older followers to get average age of older followers
    val avgAgeOfOlderFollowers: VertexRDD[Double] =
      olderFollowers.mapValues((id, value) => value match {
        case (count, totalAge) => totalAge / count
      })

    // Display the results
    avgAgeOfOlderFollowers.collect.foreach(println(_))
  }
}
Aggregate Messages
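As of Spark 1.2, mapReduceTriplets is deprecated and the guide says to use aggregateMessages instead. The story is that mapReduceTriplets makes the API user return an Iterator, which is not user-friendly at all. The Pregel API, however, still appears to use mapReduceTriplets. The differences from mapReduceTriplets are:
- msgFun receives an EdgeContext (the triplet plus methods for sending messages)
- messages are sent from inside the function by calling sendToSrc or sendToDst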
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]): Unit = {
  triplet.sendToDst("Hi")
}
// The messages are Strings, so the reduce function has to combine Strings
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
AggregateMessages.scala
package edu.kzk.graphx.neighborhood_aggregation

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object AggregateMessages {
  def main(args: Array[String]): Unit = {
    // Spark context
    val conf = new SparkConf()
    conf.setAppName("Simple Test Application")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    // Create a graph with "age" as the vertex property.
    // Here we use a random graph for simplicity.
    val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)

    // Compute the number of older followers and their total age
    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send message to destination vertex containing counter and age
          // (passed as an explicit tuple rather than relying on auto-tupling)
          triplet.sendToDst((1, triplet.srcAttr))
        }
      },
      // Add counter and age
      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
    )

    // Divide total age by number of older followers to get average age of older followers
    val avgAgeOfOlderFollowers: VertexRDD[Double] =
      olderFollowers.mapValues((id, value) => value match {
        case (count, totalAge) => totalAge / count
      })

    // Display the results
    avgAgeOfOlderFollowers.collect.foreach(println(_))
  }
}
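Because the sample above runs on a random graph, its output is hard to check by eye. Below is the same computation on a tiny hand-made graph, as a sketch; the ages and follow edges are hypothetical. An edge src -> dst is read as "src follows dst".

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("AggregateMessagesSketch").setMaster("local")
val sc = new SparkContext(conf)

// Hypothetical toy data: vertex attribute = age
val ages: RDD[(VertexId, Double)] =
  sc.parallelize(Seq((1L, 40.0), (2L, 25.0), (3L, 30.0), (4L, 20.0)))
val follows: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(4L, 2L, 1), Edge(2L, 4L, 1)))
val graph: Graph[Double, Int] = Graph(ages, follows)

// For each vertex: (number of older followers, sum of their ages)
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  ctx => if (ctx.srcAttr > ctx.dstAttr) ctx.sendToDst((1, ctx.srcAttr)),
  (a, b) => (a._1 + b._1, a._2 + b._2))

// Vertices that received no message do not appear in the result
val avgAge: Map[VertexId, Double] = olderFollowers
  .mapValues((id, value) => value match { case (count, total) => total / count })
  .collect()
  .toMap
println(avgAge)
```

Vertex 2 is followed by 1 (40), 3 (30), and 4 (20); only the first two are older, so its average is 35. Vertex 4 has one older follower, vertex 2 (25).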
Computing Degree Information
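In a graph, the degree of a vertex is the number of edges connected to it. A directed graph distinguishes out-degree and in-degree (outgoing edges and incoming edges).
The GraphOps class has the degree-related methods.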
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
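The same snippet on a concrete three-vertex graph, as a sketch with hypothetical toy data. Note that inDegrees and outDegrees omit vertices whose degree in that direction is zero.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("DegreeSketch").setMaster("local")
val sc = new SparkContext(conf)

// Hypothetical toy graph: 1 -> 2, 1 -> 3, 2 -> 3
val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
val graph: Graph[String, Int] = Graph(vertices, edges)

// Reduce operation that keeps the pair with the higher degree
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b

val inDeg: Map[VertexId, Int] = graph.inDegrees.collect().toMap   // vertex 1 is absent
val outDeg: Map[VertexId, Int] = graph.outDegrees.collect().toMap // vertex 3 is absent
val deg: Map[VertexId, Int] = graph.degrees.collect().toMap

val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)

println(inDeg)
println(outDeg)
println(deg)
println(maxInDegree)
println(maxOutDegree)
```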
Collecting Neighbors
These collect, for each vertex, its neighboring vertices and their properties.
The guide says to use mapReduceTriplets rather than these where possible.
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
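A sketch of both methods on a hypothetical three-vertex graph. EdgeDirection.Out collects the vertices reached by outgoing edges, and EdgeDirection.In collects the vertices that point at this one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("CollectNeighborsSketch").setMaster("local")
val sc = new SparkContext(conf)

// Hypothetical toy graph: 1 -> 2, 1 -> 3, 2 -> 3
val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
val graph: Graph[String, Int] = Graph(vertices, edges)

// Ids of the vertices each vertex points to (arrays turned into sets for comparison)
val outNeighborIds: Map[VertexId, Set[VertexId]] =
  graph.collectNeighborIds(EdgeDirection.Out)
    .collect()
    .map { case (id, nbrs) => (id, nbrs.toSet) }
    .toMap

// Ids and attributes of the vertices pointing at each vertex
val inNeighbors: Map[VertexId, Set[(VertexId, String)]] =
  graph.collectNeighbors(EdgeDirection.In)
    .collect()
    .map { case (id, nbrs) => (id, nbrs.toSet) }
    .toMap

println(outNeighborIds)
println(inNeighbors)
```

Each neighbor list is materialized as an array per vertex, which can get large for high-degree vertices; that is presumably why the guide prefers expressing the computation as an aggregation.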
Caching and Uncaching
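For iterative computations that use the same Graph many times, use Graph.cache(). Uncaching matters just as much. Cached RDDs are evicted in LRU (Least Recently Used) order, but until that happens, old data that is no longer needed stays in memory and slows the system down whenever GC runs.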
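A rough sketch of what manual caching and uncaching can look like in a loop. The loop body (doubling every vertex value), the iteration count, and the toy data are made up for illustration; the point is only the order of operations: cache the new graph, materialize it, then release the previous one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("CachingSketch").setMaster("local")
val sc = new SparkContext(conf)

// Hypothetical toy graph: every vertex starts at 1.0
val vertices: RDD[(VertexId, Double)] =
  sc.parallelize(Seq((1L, 1.0), (2L, 1.0), (3L, 1.0)))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))

var g: Graph[Double, Int] = Graph(vertices, edges).cache()

for (i <- 1 to 3) {
  val prev = g
  // Derive the next graph from the cached one and cache it as well
  g = prev.mapVertices((id, value) => value * 2).cache()
  // Materialize the new graph while the previous one is still cached
  g.vertices.count()
  g.edges.count()
  // Release the previous iteration without blocking
  prev.unpersistVertices(blocking = false)
  prev.edges.unpersist(blocking = false)
}

val finalValues: Map[VertexId, Double] = g.vertices.collect().toMap
println(finalValues)
```

Getting this bookkeeping right by hand is fiddly, which is part of the appeal of letting a higher-level API handle it.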
For iterative computation, use the Pregel API.
Next up: the Pregel API.
Reference URLs
- https://spark.apache.org/p9graphx/
- https://spark.apache.org/docs/0.9.0/graphx-programming-guide.html
- http://kowshik.github.io/JPregel/pregel_paper.pdf
- http://www.cloudera.co.jp/jpevents/cwt2014/static/pdf/B-5.pdf
- http://en.wikipedia.org/wiki/Multigraph
- http://en.wikipedia.org/wiki/Multiple_edges
- http://en.wikipedia.org/wiki/Bipartite_graph
- http://www.ne.jp/asahi/hishidama/home/tech/scala/def.html#h_context_bound
- http://www.ne.jp/asahi/hishidama/home/tech/scala/classtag.html
- https://spark.apache.org/docs/latest/graphx-programming-guide.html#aggregate-messages-aggregatemessages