Spark计算平台

Catalogue
  1. 1. 前言
    1. 1.1. 官方文档
    2. 1.2. 相关平台
    3. 1.3. 学习文档
  2. 2. 架构
    1. 2.1. 集群基础图
    2. 2.2. 相关组件说明
    3. 2.3. 启动程序
    4. 2.4. 运行模式
    5. 2.5. 运行方式
      1. 2.5.1. 程序提交
      2. 2.5.2. spark-submit探讨
    6. 2.6. 提交Spark应用
  3. 3. 基本操作
    1. 3.1. 程序提交
      1. 3.1.1. 主要方式
    2. 3.2. 使用Spark
      1. 3.2.1. Pyspark
      2. 3.2.2. Submit
      3. 3.2.3. Launcher
    3. 3.3. 认识RDD
      1. 3.3.1. 概念
      2. 3.3.2. RDD操作流程示例
    4. 3.4. RDD基本函数
    5. 3.5. 存取文件
      1. 3.5.1. 存取HDFS并保存为一般格式
      2. 3.5.2. 存取pickle文件
    6. 3.6. 存取parquet格式文件
      1. 3.6.1. 转换与保存
      2. 3.6.2. 读取与操作
      3. 3.6.3. 本地打印元素
      4. 3.6.4. topK操作
  4. 4. Shuffle
    1. 4.1. 概念
      1. 4.1.1. 粗细粒度探讨
  5. 5. 进一步了解Spark
    1. 5.1. 应用调度策略
      1. 5.1.1. 两种调度策略
      2. 5.1.2. 参数设置
    2. 5.2. Spark map reduce基本原理
    3. 5.3. RDD
      1. 5.3.1. 基本组成
      2. 5.3.2. RDD,DataFrame,DataSet
    4. 5.4. blockManager
    5. 5.5. 计算任务流程的梳理
      1. 5.5.1. 划分Stage
      2. 5.5.2. 提交
      3. 5.5.3. 监控
      4. 5.5.4. 任务结果的获取
  6. 6. 适用场景
    1. 6.1. 不适合超大数据量的计算
    2. 6.2. 不适合异步更新模型

前言

官方文档

相关平台

学习文档

注: Spark更新较快,具体操作查看官网doc可能更详细

架构

集群基础图

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.

由此可知:

  • Spark说到底就是集群中的一系列由SparkContext协调的进程,而SparkContext对象驻留在driver程序中;
  • SparkContext作为driver程序中最重要的部分,需要连接到cluster manager从而获取资源的分配,资源主要是executor(一种计算资源的抽象,可以理解为进程资源);
  • 当你运行Spark程序后,将通过SparkContext提交代码到executor中,并在之后SparkContext通过某种方式指定具体的task分配到executor中运行。

此处输入图片的描述

相关组件说明

  • Driver Program:请求Executor启动Task等,数据归约操作收集端;
  • Cluster Manager:standalone 集群管理器或 Mesos/YARN
  • Worker Node:一台机器默认一个worker (multi only for standalone)
  • Executor Process : 默认一个worker节点的一个JVM实例,服务于单个spark app,执行 Task任务,一个worker可以有多个executor实例,对于yarn而言,可通过–num-executors加以设置,对于standalone而言,可通过–total-executer-cores和–executor-cores结合设置;
    一个Spark app可以有多个job(action),一个job可以有多个stage(shuffle data),一个stage可以有多个task(partition, 开发者视角);
  • DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler;
  • TaskScheduler:将Taskset提交给Worker node 集群运行并返回结果,一个应用对应一个TaskScheduler;

启动程序

运行模式

Spark的运行模式取决于传递给SparkContext的MASTER环境变量的值。master URL可以是以下任一种形式:

  • local 使用一个Worker线程本地化运行SPARK
    • local[*]使用逻辑CPU个数数量的线程来本地化运行Spark
    • local[K]使用K个Worker线程本地化运行Spark(理想情况下,K应该根据运行机器的CPU逻辑核数设定)
  • standalone
    官方自己开发的集群模式,地址为spark://HOST:PORT,连接到指定的Spark standalone master。默认端口是7077。利用该模式不能解决单点故障问题,可以使用zookeeper解决该问题。
  • yarn
    • yarn-client以客户端模式连接YARN集群。
    • yarn-cluster 以集群模式连接YARN集群。集群的位置可以在HADOOP_CONF_DIR 环境变量中找到,默认Hadoop namenode 8088端口。
  • mesos
    mesos://HOST:PORT 连接到指定的Mesos集群。默认接口是5050.

