大數(shù)據(jù)分析技術(shù)與實(shí)戰(zhàn)之Spark Streaming

大數(shù)據(jù)

隨著信息技術(shù)的迅猛發(fā)展,數(shù)據(jù)量呈現(xiàn)出爆炸式增長(zhǎng)趨勢(shì),數(shù)據(jù)的種類與變化速度也遠(yuǎn)遠(yuǎn)超出人們的想象,因此人們對(duì)大數(shù)據(jù)處理提出了更高的要求,越來(lái)越多的領(lǐng)域迫切需要大數(shù)據(jù)技術(shù)來(lái)解決領(lǐng)域內(nèi)的關(guān)鍵問(wèn)題。在一些特定的領(lǐng)域中(例如金融、災(zāi)害預(yù)警等),時(shí)間就是金錢、時(shí)間可能就是生命!然而傳統(tǒng)的批處理框架卻一直難以滿足這些領(lǐng)域中的實(shí)時(shí)性需求。為此,涌現(xiàn)出了一批如S4、Storm的流式計(jì)算框架。Spark是基于內(nèi)存的大數(shù)據(jù)綜合處理引擎,具有優(yōu)秀的作業(yè)調(diào)度機(jī)制和快速的分布式計(jì)算能力,使其能夠更加高效地進(jìn)行迭代計(jì)算,因此Spark能夠在一定程度上實(shí)現(xiàn)大數(shù)據(jù)的流式處理。

Spark Streaming是Spark上的一個(gè)流式處理框架,可以面向海量數(shù)據(jù)實(shí)現(xiàn)高吞吐量、高容錯(cuò)的實(shí)時(shí)計(jì)算。Spark Streaming支持多種類型數(shù)據(jù)源,包括Kafka、Flume、trwitter、zeroMQ、Kinesis以及TCP sockets等,如圖1所示。Spark Streaming實(shí)時(shí)接收數(shù)據(jù)流,并按照一定的時(shí)間間隔將連續(xù)的數(shù)據(jù)流拆分成一批批離散的數(shù)據(jù)集;然后應(yīng)用諸如map、reducluce、join和window等豐富的API進(jìn)行復(fù)雜的數(shù)據(jù)處理;最后提交給Spark引擎進(jìn)行運(yùn)算,得到批量結(jié)果數(shù)據(jù),因此其也被稱為準(zhǔn)實(shí)時(shí)處理系統(tǒng)。

大數(shù)據(jù)

圖1 Spark Streaming支持多種類型數(shù)據(jù)源

目前應(yīng)用最廣泛的大數(shù)據(jù)流式處理框架是Storm。Spark Streaming 最低0.5~2s做一次處理(而Storm最快可達(dá)0.1s),在實(shí)時(shí)性和容錯(cuò)方面不如Storm。然而Spark Streaming的集成性非常好,通過(guò)RDD不僅能夠與Spark上的所有組件無(wú)縫銜接共享數(shù)據(jù),還能非常容易地與Kafka、Flume等分布式日志收集框架進(jìn)行集成;同時(shí)Spark Streaming的吞吐量非常高,遠(yuǎn)遠(yuǎn)優(yōu)于Storm的吞吐量,如圖2所示。所以雖然Spark Streaming的處理延遲高于Storm,但是在集成性與吞吐量方面的優(yōu)勢(shì)使其更適用于大數(shù)據(jù)背景。

大數(shù)據(jù)

圖2 Spark Streaming與Storm吞吐量比較圖

Spark Streaming基礎(chǔ)概念

批處理時(shí)間間隔

在Spark Streaming中,對(duì)數(shù)據(jù)的采集是實(shí)時(shí)、逐條進(jìn)行的,但是對(duì)數(shù)據(jù)的處理卻是分批進(jìn)行的。因此,Spark Streaming需要設(shè)定一個(gè)時(shí)間間隔,將該時(shí)間間隔內(nèi)采集到的數(shù)據(jù)統(tǒng)一進(jìn)行處理,這個(gè)間隔稱為批處理時(shí)間間隔。

也就是說(shuō)對(duì)于源源不斷的數(shù)據(jù),Spark Streaming是通過(guò)切分的方式,先將連續(xù)的數(shù)據(jù)流進(jìn)行離散化處理。數(shù)據(jù)流每被切分一次,對(duì)應(yīng)生成一個(gè)RDD,每個(gè)RDD都包含了一個(gè)時(shí)間間隔內(nèi)所獲取到的所有數(shù)據(jù),因此數(shù)據(jù)流被轉(zhuǎn)換為由若干個(gè)RDD構(gòu)成的有序集合,而批處理時(shí)間間隔決定了Spark Streaming需要多久對(duì)數(shù)據(jù)流切分一次。Spark Streaming是Spark上的組件,其獲取的數(shù)據(jù)和數(shù)據(jù)上的操作最終仍以Spark作業(yè)的形式在底層的Spark內(nèi)核中進(jìn)行計(jì)算,因此批處理時(shí)間間隔不僅影響數(shù)據(jù)處理的吞吐量,同時(shí)也決定了Spark Streaming向Spark提交作業(yè)的頻率和數(shù)據(jù)處理的延遲。需要注意的是,批處理時(shí)間間隔的設(shè)置會(huì)伴隨Spark Streaming應(yīng)用程序的整個(gè)生命周期,無(wú)法在程序運(yùn)行期間動(dòng)態(tài)修改,所以需要綜合考慮實(shí)際應(yīng)用場(chǎng)景中的數(shù)據(jù)流特點(diǎn)和集群的處理性能等多種因素進(jìn)行設(shè)定。

窗口時(shí)間間隔

