思路

利用 WebSocket 協議讓客戶端和伺服器端保持有狀態的長連結,儲存連結上來的客戶端 id。訂閱釋出者釋出的訊息針對已儲存的客戶端 id 進行廣播訊息。

我的官方群點選此處。

WebSocket 服務

composer require hyperf/websocket-server

配置檔案 [config/autoload/server。php]

<?php

return [

‘mode’ => SWOOLE_PROCESS,

‘servers’ => [

‘name’ => ‘http’,

‘type’ => Server::SERVER_HTTP,

‘host’ => ‘0。0。0。0’,

‘port’ => 11111,

‘sock_type’ => SWOOLE_SOCK_TCP,

‘callbacks’ => [

SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, ‘onRequest’],

],

],

‘name’ => ‘ws’,

‘type’ => Server::SERVER_WEBSOCKET,

‘host’ => ‘0。0。0。0’,

‘port’ => 12222,

‘sock_type’ => SWOOLE_SOCK_TCP,

‘callbacks’ => [

SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, ‘onHandShake’],

SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, ‘onMessage’],

SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, ‘onClose’],

],

],

],

WebSocket 伺服器端程式碼示例

<?php

declare(strict_types=1);

/**

* This file is part of Hyperf。

*

* @link https://www。hyperf。io

* @document https://doc。hyperf。io

* @contact group@hyperf。io

* @license https://github。com/hyperf-cloud/hyperf/blob/master/LICENSE

*/

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;

use Hyperf\Contract\OnMessageInterface;

use Hyperf\Contract\OnOpenInterface;

use Swoole\Http\Request;

use Swoole\Server;

use Swoole\Websocket\Frame;

use Swoole\WebSocket\Server as WebSocketServer;

class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface

{

/**

* 傳送訊息

* @param WebSocketServer $server

* @param Frame $frame

*/

public function onMessage(WebSocketServer $server, Frame $frame): void

{

//心跳重新整理快取

$redis = $this->container->get(\Redis::class);

//獲取所有的客戶端id

$fdList = $redis->sMembers(‘websocket_sjd_1’);

//如果當前客戶端在客戶端集合中,就重新整理

if (in_array($frame->fd, $fdList)) {

$redis->sAdd(‘websocket_sjd_1’, $frame->fd);

$redis->expire(‘websocket_sjd_1’, 7200);

}

$server->push($frame->fd, ‘Recv: ’ 。 $frame->data);

}

/**

* 客戶端失去連結

* @param Server $server

* @param int $fd

* @param int $reactorId

*/

public function onClose(Server $server, int $fd, int $reactorId): void

{

//刪掉客戶端id

$redis = $this->container->get(\Redis::class);

//移除集合中指定的value

$redis->sRem(‘websocket_sjd_1’, $fd);

var_dump(‘closed’);

}

/**

* 客戶端連結

* @param WebSocketServer $server

* @param Request $request

*/

public function onOpen(WebSocketServer $server, Request $request): void

{

//儲存客戶端id

$redis = $this->container->get(\Redis::class);

$res1 = $redis->sAdd(‘websocket_sjd_1’, $request->fd);

var_dump($res1);

$res = $redis->expire(‘websocket_sjd_1’, 7200);

var_dump($res);

$server->push($request->fd, ‘Opened’);

}

}

WebSocket 前端程式碼

