由买买提看人间百态

boards

本页内容为未名空间相应帖子的节选和存档,一周内的贴子最多显示50字,超过一周显示500字 访问原贴
Programming版 - Spark RDD
相关主题
spark 到底牛在什么地方?请教一下,各位牛人觉得Rust语言怎么样?
spark RDD不能当K/V store是吧?Spark请教。
Spark PK Akka 完胜呀如何提高Spark在Yarn上的内存使用率
spark看了一边 没什么难点啊。7天掌握spark load数据速度
Spark 和 Dynamodb 之间 如何 连接板上有 spark 大牛么?
Spark已经out了,能跳船的赶快C++的并行架构库是如何实现内存回收?
关于spark的cache问题一直没想清楚谁给讲讲FP咋火起来的
怎样schedule spark applicationh2o好像突然火了
相关话题的讨论汇总
话题: rdd话题: executor话题: spark话题: partition话题: 数目
进入Programming版参与讨论
1 (共1页)
C*********r
发帖数: 21
1
最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。
1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是
narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并
且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者
transformation都会被Spark internally翻译成map reduce job来执行。
我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的
数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如
parition越多并行的task也会越多么?
2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些
key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?
3. 一个RDD的action或者transformation会被转换成多少map reduce task是由什么决
定的呢?
4. task,executor,executor-core之间的关系是怎么样的呢?
谢谢!
J****R
发帖数: 373
2
高手啊,这几个问题还真没怎么想过。

【在 C*********r 的大作中提到】
: 最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。
: 1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是
: narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并
: 且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者
: transformation都会被Spark internally翻译成map reduce job来执行。
: 我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的
: 数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如
: parition越多并行的task也会越多么?
: 2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些
: key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?

c******n
发帖数: 4965
3
rdd 跟 hadoop MR 的 input split 概念有点像
但是 spark 的独特是, rdd 上带了 transformer, master 可以把不同的 rdd merge
到一起 给 executor. 具体怎么弄我也不清楚, 你只能看 SRC code, 可以 先看 第
一个 release local executor, 很简单

【在 C*********r 的大作中提到】
: 最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。
: 1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是
: narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并
: 且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者
: transformation都会被Spark internally翻译成map reduce job来执行。
: 我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的
: 数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如
: parition越多并行的task也会越多么?
: 2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些
: key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?

f********r
发帖数: 304
4
并行数量取决于你的executor数目乘以executor core。这两个数值要根据你的yarn
slave node的数量和配置决定。比如你的cluster有5个node, 每个node 4 core 16G。
那么你的(num_executor,executor_core, executor_memory)可以设为(5, 4,
16)或者(10,2, 8)或者(20, 1,4)
当然exeuctor memory要注意留出一部分overhead大概是0.1,所以实际设置不能直接用
上述例子。每个executor是一个独立jvm process,core的数目是线程并发数。所以一
般core的数目对应cpu的vcore数目。partition的数目就是具体的task 数目,每个core
并行执行一个task。所以,按照上面的例子,如果你有100个partition,你可以同时运
算20个。如果同样你把partition数目增加,每个task理论计算时间会变小,但并行度
还是取决于你的launch config。要注意的是如果数据量很大然后partition很少会导致
内存溢出或GC时间太长然后driver和exeuctor time out。
通常建议每个exeuctor不多余5个core, 不超过64g内存。因为hdfs系统对并行写文件
的支持是有限制的。groupbykey一般不推荐使用。因为它在mapper阶段不做local
reduce会导致shuffle write的量增多,也就是reducer的shuffle read增加。推荐
reducebykey
C*********r
发帖数: 21
5
many thanks for the clarification!
比如说我有一个两个column的table (name, money). 我想以name为key对money做一些
data mining(或者一些复杂的计算)。所以我会做以下的步骤
step1 -> 把table map 成一个file RDD
step2 -> file RDD recudeByKey(name) 或者groupByKey(name)
step3 -> 在第二步产生的RDD上面apply一些其他的运算
那么step2中的reduceByKey加入我设置partition number为100,但是我其实是有200个
distinct names,所以其实平均下来每个partiton里面会有两个(name, list
) pair么?然后每个partition会被分配给一个task进行执行。假如我的配置是(5,4,16
),所以每个partiton里面即使有两个pair,但还会顺序的进行执行,对么?这里是我
一直非常confuse的地方。。
如果这种情况下的化,我的partition数量是不是越接近不同name的数量越好,因为这
样并行度最大?

