思路
利用 WebSocket 協議讓客戶端和伺服器端保持有狀態的長連結,儲存連結上來的客戶端 id。訂閱釋出者釋出的訊息針對已儲存的客戶端 id 進行廣播訊息。
我的官方群點選此處。
WebSocket 服務
composer require hyperf/websocket-server
配置檔案 [config/autoload/server。php]
<?php
return [
‘mode’ => SWOOLE_PROCESS,
‘servers’ => [
[
‘name’ => ‘http’,
‘type’ => Server::SERVER_HTTP,
‘host’ => ‘0。0。0。0’,
‘port’ => 11111,
‘sock_type’ => SWOOLE_SOCK_TCP,
‘callbacks’ => [
SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, ‘onRequest’],
],
],
[
‘name’ => ‘ws’,
‘type’ => Server::SERVER_WEBSOCKET,
‘host’ => ‘0。0。0。0’,
‘port’ => 12222,
‘sock_type’ => SWOOLE_SOCK_TCP,
‘callbacks’ => [
SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, ‘onHandShake’],
SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, ‘onMessage’],
SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, ‘onClose’],
],
],
],
WebSocket 伺服器端程式碼示例
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf。
*
* @link https://www。hyperf。io
* @document https://doc。hyperf。io
* @contact group@hyperf。io
* @license https://github。com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
/**
* 傳送訊息
* @param WebSocketServer $server
* @param Frame $frame
*/
public function onMessage(WebSocketServer $server, Frame $frame): void
{
//心跳重新整理快取
$redis = $this->container->get(\Redis::class);
//獲取所有的客戶端id
$fdList = $redis->sMembers(‘websocket_sjd_1’);
//如果當前客戶端在客戶端集合中,就重新整理
if (in_array($frame->fd, $fdList)) {
$redis->sAdd(‘websocket_sjd_1’, $frame->fd);
$redis->expire(‘websocket_sjd_1’, 7200);
}
$server->push($frame->fd, ‘Recv: ’ 。 $frame->data);
}
/**
* 客戶端失去連結
* @param Server $server
* @param int $fd
* @param int $reactorId
*/
public function onClose(Server $server, int $fd, int $reactorId): void
{
//刪掉客戶端id
$redis = $this->container->get(\Redis::class);
//移除集合中指定的value
$redis->sRem(‘websocket_sjd_1’, $fd);
var_dump(‘closed’);
}
/**
* 客戶端連結
* @param WebSocketServer $server
* @param Request $request
*/
public function onOpen(WebSocketServer $server, Request $request): void
{
//儲存客戶端id
$redis = $this->container->get(\Redis::class);
$res1 = $redis->sAdd(‘websocket_sjd_1’, $request->fd);
var_dump($res1);
$res = $redis->expire(‘websocket_sjd_1’, 7200);
var_dump($res);
$server->push($request->fd, ‘Opened’);
}
}
WebSocket 前端程式碼
function WebSocketTest() {
if (“WebSocket” in window) {
console。log(“您的瀏覽器支援 WebSocket!”);
var num = 0
// 開啟一個 web socket
var ws = new WebSocket(“ws://127。0。0。1:12222”);
ws。onopen = function () {
// Web Socket 已連線上,使用 send() 方法傳送資料
//alert(“資料傳送中。。。”);
//ws。send(“傳送資料”);
};
window。setInterval(function () { //每隔5秒鐘傳送一次心跳,避免websocket連線因超時而自動斷開
var ping = {“type”: “ping”};
ws。send(JSON。stringify(ping));
}, 5000);
ws。onmessage = function (evt) {
var d = JSON。parse(evt。data);
console。log(d);
if (d。code == 300) {
$(“。address”)。text(d。address)
}
if (d。code == 200) {
var v = d。data
console。log(v);
num++
var str = `
${v。recordOutTime}
${v。userOutName}
${v。userOutNum}
${v。doorOutName}
$(“。tableHead”)。after(str)
if (num > 7) {
num——
$(“。table 。item:nth-last-child(1)”)。remove()
}
}
};
ws。error = function (e) {
console。log(e)
alert(e)
}
ws。onclose = function () {
// 關閉 websocket
alert(“連線已關閉。。。”);
};
} else {
alert(“您的瀏覽器不支援 WebSocket!”);
}
}
AMQP 元件
composer require hyperf/amqp
配置檔案 [config/autoload/amqp。php]
<?php
return [
‘default’ => [
‘host’ => ‘localhost’,
‘port’ => 5672,
‘user’ => ‘guest’,
‘password’ => ‘guest’,
‘vhost’ => ‘/’,
‘pool’ => [
‘min_connections’ => 1,
‘max_connections’ => 10,
‘connect_timeout’ => 10。0,
‘wait_timeout’ => 3。0,
‘heartbeat’ => -1,
],
‘params’ => [
‘insist’ => false,
‘login_method’ => ‘AMQPLAIN’,
‘login_response’ => null,
‘locale’ => ‘en_US’,
‘connection_timeout’ => 3。0,
‘read_write_timeout’ => 6。0,
‘context’ => null,
‘keepalive’ => false,
‘heartbeat’ => 3,
],
],
];
MQ 消費者程式碼
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/**
* @Consumer(exchange=“hyperf”, routingKey=“hyperf”, queue=“hyperf”, nums=1)
*/
class DemoConsumer extends ConsumerMessage
{
/**
* rabbmitMQ消費端程式碼
* @param $data
* @return string
*/
public function consume($data): string
{
print_r($data);
//獲取集合中所有的value
$redis = $this->container->get(\Redis::class);
$fdList=$redis->sMembers(‘websocket_sjd_1’);
$server=$this->container->get(ServerFactory::class)->getServer()->getServer();
foreach($fdList as $key=>$v){
if(!empty($v)){
$server->push((int)$v, $data);
}
}
return Result::ACK;
}
}
控制器程式碼
/**
* test
* @return array
*/
public function test()
{
$data = array(
‘code’ => 200,
‘data’ => [
‘userOutName’ => ‘ccflow’,
‘userOutNum’ => ‘9999’,
‘recordOutTime’ => date(“Y-m-d H:i:s”, time()),
‘doorOutName’ => ‘教師公寓’,
]
);
$data = \GuzzleHttp\json_encode($data);
$message = new DemoProducer($data);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);
var_dump($result);
$user = $this->request->input(‘user’, ‘Hyperf’);
$method = $this->request->getMethod();
return [
‘method’ => $method,
‘message’ => “{$user}。”,
];
}
最終效果
更多學習內容請訪問:
以上內容希望幫助到大家,
很多PHPer在進階的時候總會遇到一些問題和瓶頸,業務程式碼寫多了沒有方向感,不知道該從那裡入手去提升,對此我整理了一些資料,包括但不限於:分散式架構、高可擴充套件、高效能、高併發、伺服器效能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql最佳化、shell指令碼、Docker、微服務、Nginx等多個知識點高階進階乾貨需要的可以免費分享給大家
,需要的可以加入
我的官方群點選此處。