S4是Yahoo!在2010年10月開(kāi)源的一套通用、分布式、可擴(kuò)展、部分容錯(cuò)、具備可插拔功能的平臺(tái)。這套平臺(tái)主要是為了方便開(kāi)發(fā)者開(kāi)發(fā)處理流式數(shù)據(jù)(continuous unbounded streams of data)的應(yīng)用。項(xiàng)目官方網(wǎng)站為:http://s4.io/。同時(shí),S4的開(kāi)發(fā)者也發(fā)表了一篇技術(shù)論文《S4istributed Stream Computing Platform》來(lái)介紹S4的設(shè)計(jì)。下面我們就來(lái)學(xué)習(xí)這篇論文。 開(kāi)發(fā)動(dòng)機(jī) “We designed this engine to solve real-world problems in the context of search applications that use data mining and machine learning algorithms.” … “To process user feedback, we developed S4, a low latency, scalable stream processing engine.” Yahoo!之所以開(kāi)發(fā)S4系統(tǒng),主要是為了解決它現(xiàn)實(shí)的問(wèn)題:搜索廣告的展現(xiàn)。搜索廣告是當(dāng)前各大搜索引擎的主要收入來(lái)源,用戶發(fā)出查詢請(qǐng)求,搜索引擎在返回正常結(jié)果的同時(shí)也會(huì)返回相關(guān)廣告,而廣告是按照點(diǎn)擊付費(fèi)。為了在最好的位置,放置最相關(guān)(也就是用戶最有可能點(diǎn)擊)的廣告,各大搜索引擎使用了大量的數(shù)據(jù)挖掘和機(jī)器學(xué)習(xí)算法來(lái)進(jìn)行相關(guān)性計(jì)算,以便提高收入,滿足用戶需求。其中很重要的一點(diǎn)就是要不斷分析用戶的點(diǎn)擊反饋,以便捕獲用戶的行為。S4最初主要還只是用來(lái)處理用戶的點(diǎn)擊反饋。 “The streaming paradigm dictates a very different architecture than the one used in batch processing. Attempting to build a general- purpose platform for both batch and stream computing would result in a highly complex system that may end up not being optimal for either task.” 那么Yahoo!為什么沒(méi)有選擇Hadoop來(lái)處理呢? MapReduce系統(tǒng)主要解決的是對(duì)靜態(tài)數(shù)據(jù)的批量處理,即當(dāng)前的MapReduce系統(tǒng)實(shí)現(xiàn)啟動(dòng)計(jì)算時(shí),一般數(shù)據(jù)已經(jīng)到位了(比如保存到了分布式文件系統(tǒng)上)。 而流式計(jì)算系統(tǒng)在啟動(dòng)時(shí),一般數(shù)據(jù)并沒(méi)有完全到位,而是源源不斷地流入,并且不像批處理系統(tǒng)重視的是總數(shù)據(jù)處理的吞吐,而是對(duì)數(shù)據(jù)處理的latency,即希望進(jìn)入的數(shù)據(jù)越快處理越好。 當(dāng)然,現(xiàn)在也有很多基于Hadoop系統(tǒng)來(lái)處理流式數(shù)據(jù)。一般有以下幾種方式。
隨著大量實(shí)時(shí)應(yīng)用的發(fā)展,比如實(shí)時(shí)搜索、實(shí)時(shí)交易系統(tǒng)、實(shí)時(shí)欺騙分析、實(shí)時(shí)監(jiān)控、社交網(wǎng)絡(luò)等,都需要一個(gè)高度可擴(kuò)展的流式計(jì)算解決方案。不同于原來(lái)的流式計(jì)算系統(tǒng),S4主要解決的是高數(shù)據(jù)率和大數(shù)據(jù)量的流式處理。 設(shè)計(jì)假設(shè)和目標(biāo) 為了簡(jiǎn)化設(shè)計(jì),S4給出了下面的假設(shè)。 Lossy failover is acceptable,即一旦一個(gè)節(jié)點(diǎn)失敗,會(huì)failover到另一個(gè)standby節(jié)點(diǎn),但是會(huì)丟失原節(jié)點(diǎn)的內(nèi)存狀態(tài)。這也是為什么說(shuō)S4是一個(gè)部分容錯(cuò)的系統(tǒng)。 節(jié)點(diǎn)不能動(dòng)態(tài)增加和減少。 設(shè)計(jì)目標(biāo)包括以下幾個(gè)方面。
Event Stream 一個(gè)Stream是Events的序列流。每個(gè)Event是一個(gè)(K,A)數(shù)據(jù),通過(guò)EventType來(lái)標(biāo)示其類型。K、A分別表示這種類型的Event的keys和attributes。key和attribute都是tuple-valued,即key=value這種元組值。下面給出一個(gè)event的例子: EV:ClickLog → event type KEY:product=“search”, type=”online” → keys VAL: userid=”123”, ip=”10.0.0.0”, cookieid=”3” → attributes Processing Elements Processing Element(PE)是S4中的基本運(yùn)算單元。一個(gè)PE通過(guò)下面四個(gè)組件來(lái)表示。
每個(gè)PE只負(fù)責(zé)處理自己所關(guān)心的eventtype,并且只處理自己所對(duì)應(yīng)的key值的event。PE處理后可能輸出一個(gè)或多個(gè)event。當(dāng)平臺(tái)處理一個(gè)key值時(shí),會(huì)先檢查相應(yīng)的PE是否已經(jīng)存在,如果不存在,會(huì)先初始化相應(yīng)的PE,然后交由這個(gè)PE進(jìn)行處理。舉例如圖1所示。 在圖1中,PE2負(fù)責(zé)處理相應(yīng)的單詞事件(WordEvent),主要邏輯是統(tǒng)計(jì)所關(guān)心單詞的個(gè)數(shù),然后輸出給下游的PE。PE2所關(guān)心的eventtype為WorkEvent,所關(guān)心的key為word,所關(guān)心的key值為“said”。假如又來(lái)了一個(gè)WordEvent,key為word=“l(fā)isten”,那么這個(gè)事件就不是PE2所關(guān)心的,所以平臺(tái)可能會(huì)為“l(fā)isten”值啟動(dòng)一個(gè)新的PE來(lái)處理。 有一類特殊的PE,即keylessPE(沒(méi)有key和key值),這些PE會(huì)接收相應(yīng)eventtype的所有event進(jìn)行處理。這類PE主要用來(lái)作為S4cluster的輸入層(InputLayer),即外圍應(yīng)用會(huì)產(chǎn)生相應(yīng)的event(keylessevent),將這些event發(fā)到任何一個(gè)節(jié)點(diǎn)。而S4cluster中的每個(gè)節(jié)點(diǎn)都會(huì)啟動(dòng)一個(gè)keylessPE,這些PE做簡(jiǎn)單的輸入處理后,轉(zhuǎn)化為keyedevent,交給集群中的其他PE類型進(jìn)行處理。 PE的邏輯主要由應(yīng)用程序員來(lái)開(kāi)發(fā)。 Processing Node Processing Node是一個(gè)邏輯節(jié)點(diǎn),負(fù)責(zé)監(jiān)聽(tīng)消息的到來(lái),對(duì)消息進(jìn)行處理,然后通過(guò)Communication Layer將event在集群中分發(fā)。S4主要依據(jù)上面提到的eventtype和key/key值,對(duì)key值求hash,在集群中進(jìn)行分發(fā)。關(guān)注的key集合通過(guò)配置文件來(lái)得到。對(duì)于需要處理的event,會(huì)交給PN中的Processing Element Container(PEC),然后PEC調(diào)用相應(yīng)的PE進(jìn)行處理。PN功能框如圖2所示。 通過(guò)圖2的設(shè)計(jì),可以保證,對(duì)應(yīng)于相同event type,key和key值的event一定會(huì)被路由到對(duì)應(yīng)的PN。 底下的Communication Layer和Zookeeper共同完成了集群管理和自動(dòng)failover功能。 編程模型 應(yīng)用的主要任務(wù)就是實(shí)現(xiàn)一些相應(yīng)的PE。PE一般提供如下接口供應(yīng)用實(shí)現(xiàn)。
論文中給出了一個(gè)Word Count的例子,大家可以仔細(xì)研究一下。在性能測(cè)試部分,論文總結(jié)了將S4應(yīng)用到實(shí)際的CTR(Click-Through Rate)預(yù)估中的效果。在應(yīng)用舉例中,給出了S4在在線參數(shù)優(yōu)化的應(yīng)用。 隨著大量實(shí)時(shí)計(jì)算需求的增加,分布式流式計(jì)算將會(huì)成為分布式計(jì)算的下一個(gè)主要研究重點(diǎn),將會(huì)成為類似Hadoop這類MapReduce框架的有力補(bǔ)充。這一方向的工作還處在初級(jí)發(fā)展階段,大家需要多加關(guān)注。 |