近日常有同學來問我如何閱讀程式碼,關於這個問題的一般性答案我特別提了一個問題並自問自答。出於提供一個實際的例子的考量,正好此前綜合地閱讀了 Spark 的 RPC 實現、Flink 基於 Akka 的 RPC 實現和 Actor Model 的通訊模型,寫成本文分享我閱讀分散式計算系統 Spark 和 Flink 中的 RPC 實現的過程和思考。

簡介 Actor Model 及 Akka 的問題

通常來說,閱讀程式碼的流程是首先了解自己要閱讀的程式碼解決了什麼問題,這個問題的現有解決方案是什麼,有什麼優勢和缺點。大致清楚了這些背景之後再在走讀程式碼的過程中思考閱讀的程式碼具體是怎麼解決這個問題的,最後專注到重點難點的程式碼塊的理解上。也就是說,程式碼閱讀最重要的不是程式碼。程式碼只是將思考的結果轉換為實際可用的軟體的手段,思考的結果或者說解決問題的方法才是重要的內容。

分散式計算系統的分散式特性決定了設計過程中必然會考慮節點間的通訊問題,即籠統的 RPC 需求。關於 RPC 和 RMI 及 Actor Model 具體的差別本文不做展開,主要集中在 Spark 和 Flink 的 RPC 實現來介紹 Actor Model 下的 RPC 實現。

Actor Model 的主要概念包括

通訊的主體 Actor

通訊的內容 Message

單執行緒先到先處理的訊息處理器 Mailbox

特別需要提及的是 Actor 之間的通訊是透過類似於地址的 ActorRef 來引用其他的 Actor 的,同時,在實現中,需要一個支援 Actor Model 執行起來的 ActorSystem 環境。這些具體的概念和名詞屬於 Akka,我們會在後面看到它們如何在 Spark 和 Flink 中被一一對應。

Actor Model 一個很少被注意的特點是它的建模過程中只存在 tell 這一個通訊原語,ask 等等只是構建在 tell 上層的方便的通訊模式。這就導致一個問題,即 Actor Model 原生的程式設計模式是明顯不同於傳統的程式設計模型的。傳統的程式設計模型中,函式呼叫是有返回值的,即使採用 Future 作為返回值的佔位符,本質上還是有一一對應的返回值的;而在 Actor Model 中,訊息一經發出就被遺忘,即所謂的 fire and forget 模式。要建立當前發出的訊息和稍後收到的訊息之間的 ask and answer 關係,需要額外的工作。這部分的內容可以參考 Akka 官方文件中介紹通訊模式的章節,本身可以作為 Akka 最佳實踐的一部分,有時間我會專門寫一篇文章介紹 Actor Model 下完全被顛覆的程式設計模型以及透過在其上模擬常見的程式設計模型來探索 Actor Model 的最佳實踐。

關於更多 Actor Model 的概念性和介紹性資料,可以參考的資料有 Akka 的官方文件和《反應式設計模式》等等。

Akka 作為目前最成熟的 Actor Model 的實現之一,以及擁有容易理解的單執行緒 Actor 和併發通訊模型,廣泛地充當了 JVM 系的分散式系統的 RPC 層。Akka 最近的演化有兩個重點,一個是型別化(Typed)的 Akka,另一個是在拆分行為(Behavior)和狀態(State)的概念。前者我們後面看到 Spark 和 Flink 的 RPC 實現時就能看到選擇標準的不同,後者這裡不作展開,可能會在後續討論函數語言程式設計的文章中再次提及。

儘管 Akka 的實現非常成熟,但是直接使用 Akka 的底層 Actor Model 的軟體卻不多。對於業務軟體來說,Akka Model 過於底層,如果要利用它帶來的好處通常會直接使用 Akka Streams 和 Akka HTTP 等上層建築;對於其他分散式系統來說,主要有以下兩個問題。

第一個問題是兩層叢集的負擔。如果我們使用 Akka 作為底層 RPC 的實現,本身 Akka 會有相應的基礎元件,包括 ActorSystem 或者進一步使用 Akka Cluster 的話相應的 Cluster 物件。我們的分散式系統例如 Spark 和 Flink 本身有自己的叢集管理策略,在 Spark 中有 Driver 和 Worker 的概念,在 Flink 中有 JobManager 和 TaskManager 等概念。如果在處理本身系統的叢集管理的同時還要兼顧底層的 Akka 叢集,這樣兩層的叢集在實際開發和運維的過程當中會帶來額外的複雜性。尤其是 Akka 作為一個功能複雜的重量級框架,並且在 Typed Akka 中做出了限制公開的直接溝通兩個 Actor 的能力,強制要求使用 Akka Cluster 的決定。同時處理兩層叢集複雜的狀態機和角色與訊息的轉換將會是一個巨大的負擔。

第二個問題是版本的負擔,這也是 Spark 走向去 Akka 化的直接原因,也是 Flink 社群經常被提問的一個問題。我們知道,為了保證分散式系統的穩定性,它依賴的元件尤其是 RPC 實現這樣底層模組的依賴版本會保持相當的穩定性。這樣就有一個問題,Spark 和 Flink 的使用者在使用它們的同時也很有可能使用 Akka,並且依賴的是另一個 Akka 的版本。這樣,就會出現版本不同帶來的不相容性問題。通常來說,這一點可以透過釋出一個專案專有的第三方依賴並使用 shaded 技術重定位包名來解決問題。但是由於重定位為了覆蓋反射呼叫,是在位元組碼級別對全限定名和字串的包名字首做替換。一般來說,包名都是諸如

