Mesos上部署spark
还是回到之前一直持续的 Mesos话题。在之前的环节里,我们已经尝试了Mesos的安装,Marathon守护服务以及相对比较主流的Mesos作为Hadoop的资源管理器的实际操作。这次就说说同属于伯克利出品的Spark。
其实spark最初0.7以前的版本还没有自己的资源管理系统,资源的调度都是通过Mesos来执行的。甚至还有小道消息说spark本身就是Mesos用来作为测试的一个项目派生出来。所以说,相比Hadoop+mesos的组合,Spark+Mesos作为大数据分析平台是更为原生态的。
曾经有人一直问过类似于“spark是否可以完整的替代hadoop?”之类的问题,我想说的是,hadoop最强大的地方是它的hdfs,而对于spark来说,它的优势在于将大数据的计算做到了极致的简化,但对于大数据来说,大容量的存储和高IO的读写也是一个复杂的工程,这正是Hadoop的价值所在。恰恰相反的是spark的流行只会加速Hadoop的发展。
OK,回到正题,还是要准备一套Mesos的环境,请参照 此文档。
到 这里下载一份spark的安装包,到你的Mesos Master节点上。个人的建议是你可以选择“pre-built for hadoop的版本”,比如我用的 spark-1.5.2-bin-hadoop2.6.tgz
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.5.2-bin-hadoop2.6.tgz
照例的解包之后,开始配置
cd <YOUR_SPARK_PATH>/conf 之后,vi spark-env.sh,内容其实挺简单的:
#!/usr/bin/env bash export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos-0.24.0.so export SPARK_EXECUTOR_URI=/usr/local/spark.tar.gz #export SPARK_EXECUTOR_URI=hdfs://spark.tar.gz
好吧,这个地方有一个很大的坑,官方文档上都没有任何的提及,直到我在踩了很久之后才从这里爬出来。vi spark-defaults.conf。
spark.io.compression.codec lzf
全部弄好之后,将你的spark目录重新打包成tar.gz,然后分发到每一个slave节点下的/usr/local。官方文档是将这个文件上传到hdfs上,这个默认的配置已经被我注掉了,这仅仅只是个测试,我没有HDFS。
应该就差不多了,回到spark主目录,冒烟测试:
/usr/local/spark# bin/spark-shell --master mesos://10.239.21.100:5050 Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.5.2 /_/ Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91) Type in expressions to have them evaluated. Type :help for more information. I1218 16:00:58.975153 37814 sched.cpp:164] Version: 0.24.0 I1218 16:00:58.983880 37808 sched.cpp:262] New master detected at [email protected]:5050 I1218 16:00:58.985206 37808 sched.cpp:272] No credentials provided. Attempting to register without authentication I1218 16:00:58.992024 37808 sched.cpp:640] Framework registered with 20151218-113549-3298160394-5050-19515-0052 Spark context available as sc. .... .... scala> val a = sc.parallelize(2 to 1000) //手工输入 a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:15 scala> a.collect //手工输入 res0: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177... scala> exit //退出scala界面
中途省略的部分可能有部分报错,以我的经验是换成了官方的jdk1.8之后就OK。看起来openjdk也不是一直都靠谱的。
实验下python调用吧,还是写一个并发版的经典 “羊群繁殖”算法,由于我的集群有96个core,而在一个时间片上一个core只能做一个job,所谓大数据中多于core的任务分配数就是耍流氓,所以这个节点数就设置了96。
vi hog.py
from pyspark import SparkContext if __name__ == "__main__": sc = SparkContext(appName="PythonHog") def f(i): if i < 2: return 1 if i < 5: return f(i-1) + f(i-2) return f(i-1) + f(i-2) - f(i-5) count = sc.parallelize(range(0,35), 96).map(f).reduce(lambda a, b: b-a) print(count) sc.stop()
提交一个任务
bin/spark-submit --master mesos://10.239.21.100:5050 hog.py
PS: 相比python/Java/R语言来说,最适合Spark的编程语言是Scala。支持全面而且代码简单明了。不过我对这种非常容易写出过于变态的语法还是持一种谨慎态度的。