BF long lasting subscription

This commit is contained in:
Fabian @ Blax Software 2026-03-27 09:38:07 +01:00
parent 7dd2df48d1
commit 77e12db729
2 changed files with 69 additions and 14 deletions

View File

@ -316,12 +316,49 @@ class ChannelManager extends LocalChannelManager
try { try {
$lock->get(function () { $lock->get(function () {
$this->getConnectionsFromSet(0, (int) now()->subMinutes(2)->format('U')) $this->getConnectionsFromSet(0, (int) now()->subMinutes(2)->format('U'))
->then(function ($connections) { ->then(function ($staleRedisConnections) {
foreach ($connections as $socketId => $appId) { // Cross-check against local connection state before unsubscribing.
$connection = $this->fakeConnectionForApp($appId, $socketId); // The Redis sorted set score may be stale (e.g., connectionPonged()
// Redis call failed) while the connection is still alive locally
// with a fresh lastPongedAt. Only unsubscribe connections that are
// ALSO stale locally, or not present locally at all.
$this->getLocalConnections()->then(function ($localConnections) use ($staleRedisConnections) {
// Build socketId → connection lookup from local connections
$localBySocketId = [];
foreach ($localConnections as $conn) {
if (isset($conn->socketId)) {
$localBySocketId[$conn->socketId] = $conn;
}
}
$this->unsubscribeFromAllChannels($connection); $now = time();
}
foreach ($staleRedisConnections as $socketId => $appId) {
// If the connection exists locally with a fresh pong, skip it.
// The local lastPongedAt is the ground truth — it's updated
// directly on every ping and every message, regardless of Redis.
if (isset($localBySocketId[$socketId])) {
$localConn = $localBySocketId[$socketId];
$lastPong = $localConn->lastPongedAt ?? 0;
if (is_object($lastPong)) {
$age = $lastPong->diffInSeconds(\Carbon\Carbon::now());
} else {
$age = $now - (int) $lastPong;
}
if ($age <= 120) {
// Connection is alive locally — just refresh the Redis score
// so the next cleanup cycle doesn't flag it again.
$this->addConnectionToSet($localConn, Carbon::now());
continue;
}
}
// Connection is either not local (other server) or genuinely stale
$fakeConn = $this->fakeConnectionForApp($appId, $socketId);
$this->unsubscribeFromAllChannels($fakeConn);
}
});
}); });
}); });

View File

@ -146,17 +146,22 @@ class Handler implements MessageComponentInterface
return false; return false;
} }
// Update connection pong timestamp in BOTH local memory AND Redis sorted set. // ALWAYS update local pong timestamp first — this is the ground truth
// This is critical: removeObsoleteConnections() checks the Redis set score, // that proves the connection is alive. Without this, if the Redis
// so a direct $connection->lastPongedAt assignment alone is insufficient — // connectionPonged() call below fails, parent::connectionPonged()
// the Redis-based cleanup would still unsubscribe channels after 120s. // (chained after Redis) never runs, and the local
// connectionPonged() is async (returns a Promise resolved by the event loop), // removeObsoleteConnections() also considers the connection stale.
// so this does not block the ping response. $connection->lastPongedAt = time();
// Also update Redis sorted set score so the Redis-based
// removeObsoleteConnections() doesn't consider this connection stale.
// This is async and does not block the pong response.
$this->channelManager->connectionPonged($connection) $this->channelManager->connectionPonged($connection)
->then(null, function (\Throwable $e) use ($connection) { ->then(null, function (\Throwable $e) use ($connection) {
// If the Redis pong update fails, the connection will appear stale // Redis pong update failed — the local lastPongedAt is still fresh,
// and removeObsoleteConnections() will unsubscribe its channels. // so the local cleanup won't remove this connection. However the
// Log this so we can diagnose connection drops. // Redis-based cleanup may still see a stale score. This is handled
// by cross-checking local connection liveness in removeObsoleteConnections().
Log::channel('websocket')->error('connectionPonged failed for ' . ($connection->socketId ?? '?') . ': ' . $e->getMessage()); Log::channel('websocket')->error('connectionPonged failed for ' . ($connection->socketId ?? '?') . ': ' . $e->getMessage());
}); });
@ -352,6 +357,19 @@ class Handler implements MessageComponentInterface
$isUnsubscribe = $event === 'pusher:unsubscribe' || $event === 'pusher.unsubscribe'; $isUnsubscribe = $event === 'pusher:unsubscribe' || $event === 'pusher.unsubscribe';
if (!$channel?->hasConnection($connection) && !$isUnsubscribe) { if (!$channel?->hasConnection($connection) && !$isUnsubscribe) {
// The connection may have been removed from Channel::$connections by
// removeObsoleteConnections() (Redis stale score race) while the socket
// is still alive. If Handler::$channel_connections still tracks it, the
// connection was legitimately subscribed — silently re-subscribe instead
// of returning an error to the client.
$channelName = $channel?->getName();
if ($channelName && isset($this->channel_connections[$channelName][$connection->socketId])) {
// Re-add to Channel::$connections transparently
$channel->saveConnection($connection);
Log::channel('websocket')->info('Auto-resubscribed connection ' . $connection->socketId . ' to channel ' . $channelName);
return false; // Allow the message to proceed
}
$connection->send(json_encode([ $connection->send(json_encode([
'event' => $event . ':error', 'event' => $event . ':error',
'data' => [ 'data' => [