core

【在 f********r 的大作中提到】
: 并行数量取决于你的executor数目乘以executor core。这两个数值要根据你的yarn
: slave node的数量和配置决定。比如你的cluster有5个node, 每个node 4 core 16G。
: 那么你的(num_executor,executor_core, executor_memory)可以设为(5, 4,
: 16)或者(10,2, 8)或者(20, 1,4)
: 当然exeuctor memory要注意留出一部分overhead大概是0.1,所以实际设置不能直接用
: 上述例子。每个executor是一个独立jvm process,core的数目是线程并发数。所以一
: 般core的数目对应cpu的vcore数目。partition的数目就是具体的task 数目,每个core
: 并行执行一个task。所以,按照上面的例子,如果你有100个partition,你可以同时运
: 算20个。如果同样你把partition数目增加,每个task理论计算时间会变小,但并行度
: 还是取决于你的launch config。要注意的是如果数据量很大然后partition很少会导致

s******3
发帖数: 344
6


【在 C*********r 的大作中提到】
: 最近刚开始学Spark,因为没有看过源代码,麻烦大家想请教几个问题。
: 1. RDD在创建的时候是lazy的,并且有narrow和wide两种类型,例如map,filter是
: narrow类型的所以不需要额外的shuffle。但是groupbykey之类的就需要shuffle了。并
: 且每个RDD会保存有自己所对应的partition信息。我的理解是每个action或者
: transformation都会被Spark internally翻译成map reduce job来执行。
: 我的问题是翻译之后的map reduce job的并行度是和哪些因素有关系呢?partition的
: 数量还有executor的数量还有executor-core的数量之间是什么样的关系呢?例如
: parition越多并行的task也会越多么?
: 2. 当我groupbykey的时候RDD会记录下我当前的RDD中有多少个key value么?并且这些
: key value和上面说的partition有什么样的联系呢,还是完全不想关的两个概念?

f********r
发帖数: 304
7

money>
16
你还是没有细读我上面的回文啊。并发数目和你的partition数目无关。并发数目之取
决于你的exeuctor数目和executor core数目。如果你的配置是5node4core那么并发数
目就是20,也就是20条thread并行执行20task。假如说你的cpu只有2core,你却设置成
4,那么没两条thread要share一个core,实际并发只有10。这就是为什么total
executor core的数目和你的cluster total 物理core的数目要对应。当然一般情况还
应该留一个core给其他service比如node manager,history server etc。
如果你在reducebykey阶段设置100个partition但是实际有200个不同的名字,那每个
task理论上会处理2个名字。但这不是一定保证的,因为你的data可能是skew的。比如
有的名字对应上千个money,有的只对应一两个。每个task上的任务必须是顺序执行的
,应为只有一条thread。

【在 C*********r 的大作中提到】
: many thanks for the clarification!
: 比如说我有一个两个column的table (name, money). 我想以name为key对money做一些
: data mining(或者一些复杂的计算)。所以我会做以下的步骤
: step1 -> 把table map 成一个file RDD
: step2 -> file RDD recudeByKey(name) 或者groupByKey(name)
: step3 -> 在第二步产生的RDD上面apply一些其他的运算
: 那么step2中的reduceByKey加入我设置partition number为100,但是我其实是有200个
: distinct names,所以其实平均下来每个partiton里面会有两个(name, list
: ) pair么?然后每个partition会被分配给一个task进行执行。假如我的配置是(5,4,16
: ),所以每个partiton里面即使有两个pair,但还会顺序的进行执行,对么?这里是我