运行方式

该部分查看Spark2.1源码入口程序而知,以后更新版本可能会有所改变不得而知。

Spark程序运行包括交互式运行(除spark-submit和restful api的形式)和脚本提交。以下从交互式方式说起。

程序提交

  • spark-shell
    bash脚本启动交互式scala环境,脚本内部执行main函数,调用spark-submit脚本,指定java程序的中main入口,比如:
    1
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"

$@ 是脚本运行时带的所有参数

  • pyspark
    bash脚本启动交互式python环境,脚本内部执行main函数,调用spark-submit脚本,指定java程序的中main入口,比如:
    1
    exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"

spark-submit探讨

  • 脚本说明
    程序很简单,将接收参数转到spark-class脚本的第二个参数。

    1
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
  • spark-class
    spark-class是所有提交程序的入口,代码相比另外的脚本复杂一些,归根到底执行一条命令,通过后面解释可知道,该命令执行scala程序:

    1
    exec "${CMD[@]}"

CMD变量的构造关键在这里(调用java程序生成命令行字符串):

1
2
3
4
5
6
7
8
9
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "$@")

$RUNNER是本地机器java的地址。因此,我们可以知道,整个交互式的spark程序的入口程序类似如下语法:

1
2
3
exec (java -Xmx128m -cp xxx org.apache.spark.launcher.Main “org.apache.spark.deploy.SparkSubmit
+ pyspark-shell-main --name "PySparkShell" "$@"
(或 --class org.apache.spark.repl.Main --name "Spark shell" "$@")”)

对于单纯spark-submit的脚本提交方式,只是

1
org.apache.spark.deploy.SparkSubmit

后面带的参数不同而已。

  • org.apache.spark.launcher.Main
    Java入口程序 将输出以’\0’分割的字符串命令

  • org.apache.spark.deploy.SparkSubmit
    scala程序,真正的 spark入口

提交Spark应用

对于交互式的传给你下,会在启动的时候自动构建SparkContext,名称为sc。其它方式由开发者新建SparkContext对象。

基本操作

程序提交

主要方式

  • 官方方式
    • 命令行
    • REST API
    • SparkLauncher类接口
  • 相关第三方工具
    • Spark-jobserver
    • Livy
    • Oozie

使用Spark

Pyspark

1
./spark/bin$ ./pyspark --executor-memory 4G --total-executor-cores 80

例子:

1
2
3
./spark/bin/pyspark --executor-memory 4G --total-executor-cores 20 --packages com.databricks:spark-csv_2.10:1.4.0
./spark/bin/spark-shell --executor-memory 4G --total-executor-cores 20 --packages com.databricks:spark-csv_2.10:1.4.0

说明:
通过添加packages,可以使一下转换操作合法,paquet转换为csv格式:

1
row_1000_df.write.format("com.databricks.spark.csv").save(filepath)

Submit

例子:

1
./spark/bin/spark-submit --executor-memory 4G --total-executor-cores 80 --py-files loadDataSetSpark.py mainSpark.py

Launcher

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
Map<String, String> env = new HashMap<String,String>();
env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
Process spark = new SparkLauncher(env)
.setAppResource("file:/data/home/gujw/projects/spark/jar_test/SparkDataSet-0.0.1-SNAPSHOT.jar")
.setMainClass("com.SparkMain")
.setConf("spark.cores.max", "20")
.addSparkArg("--verbose")
.setMaster("spark://192.168.0.11:7077")
.setConf("spark.dynamicAllocation.enabled", "true")
.setConf(SparkLauncher.DRIVER_MEMORY, "2g")
.setVerbose(true)
.launch();

认识RDD

概念

  • RDD:弹性分布式数据集,表示分布在多个计算节点上可以并行操作的元素集合。
  • 操作:转化操作(transformation,以是否shuffle定stage的边界)和行动操作(action,决定了job的划分),可通过返回值区别,转化操作返回RDD类型,行动操作为其它类型。

如以下lines就是一个RDD:

1
2
lines = sc.parallelize(['pandas','i like pandas'])
lines = sc.textFile('readme.md')