窗口時(shí)間間隔又稱為窗口長(zhǎng)度,它是一個(gè)抽象的時(shí)間概念,決定了Spark Streaming對(duì)RDD序列進(jìn)行處理的范圍與粒度,即用戶可以通過(guò)設(shè)置窗口長(zhǎng)度來(lái)對(duì)一定時(shí)間范圍內(nèi)的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)和分析。如果設(shè)批處理時(shí)間設(shè)為1s,窗口時(shí)間間隔為3s,如3圖所示,其中每個(gè)實(shí)心矩形表示Spark Streaming每1秒鐘切分出的一個(gè)RDD,若干個(gè)實(shí)心矩形塊表示一個(gè)以時(shí)間為序的RDD序列,而透明矩形框表示窗口時(shí)間間隔。易知窗口內(nèi)RDD的數(shù)量最多為3個(gè),即Spark Streming 每次最多對(duì)3個(gè)RDD中的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)和分析。對(duì)于窗口時(shí)間間隔還需要注意以下幾點(diǎn):

以圖3為例,在系統(tǒng)啟動(dòng)后的前3s內(nèi),因進(jìn)入窗口的RDD不足3個(gè),但是隨著時(shí)間的推移,最終窗口將被填滿。不同窗口內(nèi)所包含的RDD可能會(huì)有重疊,即當(dāng)前窗口內(nèi)的數(shù)據(jù)可能被其后續(xù)若干個(gè)窗口所包含,因此在一些應(yīng)用場(chǎng)景中,對(duì)于已經(jīng)處理過(guò)的數(shù)據(jù)不能立即刪除,以備后續(xù)計(jì)算使用。窗口時(shí)間間隔必須是批處理時(shí)間間隔的整數(shù)倍。

大數(shù)據(jù)

圖3 窗口時(shí)間間隔示意圖滑動(dòng)時(shí)間間隔

滑動(dòng)時(shí)間間隔決定了Spark Streaming對(duì)數(shù)據(jù)進(jìn)行統(tǒng)計(jì)與分析的頻率,多出現(xiàn)在與窗口相關(guān)的操作中?;瑒?dòng)時(shí)間間隔是基于批處理時(shí)間間隔提出的,其必須是批處理時(shí)間間隔的整數(shù)倍。在默認(rèn)的情況下滑動(dòng)時(shí)間間隔設(shè)置為與批處理時(shí)間間隔相同的值。如果批處理時(shí)間間隔為1s,窗口間隔為3s,滑動(dòng)時(shí)間間隔為2s,如圖4所示,其含義是每隔2s對(duì)過(guò)去3s內(nèi)產(chǎn)生的3個(gè)RDD進(jìn)行統(tǒng)計(jì)分析。

大數(shù)據(jù)

圖4 滑動(dòng)時(shí)間間隔、窗口時(shí)間間隔、批處理時(shí)間間隔綜合示意圖

DStream基本概念

DStream是Spark Streaming的一個(gè)基本抽象,它以離散化的RDD序列的形式近似描述了連續(xù)的數(shù)據(jù)流。DStream本質(zhì)上是一個(gè)以時(shí)間為鍵,RDD為值的哈希表,保存了按時(shí)間順序產(chǎn)生的RDD,而每個(gè)RDD封裝了批處理時(shí)間間隔內(nèi)獲取到的數(shù)據(jù)。Spark Streaming每次將新產(chǎn)生的RDD添加到哈希表中,而對(duì)于已經(jīng)不再需要的RDD則會(huì)從這個(gè)哈希表中刪除,所以DStream也可以簡(jiǎn)單地理解為以時(shí)間為鍵的RDD的動(dòng)態(tài)序列。設(shè)批處理時(shí)間間隔為1s,圖5為4s內(nèi)產(chǎn)生的DStream示意圖。

大數(shù)據(jù)

圖5 DStream示意圖

Spark Streaming編程模式與案例分析

Spark Streaming編程模式

下面以Spark Streaming官方提供的WordCount代碼為例來(lái)介紹Spark Streaming的使用方式。

示例1:

import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ /*創(chuàng)建一個(gè)本地模式的StreamingContext,并設(shè)定master節(jié)點(diǎn)工作線程數(shù)為2,并以1秒作為批處理時(shí)間間隔。*/ val conf = new SparkConf().setMaster("local[2]"). setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) /*通過(guò)獲取”localhost”節(jié)點(diǎn)9999端口中的實(shí)時(shí)數(shù)據(jù)流創(chuàng)建DStream。*/ val lines = ssc.socketTextStream("localhost", 9999) /*以空格作為分割DStream中數(shù)據(jù)的依據(jù),使得每一行文本轉(zhuǎn)換為若干個(gè)單詞。*/ val words = lines.flatMap(_.split(" ")) import org.apache.spark.streaming.StreamingContext._ /*對(duì)于words中的每個(gè)單詞word,轉(zhuǎn)換為相應(yīng)的二元組形式(word,1),在此基礎(chǔ)上統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)。*/ val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) //輸出DStream中每個(gè)RDD中前10個(gè)元素。 wordCounts.print() //啟動(dòng)Spark Streaming應(yīng)用程序。 ssc.start() //等待計(jì)算完成。 ssc.awaitTermination()

Spark Streaming應(yīng)用程序在功能結(jié)構(gòu)上通常包含以下五部分,如上述示例1所示。

??導(dǎo)入Spark Streaming相關(guān)包:Spark Streaming作為Spark框架上的一個(gè)組件,具有很好的集成性。在開(kāi)發(fā)Spark Streaming應(yīng)用程序時(shí),只需導(dǎo)入Spark Streaming相關(guān)包,無(wú)需額外的參數(shù)配置。

