文 | 潘國慶 攜程大資料平臺實時計算平臺負責人

攜程實時計算平臺架構與實踐丨DataPipeline

本文主要從攜程大資料平臺概況、架構設計及實現、在實現當中踩坑及填坑的過程、實時計算領域詳細的應用場景,以及未來規劃五個方面闡述攜程實時計算平臺架構與實踐,希望對需要構建實時資料平臺的公司和同學有所借鑑。

一、攜程大資料平臺之總體架構

攜程大資料平臺結構分為三層:

應用層:

開發平臺Zeus(分為排程系統、Datax資料傳輸系統、主資料系統、資料質量系統)、查詢平臺(ArtNova報表系統、Adhoc查詢)、機器學習(基於tensorflow、spark等開源框架進行開發;GPU雲平臺基於K8S實現)、實時計算平臺Muise;

中間層:

基於開源的大資料基礎架構,分為分散式儲存和計算框架、實時計算框架;

離線主要是基於Hadoop、HDFS分散式儲存、分散式離線計算基於Hive及Spark、KV儲存基於HBase、Presto和Kylin用於Adhoc以及報表系統;

實時計算框架底層是基於Kafka封裝的訊息佇列系統Hermes, Qmq是攜程自研的訊息佇列, Qmq主要用於定單交易系統,確保百分之百不丟失資料而打造的訊息佇列。

底層:

資源監控與運維監控,分為自動化運維繫統、大資料框架設施監控、大資料業務監控。

攜程實時計算平臺架構與實踐丨DataPipeline

二、架構設計與實現

1.Muise平臺介紹

1)Muise是什麼

Muise,取自希臘神話的文藝女神繆斯之名,是攜程的實時資料分析和處理的平臺;Muise平臺底層基於訊息佇列和開源的實時處理系統JStorm、Spark Streaming和Flink,能夠支援秒級,甚至是毫秒級延遲的流式資料處理。

2)Muise的功能

資料來源:

Hermes Kafka/Mysql、Qmq;

資料處理:

提供Muise JStorm/Spark/FlinkCore API消費Hermes或Qmq資料,底層使用Jstorm、Spark或實時處理資料,並提供自己封裝的API給使用者使用。API對接了所有資料來源系統,方便使用者直接使用;

作業管理:

Portal提供對於JStorm、Spark Streaming和Flink作業的管理,包含新建作業,上傳jar包以及釋出生產等功能;

監控和告警:

使用Jstorm、Spark和Flink提供的Metrics框架,支援自定義的metrics;metrics資訊中心化管理,接入Ops的監控和告警系統,提供全面的監控和告警支援,幫助使用者在第一時間內監控到作業是否發生問題。

2.Muise平臺現狀

平臺現狀:

Jstorm 2。1。1、Spark 2。0。1、Flink1。6。0、Kafka 2。0;

叢集規模:

13個叢集、200+臺機器150+Jstorm、50+Yarn、100+ Kafka;

作業規模:

11個業務線、350+Jstorm作業、120+SS/Flink作業;

訊息規模:

Topic 1300+、增量 100T+ PD、Avg 200K TPS、Max 900K TPS;

訊息延時:

Hermes 200ms以內、Storm 20ms以內;

訊息處理成功率:

99。99%。

3.Muise平臺演進之路

2015 Q2~2015 Q3 :基於Storm開發實時計算平臺;

2016 Q1~2016 Q2 :Storm遷移JStorm、引入StreamCQL;

2017 Q1~2017 Q2 :Spark Streaming調研與接入;

2017 Q3~2018 Q1 :Flink調研與接入。

4.Muise平臺架構

1)Muise平臺架構

應用層:

Muise Portal 目前主要支援了 Storm 與 Spark Streaming兩類作業,支援新建作業、Jar包釋出、作業執行與停止等一系列功能;

中間層:

對底層Infrastructure做了封裝,為使用者提供基於Storm、Spark、Flink相對應的API以及各方面Services;

底層:

Hermes & Qmq是資料來源、Redis、HBase、HDFS、DB等作為外部的資料儲存、Graphite、Grafana、ES主要用於監控。

攜程實時計算平臺架構與實踐丨DataPipeline

2)Muise實時計算流程

Producer端:

使用者先申請Kafka的topic,然後將資料實時寫到Kafka中;

Muise Portal端:

使用者基於我們提供的API做開發,開發完以後透過Muise Portal配置、上傳和啟動作業;作業啟動後,jar包會分發到各個對應的叢集消費Kafka資料;

儲存端:

資料在被消費之後可以寫回QMQ或Kafka,也可以儲存到外部系統Redis、HBase、HDFS/Hive、DB。

