源碼直接參照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
/**
 * Streaming word count that converts every batch of words to a DataFrame and
 * counts them with a SQL query.
 *
 * Usage: SqlNetworkWordCount <hostname> <port>
 *   <hostname> and <port> identify the TCP socket that supplies the text.
 */
object SqlNetworkWordCount {

  /**
   * Program entry point.
   *
   * @param args args(0) is the host name of the text source and args(1) is its
   *             port number. The process exits with status 1 when fewer than two
   *             arguments are given or the port is not an integer.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Reject a non-numeric port with a readable message instead of letting
    // String.toInt throw a bare NumberFormatException.
    val host = args(0)
    val port = scala.util.Try(args(1).toInt).getOrElse {
      System.err.println(s"Invalid port '${args(1)}': expected an integer")
      sys.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the words in the
    // input stream of \n delimited text (e.g. generated by 'nc').
    // The storage level has no replication, which is fine only when running
    // locally; replication is necessary in a distributed scenario for fault
    // tolerance.
    // The socket is the data source.
    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)

    // DStream of individual words: every line is split on single spaces.
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query.
    // foreachRDD visits each RDD of the DStream together with its batch time.
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table so the SQL below can refer to it as "words"
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Single-column row type that wraps one word so that a collection of words
 * can be converted to a DataFrame.
 *
 * @param word the wrapped word; the field name doubles as the column name
 */
case class Record(
  word: String
)
/**
 * Holder for a single SQLContext that is created on first request and
 * reused by every later request.
 */
object SQLContextSingleton {

  // Cached context; remains null until getInstance is called for the first time.
  @transient private var instance: SQLContext = _

  /**
   * Returns the cached SQLContext, creating it from `sparkContext` when no
   * instance exists yet.
   *
   * @param sparkContext context used to build the SQLContext on the first
   *                     call; not used once an instance has been cached
   * @return the shared SQLContext instance
   */
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance != null) {
      instance
    } else {
      val created = new SQLContext(sparkContext)
      instance = created
      created
    }
  }
}
運行程序後,在另一個終端運行下列 nc 命令作為 Socket 數據源,並在其中輸入以下文本:
root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
處理結果:
========= 1448783840000 ms =========
+---------+-----+
| word|total|
+---------+-----+
| Spark| 12|
| system| 12|
| general| 12|
| fast| 12|
| and| 12|
|computing| 12|
| a| 12|
| is| 12|
| for| 12|
| Big| 12|
| cluster| 12|
| Data| 12|
+---------+-----+
========= 1448783842000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
========= 1448783844000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+