diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index 8f3c0e3..4a8a35d 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -28,6 +28,10 @@ use Ratchet\WebSocket\MessageComponentInterface; class Handler implements MessageComponentInterface { + /** + * Track channel connections using associative arrays for O(1) lookup + * Structure: [channel_name => [socket_id => true]] + */ protected $channel_connections = []; /** @@ -41,36 +45,18 @@ class Handler implements MessageComponentInterface public function onOpen(ConnectionInterface $connection) { + if (! $this->connectionCanBeMade($connection)) { + return $connection->close(); + } + try { - if (! $this->connectionCanBeMade($connection)) { - return $connection->close(); - } - - // Set IP to connection - $connection->remoteAddress = trim( - explode( - ',', - $connection->httpRequest->getHeaderLine('X-Forwarded-For') - )[0] ?? $connection->remoteAddress - ); - request()->server->set('REMOTE_ADDR', $connection->remoteAddress); - Log::channel('websocket')->info('WS onOpen IP: ' . $connection->remoteAddress); - + $this->setupConnectionAddress($connection); $this->verifyAppKey($connection); $this->verifyOrigin($connection); $this->limitConcurrentConnections($connection); $this->generateSocketId($connection); $this->establishConnection($connection); - - if (isset($connection->app)) { - $this->channelManager->subscribeToApp($connection->app->id); - $this->channelManager->connectionPonged($connection); - - NewConnection::dispatch( - $connection->app->id, - $connection->socketId - ); - } + $this->initializeAppConnection($connection); } catch (UnknownAppKey $e) { Log::channel('websocket')->error('Root level error: ' . $e->getMessage(), [ 'file' => $e->getFile(), @@ -84,11 +70,11 @@ class Handler implements MessageComponentInterface ConnectionInterface $connection, MessageInterface $message ) { - try { - if (! isset($connection->app)) { - return; - } + if (!isset($connection->app)) { + return; + } + try { request()->server->set('REMOTE_ADDR', $connection->remoteAddress); PusherMessageFactory::createForMessage( @@ -97,108 +83,28 @@ class Handler implements MessageComponentInterface $this->channelManager )->respond(); - // Payload json to array - $message = json_decode($message->getPayload(), true); + $message = json_decode($message->getPayload(), true, 512, JSON_THROW_ON_ERROR); - // Cut short for ping pong - if ( - (strtolower($message['event']) === 'pusher:ping') - || (strtolower($message['event']) === 'pusher.ping') - ) { - $this->channelManager->connectionPonged($connection); - return gc_collect_cycles(); + if ($this->handlePingPong($message, $connection)) { + return; } $channel = $this->handleChannelSubscriptions($message, $connection); - if (! optional($channel)->hasConnection($connection) && !( - $message['event'] !== 'pusher:unsubscribe' - && $message['event'] !== 'pusher.unsubscribe' - )) { - return $connection->send(json_encode([ - 'event' => $message['event'] . ':error', - 'data' => [ - 'message' => 'Subscription not established', - 'meta' => $message, - ], - ])); - } - - if (!$channel) { - return $connection->send(json_encode([ - 'event' => $message['event'] . ':error', - 'data' => [ - 'message' => 'Channel not found', - 'meta' => $message, - ], - ])); + if ($this->shouldRejectMessage($channel, $connection, $message)) { + return; } $this->authenticateConnection($connection, $channel, $message); - \Log::channel('websocket')->info('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . json_encode($message)); - if (strpos($message['event'], 'pusher') !== false) { - return $connection->send(json_encode([ - 'event' => $message['event'] . ':response', - 'data' => [ - 'message' => 'Success', - ], - ])); + if ($this->handlePusherEvent($message, $connection)) { + return; } - $pid = pcntl_fork(); - - if ($pid == -1) { - Log::error('Fork error'); - } elseif ($pid == 0) { - try { - DB::disconnect(); - DB::reconnect(); - - $this->setRequest($message, $connection); - $mock = new MockConnection($connection); - - Controller::controll_message( - $mock, - $channel, - $message, - $this->channelManager - ); - - // Run deferred callbacks - \Illuminate\Container\Container::getInstance() - ->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class) - ->invokeWhen(fn($callback) => true); - } catch (Exception $e) { - $mock->send(json_encode([ - 'event' => $message['event'] . ':error', - 'data' => [ - 'message' => $e->getMessage(), - ], - ])); - - // if sentry is defined capture exception - if (app()->bound('sentry')) { - app('sentry')->captureException($e); - } - } - - exit(0); - } else { - $this->addDataCheckLoop($connection, $message, $pid); - } + $this->forkAndProcessMessage($connection, $channel, $message); } catch (\Throwable $e) { - Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [ - 'file' => $e->getFile(), - 'line' => $e->getLine(), - 'trace' => $e->getTraceAsString(), - ]); - - // if sentry is defined capture exception - if (app()->bound('sentry')) { - app('sentry')->captureException($e); - } + $this->handleMessageError($e); } } @@ -209,57 +115,214 @@ class Handler implements MessageComponentInterface { $this->authenticateConnection($connection, null); - if (@$connection?->remoteAddress) { + if (isset($connection->remoteAddress)) { request()->server->set('REMOTE_ADDR', $connection->remoteAddress); } - // remove connection from $channel_connections - foreach ($this->channel_connections as $channel => $connections) { - if (in_array($connection->socketId, $connections)) { - $this->channel_connections[$channel] = array_diff($connections, [$connection->socketId]); - } + $this->cleanupChannelConnections($connection); + $this->finalizeConnectionClose($connection); + } - if (empty(@$this->channel_connections[$channel])) { - unset($this->channel_connections[$channel]); - } - cache()->forget( - 'ws_socket_auth_' . $connection->socketId, - ); + protected function setupConnectionAddress(ConnectionInterface $connection): void + { + $connection->remoteAddress = trim( + explode( + ',', + $connection->httpRequest->getHeaderLine('X-Forwarded-For') + )[0] ?? $connection->remoteAddress + ); + request()->server->set('REMOTE_ADDR', $connection->remoteAddress); + Log::channel('websocket')->info('WS onOpen IP: ' . $connection->remoteAddress); + } - if (@$this->channel_connections[$channel]) { - cache()->forever( - 'ws_channel_connections_' . $channel, - @$this->channel_connections[$channel] - ); - } else { - cache()->forget('ws_channel_connections_' . $channel); - } - - cache()->forever( - 'ws_active_channels', - array_keys($this->channel_connections) - ); - - $authed_users = cache()->get('ws_socket_authed_users') ?? []; - unset($authed_users[$connection->socketId]); - cache()->forever('ws_socket_authed_users', $authed_users); - - \BlaxSoftware\LaravelWebSockets\Services\WebsocketService::clearUserAuthed( - $connection->socketId - ); + protected function initializeAppConnection(ConnectionInterface $connection): void + { + if (!isset($connection->app)) { + return; } + $this->channelManager->subscribeToApp($connection->app->id); + $this->channelManager->connectionPonged($connection); + + NewConnection::dispatch( + $connection->app->id, + $connection->socketId + ); + } + + protected function handlePingPong(array $message, ConnectionInterface $connection): bool + { + $eventLower = strtolower($message['event']); + if ($eventLower !== 'pusher:ping' && $eventLower !== 'pusher.ping') { + return false; + } + + $this->channelManager->connectionPonged($connection); + gc_collect_cycles(); + return true; + } + + protected function shouldRejectMessage(?Channel $channel, ConnectionInterface $connection, array $message): bool + { + $isUnsubscribe = $message['event'] === 'pusher:unsubscribe' || $message['event'] === 'pusher.unsubscribe'; + + if (!$channel?->hasConnection($connection) && !$isUnsubscribe) { + $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Subscription not established', + 'meta' => $message, + ], + ])); + return true; + } + + if (!$channel) { + $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Channel not found', + 'meta' => $message, + ], + ])); + return true; + } + + return false; + } + + protected function handlePusherEvent(array $message, ConnectionInterface $connection): bool + { + if (!str_contains($message['event'], 'pusher')) { + return false; + } + + $connection->send(json_encode([ + 'event' => $message['event'] . ':response', + 'data' => [ + 'message' => 'Success', + ], + ])); + return true; + } + + protected function forkAndProcessMessage( + ConnectionInterface $connection, + Channel $channel, + array $message + ): void { + $pid = pcntl_fork(); + + if ($pid === -1) { + Log::error('Fork error'); + return; + } + + if ($pid === 0) { + $this->processMessageInChild($connection, $channel, $message); + exit(0); + } + + $this->addDataCheckLoop($connection, $message, $pid); + } + + protected function processMessageInChild( + ConnectionInterface $connection, + Channel $channel, + array $message + ): void { + try { + DB::disconnect(); + DB::reconnect(); + + $this->setRequest($message, $connection); + $mock = new MockConnection($connection); + + Controller::controll_message( + $mock, + $channel, + $message, + $this->channelManager + ); + + \Illuminate\Container\Container::getInstance() + ->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class) + ->invokeWhen(fn($callback) => true); + } catch (Exception $e) { + $mock->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => $e->getMessage(), + ], + ])); + + if (app()->bound('sentry')) { + app('sentry')->captureException($e); + } + } + } + + protected function handleMessageError(\Throwable $e): void + { + Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'trace' => $e->getTraceAsString(), + ]); + + if (app()->bound('sentry')) { + app('sentry')->captureException($e); + } + } + + protected function cleanupChannelConnections(ConnectionInterface $connection): void + { + $cacheUpdates = []; + $cacheDeletes = ['ws_socket_auth_' . $connection->socketId]; + + foreach ($this->channel_connections as $channel => $connections) { + if (!isset($connections[$connection->socketId])) { + continue; + } + + unset($this->channel_connections[$channel][$connection->socketId]); + + if (empty($this->channel_connections[$channel])) { + unset($this->channel_connections[$channel]); + $cacheDeletes[] = 'ws_channel_connections_' . $channel; + continue; + } + + $cacheUpdates['ws_channel_connections_' . $channel] = array_keys($this->channel_connections[$channel]); + } + + $cacheUpdates['ws_active_channels'] = array_keys($this->channel_connections); + + $authed_users = cache()->get('ws_socket_authed_users') ?? []; + unset($authed_users[$connection->socketId]); + $cacheUpdates['ws_socket_authed_users'] = $authed_users; + + cache()->setMultiple($cacheUpdates); + cache()->deleteMultiple($cacheDeletes); + + \BlaxSoftware\LaravelWebSockets\Services\WebsocketService::clearUserAuthed( + $connection->socketId + ); + } + + protected function finalizeConnectionClose(ConnectionInterface $connection): void + { $this->channelManager ->unsubscribeFromAllChannels($connection) ->then(function (bool $unsubscribed) use ($connection): void { - if (isset($connection->app)) { - $this->channelManager->unsubscribeFromApp($connection->app->id); - - ConnectionClosed::dispatch($connection->app->id, $connection->socketId); - - cache()->forget('ws_connection_' . $connection->socketId); + if (!isset($connection->app)) { + return; } + + $this->channelManager->unsubscribeFromApp($connection->app->id); + ConnectionClosed::dispatch($connection->app->id, $connection->socketId); + cache()->forget('ws_connection_' . $connection->socketId); }); } @@ -389,7 +452,7 @@ class Handler implements MessageComponentInterface protected function get_connection_channel(&$connection, &$message): ?Channel { // Put channel on its place - if (! @$message['channel'] && $message['data'] && $message['data']['channel']) { + if (! isset($message['channel']) && isset($message['data']['channel'])) { $message['channel'] = $message['data']['channel']; unset($message['data']['channel']); } @@ -408,81 +471,85 @@ class Handler implements MessageComponentInterface protected function handleChannelSubscriptions($message, $connection): ?Channel { $channel = $this->get_connection_channel($connection, $message); - $channel_name = optional($channel)->getName(); - $socket_id = $connection->socketId; + $channel_name = $channel?->getName(); - if (! $channel_name || ! $channel) { + if (!$channel_name || !$channel) { return null; } - // if not in $channel_connections add it - if ( - (strtolower($message['event']) === 'pusher.subscribe') - || (strtolower($message['event']) === 'pusher:subscribe') - ) { - if (! isset($this->channel_connections[$channel_name])) { - $this->channel_connections[$channel_name] = []; - } + $eventLower = strtolower($message['event']); - if (! in_array($connection->socketId, $this->channel_connections[$this->get_connection_channel($connection, $message)->getName()])) { - $this->channel_connections[$channel_name][] = $connection->socketId; - } - - cache()->forever( - 'ws_channel_connections_' . $channel_name, - $this->channel_connections[$channel_name] - ); - - cache()->forever( - 'ws_active_channels', - array_keys($this->channel_connections) - ); - - if (! $channel->hasConnection($connection)) { - try { - $channel->subscribe($connection, (object) $message); - } catch (\Throwable $e) { - return null; - } - } + if ($eventLower === 'pusher.subscribe' || $eventLower === 'pusher:subscribe') { + $this->handleSubscription($channel, $channel_name, $connection, $message); } - if (strpos($message['event'], '.unsubscribe') !== false) { - if (isset($this->channel_connections[$channel_name])) { - $this->channel_connections[$channel_name] = array_diff($this->channel_connections[$channel_name], [$socket_id]); - } - - if (empty($this->channel_connections[$channel_name])) { - unset($this->channel_connections[$channel_name]); - } - - if (@$this->channel_connections[$channel_name]) { - cache()->forever( - 'ws_channel_connections_' . $channel_name, - $this->channel_connections[$channel_name] - ); - } else { - cache()->forget('ws_channel_connections_' . $channel_name); - } - - cache()->forever( - 'ws_active_channels', - array_keys($this->channel_connections) - ); - - $channel->unsubscribe($connection); + if (str_contains($message['event'], '.unsubscribe')) { + $this->handleUnsubscription($channel, $channel_name, $connection); } return $channel; } + protected function handleSubscription( + Channel $channel, + string $channel_name, + ConnectionInterface $connection, + array $message + ): void { + if (!isset($this->channel_connections[$channel_name])) { + $this->channel_connections[$channel_name] = []; + } + + if (!isset($this->channel_connections[$channel_name][$connection->socketId])) { + $this->channel_connections[$channel_name][$connection->socketId] = true; + } + + cache()->setMultiple([ + 'ws_channel_connections_' . $channel_name => array_keys($this->channel_connections[$channel_name]), + 'ws_active_channels' => array_keys($this->channel_connections) + ]); + + if ($channel->hasConnection($connection)) { + return; + } + + try { + $channel->subscribe($connection, (object) $message); + } catch (\Throwable $e) { + // Silently handle subscription errors + } + } + + protected function handleUnsubscription( + Channel $channel, + string $channel_name, + ConnectionInterface $connection + ): void { + if (isset($this->channel_connections[$channel_name][$connection->socketId])) { + unset($this->channel_connections[$channel_name][$connection->socketId]); + } + + if (empty($this->channel_connections[$channel_name])) { + unset($this->channel_connections[$channel_name]); + cache()->forget('ws_channel_connections_' . $channel_name); + cache()->forever('ws_active_channels', array_keys($this->channel_connections)); + } else { + cache()->setMultiple([ + 'ws_channel_connections_' . $channel_name => array_keys($this->channel_connections[$channel_name]), + 'ws_active_channels' => array_keys($this->channel_connections) + ]); + } + + $channel->unsubscribe($connection); + } + protected function setRequest($message, $connection) { foreach (request()->keys() as $key) { request()->offsetUnset($key); } - request()->merge(@$message['data'] ?? []); + request()->merge($message['data'] ?? []); } protected function authenticateConnection( @@ -490,55 +557,78 @@ class Handler implements MessageComponentInterface PrivateChannel|Channel|PresenceChannel|null $channel, $message = [] ) { + $this->loadCachedAuth($connection, $channel); + $this->ensureUserIsSet($connection, $channel); + $this->updateAuthState($connection); + $this->cacheAuthenticatedUser($connection); + $this->scheduleLogout(); + } - if ( - !optional($connection)->auth - && $connection->socketId - && ($cached_auth = cache()->get('socket_' . $connection->socketId)) - && @$cached_auth['type'] - ) { - $connection->user = @$cached_auth['type']::find($cached_auth['id']); - - if ($channel) { - $channel->saveConnection($connection); - } + protected function loadCachedAuth(ConnectionInterface $connection, $channel): void + { + if (isset($connection->auth)) { + return; } - // Update last online of user if user - if (! optional($connection)->user) { - $connection->user = false; - if ($channel) { - $channel->saveConnection($connection); - } + if (!$connection->socketId) { + return; } - // Set auth or logout - ($connection->user) + $cached_auth = cache()->get('socket_' . $connection->socketId); + if (!$cached_auth || !isset($cached_auth['type'])) { + return; + } + + $connection->user = $cached_auth['type']::find($cached_auth['id']); + + if ($channel) { + $channel->saveConnection($connection); + } + } + + protected function ensureUserIsSet(ConnectionInterface $connection, $channel): void + { + if (isset($connection->user) && $connection->user) { + return; + } + + $connection->user = false; + if ($channel) { + $channel->saveConnection($connection); + } + } + + protected function updateAuthState(ConnectionInterface $connection): void + { + $connection->user ? Auth::login($connection->user) : Auth::logout(); + } - if (Auth::user()) { - /** @var \App\Models\User */ - $user = Auth::user(); - $user->refresh(); - - cache()->forever( - 'ws_socket_auth_' . $connection->socketId, - $user, - ); - - - $authed_users = cache()->get('ws_socket_authed_users') ?? []; - $authed_users[$connection->socketId] = $user->id; - cache()->forever('ws_socket_authed_users', $authed_users); - - \BlaxSoftware\LaravelWebSockets\Services\WebsocketService::setUserAuthed( - $connection->socketId, - $user - ); + protected function cacheAuthenticatedUser(ConnectionInterface $connection): void + { + if (!Auth::user()) { + return; } - // add next in loop logout + /** @var \App\Models\User */ + $user = Auth::user(); + $user->refresh(); + + cache()->forever('ws_socket_auth_' . $connection->socketId, $user); + + $authed_users = cache()->get('ws_socket_authed_users') ?? []; + $authed_users[$connection->socketId] = $user->id; + cache()->forever('ws_socket_authed_users', $authed_users); + + \BlaxSoftware\LaravelWebSockets\Services\WebsocketService::setUserAuthed( + $connection->socketId, + $user + ); + } + + protected function scheduleLogout(): void + { $this->channelManager->loop->futureTick(function () { Auth::logout(); }); @@ -551,17 +641,10 @@ class Handler implements MessageComponentInterface $optional = false, $iteration = false ) { - $pid = explode('_', $pid . '')[0]; - - if ($iteration >= 0 && $iteration !== false) { - $pid .= '_' . $iteration; - } - - // Set timeout start + $pid = $this->preparePid($pid, $iteration); $pidcache_start = 'dedicated_start_' . $pid; cache()->put($pidcache_start, microtime(true), 100); - // Periodic check for data $this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use ( $pidcache_start, $message, @@ -570,72 +653,126 @@ class Handler implements MessageComponentInterface $optional, $iteration ) { - $pidcache_data = 'dedicated_data_' . $pid; - $pidcache_done = 'dedicated_data_' . $pid . '_done'; - $pidcache_complete = 'dedicated_data_' . $pid . '_complete'; + $this->checkDataLoopIteration( + $timer, + $pidcache_start, + $message, + $pid, + $connection, + $optional, + $iteration + ); - if ( - cache()->has($pidcache_start) - && ($diff = microtime(true) - ((int) cache()->get($pidcache_start))) > 60 - ) { - if (! $optional) { - $connection->send(json_encode([ - 'event' => $message['event'] . ':error', - 'data' => [ - 'message' => $message['event'] . ' timeout', - 'diff' => $diff, - ], - ])); - } - - $this->channelManager->loop->cancelTimer($timer); - cache()->put($pidcache_complete, true, 360); - } - - if (cache()->has($pidcache_done)) { - // call self with pid + '_0' and optional - if ($iteration === false) { - $this->addDataCheckLoop($connection, $message, $pid, true, 0); - } else { - $this->addDataCheckLoop($connection, $message, $pid, true, $iteration + 1); - } - - // Retrieve cached data - $sending = @cache()->get($pidcache_data); - $bm = json_decode($sending, true); - - - // Send the data to client - if (@$bm['broadcast']) { - $this->broadcast( - $connection->app->id, - $bm['data'] ?? null, - $bm['event'] ?? null, - $bm['channel'] ?? null, - $bm['including_self'], - $connection - ); - } elseif (@$bm['whisper']) { - $this->whisper( - $connection->app->id, - $bm['data'] ?? null, - $bm['event'] ?? null, - $bm['socket_ids'] ?? [], - $bm['channel'] ?? null, - ); - } else { - $connection->send($sending); - } - - // Stop periodic check - $this->channelManager->loop->cancelTimer($timer); - } - - // Prevent zombie processes pcntl_waitpid(-1, $status, WNOHANG); }); } + protected function preparePid($pid, $iteration): string + { + $pid = explode('_', $pid . '')[0]; + + if ($iteration >= 0 && $iteration !== false) { + $pid .= '_' . $iteration; + } + + return $pid; + } + + protected function checkDataLoopIteration( + $timer, + string $pidcache_start, + array $message, + string $pid, + $connection, + bool $optional, + $iteration + ): void { + $pidcache_data = 'dedicated_data_' . $pid; + $pidcache_done = 'dedicated_data_' . $pid . '_done'; + $pidcache_complete = 'dedicated_data_' . $pid . '_complete'; + + if ($this->handleTimeout($timer, $pidcache_start, $pidcache_complete, $message, $connection, $optional)) { + return; + } + + if (!cache()->has($pidcache_done)) { + return; + } + + $this->scheduleNextIteration($connection, $message, $pid, $iteration); + $this->processAndSendData($connection, $pidcache_data); + $this->channelManager->loop->cancelTimer($timer); + } + + protected function handleTimeout( + $timer, + string $pidcache_start, + string $pidcache_complete, + array $message, + $connection, + bool $optional + ): bool { + if (!cache()->has($pidcache_start)) { + return false; + } + + $diff = microtime(true) - ((int) cache()->get($pidcache_start)); + if ($diff <= 60) { + return false; + } + + if (!$optional) { + $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => $message['event'] . ' timeout', + 'diff' => $diff, + ], + ])); + } + + $this->channelManager->loop->cancelTimer($timer); + cache()->put($pidcache_complete, true, 360); + return true; + } + + protected function scheduleNextIteration($connection, array $message, string $pid, $iteration): void + { + $nextIteration = ($iteration === false) ? 0 : $iteration + 1; + $this->addDataCheckLoop($connection, $message, $pid, true, $nextIteration); + } + + protected function processAndSendData($connection, string $pidcache_data): void + { + $sending = cache()->get($pidcache_data); + $bm = json_decode($sending, true); + + if (isset($bm['broadcast']) && $bm['broadcast']) { + $this->broadcast( + $connection->app->id, + $bm['data'] ?? null, + $bm['event'] ?? null, + $bm['channel'] ?? null, + $bm['including_self'] ?? false, + $connection + ); + return; + } + + if (isset($bm['whisper']) && $bm['whisper']) { + $this->whisper( + $connection->app->id, + $bm['data'] ?? null, + $bm['event'] ?? null, + $bm['socket_ids'] ?? [], + $bm['channel'] ?? null, + ); + return; + } + + $connection->send($sending); + } + public function broadcast( string $appId, mixed $payload, @@ -679,8 +816,9 @@ class Handler implements MessageComponentInterface 'channel' => $channel->getName(), ]; + $socketIdLookup = array_flip($socketIds); foreach ($channel->getConnections() as $channel_conection) { - if (in_array($channel_conection->socketId, $socketIds)) { + if (isset($socketIdLookup[$channel_conection->socketId])) { $channel_conection->send(json_encode($p)); } }