lines是Spark的RDD,第二行的lines包含了在哪些机器上有file文件的块,信息是从HDFS加载而来。每文件块映射到RDD上就是一个分区。如果一个文件块128MB(默认),那么HDFS上一个1GB大小的文件就有8个文件块,由这个文件创建的RDD就会有8个分区。

1
2
3
4
5
6
# 能把这个RDD缓存,降低action计算的成本(因为每次新的action操作都要重新计算一次),对应的就有RDD.unpersist(),当内存吃紧时可以用
# persist(storageLevel=StorageLevel(False, True, False, False, 1))
RDD.persist()
# Persist this RDD with the default storage level (MEMORY_ONLY_SER).
RDD.cache()

RDD操作流程示例

1
2
3
4
5
lines_data = sc.textFile('readme.md')
lines = lines_data.filter()
lines.persist()
lines.count()
lines.first()
  • rdd.collect()操作是将数据存在单台机器的内存(driver)上;
  • rdd.filter(lambda x: x > 90)会分发整个self对象;

RDD基本函数

更多见官网

  • RDD操作:

    • distinct()
    • union(rdd)
    • intersection(rdd) #交集
    • substract(rdd) #减去交集
  • 转化操作:

    • map(func) # 能实现list append
    • flatMap(func) # 能实现list expend
  • 行动操作:

    • reduce(fun)
    • top(n),take(n)
  • pair RDD转化操作:

    • mapValues(func): rdd.mapValues(lambda x : x+1) # key不变,value+1
    • reduceByKey(func) #接收对相同的key的2个value参数
    • keys()
    • values()
    • combineByKey():该函数用于对key的值进行各种操作,相比其它ByKey更原生,计算(key,mean)例子如下:
1
2
3
4
5
6
sumCount = keyValue
.combineByKey((lambda x: (x,1),
(lambda x,value:(x[0] + y,x[1] + 1)),
lambda x,y: (x[0]+y[0],x[1]+y[1])))
sumCount.map(lambda key,xy:(key,xy[0]/xy[1])).collectAsMap()
1
2
3
# 其它:
rdd.reduceByKey(func) == rdd.groupByKey().mapValues(lambda x:x.reduce(func))
rdd1.join(rdd2) #{(1,(2,3)),(2,(4,5)),...}
  • pair RDD行动操作:
    • rdd.lookup(1) #{(1,2),(1,3),(2,3)} 返回[2,3]

存取文件

存取HDFS并保存为一般格式

1
2
tow_state = sc.textFile('/user/enjoyhot/town_state.csv')
tow_state.saveAsTextFile('/user/enjoyhot/town_state.csv')

在Hadoop namenode 50070端口可查看上传到分布式文件系统的情况。

存取pickle文件

python独有

