GraphX 1
I tried out GraphX.
GraphX is one of the Spark APIs bundled with Spark itself.
Note that as of 2014/12/24 it is still at version 0.9.
This post is basically an excerpt and summary of:
- https://spark.apache.org/graphx/
- https://spark.apache.org/docs/0.9.0/graphx-programming-guide.html#summary-list-of-operators
From the project's landing page
A quick feel for GraphX in Scala:
```scala
graph = Graph(vertices, edges)
messages = spark.textFile("hdfs://...")
graph2 = graph.joinVertices(messages) {
  (id, vertex, msg) => // do something
}
```
Introduction
RDD-extended data structure
A Resilient Distributed Property Graph is a directed multigraph with properties attached to each vertex and edge.
- directed: every edge has a direction
- multigraph: a vertex may have multiple outgoing and incoming edges, parallel edges between the same pair of vertices are allowed, and so are cycles
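To make the "directed multigraph" terms concrete, here is a tiny plain-Scala sketch (no Spark required; the `Rel` case class is hypothetical, purely for illustration):

```scala
// A directed multigraph: edges have direction, parallel edges between the
// same pair of vertices are allowed, and so are cycles.
case class Rel(src: Long, dst: Long, label: String)

val edges = List(
  Rel(1L, 2L, "follows"), // 1 -> 2
  Rel(1L, 2L, "emails"),  // parallel edge: same endpoints, different label
  Rel(2L, 1L, "follows")  // reverse direction, so 1 -> 2 -> 1 is a cycle
)

// Vertex pairs connected by more than one edge (the "multi" part)
val parallelPairs = edges.groupBy(e => (e.src, e.dst)).filter(_._2.size > 1)
println(parallelPairs.keys) // only the pair (1, 2) has parallel edges
```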
Concept
Today's graph-analytics pipelines mix graph-parallel and data-parallel systems, which forces frequent data movement and duplication and results in a complicated programming model.
GraphX unifies graph-parallel and data-parallel processing in a single API, lets a graph also be viewed as RDDs, and incorporates recent advances in graph-parallel systems to optimize graph operations.
Import
build.gradle
Adjust group, artifact, and version to match your environment.

```groovy
dependencies {
    compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
    compile 'org.scala-lang:scala-library:2.11.2'
    compile group: 'org.apache.spark', name: 'spark-graphx_2.10', version: '1.2.0-cdh5.3.0'
    compile group: 'org.apache.spark', name: 'spark-yarn_2.10', version: '1.1.0-cdh5.2.0-SNAPSHOT'
    testCompile group: 'junit', name: 'junit', version: '4.+'
}
```
Imports in SimpleApp

```scala
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
```
Property graph
- A vertex is identified by a 64-bit long key plus a property; no ordering is imposed on the keys.
- An edge is represented by its source and destination vertex IDs plus a property.
- A property graph is parameterized over the vertex and edge property types (objects).
- Sometimes you want vertex properties of different types (e.g. users and products); in that case, use inheritance like this:
```scala
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
```
- Like RDDs, property graphs are immutable, distributed, and fault-tolerant.
- Modifying a property graph produces a new graph, but the unaffected parts of the structure, attributes, and indices are reused.
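That "reuse the unaffected parts" idea is the same structural sharing that Scala's immutable collections use. A minimal sketch with an immutable Map (the analogy, not GraphX itself):

```scala
// "Modifying" an immutable map yields a new map; the original is untouched,
// and unchanged internal structure is shared between the two versions.
val v1 = Map(3L -> "student", 7L -> "postdoc")
val v2 = v1 + (3L -> "professor") // new map with one updated entry

println(v1(3L)) // still "student" in the original
println(v2(3L)) // "professor" in the new version
```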
GraphX stores vertices and edges in an optimized internal representation.
This is worth keeping in mind.
Example Property Graph
Vertex Table
id | property(V) |
---|---|
3 | (rxin, student) |
7 | (jgonzal, postdoc) |
5 | (franklin, professor) |
2 | (istoica, professor) |
Edge Table
srcid | dstid | property(E) |
---|---|---|
3 | 7 | collaborator |
5 | 3 | advisor |
2 | 5 | colleague |
5 | 7 | PI |
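To see what these two tables encode together, here is the join done by hand in plain Scala (no Spark); each joined row is what GraphX will later expose as a triplet:

```scala
// Vertex table: id -> (name, position)
val vertices = Map(
  3L -> ("rxin", "student"),
  7L -> ("jgonzal", "postdoc"),
  5L -> ("franklin", "professor"),
  2L -> ("istoica", "professor")
)
// Edge table: (srcId, dstId, property)
val edges = List(
  (3L, 7L, "collaborator"),
  (5L, 3L, "advisor"),
  (2L, 5L, "colleague"),
  (5L, 7L, "PI")
)

// Join each edge with its endpoint properties: one "triplet" per edge
val facts = edges.map { case (src, dst, rel) =>
  s"${vertices(src)._1} is the $rel of ${vertices(dst)._1}"
}
facts.foreach(println) // e.g. "franklin is the advisor of rxin"
```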
Ways to build a property graph
- use a graph builder
- use the Graph object
- build one from RDDs
An example of building from RDDs:
```scala
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
```
Filtering vertices and edges out of a graph
```scala
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
```
Looking at the triplet view
```scala
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] = graph.triplets.map(triplet =>
  triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
```
SimpleApp.scala
```scala
package edu.kzk.graphx.hello

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object SimpleApp {
  def main(args: Array[String]) {
    /*
     * Graph Construction
     */
    // Spark context
    val conf = new SparkConf()
    conf.setAppName("Simple Test Application")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    // Create an RDD for the vertices
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                           (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

    // Create an RDD for edges
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                           Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

    // Define a default user in case there are relationships with missing users
    val defaultUser = ("John Doe", "Missing")

    // Build the initial Graph
    val graph = Graph(users, relationships, defaultUser)

    /*
     * Vertex and Edge Views
     */
    // Count all users which are postdocs
    val countPostdocs = graph.vertices.filter {
      case (id, (name, pos)) => pos == "postdoc"
    }.count
    println(s"# of postdoc users is ${countPostdocs}")

    // Count all the edges where srcId > dstId
    val countIdRelation = graph.edges.filter(e => e.srcId > e.dstId).count
    println(s"# of edges with srcId > dstId is ${countIdRelation}")

    /*
     * Triplet View
     */
    // Use the triplets view to create an RDD of facts.
    val facts: RDD[String] = graph.triplets.map(triplet =>
      triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
    facts.collect.foreach(println(_))
  }
}
```
output
```
...
# of postdoc users is 1
...
# of edges with srcId > dstId is 1
...
istoica is the colleague of franklin
rxin is the collab of jgonzal
franklin is the advisor of rxin
franklin is the pi of jgonzal
```
That's all for now; the rest will follow in the next post.
Reference URLs
- https://spark.apache.org/graphx/
- https://spark.apache.org/docs/0.9.0/graphx-programming-guide.html
- http://kowshik.github.io/JPregel/pregel_paper.pdf
- http://www.cloudera.co.jp/jpevents/cwt2014/static/pdf/B-5.pdf
- http://en.wikipedia.org/wiki/Multigraph
- http://en.wikipedia.org/wiki/Multiple_edges
- http://en.wikipedia.org/wiki/Bipartite_graph
- http://www.ne.jp/asahi/hishidama/home/tech/scala/def.html#h_context_bound
- http://www.ne.jp/asahi/hishidama/home/tech/scala/classtag.html