Execute Spark Application on Eclipse + Spark (Scala) + Gradle
Eclipse + Spark (Scala) + Gradle でSpark Application(jarファイル)を実行
一般的な開発では,
というステップをとるのが自然だと思う.
sbtでbuildでなく,mavenを使ってbuildしたいというのが昨今の普通のjavaプログラマーだと思うが,最近はGradleだろうということで,Eclipse + Gradle + Spark (Scala)で開発.
環境
Project作成
Gradleの設定
- cloudera maven repository
を参考にgroup, artifact, versionを設定
CDH5.3を使っているので5.3に合わせておくにこしたことはない.
- build.gradle
fatjarを作る用にしている (application pluginのdistZip taskで事足りるかも)
buildscript { repositories { jcenter() } dependencies { classpath 'eu.appsatori:gradle-fatjar-plugin:0.3' } } apply plugin: 'eu.appsatori.fatjar' apply plugin: 'scala' apply plugin: 'Eclipse' sourceCompatibility = 1.7 version = '1.0' jar { manifest { attributes 'Implementation-Title': 'Gradle Quickstart', 'Implementation-Version': version } } repositories { mavenCentral() maven { url "https://repository.cloudera.com/artifactory/cloudera-repos" } } dependencies { compile group: 'commons-collections', name: 'commons-collections', version: '3.2' compile 'org.scala-lang:scala-library:2.11.2' // java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package がでるためコメント //compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.5.0-cdh5.3.0' compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.2.0-cdh5.3.0' testCompile group: 'junit', name: 'junit', version: '4.+' } test { systemProperties 'property': 'value' } uploadArchives { repositories { flatDir { dirs 'repos' } } }
project上で右クリ-> Gralde -> refresh all (or depencencis) で${lib}.jarのDLが始まる.結構時間が掛かる.
${HOME}/.gradleの下にlibがDLされている.
Run Application
3パターンやる
- Eclipseからapplicationをlocalで実行
- Eclipseからaplicationをyarn clusterで実行
- Eclipseでjarを作り,spark applicationをyarn clusterで実行
- spark-submit shellを使う場合はfatでなくても良い.
- /opt/cloudera/parcels/CDH/lib/spark/lib/spark-assembly-${version}.jarもクラスパスにはいるので
- assemply jarに入ってないライブラリに依存している場合はその限りでない
sample codeは基本的に
https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
のSelf-Contained Applicationsのを一部変更
Sample Code
package edu.kzk import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext.rddToPairRDDFunctions object SimpleApp { def main(args: Array[String]) { // spark context val conf = new SparkConf(); conf.setAppName("Simple Test Application"); // when local application conf.setMaster("local[3]"); // when an application on cluster //conf.setMaster("yarn-client"); //conf.setMaster("yarn-cluster"); val sc = new SparkContext(conf); // wordcount //val filepath = "file:///home/kzk/datasets/news20/news20.dat"; // when on local val filepath = "datasets/news20/news20.dat"; // when on cluster val data = sc.textFile(filepath, 2).cache(); val result = data.flatMap(line => line.split(" ")) .map(elm => (elm.split(":")(0), 1)) .reduceByKey(_ + _); //result.take(10).foreach(elm => println(elm)); } }
- conf.setMaster("local[3]");
- conf.setMaster("yarn-client");
- conf.setMaster("yarn-cluster");
の3通りのうち実行したい形態で実行するので,以下注意.
Eclipseからapplicationをlocalで実行
これは簡単
- 1 threadならconf.setMaster("local");
- n threadならconf.setMaster("local[3]");
Eclipseからaplicationをyarn clusterで実行
confの設定
classpathにconfディレクトリを通さなくてはならない.
環境変数ではないので注意
Cloudera Managerからyarn-confを取ってくる.
YARN -> Configuration -> Action -> Download Configuration
DLできたら適当なディレクトリに置く
Run -> Run Confuguration -> SimpleApp -> Classpath -> Advanced -> Add External Folder -> /pathto/yarn-conf
add user
$ sudo adduser kzk $ sudo -u hdfs hadoop fs -mkdir /user/kzk $ sudo -u hdfs hadoop fs -chown kzk /user/kzk
以下,Sample Code上,
conf.setMaster("yarn-client");
でRun Application
Error
... 14/12/24 00:13:06 WARN hdfs.DFSClient: DataStreamer Exception org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/kzk/.sparkStaging/application_1419324462336_0005/spark-yarn_2.10-1.1.0-cdh5.2.0-SNAPSHOT.jar could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget(BlockManager.java:1504) ...
ログを見る.
/var/log/hadoop-yarn/hadoop-cmf-yarn-RESOURCEMANAGER-${hostname}.log.out
2014-12-24 00:34:22,113 INFO org.apache.hadoop.yarn.server.resourcemanager.ClientRMService: Allocated new applicationId: 7
job submitは成功しているようだが,minReplication=1なのに自分を覗いてreplicationしようとしているからエラーのよう.ちなみに,同じエラーでhdfsのcacheを消せとか,nomenode formatしろとか書いてあるが,nodeからはput/getできるののでそれらが問題ではないと思う.
現状single nodeなので,これ以上は追わない.
Eclipseでjarを作り,spark applicationをyarn clusterで実行
run jar
$ /opt/cloudera/parcels/CDH/bin/spark-submit \ --class edu.kzk.SimpleApp \ --master yarn --deploy-mode client # or cluster (違いはURL参照)\ /pathto/spark-gradle-sample-1.0.jar
cluster modeだとfailed statusで終わる.
そもそもClouderaのドキュメントとSparkのドキュメントで,spark-submitのオプション指定の仕方が異なる
Cloudera Doc
--master yarn --deploy-mode client # or cluster (違いはURL参照)\
vs
Spark doc
--master yarn-client # or cluster
2つ目のだとそもそも実行すらされなかった.
1つ目のはログを見る限り正しくstdoutにprintlnできているが,
ERROR yarn.ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application.
のエラーが起きて,Application finished with failed statusで帰ってきているよう.
参考URLs
- http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/
- http://open-groove.net/spark/yarn-client-or-cluster-mode/
- http://www.cloudera.com/content/cloudera/en/documentation/cdh5/v5-0-0/CDH5-Installation-Guide/cdh5ig_running_spark_apps.html
- https://spark.apache.org/docs/1.2.0/submitting-applications.html
- http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_ig_ports_cdh5.html
- http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_ig_running_spark_apps.html