From ec47925c71a5d1e4f3c2461954a15c52f161669f Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Sat, 12 Sep 2020 17:45:07 +0300 Subject: [PATCH 1/7] Added soft closes for connections on SIGTERM/SIGINT --- composer.json | 3 + src/ChannelManagers/LocalChannelManager.php | 52 ++++++++++++++++ src/ChannelManagers/RedisChannelManager.php | 15 ++++- src/Console/Commands/StartServer.php | 66 ++++++++++++++++++--- src/Contracts/ChannelManager.php | 8 +++ src/Server/WebSocketHandler.php | 20 +++++++ tests/Commands/StartServerTest.php | 38 +++++++++++- tests/ConnectionTest.php | 18 ++++++ tests/Mocks/Connection.php | 12 ++++ tests/PresenceChannelTest.php | 19 ++++++ tests/PrivateChannelTest.php | 19 ++++++ tests/PublicChannelTest.php | 20 +++++++ 12 files changed, 278 insertions(+), 12 deletions(-) diff --git a/composer.json b/composer.json index dc8968a..9df2218 100644 --- a/composer.json +++ b/composer.json @@ -56,6 +56,9 @@ "orchestra/database": "^4.0|^5.0|^6.0", "phpunit/phpunit": "^8.0|^9.0" }, + "suggest": { + "ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown." + }, "autoload": { "psr-4": { "BeyondCode\\LaravelWebSockets\\": "src/" diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 2b8150c..a889960 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager */ protected $users = []; + /** + * Wether the current instance accepts new connections. + * + * @var bool + */ + protected $acceptsNewConnections = true; + /** * Create a new channel manager instance. * @@ -71,6 +78,28 @@ class LocalChannelManager implements ChannelManager return $this->channels[$appId][$channel]; } + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface + { + $connections = collect($this->channels) + ->map(function ($channelsWithConnections, $appId) { + return collect($channelsWithConnections)->values(); + }) + ->values()->collapse() + ->map(function ($channel) { + return collect($channel->getConnections()); + }) + ->values()->collapse() + ->toArray(); + + return new FulfilledPromise($connections); + } + /** * Get all channels for a specific app * for the current instance. @@ -313,6 +342,29 @@ class LocalChannelManager implements ChannelManager return new FulfilledPromise($results); } + /** + * Mark the current instance as unable to accept new connections. + * + * @return $this + */ + public function declineNewConnections() + { + $this->acceptsNewConnections = false; + + return $this; + } + + /** + * Check if the current server instance + * accepts new connections. + * + * @return bool + */ + public function acceptsNewConnections(): bool + { + return $this->acceptsNewConnections; + } + /** * Get the channel class by the channel name. * diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 0d884b3..8bed7cb 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -67,6 +67,17 @@ class RedisChannelManager extends LocalChannelManager $this->serverId = Str::uuid()->toString(); } + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface + { + return parent::getLocalConnections(); + } + /** * Get all channels for a specific app * for the current instance. @@ -108,9 +119,9 @@ class RedisChannelManager extends LocalChannelManager $connection, $channel, new stdClass ); } + })->then(function () use ($connection) { + parent::unsubscribeFromAllChannels($connection); }); - - parent::unsubscribeFromAllChannels($connection); } /** diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 664d6a9..03d6e01 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -26,7 +26,7 @@ class StartServer extends Command {--disable-statistics : Disable the statistics tracking.} {--statistics-interval= : The amount of seconds to tick between statistics saving.} {--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.} - {--test : Prepare the server, but do not start it.} + {--loop : Programatically inject the loop.} '; /** @@ -79,6 +79,8 @@ class StartServer extends Command $this->configureRoutes(); + $this->configurePcntlSignal(); + $this->startServer(); } @@ -156,6 +158,31 @@ class StartServer extends Command WebSocketRouter::routes(); } + /** + * Configure the PCNTL signals for soft shutdown. + * + * @return void + */ + protected function configurePcntlSignal() + { + // When the process receives a SIGTERM or a SIGINT + // signal, it should mark the server as unavailable + // to receive new connections, close the current connections, + // then stopping the loop. + + $this->loop->addSignal(SIGTERM, function () { + $this->line('Closing existing connections...'); + + $this->triggerSoftShutdown(); + }); + + $this->loop->addSignal(SIGINT, function () { + $this->line('Closing existing connections...'); + + $this->triggerSoftShutdown(); + }); + } + /** * Configure the HTTP logger class. * @@ -209,14 +236,6 @@ class StartServer extends Command $this->buildServer(); - // For testing, just boot up the server, run it - // but exit after the next tick. - if ($this->option('test')) { - $this->loop->futureTick(function () { - $this->loop->stop(); - }); - } - $this->server->run(); } @@ -231,6 +250,10 @@ class StartServer extends Command $this->option('host'), $this->option('port') ); + if ($loop = $this->option('loop')) { + $this->loop = $loop; + } + $this->server = $this->server ->setLoop($this->loop) ->withRoutes(WebSocketRouter::getRoutes()) @@ -249,4 +272,29 @@ class StartServer extends Command 'beyondcode:websockets:restart', 0 ); } + + /** + * Trigger a soft shutdown for the process. + * + * @return void + */ + protected function triggerSoftShutdown() + { + $channelManager = $this->laravel->make(ChannelManager::class); + + // Close the new connections allowance on this server. + $channelManager->declineNewConnections(); + + // Get all local connections and close them. They will + // be automatically be unsubscribed from all channels. + $channelManager->getLocalConnections() + ->then(function ($connections) use ($channelManager) { + foreach ($connections as $connection) { + $connection->close(); + } + }) + ->then(function () { + $this->loop->stop(); + }); + } } diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index e056e11..ccc15c0 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -36,6 +36,14 @@ interface ChannelManager */ public function findOrCreate($appId, string $channel); + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface; + /** * Get all channels for a specific app * for the current instance. diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 1016a1a..0dbe8be 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -39,6 +39,10 @@ class WebSocketHandler implements MessageComponentInterface */ public function onOpen(ConnectionInterface $connection) { + if (! $this->connectionCanBeMade($connection)) { + return $connection->close(); + } + $this->verifyAppKey($connection) ->verifyOrigin($connection) ->limitConcurrentConnections($connection) @@ -69,6 +73,10 @@ class WebSocketHandler implements MessageComponentInterface */ public function onMessage(ConnectionInterface $connection, MessageInterface $message) { + if (! isset($connection->app)) { + return; + } + Messages\PusherMessageFactory::createForMessage( $message, $connection, $this->channelManager )->respond(); @@ -113,6 +121,18 @@ class WebSocketHandler implements MessageComponentInterface } } + /** + * Check if the connection can be made for the + * current server instance. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + protected function connectionCanBeMade(ConnectionInterface $connection): bool + { + return $this->channelManager->acceptsNewConnections(); + } + /** * Verify the app key validity. * diff --git a/tests/Commands/StartServerTest.php b/tests/Commands/StartServerTest.php index 223331c..08f71a3 100644 --- a/tests/Commands/StartServerTest.php +++ b/tests/Commands/StartServerTest.php @@ -8,7 +8,43 @@ class StartServerTest extends TestCase { public function test_does_not_fail_if_building_up() { - $this->artisan('websockets:serve', ['--test' => true, '--debug' => true]); + $this->loop->futureTick(function () { + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6001]); + + $this->assertTrue(true); + } + + public function test_pcntl_sigint_signal() + { + $this->loop->futureTick(function () { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel']); + + posix_kill(posix_getpid(), SIGINT); + + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6002]); + + $this->assertTrue(true); + } + + public function test_pcntl_sigterm_signal() + { + $this->loop->futureTick(function () { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel']); + + posix_kill(posix_getpid(), SIGTERM); + + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6003]); $this->assertTrue(true); } diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index e4e3701..61caf68 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -108,4 +108,22 @@ class ConnectionTest extends TestCase ->assertSentEvent('pusher:error', ['data' => ['message' => 'Over capacity', 'code' => 4100]]) ->assertClosed(); } + + public function test_close_all_new_connections_after_stating_the_server_does_not_accept_new_connections() + { + $allowedConnection = $this->newActiveConnection(['test-channel']); + + $allowedConnection->assertSentEvent('pusher:connection_established') + ->assertSentEvent('pusher_internal:subscription_succeeded'); + + $this->channelManager->declineNewConnections(); + + $this->assertFalse( + $this->channelManager->acceptsNewConnections() + ); + + $this->newActiveConnection(['test-channel']) + ->assertNothingSent() + ->assertClosed(); + } } diff --git a/tests/Mocks/Connection.php b/tests/Mocks/Connection.php index 8de4a7b..42d02c0 100644 --- a/tests/Mocks/Connection.php +++ b/tests/Mocks/Connection.php @@ -97,6 +97,18 @@ class Connection implements ConnectionInterface return $this; } + /** + * Assert that no events occured within the connection. + * + * @return $this + */ + public function assertNothingSent() + { + PHPUnit::assertEquals([], $this->sentData); + + return $this; + } + /** * Assert the connection is closed. * diff --git a/tests/PresenceChannelTest.php b/tests/PresenceChannelTest.php index b7d0b8a..9d4bbcb 100644 --- a/tests/PresenceChannelTest.php +++ b/tests/PresenceChannelTest.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; +use Ratchet\ConnectionInterface; class PresenceChannelTest extends TestCase { @@ -185,4 +186,22 @@ class PresenceChannelTest extends TestCase ], $statistic->toArray()); }); } + + public function test_local_connections_for_private_channels() + { + $this->newPresenceConnection('presence-channel', ['user_id' => 1]); + $this->newPresenceConnection('presence-channel-2', ['user_id' => 2]); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } diff --git a/tests/PrivateChannelTest.php b/tests/PrivateChannelTest.php index bfc4807..f28ce6d 100644 --- a/tests/PrivateChannelTest.php +++ b/tests/PrivateChannelTest.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; +use Ratchet\ConnectionInterface; class PrivateChannelTest extends TestCase { @@ -138,4 +139,22 @@ class PrivateChannelTest extends TestCase ], $statistic->toArray()); }); } + + public function test_local_connections_for_private_channels() + { + $this->newPrivateConnection('private-channel'); + $this->newPrivateConnection('private-channel-2'); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } diff --git a/tests/PublicChannelTest.php b/tests/PublicChannelTest.php index 373f2f3..95d2f50 100644 --- a/tests/PublicChannelTest.php +++ b/tests/PublicChannelTest.php @@ -2,6 +2,8 @@ namespace BeyondCode\LaravelWebSockets\Test; +use Ratchet\ConnectionInterface; + class PublicChannelTest extends TestCase { public function test_connect_to_public_channel() @@ -114,4 +116,22 @@ class PublicChannelTest extends TestCase ], $statistic->toArray()); }); } + + public function test_local_connections_for_public_channels() + { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel-2']); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } From 87f5e0c31644a03a32e4f682994a8d9fc7d6d0f5 Mon Sep 17 00:00:00 2001 From: rennokki Date: Sat, 12 Sep 2020 14:45:29 +0000 Subject: [PATCH 2/7] Apply fixes from StyleCI (#522) --- src/Console/Commands/StartServer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 03d6e01..c06ed21 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -288,7 +288,7 @@ class StartServer extends Command // Get all local connections and close them. They will // be automatically be unsubscribed from all channels. $channelManager->getLocalConnections() - ->then(function ($connections) use ($channelManager) { + ->then(function ($connections) { foreach ($connections as $connection) { $connection->close(); } From c26e86ec2c390ff1cdd7cc4d31a7b2912ae6e0ae Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Sun, 13 Sep 2020 09:37:31 +0300 Subject: [PATCH 3/7] Trigger soft-shutdown on timer restart --- src/Console/Commands/StartServer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index c06ed21..bb865b9 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -143,7 +143,7 @@ class StartServer extends Command $this->loop->addPeriodicTimer(10, function () { if ($this->getLastRestart() !== $this->lastRestart) { - $this->loop->stop(); + $this->triggerSoftShutdown(); } }); } From e3e2e4a437a81e7131fddfb665e944b1d104c94e Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Mon, 14 Sep 2020 13:25:21 +0300 Subject: [PATCH 4/7] wip --- tests/ConnectionTest.php | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index 61caf68..2e4f2ed 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -111,9 +111,8 @@ class ConnectionTest extends TestCase public function test_close_all_new_connections_after_stating_the_server_does_not_accept_new_connections() { - $allowedConnection = $this->newActiveConnection(['test-channel']); - - $allowedConnection->assertSentEvent('pusher:connection_established') + $this->newActiveConnection(['test-channel']) + ->assertSentEvent('pusher:connection_established') ->assertSentEvent('pusher_internal:subscription_succeeded'); $this->channelManager->declineNewConnections(); From 55f13324932034532f9ff451d410aaa9c5c56eec Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Tue, 15 Sep 2020 12:30:17 +0300 Subject: [PATCH 5/7] Added tracking for pongs --- src/ChannelManagers/LocalChannelManager.php | 21 +++ src/ChannelManagers/RedisChannelManager.php | 163 +++++++++++++++++- src/Console/Commands/StartServer.php | 17 ++ src/Contracts/ChannelManager.php | 15 ++ src/Helpers.php | 26 +++ .../Messages/PusherChannelProtocolMessage.php | 2 + src/Server/MockableConnection.php | 44 +++++ src/Statistics/Collectors/RedisCollector.php | 29 +--- tests/ReplicationTest.php | 102 +++++++++++ 9 files changed, 387 insertions(+), 32 deletions(-) create mode 100644 src/Helpers.php create mode 100644 src/Server/MockableConnection.php diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index a889960..7ff689b 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -342,6 +342,27 @@ class LocalChannelManager implements ChannelManager return new FulfilledPromise($results); } + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool + { + return true; + } + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool + { + return true; + } + /** * Mark the current instance as unable to accept new connections. * diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 8bed7cb..a4f564b 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -3,10 +3,16 @@ namespace BeyondCode\LaravelWebSockets\ChannelManagers; use BeyondCode\LaravelWebSockets\Channels\Channel; +use BeyondCode\LaravelWebSockets\Helpers; +use BeyondCode\LaravelWebSockets\Server\MockableConnection; +use Carbon\Carbon; use Clue\React\Redis\Client; use Clue\React\Redis\Factory; +use Illuminate\Cache\RedisLock; +use Illuminate\Support\Facades\Redis; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; +use Ratchet\WebSocket\WsConnection; use React\EventLoop\LoopInterface; use React\Promise\PromiseInterface; use stdClass; @@ -41,6 +47,21 @@ class RedisChannelManager extends LocalChannelManager */ protected $subscribeClient; + /** + * The Redis manager instance. + * + * @var \Illuminate\Redis\RedisManager + */ + protected $redis; + + /** + * The lock name to use on Redis to avoid multiple + * actions that might lead to multiple processings. + * + * @var string + */ + protected static $redisLockName = 'laravel-websockets:channel-manager:lock'; + /** * Create a new channel manager instance. * @@ -52,6 +73,10 @@ class RedisChannelManager extends LocalChannelManager { $this->loop = $loop; + $this->redis = Redis::connection( + config('websockets.replication.modes.redis.connection', 'default') + ); + $connectionUri = $this->getConnectionUri(); $factoryClass = $factoryClass ?: Factory::class; @@ -141,6 +166,8 @@ class RedisChannelManager extends LocalChannelManager } }); + $this->addConnectionToSet($connection); + $this->addChannelToSet( $connection->app->id, $channelName ); @@ -167,8 +194,14 @@ class RedisChannelManager extends LocalChannelManager if ($count === 0) { $this->unsubscribeFromTopic($connection->app->id, $channelName); + $this->removeUserData( + $connection->app->id, $channelName, $connection->socketId + ); + $this->removeChannelFromSet($connection->app->id, $channelName); + $this->removeConnectionFromSet($connection); + return; } @@ -179,7 +212,13 @@ class RedisChannelManager extends LocalChannelManager if ($count < 1) { $this->unsubscribeFromTopic($connection->app->id, $channelName); + $this->removeUserData( + $connection->app->id, $channelName, $connection->socketId + ); + $this->removeChannelFromSet($connection->app->id, $channelName); + + $this->removeConnectionFromSet($connection); } }); }); @@ -304,12 +343,8 @@ class RedisChannelManager extends LocalChannelManager { return $this->publishClient ->hgetall($this->getRedisKey($appId, $channel, ['users'])) - ->then(function ($members) { - [$keys, $values] = collect($members)->partition(function ($value, $key) { - return $key % 2 === 0; - }); - - return collect(array_combine($keys->all(), $values->all())) + ->then(function ($list) { + return collect(Helpers::redisListToArray($list)) ->map(function ($user) { return json_decode($user); }) @@ -355,6 +390,43 @@ class RedisChannelManager extends LocalChannelManager }); } + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool + { + // This will update the score with the current timestamp. + $this->addConnectionToSet($connection); + + return parent::connectionPonged($connection); + } + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool + { + $this->lock()->get(function () { + $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($connections) { + foreach ($connections as $connection => $score) { + [$appId, $socketId] = explode(':', $connection); + + $this->unsubscribeFromAllChannels( + $this->fakeConnectionForApp($appId, $socketId) + ); + } + }); + }); + + return parent::removeObsoleteConnections(); + } + /** * Handle a message received from Redis on a specific channel. * @@ -473,6 +545,57 @@ class RedisChannelManager extends LocalChannelManager return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1); } + /** + * Add the connection to the sorted list. + * + * @param \Ratchet\ConnectionInterface $connection + * @param \DateTime|string|null $moment + * @return void + */ + public function addConnectionToSet(ConnectionInterface $connection, $moment = null) + { + $this->getPublishClient() + ->zadd( + $this->getRedisKey(null, null, ['sockets']), + Carbon::parse($moment)->format('U'), "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Remove the connection from the sorted list. + * + * @param \Ratchet\ConnectionInterface $connection + * @return void + */ + public function removeConnectionFromSet(ConnectionInterface $connection) + { + $this->getPublishClient() + ->zrem( + $this->getRedisKey(null, null, ['sockets']), + "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Get the connections from the sorted list, with last + * connection between certain timestamps. + * + * @param int $start + * @param int $stop + * @return PromiseInterface + */ + public function getConnectionsFromSet(int $start = 0, int $stop = 0) + { + return $this->getPublishClient() + ->zrange( + $this->getRedisKey(null, null, ['sockets']), + $start, $stop, 'withscores' + ) + ->then(function ($list) { + return Helpers::redisListToArray($list); + }); + } + /** * Add a channel to the set list. * @@ -566,11 +689,11 @@ class RedisChannelManager extends LocalChannelManager * Get the Redis Keyspace name to handle subscriptions * and other key-value sets. * - * @param mixed $appId + * @param string|int|null $appId * @param string|null $channel * @return string */ - public function getRedisKey($appId, string $channel = null, array $suffixes = []): string + public function getRedisKey($appId = null, string $channel = null, array $suffixes = []): string { $prefix = config('database.redis.options.prefix', null); @@ -588,4 +711,28 @@ class RedisChannelManager extends LocalChannelManager return $hash; } + + /** + * Get a new RedisLock instance to avoid race conditions. + * + * @return \Illuminate\Cache\CacheLock + */ + protected function lock() + { + return new RedisLock($this->redis, static::$redisLockName, 0); + } + + /** + * Create a fake connection for app that will mimick a connection + * by app ID and Socket ID to be able to be passed to the methods + * that accepts a connection class. + * + * @param string|int $appId + * @param string $socketId + * @return ConnectionInterface + */ + public function fakeConnectionForApp($appId, string $socketId) + { + return new MockableConnection($appId, $socketId); + } } diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index bb865b9..e6c0676 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -81,6 +81,8 @@ class StartServer extends Command $this->configurePcntlSignal(); + $this->configurePongTracker(); + $this->startServer(); } @@ -183,6 +185,21 @@ class StartServer extends Command }); } + /** + * Configure the tracker that will delete + * from the store the connections that + * + * @return void + */ + protected function configurePongTracker() + { + $this->loop->addPeriodicTimer(10, function () { + $this->laravel + ->make(ChannelManager::class) + ->removeObsoleteConnections(); + }); + } + /** * Configure the HTTP logger class. * diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index ccc15c0..35d5baf 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -185,4 +185,19 @@ interface ChannelManager * @return \React\Promise\PromiseInterface */ public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface; + + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool; + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool; } diff --git a/src/Helpers.php b/src/Helpers.php new file mode 100644 index 0000000..7354545 --- /dev/null +++ b/src/Helpers.php @@ -0,0 +1,26 @@ + value array. + [$keys, $values] = collect($list)->partition(function ($value, $key) { + return $key % 2 === 0; + }); + + return array_combine($keys->all(), $values->all()); + } +} diff --git a/src/Server/Messages/PusherChannelProtocolMessage.php b/src/Server/Messages/PusherChannelProtocolMessage.php index 14dea23..d70934b 100644 --- a/src/Server/Messages/PusherChannelProtocolMessage.php +++ b/src/Server/Messages/PusherChannelProtocolMessage.php @@ -34,6 +34,8 @@ class PusherChannelProtocolMessage extends PusherClientMessage $connection->send(json_encode([ 'event' => 'pusher:pong', ])); + + $this->channelManager->connectionPonged($connection); } /** diff --git a/src/Server/MockableConnection.php b/src/Server/MockableConnection.php new file mode 100644 index 0000000..9fb5813 --- /dev/null +++ b/src/Server/MockableConnection.php @@ -0,0 +1,44 @@ +app = new stdClass; + + $this->app->id = $appId; + $this->socketId = $socketId; + } + + /** + * Send data to the connection + * @param string $data + * @return \Ratchet\ConnectionInterface + */ + function send($data) + { + // + } + + /** + * Close the connection + * + * @return void + */ + function close() + { + // + } +} diff --git a/src/Statistics/Collectors/RedisCollector.php b/src/Statistics/Collectors/RedisCollector.php index 7b845b5..f7b5074 100644 --- a/src/Statistics/Collectors/RedisCollector.php +++ b/src/Statistics/Collectors/RedisCollector.php @@ -2,6 +2,7 @@ namespace BeyondCode\LaravelWebSockets\Statistics\Collectors; +use BeyondCode\LaravelWebSockets\Helpers; use BeyondCode\LaravelWebSockets\Statistics\Statistic; use Illuminate\Cache\RedisLock; use Illuminate\Support\Facades\Redis; @@ -30,7 +31,7 @@ class RedisCollector extends MemoryCollector * * @var string */ - protected static $redisLockName = 'laravel-websockets:lock'; + protected static $redisLockName = 'laravel-websockets:collector:lock'; /** * Initialize the logger. @@ -178,7 +179,7 @@ class RedisCollector extends MemoryCollector } $statistic = $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); $this->createRecord($statistic, $appId); @@ -229,7 +230,7 @@ class RedisCollector extends MemoryCollector ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->then(function ($list) use ($appId, &$appsWithStatistics) { $appsWithStatistics[$appId] = $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); }); } @@ -251,7 +252,7 @@ class RedisCollector extends MemoryCollector ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->then(function ($list) use ($appId) { return $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); }); } @@ -361,26 +362,6 @@ class RedisCollector extends MemoryCollector return new RedisLock($this->redis, static::$redisLockName, 0); } - /** - * Transform the Redis' list of key after value - * to key-value pairs. - * - * @param array $list - * @return array - */ - protected function redisListToArray(array $list) - { - // Redis lists come into a format where the keys are on even indexes - // and the values are on odd indexes. This way, we know which - // ones are keys and which ones are values and their get combined - // later to form the key => value array. - [$keys, $values] = collect($list)->partition(function ($value, $key) { - return $key % 2 === 0; - }); - - return array_combine($keys->all(), $values->all()); - } - /** * Transform a key-value pair to a Statistic instance. * diff --git a/tests/ReplicationTest.php b/tests/ReplicationTest.php index 00ee615..f08c6b0 100644 --- a/tests/ReplicationTest.php +++ b/tests/ReplicationTest.php @@ -32,4 +32,106 @@ class ReplicationTest extends TestCase 'data' => ['channel' => 'public-channel', 'test' => 'yes'], ]); } + + public function test_not_ponged_connections_do_get_removed_for_public_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newActiveConnection(['public-channel']); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_for_private_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newPrivateConnection('private-channel'); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_for_presence_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newPresenceConnection('presence-channel'); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(1, $members); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(0, $members); + }); + } } From 980f9271f0c092a7752c5f7e6c7f2cacc44b25ad Mon Sep 17 00:00:00 2001 From: rennokki Date: Tue, 15 Sep 2020 09:30:43 +0000 Subject: [PATCH 6/7] Apply fixes from StyleCI (#526) --- src/ChannelManagers/RedisChannelManager.php | 1 - src/Console/Commands/StartServer.php | 2 +- src/Server/MockableConnection.php | 8 ++++---- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index a4f564b..ee8ce76 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -12,7 +12,6 @@ use Illuminate\Cache\RedisLock; use Illuminate\Support\Facades\Redis; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; -use Ratchet\WebSocket\WsConnection; use React\EventLoop\LoopInterface; use React\Promise\PromiseInterface; use stdClass; diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index e6c0676..890a4f1 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -187,7 +187,7 @@ class StartServer extends Command /** * Configure the tracker that will delete - * from the store the connections that + * from the store the connections that. * * @return void */ diff --git a/src/Server/MockableConnection.php b/src/Server/MockableConnection.php index 9fb5813..46a2f72 100644 --- a/src/Server/MockableConnection.php +++ b/src/Server/MockableConnection.php @@ -23,21 +23,21 @@ class MockableConnection implements ConnectionInterface } /** - * Send data to the connection + * Send data to the connection. * @param string $data * @return \Ratchet\ConnectionInterface */ - function send($data) + public function send($data) { // } /** - * Close the connection + * Close the connection. * * @return void */ - function close() + public function close() { // } From e1f038432a414f6d268c1b13b7930efd36dcbfa7 Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Tue, 15 Sep 2020 12:35:51 +0300 Subject: [PATCH 7/7] space --- src/Server/MockableConnection.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Server/MockableConnection.php b/src/Server/MockableConnection.php index 46a2f72..4d6d8f7 100644 --- a/src/Server/MockableConnection.php +++ b/src/Server/MockableConnection.php @@ -24,6 +24,7 @@ class MockableConnection implements ConnectionInterface /** * Send data to the connection. + * * @param string $data * @return \Ratchet\ConnectionInterface */