GraphX 2
Graph Builder
ファイルからデータを読んでGraphを作るために,GraphBuilderクラスが用意されている.
- GraphBuilderはedgeのrepartiionをしない
- edge groupがHDFSのblockに対応する感じ
- 同一の辺 (identical edge)はRDDの同じパーティションに置かれる
- ここでいうidentical edgeはsrcid = dstidでpropertyが違うでいいのか?
Load from file
GraphLoader.edgeListFile(
sc: SparkContext,
path: String,
) : Graph[Int, Int]
Vertex and Edge RDDs
VertexRDDs
VertexRDD[A]は,RDD[(VertexID, A)] を拡張している.
VertexIDは,1回だけ出現するという制約を掛けて,ある特性をもった点集合として点集合を表現している.点の特性は再利用可能なhash-map構造になっている.
よくわからんが,あるVertexRDD内のhashを他のVertexRDDを作るときに再利用すると,これらRDD間でのjoin処理が高速化されるとみた.aggregateUsingIndexを参考.
こういった構造にすることで他の便利関数が追加されている.
// Filter the vertex set but preserves the internal index def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] // Transform the values without changing the ids (preserves the internal index) def mapValues[VD2](map: VD => VD2): VertexRDD[VD2] def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2] // Remove vertices from this set that appear in the other set def diff(other: VertexRDD[VD]): VertexRDD[VD] // Join operators that take advantage of the internal indexing to accelerate joins (substantially) def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2] // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD. def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
aggregateUsingIndex
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1))) val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) // There should be 200 entries in rddB rddB.count val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _) // There should be 100 entries in setB setB.count // Joining A and B should now be fast! val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
EdgeRDDs
EdgeRDD[ED]はRDD[Edge[ED]]を拡張している.いくつか追加的な便利関数が追加されている.
// Transform the edge attributes while preserving the structure def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2] // Revere the edges reusing both attributes and structure def reverse: EdgeRDD[ED] // Join two `EdgeRDD`s partitioned using the same partitioning strategy. def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
Optimized Representation
graph partitioningのアプローチとして2通りある.
- edge-cut
- vertex-cut
GraphXはvertex-cutを採用している.
こうすると,ノード間のコミュニケーションとstorage spaceのオーバーヘッドを減らせる.
vertex-cutなのでedges 2 machines,vertices to multile machinesという配置になる.
こんなグラフがあった時,
Vertices = [A, B, C, D, E, F]
Edgets = [(A-B), (A-C), (A-E), (A-F), (B-C), (C-D), (D-E), (E-F)]
2D-partioningでAとDでcutすると,
- Vertex Table = RDD[[A, B, C], [D, E, F]]
- Routing Table = RDD[[A(1, 2), B(1), C(1)], [D(1, 2), E(2), F(2)]]
- Edge Table = RDD[[(A-B), (A-C), (B-C), (C-D)], [(A-E), (A-F), (D-E), (E-F)]]
このように,cutされたvertexを何番目のパーティションにブロードキャストすればいいかのRouting Tableを持っていて,joinが必要とされる操作を高速にする.
次は,Graph Operators.
参考URLs
- https://spark.apache.org/p9graphx/
- https://spark.apache.org/docs/0.9.0/graphx-programming-guide.html
- http://kowshik.github.io/JPregel/pregel_paper.pdf
- http://www.cloudera.co.jp/jpevents/cwt2014/static/pdf/B-5.pdf
- http://en.wikipedia.org/wiki/Multigraph
- http://en.wikipedia.org/wiki/Multiple_edges
- http://en.wikipedia.org/wiki/Bipartite_graph
- http://www.ne.jp/asahi/hishidama/home/tech/scala/def.html#h_context_bound
- http://www.ne.jp/asahi/hishidama/home/tech/scala/classtag.html
パーティショニング戦略はデフォルトではGraphをつくるときのと同じだが,選択可能.いろんな経験則がある.