From 55f13324932034532f9ff451d410aaa9c5c56eec Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Tue, 15 Sep 2020 12:30:17 +0300 Subject: [PATCH] Added tracking for pongs --- src/ChannelManagers/LocalChannelManager.php | 21 +++ src/ChannelManagers/RedisChannelManager.php | 163 +++++++++++++++++- src/Console/Commands/StartServer.php | 17 ++ src/Contracts/ChannelManager.php | 15 ++ src/Helpers.php | 26 +++ .../Messages/PusherChannelProtocolMessage.php | 2 + src/Server/MockableConnection.php | 44 +++++ src/Statistics/Collectors/RedisCollector.php | 29 +--- tests/ReplicationTest.php | 102 +++++++++++ 9 files changed, 387 insertions(+), 32 deletions(-) create mode 100644 src/Helpers.php create mode 100644 src/Server/MockableConnection.php diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index a889960..7ff689b 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -342,6 +342,27 @@ class LocalChannelManager implements ChannelManager return new FulfilledPromise($results); } + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool + { + return true; + } + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool + { + return true; + } + /** * Mark the current instance as unable to accept new connections. * diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 8bed7cb..a4f564b 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -3,10 +3,16 @@ namespace BeyondCode\LaravelWebSockets\ChannelManagers; use BeyondCode\LaravelWebSockets\Channels\Channel; +use BeyondCode\LaravelWebSockets\Helpers; +use BeyondCode\LaravelWebSockets\Server\MockableConnection; +use Carbon\Carbon; use Clue\React\Redis\Client; use Clue\React\Redis\Factory; +use Illuminate\Cache\RedisLock; +use Illuminate\Support\Facades\Redis; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; +use Ratchet\WebSocket\WsConnection; use React\EventLoop\LoopInterface; use React\Promise\PromiseInterface; use stdClass; @@ -41,6 +47,21 @@ class RedisChannelManager extends LocalChannelManager */ protected $subscribeClient; + /** + * The Redis manager instance. + * + * @var \Illuminate\Redis\RedisManager + */ + protected $redis; + + /** + * The lock name to use on Redis to avoid multiple + * actions that might lead to multiple processings. + * + * @var string + */ + protected static $redisLockName = 'laravel-websockets:channel-manager:lock'; + /** * Create a new channel manager instance. * @@ -52,6 +73,10 @@ class RedisChannelManager extends LocalChannelManager { $this->loop = $loop; + $this->redis = Redis::connection( + config('websockets.replication.modes.redis.connection', 'default') + ); + $connectionUri = $this->getConnectionUri(); $factoryClass = $factoryClass ?: Factory::class; @@ -141,6 +166,8 @@ class RedisChannelManager extends LocalChannelManager } }); + $this->addConnectionToSet($connection); + $this->addChannelToSet( $connection->app->id, $channelName ); @@ -167,8 +194,14 @@ class RedisChannelManager extends LocalChannelManager if ($count === 0) { $this->unsubscribeFromTopic($connection->app->id, $channelName); + $this->removeUserData( + $connection->app->id, $channelName, $connection->socketId + ); + $this->removeChannelFromSet($connection->app->id, $channelName); + $this->removeConnectionFromSet($connection); + return; } @@ -179,7 +212,13 @@ class RedisChannelManager extends LocalChannelManager if ($count < 1) { $this->unsubscribeFromTopic($connection->app->id, $channelName); + $this->removeUserData( + $connection->app->id, $channelName, $connection->socketId + ); + $this->removeChannelFromSet($connection->app->id, $channelName); + + $this->removeConnectionFromSet($connection); } }); }); @@ -304,12 +343,8 @@ class RedisChannelManager extends LocalChannelManager { return $this->publishClient ->hgetall($this->getRedisKey($appId, $channel, ['users'])) - ->then(function ($members) { - [$keys, $values] = collect($members)->partition(function ($value, $key) { - return $key % 2 === 0; - }); - - return collect(array_combine($keys->all(), $values->all())) + ->then(function ($list) { + return collect(Helpers::redisListToArray($list)) ->map(function ($user) { return json_decode($user); }) @@ -355,6 +390,43 @@ class RedisChannelManager extends LocalChannelManager }); } + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool + { + // This will update the score with the current timestamp. + $this->addConnectionToSet($connection); + + return parent::connectionPonged($connection); + } + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool + { + $this->lock()->get(function () { + $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($connections) { + foreach ($connections as $connection => $score) { + [$appId, $socketId] = explode(':', $connection); + + $this->unsubscribeFromAllChannels( + $this->fakeConnectionForApp($appId, $socketId) + ); + } + }); + }); + + return parent::removeObsoleteConnections(); + } + /** * Handle a message received from Redis on a specific channel. * @@ -473,6 +545,57 @@ class RedisChannelManager extends LocalChannelManager return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1); } + /** + * Add the connection to the sorted list. + * + * @param \Ratchet\ConnectionInterface $connection + * @param \DateTime|string|null $moment + * @return void + */ + public function addConnectionToSet(ConnectionInterface $connection, $moment = null) + { + $this->getPublishClient() + ->zadd( + $this->getRedisKey(null, null, ['sockets']), + Carbon::parse($moment)->format('U'), "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Remove the connection from the sorted list. + * + * @param \Ratchet\ConnectionInterface $connection + * @return void + */ + public function removeConnectionFromSet(ConnectionInterface $connection) + { + $this->getPublishClient() + ->zrem( + $this->getRedisKey(null, null, ['sockets']), + "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Get the connections from the sorted list, with last + * connection between certain timestamps. + * + * @param int $start + * @param int $stop + * @return PromiseInterface + */ + public function getConnectionsFromSet(int $start = 0, int $stop = 0) + { + return $this->getPublishClient() + ->zrange( + $this->getRedisKey(null, null, ['sockets']), + $start, $stop, 'withscores' + ) + ->then(function ($list) { + return Helpers::redisListToArray($list); + }); + } + /** * Add a channel to the set list. * @@ -566,11 +689,11 @@ class RedisChannelManager extends LocalChannelManager * Get the Redis Keyspace name to handle subscriptions * and other key-value sets. * - * @param mixed $appId + * @param string|int|null $appId * @param string|null $channel * @return string */ - public function getRedisKey($appId, string $channel = null, array $suffixes = []): string + public function getRedisKey($appId = null, string $channel = null, array $suffixes = []): string { $prefix = config('database.redis.options.prefix', null); @@ -588,4 +711,28 @@ class RedisChannelManager extends LocalChannelManager return $hash; } + + /** + * Get a new RedisLock instance to avoid race conditions. + * + * @return \Illuminate\Cache\CacheLock + */ + protected function lock() + { + return new RedisLock($this->redis, static::$redisLockName, 0); + } + + /** + * Create a fake connection for app that will mimick a connection + * by app ID and Socket ID to be able to be passed to the methods + * that accepts a connection class. + * + * @param string|int $appId + * @param string $socketId + * @return ConnectionInterface + */ + public function fakeConnectionForApp($appId, string $socketId) + { + return new MockableConnection($appId, $socketId); + } } diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index bb865b9..e6c0676 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -81,6 +81,8 @@ class StartServer extends Command $this->configurePcntlSignal(); + $this->configurePongTracker(); + $this->startServer(); } @@ -183,6 +185,21 @@ class StartServer extends Command }); } + /** + * Configure the tracker that will delete + * from the store the connections that + * + * @return void + */ + protected function configurePongTracker() + { + $this->loop->addPeriodicTimer(10, function () { + $this->laravel + ->make(ChannelManager::class) + ->removeObsoleteConnections(); + }); + } + /** * Configure the HTTP logger class. * diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index ccc15c0..35d5baf 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -185,4 +185,19 @@ interface ChannelManager * @return \React\Promise\PromiseInterface */ public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface; + + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool; + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool; } diff --git a/src/Helpers.php b/src/Helpers.php new file mode 100644 index 0000000..7354545 --- /dev/null +++ b/src/Helpers.php @@ -0,0 +1,26 @@ + value array. + [$keys, $values] = collect($list)->partition(function ($value, $key) { + return $key % 2 === 0; + }); + + return array_combine($keys->all(), $values->all()); + } +} diff --git a/src/Server/Messages/PusherChannelProtocolMessage.php b/src/Server/Messages/PusherChannelProtocolMessage.php index 14dea23..d70934b 100644 --- a/src/Server/Messages/PusherChannelProtocolMessage.php +++ b/src/Server/Messages/PusherChannelProtocolMessage.php @@ -34,6 +34,8 @@ class PusherChannelProtocolMessage extends PusherClientMessage $connection->send(json_encode([ 'event' => 'pusher:pong', ])); + + $this->channelManager->connectionPonged($connection); } /** diff --git a/src/Server/MockableConnection.php b/src/Server/MockableConnection.php new file mode 100644 index 0000000..9fb5813 --- /dev/null +++ b/src/Server/MockableConnection.php @@ -0,0 +1,44 @@ +app = new stdClass; + + $this->app->id = $appId; + $this->socketId = $socketId; + } + + /** + * Send data to the connection + * @param string $data + * @return \Ratchet\ConnectionInterface + */ + function send($data) + { + // + } + + /** + * Close the connection + * + * @return void + */ + function close() + { + // + } +} diff --git a/src/Statistics/Collectors/RedisCollector.php b/src/Statistics/Collectors/RedisCollector.php index 7b845b5..f7b5074 100644 --- a/src/Statistics/Collectors/RedisCollector.php +++ b/src/Statistics/Collectors/RedisCollector.php @@ -2,6 +2,7 @@ namespace BeyondCode\LaravelWebSockets\Statistics\Collectors; +use BeyondCode\LaravelWebSockets\Helpers; use BeyondCode\LaravelWebSockets\Statistics\Statistic; use Illuminate\Cache\RedisLock; use Illuminate\Support\Facades\Redis; @@ -30,7 +31,7 @@ class RedisCollector extends MemoryCollector * * @var string */ - protected static $redisLockName = 'laravel-websockets:lock'; + protected static $redisLockName = 'laravel-websockets:collector:lock'; /** * Initialize the logger. @@ -178,7 +179,7 @@ class RedisCollector extends MemoryCollector } $statistic = $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); $this->createRecord($statistic, $appId); @@ -229,7 +230,7 @@ class RedisCollector extends MemoryCollector ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->then(function ($list) use ($appId, &$appsWithStatistics) { $appsWithStatistics[$appId] = $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); }); } @@ -251,7 +252,7 @@ class RedisCollector extends MemoryCollector ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->then(function ($list) use ($appId) { return $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); }); } @@ -361,26 +362,6 @@ class RedisCollector extends MemoryCollector return new RedisLock($this->redis, static::$redisLockName, 0); } - /** - * Transform the Redis' list of key after value - * to key-value pairs. - * - * @param array $list - * @return array - */ - protected function redisListToArray(array $list) - { - // Redis lists come into a format where the keys are on even indexes - // and the values are on odd indexes. This way, we know which - // ones are keys and which ones are values and their get combined - // later to form the key => value array. - [$keys, $values] = collect($list)->partition(function ($value, $key) { - return $key % 2 === 0; - }); - - return array_combine($keys->all(), $values->all()); - } - /** * Transform a key-value pair to a Statistic instance. * diff --git a/tests/ReplicationTest.php b/tests/ReplicationTest.php index 00ee615..f08c6b0 100644 --- a/tests/ReplicationTest.php +++ b/tests/ReplicationTest.php @@ -32,4 +32,106 @@ class ReplicationTest extends TestCase 'data' => ['channel' => 'public-channel', 'test' => 'yes'], ]); } + + public function test_not_ponged_connections_do_get_removed_for_public_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newActiveConnection(['public-channel']); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_for_private_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newPrivateConnection('private-channel'); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_for_presence_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newPresenceConnection('presence-channel'); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(1, $members); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(0, $members); + }); + } }