org。apache。spark

或者

org。apache。flink

的形式,具有唯一性,替換起來不會有什麼問題。Akka 就不一樣了,它的包名是

akka。actor

等等,跟配置名稱是一樣的。這就導致重定位會錯誤改動程式碼中的配置名字串導致執行時字串失配出錯。版本問題在 Lightbend 全家桶裡是不存在的,例如 Play 透過介面暴露底層的 Akka 資料結構,並固定依賴到某一個版本,這樣使用 Play 的人需要 Akka 的功能是隻需要透過介面拿到對應的 Akka 資料結構就可以,但是這種方式並沒有考慮和其他系統的版本相容問題。

雖然上述問題可以透過定製 ClassLoader 並精心調整打包策略來繞過,或者要求使用者程式使用跟系統框架相容的 Akka 版本,但是這會導致複雜不友好的使用者體驗,而清楚簡單的使用者體驗很多時候比功能更能決定一個框架的生存空間。同時,Akka 提供的很多功能,例如 Actor Model 基石的監督(Supervise)功能,對於上層提供 Failover 機制的 Spark 和 Flink 來說是多餘的。前有使用者體驗的硬性需求,後有開發輕量化的敏捷需求,Ligetbend 系以外的成熟的分散式系統開發自己的 RPC 實現是理所當然的選擇。

理解了 Spark 和 Flink 為什麼要開發自己的 RPC 實現之後,我們再看到 RPC 實現具體的考量點和內容。

Spark 的 RPC 實現

Spark 開發自己的 RPC 實現以換下 Akka 的理由主要是上面提及的版本依賴問題,在社群中記錄為 SPARK-5293。

閱讀相關程式碼,首先我們要定位程式碼的位置。Spark 的 RPC 實現主要位於

core

模組下的

org。apache。spark。rpc

這個包下,閱讀程式碼的過程中透過跳轉到定義和查詢使用點可以找到完整的脈絡。結果而言,除了實際的 RPC Endpoint 實現之外,主要相關的程式碼還包括

common/network-common

路徑下網路傳輸層相關的底層支援。

Spark 的 RPC 實現雖然是為了替換 Akka 而誕生的,但是它實際上可以看成一個簡化版的 Akka,仍然遵循許多 Actor Model 的抽象。例如

RpcEndpoint 對應 Actor

RpcEndpointRef 對應 ActorRef

RpcEnv 對應 ActorSystem

RpcEndpoint 與訊息處理模型

這其中從模型上來說最簡單的反而是 RpcEndpoint,因為所有的實現邏輯是具體實現類的事情,它其實只是一個簡單的存根(Stub)。總的來說,RpcEndpoint 有以下介面

private

spark

trait

RpcEndpoint

{

final

def

self

RpcEndpointRef

=

???

final

def

stop

()

Unit

=

???

val

rpcEnv

RpcEnv

=

???

def

receive

PartialFunction

Any

Unit

=

???

def

receiveAndReply

context

RpcCallContext

PartialFunction

Any

Unit

=

???

def

onError

cause

Throwable

Unit

=

???

def

onConnected

remoteAddress

RpcAddress

Unit

=

???

def

onDisconnected

remoteAddress

RpcAddress

Unit

=

???

def

onNetworkError

cause

Throwable

remoteAddress

RpcAddress

Unit

=

???

def

onStart

()

Unit

=

???

def

onStop

()

Unit

=

???

}

可以看到,上面的函式我分成了四組,其中第一組是和元操作有關的,望文生義不做解釋;第三組是連線和錯誤處理相關的回撥函式,主要是記錄日誌和處理邊界情況的,也不用多做介紹;第四組實現的比較多,雖然和第三組一樣是掛載在特定事件上的回撥函式,但是 RpcEndpoint 啟動和關閉時常常需要做狀態初始化和終結,以及資源的申請和釋放,所以

onStart

onStop

是經常被實現的介面。

這裡在展開說一下第二組介面。首先是

receive

,這個介面跟 Akka 裡面 Actor 的

receive

是類似的,我們看到它的簽名是

PartialFunction[Any, Unit]

,也就是說實現起來類似於下面的程式碼塊。

override

def

receive

PartialFunction

Any

Unit

=

{

case

Message

=>

。。。

case

BoxedMessage

msg1

msg2

=>

。。。

}

可以看到和 Untyped Akka 別無二致,也就是說 Spark 的 RPC 實現也是非型別化的,程式設計模型上基於訊息和模式匹配來做的。後面我們會看到 Flink 對這一點做了不同的選擇,介紹完 Flink 的情況後我們會做一個總的探討。

另一個介面就比較有意思了,

receiveAndReply

實現了接收資訊後返回的功能。由於沒有實現 Akka 中上下文

sender()

的邏輯,Spark 使用了另一個介面來處理需要返回的呼叫。我們分兩點說明

sender()

的問題和 Spark 基於 Actor Model 實現了傳統的具有返回值的呼叫的方式。

第一點,

sender()

主要的問題是,它是一個方法呼叫,而不是一個確定性的值。這是函數語言程式設計的擁躉喜歡討論的話題,即在不同的時刻呼叫

sender()

會返回不同的值。乍一看我們在每次處理一條訊息的時候都呼叫

sender()

獲得當前訊息的傳送來源並沒有問題,不過這個方法在 Akka 社群給新手帶來了不少麻煩。

