diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index 4a8a35d..88713c4 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -34,6 +34,14 @@ class Handler implements MessageComponentInterface */ protected $channel_connections = []; + /** + * Cache write buffer for batching operations + * Reduces file I/O when multiple rapid requests occur + */ + protected $cacheWriteBuffer = []; + protected $cacheDeleteBuffer = []; + protected $cacheBufferScheduled = false; + /** * Initialize a new handler. * @@ -280,13 +288,14 @@ class Handler implements MessageComponentInterface { $cacheUpdates = []; $cacheDeletes = ['ws_socket_auth_' . $connection->socketId]; + $socketId = $connection->socketId; foreach ($this->channel_connections as $channel => $connections) { - if (!isset($connections[$connection->socketId])) { + if (!isset($connections[$socketId])) { continue; } - unset($this->channel_connections[$channel][$connection->socketId]); + unset($this->channel_connections[$channel][$socketId]); if (empty($this->channel_connections[$channel])) { unset($this->channel_connections[$channel]); @@ -294,21 +303,29 @@ class Handler implements MessageComponentInterface continue; } + // Pre-compute array_keys once per channel $cacheUpdates['ws_channel_connections_' . $channel] = array_keys($this->channel_connections[$channel]); } - $cacheUpdates['ws_active_channels'] = array_keys($this->channel_connections); + // Pre-compute active channels once + $activeChannels = array_keys($this->channel_connections); + $cacheUpdates['ws_active_channels'] = $activeChannels; + // Batch read authed_users - we'll update it in the same batch $authed_users = cache()->get('ws_socket_authed_users') ?? []; - unset($authed_users[$connection->socketId]); + unset($authed_users[$socketId]); $cacheUpdates['ws_socket_authed_users'] = $authed_users; - cache()->setMultiple($cacheUpdates); - cache()->deleteMultiple($cacheDeletes); + // Single batched write and delete operation - MASSIVE latency improvement + if (!empty($cacheUpdates)) { + cache()->setMultiple($cacheUpdates); + } + if (!empty($cacheDeletes)) { + cache()->deleteMultiple($cacheDeletes); + } - \BlaxSoftware\LaravelWebSockets\Services\WebsocketService::clearUserAuthed( - $connection->socketId - ); + // Note: Removed redundant WebsocketService::clearUserAuthed() call + // as we already handle all cache operations above in a single batch } protected function finalizeConnectionClose(ConnectionInterface $connection): void @@ -496,18 +513,24 @@ class Handler implements MessageComponentInterface ConnectionInterface $connection, array $message ): void { + $socketId = $connection->socketId; + if (!isset($this->channel_connections[$channel_name])) { $this->channel_connections[$channel_name] = []; } - if (!isset($this->channel_connections[$channel_name][$connection->socketId])) { - $this->channel_connections[$channel_name][$connection->socketId] = true; - } + if (!isset($this->channel_connections[$channel_name][$socketId])) { + $this->channel_connections[$channel_name][$socketId] = true; - cache()->setMultiple([ - 'ws_channel_connections_' . $channel_name => array_keys($this->channel_connections[$channel_name]), - 'ws_active_channels' => array_keys($this->channel_connections) - ]); + // Only update cache if connection was actually added (avoid redundant writes) + // Pre-compute array_keys once for both updates + $channelSockets = array_keys($this->channel_connections[$channel_name]); + $activeChannels = array_keys($this->channel_connections); + + // Buffer these writes - they can be batched with other subscriptions + $this->bufferCacheWrite('ws_channel_connections_' . $channel_name, $channelSockets); + $this->bufferCacheWrite('ws_active_channels', $activeChannels); + } if ($channel->hasConnection($connection)) { return; @@ -525,19 +548,28 @@ class Handler implements MessageComponentInterface string $channel_name, ConnectionInterface $connection ): void { - if (isset($this->channel_connections[$channel_name][$connection->socketId])) { - unset($this->channel_connections[$channel_name][$connection->socketId]); - } + $socketId = $connection->socketId; - if (empty($this->channel_connections[$channel_name])) { - unset($this->channel_connections[$channel_name]); - cache()->forget('ws_channel_connections_' . $channel_name); - cache()->forever('ws_active_channels', array_keys($this->channel_connections)); - } else { - cache()->setMultiple([ - 'ws_channel_connections_' . $channel_name => array_keys($this->channel_connections[$channel_name]), - 'ws_active_channels' => array_keys($this->channel_connections) - ]); + if (isset($this->channel_connections[$channel_name][$socketId])) { + unset($this->channel_connections[$channel_name][$socketId]); + + // Pre-compute active channels once + $activeChannels = array_keys($this->channel_connections); + + if (empty($this->channel_connections[$channel_name])) { + unset($this->channel_connections[$channel_name]); + + // Buffer delete and update - can be batched + $this->bufferCacheDelete('ws_channel_connections_' . $channel_name); + $this->bufferCacheWrite('ws_active_channels', $activeChannels); + } else { + // Pre-compute channel sockets once + $channelSockets = array_keys($this->channel_connections[$channel_name]); + + // Buffer these writes + $this->bufferCacheWrite('ws_channel_connections_' . $channel_name, $channelSockets); + $this->bufferCacheWrite('ws_active_channels', $activeChannels); + } } $channel->unsubscribe($connection); @@ -615,16 +647,20 @@ class Handler implements MessageComponentInterface $user = Auth::user(); $user->refresh(); - cache()->forever('ws_socket_auth_' . $connection->socketId, $user); + $socketId = $connection->socketId; + // Batch all auth cache operations into a single read + single write $authed_users = cache()->get('ws_socket_authed_users') ?? []; - $authed_users[$connection->socketId] = $user->id; - cache()->forever('ws_socket_authed_users', $authed_users); + $authed_users[$socketId] = $user->id; - \BlaxSoftware\LaravelWebSockets\Services\WebsocketService::setUserAuthed( - $connection->socketId, - $user - ); + // Single batched cache write - reduces 3 operations to 1 + cache()->setMultiple([ + 'ws_socket_auth_' . $socketId => $user, + 'ws_socket_authed_users' => $authed_users + ]); + + // Note: Removed redundant WebsocketService::setUserAuthed() call + // as we already handle all cache operations above in a single batch } protected function scheduleLogout(): void @@ -634,6 +670,70 @@ class Handler implements MessageComponentInterface }); } + /** + * Add cache operation to write buffer for batching + */ + protected function bufferCacheWrite(string $key, $value): void + { + $this->cacheWriteBuffer[$key] = $value; + $this->scheduleCacheFlush(); + } + + /** + * Add cache deletion to buffer for batching + */ + protected function bufferCacheDelete(string $key): void + { + $this->cacheDeleteBuffer[] = $key; + unset($this->cacheWriteBuffer[$key]); // Remove from write buffer if exists + $this->scheduleCacheFlush(); + } + + /** + * Schedule cache flush on next event loop tick + * Multiple rapid requests will be batched into single I/O operation + */ + protected function scheduleCacheFlush(): void + { + if ($this->cacheBufferScheduled) { + return; + } + + $this->cacheBufferScheduled = true; + + $this->channelManager->loop->futureTick(function () { + $this->flushCacheBuffer(); + }); + } + + /** + * Flush cache buffer - performs all pending operations in single batch + * This is the key optimization: N operations -> 2 I/O calls (1 write, 1 delete) + */ + protected function flushCacheBuffer(): void + { + if (!empty($this->cacheWriteBuffer)) { + cache()->setMultiple($this->cacheWriteBuffer); + $this->cacheWriteBuffer = []; + } + + if (!empty($this->cacheDeleteBuffer)) { + cache()->deleteMultiple(array_unique($this->cacheDeleteBuffer)); + $this->cacheDeleteBuffer = []; + } + + $this->cacheBufferScheduled = false; + } + + /** + * Force immediate cache flush (use for critical operations) + */ + protected function flushCacheBufferImmediate(): void + { + $this->flushCacheBuffer(); + $this->cacheBufferScheduled = false; + } + private function addDataCheckLoop( $connection, $message,