diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 0db8768..75b1b7c 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -510,7 +510,6 @@ class StartServer extends Command $commonClasses = [ \BlaxSoftware\LaravelWebSockets\Websocket\Controller::class, \BlaxSoftware\LaravelWebSockets\Websocket\ControllerResolver::class, - \BlaxSoftware\LaravelWebSockets\Websocket\MockConnection::class, \BlaxSoftware\LaravelWebSockets\Websocket\MockConnectionSocketPair::class, \BlaxSoftware\LaravelWebSockets\Ipc\SocketPairIpc::class, \BlaxSoftware\LaravelWebSockets\Cache\IpcCache::class, diff --git a/src/Websocket/Controller.php b/src/Websocket/Controller.php index 5ec87dc..ffb86f8 100644 --- a/src/Websocket/Controller.php +++ b/src/Websocket/Controller.php @@ -15,7 +15,7 @@ use Illuminate\Support\Facades\Log; class Controller { protected bool $isMockConnection; - protected MockConnection|MockConnectionSocketPair|null $mockConnectionClone = null; + protected MockConnectionSocketPair|null $mockConnectionClone = null; final public function __construct( protected ConnectionInterface $connection, @@ -25,8 +25,7 @@ class Controller ) { // Cache class check to avoid repeated get_class() calls (reflection is slow) $connectionClass = get_class($connection); - $this->isMockConnection = $connectionClass === MockConnection::class - || $connectionClass === MockConnectionSocketPair::class; + $this->isMockConnection = $connectionClass === MockConnectionSocketPair::class; // Pre-clone MockConnection once if needed (reuse across method calls) if ($this->isMockConnection) { diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index 7c16850..ae7d81c 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -5,7 +5,6 @@ declare(strict_types=1); namespace BlaxSoftware\LaravelWebSockets\Websocket; use BlaxSoftware\LaravelWebSockets\Apps\App; -use BlaxSoftware\LaravelWebSockets\Cache\IpcCache; use BlaxSoftware\LaravelWebSockets\Channels\Channel; use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel; use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel; @@ -31,12 +30,6 @@ use Ratchet\WebSocket\MessageComponentInterface; class Handler implements MessageComponentInterface { - /** - * Whether to use event-driven socket pair IPC (true) or polling (false) - * Socket pairs are instant but require sockets extension - */ - private bool $useSocketPairIpc; - /** * Track channel connections using associative arrays for O(1) lookup * Structure: [channel_name => [socket_id => true]] @@ -73,10 +66,7 @@ class Handler implements MessageComponentInterface */ public function __construct( protected ChannelManager $channelManager - ) { - // Use socket pair IPC if available (instant), otherwise fall back to polling - $this->useSocketPairIpc = SocketPairIpc::isSupported(); - } + ) {} /** * Handle incoming WebSocket message with optimized fast path for ping/pong @@ -194,7 +184,7 @@ class Handler implements MessageComponentInterface return; } - $this->forkAndProcessMessage($connection, $channel, $messageArray); + $this->forkWithSocketPair($connection, $channel, $messageArray); } /** @@ -354,18 +344,6 @@ class Handler implements MessageComponentInterface return true; } - protected function forkAndProcessMessage( - ConnectionInterface $connection, - Channel $channel, - array $message - ): void { - if ($this->useSocketPairIpc) { - $this->forkWithSocketPair($connection, $channel, $message); - } else { - $this->forkWithPolling($connection, $channel, $message); - } - } - /** * Check if hot reload mode is enabled */ @@ -606,73 +584,6 @@ class Handler implements MessageComponentInterface } } - /** - * Fork with polling-based IPC (fallback when socket pairs unavailable) - */ - protected function forkWithPolling( - ConnectionInterface $connection, - Channel $channel, - array $message - ): void { - // Generate unique request ID BEFORE forking to avoid race conditions - $requestId = uniqid('req_', true) . '_' . bin2hex(random_bytes(4)); - - $pid = pcntl_fork(); - - if ($pid === -1) { - Log::error('Fork error'); - return; - } - - if ($pid === 0) { - $this->processMessageInChild($connection, $channel, $message, $requestId); - exit(0); - } - - $this->addDataCheckLoop($connection, $message, $requestId); - } - - protected function processMessageInChild( - ConnectionInterface $connection, - Channel $channel, - array $message, - string $requestId - ): void { - // Hot reload: clear all caches for fresh code loading (only in dev mode) - $this->hotReloadChild(); - - try { - // Lazy DB reconnect: disconnect now, reconnect only when first query runs - // This saves ~5-15ms for methods that don't use the database - DB::disconnect(); - - $this->setRequest($message, $connection); - $mock = new MockConnection($connection, $requestId); - - Controller::controll_message( - $mock, - $channel, - $message, - $this->channelManager - ); - - \Illuminate\Container\Container::getInstance() - ->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class) - ->invokeWhen(fn($callback) => true); - } catch (Exception $e) { - $mock->send(json_encode([ - 'event' => $message['event'] . ':error', - 'data' => [ - 'message' => $e->getMessage(), - ], - ])); - - if (app()->bound('sentry')) { - app('sentry')->captureException($e); - } - } - } - protected function handleMessageError(\Throwable $e): void { Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [ @@ -1136,172 +1047,6 @@ class Handler implements MessageComponentInterface $this->cacheBufferScheduled = false; } - /** - * IPC polling interval in seconds. - * Lower = faster response, higher CPU usage. - * 0.001 = 1ms, 0.002 = 2ms, 0.01 = 10ms - */ - private const IPC_POLL_INTERVAL = 0.002; // 2ms - good balance of speed and CPU - - private function addDataCheckLoop( - $connection, - $message, - string $requestId, - $optional = false, - int $iteration = 0 - ) { - $iterationKey = $requestId . ($iteration > 0 ? '_' . $iteration : ''); - $cacheKeyStart = 'dedicated_start_' . $iterationKey; - IpcCache::put($cacheKeyStart, microtime(true), 100); - - $this->channelManager->loop->addPeriodicTimer(self::IPC_POLL_INTERVAL, function ($timer) use ( - $cacheKeyStart, - $iterationKey, - $message, - $requestId, - $connection, - $optional, - $iteration - ) { - $this->checkDataLoopIteration( - $timer, - $cacheKeyStart, - $message, - $iterationKey, - $requestId, - $connection, - $optional, - $iteration - ); - - pcntl_waitpid(-1, $status, WNOHANG); - }); - } - - protected function checkDataLoopIteration( - $timer, - string $cacheKeyStart, - array $message, - string $iterationKey, - string $requestId, - $connection, - bool $optional, - int $iteration - ): void { - $cacheKeyData = 'dedicated_data_' . $iterationKey; - $cacheKeyDone = 'dedicated_data_' . $iterationKey . '_done'; - $cacheKeyComplete = 'dedicated_data_' . $iterationKey . '_complete'; - - if ($this->handleTimeout($timer, $cacheKeyStart, $cacheKeyComplete, $message, $connection, $optional)) { - return; - } - - if (!IpcCache::has($cacheKeyDone)) { - return; - } - - // Clean up cache entries for this iteration before processing - // This prevents memory leaks and stale data issues - $this->cleanupIterationCache($iterationKey); - - $this->scheduleNextIteration($connection, $message, $requestId, $iteration); - $this->processAndSendData($connection, $cacheKeyData); - $this->channelManager->loop->cancelTimer($timer); - } - - /** - * Clean up cache entries for a completed iteration - */ - protected function cleanupIterationCache(string $iterationKey): void - { - $keysToDelete = [ - 'dedicated_start_' . $iterationKey, - 'dedicated_data_' . $iterationKey . '_done', - // Note: We don't delete 'dedicated_data_' here as we need it for processAndSendData - // It will expire naturally after 60 seconds - ]; - - IpcCache::forgetMultiple($keysToDelete); - } - - protected function handleTimeout( - $timer, - string $cacheKeyStart, - string $cacheKeyComplete, - array $message, - $connection, - bool $optional - ): bool { - $startTime = IpcCache::get($cacheKeyStart); - if ($startTime === null) { - return false; - } - - $diff = microtime(true) - ((float) $startTime); - if ($diff <= 60) { - return false; - } - - if (!$optional) { - $connection->send(json_encode([ - 'event' => $message['event'] . ':error', - 'data' => [ - 'message' => $message['event'] . ' timeout', - 'diff' => $diff, - ], - ])); - } - - $this->channelManager->loop->cancelTimer($timer); - IpcCache::put($cacheKeyComplete, true, 360); - return true; - } - - protected function scheduleNextIteration($connection, array $message, string $requestId, int $iteration): void - { - $nextIteration = $iteration + 1; - $this->addDataCheckLoop($connection, $message, $requestId, true, $nextIteration); - } - - protected function processAndSendData($connection, string $cacheKeyData): void - { - $sending = IpcCache::get($cacheKeyData); - - // Clean up the data cache key immediately after reading - IpcCache::forget($cacheKeyData); - - if (!$sending) { - return; - } - - $bm = json_decode($sending, true); - - if (isset($bm['broadcast']) && $bm['broadcast']) { - $this->broadcast( - $connection->app->id, - $bm['data'] ?? null, - $bm['event'] ?? null, - $bm['channel'] ?? null, - $bm['including_self'] ?? false, - $connection - ); - return; - } - - if (isset($bm['whisper']) && $bm['whisper']) { - $this->whisper( - $connection->app->id, - $bm['data'] ?? null, - $bm['event'] ?? null, - $bm['socket_ids'] ?? [], - $bm['channel'] ?? null, - ); - return; - } - - $connection->send($sending); - } - public function broadcast( string $appId, mixed $payload,