R handler

This commit is contained in:
a6a2f5842 2025-12-05 20:53:52 +01:00
parent 55c819700b
commit 21037b617b
1 changed files with 461 additions and 323 deletions

View File

@ -28,6 +28,10 @@ use Ratchet\WebSocket\MessageComponentInterface;
class Handler implements MessageComponentInterface class Handler implements MessageComponentInterface
{ {
/**
* Track channel connections using associative arrays for O(1) lookup
* Structure: [channel_name => [socket_id => true]]
*/
protected $channel_connections = []; protected $channel_connections = [];
/** /**
@ -41,36 +45,18 @@ class Handler implements MessageComponentInterface
public function onOpen(ConnectionInterface $connection) public function onOpen(ConnectionInterface $connection)
{ {
try {
if (! $this->connectionCanBeMade($connection)) { if (! $this->connectionCanBeMade($connection)) {
return $connection->close(); return $connection->close();
} }
// Set IP to connection try {
$connection->remoteAddress = trim( $this->setupConnectionAddress($connection);
explode(
',',
$connection->httpRequest->getHeaderLine('X-Forwarded-For')
)[0] ?? $connection->remoteAddress
);
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
Log::channel('websocket')->info('WS onOpen IP: ' . $connection->remoteAddress);
$this->verifyAppKey($connection); $this->verifyAppKey($connection);
$this->verifyOrigin($connection); $this->verifyOrigin($connection);
$this->limitConcurrentConnections($connection); $this->limitConcurrentConnections($connection);
$this->generateSocketId($connection); $this->generateSocketId($connection);
$this->establishConnection($connection); $this->establishConnection($connection);
$this->initializeAppConnection($connection);
if (isset($connection->app)) {
$this->channelManager->subscribeToApp($connection->app->id);
$this->channelManager->connectionPonged($connection);
NewConnection::dispatch(
$connection->app->id,
$connection->socketId
);
}
} catch (UnknownAppKey $e) { } catch (UnknownAppKey $e) {
Log::channel('websocket')->error('Root level error: ' . $e->getMessage(), [ Log::channel('websocket')->error('Root level error: ' . $e->getMessage(), [
'file' => $e->getFile(), 'file' => $e->getFile(),
@ -84,11 +70,11 @@ class Handler implements MessageComponentInterface
ConnectionInterface $connection, ConnectionInterface $connection,
MessageInterface $message MessageInterface $message
) { ) {
try {
if (!isset($connection->app)) { if (!isset($connection->app)) {
return; return;
} }
try {
request()->server->set('REMOTE_ADDR', $connection->remoteAddress); request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
PusherMessageFactory::createForMessage( PusherMessageFactory::createForMessage(
@ -97,61 +83,155 @@ class Handler implements MessageComponentInterface
$this->channelManager $this->channelManager
)->respond(); )->respond();
// Payload json to array $message = json_decode($message->getPayload(), true, 512, JSON_THROW_ON_ERROR);
$message = json_decode($message->getPayload(), true);
// Cut short for ping pong if ($this->handlePingPong($message, $connection)) {
if ( return;
(strtolower($message['event']) === 'pusher:ping')
|| (strtolower($message['event']) === 'pusher.ping')
) {
$this->channelManager->connectionPonged($connection);
return gc_collect_cycles();
} }
$channel = $this->handleChannelSubscriptions($message, $connection); $channel = $this->handleChannelSubscriptions($message, $connection);
if (! optional($channel)->hasConnection($connection) && !( if ($this->shouldRejectMessage($channel, $connection, $message)) {
$message['event'] !== 'pusher:unsubscribe' return;
&& $message['event'] !== 'pusher.unsubscribe' }
)) {
return $connection->send(json_encode([ $this->authenticateConnection($connection, $channel, $message);
\Log::channel('websocket')->info('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . json_encode($message));
if ($this->handlePusherEvent($message, $connection)) {
return;
}
$this->forkAndProcessMessage($connection, $channel, $message);
} catch (\Throwable $e) {
$this->handleMessageError($e);
}
}
/**
* Handle the websocket close.
*/
public function onClose(ConnectionInterface $connection): void
{
$this->authenticateConnection($connection, null);
if (isset($connection->remoteAddress)) {
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
}
$this->cleanupChannelConnections($connection);
$this->finalizeConnectionClose($connection);
}
protected function setupConnectionAddress(ConnectionInterface $connection): void
{
$connection->remoteAddress = trim(
explode(
',',
$connection->httpRequest->getHeaderLine('X-Forwarded-For')
)[0] ?? $connection->remoteAddress
);
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
Log::channel('websocket')->info('WS onOpen IP: ' . $connection->remoteAddress);
}
protected function initializeAppConnection(ConnectionInterface $connection): void
{
if (!isset($connection->app)) {
return;
}
$this->channelManager->subscribeToApp($connection->app->id);
$this->channelManager->connectionPonged($connection);
NewConnection::dispatch(
$connection->app->id,
$connection->socketId
);
}
protected function handlePingPong(array $message, ConnectionInterface $connection): bool
{
$eventLower = strtolower($message['event']);
if ($eventLower !== 'pusher:ping' && $eventLower !== 'pusher.ping') {
return false;
}
$this->channelManager->connectionPonged($connection);
gc_collect_cycles();
return true;
}
protected function shouldRejectMessage(?Channel $channel, ConnectionInterface $connection, array $message): bool
{
$isUnsubscribe = $message['event'] === 'pusher:unsubscribe' || $message['event'] === 'pusher.unsubscribe';
if (!$channel?->hasConnection($connection) && !$isUnsubscribe) {
$connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'] . ':error',
'data' => [ 'data' => [
'message' => 'Subscription not established', 'message' => 'Subscription not established',
'meta' => $message, 'meta' => $message,
], ],
])); ]));
return true;
} }
if (!$channel) { if (!$channel) {
return $connection->send(json_encode([ $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'] . ':error',
'data' => [ 'data' => [
'message' => 'Channel not found', 'message' => 'Channel not found',
'meta' => $message, 'meta' => $message,
], ],
])); ]));
return true;
} }
$this->authenticateConnection($connection, $channel, $message); return false;
}
\Log::channel('websocket')->info('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . json_encode($message)); protected function handlePusherEvent(array $message, ConnectionInterface $connection): bool
{
if (!str_contains($message['event'], 'pusher')) {
return false;
}
if (strpos($message['event'], 'pusher') !== false) { $connection->send(json_encode([
return $connection->send(json_encode([
'event' => $message['event'] . ':response', 'event' => $message['event'] . ':response',
'data' => [ 'data' => [
'message' => 'Success', 'message' => 'Success',
], ],
])); ]));
return true;
} }
protected function forkAndProcessMessage(
ConnectionInterface $connection,
Channel $channel,
array $message
): void {
$pid = pcntl_fork(); $pid = pcntl_fork();
if ($pid == -1) { if ($pid === -1) {
Log::error('Fork error'); Log::error('Fork error');
} elseif ($pid == 0) { return;
}
if ($pid === 0) {
$this->processMessageInChild($connection, $channel, $message);
exit(0);
}
$this->addDataCheckLoop($connection, $message, $pid);
}
protected function processMessageInChild(
ConnectionInterface $connection,
Channel $channel,
array $message
): void {
try { try {
DB::disconnect(); DB::disconnect();
DB::reconnect(); DB::reconnect();
@ -166,7 +246,6 @@ class Handler implements MessageComponentInterface
$this->channelManager $this->channelManager
); );
// Run deferred callbacks
\Illuminate\Container\Container::getInstance() \Illuminate\Container\Container::getInstance()
->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class) ->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class)
->invokeWhen(fn($callback) => true); ->invokeWhen(fn($callback) => true);
@ -178,88 +257,72 @@ class Handler implements MessageComponentInterface
], ],
])); ]));
// if sentry is defined capture exception
if (app()->bound('sentry')) { if (app()->bound('sentry')) {
app('sentry')->captureException($e); app('sentry')->captureException($e);
} }
} }
exit(0);
} else {
$this->addDataCheckLoop($connection, $message, $pid);
} }
} catch (\Throwable $e) {
protected function handleMessageError(\Throwable $e): void
{
Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [ Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [
'file' => $e->getFile(), 'file' => $e->getFile(),
'line' => $e->getLine(), 'line' => $e->getLine(),
'trace' => $e->getTraceAsString(), 'trace' => $e->getTraceAsString(),
]); ]);
// if sentry is defined capture exception
if (app()->bound('sentry')) { if (app()->bound('sentry')) {
app('sentry')->captureException($e); app('sentry')->captureException($e);
} }
} }
}
/** protected function cleanupChannelConnections(ConnectionInterface $connection): void
* Handle the websocket close.
*/
public function onClose(ConnectionInterface $connection): void
{ {
$this->authenticateConnection($connection, null); $cacheUpdates = [];
$cacheDeletes = ['ws_socket_auth_' . $connection->socketId];
if (@$connection?->remoteAddress) {
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
}
// remove connection from $channel_connections
foreach ($this->channel_connections as $channel => $connections) { foreach ($this->channel_connections as $channel => $connections) {
if (in_array($connection->socketId, $connections)) { if (!isset($connections[$connection->socketId])) {
$this->channel_connections[$channel] = array_diff($connections, [$connection->socketId]); continue;
} }
if (empty(@$this->channel_connections[$channel])) { unset($this->channel_connections[$channel][$connection->socketId]);
if (empty($this->channel_connections[$channel])) {
unset($this->channel_connections[$channel]); unset($this->channel_connections[$channel]);
$cacheDeletes[] = 'ws_channel_connections_' . $channel;
continue;
} }
cache()->forget( $cacheUpdates['ws_channel_connections_' . $channel] = array_keys($this->channel_connections[$channel]);
'ws_socket_auth_' . $connection->socketId,
);
if (@$this->channel_connections[$channel]) {
cache()->forever(
'ws_channel_connections_' . $channel,
@$this->channel_connections[$channel]
);
} else {
cache()->forget('ws_channel_connections_' . $channel);
} }
cache()->forever( $cacheUpdates['ws_active_channels'] = array_keys($this->channel_connections);
'ws_active_channels',
array_keys($this->channel_connections)
);
$authed_users = cache()->get('ws_socket_authed_users') ?? []; $authed_users = cache()->get('ws_socket_authed_users') ?? [];
unset($authed_users[$connection->socketId]); unset($authed_users[$connection->socketId]);
cache()->forever('ws_socket_authed_users', $authed_users); $cacheUpdates['ws_socket_authed_users'] = $authed_users;
cache()->setMultiple($cacheUpdates);
cache()->deleteMultiple($cacheDeletes);
\BlaxSoftware\LaravelWebSockets\Services\WebsocketService::clearUserAuthed( \BlaxSoftware\LaravelWebSockets\Services\WebsocketService::clearUserAuthed(
$connection->socketId $connection->socketId
); );
} }
protected function finalizeConnectionClose(ConnectionInterface $connection): void
{
$this->channelManager $this->channelManager
->unsubscribeFromAllChannels($connection) ->unsubscribeFromAllChannels($connection)
->then(function (bool $unsubscribed) use ($connection): void { ->then(function (bool $unsubscribed) use ($connection): void {
if (isset($connection->app)) { if (!isset($connection->app)) {
$this->channelManager->unsubscribeFromApp($connection->app->id); return;
ConnectionClosed::dispatch($connection->app->id, $connection->socketId);
cache()->forget('ws_connection_' . $connection->socketId);
} }
$this->channelManager->unsubscribeFromApp($connection->app->id);
ConnectionClosed::dispatch($connection->app->id, $connection->socketId);
cache()->forget('ws_connection_' . $connection->socketId);
}); });
} }
@ -389,7 +452,7 @@ class Handler implements MessageComponentInterface
protected function get_connection_channel(&$connection, &$message): ?Channel protected function get_connection_channel(&$connection, &$message): ?Channel
{ {
// Put channel on its place // Put channel on its place
if (! @$message['channel'] && $message['data'] && $message['data']['channel']) { if (! isset($message['channel']) && isset($message['data']['channel'])) {
$message['channel'] = $message['data']['channel']; $message['channel'] = $message['data']['channel'];
unset($message['data']['channel']); unset($message['data']['channel']);
} }
@ -408,81 +471,85 @@ class Handler implements MessageComponentInterface
protected function handleChannelSubscriptions($message, $connection): ?Channel protected function handleChannelSubscriptions($message, $connection): ?Channel
{ {
$channel = $this->get_connection_channel($connection, $message); $channel = $this->get_connection_channel($connection, $message);
$channel_name = optional($channel)->getName(); $channel_name = $channel?->getName();
$socket_id = $connection->socketId;
if (!$channel_name || !$channel) { if (!$channel_name || !$channel) {
return null; return null;
} }
// if not in $channel_connections add it $eventLower = strtolower($message['event']);
if (
(strtolower($message['event']) === 'pusher.subscribe') if ($eventLower === 'pusher.subscribe' || $eventLower === 'pusher:subscribe') {
|| (strtolower($message['event']) === 'pusher:subscribe') $this->handleSubscription($channel, $channel_name, $connection, $message);
) { }
if (str_contains($message['event'], '.unsubscribe')) {
$this->handleUnsubscription($channel, $channel_name, $connection);
}
return $channel;
}
protected function handleSubscription(
Channel $channel,
string $channel_name,
ConnectionInterface $connection,
array $message
): void {
if (!isset($this->channel_connections[$channel_name])) { if (!isset($this->channel_connections[$channel_name])) {
$this->channel_connections[$channel_name] = []; $this->channel_connections[$channel_name] = [];
} }
if (! in_array($connection->socketId, $this->channel_connections[$this->get_connection_channel($connection, $message)->getName()])) { if (!isset($this->channel_connections[$channel_name][$connection->socketId])) {
$this->channel_connections[$channel_name][] = $connection->socketId; $this->channel_connections[$channel_name][$connection->socketId] = true;
} }
cache()->forever( cache()->setMultiple([
'ws_channel_connections_' . $channel_name, 'ws_channel_connections_' . $channel_name => array_keys($this->channel_connections[$channel_name]),
$this->channel_connections[$channel_name] 'ws_active_channels' => array_keys($this->channel_connections)
); ]);
cache()->forever( if ($channel->hasConnection($connection)) {
'ws_active_channels', return;
array_keys($this->channel_connections) }
);
if (! $channel->hasConnection($connection)) {
try { try {
$channel->subscribe($connection, (object) $message); $channel->subscribe($connection, (object) $message);
} catch (\Throwable $e) { } catch (\Throwable $e) {
return null; // Silently handle subscription errors
}
} }
} }
if (strpos($message['event'], '.unsubscribe') !== false) { protected function handleUnsubscription(
if (isset($this->channel_connections[$channel_name])) { Channel $channel,
$this->channel_connections[$channel_name] = array_diff($this->channel_connections[$channel_name], [$socket_id]); string $channel_name,
ConnectionInterface $connection
): void {
if (isset($this->channel_connections[$channel_name][$connection->socketId])) {
unset($this->channel_connections[$channel_name][$connection->socketId]);
} }
if (empty($this->channel_connections[$channel_name])) { if (empty($this->channel_connections[$channel_name])) {
unset($this->channel_connections[$channel_name]); unset($this->channel_connections[$channel_name]);
}
if (@$this->channel_connections[$channel_name]) {
cache()->forever(
'ws_channel_connections_' . $channel_name,
$this->channel_connections[$channel_name]
);
} else {
cache()->forget('ws_channel_connections_' . $channel_name); cache()->forget('ws_channel_connections_' . $channel_name);
cache()->forever('ws_active_channels', array_keys($this->channel_connections));
} else {
cache()->setMultiple([
'ws_channel_connections_' . $channel_name => array_keys($this->channel_connections[$channel_name]),
'ws_active_channels' => array_keys($this->channel_connections)
]);
} }
cache()->forever(
'ws_active_channels',
array_keys($this->channel_connections)
);
$channel->unsubscribe($connection); $channel->unsubscribe($connection);
} }
return $channel;
}
protected function setRequest($message, $connection) protected function setRequest($message, $connection)
{ {
foreach (request()->keys() as $key) { foreach (request()->keys() as $key) {
request()->offsetUnset($key); request()->offsetUnset($key);
} }
request()->merge(@$message['data'] ?? []); request()->merge($message['data'] ?? []);
} }
protected function authenticateConnection( protected function authenticateConnection(
@ -490,43 +557,65 @@ class Handler implements MessageComponentInterface
PrivateChannel|Channel|PresenceChannel|null $channel, PrivateChannel|Channel|PresenceChannel|null $channel,
$message = [] $message = []
) { ) {
$this->loadCachedAuth($connection, $channel);
$this->ensureUserIsSet($connection, $channel);
$this->updateAuthState($connection);
$this->cacheAuthenticatedUser($connection);
$this->scheduleLogout();
}
if ( protected function loadCachedAuth(ConnectionInterface $connection, $channel): void
!optional($connection)->auth {
&& $connection->socketId if (isset($connection->auth)) {
&& ($cached_auth = cache()->get('socket_' . $connection->socketId)) return;
&& @$cached_auth['type'] }
) {
$connection->user = @$cached_auth['type']::find($cached_auth['id']); if (!$connection->socketId) {
return;
}
$cached_auth = cache()->get('socket_' . $connection->socketId);
if (!$cached_auth || !isset($cached_auth['type'])) {
return;
}
$connection->user = $cached_auth['type']::find($cached_auth['id']);
if ($channel) { if ($channel) {
$channel->saveConnection($connection); $channel->saveConnection($connection);
} }
} }
// Update last online of user if user protected function ensureUserIsSet(ConnectionInterface $connection, $channel): void
if (! optional($connection)->user) { {
if (isset($connection->user) && $connection->user) {
return;
}
$connection->user = false; $connection->user = false;
if ($channel) { if ($channel) {
$channel->saveConnection($connection); $channel->saveConnection($connection);
} }
} }
// Set auth or logout protected function updateAuthState(ConnectionInterface $connection): void
($connection->user) {
$connection->user
? Auth::login($connection->user) ? Auth::login($connection->user)
: Auth::logout(); : Auth::logout();
}
protected function cacheAuthenticatedUser(ConnectionInterface $connection): void
{
if (!Auth::user()) {
return;
}
if (Auth::user()) {
/** @var \App\Models\User */ /** @var \App\Models\User */
$user = Auth::user(); $user = Auth::user();
$user->refresh(); $user->refresh();
cache()->forever( cache()->forever('ws_socket_auth_' . $connection->socketId, $user);
'ws_socket_auth_' . $connection->socketId,
$user,
);
$authed_users = cache()->get('ws_socket_authed_users') ?? []; $authed_users = cache()->get('ws_socket_authed_users') ?? [];
$authed_users[$connection->socketId] = $user->id; $authed_users[$connection->socketId] = $user->id;
@ -538,7 +627,8 @@ class Handler implements MessageComponentInterface
); );
} }
// add next in loop logout protected function scheduleLogout(): void
{
$this->channelManager->loop->futureTick(function () { $this->channelManager->loop->futureTick(function () {
Auth::logout(); Auth::logout();
}); });
@ -551,17 +641,10 @@ class Handler implements MessageComponentInterface
$optional = false, $optional = false,
$iteration = false $iteration = false
) { ) {
$pid = explode('_', $pid . '')[0]; $pid = $this->preparePid($pid, $iteration);
if ($iteration >= 0 && $iteration !== false) {
$pid .= '_' . $iteration;
}
// Set timeout start
$pidcache_start = 'dedicated_start_' . $pid; $pidcache_start = 'dedicated_start_' . $pid;
cache()->put($pidcache_start, microtime(true), 100); cache()->put($pidcache_start, microtime(true), 100);
// Periodic check for data
$this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use ( $this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use (
$pidcache_start, $pidcache_start,
$message, $message,
@ -570,14 +653,74 @@ class Handler implements MessageComponentInterface
$optional, $optional,
$iteration $iteration
) { ) {
$this->checkDataLoopIteration(
$timer,
$pidcache_start,
$message,
$pid,
$connection,
$optional,
$iteration
);
pcntl_waitpid(-1, $status, WNOHANG);
});
}
protected function preparePid($pid, $iteration): string
{
$pid = explode('_', $pid . '')[0];
if ($iteration >= 0 && $iteration !== false) {
$pid .= '_' . $iteration;
}
return $pid;
}
protected function checkDataLoopIteration(
$timer,
string $pidcache_start,
array $message,
string $pid,
$connection,
bool $optional,
$iteration
): void {
$pidcache_data = 'dedicated_data_' . $pid; $pidcache_data = 'dedicated_data_' . $pid;
$pidcache_done = 'dedicated_data_' . $pid . '_done'; $pidcache_done = 'dedicated_data_' . $pid . '_done';
$pidcache_complete = 'dedicated_data_' . $pid . '_complete'; $pidcache_complete = 'dedicated_data_' . $pid . '_complete';
if ( if ($this->handleTimeout($timer, $pidcache_start, $pidcache_complete, $message, $connection, $optional)) {
cache()->has($pidcache_start) return;
&& ($diff = microtime(true) - ((int) cache()->get($pidcache_start))) > 60 }
) {
if (!cache()->has($pidcache_done)) {
return;
}
$this->scheduleNextIteration($connection, $message, $pid, $iteration);
$this->processAndSendData($connection, $pidcache_data);
$this->channelManager->loop->cancelTimer($timer);
}
protected function handleTimeout(
$timer,
string $pidcache_start,
string $pidcache_complete,
array $message,
$connection,
bool $optional
): bool {
if (!cache()->has($pidcache_start)) {
return false;
}
$diff = microtime(true) - ((int) cache()->get($pidcache_start));
if ($diff <= 60) {
return false;
}
if (!$optional) { if (!$optional) {
$connection->send(json_encode([ $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'] . ':error',
@ -590,32 +733,33 @@ class Handler implements MessageComponentInterface
$this->channelManager->loop->cancelTimer($timer); $this->channelManager->loop->cancelTimer($timer);
cache()->put($pidcache_complete, true, 360); cache()->put($pidcache_complete, true, 360);
return true;
} }
if (cache()->has($pidcache_done)) { protected function scheduleNextIteration($connection, array $message, string $pid, $iteration): void
// call self with pid + '_0' and optional {
if ($iteration === false) { $nextIteration = ($iteration === false) ? 0 : $iteration + 1;
$this->addDataCheckLoop($connection, $message, $pid, true, 0); $this->addDataCheckLoop($connection, $message, $pid, true, $nextIteration);
} else {
$this->addDataCheckLoop($connection, $message, $pid, true, $iteration + 1);
} }
// Retrieve cached data protected function processAndSendData($connection, string $pidcache_data): void
$sending = @cache()->get($pidcache_data); {
$sending = cache()->get($pidcache_data);
$bm = json_decode($sending, true); $bm = json_decode($sending, true);
if (isset($bm['broadcast']) && $bm['broadcast']) {
// Send the data to client
if (@$bm['broadcast']) {
$this->broadcast( $this->broadcast(
$connection->app->id, $connection->app->id,
$bm['data'] ?? null, $bm['data'] ?? null,
$bm['event'] ?? null, $bm['event'] ?? null,
$bm['channel'] ?? null, $bm['channel'] ?? null,
$bm['including_self'], $bm['including_self'] ?? false,
$connection $connection
); );
} elseif (@$bm['whisper']) { return;
}
if (isset($bm['whisper']) && $bm['whisper']) {
$this->whisper( $this->whisper(
$connection->app->id, $connection->app->id,
$bm['data'] ?? null, $bm['data'] ?? null,
@ -623,19 +767,12 @@ class Handler implements MessageComponentInterface
$bm['socket_ids'] ?? [], $bm['socket_ids'] ?? [],
$bm['channel'] ?? null, $bm['channel'] ?? null,
); );
} else { return;
}
$connection->send($sending); $connection->send($sending);
} }
// Stop periodic check
$this->channelManager->loop->cancelTimer($timer);
}
// Prevent zombie processes
pcntl_waitpid(-1, $status, WNOHANG);
});
}
public function broadcast( public function broadcast(
string $appId, string $appId,
mixed $payload, mixed $payload,
@ -679,8 +816,9 @@ class Handler implements MessageComponentInterface
'channel' => $channel->getName(), 'channel' => $channel->getName(),
]; ];
$socketIdLookup = array_flip($socketIds);
foreach ($channel->getConnections() as $channel_conection) { foreach ($channel->getConnections() as $channel_conection) {
if (in_array($channel_conection->socketId, $socketIds)) { if (isset($socketIdLookup[$channel_conection->socketId])) {
$channel_conection->send(json_encode($p)); $channel_conection->send(json_encode($p));
} }
} }