C*********r 发帖数: 21 | 1 最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。
1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是
narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并
且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者
transformation都会被Spark internally翻译成map reduce job来执行。
我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的
数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如
parition越多并行的task也会越多么?
2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些
key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?
3. 一个RDD的action或者transformation会被转换成多少map reduce task是由什么决
定的呢?
4. task,executor,executor-core之间的关系是怎么样的呢?
谢谢! |
J****R 发帖数: 373 | 2 高手啊,这几个问题还真没怎么想过。
【在 C*********r 的大作中提到】 : 最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。 : 1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是 : narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并 : 且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者 : transformation都会被Spark internally翻译成map reduce job来执行。 : 我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的 : 数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如 : parition越多并行的task也会越多么? : 2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些 : key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?
|
c******n 发帖数: 4965 | 3 rdd 跟 hadoop MR 的 input split 概念有点像
但是 spark 的独特是, rdd 上带了 transformer, master 可以把不同的 rdd merge
到一起 给 executor. 具体怎么弄我也不清楚, 你只能看 SRC code, 可以 先看 第
一个 release local executor, 很简单
【在 C*********r 的大作中提到】 : 最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。 : 1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是 : narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并 : 且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者 : transformation都会被Spark internally翻译成map reduce job来执行。 : 我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的 : 数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如 : parition越多并行的task也会越多么? : 2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些 : key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?
|
f********r 发帖数: 304 | 4 并行数量取决于你的executor数目乘以executor core。这两个数值要根据你的yarn
slave node的数量和配置决定。比如你的cluster有5个node, 每个node 4 core 16G。
那么你的(num_executor,executor_core, executor_memory)可以设为(5, 4,
16)或者(10,2, 8)或者(20, 1,4)
当然exeuctor memory要注意留出一部分overhead大概是0.1,所以实际设置不能直接用
上述例子。每个executor是一个独立jvm process,core的数目是线程并发数。所以一
般core的数目对应cpu的vcore数目。partition的数目就是具体的task 数目,每个core
并行执行一个task。所以,按照上面的例子,如果你有100个partition,你可以同时运
算20个。如果同样你把partition数目增加,每个task理论计算时间会变小,但并行度
还是取决于你的launch config。要注意的是如果数据量很大然后partition很少会导致
内存溢出或GC时间太长然后driver和exeuctor time out。
通常建议每个exeuctor不多余5个core, 不超过64g内存。因为hdfs系统对并行写文件
的支持是有限制的。groupbykey一般不推荐使用。因为它在mapper阶段不做local
reduce会导致shuffle write的量增多,也就是reducer的shuffle read增加。推荐
reducebykey |
C*********r 发帖数: 21 | 5 many thanks for the clarification!
比如说我有一个两个column的table (name, money). 我想以name为key对money做一些
data mining(或者一些复杂的计算)。所以我会做以下的步骤
step1 -> 把table map 成一个file RDD
step2 -> file RDD recudeByKey(name) 或者groupByKey(name)
step3 -> 在第二步产生的RDD上面apply一些其他的运算
那么step2中的reduceByKey加入我设置partition number为100,但是我其实是有200个
distinct names,所以其实平均下来每个partiton里面会有两个(name, list
) pair么?然后每个partition会被分配给一个task进行执行。假如我的配置是(5,4,16
),所以每个partiton里面即使有两个pair,但还会顺序的进行执行,对么?这里是我
一直非常confuse的地方。。
如果这种情况下的化,我的partition数量是不是越接近不同name的数量越好,因为这
样并行度最大?
core
【在 f********r 的大作中提到】 : 并行数量取决于你的executor数目乘以executor core。这两个数值要根据你的yarn : slave node的数量和配置决定。比如你的cluster有5个node, 每个node 4 core 16G。 : 那么你的(num_executor,executor_core, executor_memory)可以设为(5, 4, : 16)或者(10,2, 8)或者(20, 1,4) : 当然exeuctor memory要注意留出一部分overhead大概是0.1,所以实际设置不能直接用 : 上述例子。每个executor是一个独立jvm process,core的数目是线程并发数。所以一 : 般core的数目对应cpu的vcore数目。partition的数目就是具体的task 数目,每个core : 并行执行一个task。所以,按照上面的例子,如果你有100个partition,你可以同时运 : 算20个。如果同样你把partition数目增加,每个task理论计算时间会变小,但并行度 : 还是取决于你的launch config。要注意的是如果数据量很大然后partition很少会导致
|
s******3 发帖数: 344 | 6
【在 C*********r 的大作中提到】 : 最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。 : 1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是 : narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并 : 且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者 : transformation都会被Spark internally翻译成map reduce job来执行。 : 我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的 : 数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如 : parition越多并行的task也会越多么? : 2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些 : key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?
|
f********r 发帖数: 304 | 7
money>
16
你还是没有细读我上面的回文啊。并发数目和你的partition数目无关。并发数目之取
决于你的exeuctor数目和executor core数目。如果你的配置是5node4core那么并发数
目就是20,也就是20条thread并行执行20task。假如说你的cpu只有2core,你却设置成
4,那么没两条thread要share一个core,实际并发只有10。这就是为什么total
executor core的数目和你的cluster total 物理core的数目要对应。当然一般情况还
应该留一个core给其他service比如node manager,history server etc。
如果你在reducebykey阶段设置100个partition但是实际有200个不同的名字,那每个
task理论上会处理2个名字。但这不是一定保证的,因为你的data可能是skew的。比如
有的名字对应上千个money,有的只对应一两个。每个task上的任务必须是顺序执行的
,应为只有一条thread。
【在 C*********r 的大作中提到】 : many thanks for the clarification! : 比如说我有一个两个column的table (name, money). 我想以name为key对money做一些 : data mining(或者一些复杂的计算)。所以我会做以下的步骤 : step1 -> 把table map 成一个file RDD : step2 -> file RDD recudeByKey(name) 或者groupByKey(name) : step3 -> 在第二步产生的RDD上面apply一些其他的运算 : 那么step2中的reduceByKey加入我设置partition number为100,但是我其实是有200个 : distinct names,所以其实平均下来每个partiton里面会有两个(name, list : ) pair么?然后每个partition会被分配给一个task进行执行。假如我的配置是(5,4,16 : ),所以每个partiton里面即使有两个pair,但还会顺序的进行执行,对么?这里是我
|
f********r 发帖数: 304 | 8 我建议初学者不需要太刻意手动设置partition的数目,spark framework一般会帮你选
择合适的数目。如果你的cluster有上一千个节点,每个cpu都是8core的情况,那你理
论上可以同时运行8k个task。这种情况下你增加partition数目或许可能会有帮助。但
前提条件也是真正的运算dominate主要task时间,不是seriliazation/
deserialization或者network的overhead。如果task都是ms level说明你的partition
数目太多了。整个spark tuning是个很tricky的过程,我不建议花太多时间在这个上面
。大部分情况你的改进空间是很有限的。应该多花时间考虑你的算法逻辑,balance你
的dataset。 |
C*********r 发帖数: 21 | 9 谢谢!感觉现在RDD的action还有operation的数量太少,有很多很复杂的逻辑如果全部
用RDD拼凑起来会变得很难维护,而且要花很多时间去migrate。所以一个折中的方法就
是把复杂的逻辑用java api wrap起来,然后pass给RDD进行执行。这样可以很快的搭起
来一个系统,但是只是利用了几个基本的RDD,真正复杂的逻辑其实和Spark没有太多关
系。所以才会直接跳到performance tuning的阶段。不知道这样使用Spark会有什么潜
在的问题么?
partition
【在 f********r 的大作中提到】 : 我建议初学者不需要太刻意手动设置partition的数目,spark framework一般会帮你选 : 择合适的数目。如果你的cluster有上一千个节点,每个cpu都是8core的情况,那你理 : 论上可以同时运行8k个task。这种情况下你增加partition数目或许可能会有帮助。但 : 前提条件也是真正的运算dominate主要task时间,不是seriliazation/ : deserialization或者network的overhead。如果task都是ms level说明你的partition : 数目太多了。整个spark tuning是个很tricky的过程,我不建议花太多时间在这个上面 : 。大部分情况你的改进空间是很有限的。应该多花时间考虑你的算法逻辑,balance你 : 的dataset。
|
f********r 发帖数: 304 | 10 不知道你的复杂逻辑是什么。RDD就是Vector,你把RDD operation 想象成vector
operation就行了。并行设计主要还是考量怎么把运算逻辑并行化,选出data
independent的processing。你如果感兴趣,可以看看他们mlib的源码,有一些标准ml
的算法实现。
还有一个办法是你可以用scala重写一下你们现有的运算逻辑,尽量用fp的design
pattern。然后把涉及到的装data的collection全部改成RDD就可以用spark加速了。 |
c*********e 发帖数: 16335 | 11 linkedin都放弃scala了,你們还在用啊?scala和c++,都是曲高和寡的冬冬,最终结果
很难说啊。
ml
【在 f********r 的大作中提到】 : 不知道你的复杂逻辑是什么。RDD就是Vector,你把RDD operation 想象成vector : operation就行了。并行设计主要还是考量怎么把运算逻辑并行化,选出data : independent的processing。你如果感兴趣,可以看看他们mlib的源码,有一些标准ml : 的算法实现。 : 还有一个办法是你可以用scala重写一下你们现有的运算逻辑,尽量用fp的design : pattern。然后把涉及到的装data的collection全部改成RDD就可以用spark加速了。
|
f********r 发帖数: 304 | 12
就是个编程语言而已,用scala只不过因为spark是scala native 而已。他们自己都说
了,RDD level的话,scala比python还是要快一倍的。语言这玩意学着够用就行,将来
真不行了久换呗
【在 c*********e 的大作中提到】 : linkedin都放弃scala了,你們还在用啊?scala和c++,都是曲高和寡的冬冬,最终结果 : 很难说啊。 : : ml
|
c*********e 发帖数: 16335 | 13 python和php一个级别的,好多公司都不屑于使用。php做multi-threading那叫一个累
,c#的multi-threading和java的完全不一样。
【在 f********r 的大作中提到】 : : 就是个编程语言而已,用scala只不过因为spark是scala native 而已。他们自己都说 : 了,RDD level的话,scala比python还是要快一倍的。语言这玩意学着够用就行,将来 : 真不行了久换呗
|