曾經在一次面試中被問到 Spark WordCount 產生多少個 RDD,您知道么?下面通過源碼來說明經典得 WordCount 到底產生多少個 RDD。
import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("WordCountApp").setMaster("local[2]") val sc = new SparkContext(conf) val wc = sc.textFile("hdfs://hadoop001:9000/data/wordcount.txt") .flatMap(x=>(x.split(","))).map(x=>(x,1)).reduceByKey(_+_) .saveAsTextFile("hdfs://hadoop001:9000/data/output") sc.stop() } }
textFile()
通過下面得源碼,可以看到在這個方法中先調用了一個 hadoopFile 方法再調用 map 方法
hadoopFile 方法返回得是個 RDD(HadoopRDD),在對這個RDD調用map方法,
點到map方法中可以看到 ,map方法中產生了一個MapPartitionsRDD
也就是說 textFile 產生 2個 RDD分別是 HadoopRDD 和 MapPartitionsRDD
flatMap ()flatMap 產生了一個 RDD,MapPartitionsRDD
map()map 產生了一個 RDD,MapPartitionsRDD
reduceByKey()這里要注意啦,reduceByKey 雖然是一個 rdd 調用得,但 reduceByKey 這個方法不是 RDD 中得方法,我們可以在 RDD 中找到如下得一個隱式轉換,當我們去調用reduceByKey 方法時,會發生隱式轉換,隱式得 RDD 轉化成了PairRDDFunctions這個類,reduceByKey 是 PairRDDFunctions 得方法
reduceByKey 產生了一個RDD,ShuffledRDD
saveAsTextFile()其實,在執行saveAsTextFile之前,我們可以通過RDD提供得toDebugString看到這些個算子在調用得時候到底產生了多少個RDD
scala> val rdd = sc.textFile("file:///home/hadoop/data/wordcount.txt").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:24 scala> rdd.toDebugString res1: String = (2) ShuffledRDD[9] at reduceByKey at <console>:24 [] +-(2) MapPartitionsRDD[8] at map at <console>:24 [] | MapPartitionsRDD[7] at flatMap at <console>:24 [] | file:///home/hadoop/data/wordcount.txt MapPartitionsRDD[6] at textFile at <console>:24 [] | file:///home/hadoop/data/wordcount.txt HadoopRDD[5] at textFile at <console>:24 []
總結
我們可以看見在 Spark 得一個標準得 WordCount 中一共會產生 6 個 RDD,textFile() 會產生一個 HadoopRDD 和一個 MapPerPartitionRDD,flatMap() 方法會產生一個 MapPartitionsRDD,map() 方法會產生一個 MapPartitionsRDD ,reduceByKey() 方法會產生一個 ShuffledRDD,saveAsTextFile 會產生一個 MapPartitionsRDD,所以一共會產生 6 個 RDD。
如果感覺上面得文章對各位有幫助,歡迎各位大佬我個人,感謝。