diff --git a/src/Cache/IpcCache.php b/src/Cache/IpcCache.php new file mode 100644 index 0000000..d1e4701 --- /dev/null +++ b/src/Cache/IpcCache.php @@ -0,0 +1,212 @@ + 0 && $expireAt < time()) { + @unlink($path); + return null; + } + + $data = substr($content, 10); + return $data === '' ? null : unserialize($data); + } + + /** + * Set a value in cache + * + * @param string $key + * @param mixed $value + * @param int $ttl Time to live in seconds (0 = no expiry) + */ + public static function put(string $key, mixed $value, int $ttl = 60): bool + { + $path = self::getPath($key); + $expireAt = $ttl > 0 ? time() + $ttl : 0; + + // Format: 10 bytes for expiry timestamp + serialized data + $content = sprintf('%010d', $expireAt) . serialize($value); + + // Atomic write: write to temp file then rename + $tempPath = $path . '.' . getmypid(); + if (@file_put_contents($tempPath, $content) === false) { + return false; + } + + return @rename($tempPath, $path); + } + + /** + * Delete a key from cache + */ + public static function forget(string $key): bool + { + $path = self::getPath($key); + if (file_exists($path)) { + return @unlink($path); + } + return true; + } + + /** + * Delete multiple keys from cache + */ + public static function forgetMultiple(array $keys): void + { + foreach ($keys as $key) { + self::forget($key); + } + } + + /** + * Set multiple values atomically + * + * @param array $values Key => Value pairs + * @param int $ttl Time to live in seconds + */ + public static function putMultiple(array $values, int $ttl = 60): void + { + foreach ($values as $key => $value) { + self::put($key, $value, $ttl); + } + } + + /** + * Clean up expired cache files (call periodically) + */ + public static function cleanup(): int + { + self::init(); + + $cleaned = 0; + $now = time(); + + $files = @scandir(self::$baseDir); + if ($files === false) { + return 0; + } + + foreach ($files as $file) { + if ($file === '.' || $file === '..') { + continue; + } + + $path = self::$baseDir . '/' . $file; + $content = @file_get_contents($path); + + if ($content === false) { + continue; + } + + $expireAt = (int) substr($content, 0, 10); + if ($expireAt > 0 && $expireAt < $now) { + @unlink($path); + $cleaned++; + } + } + + return $cleaned; + } + + /** + * Check if we're using RAM-backed storage + */ + public static function isTmpfs(): bool + { + self::init(); + return self::$isTmpfs ?? false; + } + + /** + * Reset (for testing) + */ + public static function reset(): void + { + self::$baseDir = null; + self::$isTmpfs = null; + } +} diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 223e822..5eb9cd7 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -424,17 +424,23 @@ class LocalChannelManager implements ChannelManager /** * Keep tracking the connections availability when they pong. + * Optimized: Uses unix timestamp directly instead of Carbon for performance. * * @param \Ratchet\ConnectionInterface $connection * @return PromiseInterface[bool] */ public function connectionPonged(ConnectionInterface $connection): PromiseInterface { - return $this->pongConnectionInChannels($connection); + // Direct timestamp update on connection object - no channel iteration needed + // The connection object is passed by reference, so this updates it everywhere + $connection->lastPongedAt = time(); + + return Helpers::createFulfilledPromise(true); } /** * Remove the obsolete connections that didn't ponged in a while. + * Optimized: Uses unix timestamp comparison instead of Carbon. * * @return PromiseInterface[bool] */ @@ -444,9 +450,16 @@ class LocalChannelManager implements ChannelManager return $this->getLocalConnections() ->then(function ($connections) { $promises = []; + $now = time(); foreach ($connections as $connection) { - $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); + // Handle both Carbon objects (legacy) and unix timestamps (optimized) + $lastPong = $connection->lastPongedAt ?? 0; + if (is_object($lastPong)) { + $differenceInSeconds = $lastPong->diffInSeconds(Carbon::now()); + } else { + $differenceInSeconds = $now - (int) $lastPong; + } if ($differenceInSeconds > 120) { $promises[] = $this->unsubscribeFromAllChannels($connection); @@ -462,23 +475,17 @@ class LocalChannelManager implements ChannelManager /** * Pong connection in channels. + * Optimized: No longer iterates through channels - timestamp is on connection object. * * @param ConnectionInterface $connection * @return PromiseInterface[bool] */ public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface { - return $this->getLocalChannels($connection->app->id) - ->then(function ($channels) use ($connection) { - foreach ($channels as $channel) { - if ($conn = $channel->getConnection($connection->socketId)) { - $conn->lastPongedAt = Carbon::now(); - $channel->saveConnection($conn); - } - } + // Simply update timestamp on the connection object directly + $connection->lastPongedAt = time(); - return true; - }); + return Helpers::createFulfilledPromise(true); } /** @@ -526,21 +533,23 @@ class LocalChannelManager implements ChannelManager /** * Get the channel class by the channel name. + * Optimized: Direct string comparison instead of Str::startsWith * * @param string $channelName * @return string */ protected function getChannelClassName(string $channelName): string { - if (Str::startsWith($channelName, 'private-')) { + // Direct strncmp is faster than Str::startsWith for fixed prefixes + if (strncmp($channelName, 'private-', 8) === 0) { return PrivateChannel::class; } - if (Str::startsWith($channelName, 'presence-')) { + if (strncmp($channelName, 'presence-', 9) === 0) { return PresenceChannel::class; } - if (Str::startsWith($channelName, 'openpresence-')) { + if (strncmp($channelName, 'openpresence-', 13) === 0) { return OpenPresenceChannel::class; } diff --git a/src/DashboardLogger.php b/src/DashboardLogger.php index 40ea586..26e0b7a 100644 --- a/src/DashboardLogger.php +++ b/src/DashboardLogger.php @@ -9,27 +9,18 @@ class DashboardLogger const LOG_CHANNEL_PREFIX = 'private-websockets-dashboard-'; const TYPE_DISCONNECTED = 'disconnected'; - const TYPE_CONNECTED = 'connected'; - const TYPE_SUBSCRIBED = 'subscribed'; - const TYPE_WS_MESSAGE = 'ws-message'; - const TYPE_API_MESSAGE = 'api-message'; - const TYPE_REPLICATOR_SUBSCRIBED = 'replicator-subscribed'; - const TYPE_REPLICATOR_UNSUBSCRIBED = 'replicator-unsubscribed'; - const TYPE_REPLICATOR_MESSAGE_RECEIVED = 'replicator-message-received'; /** * The list of all channels. - * - * @var array */ - public static $channels = [ + public static array $channels = [ self::TYPE_DISCONNECTED, self::TYPE_CONNECTED, self::TYPE_SUBSCRIBED, @@ -40,44 +31,77 @@ class DashboardLogger self::TYPE_REPLICATOR_MESSAGE_RECEIVED, ]; + /** + * Whether dashboard logging is enabled. + * Cached to avoid repeated config lookups. + */ + private static ?bool $enabled = null; + + /** + * Cached channel manager instance. + */ + private static ?ChannelManager $channelManager = null; + /** * Log an event for an app. + * Optimized: Early exit if disabled, cached config lookups. * * @param mixed $appId * @param string $type * @param array $details - * @return void */ - public static function log($appId, string $type, array $details = []) + public static function log($appId, string $type, array $details = []): void { - $channelManager = app(ChannelManager::class); + // Cache enabled check + if (self::$enabled === null) { + self::$enabled = config('websockets.dashboard.enabled', true); + } - $channelName = static::LOG_CHANNEL_PREFIX.$type; + // Skip if dashboard is disabled + if (!self::$enabled) { + return; + } - $payload = [ + // Cache channel manager + if (self::$channelManager === null) { + self::$channelManager = app(ChannelManager::class); + } + + $channelName = static::LOG_CHANNEL_PREFIX . $type; + + // Build payload - use date() instead of deprecated strftime() + $payload = (object) [ 'event' => 'log-message', 'channel' => $channelName, 'data' => [ 'type' => $type, - 'time' => strftime('%H:%M:%S'), + 'time' => date('H:i:s'), 'details' => $details, ], ]; - // Here you can use the ->find(), even if the channel - // does not exist on the server. If it does not exist, - // then the message simply will get broadcasted - // across the other servers. - $channel = $channelManager->find($appId, $channelName); - + // Check if channel exists locally and broadcast + $channel = self::$channelManager->find($appId, $channelName); if ($channel) { - $channel->broadcastLocally( - $appId, (object) $payload - ); + $channel->broadcastLocally($appId, $payload); } - $channelManager->broadcastAcrossServers( - $appId, null, $channelName, (object) $payload + // Always broadcast across servers (preserving original behavior) + // The channel manager handles the replication logic + self::$channelManager->broadcastAcrossServers( + $appId, + null, + $channelName, + $payload ); } + + /** + * Reset cached state (useful for testing) + */ + public static function reset(): void + { + self::$enabled = null; + self::$channelManager = null; + } } diff --git a/src/Helpers.php b/src/Helpers.php index e7e5939..e31d1bd 100644 --- a/src/Helpers.php +++ b/src/Helpers.php @@ -2,6 +2,7 @@ namespace BlaxSoftware\LaravelWebSockets; +use React\Promise\FulfilledPromise; use React\Promise\PromiseInterface; class Helpers @@ -13,38 +14,66 @@ class Helpers */ public static $loop = null; + /** + * Cached promise resolver class to avoid repeated config() calls + * + * @var string|null + */ + private static ?string $resolverClass = null; + /** * Transform the Redis' list of key after value * to key-value pairs. + * Optimized: Uses array_chunk instead of partition with modulo. * * @param array $list * @return array */ - public static function redisListToArray(array $list) + public static function redisListToArray(array $list): array { - // Redis lists come into a format where the keys are on even indexes - // and the values are on odd indexes. This way, we know which - // ones are keys and which ones are values and their get combined - // later to form the key => value array. - [$keys, $values] = collect($list)->partition(function ($value, $key) { - return $key % 2 === 0; - }); + if (empty($list)) { + return []; + } - return array_combine($keys->all(), $values->all()); + // Faster approach: chunk into pairs and combine + $result = []; + $count = count($list); + for ($i = 0; $i < $count; $i += 2) { + if (isset($list[$i + 1])) { + $result[$list[$i]] = $list[$i + 1]; + } + } + + return $result; } /** * Create a new fulfilled promise with a value. + * Optimized: Caches the resolver class to avoid repeated config() lookups. * * @param mixed $value * @return \React\Promise\PromiseInterface */ public static function createFulfilledPromise($value): PromiseInterface { - $resolver = config( - 'websockets.promise_resolver', \React\Promise\FulfilledPromise::class - ); + // Cache the resolver class on first call + if (self::$resolverClass === null) { + self::$resolverClass = config( + 'websockets.promise_resolver', + FulfilledPromise::class + ); + } - return new $resolver($value, static::$loop); + // PHP 8.0+ dynamic class instantiation + $class = self::$resolverClass; + return new $class($value, static::$loop); + } + + /** + * Reset the cached resolver class (useful for testing) + */ + public static function resetResolverCache(): void + { + self::$resolverClass = null; } } diff --git a/src/Server/Messages/PusherChannelProtocolMessage.php b/src/Server/Messages/PusherChannelProtocolMessage.php index 5f4c565..f126e43 100644 --- a/src/Server/Messages/PusherChannelProtocolMessage.php +++ b/src/Server/Messages/PusherChannelProtocolMessage.php @@ -3,67 +3,87 @@ namespace BlaxSoftware\LaravelWebSockets\Server\Messages; use BlaxSoftware\LaravelWebSockets\Events\ConnectionPonged; -use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use stdClass; class PusherChannelProtocolMessage extends PusherClientMessage { /** - * Respond with the payload. - * - * @return void + * Pre-encoded pong response for performance */ - public function respond() - { - $eventName = Str::camel(Str::after($this->payload->event, ':')); + private const PONG_RESPONSE = '{"event":"pusher.pong"}'; - if (method_exists($this, $eventName) && $eventName !== 'respond') { - call_user_func([$this, $eventName], $this->connection, $this->payload->data ?? new stdClass()); + /** + * Respond with the payload. + * Optimized: Uses direct method dispatch instead of reflection. + */ + public function respond(): void + { + $event = $this->payload->event ?? ''; + + // Fast path for ping - most common pusher protocol message + if ($event === 'pusher:ping' || $event === 'pusher.ping') { + $this->pingFast($this->connection); + return; + } + + // Extract method name from event (e.g., 'pusher:subscribe' -> 'subscribe') + $colonPos = strpos($event, ':'); + if ($colonPos !== false) { + $eventName = substr($event, $colonPos + 1); + } else { + $dotPos = strpos($event, '.'); + $eventName = $dotPos !== false ? substr($event, $dotPos + 1) : ''; + } + + // Convert to camelCase if needed (e.g., 'channel-name' -> 'channelName') + if (strpos($eventName, '-') !== false) { + $eventName = lcfirst(str_replace('-', '', ucwords($eventName, '-'))); + } + + if ($eventName && $eventName !== 'respond' && method_exists($this, $eventName)) { + $this->$eventName($this->connection, $this->payload->data ?? new stdClass()); } } /** - * Ping the connection. - * - * @see https://pusher.com/docs/pusher_protocol#ping-pong - * - * @param \Ratchet\ConnectionInterface $connection - * @return void + * Fast ping handler - avoids promise chain and event dispatch */ - protected function ping(ConnectionInterface $connection) + protected function pingFast(ConnectionInterface $connection): void { - $this->channelManager - ->connectionPonged($connection) - ->then(function () use ($connection) { - $connection->send(json_encode(['event' => 'pusher.pong'])); + // Update timestamp directly on connection (no promise chain) + $connection->lastPongedAt = time(); - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); - }); + // Send pre-encoded response (no json_encode overhead) + $connection->send(self::PONG_RESPONSE); + + // Skip event dispatch for ping - it's high frequency and events are expensive + // If you need ping events, use: ConnectionPonged::dispatch($connection->app->id, $connection->socketId); + } + + /** + * Legacy ping handler - kept for compatibility + * @deprecated Use pingFast instead + */ + protected function ping(ConnectionInterface $connection): void + { + $this->pingFast($connection); } /** * Subscribe to channel. * * @see https://pusher.com/docs/pusher_protocol#pusher-subscribe - * - * @param \Ratchet\ConnectionInterface $connection - * @param \stdClass $payload - * @return void */ - protected function subscribe(ConnectionInterface $connection, stdClass $payload) + protected function subscribe(ConnectionInterface $connection, stdClass $payload): void { $this->channelManager->subscribeToChannel($connection, $payload->channel, $payload); } /** * Unsubscribe from the channel. - * - * @param \Ratchet\ConnectionInterface $connection - * @param \stdClass $payload - * @return void */ - public function unsubscribe(ConnectionInterface $connection, stdClass $payload) + public function unsubscribe(ConnectionInterface $connection, stdClass $payload): void { $this->channelManager->unsubscribeFromChannel($connection, $payload->channel, $payload); } diff --git a/src/Server/Messages/PusherMessageFactory.php b/src/Server/Messages/PusherMessageFactory.php index 04638bd..6eb17fe 100644 --- a/src/Server/Messages/PusherMessageFactory.php +++ b/src/Server/Messages/PusherMessageFactory.php @@ -4,7 +4,6 @@ namespace BlaxSoftware\LaravelWebSockets\Server\Messages; use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager; use BlaxSoftware\LaravelWebSockets\Contracts\PusherMessage; -use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use Ratchet\RFC6455\Messaging\MessageInterface; @@ -12,6 +11,7 @@ class PusherMessageFactory { /** * Create a new message. + * Optimized: Uses direct string comparison instead of Str::startsWith. * * @param \Ratchet\RFC6455\Messaging\MessageInterface $message * @param \Ratchet\ConnectionInterface $connection @@ -21,14 +21,25 @@ class PusherMessageFactory public static function createForMessage( MessageInterface $message, ConnectionInterface $connection, - ChannelManager $channelManager): PusherMessage - { + ChannelManager $channelManager + ): PusherMessage { $payload = json_decode($message->getPayload()); + $event = $payload->event ?? ''; - return ( - Str::startsWith($payload->event, 'pusher.') - || Str::startsWith($payload->event, 'pusher:') - ) + // Fast string prefix check (faster than Str::startsWith) + // Check first 7 chars for 'pusher.' or 'pusher:' + $isPusherEvent = ( + isset($event[6]) && + $event[0] === 'p' && + $event[1] === 'u' && + $event[2] === 's' && + $event[3] === 'h' && + $event[4] === 'e' && + $event[5] === 'r' && + ($event[6] === '.' || $event[6] === ':') + ); + + return $isPusherEvent ? new PusherChannelProtocolMessage($payload, $connection, $channelManager) : new PusherClientMessage($payload, $connection, $channelManager); } diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index 88713c4..a48c65e 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -5,6 +5,7 @@ declare(strict_types=1); namespace BlaxSoftware\LaravelWebSockets\Websocket; use BlaxSoftware\LaravelWebSockets\Apps\App; +use BlaxSoftware\LaravelWebSockets\Cache\IpcCache; use BlaxSoftware\LaravelWebSockets\Channels\Channel; use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel; use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel; @@ -32,29 +33,190 @@ class Handler implements MessageComponentInterface * Track channel connections using associative arrays for O(1) lookup * Structure: [channel_name => [socket_id => true]] */ - protected $channel_connections = []; + protected array $channel_connections = []; /** * Cache write buffer for batching operations * Reduces file I/O when multiple rapid requests occur */ - protected $cacheWriteBuffer = []; - protected $cacheDeleteBuffer = []; - protected $cacheBufferScheduled = false; + protected array $cacheWriteBuffer = []; + protected array $cacheDeleteBuffer = []; + protected bool $cacheBufferScheduled = false; + + /** + * Pre-encoded static JSON responses for performance + * Encoding once at startup is faster than encoding every time + */ + private static string $PONG_RESPONSE = '{"event":"pusher.pong"}'; + + /** + * GC collection counter - only collect every N pings + */ + private int $gcCounter = 0; + private const GC_INTERVAL = 100; /** * Initialize a new handler. - * - * @return void */ public function __construct( protected ChannelManager $channelManager ) {} - public function onOpen(ConnectionInterface $connection) + /** + * Handle incoming WebSocket message with optimized fast path for ping/pong + */ + public function onMessage( + ConnectionInterface $connection, + MessageInterface $message + ): void { + if (!isset($connection->app)) { + return; + } + + // FAST PATH: Check for ping before any heavy processing + // Use raw string comparison on payload to avoid JSON decode overhead + $payload = $message->getPayload(); + + // Quick ping detection using strpos (faster than json_decode + array access) + if ($this->tryHandlePingFast($payload, $connection)) { + return; + } + + // SLOW PATH: Full message processing + try { + $this->processFullMessage($connection, $message, $payload); + } catch (\Throwable $e) { + $this->handleMessageError($e); + } + } + + /** + * Fast path for ping/pong - avoids JSON decode, object creation, promises + * Target: < 1ms processing time + */ + private function tryHandlePingFast(string $payload, ConnectionInterface $connection): bool { - if (! $this->connectionCanBeMade($connection)) { - return $connection->close(); + // Quick string check - if doesn't contain "ping", skip fast path + // strpos is O(n) but very fast for short strings + if (strpos($payload, 'ping') === false) { + return false; + } + + // Now do minimal JSON decode to confirm it's a ping + $data = json_decode($payload, true); + if ($data === null) { + return false; + } + + $event = $data['event'] ?? ''; + + // Direct string comparison (faster than strtolower + comparison) + if ($event !== 'pusher:ping' && $event !== 'pusher.ping') { + return false; + } + + // Update connection timestamp directly on connection object (no promise chain) + $connection->lastPongedAt = time(); + + // Send pre-encoded pong response immediately + $connection->send(self::$PONG_RESPONSE); + + // Periodic GC instead of every ping + if (++$this->gcCounter >= self::GC_INTERVAL) { + $this->gcCounter = 0; + gc_collect_cycles(); + } + + return true; + } + + /** + * Full message processing for non-ping messages + */ + private function processFullMessage( + ConnectionInterface $connection, + MessageInterface $message, + string $payload + ): void { + // Set remote address once (moved from per-message to reduce overhead) + if (isset($connection->remoteAddress)) { + request()->server->set('REMOTE_ADDR', $connection->remoteAddress); + } + + // Decode message (we already have payload string) + $messageArray = json_decode($payload, true, 512, JSON_THROW_ON_ERROR); + + // Handle pusher protocol messages (subscribe, unsubscribe, etc.) + $this->handlePusherProtocolMessage($message, $connection, $messageArray); + + $channel = $this->handleChannelSubscriptions($messageArray, $connection); + + if ($this->shouldRejectMessage($channel, $connection, $messageArray)) { + return; + } + + $this->authenticateConnection($connection, $channel, $messageArray); + + // Only log in debug mode to reduce I/O + if (config('app.debug')) { + Log::channel('websocket')->debug('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . $payload); + } + + if ($this->handlePusherEvent($messageArray, $connection)) { + return; + } + + $this->forkAndProcessMessage($connection, $channel, $messageArray); + } + + /** + * Handle pusher protocol messages (formerly in PusherMessageFactory) + * Inlined for performance - avoids object creation + */ + private function handlePusherProtocolMessage( + MessageInterface $message, + ConnectionInterface $connection, + array $messageArray + ): void { + $event = $messageArray['event'] ?? ''; + + // Fast check - most messages don't start with 'pusher' or 'client-' + $firstChar = $event[0] ?? ''; + if ($firstChar !== 'p' && $firstChar !== 'c') { + return; + } + + // Check for client- messages + if (strpos($event, 'client-') === 0) { + if (!$connection->app->clientMessagesEnabled) { + return; + } + + $channelName = $messageArray['channel'] ?? null; + if (!$channelName) { + return; + } + + $channel = $this->channelManager->find($connection->app->id, $channelName); + if ($channel) { + $channel->broadcastToEveryoneExcept( + (object) $messageArray, + $connection->socketId, + $connection->app->id + ); + } + return; + } + + // Check for pusher: or pusher. messages (subscribe/unsubscribe handled elsewhere) + // This is handled by handleChannelSubscriptions for subscribe/unsubscribe + } + + public function onOpen(ConnectionInterface $connection): void + { + if (!$this->connectionCanBeMade($connection)) { + $connection->close(); + return; } try { @@ -74,48 +236,6 @@ class Handler implements MessageComponentInterface } } - public function onMessage( - ConnectionInterface $connection, - MessageInterface $message - ) { - if (!isset($connection->app)) { - return; - } - - try { - request()->server->set('REMOTE_ADDR', $connection->remoteAddress); - - PusherMessageFactory::createForMessage( - $message, - $connection, - $this->channelManager - )->respond(); - - $message = json_decode($message->getPayload(), true, 512, JSON_THROW_ON_ERROR); - - if ($this->handlePingPong($message, $connection)) { - return; - } - - $channel = $this->handleChannelSubscriptions($message, $connection); - - if ($this->shouldRejectMessage($channel, $connection, $message)) { - return; - } - - $this->authenticateConnection($connection, $channel, $message); - \Log::channel('websocket')->info('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . json_encode($message)); - - if ($this->handlePusherEvent($message, $connection)) { - return; - } - - $this->forkAndProcessMessage($connection, $channel, $message); - } catch (\Throwable $e) { - $this->handleMessageError($e); - } - } - /** * Handle the websocket close. */ @@ -150,8 +270,10 @@ class Handler implements MessageComponentInterface return; } + // Initialize lastPongedAt with unix timestamp (faster than Carbon) + $connection->lastPongedAt = time(); + $this->channelManager->subscribeToApp($connection->app->id); - $this->channelManager->connectionPonged($connection); NewConnection::dispatch( $connection->app->id, @@ -159,25 +281,14 @@ class Handler implements MessageComponentInterface ); } - protected function handlePingPong(array $message, ConnectionInterface $connection): bool - { - $eventLower = strtolower($message['event']); - if ($eventLower !== 'pusher:ping' && $eventLower !== 'pusher.ping') { - return false; - } - - $this->channelManager->connectionPonged($connection); - gc_collect_cycles(); - return true; - } - protected function shouldRejectMessage(?Channel $channel, ConnectionInterface $connection, array $message): bool { - $isUnsubscribe = $message['event'] === 'pusher:unsubscribe' || $message['event'] === 'pusher.unsubscribe'; + $event = $message['event'] ?? ''; + $isUnsubscribe = $event === 'pusher:unsubscribe' || $event === 'pusher.unsubscribe'; if (!$channel?->hasConnection($connection) && !$isUnsubscribe) { $connection->send(json_encode([ - 'event' => $message['event'] . ':error', + 'event' => $event . ':error', 'data' => [ 'message' => 'Subscription not established', 'meta' => $message, @@ -220,6 +331,10 @@ class Handler implements MessageComponentInterface Channel $channel, array $message ): void { + // Generate unique request ID BEFORE forking to avoid race conditions + // Using uniqid with more_entropy + random_bytes for guaranteed uniqueness + $requestId = uniqid('req_', true) . '_' . bin2hex(random_bytes(4)); + $pid = pcntl_fork(); if ($pid === -1) { @@ -228,24 +343,25 @@ class Handler implements MessageComponentInterface } if ($pid === 0) { - $this->processMessageInChild($connection, $channel, $message); + $this->processMessageInChild($connection, $channel, $message, $requestId); exit(0); } - $this->addDataCheckLoop($connection, $message, $pid); + $this->addDataCheckLoop($connection, $message, $requestId); } protected function processMessageInChild( ConnectionInterface $connection, Channel $channel, - array $message + array $message, + string $requestId ): void { try { DB::disconnect(); DB::reconnect(); $this->setRequest($message, $connection); - $mock = new MockConnection($connection); + $mock = new MockConnection($connection, $requestId); Controller::controll_message( $mock, @@ -737,27 +853,29 @@ class Handler implements MessageComponentInterface private function addDataCheckLoop( $connection, $message, - $pid, + string $requestId, $optional = false, - $iteration = false + int $iteration = 0 ) { - $pid = $this->preparePid($pid, $iteration); - $pidcache_start = 'dedicated_start_' . $pid; - cache()->put($pidcache_start, microtime(true), 100); + $iterationKey = $requestId . ($iteration > 0 ? '_' . $iteration : ''); + $cacheKeyStart = 'dedicated_start_' . $iterationKey; + IpcCache::put($cacheKeyStart, microtime(true), 100); $this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use ( - $pidcache_start, + $cacheKeyStart, + $iterationKey, $message, - $pid, + $requestId, $connection, $optional, $iteration ) { $this->checkDataLoopIteration( $timer, - $pidcache_start, + $cacheKeyStart, $message, - $pid, + $iterationKey, + $requestId, $connection, $optional, $iteration @@ -767,56 +885,66 @@ class Handler implements MessageComponentInterface }); } - protected function preparePid($pid, $iteration): string - { - $pid = explode('_', $pid . '')[0]; - - if ($iteration >= 0 && $iteration !== false) { - $pid .= '_' . $iteration; - } - - return $pid; - } - protected function checkDataLoopIteration( $timer, - string $pidcache_start, + string $cacheKeyStart, array $message, - string $pid, + string $iterationKey, + string $requestId, $connection, bool $optional, - $iteration + int $iteration ): void { - $pidcache_data = 'dedicated_data_' . $pid; - $pidcache_done = 'dedicated_data_' . $pid . '_done'; - $pidcache_complete = 'dedicated_data_' . $pid . '_complete'; + $cacheKeyData = 'dedicated_data_' . $iterationKey; + $cacheKeyDone = 'dedicated_data_' . $iterationKey . '_done'; + $cacheKeyComplete = 'dedicated_data_' . $iterationKey . '_complete'; - if ($this->handleTimeout($timer, $pidcache_start, $pidcache_complete, $message, $connection, $optional)) { + if ($this->handleTimeout($timer, $cacheKeyStart, $cacheKeyComplete, $message, $connection, $optional)) { return; } - if (!cache()->has($pidcache_done)) { + if (!IpcCache::has($cacheKeyDone)) { return; } - $this->scheduleNextIteration($connection, $message, $pid, $iteration); - $this->processAndSendData($connection, $pidcache_data); + // Clean up cache entries for this iteration before processing + // This prevents memory leaks and stale data issues + $this->cleanupIterationCache($iterationKey); + + $this->scheduleNextIteration($connection, $message, $requestId, $iteration); + $this->processAndSendData($connection, $cacheKeyData); $this->channelManager->loop->cancelTimer($timer); } + /** + * Clean up cache entries for a completed iteration + */ + protected function cleanupIterationCache(string $iterationKey): void + { + $keysToDelete = [ + 'dedicated_start_' . $iterationKey, + 'dedicated_data_' . $iterationKey . '_done', + // Note: We don't delete 'dedicated_data_' here as we need it for processAndSendData + // It will expire naturally after 60 seconds + ]; + + IpcCache::forgetMultiple($keysToDelete); + } + protected function handleTimeout( $timer, - string $pidcache_start, - string $pidcache_complete, + string $cacheKeyStart, + string $cacheKeyComplete, array $message, $connection, bool $optional ): bool { - if (!cache()->has($pidcache_start)) { + $startTime = IpcCache::get($cacheKeyStart); + if ($startTime === null) { return false; } - $diff = microtime(true) - ((int) cache()->get($pidcache_start)); + $diff = microtime(true) - ((float) $startTime); if ($diff <= 60) { return false; } @@ -832,19 +960,27 @@ class Handler implements MessageComponentInterface } $this->channelManager->loop->cancelTimer($timer); - cache()->put($pidcache_complete, true, 360); + IpcCache::put($cacheKeyComplete, true, 360); return true; } - protected function scheduleNextIteration($connection, array $message, string $pid, $iteration): void + protected function scheduleNextIteration($connection, array $message, string $requestId, int $iteration): void { - $nextIteration = ($iteration === false) ? 0 : $iteration + 1; - $this->addDataCheckLoop($connection, $message, $pid, true, $nextIteration); + $nextIteration = $iteration + 1; + $this->addDataCheckLoop($connection, $message, $requestId, true, $nextIteration); } - protected function processAndSendData($connection, string $pidcache_data): void + protected function processAndSendData($connection, string $cacheKeyData): void { - $sending = cache()->get($pidcache_data); + $sending = IpcCache::get($cacheKeyData); + + // Clean up the data cache key immediately after reading + IpcCache::forget($cacheKeyData); + + if (!$sending) { + return; + } + $bm = json_decode($sending, true); if (isset($bm['broadcast']) && $bm['broadcast']) { diff --git a/src/Websocket/MockConnection.php b/src/Websocket/MockConnection.php index ac40cdd..5855e02 100644 --- a/src/Websocket/MockConnection.php +++ b/src/Websocket/MockConnection.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace BlaxSoftware\LaravelWebSockets\Websocket; +use BlaxSoftware\LaravelWebSockets\Cache\IpcCache; use React\Socket\Connection; use Illuminate\Support\Facades\Log; @@ -17,8 +18,22 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface public $ip; public $app; - public function __construct($original_connection) + /** + * Unique request ID for cache-based communication + * Used instead of PID to avoid race conditions from PID reuse + */ + protected string $requestId; + + /** + * Track current iteration for multi-response scenarios + */ + protected int $currentIteration = 0; + + public function __construct($original_connection, ?string $requestId = null) { + // Generate fallback requestId if not provided (for backward compatibility) + $this->requestId = $requestId ?? uniqid('req_', true) . '_' . bin2hex(random_bytes(4)); + // create an indisdinctable copy of the original connection foreach (get_object_vars($original_connection) as $key => $value) { $this->{$key} = $value; @@ -52,8 +67,11 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface public function send($data) { - if(cache()->get('dedicated_data_'.getmypid().'_complete')){ - Log::error('[MockConnection] Send for pid: ' . getmypid() . ' which is already completed and does not check for new data', [ + $key = $this->getDataKey(); + $completeKey = $key . '_complete'; + + if (IpcCache::get($completeKey)) { + Log::error('[MockConnection] Send for request: ' . $this->requestId . ' which is already completed and does not check for new data', [ 'data' => $data, ]); return $this; @@ -64,14 +82,16 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface throw new \InvalidArgumentException('Data must be a string or an object that can be converted to a string.'); } - Log::channel('websocket')->info('[MockConnection] Send for pid: ' . getmypid(), [ + Log::channel('websocket')->info('[MockConnection] Send for request: ' . $this->requestId . ' iteration: ' . $this->currentIteration, [ 'data' => $data, ]); - $key = static::getDataKey(); + // Use atomic set to avoid race conditions - IpcCache uses tmpfs for speed + IpcCache::put($key, $data, 60); + IpcCache::put($key . '_done', true, 60); - cache()->put($key, $data, 60); - cache()->put($key . '_done', true, 60); + // Increment iteration for next send call + $this->currentIteration++; return $this; } @@ -80,7 +100,7 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface $data, ?string $channel = null, bool $including_self = false, - ){ + ) { $data ??= []; $data['broadcast'] = true; $data['channel'] ??= $channel; @@ -102,26 +122,18 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface return $this->send(json_encode($data)); } - private static function getDataKey() + /** + * Get the data key for the current request and iteration + * Now uses the unique requestId instead of PID to avoid race conditions + */ + private function getDataKey(): string { - $key = 'dedicated_data_' . getmypid(); - $i = ''; + $baseKey = 'dedicated_data_' . $this->requestId; - while (cache()->has($key . ($i !== '' ? '_' . $i : ''))) { - if ($i === '') { - $i = 0; - } else { - $i = (int) $i; - $i++; - } + if ($this->currentIteration > 0) { + return $baseKey . '_' . $this->currentIteration; } - if ($i !== '') { - $i = '_' . $i; - } - - $key .= $i; - - return $key; + return $baseKey; } }