最大的問題還是上面提到的呼叫點的問題。通常來說,由於 Actor Model 中的 Actor 是單執行緒的處理訊息的,你在同一個訊息處理過程中多次呼叫

sender()

返回的都是當前訊息的來源。不過,在一個常見的場景中,你在處理訊息的時候發起了另一個非同步動作,在非同步動作中呼叫

sender()

來獲取當前訊息的來源。由於非同步動作觸發的時間是未知的,實際上當它觸發時再次呼叫

sender()

的時候,可能返回的就是另一條訊息的來源了。這個問題很好解決,即用一個變數儲存當前的

sender()

後面傳遞這個物件而不是再次呼叫

sender()

獲取物件。顯然,Spark 的

receiveAndReply

中的引數

context

就是這個可用於發回訊息的上下文,與

sender()

類似。而在 Typed Akka 中,由於

sender()

無法確切的型別化,因此採用的是將訊息來源直接編碼在傳送的訊息中的方式以在需要的時候使用它回覆訊息,這要求 ActorRef 在不同的 ActorSystem 上正確的序列化和反序列化。

第二點,我們看到這裡的時候就會想,那我現在有兩個 receive 函式,雖然我可以根據需不需要傳送回覆訊息把訊息處理邏輯拆分到不同的函數里,但是 Spark 又是怎麼知道應該把入站的請求分配到哪個函式的呢?這個就涉及到 Spark RPC 實現的細節。簡單的說我們先看到呼叫兩個 receive 函式的片段。

// Inbox。scala

class

Inbox

{

def

process

dispatcher

Dispatcher

Unit

=

{

// 。。。

message

match

{

case

RpcMessage

_sender

content

context

=>

try

{

endpoint

receiveAndReply

context

)。

applyOrElse

Any

Unit

](

content

{

msg

=>

throw

new

SparkException

s“Unsupported message

$message

from

${

_sender

}

})

}

catch

{

case

e

Throwable

=>

context

sendFailure

e

throw

e

}

case

OneWayMessage

_sender

content

=>

endpoint

receive

applyOrElse

Any

Unit

](

content

{

msg

=>

throw

new

SparkException

s“Unsupported message

$message

from

${

_sender

}

})

case

OnStart

=>

endpoint

onStart

()

// 。。。

case

OnStop

=>

// 。。。

endpoint

onStop

()

assert

isEmpty

“OnStop should be the last message”

case

RemoteProcessConnected

remoteAddress

=>

endpoint

onConnected

remoteAddress

case

RemoteProcessDisconnected

remoteAddress

=>

endpoint

onDisconnected

remoteAddress

case

RemoteProcessConnectionError

cause

remoteAddress

=>

endpoint

onNetworkError

cause

remoteAddress

}

// 。。。

}

}

簡單掃過回撥系列函式,我們看到 Spark RPC 判斷將訊息轉往何處主要是看訊息的型別是

RpcMessage

還是

OneWayMessage

。從名字中我們就可以看出,前者指的是呼叫並返回的訊息,後者是 fire and forget 的訊息。我們跳轉到定義並查詢初始化點,可以發現生成這兩種不同資訊的差異的根源發生在

RpcEndpointRef

是呼叫

ask

還是

send

上,在唯一的 Netty 實現上一路會經過

NettyRpcEnv

對應的

ask

send

方法,生成不同的訊息傳送到遠端。這也就是前面說的 Spark 原生的支援 ask 語義的意思。從熟悉的變成模型出發,可以把 ask 當成返回值不為 void 的函式或者 Pascal 中的 function,send 當成返回值為 void 的函式或者 Pascal 中的 procedure。

send 的語義是比較清楚的,關於 ask 的語義還有一個值得討論的點。我們知道 ask 會有一個返回值,這個返回值是真正有意義的返回值的佔位符 Future,而 Future 一般的處理方式在經過拼接和轉換之後終究是會有一個 onSuccess 或者 onFailure 的觸發動作,這個觸發在哪個執行緒上執行是很重要的。這涉及到我們在編寫 receive 函式的時候對非同步行為和同步策略的判斷。Spark 的實現類似於 Akka 中 AskPattern 引入 PromiseActorRef 的方式,生成一個 Promise 並在對應的返回收到時完成,這個 Promise 作為 ask 的返回值。相關的回撥邏輯發生在

NettyRpcEnv#askAbortable

中,可以看到,本地訊息中 Promise 的完成發生在傳送訊息的同一個執行緒上,而遠端訊息中 Promise 的完成一路探查到 TransportClient 和 TransportChannelHandler 可以發現完成在 Netty 的 channelRead0 上,也就是說,Spark 的 ask 返回的 Future,其完成的時間點並不一定和 RpcEndpoint 的主執行緒同步。這可能會導致在不加同步策略下的一些問題,例如透過 ask 詢問一個遠端節點的狀態和遠端節點主動 send 過來的狀態同時觸發狀態處理邏輯而導致競態條件。補充說明,Spark 的 RpcEndpoint 本身也可能併發的處理訊息,僅當它是 ThreadSafeRpcEndpoint 或 IsolatedRpcEndpoint 時才表現出類似於 Actor Model 下單執行緒 Actor 的行為。上面提到的 ask 導致競態條件的問題在 Akka 中也存在,這倒不算 BUG,只是在使用的時候需要注意採用合適的同步策略。

RpcEnv 與訊息分派模型

