Spark Version Customization, Part 8: Spark Streaming Source Code Reading, a Thorough Study of the Full Lifecycle of RDD Generation
Published: 2019-06-25


Topics in this installment:

1. A thorough study of the relationship between DStream and RDD

2. A thorough study of RDD generation in Streaming

I. A Thorough Study of the Relationship Between DStream and RDD

Questions to think about before we start:

How are RDDs generated?

What are RDDs generated from? They are generated from DStreams.

What determines how an RDD is generated?

Is the execution of RDDs in Spark Streaming any different from the execution of RDDs in Spark Core?

What do we do with the RDDs after they have run?

ForEachDStream does not necessarily trigger the execution of a Job, but it always triggers the generation of a Job; generating the Job is independent of whether the Job actually executes.

 

Q: What are RDDs generated from?

      Let us use the official example that ships with Spark to see that RDDs are produced from DStreams:

object NetworkWordCount {
  def main(args: Array[String]) {
    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val lines = ssc.socketTextStream("Master", 9999)           // the input DStream
    val words = lines.flatMap(_.split(" "))                    // everything between input and output is a transformed DStream
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()                                         // internally triggers an Action; print() creates the output DStream

    ssc.start()
    ssc.awaitTermination()
  }
}

From the code above we can see that this example produces the following DStreams, in order, and that they depend on one another from back to front:

ReceiverInputDStream-->FlatMappedDStream-->MappedDStream-->ShuffledDStream-->ForEachDStream
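This back-to-front dependency mirrors the lineage of ordinary RDDs in Spark Core. As a quick aside (a minimal, self-contained sketch that is not part of the original example, assuming a running SparkContext named sc, e.g. in spark-shell), you can observe the same idea on plain RDDs with toDebugString:

// RDD lineage is also recorded from back to front and evaluated lazily.
val data = sc.parallelize(Seq("a b", "b c"))
val counts = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// Nothing has executed yet; the last RDD's lineage references all of its parents.
println(counts.toDebugString)
counts.collect()   // only an action triggers the actual computation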
How do we prove that the DStreams depend on one another? Pick any one of the derived DStreams as an entry point, for example MappedDStream, which is created by DStream.map:
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
The MappedDStream class:
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

The compute method of MappedDStream first calls getOrCompute on its parent DStream and then applies map to the resulting RDD, where mapFunc is the business logic we passed in. This is exactly the dependency relationship we were looking for!
Q: Why do DStreams depend on one another from back to front?

Because a DStream describes the Spark Streaming business logic, RDDs themselves depend backward on their parents, and DStreams are lazy: the DStream dependency graph must stay strictly consistent with the RDD dependency graph. All of the DStreams produced above extend DStream, so let us start from DStream itself:
/*
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */

    Roughly, this says:

   1. A DStream depends on other DStreams, except for the first one, which is produced directly from a data source in order to receive data and therefore has no such dependency; this further confirms that DStreams depend on one another from back to front!

   2. How does a DStream produce RDDs? Every BatchDuration, the DStream generates one RDD;

   3. Every BatchDuration, a function inside the DStream is used to generate that RDD.

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  // A DStream is a template for RDDs: every batchInterval an RDD is generated from this
  // template and stored in the DStream's generatedRDDs data structure.
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

generatedRDDs is a member of DStream, so every DStream instance carries it; at runtime, however, we only need to hold on to the last DStream, because all of the earlier DStreams can be derived from it!

At this point we have verified the conclusion that RDDs are produced by DStreams!
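To watch the "one RDD per BatchDuration" behaviour directly, here is a minimal, self-contained sketch (not part of the original example) that uses the public queueStream API together with foreachRDD to print the RDD that the DStream produces for each batch time:

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OneRddPerBatch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OneRddPerBatch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Pre-fill a queue with a few RDDs; queueStream hands out one of them per batch.
    val queue = new Queue[RDD[Int]]()
    (1 to 3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))

    // foreachRDD exposes exactly the RDD that the DStream generated for this batch time.
    ssc.queueStream(queue).foreachRDD { (rdd, time) =>
      println(s"batch $time -> RDD id ${rdd.id}, contents ${rdd.collect().mkString(",")}")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop()
  }
}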

In the next part we analyze exactly how a DStream generates its RDDs.

II. A Thorough Study of RDD Generation in Streaming

// A DStream is a template for RDDs: every batchInterval an RDD is generated from this
// template and stored in the DStream's generatedRDDs data structure.
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

Where do the entries of generatedRDDs come from? Once we understand where this HashMap gets filled, we know how RDDs are produced!

1. Going straight to the point: the getOrCompute method of DStream:

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 *
 * First check whether the HashMap already holds an RDD for this time; if not,
 * call compute to produce one and put it into the HashMap.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  // Check the cache first and return the cached RDD if there is one.
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time) // generate the RDD for this batch time
        }
      }

      // rddOption carries the newly generated RDD, which is then put into generatedRDDs.
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }

      rddOption
    } else {
      None
    }
  }
}

The compute method itself has no concrete implementation here, which means it is overridden in the subclasses, and that is where the RDD is actually produced:

/** Method that generates a RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]

2. Next, the compute method of ReceiverInputDStream:

/**
 * Generates RDDs with blocks received by the receiver of this stream.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      // receiverTracker keeps track of the data produced by the receivers.
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      // ReceiverInputDStream has no parent DStream, so it creates the RDD itself.
      // If there is no input data, this still produces a series of empty RDDs.
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

Note: Spark Streaming still produces an RDD (an empty BlockRDD) even when a batch receives no input data, so this is one place where the source could be modified to improve performance. Looked at the other way around, stream processing is really just batch processing carried out over extremely short time intervals!
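At the application level, a common way to avoid doing work on those empty batches is to guard inside foreachRDD. A minimal sketch, reusing the wordCounts stream from the NetworkWordCount example above:

// Skip empty batches before writing results out.
wordCounts.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {   // cheaper than count(): it stops as soon as it finds one element
    rdd.foreachPartition { iter =>
      iter.foreach { case (word, count) => println(s"$time: $word -> $count") }
    }
  }
}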

 

3. Now back to the compute method of MappedDStream:

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  // Except for the first DStream, which produces its own RDD, every DStream starts
  // from the RDD produced by the DStream in front of it.
  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    // getOrCompute returns the parent's RDD, and the map that follows operates on that RDD.
    // A DStream computation is really a computation over RDDs, and mapFunc is the concrete
    // business logic we supplied.
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

4. Next, the compute method of ForEachDStream:

  Its compute method performs no work at all, but it overrides the generateJob method!

 

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          // foreachFunc is expected to contain an action on the RDD,
          // so the action is triggered only when jobFunc is actually invoked.
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
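For context, the ForEachDStream in our example is created by wordCounts.print(). At the user-visible level, print() behaves roughly like the following foreachRDD call (a hedged paraphrase to show where foreachFunc comes from, not a copy of the Spark source):

// Register an output operation whose foreachFunc takes the RDD of each batch
// and prints its first 10 elements.
wordCounts.foreachRDD { (rdd, time) =>
  val first = rdd.take(11)   // take one extra element to know whether there are more
  println("-------------------------------------------")
  println(s"Time: $time")
  println("-------------------------------------------")
  first.take(10).foreach(println)
  if (first.length > 10) println("...")
}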

5. Finally, start from Job generation: the generateJobs method of JobGenerator, which internally calls the generateJobs method of DStreamGraph:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Fetch the data that belongs to this particular batch time.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call DStreamGraph.generateJobs to generate the Jobs.
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

  

DStreamGraph's generateJobs method calls generateJob on each output stream, and the output stream here is exactly our ForEachDStream:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

  

Summary: A DStream is a template for RDDs; its generatedRDDs member holds the RDD instance generated for each BatchDuration. The DStream dependencies define the RDD dependencies, so when computing from back to front it is enough to start from the last DStream. Every BatchDuration, JobGenerator calls DStreamGraph's generateJobs method, which calls ForEachDStream's generateJob method; that method first calls the parent DStream's getOrCompute to obtain the RDD and only then performs the computation. Following the chain from back to front, the first DStream is the ReceiverInputDStream, whose compute method fetches the block metadata for the batch time from the receiverTracker, builds a BlockRDD from it, and puts that RDD into generatedRDDs!
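Putting the pieces above together, the call chain for a single batch looks like this:

JobGenerator.generateJobs(time)
  -> DStreamGraph.generateJobs(time)
       -> ForEachDStream.generateJob(time)
            -> parent.getOrCompute(time)                  // ShuffledDStream -> MappedDStream -> FlatMappedDStream, back to front
                 -> ReceiverInputDStream.compute(time)    // builds a BlockRDD from the receiverTracker's block metadata
  -> JobScheduler.submitJobSet(...)                       // the generated Jobs are then handed over for execution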

 

Special thanks to Mr. Wang Jialin for his distinctive teaching:

Wang Jialin's contact card:

Billed as China's foremost Spark expert


WeChat public account: DT_Spark


QQ:1740415547

YY classroom: live classes every day at 20:00, channel 68917580

Reposted from: https://www.cnblogs.com/game-bigdata/p/5521660.html