??創(chuàng)建StreamingContext對(duì)象:同Spark應(yīng)用程序中的SparkContext對(duì)象一樣, StreamingContext對(duì)象是Spark Streaming應(yīng)用程序與集群進(jìn)行交互的唯一通道,其中封裝了Spark集群的環(huán)境信息和應(yīng)用程序的一些屬性信息。在該對(duì)象中通常需要指明應(yīng)用程序的運(yùn)行模式(示例1中設(shè)為local[2])、設(shè)定應(yīng)用程序名稱(示例1中設(shè)為NetworkWordCount)、設(shè)定批處理時(shí)間間隔(示例1中設(shè)為Seconds(1)即1秒鐘),其中批處理時(shí)間間隔需要根據(jù)用戶的需求和集群的處理能力進(jìn)行適當(dāng)?shù)卦O(shè)置。

??創(chuàng)建InputDStream:Spark Streaming需要根據(jù)數(shù)據(jù)源類型選擇相應(yīng)的創(chuàng)建DStream的方法。示例1中Spark Streaming通過(guò)StreamingContext對(duì)象調(diào)用socketTextStream方法處理以socket連接類型數(shù)據(jù)源,創(chuàng)建出DStream即lines。Spark Streaming同時(shí)支持多種不同的數(shù)據(jù)源類型,其中包括Kafka、Flume、HDFS/S3、Kinesis和Twitter等數(shù)據(jù)源。

??操作DStream:對(duì)于從數(shù)據(jù)源得到的DStream,用戶可以調(diào)用豐富的操作對(duì)其進(jìn)行處理。示例1中針對(duì)lines的一系列操作就是一個(gè)典型的WordCount執(zhí)行流程:對(duì)于當(dāng)前批處理時(shí)間間隔內(nèi)的文本數(shù)據(jù)以空格進(jìn)行切分,進(jìn)而得到words;再將words中每個(gè)單詞轉(zhuǎn)換為二元組,進(jìn)而得到pairs;最后利用reduceByKey方法進(jìn)行統(tǒng)計(jì)。

??啟動(dòng)與停止Spark Streaming應(yīng)用程序:在啟動(dòng)Spark Streaming應(yīng)用程序之前,DStream上所有的操作僅僅是定義了數(shù)據(jù)的處理流程,程序并沒(méi)有真正連接上數(shù)據(jù)源,也沒(méi)有對(duì)數(shù)據(jù)進(jìn)行任何操作,當(dāng)ssc.start()啟動(dòng)后程序中定義的操作才會(huì)真正開(kāi)始執(zhí)行。

文本文件數(shù)據(jù)處理案例

功能需求

實(shí)時(shí)監(jiān)聽(tīng)并獲取本地home/dong/Streamingtext目錄中新生成的文件(文件均為英文文本文件,單詞之間使用空格進(jìn)行間隔),并對(duì)文件中各單詞出現(xiàn)的次數(shù)進(jìn)行統(tǒng)計(jì)。

代碼實(shí)現(xiàn)

package dong.spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds,StreamingContext} import org.apache.spark.streaming.StreamingContext._ object StreamingFileWordCount { def main(args: Array[String]): Unit ={ //以local模式運(yùn)行,并設(shè)定master節(jié)點(diǎn)工作線程數(shù)為2。 val sparkConf = new SparkConf(). setAppName("StreamingFileWordCount"). setMaster("local[2]") /*創(chuàng)建StreamingContext實(shí)例,設(shè)定批處理時(shí)間間隔為20秒。*/ val ssc = new StreamingContext(sparkConf,Seconds(20)) /*指定數(shù)據(jù)源來(lái)自本地home/dong/Streamingtext。*/ val lines = ssc.textFileStream("/home/dong/Streamingtext") /*在每個(gè)批處理時(shí)間間隔內(nèi),對(duì)指定文件夾中變化的數(shù)據(jù)進(jìn)行單詞統(tǒng)計(jì)并打印。*/ val words= lines.flatMap(_.split(" ")) val wordcounts=words.map(x=>(x,1)).reduceByKey(_+_) wordcounts.print() ssc.start() ssc.awaitTermination() } }

運(yùn)行演示

第1步,啟動(dòng)hadoop與Spark。

$ start-all.sh $ cd spark-1.4.0-bin-hadoop2.4 $ sbin/start-all.sh

第2步,創(chuàng)建Streaming監(jiān)控目錄。

$ mkdir /home/dong/Streamingtext

在dong用戶主目錄下創(chuàng)建Streamingtext為Spark Streaming監(jiān)控的目錄,創(chuàng)建后如圖6所示。

大數(shù)據(jù)

圖6 dong用戶主目錄下創(chuàng)建Streamingtext文件夾

第3步,在IntelliJ IDEA中編輯運(yùn)行Streaming程序。在IntelliJ IDEA中創(chuàng)建工程StreamingFileWordCount,編輯對(duì)象StreamingFileWordCount,如圖7所示。

大數(shù)據(jù)

圖7 IntelliJ IDEA中StreamingFileWordCount示意圖

由于該示例沒(méi)有輸入?yún)?shù),因此不需要配置參數(shù),可直接單擊右鍵->單擊”Run‘StreamingFileWordCount’ “。

第4步,在監(jiān)聽(tīng)目錄下創(chuàng)建文本文件。在master節(jié)點(diǎn)上的/home/dong/Streamingtext中分別創(chuàng)建file1.txt與file2.txt。

file1.txt內(nèi)容如下:

aa
bb

file2.txt內(nèi)容如下:

ee
dd
cc

創(chuàng)建后,/home/dong/Streamingtext中內(nèi)容如圖8所示。

大數(shù)據(jù)

圖8 Streamingtext文件夾內(nèi)容示意圖

查看結(jié)果

終端窗口輸出了每個(gè)批處理時(shí)間間隔(20秒)內(nèi),/home/dong/Streamingtext中新生成文件所包含的各單詞個(gè)數(shù),如圖9所示。

大數(shù)據(jù)

圖9 StreamingFileWordCount運(yùn)行結(jié)果示意圖