接下來我們看到更接近 RPC 實現的核心的程式碼。RpcEnv 是正確的處理 RpcEndpoint 存在和執行及其支援的網路環境的上下文,目前 Spark 中只有基於Netty 的實現。

對於服務端來說,RpcEnv 是支援 RpcEndpoint 正常執行的環境,排程執行緒處理訊息並負責 RpcEndpoint 的生命週期管理;對於客戶端來說,可以使用 RpcAddress 等方式從 RpcEnv 中獲取可用的本地或遠端的 RpcEndpointRef,這是一個 RpcEndpoint 的位置透明的引用或者叫控制代碼,可以透過呼叫它的 send 或 ask 方法來向 RpcEndpoint 傳送資訊。

對於訊息的分派,我們從訊息的入站和出站來看。

首先看出站,即本 RpcEnv 向 RpcEndpoint 傳送訊息。注意這裡如果是本地的 RpcEndpoint,會將訊息直接透過 Dispatcher 分派到本地的 RpcEndpoint 上,嚴格來說不算出站。如果是遠端的 RpcEndpoint,NettyRpcEnv 會透過 postToOutbox 方法,對於 ask 來的方法的回覆,構造的訊息來源 RpcEndpointRef 會帶有網路層的 client,因此是直接返回;而對於本地直接出站的訊息,則會根據接收者的地址放入 Outbox 的佇列中。一個地址對應著一個 Outbox,在 Outbox 中的訊息非同步的被取出併發送。

接著看入站,入站的訊息會統一先由 NettyRpcEnv 交給 Dispatcher,Dispatcher 在根據訊息的元資料分派到對應的處理 RpcEndpoint 上。Dispatcher 中每一個和 RpcEndpoint 一一對應的地址都會被關聯上一個 MessageLoop,類似於 EventLoop 它會負責處理發給 RpcEndpoint 的初步分派後的訊息。每個 RpcEndpoint 實際繫結的訊息處理觸發器是 Inbox,Inbox 相當於 Actor Model 中的 Mailbox,負責接收外部發到當前 RpcEndpoint 即 Actor 的訊息。DedicatedMessageLoop 只服務於一個 RpcEndpoint,因此它也只持有一個 Inbox,當訊息由 Dispatcher 發給 DedicatedMessageLoop 之後,它就轉發給唯一的 Inbox;SharedMessageLoop 可服務於多個 RpcEndpoint,所以它的內部有一個 RpcEndpoint 地址對應到 Inbox 的對映,收到 Dispatcher 初步分派後的訊息後它會再次進行分派發送到具體的 RpcEndpoint 中。這種 MessageLoop 的設計對應的是一般的 RpcEndpoint 和 IsolatedRpcEndpoint,主要是提供不同的同步保證和執行緒配置。

具體到 Inbox 的訊息就比較直接了,拋開狀態管理和異常管理不談,主要的內容就是一個同步的先進先出的佇列處理釋出進來的訊息,如上一節程式碼片段所貼,最終根據訊息的型別呼叫 RpcEndpoint 的不同方法。

Flink 的 RPC 實現

現在我們轉過頭來看 Flink 的 RPC 實現。總的來說 Flink 的 RPC 實現依然是基於 Akka 的,這一點與 Spark 基於 Netty 開發的一套不同。Flink 社群有去掉 Akka 依賴的計劃,但進度只是 FLINK-4346 把介面抽象出來的程度,其底層實現仍然是 Akka,並沒有解決一開始我們提到的使用 Akka 帶來的問題。

我們看到 FLINK-4346 描述的目標,先從整體上了解它的設計方向。

It should address the following issues:

- Add type safety to the sender and receiver of messages。 We want proper types methods to be called, rather than having generic message types and pattern matching everywhere。 This is similar to typed actors。

- Make the message receivers testable without involving actors, i。e。 the methods should be callable directly。 When used with other component, the receiver will be wrapped in an actor that calls the methods based on received messages。

- We want to keep the paradigm of single-threaded execution per “actor”

首先我們可以看到的是它仍然強調了 Actor Model 的核心之一,單執行緒的 Actor 訊息處理。其次,我們可以看到和 Spark 有兩個重要的不同點。

其一是不同於 Akka 的 testkit 套路,Flink 強調遠端呼叫和本地呼叫在程式設計模型上的統一性,從而可以在不引入 Actor 一套的情況下直接呼叫 Actor 的方法來進行測試。這一點實際上跟 RMI 是比較相似的,可以建立一個本地的物件除錯,需要訪問遠端物件的時候就建立一個遠端物件的引用。關於這個呼叫程式設計模型上的統一性,後面講到 RpcGateway 和 RpcEndpoint 以及反射呼叫的時候會看到細節,總的來說這一套類似於 Akka 社群已經放棄的 Typed Actors 實現 Actor Model 型別化的方案。

其二是型別化,上面我們提到的程式設計模型本身跟型別關係不大。Flink 為了更好的實現防禦性程式設計,期望在呼叫對應的遠端方法的時候能夠使用上型別系統的優勢來保證引數和返回值的型別匹配,其中主要是返回值的匹配和對應的 RpcGateway 不像無型別的 ActorRef 或 RpcEndpointRef 一樣難以判斷哪些訊息是合法的。不過由於 FLINK-4346 的歷史侷限性,它借鑑了當時 Typed Actors 的實現方案,這個方案後來被廢棄。

