李成笔记网

专注域名、站长SEO知识分享与实战技巧

RDD弹性特性 弹性中dq/dp是什么意思

RDD作为弹性分布式数据集,弹性具体体现在

自动进行内存和磁盘数据存储的切换

Spark会优先把数据放到内存中,如果内存放不下,会放到磁盘里面。当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保证其高效运行。

基于Lineage(血统)的高效容错机制

Lineage是基于Spark RDD的依赖关系来完成的,每个操作只关联其父操作,各个分片的数据之间互不影响,出现错误只需要恢复单个Split的特定部分即可。

常规容错有两种方式:

1、数据检查点

2、记录数据的更新操作

Spark的RDD通过记录数据更新的方式进行容错,主要原因有:RDD是不可变的且Lazy;RDD的写操作是粗粒度的。但是RDD的读既可以是粗粒度的,也可以是细粒度的。

Task如果失败,会自动进行特定次数的重试

默认重试次数为4次。TaskSchedulerImpl是底层任务调度接口TaskScheduler的实现,这些Schedulers从每个Stage中的DAGSchedler中获取TaskSet,运行它们,尝试是否有故障。DAGSchedler是高层调度,计算每个Job的Stage的DAG,然后提交Stage,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。

Task默认重试次数,位于org.apache.spark.internal.config#MAX_TASK_FAILURES

 private[spark] val MAX_TASK_FAILURES =
 ConfigBuilder("spark.task.maxFailures")
 .intConf
 .createWithDefault(4)

