博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark源码分析 – SparkContext
阅读量:4552 次
发布时间:2019-06-08

本文共 3851 字,大约阅读时间需要 12 分钟。

这位写的非常好, 让我对Spark的源码分析, 变的轻松了许多
这里自己再梳理一遍

先看一个简单的spark操作,

val sc = new SparkContext(……) val textFile = sc.textFile("README.md")textFile.filter(line => line.contains("Spark")).count()

 

1. SparkContext

这是Spark的入口, 任何需要使用Spark的地方都需要先创建SparkContext

在SparkContext中, 最主要的初始化工作就是start TaskScheduler和DAGScheduler, 这两个就是Spark的核心所在

Spark的设计非常的干净, 把整个DAG抽象层从实际的task执行中剥离了出来

DAGScheduler, 负责解析spark命令, 生成stage, 形成DAG, 最终划分成tasks, 提交给TaskScheduler, 他只完成静态分析
TaskScheduler, 专门负责task执行, 他只负责资源管理, task分配, 执行情况的报告
这样的好处, 就是Spark可以通过提供不同的TaskScheduler简单的支持各种资源调度和执行平台, 现在Spark支持, local, standalone, mesos, Yarn...

class SparkContext(    val master: String,    val appName: String,    val sparkHome: String = null,    val jars: Seq[String] = Nil,    val environment: Map[String, String] = Map(),    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.    // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())  extends Logging {  // Create and start the scheduler  private var taskScheduler: TaskScheduler = {  //.......  }  taskScheduler.start()  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)  dagScheduler.start()}

 

2. sc.textFile

然后当然要载入被处理的数据, 最常用的textFile, 其实就是生成HadoopRDD, 作为起始的RDD

/**   * Read a text file from HDFS, a local file system (available on all nodes), or any   * Hadoop-supported file system URI, and return it as an RDD of Strings.   */  def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)      .map(pair => pair._2.toString)  }
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */  def hadoopFile[K, V](      path: String,      inputFormatClass: Class[_ <: InputFormat[K, V]],      keyClass: Class[K],      valueClass: Class[V],      minSplits: Int = defaultMinSplits      ) : RDD[(K, V)] = {    val conf = new JobConf(hadoopConfiguration)    FileInputFormat.setInputPaths(conf, path)    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)  }

 

3. Transform and Action

这里调用的filter transform很简单, 可以参考前面的blog

关键调用count action, action的不同在于, 会调用runjob
所以在调用action之前, job都是没有被真正执行的

def count(): Long = {
// 只有在action中才会真正调用runJob, 所以transform都是lazy的 sc.runJob(this, (iter: Iterator[T]) => { // count调用的是简化版的runJob, 只传入rdd和func, 其他的会用默认值补全 var result = 0L while (iter.hasNext) { result += 1L iter.next() } result }).sum }

 

4. sc.runJob

关键在于调用了dagScheduler.runJob

/**   * Run a function on a given set of partitions in an RDD and pass the results to the given   * handler function. This is the main entry point for all actions in Spark. The allowLocal   * flag specifies whether the scheduler can run the computation on the driver(创建SparkContext的进程) rather than   * shipping it out to the cluster, for short actions like first().   */  def runJob[T, U: ClassManifest](      rdd: RDD[T], //只需要传入Final RDD, 前面的可以根据dependency推出      func: (TaskContext, Iterator[T]) => U, //action的逻辑,比如count逻辑      partitions: Seq[Int],  //partition的个数      allowLocal: Boolean, //对于一些简单的action,是否允许在local执行      resultHandler: (Int, U) => Unit) { //会在JobWaiter的taskSucceeded中用于处理task result    val callSite = Utils.formatSparkCallSite    logInfo("Starting job: " + callSite)    val start = System.nanoTime    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,      localProperties.get)    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")    rdd.doCheckpoint()    result  }

转载于:https://www.cnblogs.com/fxjwind/p/3489498.html

你可能感兴趣的文章
20181227 新的目标
查看>>
HDFS写流程
查看>>
生产环境服务器环境搭建+ 项目发布
查看>>
js按条件分类json数组,并合计同组数据(一维转换为二维)
查看>>
Exp6 信息搜集与漏洞扫描
查看>>
redis4安装
查看>>
随机生成双色球号码
查看>>
使用命令wsimport构建WebService客户端[转]
查看>>
第八遍:链接详解
查看>>
Qt5.5 使用smtp发邮件的各种坑
查看>>
js奇葩错误 字符串传递问题
查看>>
人之初,性本恶
查看>>
springboot 端口号
查看>>
使用AChartEngine画动态曲线图
查看>>
安卓项目五子棋代码详解(四)
查看>>
urllib 学习一
查看>>
bzoj4196 [Noi2015]软件包管理器——树链剖分
查看>>
kafka源码阅读环境搭建
查看>>
UI设计
查看>>
androidtab
查看>>