由於不需要像 Akka 或 Spark 那樣從 Netty 或者 Aeron 這樣的網路層框架重新搭建訊息分派系統,Flink 的討論主要集中在它復刻 Typed Actors 的程式碼和執行緒模型上。

RMI 式的型別化 RPC 實現

Flink 中的 RPC 實現主要在

flink-runtime

模組下的

org。apache。flink。runtime。rpc

包中。由於複用了 Akka 的基礎設施,它並不像 Spark 那樣直接依賴傳輸層的實現,也不需要自己的分派資訊。上次 Flink 的 PMC Chair Stephan Ewen 來北京,和他交流的時候確認了 Flink 只把 Akka 作為 RPC 底層來用,並沒有使用 Akka 豐富的監督等其他功能,並且在未來有去掉 Akka 依賴的計劃。

Flink 的 RPC 實現的主要抽象包括

ActorSystem 的封裝 RpcService

Actor 與 RpcEndpoint 兩層之間的膠合層 RpcServer

業務邏輯的載體 RpcEndpoint

RpcEndpoint 的位置透明的引用 RpcGateway

迷之執行緒模型輔助介面 MainThreadExecutable 和 MainThreadExecutor

可以看到,這個 Spark 和 Akka 基本一一對應的骨架是不一樣的,主要的矛盾點在 RpcServer 這一層上。這是因為相比於前兩者直接實現 Actor 或其等價物,Flink 的 RPC 實現是基於 Akka 的 Actor 實現了自己的 Actor 等價物 RpcEndpoint,這就導致模型的對應關係適配。

這個問題我們談到 RpcServer 的具體程式碼的時候再提。Flink 的程式碼不能像 Spark 那樣按照不同的型別來看,因為類的實現可能涉及到反射訪問另一個類,這種情況下按照功能點來閱讀程式碼會更好理解。

我們首先看到上面抽象的構造過程。最後的輔助介面放在下一節講,其他的抽象構造過程分別如下。

RpcServices 目前的唯一實現 AkkaRpcService 是 Akka 的 ActorSystem 的封裝,基本可以理解成 ActorSystem 的一個介面卡。所以其構造過程也比較簡單,就是將適配的物件引用儲存後返回。複雜的是由 RpcServices 構造的 RpcServer。

RpcServer 的構造有兩個觸發點。我們先看到連線遠端的 RpcEndpoint 時透過

RpcServices#connect

構造的 RpcServer。這個方法的兩個過載的區別只在於是否實現 fencing 的功能,即區分監聽同一地址的不同任期的 RpcEndpoint。由於 Flink 的 JobManager 等 RpcEndpoint 會透過主節點選舉選出主節點,監聽同一個地址的可能是節點的不同任期,而上一個任期的請求的回覆應該被過濾掉以免影響當前任期的節點狀態。這點先簡單帶過,我們看到 connect 除此以外的共同部分,摘要如下。

private

<

C

extends

RpcGateway

>

CompletableFuture

<

C

>

connectInternal

String

address

Class

<

C

>

clazz

Function

<

ActorRef

InvocationHandler

>

invocationHandlerFactory

{

// 。。。

final

ActorSelection

actorSel

=

actorSystem

actorSelection

address

);

final

Future

<

ActorIdentity

>

identify

=

Patterns

ask

actorSel

new

Identify

42

),

configuration

getTimeout

()。

toMilliseconds

())

mapTo

ClassTag

apply

ActorIdentity

class

));

final

CompletableFuture

<

ActorRef

>

actorRefFuture

=

FutureUtils

toJava

identify

)。

thenApply

ActorIdentity

actorIdentity

->

{

if

actorIdentity

getRef

()

!=

null

{

return

actorIdentity

getRef

();

}

else

。。。

});

final

CompletableFuture

<

HandshakeSuccessMessage

>

handshakeFuture

=

actorRefFuture

thenCompose

ActorRef

actorRef

->

FutureUtils

toJava

Patterns

ask

actorRef

new

RemoteHandshakeMessage

clazz

getVersion

()),

configuration

getTimeout

()。

toMilliseconds

())

mapTo

ClassTag

apply

HandshakeSuccessMessage

class

))));

return

actorRefFuture

thenCombineAsync

handshakeFuture

actorRef

ignored

->

{

InvocationHandler

invocationHandler

=

invocationHandlerFactory

apply

actorRef

);

ClassLoader

classLoader

=

getClass

()。

getClassLoader

();

C

proxy

=

C

Proxy

newProxyInstance

classLoader

new

Class

<?>[]{

clazz

},

invocationHandler

);

return

proxy

},

actorSystem

dispatcher

());

}

連線的過程主要分成三個階段,第一個階段是透過 ActorSelection 和 Identify 找到和地址字串對應的遠端 Actor 的引用,接著傳送握手訊息確保遠端的 Actor 正常工作,隨後將這個 ActorRef 打包為一個 InvocationHandler 並轉換為對應型別的代理後返回。這裡前兩個階段都是 Akka 的基本操作,這裡重點介紹一下最後一個階段,並說明它就是所謂的 RMI 式的 RPC 實現。

InvocationHandler 本身是 Java 內建的介面,其定義如下。

public

interface

InvocationHandler

{

public

Object

invoke

Object

proxy

Method

method

Object

[]

args

throws

Throwable

}

這個介面是給 Java 內建的代理功能使用的,invoke 方法的三個引數分別代表方法的接收方、方法引用和引數列表,或者我們用引數名簡單的對應到方法呼叫,那就是

