読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

GraphX 2

Graph Builder

ファイルからデータを読んでGraphを作るために,GraphBuilderクラスが用意されている.

  • GraphBuilderはedgeのrepartiionをしない
  • edge groupがHDFSのblockに対応する感じ
  • 同一の辺 (identical edge)はRDDの同じパーティションに置かれる
    • ここでいうidentical edgeはsrcid = dstidでpropertyが違うでいいのか?

Load from file

GraphLoader.edgeListFile(
    sc: SparkContext,
    path: String,
) : Graph[Int, Int]

TSV File

# Comment Line skipped
# Source Id <\t> Target Id
1   -5
1    2
2    7
1    8

基本的にデータソースからRDD[(VertexID, VD)]とRDD[Edge[ED]]をつくって,それらをGraph(V, E)に渡せば良いと思う.

Vertex and Edge RDDs

VertexRDDs

VertexRDD[A]は,RDD[(VertexID, A)] を拡張している.
VertexIDは,1回だけ出現するという制約を掛けて,ある特性をもった点集合として点集合を表現している.点の特性は再利用可能なhash-map構造になっている.
よくわからんが,あるVertexRDD内のhashを他のVertexRDDを作るときに再利用すると,これらRDD間でのjoin処理が高速化されるとみた.aggregateUsingIndexを参考.

こういった構造にすることで他の便利関数が追加されている.

// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
aggregateUsingIndex
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

EdgeRDDs

EdgeRDD[ED]はRDD[Edge[ED]]を拡張している.いくつか追加的な便利関数が追加されている.

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

Optimized Representation

graph partitioningのアプローチとして2通りある.

  • edge-cut
  • vertex-cut

f:id:KZKY:20141225014907p:plain

GraphXはvertex-cutを採用している.

こうすると,ノード間のコミュニケーションとstorage spaceのオーバーヘッドを減らせる.
vertex-cutなのでedges 2 machines,vertices to multile machinesという配置になる.

こんなグラフがあった時,
Vertices = [A, B, C, D, E, F]
Edgets = [(A-B), (A-C), (A-E), (A-F), (B-C), (C-D), (D-E), (E-F)]

f:id:KZKY:20141225014901p:plain

2D-partioningでAとDでcutすると,

  • Vertex Table = RDD[[A, B, C], [D, E, F]]
  • Routing Table = RDD[[A(1, 2), B(1), C(1)], [D(1, 2), E(2), F(2)]]
  • Edge Table = RDD[[(A-B), (A-C), (B-C), (C-D)], [(A-E), (A-F), (D-E), (E-F)]]

このように,cutされたvertexを何番目のパーティションにブロードキャストすればいいかのRouting Tableを持っていて,joinが必要とされる操作を高速にする.

次は,Graph Operators.