網(wǎng)絡(luò)數(shù)據(jù)處理案例

功能需求

監(jiān)聽(tīng)本地節(jié)點(diǎn)指定端口傳輸?shù)臄?shù)據(jù)流(本案例為master節(jié)點(diǎn)9999端口的英文文本數(shù)據(jù),以逗號(hào)間隔單詞),每5秒統(tǒng)計(jì)一次該時(shí)間間隔內(nèi)收集到的各單詞的個(gè)數(shù)。

代碼實(shí)現(xiàn)

本案例涉及數(shù)據(jù)流模擬器和分析器兩部分。為了更接近真實(shí)的網(wǎng)絡(luò)環(huán)境,首先定義數(shù)據(jù)流模擬器,該模擬器以Socket方式監(jiān)聽(tīng)網(wǎng)絡(luò)中指定節(jié)點(diǎn)上的指定端口號(hào)(master節(jié)點(diǎn)9999端口),當(dāng)外部程序通過(guò)該端口連接并請(qǐng)求數(shù)據(jù)時(shí),數(shù)據(jù)流模擬器將定時(shí)地從指定文本文件中隨機(jī)選取數(shù)據(jù)發(fā)送至指定端口(每間隔1秒鐘數(shù)據(jù)流模擬器從master節(jié)點(diǎn)上的/home/dong/Streamingtext/file1.txt中隨機(jī)截取一行文本發(fā)送給master節(jié)點(diǎn)的9999端口),通過(guò)這種方式模擬網(wǎng)絡(luò)環(huán)境下源源不斷的數(shù)據(jù)流。針對(duì)獲取到的實(shí)時(shí)數(shù)據(jù),再定義分析器(Spark Streaming應(yīng)用程序),用以統(tǒng)計(jì)時(shí)間間隔(5秒)內(nèi)收集到的單詞個(gè)數(shù)。

數(shù)據(jù)流模擬器代碼實(shí)現(xiàn)如下:

package dong.spark import java.io.{PrintWriter} import java.net.ServerSocket import scala.io.Source objectSocketSimulation { //定義隨機(jī)獲取整數(shù)的方法。 def index(length: Int)={ import java.util.Random val rdm = new Random rdm.nextInt(length) } def main(args:Array[String]): Unit ={ if(args.length!=3){ /*調(diào)用數(shù)據(jù)流模擬器需要三個(gè)參數(shù):文件路徑、端口號(hào)和批處理時(shí)間間隔時(shí)間(單位:毫秒)。*/ System.err.println("Usage:<filename><port><millisecond>") System.exit(1) } //獲取指定文件總的行數(shù)。 val filename = args(0) val lines = Source.fromFile(filename).getLines().toList val filerow=lines.length //指定監(jiān)聽(tīng)參數(shù)args(1)指定的端口,當(dāng)外部程序請(qǐng)求時(shí)建立連接。 val listener =new ServerSocket(args(1).toInt) while(true){ val socket = listener.accept() new Thread(){ override def run={ println("Got client connected from:"+socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(),true) while(true){ Thread.sleep(args(2).toLong) //當(dāng)該端口接受請(qǐng)求時(shí),隨機(jī)獲取某行數(shù)據(jù)發(fā)送給對(duì)方。 val content= lines(index(filerow)) println(content) out.write(content+'\n') out.flush() } socket.close() } }.start() } } }分析器代碼如下:package dong.spark import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def main (args:Array[String]) ={ //以local模式運(yùn)行,并設(shè)定master節(jié)點(diǎn)工作線程數(shù)為2。 val conf=new SparkConf().setAppName("NetworkWordCount"). setMaster("local[2]") val sc=new SparkContext(conf) val ssc=new StreamingContext(sc, Seconds(5)) /*通過(guò)socketTextStream獲取指定節(jié)點(diǎn)指定端口的數(shù)據(jù)創(chuàng)建DStream,并保存在內(nèi)存和硬盤中,其中節(jié)點(diǎn)與端口分別對(duì)應(yīng)參數(shù)args(0)和args(1)。*/val lines=ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) //在每個(gè)批處理時(shí)間間隔內(nèi)對(duì)獲取到的數(shù)據(jù)進(jìn)行單詞統(tǒng)計(jì)并且打印。 val words= lines.flatMap(_.split(",")) val wordcounts = words.map(x=>(x,1)).reduceByKey(_+_) wordcounts.print() ssc.start() ssc.awaitTermination() } }

運(yùn)行演示

第1步,在IntelliJ IDEA中編輯運(yùn)行Streaming程序。master節(jié)點(diǎn)啟動(dòng)IntelliJ IDEA,創(chuàng)建工程N(yùn)etworkWordCount,編輯模擬器與分析器。模擬器如圖10所示,分析器如圖11所示。

大數(shù)據(jù)

圖10 IntelliJ IDEA中數(shù)據(jù)流模擬器示意圖

大數(shù)據(jù)

圖11 IntelliJ IDEA中分析器示意圖

第2步,創(chuàng)建模擬器數(shù)據(jù)源文件。在master節(jié)點(diǎn)創(chuàng)建/home/dong/Streamingtext目錄,在其中創(chuàng)建文本文件file1.txt。

file1.txt內(nèi)容如下:

spark,
hello,
hbase,
world,

第3步,打包數(shù)據(jù)流模擬器。打包過(guò)程詳見(jiàn)本書(shū)4.3.3節(jié)。在Artifacts打包配置界面中,根據(jù)用戶實(shí)際scala安裝目錄,在Class Path中添加下述scala依賴包,如圖12所示。

/usr/scala-2.10.4/lib/scala-swing.jar /usr/scala-2.10.4/lib/scala-library.jar /usr/scala-2.10.4/lib/scala-actors.jar

大數(shù)據(jù)

圖12 在Class Path中添加scala依賴包

打包后在主目錄下生成NetworkWordCount.jar,如圖13所示。

圖13 在dong用戶主目錄下生成NetworkWordCount.jar示意圖

第4步,啟動(dòng)數(shù)據(jù)流模擬器。在master節(jié)點(diǎn)開(kāi)啟控制終端,通過(guò)下面代碼啟動(dòng)數(shù)據(jù)流模擬器。

$ java -cp /home/dong/NetworkWordCount.jar dong.spark.SocketSimulation/ home/dong/Streamingtest/file1.txt 9999 1000

數(shù)據(jù)流模擬器每間隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機(jī)截取一行文本發(fā)送給master節(jié)點(diǎn)的9999端口。在分析器未連接時(shí),數(shù)據(jù)流模擬器處于阻塞狀態(tài),終端不會(huì)顯示輸出的文本。

第5步,運(yùn)行分析器。在master上啟動(dòng)IntelliJ IDEA編寫(xiě)分析器代碼,然后單擊菜單”Build->”Build Artifacts”,通過(guò)Application選項(xiàng)配置分析器運(yùn)行所需的參數(shù),其中Socket主機(jī)名為master、端口號(hào)為9999,參數(shù)之間用空格間隔,如圖13所示。

大數(shù)據(jù)

圖13 分析器參數(shù)配置示意圖

配置好參數(shù)后返回IntelliJ IDEA菜單欄,單擊”Run”->”Build Artifacts”運(yùn)行分析器。

查看結(jié)果

第1步,在master上查看數(shù)據(jù)流模擬器運(yùn)行情況。IntelliJ IDEA運(yùn)行分析器從而與數(shù)據(jù)流模擬器建立連接。當(dāng)檢測(cè)到外部連接時(shí),數(shù)據(jù)流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機(jī)截取一行文本發(fā)送給master節(jié)點(diǎn)上的9999端口。為方便講解和說(shuō)明,file1.txt中每一行只包含一個(gè)單詞,因此數(shù)據(jù)流模擬器每次僅發(fā)送一個(gè)單詞給端口,如圖14所示。

大數(shù)據(jù)

圖14 在master上模擬器運(yùn)行結(jié)果

第2步,在master的IntelliJ IDEA中查看分析器運(yùn)行情況。在IntelliJ IDEA的運(yùn)行日志窗口中,可以觀察到統(tǒng)計(jì)結(jié)果。通過(guò)分析可知Spark Streaming每個(gè)批處理時(shí)間間隔內(nèi)獲取的單詞數(shù)為5,剛好是5秒內(nèi)發(fā)送單詞的總數(shù),并對(duì)各單詞進(jìn)行了統(tǒng)計(jì),如圖15所示。

大數(shù)據(jù)

圖15 IntelliJ IDEA中分析器運(yùn)行結(jié)果

stateful應(yīng)用案例

在很多數(shù)據(jù)流相關(guān)的實(shí)際應(yīng)用場(chǎng)景中,對(duì)當(dāng)前數(shù)據(jù)的統(tǒng)計(jì)分析需要借助于先前的數(shù)據(jù)處理結(jié)果完成。例如電商每間隔10分鐘統(tǒng)計(jì)某一商品當(dāng)前累計(jì)銷售總額、車站每隔3小時(shí)統(tǒng)計(jì)當(dāng)前客流總量,等等。此類應(yīng)用需求可借助于Spark Streaming的有狀態(tài)轉(zhuǎn)換操作實(shí)現(xiàn)。

功能需求

監(jiān)聽(tīng)網(wǎng)絡(luò)中某節(jié)點(diǎn)上指定端口傳輸?shù)臄?shù)據(jù)流(slave1節(jié)點(diǎn)9999端口的英文文本數(shù)據(jù),以逗號(hào)間隔單詞),每5秒分別統(tǒng)計(jì)各單詞的累計(jì)出現(xiàn)次數(shù)。