proxy。method(args)

這樣的形式。我們看到穿件 Proxy 那一行的官方文件註釋。

public

class

Proxy

{

/**

* Returns an instance of a proxy class for the specified interfaces

* that dispatches method invocations to the specified invocation

* handler。

*

*

{@code Proxy。newProxyInstance} throws

* {@code IllegalArgumentException} for the same reasons that

* {@code Proxy。getProxyClass} does。

*/

public

static

Object

newProxyInstance

ClassLoader

loader

Class

<?>[]

interfaces

InvocationHandler

h

throws

IllegalArgumentException

{

// 。。。

}

}

可以看出,所謂的代理物件就是可以處理

interfaces

定義的型別能接受的呼叫,並把這些呼叫轉交給 InvocationHandler 來處理的物件。我們在呼叫 connect 方法時傳遞的 class 引數即是 RpcGateway 的一個子介面,而 RpcGateway 的子介面例如 JobMasterGateway 或 TaskExecutorGateway 則定義了 JobMaster 或 TaskExecutor 這個 RpcEndpoint 所能接受的呼叫。透過這種方法我們將產生一個跟特定的 RpcGateway 的子介面語義相同的物件,而這個物件所有的 InvocationHandler 在 Flink 中的實現恰好就是 RpcServer 的有效實現。RpcServer 本身也是 RpcGateway 的一個子介面。最後這一點其實沒有太多的理論支援,只是為了讓編譯透過和更好的處理

InvocationHandler#invoke

的邏輯所做的妥協,這也是 Flink 的 RPC 實現因為阻抗適配而帶來的理解難度其中之一。

我們說到,在 Flink 裡面,InvocationHandler 和 RpcServer 實際上指的是同一種東西,它們的實現只有兩個,AkkaInvocationHandler 和 FencedAkkaInvocationHandler,後者如前所述與 fencing 相關,不做過多分析。我們從前者觸發,主要的邏輯出現在排除了本身方法呼叫之後的實際代理工作

invokeRpc

方法上。這個方法的流程如下。

從所代理的方法的簽名中查出可能的 RpcTimeout 的引數位置並抽取引數,這個超時時間主要是為了 ask 功能提供的。

構造相應的 RpcInvocation 訊息,這個是 Flink 專用的唯一的 Actor 型別 AkkaRpcActor 能識別的訊息型別。根據 RpcGateway 或者說 RpcServer 的位置分別產生 LocalRpcInvocation 訊息或 RemoteRpcInvocation 訊息。兩者的主要區別在於是否支援序列化,因為只有發往遠端的訊息才需要考慮序列化相關的事項。這個訊息包含了方法呼叫的元資訊,即方法名,引數列表和引數型別列表。由於傳送的物件是確定的,就是和 RpcServer 儲存的 ActorRef 對應的物件,因此不需要指定方法接收者。

根據方法的返回值型別進行不同的處理。如果是 void 即無返回值,則進行 tell 後返回;如果是 CompletableFuture 則進行 ask 後轉換返回的 Future 的型別後返回 Future;如果是其他非 CompletableFuture 的返回值,則類似於前者,但是阻塞在 Future 上等待取得返回值後返回。

可以看到,在這裡,方法呼叫被轉換成了 RpcInvocation 這樣的方法呼叫元資訊,在遠端接受到這些資訊後透過反射進行呼叫,具體可以參照

AkkaRpcActor#handleRpcInvocation

的內容。從外表上看,開發者拿到 RpcGateway 的代理物件後,就像操作它們的子介面,例如 JobMasterGateway 一樣,呼叫其介面,例如 registerTaskManager 或 offerSlot 等。而實際的操作經過 AkkaInvocationHandler 的解釋變成發到一個 RpcEndpoint 的訊息,這個過程與 RMI 是異曲同工的。

RpcServer 的另一個構造點是

RpcServices#startServer

,在唯一的實現 AkkaRpcServices 裡它會根據傳入的 RpcEndpoint,解析這個具體的 RpcEndpoint 子類的物件實現的介面,透過

ActorSystem#actorOf

建立對應的 AkkaRpcActor 並拿到 ActorRef 後同 connect 最後階段一樣構造出 RpcServer 的代理物件。這個代理物件由於實現了代理 RpcEndpoint 的方法的邏輯,也即它所實現的 RpcGateway 的邏輯,所以在

RpcEndpoint#getSelfGateway

的時候也可以被強轉成對應的 RpcGateway 來返回。

RpcEndpoint 的構造就比較簡單了,是直接的呼叫建構函式的構造,其基礎建構函式如下。

protected

RpcEndpoint

final

RpcService

rpcService

final

String

endpointId

{

this

rpcService

=

checkNotNull

rpcService

“rpcService”

);

this

endpointId

=

checkNotNull

endpointId

“endpointId”

);

this

rpcServer

=

rpcService

startServer

this

);

this

mainThreadExecutor

=

new

MainThreadExecutor

rpcServer

this

::

validateRunsInMainThread

);

}

可以看到就是在這裡發起對

RpcService#startServer

的呼叫。

總的來說,Flink 的 RPC 實現概念混亂,試圖實現 Actor Model 但是概念對應上由於其本質上是在 Actor Model 上糊了一層 Actor Model 但是又複用了底層的 ActorSystem 而導致說不清道不明,阻抗失配。同時,在程式設計上依賴巧合,例如代理同時是 RpcServer 又是 RpcGateway 的子介面,依賴反射,以及下一節中會講到的同一功能多種暴露手段。可以說是一個勉強能用但是擴充套件困難,出現問題難以排查而且效能絕非最優的實現。