1
2
3
4
adData = sc.pickleFile('/user/enjoyhot/2016-06-01’)
title=Row('field1','field2','field3')
ad_df = sqlContext.createDataFrame(adData.map(lambda e:title(*e)))
ad_df.select(['field1','field2']).show()

存取parquet格式文件

parquet格式,一种流行的文件列式存储格式,相对高效。

转换与保存

1
2
3
4
5
6
7
8
9
#from pyspark.sql import *
#sqlc = SQLContext(sc)
lines_rdd = sc.textFile('/user/gujw/town_state.csv').map(lambda line: line.split(","))
header = lines_rdd.first()
rdd = lines_rdd.filter(lambda x:x!=header)
tow_state_df = rdd.toDF(['id','town','state']) # ['id','town','state']
tow_state_df.show()
# tow_state_df.write.parquet(save_path)
tow_state_df.saveAsParquetFile('/user/gujw/test/town_state.parquet')

读取与操作

1
2
3
4
5
6
7
from pyspark.sql import *
sqlc = SQLContext(sc)
df = sqlc.read.parquet("/user/gujw/test/town_state.parquet")
df.registerTempTable("dft")
sqlc.cacheTable("dft")
s = "select id,town from dft"
sqlc.sql(s).show()

此处输入图片的描述

本地打印元素

1
2
3
4
id_town_list = sqlc.sql(s).collect()
for item in id_town_list:
print item[0]
break

此处输入图片的描述

topK操作

1
2
3
4
5
6
7
8
hello = sc.parallelize([1,2,3,4,4])
mapA = hello.map(lambda x:(x,1))
reduceA = mapA.reduceByKey(lambda a,b:a+b)
print reduceA.collect()
sortedA = reduceA.map(lambda (x,y):(y,x)).sortByKey(ascending=False).take(3)
sortedB = reduceA.map(lambda x:(x[1],x[0])).sortByKey(ascending=False).take(3)
print sortedA
print sortedB
1
2
3
4
## 结果
[(1, 1), (2, 1), (3, 1), (4, 2)]
[(2, 4), (1, 1), (1, 2)]
[(2, 4), (1, 1), (1, 2)]

Shuffle

概念

Shuffle操作介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。如图:


此处输入图片的描述

可将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。

## 前期Shuffle方式

如图所示,为Spark和Hadoop最基本的shuffle方式。

此处输入图片的描述

- 每一个Mapper会根据Reducer的数量创建出相应的bucket(一个抽象的概念),bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。
- Mapper产生的结果会根据设置的partition算法填充到每个bucket中去。这里的partition算法是可以自定义的,默认的算法是根据key哈希到不同的bucket中去。
- Reducer启动时,它会根据自己task的id和所依赖的Mapper的id从远端或是本地的block manager中取得相应的bucket作为Reducer的输入进行处理。

> bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。

PS:Spark的内存计算是指Job中间输出结果可以保存在内存中,不是说shuffle过程的中间实现,Map结果的分片数据Spark和MapReduce都存放在磁盘上。

## 发展
### hash-based
https://github.com/JerryLead/SparkInternals/blob/master/markdown/4-shuffleDetails.md
http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/
Spark 0.8.1做了改进后还是有缺点:

- shuffle write过程中会产生大量的shffle文件,总体来说多少个reduce task,一台机器就有多少个文件,封装在shuffleFileGroup概念的文件中(图中横向的一行),不再是MXR,bucket数量依然为MXR,但此时M代表的是一次map的write task数量,即CPU核数,【一个bucket将会对应writer handler的buffer,内存开销依然很大,可能有误】。因此必要的措施是减少Mapper和Reducer的数量。看图:
此处输入图片的描述
- 另一方面,shffle read中在做诸如groupByKey操作时,需要将每个partition中的value都保存到同一key对应的hashMap中,就得确保Map操作对应的partition足够小到内存能够容纳,因此合理的设计是增加task的数量,task数量增多又会带来buffer开销更大的问题,因此陷入了内存使用的两难境地。

- 来自知乎
> 以前对 shuffle write/read 的分类是 sort-based 和 hash-based。MapReduce 可以说是 sort-based,shuffle write 和 shuffle read 过程都是基于key sorting 的 (buffering records + in-memory sort + on-disk external sorting)。早期的 Spark 是 hash-based,shuffle write 和 shuffle read 都使用 HashMap-like 的数据结构进行 aggregate (without key sorting)。但目前的 Spark 是两者的结合体,shuffle write 可以是 sort-based (only sort partition id, without key sorting),shuffle read 阶段可以是 hash-based。因此,目前 sort-based 和 hash-based 已经“你中有我,我中有你”,界限已经不那么清晰。
参考:https://www.zhihu.com/question/27643595

### SortBasedShuffle
“取代”Hash BasedShuffle作为默认选项的原因是什么?

- hashbased shuffle的每个mapper都需要为每个reducer写一个文件,需要产生MR个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。hashbased shuffle设计的目标之一就是避免不需要的排序(Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销)。但是它在处理超大规模数据集的时候,产生了大量的DiskIO和内存的消耗,这无疑很影响性能。
- hashbased shuffle也在不断的优化中,为了更好的解决这个问题,*Spark 1.1
引入了Sortbased shuffle。首先,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件对系统带来的压力。
- Spark2.0中已明确指出移除掉hash-based shuffle,详见release-note


# 提高速度的方法

## 读取文件
1
2
RDD = sc.textFile(dir | part-*.txt) # 读取目录或正则匹配
pairRDD = sc.wholeTextFile(dir | part-*.txt)


wholeTextFile如果是读一个文件,一次读取所有行,返回
1
rdd(filepath,contents)


## 代码逻辑层面

### 对数据分区
- 如pair RDD采用partitionBy(100)进行自定义哈希分区,避免不必要的混洗,但注意需要persist持久化才能避免重新分区;对RDD的结果有分区的是
- cogroup
- groupWith
- join,leftOuterJoin,rightOuterJoin
- groupByKey,reduceByKey,combineByKey
- partitionBy
- sort
- mapValues(父RDD需有分区),flatMapValues(父RDD需有分区)
- 自定义分区
如下代码,拥有相似的URL页面可能会被分到完全不同的节点上,然而,同一个域名下的网页更有可能相互链接,因此,分区时考虑将rdd中拥有同一个域名的url放在一起。
1
2
3
4
5
6
7
8
9
10
"""
>>> print urlparse.urlparse("http://www.baidu.com").netloc
www.baidu.com
>>> print urlparse.urlparse("http://www.baidu.com/a/b").netloc
www.baidu.com
"""
import urlparse
def hash_domain(url):
return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(100,hash_domain)


更详细的实战推荐两篇美团的文章:

- Spark性能优化指南-基础篇
- Spark性能优化指南-高级篇

# 实时计算相关

## Spark Streaming

### 消费数据

Spark可以接受来自文件系统, Akka actors, rsKafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源或者你自己定义的输入源。

- 读取TCP源数据
1
2
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);