攜程實時計算平臺架構與實踐丨DataPipeline

5.平臺設計 ——易用性

首先:

作為一個平臺設計第一要點就是要簡單易用,我們提供綜合的Portal,便於使用者自己新建管理它的作業,方便開發實時作業第一時間能夠上線;

其次:

我們封裝了很多Core API,

支援多套實時計算框架:

支援HermesKafka/MySQL 、QMQ;

整合Jstorm、Spark Streaming、Flink;

作業資源管控;

提供DB、Redis、HBase和HDFS輸出元件;

基於內建Metric系統定製多項metric進行作業預警監控;

使用者可自定義Metric用於監控與預警;

支援AtLeast Once 與Exactly Once語義。

上文講到平臺設計要易用,下面講平臺的容錯,確保資料一定不能出問題。

6.平臺設計——容錯

Jstorm:

基於Acker機制確保At Least Once;

Spark Streaming:

基於Checkpoint實現Exactly Once、基於Kafka Offset回溯實現At Least Once;

Flink:

基於Flinktwo-phase commit + Kafka 0。11事務性支援實現Exactly Once。

7.Exactly Once

1)Direct Approach

當前大部分拿Spark Streaming消費Kafka的話,都是用Direct Approach的方式:

優點:

記錄每個批次消費的Offset,作業可透過offset回溯;

缺點:

資料儲存與offset儲存非同步:

資料儲存成功,應用宕機,offset未儲存 (導致資料重複);

offset儲存成功,應用宕機,資料儲存失敗 (導致資料丟失);

2)CheckPoint

優點:

預設記錄每個批次的執行狀態與源資料,宕機時可從cp目錄恢復;

缺點:

1。 非100%保證ExactlyOnce;

https://www。

iteblog。com/archives/17

95

描述了無法保證Exactly once的場景;

https://

issues。apache。org/jira/

browse/SPARK-17606

也存在doCheckPoint時出現塊丟失的情況;

2。 啟用cp帶來額外效能影響;

3。 Streaming作業邏輯改變無法從cp恢復。

適用場景:

比較適合有狀態計算的場景;

使用方式:

建議程式自己儲存offset,當發生宕機時,如果spark程式碼邏輯沒有發生改變,則根據checkpoint目錄建立StreamingContext。如果發生改變,則根據實現自己儲存的offset建立context並設立新的checkpoint點。

攜程實時計算平臺架構與實踐丨DataPipeline

8.平臺設計——監控與告警

如何能夠第一時間幫使用者發現作業問題,是一個重中之重。

叢集監控

伺服器監控:考量的指標有Memory、CPU、Disk IO、Net IO;

平臺監控:Ganglia;

作業監控

基於實時計算框架原生Metric系統;

定製Metrics反應作業狀態;

採集原生與定製Metrics用於監控和告警;

儲存:Graphite展 現:Grafana 告警:Appmon;

我們現在定製的很多Metrics當中比較通用的是:

Fail:

定期時間內,Jstorm資料處理失敗數量、Spark task Fail數量;

Ack:

定期時間內,處理的資料量;

Lag:

定期時間內,資料產生與被消費的中間延遲(kafka 2。0基於自帶bornTime)。

攜程開發了自己告警系統,將Metrics代入系統之後基於規則做告警。透過作業監控看板完成相關指標的監控和檢視,我們會把Flink作為比較關心的Metrics指標,全都匯入到Graphite資料庫裡面,然後基於前端Grafana做展現。透過作業監控看板,我們能夠直接看到Kafka to Flink Delay(Lag),相當於資料從產生到被Flink作業消費,中間延遲是62毫秒,速度相對比較快的。其次我們監控了每次從Kafka中獲取資料的速度。因為從Kafka獲取資料是基於一小塊一小塊去獲取,我們設定的是每次拉2兆的資料量。透過作業監控看板可以監控到每次從Kafka拉取資料時候的平均延遲是25毫秒,Max是 760毫秒。

攜程實時計算平臺架構與實踐丨DataPipeline

接下來講講我們在這幾年踩到的一些坑以及如何填坑的。

三、踩坑與填坑

坑1:

HermesUBT資料量大,埋點資訊眾多,服務端與客戶端均承受巨大壓力;

解決方案:

提供統一分流作業,基於特定規則與配置將資料分流至不同topic。

坑2

:Kafka無法保證全域性有序;

解決方案:

如果在強制全域性有序的場景下,使用單Partition;如果在部分有序的情況下,可基於某個欄位作Hash,保證Partition內部有序。

坑3

:Kafka無法根據時間精確回溯到某時間段的資料;

解決方案:

平臺提供過濾功能,過濾時間早於設定時間的資料(kafka 0。10之後每條資料都帶有自己的時間戳,所以這個問題在升級kafka之後自然而然的就解決了)。

坑4

:最初,攜程所有的Spark Streaming、Flink作業都是跑在主機群上面的,是一個大Hadoop叢集,目前是幾千臺規模,離線和實時是混布的,一旦一個大的離線作業上來時,會對實時作業有影響;其次是Hadoop叢集經常會做一些升級改造,所以可能會重啟Name Node或者Node Manager,這會導致作業有時會掛掉;

解決方案:

我們採用分開部署,單獨搭建實時叢集,獨立執行實時作業。離線歸離線,實時歸實時的,實時叢集單獨跑Spark Streaming跟Yarn的作業,離線專門跑離線的作業。

當分開部署後,會遇到新的問題,部分實時作業需要去一些離線作業做一些Join或 Feature的操作,所以也是需要訪問主機群資料。這相當於有一個跨叢集訪問的問題。

坑5

:Hadoop實時叢集跨叢集訪問主機群;

解決方案:

Hdfs-site。xml配置ns-prod、ns雙重namespace,分別指向本地與主機群;

Spark配置spark。yarn。access。namenodes or hadoopFlieSystems

坑6

:無論是Jstorm還是接Storm都會遇到一個CPU搶佔的問題,當你上了一個大的作業,尤其是那種消耗CPU特別厲害的,可能我給它分開了一個Worker,一個CPU Core,但是它最後有可能會給我用到3個甚至4個;

解決方案:

啟用cgroup限制cpu使用率。

四、應用場景

1.實時報表統計

實時報表統計與展現也是Spark Streaming使用較多的一個場景,資料可以基於Process Time統計,也可以基於Event Time統計。由於本身Spark Streaming不同批次的job可以視為一個個的滾動視窗,某個獨立的視窗中包含了多個時間段的資料,這使得使用SparkStreaming基於Event Time統計時存在一定的限制。一般較為常用的方式是統計每個批次中不同時間維度的累積值並匯入到外部系統,如ES;然後在報表展現的時基於時間做二次聚合獲得完整的累加值最終求得聚合值。下圖展示了攜程IBU基於Spark Streaming實現的實時看板。

攜程實時計算平臺架構與實踐丨DataPipeline

2.實時數倉

1)Spark Streaming近實時儲存資料

如今市面上有形形色色的工具可以從Kafka實時消費資料並進行過濾清洗最終落地到對應的儲存系統,如:Camus、Flume等。相比較於此類產品,Spark Streaming的優勢首先在於可以支援更為複雜的處理邏輯,其次基於Yarn系統的資源排程使得Spark Streaming的資源配置更加靈活,使用者採用Spark Streaming實時把資料寫到HDFS或者寫到Hive裡面去。

2)基於各種規則作資料質量檢測

基於Spark Streaming,自定義metric功能對資料的資料量、欄位數、資料格式與重複資料進行了資料質量校驗與監控。

3)基於自定義metric實時預警

基於我們封裝提供的Metric註冊系統確定一些規則,然後每個批次基於這些規則做一個校驗,返回一個結果。這個結果會基於Metric sink吐出來,吐出來基於metrics的結果做一個監控。當前我們採用Flink載入TensorFlow模型實時做預測。基本時效性是資料一旦到達兩秒鐘之內就能夠把告警資訊告出來,給使用者非常好的體驗。

攜程實時計算平臺架構與實踐丨DataPipeline

五、未來規劃

1.Flink on K8S

在攜程內部有一些不同的計算框架,有實時計算的,有機器學習的,還有離線計算的,所以需要一個統一的底層框架來進行管理,因此在未來將Flink遷移到了K8S上,進行統一的資源管控。

2.Muise平臺接入Flink SQL

Muise平臺雖然接入了Flink,但是使用者還是得手寫程式碼,我們開發了一個實時特徵平臺,使用者只需要寫SQL,即基於Flink的SQL就可以實時採集使用者所需要的模型裡面或者用到的特徵。之後會把實時特徵平臺跟實時計算平臺做進行合併,使用者最後只需要寫SQL就可以實現所有的實時作業實現。

3.Jstorm全面啟用Cgroup

當前由於部分歷史原因導致現在很多作業跑在Jstorm上面,因此出現了資源分配不均衡的情況,之後會全面啟用Cgroup。

4.線上模型訓練

攜程部分部門需要實時線上模型訓練,透過用Spark訓練了模型之後,然後使用Spark Streaming的模型,實時做一個攔截或者控制,應用在風控等場景。

—end—