MainThreadExecutable 與執行緒模型

上一節中提到 Flink 的 RPC 實現出現了同一功能的多種暴露手段,也出現了【迷之執行緒模型輔助介面 MainThreadExecutable 和 MainThreadExecutor】這樣的字眼。這一節就展開的介紹下 Flink RPC 的執行緒模型。

首先,Flink 的 RPC 實現是基於 Akka 的,所以 Akka 的 Dispatcher 以及上層的 tell 和 ask 的執行緒模型是一樣的。這裡主要神奇的是 MainThreadExecutable 這個介面。我們先看到它的定義。

public

interface

MainThreadExecutable

{

void

runAsync

Runnable

runnable

);

<

V

>

CompletableFuture

<

V

>

callAsync

Callable

<

V

>

callable

Time

callTimeout

);

void

scheduleRunAsync

Runnable

runnable

long

delay

);

}

同上,這裡有一個 Fenced 的子介面,但是主要與 fencing 相關,不做展開。我們看到這個介面的方法,猜想是我們可以將一個 Runnable 或者 Callable 交給一個此介面的實現去非同步地執行。實際情況確實有點像,我們看到它的實現,欸,唯一的實現居然是老朋友,概念糅合的集中點,AkkaInvocationHandler 這個類。真巧。

它的實現除去一些邊界條件和檢查程式碼概要如下。

@Override

public

void

runAsync

Runnable

runnable

{

scheduleRunAsync

runnable

0L

);

}

@Override

public

void

scheduleRunAsync

Runnable

runnable

long

delayMillis

{

if

isLocal

{

long

atTimeNanos

=

delayMillis

==

0

0

System

nanoTime

()

+

delayMillis

*

1_000_000

);

tell

new

RunAsync

runnable

atTimeNanos

));

}

else

{

throw

new

RuntimeException

(。。。);

}

}

@Override

public

<

V

>

CompletableFuture

<

V

>

callAsync

Callable

<

V

>

callable

Time

callTimeout

{

if

isLocal

{

return

CompletableFuture

<

V

>)

ask

new

CallAsync

callable

),

callTimeout

);

}

else

{

throw

new

RuntimeException

(。。。);

}

}

至於 RunAsync 和 CallAsync 的處理邏輯則存在於 AkkaRpcActor 中,簡單地說,當 AkkaRpcActor 收到這個訊息時,如果是 schedule 且未到時間就會排程到 Dispatcher 執行緒中等待,否則立即執行,對於 callAsync 也就是 ask,還會透過 tell 返回結果。再結合上面的程式碼,我們發現,喔,原來這個方法呼叫只能在 local 的情況下使用,而且進一步看其上層在 RpcEndpoint 處暴露的介面是 protected 的。在實際應用的時候,callAsync 和 scheduleRunAsync 基本沒人用,runAsync 則用的不少,如果你熟悉 Akka 的話,你會發現這基本上和

self() ! Msg

沒有太大的差別。

那麼為什麼 Flink 要引入這個介面呢?下面我從 Flink 的 RPC 實現三處和執行緒模型有關的介面來對比 Flink 的實現,其中有一處就是這裡的 runAsync 等。

另一處是 getSelfGateway 方法。我們剛才說,runAsync 基本上就是

self() ! Msg

,那麼 getSelfGateway 不就是這裡的

self()

嗎?嗯,確實是的。Flink 的 getSelfGateway 方法主要用於測試的時候測試程式碼拿到一個 RpcServer 或者你直接管他叫 ActorRef 來進行訊息傳送或者適配對應的型別簽名,另一個主要的作用則是在本地不同的 Actor 之間傳遞 ActorRef,遠端我們有

RpcService#connect

方法來搞定,本地原則上也可以這麼搞,不過 getSelfGateway 看起來更方便一點,也減少了 ActorSelection 和後續確認和握手的來回訊息傳遞。那麼 runAsync 和這個玩意到底有什麼區別呢?答,透過 RMI 的方法進行呼叫,方法必須擁有姓名,而 runAsync 可以神奇的傳遞 Runnable 而使得你不需要去改 RpcGateway 的介面就可以給自己發訊息。底層實際上就是一個通用的 RunAsync 訊息。著實神奇。如果你的 runAsync 的內容是呼叫一個 RpcGateway 上註冊的方法,那麼你實際上也可以寫成 getSelfGateway 然後透過點語法呼叫這個方法。

再另一處是

RpcEndpoint#getMainThreadExecutor

RpcEndpont#getRpcService。getExecutor()

這兩個通常在拼接 CompletableFuture 的 Async 系列方法時作為 Executor 傳入。其中前一個實際上就是上面提到的 runAsync 的又一層包裝,當你將它作為 Executor 傳入的時候,我們看看它的 execute 和 schedule 方法是怎麼寫的。

private

final

MainThreadExecutable

gateway

public

void

runAsync

Runnable

runnable

{

gateway

runAsync

runnable

);

}

public

void

scheduleRunAsync

Runnable

runnable

long

delayMillis

{

gateway

scheduleRunAsync

runnable

delayMillis

);

}

@Override

public

void

execute

@Nonnull

Runnable

command

{

runAsync

command

);

}

@Override

public

ScheduledFuture

<?>

schedule

Runnable

command

long

delay

TimeUnit

unit