TaskSchedulerImpl源码,位于org.apache.spark.scheduler.TaskSchedulerImpl

 private[spark] class TaskSchedulerImpl private[scheduler](
 val sc: SparkContext,
 val maxTaskFailures: Int,
 private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
 isLocal: Boolean = false)
 extends TaskScheduler with Logging {
 ?
 import TaskSchedulerImpl._
 ?
 def this(sc: SparkContext) = {
 this(
 sc,
 sc.conf.get(config.MAX_TASK_FAILURES),
 TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
 }

Stage如果失败,会自动进行特定次数的重试

Stage对象可以跟踪多个StageInfo。默认重试次数为4次,且可以直接运行计算失败的阶段,只计算失败的数据分片。

Stage是Spark Job运行时具有相同逻辑功能和并行计算任务的一个基本单元。Stage中所有的任务都依赖同样的Shuffle,每个DAG任务通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler运行这些阶段的拓扑排序。

每个Stage都可能是ShuffleMapStage,如果是ShuffleMapStage,则跟踪每个输出节点上输出文件分区,任务结果会输入其他的Stage,或者输入一个ResultStage;如果是ResultStage,这个Stage的任务直接在这个RDD上运行计算这个Spark Action函数。

每个Stage会有firstJobId,度额定第一个提交Stage的Job,使用FIFO调度实,会使其前面的Job先计算或快速恢复。

ShuffleMapStage是DAG产生数据进行Shuffle的中间阶段,发生在每次Shuffle操作之前,可能包含多个Pipelined操作;ResultStage阶段捕获函数在RDD分区上运行Action算子计算结果。

Stage源码,位于org.apache.spark.scheduler.Stage

 private[scheduler] abstract class Stage(
 val id: Int,
 val rdd: RDD[_],
 val numTasks: Int,
 val parents: List[Stage],
 val firstJobId: Int,
 val callSite: CallSite)
 extends Logging {
 ?
 // partition的个数
 val numPartitions = rdd.partitions.length
 ?
 /** Set of jobs that this stage belongs to. */
 /** 属于这个工作集的Stage */
 val jobIds = new HashSet[Int]
 ?
 /** The ID to use for the next new attempt for this stage. */
 /** 用于此Stage的下一个新attempt的标识ID */
 private var nextAttemptId: Int = 0
 ?
 val name: String = callSite.shortForm
 val details: String = callSite.longForm
 ?
 /**
 * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
 * here, before any attempts have actually been created, because the DAGScheduler uses this
 * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
 * have been created).
 */
 /**
 * 最新的[StageInfo] object指针,需要被初始化
 * 任何attempts都是被创造出来的,因为DAGScheduler使用StageInfo
 * 告诉SparkListeners工作何时开始
 */
 private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 ?
 /**
 * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
 * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
 * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
 * multiple tasks from the same stage attempt fail (SPARK-5945).
 */
 /**
 * 设置stage attempy IDs 当失败是可以读取失败信息
 * 跟踪这些失败,为了避免无休止地重复失败
 * 跟踪每一次attempt,以便避免记录重复故障
 * 如果从同一stage窗体间多任务失败
 */
 val fetchFailedAttemptIds = new HashSet[Int]
 ?
 private[scheduler] def clearFailures() : Unit = {
 fetchFailedAttemptIds.clear()
 }
 ?
 /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
 /** 在stage中创建一个新的attempt */
 def makeNewStageAttempt(
 numPartitionsToCompute: Int,
 taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
 val metrics = new TaskMetrics
 metrics.register(rdd.sparkContext)
 _latestInfo = StageInfo.fromStage(
 this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
 nextAttemptId += 1
 }
 ?
 /** Returns the StageInfo for the most recent attempt for this stage. */
 /** 放回当前stage中最新的StageInfo */
 def latestInfo: StageInfo = _latestInfo
 ?
 override final def hashCode(): Int = id
 ?
 override final def equals(other: Any): Boolean = other match {
 case stage: Stage => stage != null && stage.id == id
 case _ => false
 }
 ?
 /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
 /** 返回需要重新计算的分区标识的序列 */
 def findMissingPartitions(): Seq[Int]
 }

在Stage终止前允许Stage连续尝试4次,位于org.apache.spark.scheduler.DAGScheduler#maxConsecutiveStageAttempts

 /** 在终止之前允许的连续尝试次数 */
 private[scheduler] val maxConsecutiveStageAttempts =
 sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
 DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 private[spark] object DAGScheduler {
 // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
 // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
 // as more failure events come in
 /**
 * 在毫秒级别,等待读取失败事件后就停止;这是一个避免重新提交任务的简单方法,非读取任务的map中更多失败事件的到来
 */
 val RESUBMIT_TIMEOUT = 200
 
 // Number of consecutive stage attempts allowed before a stage is aborted
 /** 终止之前允许连续尝试的次数 */
 val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
 }

checkpoint和persist(检查点和持久化),可主动或被动触发

checkpoint是对RDD进行标记,会产生一些列的文件,且所有所有父依赖都会被删除,是整个依赖的重点。checkpoint是Lazy级别的。persist后RDD工作室每个工作节点都会把计算的分片结果保存在内存或者磁盘上,下一次对相同的RDD进行其他Action计算。就可以重用。

当RDD.iterator()被调用的时候,也就是计算该RDD中某个Partition的时候,会先去cacheManager获取一个blockId,然后去BlockManager里匹配Partition是否被checkpoint了。如果是,就不用计算该Partition,直接产品能够checkPoint中读取该Partition的所有records放入ArrayBuffer里面。如果没有被checkPoint过,将Partition计算出来,然后将其所有records放入到cache中。

总体来说,当RDD会被重复使用时,RDD需要cache。Spark自动监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据,如果想手动删除RDD,可以使用RDD.unpersist()方法。

RDD.iterator源码,位于org.apache.spark.rdd.RDD#iterator

 final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
 // 判断此RDD的持久户登记是为为NONE,不进行持久化
 if (storageLevel != StorageLevel.NONE) {
 getOrCompute(split, context)
 } else {
 computeOrReadCheckpoint(split, context)
 }
 }

可以用不同的存储级别存储每一个被持久化的RDD。StorageLevel是控制存储RDD的标志,Spark的多个存储级别意味着在内存利用率和CPU利用率间的不同平衡。推荐通过下面的过程选择一个合适的存储级别

  • 如果RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是CPU利用率最高的选项,会使RDD上的操作尽可能地块。
  • 如果不适合用默认级别,就选择MEMOEY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,任然能够快速地访问。
  • 除非算子计算RDD话费较大或者需要过滤大量的数据,不要将RDD存储在磁盘上,否则重复计算一个分区,就会和从磁盘上读取数据一样慢。
  • 如果希望更快地恢复错误,可以利用replicated存储机制,所有的存储级别都可以通过replicated计算丢失的数据来支持完整的容错。另外,replicated的数据能在RDD上继续运行任务,而不需要重复计算丢失的数据。
  • 在拥有大量内存的环境中或者多应用程序的环境中,Off_Heap具有如下优势:Off_Heap运行多个执行者共享的Alluxio中的相同的内存池,限制减少GC,如果当个Executor崩溃,缓存的数据也不会丢失。

StorageLevel源码,位于org.apache.spark.storage.StorageLevel

 val NONE = new StorageLevel(false, false, false, false)
 val DISK_ONLY = new StorageLevel(true, false, false, false)
 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
 val MEMORY_ONLY = new StorageLevel(false, true, false, true)
 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
 val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
 val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

数据弹性调度,DAGScheduler、TaskScheduler和资源管理无关

Spark将执行模型丑行为通过的有向无环图(DAG),可以将多个Stage的任务串联或并行执行,从而不需要将Stage中间结果输出到HDFS上,当发生节点运行故障时,可有其他可用节点代替该故障节点运行。

数据分片的高度弹性。

Spark 进行数据分片时,默认将数据放在内存汇总,如果内存放不下,一部分会放在磁盘上保存。

RDD的coalesce算子源码,位于org.apache.spark.rdd.RDD#coalesce

 def coalesce(numPartitions: Int, shuffle: Boolean = false,
 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
 (implicit ord: Ordering[T] = null)
 : RDD[T] = withScope {
 require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 if (shuffle) {
 /** Distributes elements evenly across output partitions, starting from a random partition. */
 /** 从随机分区开始,将元素均匀分布在输出分区上 */
 val distributePartition = (index: Int, items: Iterator[T]) => {
 var position = (new Random(index)).nextInt(numPartitions)
 items.map { t =>
 // Note that the hash code of the key will just be the key itself. The HashPartitioner
 // will mod it with the number of total partitions.
 position = position + 1
 (position, t)
 }
 } : Iterator[(Int, T)]
 
 // include a shuffle step so that our upstream tasks are still distributed
 // 包括一个Shuffle步骤,使上游任务仍然是分布式的
 new CoalescedRDD(
 new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
 new HashPartitioner(numPartitions)),
 numPartitions,
 partitionCoalescer).values
 } else {
 new CoalescedRDD(this, numPartitions, partitionCoalescer)
 }
 }

如果在计算过程中,产生很多的数据碎片,这是产生的Partition可能会非常小,如果一个Partition非常小,每次都会消耗一个线程取处理,这是可能降低它的处理效率。可以考虑把许多个小的Partition合并成一个较大的Partition处理,会提高效率。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言