简单测试的方法:
运行Netcat工具作为数据服务器,在netcat服务器中输入的每一行都会被读取,在Spark streaming程序中做好统计即可。
1
$ nc -lk 9999


- 消费kafka数据
1
2
3
4
5
6
7
8
9
10
11
SparkConf conf = new SparkConf().setAppName("streaming_top10").setMaster("local[4]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.checkpoint(Constants.CHECKPOINT);
String zk_quorum = Constants.ZK_QUORUM;
String consumer_group_id="test-consumer-group";
Map<String, Integer> topicMap = new HashMap <String, Integer>();
topicMap.put("topic-kafka", 1);
JavaPairReceiverInputDStream<String,String> messages =
KafkaUtils.createStream(jssc, zk_quorum, consumer_group_id, topicMap);


- 待续

### Spark Streaming vs Storm
Spark流模块先汇聚批量数据然后进行数据块分发(视作不可变数据进行处理),而Storm是只要接收到数据就实时处理并分发。

- 延迟
- 根本的区别在于处理模型,Storm处理的是每次传入的一个事件,而Spark Streaming是处理某个时间段窗口内的事件流。因此,Storm处理一个事件可以达到秒内的延迟,而Spark Streaming则有几秒钟的延迟。

- 容错性
- Spark Streaming提供了更好的支持容错状态计算。在Storm中,每个单独的记录当它通过系统时必须被跟踪,所以Storm能够至少保证每个记录将被处理一次,但是在从错误中恢复过来时候允许出现重复记录。这意味着可变状态可能不正确地被更新两次。
- 对于Storm而言,其优势在于延迟低,如果对严格的一次处理保证有比较高的要求,此时也可考虑使用Trident。不过这种情况下其他流处理框架如spark streaming也许更适合。

## SparkSQL
实际上该功能是否真正实时依然由业务具体决定,对于比较轻量级的操作,可以直接返回,做到准实时。


# 资源管理系统

## Yarn
### 结构
Hadoop2.0对MapReduce框架做了彻底的设计重构,称Hadoop2.0中的MapReduce为MRv2或者Yarn。

- Hadoop1.x主要组件
JobTracker和TaskTracker
- Hadoop2.X中引入yarn之组件
- ResourceManger:全局的资源管理器进程,它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Application Manager,ASM)。ResourceManager 将各个资源部分(计算、内存、带宽等)精心安排给基础 NodeManager(YARN 的每节点代理)。ResourceManager 还与 ApplicationMaster 一起分配资源,与 NodeManager 一起启动和监视它们的基础应用程序。
- 应用程序管理器(Application Manager)
应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。
- ApplicationMaster:
对于Map-Reduce计算模型而言有它自己的ApplicationMaster实现,用于向ResourceManager(全局的)申请计算资源(Containers)并且和NodeManager交互来执行和监控具体的task。
- NodeManager:
YARN中每个节点上的代理,管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务(auxiliary service)。
- container
Yarn对计算机计算资源的抽象,它其实就是一组CPU和内存资源。

### 调度器
在Yarn中有三种调度器可以选择:

- FIFO Scheduler
可能导致其它应用被阻塞
- Capacity Scheduler
有一个专门用来运行小任务的队列,意味着一些需要大量资源的任务执行时间会相比可用上所有资源的FIFO调度器时间长。
- FairS cheduler
资源运行时动态调整。

此处输入图片的描述

粗细粒度探讨

细粒度可以提高CPU的利用率,但对于短作业延迟大。待补充……

进一步了解Spark

基于Spark1.6

应用调度策略

两种调度策略

笔者认为,网上各种job的说法其实都是在说应用,而不是spark中的job,在这里我们就将job理解为普通作业,即app即可。
实际上,app对应一个调度池,而每个APP每个stage对应一个带有JobId和TaskSetManagerId的TaskSetMananger,调度池先根据JobId进行排序,再根据TaskSetManagerId排序,小的优先调度,因此job依然是通过FIFO进行调度的。

  • FIFO
    在默认情况下,Spark的调度器以FIFO(先进先出)方式调度Job的执行,standalone也只支持这一种。第一个Job优先获取所有可用的资源,接下来第二个Job再获取剩余资源。以此类推,如果第一个Job并没有占用所有的资源,则第二个Job还可以继续获取剩余资源,这样多个Job可以并行运行,否则,第二个Job就需要等待第一个任务执行完,释放空余资源,再申请和分配Job。在mesos和yarn下,有多队列调度器,如本文yarn调度器部分,通过合理设置多个队列分配资源,可以做到多个作业并行执行。

  • FAIR
    在FAIR共享模式调度下,Spark在多Job之间以轮询(round robin)方式为任务分配资源,所有的任务拥有大致相当的优先级来共享集群的资源。

参数设置

  • schedulingMode:该属性的值可以是FIFO或者FAIR,用来控制作业在调度池中排队运行(默认情况下)或者公平分享调度池资源。
  • weight:控制调度池在集群之间的分配。默认情况下,所有调度池的weight值都是为1。例如:如果你指定了一个调度池的值为2,那么这个调度池就比其它调度池多获得2倍的资源。设置一个更高的weight值,例如1000,就可以实现线程池之间的优先权——实际上,weight值为1000的调度池无论什么时候作业被激活,它都总是能够最先运行。
  • minShare:除了一个整体的权重,如果管理员喜欢,可以给每个调度池指定一个最小的shares值(也就是CPU的核数目)。公平调度器通过权重重新分配资源之前总是试图满足所有活动调度池的最小share。在没有给定一个高优先级的其他集群中,minShare属性是另外的一种方式来确保调度池能够迅速的获得一定数量的资源(例如10核CPU),默认情况下,每个调度池的minShare值都为0。

(scheduling mode 值是FIFO,weight值为1,minShare值为0)。

Spark map reduce基本原理

RDD可理解为关系数据库里的一个个操作,比如 map,filter,Join 等。在 Spark 里面实现了许多这样的RDD类,即可以看成是操作类。

  • 当我们调用一个map接口,底层实现是会生成一个MapPartitionsRDD对象,当RDD真正执行时,会调用MapPartitionsRDD对象里面的compute方法来执行这个操作的计算逻辑。
  • 但是不同的是,RDD是lazy模式,只有像count,saveasText这种action动作被调用后再会去触发runJob动作。

RDD

基本组成

Spark1.6 中,有以下四个函数比较重要:

  • def compute(split: Partition, context: TaskContext): Iterator[T]
    作用:用于计算,主要负责的是父RDD分区数据到子RDD分区数据的变换逻辑
  • protected def getPartitions: Array[Partition]
    作用:获取分片消息
  • protected def getDependencies: Seq[Dependency[_]]
    作用:获取父RDD的依赖关系,依赖分二种——如果RDD的每个分区最多只能被一个Child RDD的一个分区使用,则称之为narrow dependency(如map);若依赖于多个 Child RDD 分区,则称之为 wide dependency(如join)。
  • protected def getPreferredLocations(split: Partition): Seq[String]
    作用:获取Spark的执行模式,local等。

