KZKY memo

Personal notes.

GraphX 1

I tried out GraphX.
GraphX is one of the Spark APIs and ships as a component of Spark itself.

Note that, as of 2014/12/24, it is at version 0.9, so be careful.
This is basically an excerpt from and a summary of the official GraphX documentation.

The promotional page

What it looks like in Scala

val graph = Graph(vertices, edges)
val messages = spark.textFile("hdfs://...")
val graph2 = graph.joinVertices(messages) {
  (id, vertex, msg) => /* do something */
}

Implemented algorithms

  • PageRank
  • Connected components
  • Label propagation
  • SVD++
  • Strongly connected components
  • Triangle count

PageRank (20 iterations, 3.7B edges) is reportedly faster than Apache Giraph and GraphLab.
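The built-in algorithms come ready to call; a minimal sketch, assuming graph is any property graph (for instance the one constructed later in this memo):

// Library algorithms are exposed as methods on Graph (via GraphOps);
// the tolerance 0.0001 is an arbitrary choice for this sketch
val ranks = graph.pageRank(0.0001).vertices
val components = graph.connectedComponents().vertices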

Pregel API

Pregel is a scalable platform for iterative graph algorithms developed at Google; GraphX's Pregel API is an implementation of that model.
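To make this concrete, here is the single-source shortest-paths example from the GraphX programming guide, lightly trimmed (it assumes an existing SparkContext sc):

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// A random graph whose edge attribute serves as a distance
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42
// Every vertex starts at infinity, except the source at 0
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // vertex program
  triplet => { // send a message only along edges that improve the distance
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // merge incoming messages
)
println(sssp.vertices.collect.mkString("\n"))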

Getting started

RDD-extended data structure

A Resilient Distributed Property Graph is a directed multigraph with properties attached to vertices and edges.

  • Directed: every edge has an orientation
  • Multigraph: a vertex may have many outgoing and incoming edges, parallel edges between the same pair of vertices are allowed, and so are cycles (tiny example below)
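A tiny illustration of the multigraph part, with made-up toy data (assumes an existing SparkContext sc):

// Two parallel edges between the same pair of vertices are legal
val v = sc.parallelize(Array((1L, "alice"), (2L, "bob")))
val e = sc.parallelize(Array(Edge(1L, 2L, "follows"), Edge(1L, 2L, "emails")))
val tiny: Graph[String, String] = Graph(v, e)
println(tiny.edges.count) // 2: both parallel edges are kept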

Operator

  • Basic operators
    • e.g., subgraph, joinVertices, and mapReduceTriplets (see the sketch after this list)
  • Pregel API
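A rough sketch of the basic operators, assuming graph is the Graph[(String, String), String] built later in this memo:

// subgraph: keep only vertices whose position is not "Missing"
// (edges that touch a dropped vertex disappear as well)
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// mapReduceTriplets: compute each vertex's in-degree by hand
// (later Spark releases replace this with aggregateMessages)
val inDegrees: VertexRDD[Int] =
  graph.mapReduceTriplets[Int](triplet => Iterator((triplet.dstId, 1)), _ + _)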

Concept

Today's graph-analytics pipelines mix graph-parallel and data-parallel systems; the result is a lot of data movement, redundant copies of the data, and a complicated programming model.

GraphX unifies graph-parallel and data-parallel processing under a single API and lets the same graph also be viewed as RDDs. It incorporates recent advances in graph-parallel systems to optimize graph operations.

Import

build.gradle

Match the group, artifact, and version to your own environment.

dependencies {
    compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
    compile 'org.scala-lang:scala-library:2.11.2'
    compile group: 'org.apache.spark', name: 'spark-graphx_2.10', version: '1.2.0-cdh5.3.0'
    compile group: 'org.apache.spark', name: 'spark-yarn_2.10', version: '1.1.0-cdh5.2.0-SNAPSHOT'
    testCompile group: 'junit', name: 'junit', version: '4.+'
}
Imports for SimpleApp
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

Property graph

  • A vertex is represented by a 64-bit long key plus a property; no ordering is imposed on the keys
  • An edge is represented by its source/destination vertex IDs plus a property
  • A property graph is parameterized by the property (object) types attached to its vertices and edges
  • Sometimes you want vertex properties of different types (e.g., users and products); in that case, use inheritance as follows.
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
  • Like RDDs, property graphs are immutable, distributed, and fault-tolerant
  • Modifying a property graph creates a new graph, but substantial parts (unaffected structure, attributes, and indices) are reused (sketch below)
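For example, a sketch over the Graph[(String, String), String] built below; the "modified" graph is a brand-new value that shares untouched parts with the old one:

// Returns a new graph; the edge structure and indices are reused as-is
val shouting = graph.mapVertices((id, attr) => (attr._1.toUpperCase, attr._2))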

Optimized representations of vertices and edges

  • VertexRDD[VD] is optimized from RDD[(VertexID, VD)]
  • EdgeRDD[ED] is optimized from RDD[Edge[ED]]

These two types are worth remembering; a quick illustration follows.
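Concretely, the vertex view of a graph already carries the optimized type and adds index-aware operations on top of the plain RDD API (a sketch over the same graph as below):

val verts: VertexRDD[(String, String)] = graph.vertices
// mapValues keeps the internal index instead of rebuilding it
val positions: VertexRDD[String] = verts.mapValues(_._2)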

Example Property Graph

Vertex Table

id  property (V)
3   (rxin, student)
7   (jgonzal, postdoc)
5   (franklin, professor)
2   (istoica, professor)

Edge Table

srcId  dstId  property (E)
3      7      collaborator
5      3      advisor
2      5      colleague
5      7      PI

Various ways to build a property graph

  • use a graph builder such as GraphLoader (sketch below)
  • use the Graph object
  • build it from RDDs
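For the builder route, GraphLoader can read an edge-list file directly; the path below is a made-up placeholder:

import org.apache.spark.graphx.GraphLoader
// Each line of the file is a "srcId dstId" pair;
// vertex and edge attributes default to 1
val loaded: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "hdfs://.../edgelist.txt")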
Example: building from RDDs
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

Filtering vertices and edges out of the graph

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

Looking at the triplet view

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

SimpleApp.scala

package edu.kzk.graphx.hello

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


object SimpleApp {
    def main(args: Array[String]) {

        /*
         * Graph Construction
         */

        // spark context
        val conf = new SparkConf()
        conf.setAppName("Simple Test Application")
        conf.setMaster("local")
        val sc = new SparkContext(conf)
      
        // Create an RDD for the vertices
        val users: RDD[(VertexId, (String, String))] =
                sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

        // Create an RDD for edges
        val relationships: RDD[Edge[String]] =
                sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
                        
        // Define a default user in case there are relationships with missing users
        val defaultUser = ("John Doe", "Missing")
        
        // Build the initial Graph
        val graph = Graph(users, relationships, defaultUser)

        
        /*
         * Vertices and Edge Views 
         */
        // Count all users which are postdocs
        val countPostdocs = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
        println(s"*of postdoc is user ${countPostdocs}");

        // Count all the edges where srcId > dstId
        val countIdRelation = graph.edges.filter(e => e.srcId > e.dstId).count
        println(s"*of edges which srcId = dstId is ${countIdRelation}");
        
        /*
         * Triplet View
         */
        // Use the triplets view to create an RDD of facts.
        val facts: RDD[String] = graph.triplets.map(
                triplet =>
                triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
        facts.collect.foreach(println(_))        
        

    }
}

output

...
# of postdoc users is 1
...
# of edges where srcId > dstId is 1
...
istoica is the colleague of franklin
rxin is the collab of jgonzal
franklin is the advisor of rxin
franklin is the pi of jgonzal

That's it for now; the rest next time.