S4是Yahoo!在2010年10月開源的一套通用、分布式、可擴展、部分容錯、具備可插拔功能的平臺。這套平臺主要是為了方便開發者開發處理流式數據(continuous unbounded streams of data)的應用。項目官方網站為:http://s4.io/。同時,S4的開發者也發表了一篇技術論文《S4istributed Stream Computing Platform》來介紹S4的設計。下面我們就來學習這篇論文。 開發動機 “We designed this engine to solve real-world problems in the context of search applications that use data mining and machine learning algorithms.” … “To process user feedback, we developed S4, a low latency, scalable stream processing engine.” Yahoo!之所以開發S4系統,主要是為了解決它現實的問題:搜索廣告的展現。搜索廣告是當前各大搜索引擎的主要收入來源,用戶發出查詢請求,搜索引擎在返回正常結果的同時也會返回相關廣告,而廣告是按照點擊付費。為了在最好的位置,放置最相關(也就是用戶最有可能點擊)的廣告,各大搜索引擎使用了大量的數據挖掘和機器學習算法來進行相關性計算,以便提高收入,滿足用戶需求。其中很重要的一點就是要不斷分析用戶的點擊反饋,以便捕獲用戶的行為。S4最初主要還只是用來處理用戶的點擊反饋。 “The streaming paradigm dictates a very different architecture than the one used in batch processing. Attempting to build a general- purpose platform for both batch and stream computing would result in a highly complex system that may end up not being optimal for either task.” 那么Yahoo!為什么沒有選擇Hadoop來處理呢? MapReduce系統主要解決的是對靜態數據的批量處理,即當前的MapReduce系統實現啟動計算時,一般數據已經到位了(比如保存到了分布式文件系統上)。 而流式計算系統在啟動時,一般數據并沒有完全到位,而是源源不斷地流入,并且不像批處理系統重視的是總數據處理的吞吐,而是對數據處理的latency,即希望進入的數據越快處理越好。 當然,現在也有很多基于Hadoop系統來處理流式數據。一般有以下幾種方式。
隨著大量實時應用的發展,比如實時搜索、實時交易系統、實時欺騙分析、實時監控、社交網絡等,都需要一個高度可擴展的流式計算解決方案。不同于原來的流式計算系統,S4主要解決的是高數據率和大數據量的流式處理。 設計假設和目標 為了簡化設計,S4給出了下面的假設。 Lossy failover is acceptable,即一旦一個節點失敗,會failover到另一個standby節點,但是會丟失原節點的內存狀態。這也是為什么說S4是一個部分容錯的系統。 節點不能動態增加和減少。 設計目標包括以下幾個方面。
Event Stream 一個Stream是Events的序列流。每個Event是一個(K,A)數據,通過EventType來標示其類型。K、A分別表示這種類型的Event的keys和attributes。key和attribute都是tuple-valued,即key=value這種元組值。下面給出一個event的例子: EV:ClickLog → event type KEY:product=“search”, type=”online” → keys VAL: userid=”123”, ip=”10.0.0.0”, cookieid=”3” → attributes Processing Elements Processing Element(PE)是S4中的基本運算單元。一個PE通過下面四個組件來表示。
每個PE只負責處理自己所關心的eventtype,并且只處理自己所對應的key值的event。PE處理后可能輸出一個或多個event。當平臺處理一個key值時,會先檢查相應的PE是否已經存在,如果不存在,會先初始化相應的PE,然后交由這個PE進行處理。舉例如圖1所示。 在圖1中,PE2負責處理相應的單詞事件(WordEvent),主要邏輯是統計所關心單詞的個數,然后輸出給下游的PE。PE2所關心的eventtype為WorkEvent,所關心的key為word,所關心的key值為“said”。假如又來了一個WordEvent,key為word=“listen”,那么這個事件就不是PE2所關心的,所以平臺可能會為“listen”值啟動一個新的PE來處理。 有一類特殊的PE,即keylessPE(沒有key和key值),這些PE會接收相應eventtype的所有event進行處理。這類PE主要用來作為S4cluster的輸入層(InputLayer),即外圍應用會產生相應的event(keylessevent),將這些event發到任何一個節點。而S4cluster中的每個節點都會啟動一個keylessPE,這些PE做簡單的輸入處理后,轉化為keyedevent,交給集群中的其他PE類型進行處理。 PE的邏輯主要由應用程序員來開發。 Processing Node Processing Node是一個邏輯節點,負責監聽消息的到來,對消息進行處理,然后通過Communication Layer將event在集群中分發。S4主要依據上面提到的eventtype和key/key值,對key值求hash,在集群中進行分發。關注的key集合通過配置文件來得到。對于需要處理的event,會交給PN中的Processing Element Container(PEC),然后PEC調用相應的PE進行處理。PN功能框如圖2所示。 通過圖2的設計,可以保證,對應于相同event type,key和key值的event一定會被路由到對應的PN。 底下的Communication Layer和Zookeeper共同完成了集群管理和自動failover功能。 編程模型 應用的主要任務就是實現一些相應的PE。PE一般提供如下接口供應用實現。
論文中給出了一個Word Count的例子,大家可以仔細研究一下。在性能測試部分,論文總結了將S4應用到實際的CTR(Click-Through Rate)預估中的效果。在應用舉例中,給出了S4在在線參數優化的應用。 隨著大量實時計算需求的增加,分布式流式計算將會成為分布式計算的下一個主要研究重點,將會成為類似Hadoop這類MapReduce框架的有力補充。這一方向的工作還處在初級發展階段,大家需要多加關注。 |