f********r
发帖数: 304
8
我建议初学者不需要太刻意手动设置partition的数目,spark framework一般会帮你选
择合适的数目。如果你的cluster有上一千个节点,每个cpu都是8core的情况,那你理
论上可以同时运行8k个task。这种情况下你增加partition数目或许可能会有帮助。但
前提条件也是真正的运算dominate主要task时间,不是seriliazation/
deserialization或者network的overhead。如果task都是ms level说明你的partition
数目太多了。整个spark tuning是个很tricky的过程,我不建议花太多时间在这个上面
。大部分情况你的改进空间是很有限的。应该多花时间考虑你的算法逻辑,balance你
的dataset。
C*********r
发帖数: 21
9
谢谢!感觉现在RDD的action还有operation的数量太少,有很多很复杂的逻辑如果全部
用RDD拼凑起来会变得很难维护,而且要花很多时间去migrate。所以一个折中的方法就
是把复杂的逻辑用java api wrap起来,然后pass给RDD进行执行。这样可以很快的搭起
来一个系统,但是只是利用了几个基本的RDD,真正复杂的逻辑其实和Spark没有太多关
系。所以才会直接跳到performance tuning的阶段。不知道这样使用Spark会有什么潜
在的问题么?

partition

【在 f********r 的大作中提到】
: 我建议初学者不需要太刻意手动设置partition的数目,spark framework一般会帮你选
: 择合适的数目。如果你的cluster有上一千个节点,每个cpu都是8core的情况,那你理
: 论上可以同时运行8k个task。这种情况下你增加partition数目或许可能会有帮助。但
: 前提条件也是真正的运算dominate主要task时间,不是seriliazation/
: deserialization或者network的overhead。如果task都是ms level说明你的partition
: 数目太多了。整个spark tuning是个很tricky的过程,我不建议花太多时间在这个上面
: 。大部分情况你的改进空间是很有限的。应该多花时间考虑你的算法逻辑,balance你
: 的dataset。

f********r
发帖数: 304
10
不知道你的复杂逻辑是什么。RDD就是Vector,你把RDD operation 想象成vector
operation就行了。并行设计主要还是考量怎么把运算逻辑并行化,选出data
independent的processing。你如果感兴趣,可以看看他们mlib的源码,有一些标准ml
的算法实现。
还有一个办法是你可以用scala重写一下你们现有的运算逻辑,尽量用fp的design
pattern。然后把涉及到的装data的collection全部改成RDD就可以用spark加速了。
c*********e
发帖数: 16335
11
linkedin都放弃scala了,你們还在用啊?scala和c++,都是曲高和寡的冬冬,最终结果
很难说啊。

ml

【在 f********r 的大作中提到】
: 不知道你的复杂逻辑是什么。RDD就是Vector,你把RDD operation 想象成vector
: operation就行了。并行设计主要还是考量怎么把运算逻辑并行化,选出data
: independent的processing。你如果感兴趣,可以看看他们mlib的源码,有一些标准ml
: 的算法实现。
: 还有一个办法是你可以用scala重写一下你们现有的运算逻辑,尽量用fp的design
: pattern。然后把涉及到的装data的collection全部改成RDD就可以用spark加速了。

f********r
发帖数: 304
12

就是个编程语言而已,用scala只不过因为spark是scala native 而已。他们自己都说
了,RDD level的话,scala比python还是要快一倍的。语言这玩意学着够用就行,将来
真不行了久换呗

【在 c*********e 的大作中提到】
: linkedin都放弃scala了,你們还在用啊?scala和c++,都是曲高和寡的冬冬,最终结果
: 很难说啊。
:
: ml

c*********e
发帖数: 16335
13
python和php一个级别的,好多公司都不屑于使用。php做multi-threading那叫一个累
,c#的multi-threading和java的完全不一样。

【在 f********r 的大作中提到】
:
: 就是个编程语言而已,用scala只不过因为spark是scala native 而已。他们自己都说
: 了,RDD level的话,scala比python还是要快一倍的。语言这玩意学着够用就行,将来
: 真不行了久换呗

1 (共1页)
进入Programming版参与讨论
相关主题
h2o好像突然火了Spark 和 Dynamodb 之间 如何 连接
Spark上怎么join avro format的数据?Spark已经out了,能跳船的赶快
向版上大牛们请教一个spark的问题,多谢!关于spark的cache问题一直没想清楚
Partitioning (转载)怎样schedule spark application
spark 到底牛在什么地方?请教一下,各位牛人觉得Rust语言怎么样?
spark RDD不能当K/V store是吧?Spark请教。
Spark PK Akka 完胜呀如何提高Spark在Yarn上的内存使用率
spark看了一边 没什么难点啊。7天掌握spark load数据速度
相关话题的讨论汇总
话题: rdd话题: executor话题: spark话题: partition话题: 数目