RDD,DataFrame,DataSet

  • RDD缺点
    • 序列化和反序列化的性能开销
      无论是集群间的通信,还是IO操作都需要对对象的结构和数据进行序列化和反序列化。
    • GC的性能开销
      频繁的创建和销毁对象, 势必会增加GC。
  • DataFrame特点
    • schema
      RDD每一行的数据,结构都是一样的。这个结构存储在schema中,Spark通过schame就能够读懂数据,因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。
    • off-heap
      Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中,内存直接受操作系统管理(而不是JVM)。当要操作数据时, 就直接操作off-heap内存。由于Spark理解schema, 所以知道该如何操作。

通过schema和off-heap,DataFrame解决了RDD的缺点,但是却丢了RDD的优点。DataFrame不是类型安全的, API也不是面向对象风格的。

  • Dataset
    DataSet以Catalyst逻辑执行计划表示,Dataset跟RDD相似,但是Dataset并没有使用Java序列化库和Kryo序列化库,而是使用特定Encoder来序列化对象。并且,序列化后数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。

blockManager

实现缓存的重要类,通过类似于在内部再构建了一个KV系统,K表示每个分区 ID 号,V 表示这个分区计算后的结果。

  • 例如在streaming计算时,每个batch会去消息队列上拉取这个时间段的数据,每个Recevier接收过来数据形成block块并存放到blockManager上,为了可靠性,这个block块可以远程备份,后续的batch计算就直接在之前已读取的block块上进行计算,这样不断循环迭代来完成流处理。

计算任务流程的梳理

划分Stage

当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。

提交

  • 理解
    Stage—>TaskSet

    1
    2
    3
    4
    5
    DAGScheduler.call(TaskSet){
    res = TaskScheduler.submit(TaskSet){
    TaskSetManager(res)
    }
    }
  • 文字说明
    每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交stage(TaskSet),每个TaskSet最终会触发TaskScheduler构建一个TaskSetManager(调度单位)的实例来管理这个TaskSet的生命周期,对于DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过 TaskSetManager调度具体的Task到对应的Executor节点上进行运算。


    1.6.0版本实现

    • ExecutorBackend:在Worker上执行Task的线程组
  • SchedulerBackend:主要用来与Worker中的ExecutorBackend建立连接,用来向Executor发送要执行任务,或是接受执行任务的结果,也可以用来创建AppClient(包装App信息,包含可以创建CoarseGrainedExecutorBackend实例Command),用于向Master汇报资源需求

监控

DAGScheduler就必然需要监控当前Job/Stage乃至Task的完成情况。这是通过对外(主要是对TaskScheduler)暴露一系列的回调函数来实现的。

任务结果的获取

一个具体的任务在Executor中执行完毕以后,其结果需要以某种形式返回给DAGScheduler,根据任务类型的不同,任务的结果的返回方式也不同:

  • 对于FinalStage所对应的任务(触发action的那个,对应的类为ResultTask)返回给DAGScheduler的是运算结果本身;
  • 而对于 ShuffleMapTask,返回给DAGScheduler的是一个MapStatus对象,MapStatus对象管理了ShuffleMapTask的运算输出结果在BlockManager里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个Stage的任务获取输入数据的依据。
    参考:http://blog.csdn.net/laiwenqiang/article/details/50032171

适用场景

不适合超大数据量的计算

Spark适用于那些在多个并行操作之间重用数据的应用,通过rdd是基于内存的优点免去了传统MapReduce不断读写磁盘的IO损耗,但基于内存的rdd操作,可想而知,数据量一大就容易产生OOM的问题。

不适合异步更新模型

由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如增量的web爬虫和索引,以及在一些机器学习和数据挖掘(MLDM)算法上表现并非最优,spark ML与Mahout都是采用Iterative MapReduce架构,都是同步迭代,而关于迭代式算法:

迭代式算法,许多机器学习算法都用一个函数对相同的数据(优点)进行重复的计算,更新同一个模型(局限)。

同步迭代的缺点,存在木桶效应,可参考该图:


此处输入图片的描述

图参考:链接

就是说,对于那种增量修改的应用模型不太适合。因为rdd的操作可以理解为分散式的,每个分散的任务不是针对同一个共同体。(笔者认为,假如硬是要通过spark共享一个共同体,一般实现是每个任务完成后重写共同体,共同体全局可见,因此对于细粒度而言,任务一多,通信开销可想而知,但这对于CPU密集型,网络开销小的硬件如GPU不是问题)。这是一个模型并行化和数据并行化的问题。

Comments