一、前言

Data lineage includes the data origin, what happens to it and where it moves over time。 Data lineage gives visibility while greatly simplifying the ability to trace errors back to the root cause in a data analytics process。 ──百科Data lineage

大資料時代,資料的來源極其廣泛,各種型別的資料在快速產生,資料也是爆發性增長。從資料的產生,透過加工融合流轉產生新的資料,到最終消亡,資料之間的關聯關係可以稱之為資料血緣關係。

資料血緣是元資料管理、資料治理、資料質量的重要一環,追蹤資料的來源、處理、出處,對資料價值評估提供依據,描述源資料流程、表、報表、即席查詢之間的流向關係,表與表的依賴關係、表與離線ETL任務,排程平臺,計算引擎之間的依賴關係。資料倉庫是構建在Hive之上,而Hive的原始資料往往來自於生產DB,也會把計算結果匯出到外部儲存,異構資料來源的表之間是有血緣關係的。

資料血緣用途:

追蹤資料溯源:當資料發生異常,幫助追蹤到異常發生的原因;影響面分析,追蹤資料的來源,追蹤資料處理過程。

評估資料價值:從資料受眾、更新量級、更新頻次等幾個方面給資料價值的評估提供依據。

生命週期:直觀地得到資料整個生命週期,為資料治理提供依據。

安全管控:對源頭打上敏感等級標籤後,傳遞敏感等級標籤到下游。

本文介紹攜程資料血緣如何構建及應用場景。第一版T+1構建Hive引擎的表級別的血緣關係,第二版近實時構建Hive,Spark,Presto多個查詢引擎和DataX傳輸工具的欄位級別血緣關係。

二、構建血緣的方案

2.1 收集方式

方案一

:只收集SQL,事後分析。

當SQL執行結束,收集SQL到DB或者Kafka。

優點

:當計算引擎和工具不多的時候,語法相對相容的時候,用Hive自帶的LineageLogger重新解析SQL可以獲得表和欄位級別的關係。

缺點

:重放SQL的時候可能元資料發生改變,比如臨時表可能被Drop,沒有臨時自定義函式UDF,或者SQL解析失敗。

方案二

:執行時分析SQL並收集。

當SQL執行結束後立即分析Lineage,非同步傳送到Kafka。

優點

:執行時的狀態和資訊是最準確的,不會有SQL解析語法錯誤。

缺點

:需要針對各個引擎和工具開發解析模組,解析速度需要足夠快。

2.2 開源方案

Apache Atlas

Apache Atlas是Hadoop社群為解決Hadoop生態系統的元資料治理問題而產生的開源專案,它為Hadoop叢集提供了包括資料分類、集中策略引擎、資料血緣、安全和生命週期管理在內的元資料治理核心能力。官方外掛支援HBase、Hive、Sqoop、Storm、Storm、Kafka、Falcon元件。

Hook在執行時採集血緣資料,傳送到Kafka。Atlas消費Kafka資料,將關係寫到圖資料庫JanusGraph,並提供REST API。

其中Hive Hook支援表和列級別血緣,Spark需要使用GitHub的

hortonworks-spark/spark-atlas-connector,不支援列級別,Presto則不支援。

攜程資料血緣構建及應用

Linkedin DataHub

WhereHows專案已於2018年重新被LinkedIn公司設計為DataHub專案。它從不同的源系統中採集元資料,並進行標準化和建模,從而作為元資料倉庫完成血緣分析。

社群提供了一個Demo,演示地址:

https://

demo。datahubproject。io/

與Airflow整合較好,支援資料集級別血緣,欄位級別在2021Q3的Roadmap。

攜程資料血緣構建及應用

三、攜程方案

攜程採用了方案二,執行時分析SQL並收集分析結果到Kafka。由於開源方案在現階段不滿足需求,則自行開發。

由於當時缺少血緣關係,對資料治理難度較大,表級別的血緣解析難度較低,表的數量遠小於欄位的數量,早期先快速實現了表級別版本。

在16-17年實現和上線了第一個版本,收集常用的工具和引擎的表級別的血緣關係,T+1構建關係。

