diff --git a/src/Websocket/ChannelManager.php b/src/Websocket/ChannelManager.php index 3e1c4c3..02019e9 100644 --- a/src/Websocket/ChannelManager.php +++ b/src/Websocket/ChannelManager.php @@ -316,12 +316,49 @@ class ChannelManager extends LocalChannelManager try { $lock->get(function () { $this->getConnectionsFromSet(0, (int) now()->subMinutes(2)->format('U')) - ->then(function ($connections) { - foreach ($connections as $socketId => $appId) { - $connection = $this->fakeConnectionForApp($appId, $socketId); + ->then(function ($staleRedisConnections) { + // Cross-check against local connection state before unsubscribing. + // The Redis sorted set score may be stale (e.g., connectionPonged() + // Redis call failed) while the connection is still alive locally + // with a fresh lastPongedAt. Only unsubscribe connections that are + // ALSO stale locally, or not present locally at all. + $this->getLocalConnections()->then(function ($localConnections) use ($staleRedisConnections) { + // Build socketId → connection lookup from local connections + $localBySocketId = []; + foreach ($localConnections as $conn) { + if (isset($conn->socketId)) { + $localBySocketId[$conn->socketId] = $conn; + } + } - $this->unsubscribeFromAllChannels($connection); - } + $now = time(); + + foreach ($staleRedisConnections as $socketId => $appId) { + // If the connection exists locally with a fresh pong, skip it. + // The local lastPongedAt is the ground truth — it's updated + // directly on every ping and every message, regardless of Redis. + if (isset($localBySocketId[$socketId])) { + $localConn = $localBySocketId[$socketId]; + $lastPong = $localConn->lastPongedAt ?? 0; + if (is_object($lastPong)) { + $age = $lastPong->diffInSeconds(\Carbon\Carbon::now()); + } else { + $age = $now - (int) $lastPong; + } + + if ($age <= 120) { + // Connection is alive locally — just refresh the Redis score + // so the next cleanup cycle doesn't flag it again. + $this->addConnectionToSet($localConn, Carbon::now()); + continue; + } + } + + // Connection is either not local (other server) or genuinely stale + $fakeConn = $this->fakeConnectionForApp($appId, $socketId); + $this->unsubscribeFromAllChannels($fakeConn); + } + }); }); }); diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index 6df5116..aad9ae8 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -146,17 +146,22 @@ class Handler implements MessageComponentInterface return false; } - // Update connection pong timestamp in BOTH local memory AND Redis sorted set. - // This is critical: removeObsoleteConnections() checks the Redis set score, - // so a direct $connection->lastPongedAt assignment alone is insufficient — - // the Redis-based cleanup would still unsubscribe channels after 120s. - // connectionPonged() is async (returns a Promise resolved by the event loop), - // so this does not block the ping response. + // ALWAYS update local pong timestamp first — this is the ground truth + // that proves the connection is alive. Without this, if the Redis + // connectionPonged() call below fails, parent::connectionPonged() + // (chained after Redis) never runs, and the local + // removeObsoleteConnections() also considers the connection stale. + $connection->lastPongedAt = time(); + + // Also update Redis sorted set score so the Redis-based + // removeObsoleteConnections() doesn't consider this connection stale. + // This is async and does not block the pong response. $this->channelManager->connectionPonged($connection) ->then(null, function (\Throwable $e) use ($connection) { - // If the Redis pong update fails, the connection will appear stale - // and removeObsoleteConnections() will unsubscribe its channels. - // Log this so we can diagnose connection drops. + // Redis pong update failed — the local lastPongedAt is still fresh, + // so the local cleanup won't remove this connection. However the + // Redis-based cleanup may still see a stale score. This is handled + // by cross-checking local connection liveness in removeObsoleteConnections(). Log::channel('websocket')->error('connectionPonged failed for ' . ($connection->socketId ?? '?') . ': ' . $e->getMessage()); }); @@ -352,6 +357,19 @@ class Handler implements MessageComponentInterface $isUnsubscribe = $event === 'pusher:unsubscribe' || $event === 'pusher.unsubscribe'; if (!$channel?->hasConnection($connection) && !$isUnsubscribe) { + // The connection may have been removed from Channel::$connections by + // removeObsoleteConnections() (Redis stale score race) while the socket + // is still alive. If Handler::$channel_connections still tracks it, the + // connection was legitimately subscribed — silently re-subscribe instead + // of returning an error to the client. + $channelName = $channel?->getName(); + if ($channelName && isset($this->channel_connections[$channelName][$connection->socketId])) { + // Re-add to Channel::$connections transparently + $channel->saveConnection($connection); + Log::channel('websocket')->info('Auto-resubscribed connection ' . $connection->socketId . ' to channel ' . $channelName); + return false; // Allow the message to proceed + } + $connection->send(json_encode([ 'event' => $event . ':error', 'data' => [