作者:周慧婷

寫在前面

系統開發的過程中,我們經常需要實現訊息推送的需求。單端單例項的情況下很好處理(網上有許多教程這裡不做展開),但在分散式系統及多端需要推送的情況下該如何處理呢?

在分散式系統中,訊息推送服務端是多例項的。某系統中一個服務生成一條訊息,這條訊息需要實時推送到多個終端,此時該如何進行有效的 WebSocket 推送呢?首先一起看看如下場景:

假設推送訊息由訊息例項 2 產生,但是終端真正連線的訊息例項是例項 1 和例項 3,並沒有連線到生產訊息的例項 2,系統是如何將例項 2 的訊息同步推送到終端 1 和終端 2 的呢?下文將詳細描述。

Go 語言實現 WebSocket 推送

基本原理

為了滿足需求我們採用 redis 做協同中介軟體,用於儲存使用者資訊、生成使用者連線的唯一性標識以及 pod address,訊息的生產者例項透過訂閱 redis 獲取終端連線的唯一性標識和 pod address,並通知到對應的訊息例項,最終由相應連線終端的訊息例項透過 WebSocket 將訊息發推送到使用者終端。具體流程如下圖:

Go 語言實現 WebSocket 推送

服務端實現

Client

Client 元件的作用,是當用戶與訊息服務中某個例項建立連線後,管理這個連線的資訊,這裡透過一個 Golang 結構體來定義:

type

Client

struct

{

UUID

string

UserID

string

Socket

*

websocket

Conn

Send

chan

[]

byte

}

結構體中的資料型別說明如下:

UUID:對連線進行唯一性的標識,透過此標識可以查詢到連線資訊。

UserID:使用者 ID。

Socket:連線物件。

Send:訊息資料 channel。

我們為 Client 結構體實現了兩個方法:Read、Write 來處理訊息的接受和傳送。

Read 方法

Read 方法比較簡單,從終端接收請求訊息後,訊息例項透過 WebSocket 迴應接收訊息狀態,並不返回請求結果。結果透過 Write 方法返回。

func (c *Client) Read(close, renewal chan *Client) {

defer func() {

close <- c

}()

for {

_, message, err := c。Socket。ReadMessage()

if err != nil {

break

}

// 。。。

// message logic

}

}

Write 方法

Write 方法將請求結果返回給終端。Client 會監聽 send channel,當 channel 有資料時,透過 socket 連線將訊息傳送給終端。

func (c *Client) Write(close chan *Client) {

for {

select {

case message, ok := <-c。Send:

if !ok {

return

}

c。Socket。WriteMessage(websocket。TextMessage, message)

case <-c。Ctx。Done():

return

}

}

}

ClientManger

ClientManager 元件相當於連線池,可以管理所有的終端連線,並提供註冊、登出、續期功能。

type ClientManager struct {

sync。RWMutex

Clients map[string]*Client

Register chan *Client

Unregister chan *Client

Renewal chan *Client

}

結構體的資料型別說明如下:

Clients:是一個集合,用於儲存建立的 Client 物件。

Register:註冊的 channel。

把連線註冊到 Clients 中,並透過 key-value 加入 Client 集合中,key 是連線的唯一性標識 ,value 是連線本身。

把連線的唯一性標識和使用者的 ID 以及建立連線的 pod address 資訊,儲存到 redis 中。

Unregister:登出的 channel。

從 ClientManager 元件的 Clients 集合中移除連線物件。

刪除 redis 對應的快取資訊。

Renewal:續期的 channel,用於對 redis 的鍵續期。

ClientManager 只提供了一個 Start 方法,Start 方法提供監聽註冊、登出以及續期的 channel,透過監聽這些 channel 來管理建立的連線物件。當這些 channel 有資料時,執行對應的操作。

func (manager *ClientManager) Start(ctx context。Context) {

for {

select {

case conn := <-manager。Register:

manager。Lock()

manager。Clients[conn。UUID] = conn

manager。Unlock()

_, err := manager。affair。Register(ctx, &RegisterReq{

UserID: conn。UserID,

UUID: conn。UUID,

IP: manager。IP,

})

case conn := <-manager。Unregister:

_, err := manager。affair。Unregister(ctx, &UnregisterReq{

UserID: conn。UserID,

UUID: conn。UUID,

})

conn。Socket。Close()

close(conn。Send)

delete(manager。Clients, conn。UUID)

case conn := <-manager。Renewal:

//。。。

// Key renewal to redis

}

}

}

訊息推送

當一個訊息服務例項生產使用者的訊息,需要推送訊息給終端時,推送步驟如下:

根據 userID 從 redis 讀取資料,得到連線唯一性標識和 pod address 地址,這些資訊是在終端第一次與服務端建立連線的時候寫入 redis 的。

此時根據 pod address,向對應的伺服器傳送請求。

相應的訊息服務例項接收到請求。

服務端接收請求的處理邏輯如下:

根據傳遞過來連線唯一性標識的引數,找到標識對應的連線。我們為 ClientManager 提供了一個 Write 方法。

func (manager *ClientManager) Write(message *Message) error {

manager。RLock()

client, ok := manager。Clients[message。Recipient]

manager。RUnlock()

if !ok {

return errors。New(“client miss [” + message。Recipient + “]”)

}

return client。SendOut(message)

}

此方法用到 ClientManager 元件的 Clients 集合,根據唯一性標識找到對應的 Client。再利用 Client 的 SendOut 方法,寫出資料到終端。

2。 定義 Client 的 SendOut 方法。此方法只負責:把接收到的訊息轉換為位元組陣列後,傳送 Client 的 Send Channel 中。

func (c *Client) SendOut(message *Message) error {

content, err := json。Marshal(message。Content)

if err != nil {

return err

}

c。Send <- content

return nil

}

3。 傳送資料給終端。在前文介紹 Client 元件中,已說明 Client 元件的 send channel 有資料時,會讀取 channel 產生的資料,透過連線物件傳送給對應的終端。

總結

以上是 Web Socket 推送訊息給終端的主要思路:透過 redis 把使用者的資訊以及連線的標識和 pod address 儲存起來,當某個訊息服務例項產生訊息,從 redis 讀取資訊,通知連線著終端的訊息服務例項,再由這些服務例項透過 WebSocket 物件給終端傳送訊息。全象雲低程式碼平臺也集成了訊息的實時推送,使用者使用平臺時能及時獲取最新訊息狀態。

下期我們將為大家帶來 Knative Serving 自定義彈性伸縮,請大家持續關注。

關於全象雲

全象雲平臺(

https://

portal。clouden。io

)是青雲科技自主研發的低程式碼平臺,是基於雲原生、用於輔助構建企業各類數字化應用的工具和整合平臺。

平臺目前提供雲上無程式碼和低程式碼兩種應用開發模式,遮蔽了技術的複雜度。支援視覺化設計器,讓開發人員和業務使用者能夠透過簡單的拖拽、引數配置等方式快速完成應用開發。同時集成了 IDaaS 身份認證能力、容器 DevOps 能力,支援企業存量業務與全象雲業務融合。平臺還包含豐富的開發介面和強大的外掛機制,開發者可根據需要不斷拓展平臺的應用能力。

全象雲的願景是:在企業生產經營的各個象限、各個環節提供軟體構件或支援服務。