在19年迭代了第二個版本,支援解析Hive,Spark,Presto多個查詢引擎和DataX傳輸工具的欄位級別血緣關係,近實時構建關係。

四、第一個版本-表級別血緣關係

4.1 處理流程

針對Hive引擎開發了一個Hook,實現ExecuteWithHookContext介面,從HookContext可以獲得執行計劃,輸入表,輸出表等豐富資訊,非同步傳送到Kafka,部署的時候在hive。exec。post。hooks新增外掛即可。

在17年引入Spark2後,大部分Hive作業遷移到Spark引擎上,這時候針對Spark SQL CLI快速開發一個類似Hive Hook機制,收集表級別的血緣關係。

傳輸工具DataX作為一個異構資料來源同步的工具,單獨對其開發了收集外掛。

在經過解析處理後,將資料寫到圖資料庫Neo4j,提供元資料系統展示和REST API服務,落地成Hive關係表,供使用者查詢和治理使用。

攜程資料血緣構建及應用

4.2 效果

在元資料系統上,可以檢視一張表多層級的上下游血緣關係,在關係邊上會有任務ID等一些屬性。

攜程資料血緣構建及應用

4.3 痛點

隨著計算引擎的增加,業務的增長,表級別的血緣關係已經不滿足需求。

覆蓋面不足,缺少Spark ThriftServer , Presto引擎,缺少即席查詢平臺,報表平臺等。

關係不夠實時,期望寫入表後可以快速查詢到關係,使用者可以直觀檢視輸入和輸出,資料質量系統,排程系統可以根據任務ID查詢到輸出表,對錶執行質量校驗任務。

圖資料庫Neo4j社群版為單機版本,儲存數量有限,穩定性欠佳,當時使用的版本較低,對邊不能使用索引(3。5支援),這使得想從關係搜尋到關聯的上下游較為麻煩。

五、第二版本-欄位級別血緣關係

之前實現的第一個版本,對於細粒度的治理和追蹤還不夠,不僅缺少對欄位級別的血緣關係,也不支援採集各個系統的埋點資訊和自定義擴充套件屬性,難以追蹤完整鏈路來源,並且關係是T+1,不夠實時。

針對各個計算引擎和傳輸工具DataX開發不同的解析外掛,將解析好的血緣資料傳送到Kafka,實時消費Kafka,把關係資料寫到分散式圖資料JanusGraph。

攜程資料血緣構建及應用

5.1 傳輸工具DataX

阿里開源的Druid是一個 JDBC 元件庫,包含資料庫連線池、SQL Parser 等元件。透過重寫MySqlASTVisitor、SQLServerASTVisitor來解析MySQL / SQLServer的查詢SQL,獲得列級別的關係。

5.2 計算引擎

計算引擎統一格式,收集輸入表、輸出表,輸入欄位、輸出欄位,流轉的表示式等一些資訊。

攜程資料血緣構建及應用

Hive

參考

org。apache。hadoop。hive。ql。hooks。LineageLogger 實現,非同步傳送血緣資料到 Kafka。

Atlas的HiveHook也是實現ExecuteWithHookContext介面,從HookContext獲得LineageInfo,也可以參考HIVE-19288 引入的

org。apache。hadoop。hive。ql。hooks。HiveProtoLoggingHook,採集更多引擎相關的資訊。

其中遇到幾個問題:

透過HiveServer2執行獲取的start time不正確

HIVE-10957 QueryPlan‘s start time is incorrect in certain cases

獲取執行計劃空指標,導致收集失敗

HIVE-12709 further improve user level explain

獲取執行計劃有可能出現卡住,可以加個呼叫超時。

Spark

前置條件:引入 SPARK-19558 Add config key to register QueryExecutionListeners automatically,實現自動註冊QueryExecutionListener。

實現方式:透過實現QueryExecutionListener介面,在onSuccess回撥函式拿到當前執行的QueryExecution,透過LogicalPlan的output方法,獲得所有Attribute,利用NamedExpression的exprId對映關係,對其進行遍歷和解析,構建列級別關係。