代碼實(shí)現(xiàn)

本案例功能的實(shí)現(xiàn)涉及數(shù)據(jù)流模擬器和分析器兩部分。

分析器代碼:

package dong.spark import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ object StatefulWordCount { def main(args:Array[String]): Unit ={ /*定義更新?tīng)顟B(tài)方法,參數(shù)values為當(dāng)前批處理時(shí)間間隔內(nèi)各單詞出現(xiàn)的次數(shù),state為以往所有批次各單詞累計(jì)出現(xiàn)次數(shù)。*/ val updateFunc=(values: Seq[Int],state:Option[Int])=>{ val currentCount=values.foldLeft(0)(_+_) val previousCount=state.getOrElse(0) Some(currentCount+previousCount) } val conf=new SparkConf(). setAppName("StatefulWordCount").setMaster("spark://192.168.149.132:7077") val sc=new SparkContext(conf) //創(chuàng)建StreamingContext,Spark Steaming運(yùn)行時(shí)間間隔為5秒。 val ssc=new StreamingContext(sc, Seconds(5)) /*使用updateStateByKey時(shí)需要checkpoint持久化接收到的數(shù)據(jù)。在集群模式下運(yùn)行時(shí),需要將持久化目錄設(shè)為HDFS上的目錄。*/ ssc.checkpoint("hdfs://master:9000/user/dong/input/StatefulWordCountlog") /*通過(guò)Socket獲取指定節(jié)點(diǎn)指定端口的數(shù)據(jù)創(chuàng)建DStream,其中節(jié)點(diǎn)與端口分別由參數(shù)args(0)和args(1)給出。*/ val lines=ssc.socketTextStream(args(0),args(1).toInt) val words=lines.flatMap(_.split(",")) val wordcounts=words.map(x=>(x,1)) //使用updateStateByKey來(lái)更新?tīng)顟B(tài),統(tǒng)計(jì)從運(yùn)行開(kāi)始以來(lái)單詞總的次數(shù)。 val stateDstream=wordcounts.updateStateByKey[Int](updateFunc) stateDstream.print() ssc.start() ssc.awaitTermination() } }

運(yùn)行演示

第1步,slave1節(jié)點(diǎn)啟動(dòng)數(shù)據(jù)流模擬器。

第2步,打包分析器。master節(jié)點(diǎn)啟動(dòng)IntelliJ IDEA創(chuàng)建工程StatefulWordCount編輯分析器,如圖16所示,并將分析器直接打包至master節(jié)點(diǎn)dong用戶的主目錄下,如圖17所示。

大數(shù)據(jù)

圖16 IntelliJ IDEA中StatefulWordCount示意圖

大數(shù)據(jù)

圖17 master上的StatefulWordCount.jar示意圖

第3步,運(yùn)行分析器。在master節(jié)點(diǎn)開(kāi)啟終端,通過(guò)下面代碼向Spark集群提交應(yīng)用程序。

$ bin/spark-submit ~/StatefulWordCount.jar slave1 9999

查看結(jié)果

第1步,查看slave1上數(shù)據(jù)流模擬器運(yùn)行情況。分析器在集群上提交運(yùn)行后與slave1上運(yùn)行的數(shù)據(jù)流模擬器建立連接。當(dāng)檢測(cè)到外部連接時(shí),數(shù)據(jù)流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機(jī)截取一行文本發(fā)送給slave1節(jié)點(diǎn)上的9999端口。由于該文本文件中每一行只包含一個(gè)單詞,因此每秒僅發(fā)送一個(gè)單詞給端口。如圖18所示。

大數(shù)據(jù)

圖18 slave1上數(shù)據(jù)流模擬器運(yùn)行示意圖

第2步,查看master上分析器運(yùn)行情況。在master節(jié)點(diǎn)的提交窗口中可以查看到統(tǒng)計(jì)結(jié)果,如圖19所示。

大數(shù)據(jù)

圖19 master上分析器運(yùn)行示意圖

圖中表明截至147920770500ms分析器共接收到14個(gè)單詞,其中”spark”累計(jì)出現(xiàn)3次,”hbase”累計(jì)出現(xiàn)5次,”hello”累計(jì)出現(xiàn)3次,”world”累計(jì)出現(xiàn)3次。由于批處理時(shí)間間隔是5s,模擬器每1秒發(fā)送1個(gè)單詞,使得分析器在5s內(nèi)共接收到5個(gè)單詞,因此截止至147920771000ms,分析器共收到19個(gè)單詞,其中”spark”累計(jì)出現(xiàn)5次,”hbase”累計(jì)出現(xiàn)7次,”hello”累計(jì)出現(xiàn)4次,”world”累計(jì)出現(xiàn)3次。

第3步,查看HDFS中持久化目錄。運(yùn)行后查看HDFS上的持久化目錄/user/dong/input/StatefulWordCountlog,如圖20所示。Streaming應(yīng)用程序?qū)⒔邮盏降木W(wǎng)絡(luò)數(shù)據(jù)持久化至該目錄下,便于容錯(cuò)處理。

大數(shù)據(jù)

圖20 HDFS上持久化目錄示意圖

window應(yīng)用案例

在實(shí)際生產(chǎn)環(huán)境中,與窗口相關(guān)的應(yīng)用場(chǎng)景很常見(jiàn),例如電商每間隔10分鐘小時(shí)統(tǒng)計(jì)某一商品前30分鐘內(nèi)累計(jì)銷售總額、車站每隔1小時(shí)統(tǒng)計(jì)前3個(gè)小時(shí)內(nèi)的客流量等,此類需求可借助Spark Streaming中的window相關(guān)操作實(shí)現(xiàn)。window應(yīng)用案例同時(shí)涉及批處理時(shí)間間隔、窗口時(shí)間間隔與滑動(dòng)時(shí)間間隔。

功能需求

監(jiān)聽(tīng)網(wǎng)絡(luò)中某節(jié)點(diǎn)上指定端口傳輸?shù)臄?shù)據(jù)流(slave1節(jié)點(diǎn)上9999端口的英文文本數(shù)據(jù),以逗號(hào)間隔單詞),每10秒統(tǒng)計(jì)前30秒各單詞累計(jì)出現(xiàn)的次數(shù)。

