仔细看***poser.json修改成你的
{
"name": "administrator/000",
"require": {
"php": "^8.0",
"workerman/gateway-worker": "^3.1"
},
"autoload": {
"psr-4": {
"Chat\\": "D:/phpstudy_pro/WWW/222/Chat/"
}
},
"scripts": {
},
"config": {
"optimize-autoloader": true,
"preferred-install": "dist"
}
}
start_register.php第一个运行的
<?php
/**
* 注册服务启动文件
*/
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\Register;
// 注册服务(端口可自定义)
$register = new Register('text://0.0.0.0:1236');
// 运行服务
\Workerman\Worker::runAll();
这是一个基于GatewayWorker框架的注册服务(Register)启动文件,主要用于分布式WebSocket架构中的服务注册和发现。
主要功能:
服务注册中心
作为GatewayWorker分布式架构的核心组件
负责维护所有Gateway和Worker服务的注册信息
网络配置
使用text://0.0.0.0:1236协议
监听所有网络接口的1236端口
为Gateway和Worker提供通信桥梁
分布式协调
管理多个Gateway进程的连接信息
协调Worker进程的任务分配
实现服务间的负载均衡
在GatewayWorker架构中的角色:
Register:注册中心(当前文件)
Gateway:网关服务,处理客户端连接
Worker:业务处理服务,处理具体业务逻辑
工作流程:
Gateway服务启动时向Register注册
Worker服务启动时向Register注册
Register维护服务路由表,实现消息转发
start_businessworker.php 第二个运行
<?php
/**
* 业务处理服务启动文件
*/
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\BusinessWorker;
// 业务处理服务
$businessWorker = new BusinessWorker();
$businessWorker->name = 'ChatBusinessWorker';
$businessWorker->count = 4;
$businessWorker->registerAddress = '127.0.0.1:1236';
$businessWorker->eventHandler = 'Chat\Events';
// 运行服务
\Workerman\Worker::runAll();
这是GatewayWorker框架中的业务处理服务(BusinessWorker)启动文件,主要负责处理实际的业务逻辑。代码功能详解:1.引入依赖phpCopy Code1require_once DIR . ‘/vendor/autoload.php’;
加载***poser自动加载器确保所有依赖类能够正确引入2.创建业务处理服务实例phpCopy Code1
b
u
s
i
n
e
s
s
W
o
r
k
e
r
=
n
e
w
B
u
s
i
n
e
s
s
W
o
r
k
e
r
(
)
;
实例化
B
u
s
i
n
e
s
s
W
o
r
k
e
r
核心类作为整个应用的业务逻辑处理中心
3.
配置业务处理服务
p
h
p
C
o
p
y
C
o
d
e
1234
businessWorker = new BusinessWorker(); 实例化BusinessWorker核心类作为整个应用的业务逻辑处理中心3.配置业务处理服务phpCopy Code1234
businessWorker=newBusinessWorker();实例化BusinessWorker核心类作为整个应用的业务逻辑处理中心3.配置业务处理服务phpCopyCode1234businessWorker->name = ‘ChatBusinessWorker’;
$businessWorker->count = 4;
$businessWorker->registerAddress = ‘127.0.0.1:1236’;
$businessWorker->eventHandler = ‘Chat\Events’;
name: 设置业务处理服务名称,用于标识和监控count: 启动4个业务处理进程,提高并发处理能力registerAddress: 连接注册服务地址(127.0.0.1:1236)eventHandler: 指定业务事件处理类为Chat\Events4.启动服务phpCopy Code1\Workerman\Worker::runAll();
运行所有Worker进程开始监听并处理来自Gateway的请求在分布式架构中的角色:接收Gateway转发来的客户端消息执行业务逻辑处理(如聊天消息处理、数据存储等)将处理结果返回给Gateway进行发送工作流程:1.Gateway接收到客户端消息2.Gateway将消息转发给BusinessWorker3.BusinessWorker调用Chat\Events类中的方法处理业务4.处理完成后,BusinessWorker通过Gateway向客户端发送响应
start_gateway.php 第三个运行
<?php
/**
* Gateway服务启动文件
* 基于WebSocket协议
*/
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\Gateway;
// 心跳间隔设置(秒)
define('HEARTBEAT_INTERVAL', 55);
// 允许的未响应次数
define('HEARTBEAT_MAX_RETRY', 1);
// Gateway服务(WebSocket协议)
$gateway = new Gateway('websocket://0.0.0.0:8289');
$gateway->name = 'ChatGateway';
$gateway->count = 2;
$gateway->lanIp = '127.0.0.1';
$gateway->startPort = 2300;
$gateway->registerAddress = '127.0.0.1:1236';
// 配置心跳机制
// 服务端每55秒检测一次连接
$gateway->pingInterval = HEARTBEAT_INTERVAL;
// 允许客户端1次未响应
$gateway->pingNotResponseLimit = HEARTBEAT_MAX_RETRY;
// 服务端发送的心跳数据(JSON格式)
$gateway->pingData = '{"type":"ping"}';
// 运行服务
\Workerman\Worker::runAll();
这是GatewayWorker框架中的网关服务(Gateway)启动文件,作为整个WebSocket应用的接入层和转发中心。代码详细解释:1.引入依赖和命名空间phpCopy Code12require_once DIR . ‘/vendor/autoload.php’;
use GatewayWorker\Gateway;
加载***poser自动加载器引入Gateway类2.心跳参数定义phpCopy Code12define(‘HEARTBEAT_INTERVAL’, 55);
define(‘HEARTBEAT_MAX_RETRY’, 1);
设置心跳检测间隔为55秒允许客户端1次心跳未响应3.创建网关服务实例phpCopy Code1
g
a
t
e
w
a
y
=
n
e
w
G
a
t
e
w
a
y
(
′
w
e
b
s
o
c
k
e
t
:
/
/
0.0.0.0
:
828
9
′
)
;
创建
W
e
b
S
o
c
k
e
t
协议网关,监听
8289
端口
0.0.0.0
表示监听所有网络接口
4.
网关服务配置
p
h
p
C
o
p
y
C
o
d
e
12345
gateway = new Gateway('websocket://0.0.0.0:8289'); 创建WebSocket协议网关,监听8289端口0.0.0.0表示监听所有网络接口4.网关服务配置phpCopy Code12345
gateway=newGateway(′websocket://0.0.0.0:8289′);创建WebSocket协议网关,监听8289端口0.0.0.0表示监听所有网络接口4.网关服务配置phpCopyCode12345gateway->name = ‘ChatGateway’; // 服务名称
$gateway->count = 2; // 启动2个网关进程
$gateway->lanIp = ‘127.0.0.1’; // 局域网IP
$gateway->startPort = 2300; // 内部通信起始端口
g
a
t
e
w
a
y
−
>
r
e
g
i
s
t
e
r
A
d
d
r
e
s
s
=
′
127.0.0.1
:
123
6
′
;
/
/
注册中心地址
5.
心跳机制配置
p
h
p
C
o
p
y
C
o
d
e
123
gateway->registerAddress = '127.0.0.1:1236'; // 注册中心地址 5.心跳机制配置phpCopy Code123
gateway−>registerAddress=′127.0.0.1:1236′;//注册中心地址5.心跳机制配置phpCopyCode123gateway->pingInterval = HEARTBEAT_INTERVAL; // 55秒检测一次
$gateway->pingNotResponseLimit = HEARTBEAT_MAX_RETRY; // 允许1次未响应
$gateway->pingData = ‘{“type”:“ping”}’; // 心跳数据包
在分布式架构中的核心作用:客户端接入点:处理所有WebSocket客户端连接消息转发中心:将客户端消息转发给BusinessWorker处理连接管理器:维护客户端连接状态和心跳检测负载均衡器:在多个BusinessWorker间分配请求工作流程:1.客户端通过8289端口连接Gateway2.Gateway维护连接并进行心跳检测3.Gateway将业务消息转发给注册的BusinessWorker4.Gateway接收BusinessWorker处理结果并发送给客户端这个网关服务是整个WebSocket应用的入口,负责管理所有客户端连接、保证连接活跃性,并实现业务逻辑与网络通信的分离。
client.html
<!DOCTYPE html>
<html lang="zh-***">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket 客户端测试</title>
<script src="https://cdn.tailwindcss.***"></script>
<link href="https://cdn.jsdelivr.***/npm/font-awesome@4.7.0/css/font-awesome.min.css" rel="stylesheet">
<script>
tailwind.config = {
theme: {
extend: {
colors: {
primary: '#3b82f6',
secondary: '#64748b',
su***ess: '#10b981',
warning: '#f59e0b',
danger: '#ef4444',
info: '#06b6d4',
dark: '#1e293b',
light: '#f8fafc'
},
fontFamily: {
sans: ['Inter', 'system-ui', 'sans-serif']
}
}
}
}
</script>
<style type="text/tailwindcss">
@layer utilities {
.glass-effect {
background: rgba(255, 255, 255, 0.2);
backdrop-filter: blur(8px);
border: 1px solid rgba(255, 255, 255, 0.3);
}
.text-shadow {
text-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
}
.transition-all-300 {
transition: all 300ms ease-in-out;
}
}
</style>
</head>
<body class="bg-gradient-to-br from-blue-50 to-indigo-100 min-h-screen">
<div class="container mx-auto px-4 py-8 max-w-4xl">
<div class="bg-white rounded-xl shadow-xl overflow-hidden">
<!-- 顶部标题栏 -->
<div class="bg-primary text-white p-4 flex justify-between items-center">
<h1 class="text-xl font-bold flex items-center">
<i class="fa fa-***ments mr-2"></i>
WebSocket 聊天测试
</h1>
<div class="flex items-center space-x-2">
<span id="connection-status" class="inline-flex items-center px-2 py-1 rounded text-xs font-medium bg-red-500">
<span class="w-2 h-2 rounded-full bg-white mr-1 animate-pulse"></span>
未连接
</span>
<button id="connect-btn" class="px-3 py-1 bg-white text-primary rounded hover:bg-opacity-90 transition-all-300">
<i class="fa fa-plug mr-1"></i> 连接
</button>
</div>
</div>
<!-- 登录表单 -->
<div id="login-form" class="p-6">
<div class="mb-4">
<label for="username" class="block text-sm font-medium text-gray-700 mb-1">用户名</label>
<div class="flex">
<span class="inline-flex items-center px-3 py-2 border border-r-0 border-gray-300 rounded-l-md bg-gray-50 text-gray-500">
<i class="fa fa-user"></i>
</span>
<input type="text" id="username" class="flex-1 px-3 py-2 border border-gray-300 rounded-r-md focus:outline-none focus:ring-2 focus:ring-primary focus:border-transparent" placeholder="请输入用户名" value="User_<?php echo rand(1000, 9999); ?>">
</div>
</div>
<div class="mb-4">
<label for="ws-url" class="block text-sm font-medium text-gray-700 mb-1">WebSocket 地址</label>
<div class="flex">
<span class="inline-flex items-center px-3 py-2 border border-r-0 border-gray-300 rounded-l-md bg-gray-50 text-gray-500">
<i class="fa fa-link"></i>
</span>
<input type="text" id="ws-url" class="flex-1 px-3 py-2 border border-gray-300 rounded-r-md focus:outline-none focus:ring-2 focus:ring-primary focus:border-transparent" value="ws://localhost:8289">
</div>
</div>
<button id="login-btn" class="w-full py-2 bg-primary text-white rounded-md hover:bg-primary/90 transition-all-300 flex items-center justify-center">
<i class="fa fa-sign-in mr-2"></i> 登录并连接
</button>
</div>
<!-- 聊天区域 -->
<div id="chat-area" class="hidden">
<!-- 聊天记录 -->
<div id="message-container" class="h-96 overflow-y-auto p-4 bg-gray-50">
<div class="flex justify-center mb-4">
<span class="bg-gray-200 text-gray-600 text-xs px-2 py-1 rounded-full">今天</span>
</div>
</div>
<!-- 输入区域 -->
<div class="border-t p-4">
<div class="flex">
<input type="text" id="message-input" class="flex-1 px-3 py-2 border border-gray-300 rounded-l-md focus:outline-none focus:ring-2 focus:ring-primary focus:border-transparent" placeholder="请输入消息...">
<button id="send-btn" class="px-4 py-2 bg-primary text-white rounded-r-md hover:bg-primary/90 transition-all-300">
<i class="fa fa-paper-plane"></i>
</button>
</div>
</div>
</div>
<!-- 状态信息 -->
<div class="bg-gray-50 text-xs text-gray-500 p-2 flex justify-between">
<div id="status-info">
<span id="client-id">未连接</span>
</div>
<div id="heartbeat-info">
<span id="last-heartbeat">最后心跳: 无</span>
</div>
</div>
</div>
<!-- 调试信息 -->
<div class="mt-6 bg-white rounded-xl shadow-lg p-4">
<h3 class="text-lg font-semibold mb-2 flex items-center">
<i class="fa fa-bug text-danger mr-2"></i>
调试信息
</h3>
<div id="debug-container" class="h-40 overflow-y-auto bg-gray-50 p-3 rounded-md text-xs font-mono">
<div class="text-gray-500">等待连接...</div>
</div>
</div>
</div>
<script>
document.addEventListener('DOMContentLoaded', function() {
// DOM 元素
const loginForm = document.getElementById('login-form');
const chatArea = document.getElementById('chat-area');
const connectBtn = document.getElementById('connect-btn');
const loginBtn = document.getElementById('login-btn');
const sendBtn = document.getElementById('send-btn');
const messageInput = document.getElementById('message-input');
const messageContainer = document.getElementById('message-container');
const connectionStatus = document.getElementById('connection-status');
const clientIdDisplay = document.getElementById('client-id');
const lastHeartbeatDisplay = document.getElementById('last-heartbeat');
const debugContainer = document.getElementById('debug-container');
// 变量
let ws = null;
let username = '';
let wsUrl = '';
let heartbeatInterval = null;
let heartbeatTimeout = null;
let reconnectInterval = null;
// 配置
const HEARTBEAT_INTERVAL = 50000; // 50秒发送一次心跳
const HEARTBEAT_TIMEOUT = 10000; // 10秒未收到响应则认为连接断开
const RECONNECT_DELAY = 5000; // 5秒后重连
// 连接 WebSocket
function connectWebSocket() {
try {
// 关闭现有连接
if (ws) {
ws.close();
}
// 创建新连接
ws = new WebSocket(wsUrl);
// 连接成功
ws.onopen = function() {
logDebug('WebSocket 连接成功');
updateConnectionStatus(true);
// 登录
sendLogin();
// 启动心跳
startHeartbeat();
// 清除重连定时器
if (reconnectInterval) {
clearInterval(reconnectInterval);
reconnectInterval = null;
}
};
// 接收消息
ws.onmessage = function(event) {
const message = event.data;
logDebug('收到消息: ' + message);
try {
const data = JSON.parse(message);
handleMessage(data);
} catch (e) {
logDebug('解析消息失败: ' + e.message);
addSystemMessage('收到未知格式消息');
}
};
// 连接关闭
ws.onclose = function(event) {
logDebug('WebSocket 连接关闭,代码: ' + event.code + ', 原因: ' + event.reason);
updateConnectionStatus(false);
// 停止心跳
stopHeartbeat();
// 显示重连提示
addSystemMessage('连接已断开,正在尝试重连...');
// 启动重连
if (!reconnectInterval) {
reconnectInterval = setInterval(connectWebSocket, RECONNECT_DELAY);
}
};
// 连接错误
ws.onerror = function(error) {
logDebug('WebSocket 错误: ' + error.message);
updateConnectionStatus(false);
};
} catch (e) {
logDebug('连接失败: ' + e.message);
updateConnectionStatus(false);
}
}
// 发送登录消息
function sendLogin() {
if (ws && ws.readyState === WebSocket.OPEN) {
const loginData = {
type: 'login',
username: username
};
ws.send(JSON.stringify(loginData));
logDebug('发送登录消息: ' + JSON.stringify(loginData));
}
}
// 发送消息
function sendMessage() {
const content = messageInput.value.trim();
if (!content) return;
if (ws && ws.readyState === WebSocket.OPEN) {
const messageData = {
type: 'chat',
content: content
};
ws.send(JSON.stringify(messageData));
logDebug('发送消息: ' + JSON.stringify(messageData));
// 添加到消息列表
addMessage(username, content, true);
// 清空输入框
messageInput.value = '';
} else {
addSystemMessage('连接未建立,无法发送消息');
}
}
// 发送心跳
function sendHeartbeat() {
if (ws && ws.readyState === WebSocket.OPEN) {
const heartbeatData = {
type: 'ping'
};
ws.send(JSON.stringify(heartbeatData));
logDebug('发送心跳: ' + JSON.stringify(heartbeatData));
// 设置心跳超时
if (heartbeatTimeout) {
clearTimeout(heartbeatTimeout);
}
heartbeatTimeout = setTimeout(function() {
logDebug('心跳超时,连接可能已断开');
if (ws) {
ws.close();
}
}, HEARTBEAT_TIMEOUT);
}
}
// 启动心跳
function startHeartbeat() {
// 立即发送一次心跳
sendHeartbeat();
// 设置定时心跳
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
}
heartbeatInterval = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL);
}
// 停止心跳
function stopHeartbeat() {
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
}
if (heartbeatTimeout) {
clearTimeout(heartbeatTimeout);
heartbeatTimeout = null;
}
}
// 处理消息
function handleMessage(data) {
switch (data.type) {
case 'system':
addSystemMessage(data.message);
break;
case 'chat':
addMessage(data.client_id === clientIdDisplay.textContent ? username : data.client_id, data.content, data.client_id === clientIdDisplay.textContent);
break;
case 'pong':
updateLastHeartbeat();
logDebug('收到心跳响应');
// 清除心跳超时
if (heartbeatTimeout) {
clearTimeout(heartbeatTimeout);
heartbeatTimeout = null;
}
break;
case 'login':
if (data.status === 'su***ess') {
loginForm.classList.add('hidden');
chatArea.classList.remove('hidden');
clientIdDisplay.textContent = data.client_id || '未知';
addSystemMessage(data.message);
} else {
addSystemMessage('登录失败: ' + data.message);
}
break;
case 'error':
addSystemMessage('错误: ' + data.message);
break;
}
}
// 添加系统消息
function addSystemMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'flex justify-center my-2';
messageElement.innerHTML = `
<span class="bg-gray-200 text-gray-600 text-xs px-2 py-1 rounded-full">${message}</span>
`;
messageContainer.appendChild(messageElement);
scrollToBottom();
}
// 添加聊天消息
function addMessage(sender, content, isSelf) {
const messageElement = document.createElement('div');
messageElement.className = `flex ${isSelf ? 'justify-end' : 'justify-start'} my-2`;
const messageContent = document.createElement('div');
messageContent.className = `max-w-[80%] px-3 py-2 rounded-lg ${isSelf ? 'bg-primary text-white' : 'bg-white border border-gray-200'}`;
const senderElement = document.createElement('div');
senderElement.className = `text-xs ${isSelf ? 'text-right text-primary/80' : 'text-left text-gray-500'} mb-1`;
senderElement.textContent = sender;
const contentElement = document.createElement('div');
contentElement.textContent = content;
const timeElement = document.createElement('div');
timeElement.className = `text-xs ${isSelf ? 'text-right text-primary/60' : 'text-left text-gray-400'} mt-1`;
timeElement.textContent = new Date().toLocaleTimeString();
messageContent.appendChild(senderElement);
messageContent.appendChild(contentElement);
messageContent.appendChild(timeElement);
messageElement.appendChild(messageContent);
messageContainer.appendChild(messageElement);
scrollToBottom();
}
// 滚动到底部
function scrollToBottom() {
messageContainer.scrollTop = messageContainer.scrollHeight;
}
// 更新连接状态
function updateConnectionStatus(isConnected) {
if (isConnected) {
connectionStatus.className = 'inline-flex items-center px-2 py-1 rounded text-xs font-medium bg-green-500';
connectionStatus.innerHTML = `
<span class="w-2 h-2 rounded-full bg-white mr-1"></span>
已连接
`;
connectBtn.innerHTML = '<i class="fa fa-unplug mr-1"></i> 断开';
} else {
connectionStatus.className = 'inline-flex items-center px-2 py-1 rounded text-xs font-medium bg-red-500';
connectionStatus.innerHTML = `
<span class="w-2 h-2 rounded-full bg-white mr-1 animate-pulse"></span>
未连接
`;
connectBtn.innerHTML = '<i class="fa fa-plug mr-1"></i> 连接';
}
}
// 更新最后心跳时间
function updateLastHeartbeat() {
const now = new Date();
const timeString = now.toLocaleTimeString();
lastHeartbeatDisplay.textContent = `最后心跳: ${timeString}`;
}
// 日志调试信息
function logDebug(message) {
const now = new Date();
const timeString = now.toLocaleTimeString();
const debugElement = document.createElement('div');
debugElement.className = 'mb-1';
debugElement.innerHTML = `<span class="text-gray-500">[${timeString}]</span> ${message}`;
debugContainer.appendChild(debugElement);
debugContainer.scrollTop = debugContainer.scrollHeight;
}
// 事件监听
loginBtn.addEventListener('click', function() {
username = document.getElementById('username').value.trim();
wsUrl = document.getElementById('ws-url').value.trim();
if (!username) {
alert('请输入用户名');
return;
}
if (!wsUrl) {
alert('请输入WebSocket地址');
return;
}
logDebug('尝试连接: ' + wsUrl);
connectWebSocket();
});
connectBtn.addEventListener('click', function() {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.close();
} else {
username = document.getElementById('username').value.trim() || 'Anonymous';
wsUrl = document.getElementById('ws-url').value.trim() || 'ws://localhost:8289';
connectWebSocket();
}
});
sendBtn.addEventListener('click', sendMessage);
messageInput.addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
// 初始化
logDebug('页面加载完成');
logDebug('心跳间隔: ' + (HEARTBEAT_INTERVAL / 1000) + '秒');
logDebug('心跳超时: ' + (HEARTBEAT_TIMEOUT / 1000) + '秒');
logDebug('重连延迟: ' + (RECONNECT_DELAY / 1000) + '秒');
});
</script>
</body>
</html>
Chat/Events.php
<?php
/**
* 事件处理类
* 处理WebSocket连接、消息、断开等事件
*/
namespace Chat;
use GatewayWorker\Lib\Gateway;
class Events
{
/**
* 当客户端连接时触发
* @param int $client_id 客户端ID
* @param mixed $data 连接参数
*/
public static function onConnect($client_id)
{
// 记录连接时间
Gateway::setSession($client_id, [
'last_active_time' => time(),
'client_id' => $client_id,
'connected_at' => date('Y-m-d H:i:s')
]);
echo "客户端连接: $client_id\n";
// 向客户端发送连接成功消息
Gateway::sendToClient($client_id, json_encode([
'type' => 'system',
'message' => '连接成功',
'client_id' => $client_id,
'timestamp' => time()
]));
}
/**
* 当客户端断开连接时触发
* @param int $client_id 客户端ID
*/
public static function onClose($client_id)
{
$session = Gateway::getSession($client_id);
$disconnect_time = date('Y-m-d H:i:s');
echo "客户端断开: $client_id\n";
// 广播用户离开消息
$message = json_encode([
'type' => 'system',
'message' => "用户 $client_id 已离开",
'client_id' => $client_id,
'timestamp' => time()
]);
Gateway::sendToAll($message);
}
/**
* 当客户端发送消息时触发
* @param int $client_id 客户端ID
* @param mixed $message 消息内容
*/
public static function onMessage($client_id, $message)
{
try {
// 更新最后活跃时间
Gateway::updateSession($client_id, ['last_active_time' => time()]);
echo "收到消息 from $client_id: $message\n";
// 解析JSON消息
$data = json_decode($message, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('消息格式错误');
}
// 处理不同类型的消息
switch ($data['type']) {
case 'ping':
// 收到客户端心跳,回复pong
Gateway::sendToClient($client_id, json_encode([
'type' => 'pong',
'timestamp' => time()
]));
break;
case 'chat':
// 处理聊天消息
if (empty($data['content'])) {
throw new \Exception('消息内容不能为空');
}
// 广播消息给所有客户端
$response = [
'type' => 'chat',
'client_id' => $client_id,
'content' => $data['content'],
'timestamp' => time(),
'datetime' => date('Y-m-d H:i:s')
];
Gateway::sendToAll(json_encode($response));
break;
case 'login':
// 处理登录消息
$username = isset($data['username']) ? $data['username'] : "User_$client_id";
Gateway::updateSession($client_id, ['username' => $username]);
// 广播用户加入消息
Gateway::sendToAll(json_encode([
'type' => 'system',
'message' => "用户 $username 已加入",
'client_id' => $client_id,
'username' => $username,
'timestamp' => time()
]));
// 回复登录成功
Gateway::sendToClient($client_id, json_encode([
'type' => 'login',
'status' => 'su***ess',
'message' => "欢迎 $username",
'username' => $username,
'timestamp' => time()
]));
break;
default:
// 未知消息类型
Gateway::sendToClient($client_id, json_encode([
'type' => 'error',
'message' => '未知消息类型',
'timestamp' => time()
]));
}
} catch (\Exception $e) {
// 错误处理
Gateway::sendToClient($client_id, json_encode([
'type' => 'error',
'message' => $e->getMessage(),
'timestamp' => time()
]));
echo "处理消息错误: " . $e->getMessage() . "\n";
}
}
}
解释:
这是GatewayWorker框架中的业务事件处理类,负责处理WebSocket连接的所有业务逻辑。类结构说明命名空间和依赖位于Chat命名空间使用GatewayWorker\Lib\Gateway类进行消息收发核心方法功能1. onConnect - 连接建立处理记录客户端连接时间和会话信息发送连接成功确认消息输出连接日志信息2. onClose - 连接断开处理获取客户端会话信息广播用户离开通知记录断开时间3. onMessage - 消息处理(核心)更新客户端最后活跃时间解析JSON格式消息并进行验证通过switch-case处理不同类型的消息消息类型处理心跳消息(ping)收到客户端心跳包回复pong响应聊天消息(chat)验证消息内容不为空构建响应消息包含时间戳和内容广播给所有在线客户端登录消息(login)处理用户登录逻辑设置用户名到会话中广播用户加入通知回复登录成功确认错误处理机制捕获JSON解析错误处理业务逻辑异常向客户端返回错误信息在分布式架构中的角色这个Events类作为业务逻辑的核心处理器:接收Gateway转发的客户端消息执行相应的业务操作通过Gateway发送处理结果这是一个典型的事件驱动架构实现,将网络通信与业务逻辑完全分离,提高了系统的可维护性和扩展性。