覆蓋範圍:Spark SQL CLI、Thrift Server、使用Dataset/DataFrame API(如spark-submit、spark-shell、pyspark)

遇到問題:

使用analyzedPlan而不是optimizedPlan,optimizer的執行計劃可能會丟失一些資訊,可以在analyzedPlan的基礎上apply一些有助於分析的Rule,如CombineUnions。

傳遞的初始化用的hiveconf/hivevar變數被Thrift Server忽略,導致初始化Connection沒有辦法埋點。

打上Patch SPARK-13983 ,可以實現第一步,傳遞變數,但是這個變數在每次執行新的statement都重新初始化,導致使用者set的變數不可更新。後續給社群提交PR SPARK-26598,修復變數不可更新的問題。

SPARK-13983 Fix HiveThriftServer2 can not get “——hiveconf” and “——hivevar” variables since 2。0

SPARK-26598 Fix HiveThriftServer2 cannot be modified hiveconf/hivevar variables

Drop Table 的限制,DropTableCommand執行成功的時候,該表不一定在之前存在過,如果在Drop之前存在過,元資料也已經被刪除了,無從考證。

在DropTableCommand增加了一個標誌位,真正在有執行Drop操作的話再置為True,保證收集的血緣資料是對的。

使用Transform使用者自定義指令碼的限制

Transform不像java UDF,只輸入需要用到的欄位即可,而是需要將所有後續用到的欄位都輸入到自定義指令碼,指令碼再決定輸出哪些欄位,這其中列與列之間的對映關係無法透過執行計劃獲得,只能簡單的記錄輸出列的表示式,如transform(c1,c2,c3) script xxx。py to c4。

Presto

開發Presto EventListener Plugin,實現EventListener介面,從queryCompleted回撥函式的QueryCompletedEvent解析得到相應的資訊。

上線的時候遇到一個無法載入Kafka載入StringSerializer的問題(StringSerializer could not be found)。

Kafka客戶端使用 Class。forName(trimmed, true,

Utils。getContextOrKafkaClassLoader()) 來載入Class,優先從當前執行緒的ContextClassLoader載入,與Presto的ThreadContextClassLoader有衝突,需要初化始KafkaProducer的時候,將ContextClassLoader暫時置為NULL。

https://

stackoverflow。com/a/509

81469/1673775

5.3 圖資料庫JanusGraph

JanusGraph是一個開源的分散式圖資料庫。具有很好的擴充套件性,透過多機叢集可支援儲存和查詢數百億的頂點和邊的圖資料。JanusGraph是一個事務資料庫,支援大量使用者高併發地執行復雜的實時圖遍歷。

生產上,儲存我們使用Cassandra,索引使用Elasticsearch,使用Gremlin查詢/遍歷語言來讀寫JanusGraph,有上手難度,熟悉Neo4j的Cypher語法可以使用cypher-for-gremlin plugin。

攜程資料血緣構建及應用

以下是資料血緣寫入圖資料庫的模型,Hive欄位單獨為一個Lable,關係型DB欄位為一個Label,關係分兩種,LABELWRITE,LABELWRITE_TTL。

只有輸入沒有輸出(Query查詢操作),只有輸出沒有輸入(建表等DDL操作)也會強制繫結一個來源系統的ID及擴充套件屬性。

在生產上使用JanusGraph,儲存億級的血緣關係,但是在開發過程中也遇到了一些效能問題。

寫入速度最佳化

以DB名+表名+欄位名作為唯一key,實現getOrCreateVertex,並對vertex id快取,加速頂點的載入速度。

關係批次刪除

關係LABELWRITETTL表示寫入的關係有存活時間(TTL-Time to live),這是因為在批次刪除關係的時候,JanusGraph速度相當慢,而且很容易OOM。比如要一次性刪除,Label為WRITE,x=y,寫入時間小於等於某個時間的邊,這時候Vertex和Edge load到記憶體中,容易OOM。

g。E()。hasLabel(“WRITE”)。has(“x”,eq(“y”))。has(“publishedDate”,P。lte(new Date(1610640000)))。drop()。iterate()