代碼實(shí)現(xiàn)

本例功能的實(shí)現(xiàn)涉及數(shù)據(jù)流模擬器和分析器兩部分。

分析器代碼:

package dong.spark import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming._ import org.apache.spark.storage.StorageLevel object WindowWordCount { def main(args:Array[String]) ={ val conf=new SparkConf().setAppName("WindowWordCount"). setMaster("spark://192.168.149.132:7077") val sc=new SparkContext(conf) val ssc=new StreamingContext(sc, Seconds(5)) ssc.checkpoint("hdfs://master:9000/user/dong/WindowWordCountlog") val lines=ssc.socketTextStream( args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER) val words= lines.flatMap(_.split(",")) /*采用reduceByKeyAndWindow操作進(jìn)行疊加處理,窗口時(shí)間間隔與滑動(dòng)時(shí)間間隔分別由參數(shù)args(2)和args(3)給出。*/ val wordcounts=words.map(x=>(x,1)). reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(ar gs(2).toInt),Seconds(args(3).toInt)) wordcounts.print() ssc.start() ssc.awaitTermination() } }

運(yùn)行演示

第1步,slave1節(jié)點(diǎn)啟動(dòng)數(shù)據(jù)流模擬器。

第2步,打包分析器。在master節(jié)點(diǎn)啟動(dòng)IntelliJ IDEA創(chuàng)建工程WindowWordCount編輯分析器,如圖21,并將分析器直接打包至master節(jié)點(diǎn)dong用戶的主目錄下,如圖22所示。