function WebSocketTest() {

if (“WebSocket” in window) {

console。log(“您的瀏覽器支援 WebSocket!”);

var num = 0

// 開啟一個 web socket

var ws = new WebSocket(“ws://127。0。0。1:12222”);

ws。onopen = function () {

// Web Socket 已連線上,使用 send() 方法傳送資料

//alert(“資料傳送中。。。”);

//ws。send(“傳送資料”);

};

window。setInterval(function () { //每隔5秒鐘傳送一次心跳,避免websocket連線因超時而自動斷開

var ping = {“type”: “ping”};

ws。send(JSON。stringify(ping));

}, 5000);

ws。onmessage = function (evt) {

var d = JSON。parse(evt。data);

console。log(d);

if (d。code == 300) {

$(“。address”)。text(d。address)

}

if (d。code == 200) {

var v = d。data

console。log(v);

num++

var str = `

${v。recordOutTime}

${v。userOutName}

${v。userOutNum}

${v。doorOutName}

`

$(“。tableHead”)。after(str)

if (num > 7) {

num——

$(“。table 。item:nth-last-child(1)”)。remove()

}

}

};

ws。error = function (e) {

console。log(e)

alert(e)

}

ws。onclose = function () {

// 關閉 websocket

alert(“連線已關閉。。。”);

};

} else {

alert(“您的瀏覽器不支援 WebSocket!”);

}

}

AMQP 元件

composer require hyperf/amqp

配置檔案 [config/autoload/amqp。php]

<?php

return [

‘default’ => [

‘host’ => ‘localhost’,

‘port’ => 5672,

‘user’ => ‘guest’,

‘password’ => ‘guest’,

‘vhost’ => ‘/’,

‘pool’ => [

‘min_connections’ => 1,

‘max_connections’ => 10,

‘connect_timeout’ => 10。0,

‘wait_timeout’ => 3。0,

‘heartbeat’ => -1,

],

‘params’ => [

‘insist’ => false,

‘login_method’ => ‘AMQPLAIN’,

‘login_response’ => null,

‘locale’ => ‘en_US’,

‘connection_timeout’ => 3。0,

‘read_write_timeout’ => 6。0,

‘context’ => null,

‘keepalive’ => false,

‘heartbeat’ => 3,

],

],

];

MQ 消費者程式碼

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;

use Hyperf\Amqp\Message\ConsumerMessage;

use Hyperf\Amqp\Result;

use Hyperf\Server\Server;

use Hyperf\Server\ServerFactory;

/**

* @Consumer(exchange=“hyperf”, routingKey=“hyperf”, queue=“hyperf”, nums=1)

*/

class DemoConsumer extends ConsumerMessage

{

/**

* rabbmitMQ消費端程式碼

* @param $data

* @return string

*/

public function consume($data): string

{

print_r($data);

//獲取集合中所有的value

$redis = $this->container->get(\Redis::class);

$fdList=$redis->sMembers(‘websocket_sjd_1’);

$server=$this->container->get(ServerFactory::class)->getServer()->getServer();

foreach($fdList as $key=>$v){

if(!empty($v)){

$server->push((int)$v, $data);

}

}

return Result::ACK;

}

}

控制器程式碼

/**

* test

* @return array

*/

public function test()

{

$data = array(

‘code’ => 200,

‘data’ => [

‘userOutName’ => ‘ccflow’,

‘userOutNum’ => ‘9999’,

‘recordOutTime’ => date(“Y-m-d H:i:s”, time()),

‘doorOutName’ => ‘教師公寓’,

);

$data = \GuzzleHttp\json_encode($data);

$message = new DemoProducer($data);

$producer = ApplicationContext::getContainer()->get(Producer::class);

$result = $producer->produce($message);

var_dump($result);

$user = $this->request->input(‘user’, ‘Hyperf’);

$method = $this->request->getMethod();

return [

‘method’ => $method,

‘message’ => “{$user}。”,

];

}

最終效果

基於 Hyperf 實現 RabbitMQ + WebSocket 訊息推送

更多學習內容請訪問:

以上內容希望幫助到大家,

很多PHPer在進階的時候總會遇到一些問題和瓶頸,業務程式碼寫多了沒有方向感,不知道該從那裡入手去提升,對此我整理了一些資料,包括但不限於:分散式架構、高可擴充套件、高效能、高併發、伺服器效能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql最佳化、shell指令碼、Docker、微服務、Nginx等多個知識點高階進階乾貨需要的可以免費分享給大家

,需要的可以加入

我的官方群點選此處。