嘗試使用多執行緒+分批次的方式,即N個執行緒,每個執行緒刪除1000條,速度也不太可接受。

這時候採用了折中的方案,需要刪除關係用另外一種Label來表示,並在建立Label指定了TTL,由於Cassandra支援cell level TTL,所以邊的資料會自動被刪除。但是ES不支援TTL,實現一個定時刪除ES過期資料即可。

攜程資料血緣構建及應用

5.4 覆蓋範圍

Zeus排程平臺 (ETL操作INSERT、CTAS,QUERY)

Ad-Hoc即席查詢平臺 (CTAS,QUERY)

報表平臺 (QUERY)

元資料平臺 (DDL操作)

GPU平臺 (PySpark)

透過ETL任務ID,查詢任務ID,報表ID,都可以獲取到輸入,輸出的表和欄位的關係。

5.5 侷限

使用MapReduce、Spark RDD讀寫HDFS的血緣暫時沒有實現。

思路可以在JobClient。submitJob的時候採集輸入和輸出路徑,又或者透過HDFS的AuditLog、CallerContext來關聯。

5.6 效果

在第一版使用圖的方式展示血緣關係,在上下游關係較多的時候,顯示較為混亂,第二版改成樹狀表格的方式展示。

欄位operator在排程系統Zeus被轉換成hive_account,最後輸出是ArtNova報表系統的一張報表。

攜程資料血緣構建及應用

六、實際應用場景

6.1 資料治理

透過血緣關係篩選,每天清理數千張未使用的臨時表,節約空間。

作為資料資產評估的依據,統計表、欄位讀寫次數,生成的表無下游訪問,包括有沒有排程任務,報表任務,即席查詢。

6.2 元資料管理

統計一張表的生成時間,而不是統計整個任務的完成時間。

資料異常,或者下線一張表、一個欄位的時候,可以找到相關的ETL任務或者報表任務,及時通知下游。

統計表的使用熱度,顯示趨勢。

攜程資料血緣構建及應用

6.3 排程系統

得益於在圖資料庫JanusGraph可以使用關係邊的key作為索引,可以根據任務ID可以輕鬆獲得該任務輸入和輸出表。

當配置一個任務A的依賴任務列表的時候,可以使用推薦依賴,檢查依賴功能,獲得任務A的所有輸入表,再透過輸入的表獲得寫入任務ID列表,即為任務A所需依賴的任務列表。

在任務結束後,獲取該任務所有輸出的表,進行預配的規則進行資料質量校驗。

攜程資料血緣構建及應用

6.4 敏感等級標籤

當源頭的資料來自生產DB時,生產DB有些列的標籤已打上了敏感等級,透過血緣關係,下游的表可以繼承敏感等級,自動打上敏感標籤。

七、總結

以上描述了攜程如何構建表和欄位級別的血緣關係,及在實際應用的場景。

隨著業務需求和資料的增長,資料的加工流程越來越複雜,構建一套資料血緣,可以輕鬆查詢到資料之間的關係,進行表和欄位級的血緣追溯,在元資料管理,資料治理,資料質量上承擔重要一環。

團隊招聘資訊

我們是攜程集團的大資料平臺研發團隊,主要負責攜程大資料平臺的建設,包括但不限於Hadoop生態原始碼二次開發,任務排程,查詢平臺的開發,致力於為集團提供穩定、高效、易用的大資料儲存和計算服務,實現高效的資源排程,打造服務於所有業務的資料平臺產品、服務與應用。

團隊懷有前瞻的技術視野,積極擁抱開源建設,緊跟業界技術趨勢,在這裡有濃厚的技術氛圍,你可以和團隊成員一同參與開源建設,深入探索和交流技術原理,也有技術實施的廣闊場景。

簡歷投遞郵箱:tech@trip。com,郵件標題:【姓名】-【攜程大資料平臺】-【投遞職位方向】。

【作者簡介】

cxzl25,攜程軟體技術專家,關注大資料領域生態建設,對分散式計算和儲存、排程等方面有濃厚興趣。