大數(shù)據(jù)

圖21 IntelliJ IDEA中WindowWordCount示意圖

大數(shù)據(jù)

圖22 master上WindowWordCount.jar示意圖

第3步,運(yùn)行分析器。在master節(jié)點(diǎn)開(kāi)啟終端,通過(guò)下面代碼向Spark集群提交應(yīng)用程序。

$ bin/spark-submit ~/WindowWordCount.jar slave1 9999 30 10

查看結(jié)果

第1步 在slave1上查看數(shù)據(jù)流模擬器運(yùn)行情況。分析器在集群上提交運(yùn)行后與slave1上運(yùn)行的數(shù)據(jù)流模擬器建立連接。當(dāng)檢測(cè)到外部連接時(shí),數(shù)據(jù)流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機(jī)截取一行文本發(fā)送給slave1節(jié)點(diǎn)的9999端口。由于該文本文件中每一行只包含一個(gè)單詞和一個(gè)逗號(hào),因此每秒僅發(fā)送一個(gè)單詞和一個(gè)逗號(hào)給端口,如圖23所示。

大數(shù)據(jù)

圖23 在slave1上數(shù)據(jù)流模擬器運(yùn)行示意圖

第2步,在master上查看分析器運(yùn)行情況。在master節(jié)點(diǎn)的提交窗口中可以查看到統(tǒng)計(jì)結(jié)果。在WindowWordCount應(yīng)用程序啟動(dòng)初期,窗口并沒(méi)有被接收到的單詞填滿,但隨著時(shí)間的推進(jìn),每個(gè)窗口中的單詞數(shù)目最終固定為30個(gè)。圖7.35只是截取了運(yùn)行結(jié)果中的三個(gè)批次。由于設(shè)定了窗口時(shí)間間隔是30s,滑動(dòng)時(shí)間間隔是10s,且數(shù)據(jù)流模擬器每間隔1s發(fā)送一個(gè)單詞,因此WindowWordCount每間隔10s對(duì)過(guò)去30s內(nèi)收到的各單詞個(gè)數(shù)進(jìn)行統(tǒng)計(jì)。圖24中截至1479276925000ms分析器對(duì)過(guò)去30s內(nèi)收到的30個(gè)單詞進(jìn)行統(tǒng)計(jì),其中”spark”累計(jì)出現(xiàn)5次,”hbase”累計(jì)出現(xiàn)8次,”hello”累計(jì)出現(xiàn)9次,”world”累計(jì)出現(xiàn)8次。再間隔10s,截至1479276935000ms,分析器對(duì)過(guò)去30s內(nèi)收到的30個(gè)單詞進(jìn)行統(tǒng)計(jì),其中”spark”累計(jì)出現(xiàn)8次,”hbase”累計(jì)出現(xiàn)9次,”hello”累計(jì)出現(xiàn)7次,”world”累計(jì)出現(xiàn)6次。

大數(shù)據(jù)

圖24 在master上分析器運(yùn)行示意圖

第3步,查看持久化數(shù)據(jù)。運(yùn)行后查看HDFS上的持久化目錄/user/dong/input/WindowWordCountlog,如圖25所示。Streaming應(yīng)用程序?qū)⒔邮盏降木W(wǎng)絡(luò)數(shù)據(jù)持久化至該目錄下,便于容錯(cuò)處理。

大數(shù)據(jù)

圖25 HDFS上持久化目錄示意圖

性能考量

在開(kāi)發(fā)Spark Streaming應(yīng)用程序時(shí),要結(jié)合集群中各節(jié)點(diǎn)的配置情況盡可能地提高數(shù)據(jù)處理的實(shí)時(shí)性。在調(diào)優(yōu)的過(guò)程中,一方面要盡可能利用集群資源來(lái)減少每個(gè)批處理的時(shí)間;另一方面要確保接收到的數(shù)據(jù)能及時(shí)處理掉。

運(yùn)行時(shí)間優(yōu)化

設(shè)置合理的批處理時(shí)間和窗口大小

Spark Streaming中作業(yè)之間通常存在依賴關(guān)系,后面的作業(yè)必須確保前面的作業(yè)執(zhí)行結(jié)束后才能提交,若前面的作業(yè)的執(zhí)行時(shí)間超過(guò)了設(shè)置的批處理時(shí)間間隔,那么后續(xù)的作業(yè)將無(wú)法按時(shí)提交執(zhí)行,造成作業(yè)的堵塞。也就是說(shuō)若想Spark Streaming應(yīng)用程序穩(wěn)定地在集群中運(yùn)行,對(duì)于接收到的數(shù)據(jù)必須盡快處理掉。例如若設(shè)定批處理時(shí)間為1秒鐘,那么系統(tǒng)每1秒鐘生成一個(gè)RDD,如果系統(tǒng)計(jì)算一個(gè)RDD的時(shí)間大于1秒,那么當(dāng)前的RDD還沒(méi)來(lái)得及處理,后續(xù)的RDD已經(jīng)提交上來(lái)在等待處理了,這就產(chǎn)生了堵塞。因此需要設(shè)置一個(gè)合理的批處理時(shí)間間隔以確保作業(yè)能夠在這個(gè)批處理時(shí)間間隔時(shí)間內(nèi)結(jié)束。許多實(shí)驗(yàn)數(shù)據(jù)表明,500毫秒對(duì)大多Spark Streaming應(yīng)用而言是較好的批處理時(shí)間間隔。

類似地,對(duì)于窗口操作,滑動(dòng)時(shí)間間隔對(duì)于性能也有很大的影響。當(dāng)單批次數(shù)據(jù)計(jì)算代價(jià)過(guò)高時(shí),可以考慮適當(dāng)增大滑動(dòng)時(shí)間間隔。