{

final

long

delayMillis

=

TimeUnit

MILLISECONDS

convert

delay

unit

);

FutureTask

<

Void

>

ft

=

new

FutureTask

<>(

command

null

);

scheduleRunAsync

ft

delayMillis

);

return

new

ScheduledFutureAdapter

<>(

ft

delayMillis

TimeUnit

MILLISECONDS

);

}

所以其實這就是在外面像在裡面一樣呼叫 runAsync 的一層介面。

後者則是直接拿到 Akka ActorSystem 中的 Dispatcher 作為 Executor,將動作釋出到 ActorSystem 的 Dispatcher 中去排程執行。這裡有一點要特別注意的是前面提到的 MainThreadExecutor,它執行時的執行緒就是 Dispatcher 當中的一個哦。所以如果你把兩個競爭的呼叫,一個放在 MainThreadExecutor 裡跑,一個放在 ActorSystem Dispatcher 裡跑,那麼就有可能會出現死鎖,而且這個死鎖不是必現的,甚至執行順序關係都不是引發死鎖決定性的因素,首先看你釋出到 Dispatcher 的作業是否被分派到 MainThreadExecutor 那個執行緒上。由於 Flink 把這兩個 Executor 直接暴露出來,並且非常方便獲得,兩者很容易誤用,所以相關的併發錯誤在歷史的程序中,尤其是 Dispatcher 和 JobMaster 那一塊,綜合上其他原因,發生並且被修復過很多次。

Flink 的 RPC 實現把一個簡單的 tell 和 ask 的模型,先是搞成 RMI 導致效能堪憂並且實現高度依賴難以理解、維護和擴充套件的反射,再是暴露出多個功能重複的介面,活生生的把 Java 寫成了 Perl 的模樣,降低了開發者犯錯的難度,增加了開發者犯錯的機率。雖然我個人很喜歡 Perl,但是在一個幾百萬行的大型專案裡混進一塊 Perl 風格的程式碼,恐怕還是敬謝不敏。不過另一個角度說,Flink 能把事情搞得這麼複雜,但是系統還算能正常的工作,也算 Java 作為一門開發語言的魅力所在。

閱讀程式碼的技巧簡述

上面就是我閱讀 Spark 和 Flink 的 RPC 實現程式碼的過程和思考。由於評論性的文字已經內聯在閱讀的過程中,而且這點篇幅其實還遠遠沒有展開一些有趣的或者關鍵的技術細節,這裡就不再做評述。

回到最初的問題,不少同學來問我程式碼怎麼閱讀。其實程式碼本身是人的思維的具象化的一種表現,閱讀程式碼不應該只是去讀程式碼本身,一行一行的看它的執行路徑,這樣與機器何異?閱讀程式碼首先應該思考的是所要閱讀的程式碼解決了什麼問題,這個問題為什麼存在,它的現有解法和一般解法是什麼。瞭解到這些基本資訊之後,在閱讀程式碼的過程中,對於同質化的部分就略讀或者通讀過去,對於配置和錯誤處理和邊界情況掃讀,重要的邊界情況再單獨看看。主要精力集中在差異化的部分,對比差異化的部分的考量點,分清孰優孰劣,或者在軟體開發的過程中,通常沒有一方完全好過另一方,有的只是權衡(trade off)。例如在上面的內容中,好像我把 Flink 說得一無是處,那主要是因為我的工作跟它相關,每天深受這些坑折磨,實際上型別化是一件非常有意義的事,Untyped Akka 和 Spark 當你面對一堆只能靠名字來猜他背後是啥的 Ref 的時候,其實你也是在依賴命名約束或者叫命名巧合來程式設計。

另外,這裡推薦一篇介紹 Spark RPC 的文章,它與本文的不同除了範圍以外,主要是更加偏重實踐,有作者本人的腳手架倉庫可以實驗,並且做了時序圖和 UML 類圖。

圖表是非常好的表意手段,寫作本文時我原本想引用一個 Actor Model 的概念圖,但是一時找不到了。對於 Spark 和 Flink 的 RPC 實現,一張粗略的類責任鏈和所屬關係圖也會一圖勝千言。不過時序圖和 UML 類圖恐怕還是太古板和複雜了,就算畫得出來,我也高度懷疑到底有誰沒事盯著那玩意看。粗略的類圖是可以的,UML 類圖容易關注點失焦;時序影象我在上面分析執行緒模型和同步策略的時候,對應的時序關係是要分析的。但是事無鉅細的時序圖恐怕沒有必要,畢竟客戶端到服務端大體就是那麼回事。如前所述,我們閱讀程式碼的時候,主要是要關注差一點。事無鉅細的 UML 類圖和時序圖太容易把一些瑣碎的細節也列上去了。

有實驗環境來測試嘛,當然是最好的。在閱讀 RPC 實現的過程中,雖然我沒有把相關的邏輯抽出來做實驗,但是測試覆蓋率高的專案,其單元測試和可執行的 example 本身就是良好的實驗場地。單元測試可能是我見過的最優秀的理解一塊程式碼意圖的方式之一了。

總的來說,本文展示的是在初步瞭解一個方向的程式碼編寫常識後,針對某一功能點進行主題閱讀和對比整理的過程。在一開始閱讀程式碼的時候,可以先針對某個特定的實現,先把它的邏輯理順,等到對問題的抽象和解決方案的抽象有一定的感覺之後再進行對比閱讀,有的放矢,快速沉澱總結。