作者:周慧婷
寫在前面
系統開發的過程中,我們經常需要實現訊息推送的需求。單端單例項的情況下很好處理(網上有許多教程這裡不做展開),但在分散式系統及多端需要推送的情況下該如何處理呢?
在分散式系統中,訊息推送服務端是多例項的。某系統中一個服務生成一條訊息,這條訊息需要實時推送到多個終端,此時該如何進行有效的 WebSocket 推送呢?首先一起看看如下場景:
假設推送訊息由訊息例項 2 產生,但是終端真正連線的訊息例項是例項 1 和例項 3,並沒有連線到生產訊息的例項 2,系統是如何將例項 2 的訊息同步推送到終端 1 和終端 2 的呢?下文將詳細描述。
基本原理
為了滿足需求我們採用 redis 做協同中介軟體,用於儲存使用者資訊、生成使用者連線的唯一性標識以及 pod address,訊息的生產者例項透過訂閱 redis 獲取終端連線的唯一性標識和 pod address,並通知到對應的訊息例項,最終由相應連線終端的訊息例項透過 WebSocket 將訊息發推送到使用者終端。具體流程如下圖:
服務端實現
Client
Client 元件的作用,是當用戶與訊息服務中某個例項建立連線後,管理這個連線的資訊,這裡透過一個 Golang 結構體來定義:
type
Client
struct
{
UUID
string
UserID
string
Socket
*
websocket
。
Conn
Send
chan
[]
byte
}
結構體中的資料型別說明如下:
UUID:對連線進行唯一性的標識,透過此標識可以查詢到連線資訊。
UserID:使用者 ID。
Socket:連線物件。
Send:訊息資料 channel。
我們為 Client 結構體實現了兩個方法:Read、Write 來處理訊息的接受和傳送。
Read 方法
Read 方法比較簡單,從終端接收請求訊息後,訊息例項透過 WebSocket 迴應接收訊息狀態,並不返回請求結果。結果透過 Write 方法返回。
func (c *Client) Read(close, renewal chan *Client) {
defer func() {
close <- c
}()
for {
_, message, err := c。Socket。ReadMessage()
if err != nil {
break
}
// 。。。
// message logic
}
}
Write 方法
Write 方法將請求結果返回給終端。Client 會監聽 send channel,當 channel 有資料時,透過 socket 連線將訊息傳送給終端。
func (c *Client) Write(close chan *Client) {
for {
select {
case message, ok := <-c。Send:
if !ok {
return
}
c。Socket。WriteMessage(websocket。TextMessage, message)
case <-c。Ctx。Done():
return
}
}
}
ClientManger
ClientManager 元件相當於連線池,可以管理所有的終端連線,並提供註冊、登出、續期功能。
type ClientManager struct {
sync。RWMutex
Clients map[string]*Client
Register chan *Client
Unregister chan *Client
Renewal chan *Client
}
結構體的資料型別說明如下:
Clients:是一個集合,用於儲存建立的 Client 物件。
Register:註冊的 channel。
把連線註冊到 Clients 中,並透過 key-value 加入 Client 集合中,key 是連線的唯一性標識 ,value 是連線本身。
把連線的唯一性標識和使用者的 ID 以及建立連線的 pod address 資訊,儲存到 redis 中。
Unregister:登出的 channel。
從 ClientManager 元件的 Clients 集合中移除連線物件。
刪除 redis 對應的快取資訊。
Renewal:續期的 channel,用於對 redis 的鍵續期。
ClientManager 只提供了一個 Start 方法,Start 方法提供監聽註冊、登出以及續期的 channel,透過監聽這些 channel 來管理建立的連線物件。當這些 channel 有資料時,執行對應的操作。
func (manager *ClientManager) Start(ctx context。Context) {
for {
select {
case conn := <-manager。Register:
manager。Lock()
manager。Clients[conn。UUID] = conn
manager。Unlock()
_, err := manager。affair。Register(ctx, &RegisterReq{
UserID: conn。UserID,
UUID: conn。UUID,
IP: manager。IP,
})
case conn := <-manager。Unregister:
_, err := manager。affair。Unregister(ctx, &UnregisterReq{
UserID: conn。UserID,
UUID: conn。UUID,
})
conn。Socket。Close()
close(conn。Send)
delete(manager。Clients, conn。UUID)
case conn := <-manager。Renewal:
//。。。
// Key renewal to redis
}
}
}
訊息推送
當一個訊息服務例項生產使用者的訊息,需要推送訊息給終端時,推送步驟如下:
根據 userID 從 redis 讀取資料,得到連線唯一性標識和 pod address 地址,這些資訊是在終端第一次與服務端建立連線的時候寫入 redis 的。
此時根據 pod address,向對應的伺服器傳送請求。
相應的訊息服務例項接收到請求。
服務端接收請求的處理邏輯如下:
根據傳遞過來連線唯一性標識的引數,找到標識對應的連線。我們為 ClientManager 提供了一個 Write 方法。
func (manager *ClientManager) Write(message *Message) error {
manager。RLock()
client, ok := manager。Clients[message。Recipient]
manager。RUnlock()
if !ok {
return errors。New(“client miss [” + message。Recipient + “]”)
}
return client。SendOut(message)
}
此方法用到 ClientManager 元件的 Clients 集合,根據唯一性標識找到對應的 Client。再利用 Client 的 SendOut 方法,寫出資料到終端。
2。 定義 Client 的 SendOut 方法。此方法只負責:把接收到的訊息轉換為位元組陣列後,傳送 Client 的 Send Channel 中。
func (c *Client) SendOut(message *Message) error {
content, err := json。Marshal(message。Content)
if err != nil {
return err
}
c。Send <- content
return nil
}
3。 傳送資料給終端。在前文介紹 Client 元件中,已說明 Client 元件的 send channel 有資料時,會讀取 channel 產生的資料,透過連線物件傳送給對應的終端。
總結
以上是 Web Socket 推送訊息給終端的主要思路:透過 redis 把使用者的資訊以及連線的標識和 pod address 儲存起來,當某個訊息服務例項產生訊息,從 redis 讀取資訊,通知連線著終端的訊息服務例項,再由這些服務例項透過 WebSocket 物件給終端傳送訊息。全象雲低程式碼平臺也集成了訊息的實時推送,使用者使用平臺時能及時獲取最新訊息狀態。
下期我們將為大家帶來 Knative Serving 自定義彈性伸縮,請大家持續關注。
關於全象雲
全象雲平臺(
https://
portal。clouden。io
)是青雲科技自主研發的低程式碼平臺,是基於雲原生、用於輔助構建企業各類數字化應用的工具和整合平臺。
平臺目前提供雲上無程式碼和低程式碼兩種應用開發模式,遮蔽了技術的複雜度。支援視覺化設計器,讓開發人員和業務使用者能夠透過簡單的拖拽、引數配置等方式快速完成應用開發。同時集成了 IDaaS 身份認證能力、容器 DevOps 能力,支援企業存量業務與全象雲業務融合。平臺還包含豐富的開發介面和強大的外掛機制,開發者可根據需要不斷拓展平臺的應用能力。
全象雲的願景是:在企業生產經營的各個象限、各個環節提供軟體構件或支援服務。