對(duì)于批處理時(shí)間和窗口大小的設(shè)定,并沒(méi)有統(tǒng)一的標(biāo)準(zhǔn)。通常是先從一個(gè)比較大的批處理時(shí)間(10秒左右)開(kāi)始,然后不斷地使用更小的值進(jìn)行對(duì)比測(cè)試。如果Spark Streaming用戶界面中顯示的處理時(shí)間保持不變,則可以進(jìn)一步設(shè)定更小的值;如果處理時(shí)間開(kāi)始增加,則可能已經(jīng)達(dá)到了應(yīng)用的極限,再減小該值則可能會(huì)影響系統(tǒng)的性能。

提高并行度

提高并行度也是一種減少批處理所消耗時(shí)間的常見(jiàn)方法。有以下三種方式可以提高并行度。一種方法是增加接收器數(shù)目。如果獲取的數(shù)據(jù)太多,則可能導(dǎo)致單個(gè)節(jié)點(diǎn)來(lái)不及對(duì)數(shù)據(jù)進(jìn)行讀入與分發(fā),使得接收器成為系統(tǒng)瓶頸。這時(shí)可以通過(guò)創(chuàng)建多個(gè)輸入DStream來(lái)增加接收器數(shù)目,然后再使用union來(lái)把數(shù)據(jù)合并為一個(gè)數(shù)據(jù)源。第二種方法是將收到的數(shù)據(jù)顯式地重新分區(qū)。如果接收器數(shù)目無(wú)法再增加,可以通過(guò)使用DStream.repartition、spark.streaming.blocklnterval等參數(shù)顯式地對(duì)Dstream進(jìn)行重新分區(qū)。第三種方法是提高聚合計(jì)算的并行度。對(duì)于會(huì)導(dǎo)致shuffle的操作,例如reduceByKey、reduceByKeyAndWindow等操作,可通過(guò)顯示設(shè)置更高的行度參數(shù)確保更為充分地使用集群資源。

內(nèi)存使用與垃圾回收

控制批處理時(shí)間間隔內(nèi)的數(shù)據(jù)量

Spark Streaming會(huì)把批處理時(shí)間間隔內(nèi)獲取到的所有數(shù)據(jù)存放在Spark內(nèi)部可用的內(nèi)存中。因此必須確保在當(dāng)前節(jié)點(diǎn)上SparkStreaming可用的內(nèi)存容量至少能容下一個(gè)批處理時(shí)間間隔內(nèi)所有的數(shù)據(jù)。比如一個(gè)批處理時(shí)間間隔是1秒,但是1秒產(chǎn)生了1GB的數(shù)據(jù),那么要確保當(dāng)前的節(jié)點(diǎn)上至少有可供SparkStreaming使用的1GB內(nèi)存。

及時(shí)清理不再使用的數(shù)據(jù)

對(duì)于內(nèi)存中處理過(guò)的、不再需要的數(shù)據(jù)應(yīng)及時(shí)清理,以確保Spark Streaming能夠擁有足夠的內(nèi)存空間可以使用。一種方法是可以通過(guò)設(shè)置合理的spark.cleaner.ttl時(shí)長(zhǎng)來(lái)及時(shí)清理超時(shí)的無(wú)用數(shù)據(jù),但該方法應(yīng)慎重使用,以免后續(xù)數(shù)據(jù)在需要時(shí)被錯(cuò)誤清理。另一種方法是將spark.streaming.unpersist設(shè)置為true,系統(tǒng)將自動(dòng)清理已經(jīng)不需要的RDD。該方法能顯著減少RDD對(duì)內(nèi)存的需要,同時(shí)潛在地提高GC的性能。此外用戶還可以通過(guò)配置參數(shù)streamingContext.remember為數(shù)據(jù)設(shè)置更長(zhǎng)的保留時(shí)間。

減少序列化與反序列化的負(fù)擔(dān)

SparkStreaming默認(rèn)將接收到的數(shù)據(jù)序列化后放入內(nèi)存,以減少內(nèi)存使用。序列化和反序列化需要更多的CPU資源,因此使用適當(dāng)?shù)男蛄谢ぞ?例如Kryo)和自定義的序列化接口可以更高效地使用CPU。除了使用更好的序列化工具外還可以結(jié)合壓縮機(jī)制,通過(guò)配置spark.rdd.compress,以CPU的時(shí)間開(kāi)銷來(lái)?yè)Q取內(nèi)存資源,降低GC開(kāi)銷。

極客網(wǎng)企業(yè)會(huì)員

免責(zé)聲明:本網(wǎng)站內(nèi)容主要來(lái)自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請(qǐng)進(jìn)一步核實(shí),并對(duì)任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對(duì)有關(guān)資料所引致的錯(cuò)誤、不確或遺漏,概不負(fù)任何法律責(zé)任。任何單位或個(gè)人認(rèn)為本網(wǎng)站中的網(wǎng)頁(yè)或鏈接內(nèi)容可能涉嫌侵犯其知識(shí)產(chǎn)權(quán)或存在不實(shí)內(nèi)容時(shí),應(yīng)及時(shí)向本網(wǎng)站提出書(shū)面權(quán)利通知或不實(shí)情況說(shuō)明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會(huì)依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開(kāi)相關(guān)鏈接。

2017-10-12
大數(shù)據(jù)分析技術(shù)與實(shí)戰(zhàn)之Spark Streaming
隨著信息技術(shù)的迅猛發(fā)展,數(shù)據(jù)量呈現(xiàn)出爆炸式增長(zhǎng)趨勢(shì),數(shù)據(jù)的種類與變化速度也遠(yuǎn)遠(yuǎn)超出人們的想象,因此人們對(duì)大數(shù)據(jù)處理提出了更高的要求,越來(lái)越多的領(lǐng)域迫切需要大數(shù)據(jù)

長(zhǎng)按掃碼 閱讀全文