From ec47925c71a5d1e4f3c2461954a15c52f161669f Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Sat, 12 Sep 2020 17:45:07 +0300 Subject: [PATCH] Added soft closes for connections on SIGTERM/SIGINT --- composer.json | 3 + src/ChannelManagers/LocalChannelManager.php | 52 ++++++++++++++++ src/ChannelManagers/RedisChannelManager.php | 15 ++++- src/Console/Commands/StartServer.php | 66 ++++++++++++++++++--- src/Contracts/ChannelManager.php | 8 +++ src/Server/WebSocketHandler.php | 20 +++++++ tests/Commands/StartServerTest.php | 38 +++++++++++- tests/ConnectionTest.php | 18 ++++++ tests/Mocks/Connection.php | 12 ++++ tests/PresenceChannelTest.php | 19 ++++++ tests/PrivateChannelTest.php | 19 ++++++ tests/PublicChannelTest.php | 20 +++++++ 12 files changed, 278 insertions(+), 12 deletions(-) diff --git a/composer.json b/composer.json index dc8968a..9df2218 100644 --- a/composer.json +++ b/composer.json @@ -56,6 +56,9 @@ "orchestra/database": "^4.0|^5.0|^6.0", "phpunit/phpunit": "^8.0|^9.0" }, + "suggest": { + "ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown." + }, "autoload": { "psr-4": { "BeyondCode\\LaravelWebSockets\\": "src/" diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 2b8150c..a889960 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager */ protected $users = []; + /** + * Wether the current instance accepts new connections. + * + * @var bool + */ + protected $acceptsNewConnections = true; + /** * Create a new channel manager instance. * @@ -71,6 +78,28 @@ class LocalChannelManager implements ChannelManager return $this->channels[$appId][$channel]; } + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface + { + $connections = collect($this->channels) + ->map(function ($channelsWithConnections, $appId) { + return collect($channelsWithConnections)->values(); + }) + ->values()->collapse() + ->map(function ($channel) { + return collect($channel->getConnections()); + }) + ->values()->collapse() + ->toArray(); + + return new FulfilledPromise($connections); + } + /** * Get all channels for a specific app * for the current instance. @@ -313,6 +342,29 @@ class LocalChannelManager implements ChannelManager return new FulfilledPromise($results); } + /** + * Mark the current instance as unable to accept new connections. + * + * @return $this + */ + public function declineNewConnections() + { + $this->acceptsNewConnections = false; + + return $this; + } + + /** + * Check if the current server instance + * accepts new connections. + * + * @return bool + */ + public function acceptsNewConnections(): bool + { + return $this->acceptsNewConnections; + } + /** * Get the channel class by the channel name. * diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 0d884b3..8bed7cb 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -67,6 +67,17 @@ class RedisChannelManager extends LocalChannelManager $this->serverId = Str::uuid()->toString(); } + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface + { + return parent::getLocalConnections(); + } + /** * Get all channels for a specific app * for the current instance. @@ -108,9 +119,9 @@ class RedisChannelManager extends LocalChannelManager $connection, $channel, new stdClass ); } + })->then(function () use ($connection) { + parent::unsubscribeFromAllChannels($connection); }); - - parent::unsubscribeFromAllChannels($connection); } /** diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 664d6a9..03d6e01 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -26,7 +26,7 @@ class StartServer extends Command {--disable-statistics : Disable the statistics tracking.} {--statistics-interval= : The amount of seconds to tick between statistics saving.} {--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.} - {--test : Prepare the server, but do not start it.} + {--loop : Programatically inject the loop.} '; /** @@ -79,6 +79,8 @@ class StartServer extends Command $this->configureRoutes(); + $this->configurePcntlSignal(); + $this->startServer(); } @@ -156,6 +158,31 @@ class StartServer extends Command WebSocketRouter::routes(); } + /** + * Configure the PCNTL signals for soft shutdown. + * + * @return void + */ + protected function configurePcntlSignal() + { + // When the process receives a SIGTERM or a SIGINT + // signal, it should mark the server as unavailable + // to receive new connections, close the current connections, + // then stopping the loop. + + $this->loop->addSignal(SIGTERM, function () { + $this->line('Closing existing connections...'); + + $this->triggerSoftShutdown(); + }); + + $this->loop->addSignal(SIGINT, function () { + $this->line('Closing existing connections...'); + + $this->triggerSoftShutdown(); + }); + } + /** * Configure the HTTP logger class. * @@ -209,14 +236,6 @@ class StartServer extends Command $this->buildServer(); - // For testing, just boot up the server, run it - // but exit after the next tick. - if ($this->option('test')) { - $this->loop->futureTick(function () { - $this->loop->stop(); - }); - } - $this->server->run(); } @@ -231,6 +250,10 @@ class StartServer extends Command $this->option('host'), $this->option('port') ); + if ($loop = $this->option('loop')) { + $this->loop = $loop; + } + $this->server = $this->server ->setLoop($this->loop) ->withRoutes(WebSocketRouter::getRoutes()) @@ -249,4 +272,29 @@ class StartServer extends Command 'beyondcode:websockets:restart', 0 ); } + + /** + * Trigger a soft shutdown for the process. + * + * @return void + */ + protected function triggerSoftShutdown() + { + $channelManager = $this->laravel->make(ChannelManager::class); + + // Close the new connections allowance on this server. + $channelManager->declineNewConnections(); + + // Get all local connections and close them. They will + // be automatically be unsubscribed from all channels. + $channelManager->getLocalConnections() + ->then(function ($connections) use ($channelManager) { + foreach ($connections as $connection) { + $connection->close(); + } + }) + ->then(function () { + $this->loop->stop(); + }); + } } diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index e056e11..ccc15c0 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -36,6 +36,14 @@ interface ChannelManager */ public function findOrCreate($appId, string $channel); + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface; + /** * Get all channels for a specific app * for the current instance. diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 1016a1a..0dbe8be 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -39,6 +39,10 @@ class WebSocketHandler implements MessageComponentInterface */ public function onOpen(ConnectionInterface $connection) { + if (! $this->connectionCanBeMade($connection)) { + return $connection->close(); + } + $this->verifyAppKey($connection) ->verifyOrigin($connection) ->limitConcurrentConnections($connection) @@ -69,6 +73,10 @@ class WebSocketHandler implements MessageComponentInterface */ public function onMessage(ConnectionInterface $connection, MessageInterface $message) { + if (! isset($connection->app)) { + return; + } + Messages\PusherMessageFactory::createForMessage( $message, $connection, $this->channelManager )->respond(); @@ -113,6 +121,18 @@ class WebSocketHandler implements MessageComponentInterface } } + /** + * Check if the connection can be made for the + * current server instance. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + protected function connectionCanBeMade(ConnectionInterface $connection): bool + { + return $this->channelManager->acceptsNewConnections(); + } + /** * Verify the app key validity. * diff --git a/tests/Commands/StartServerTest.php b/tests/Commands/StartServerTest.php index 223331c..08f71a3 100644 --- a/tests/Commands/StartServerTest.php +++ b/tests/Commands/StartServerTest.php @@ -8,7 +8,43 @@ class StartServerTest extends TestCase { public function test_does_not_fail_if_building_up() { - $this->artisan('websockets:serve', ['--test' => true, '--debug' => true]); + $this->loop->futureTick(function () { + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6001]); + + $this->assertTrue(true); + } + + public function test_pcntl_sigint_signal() + { + $this->loop->futureTick(function () { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel']); + + posix_kill(posix_getpid(), SIGINT); + + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6002]); + + $this->assertTrue(true); + } + + public function test_pcntl_sigterm_signal() + { + $this->loop->futureTick(function () { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel']); + + posix_kill(posix_getpid(), SIGTERM); + + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6003]); $this->assertTrue(true); } diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index e4e3701..61caf68 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -108,4 +108,22 @@ class ConnectionTest extends TestCase ->assertSentEvent('pusher:error', ['data' => ['message' => 'Over capacity', 'code' => 4100]]) ->assertClosed(); } + + public function test_close_all_new_connections_after_stating_the_server_does_not_accept_new_connections() + { + $allowedConnection = $this->newActiveConnection(['test-channel']); + + $allowedConnection->assertSentEvent('pusher:connection_established') + ->assertSentEvent('pusher_internal:subscription_succeeded'); + + $this->channelManager->declineNewConnections(); + + $this->assertFalse( + $this->channelManager->acceptsNewConnections() + ); + + $this->newActiveConnection(['test-channel']) + ->assertNothingSent() + ->assertClosed(); + } } diff --git a/tests/Mocks/Connection.php b/tests/Mocks/Connection.php index 8de4a7b..42d02c0 100644 --- a/tests/Mocks/Connection.php +++ b/tests/Mocks/Connection.php @@ -97,6 +97,18 @@ class Connection implements ConnectionInterface return $this; } + /** + * Assert that no events occured within the connection. + * + * @return $this + */ + public function assertNothingSent() + { + PHPUnit::assertEquals([], $this->sentData); + + return $this; + } + /** * Assert the connection is closed. * diff --git a/tests/PresenceChannelTest.php b/tests/PresenceChannelTest.php index b7d0b8a..9d4bbcb 100644 --- a/tests/PresenceChannelTest.php +++ b/tests/PresenceChannelTest.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; +use Ratchet\ConnectionInterface; class PresenceChannelTest extends TestCase { @@ -185,4 +186,22 @@ class PresenceChannelTest extends TestCase ], $statistic->toArray()); }); } + + public function test_local_connections_for_private_channels() + { + $this->newPresenceConnection('presence-channel', ['user_id' => 1]); + $this->newPresenceConnection('presence-channel-2', ['user_id' => 2]); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } diff --git a/tests/PrivateChannelTest.php b/tests/PrivateChannelTest.php index bfc4807..f28ce6d 100644 --- a/tests/PrivateChannelTest.php +++ b/tests/PrivateChannelTest.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; +use Ratchet\ConnectionInterface; class PrivateChannelTest extends TestCase { @@ -138,4 +139,22 @@ class PrivateChannelTest extends TestCase ], $statistic->toArray()); }); } + + public function test_local_connections_for_private_channels() + { + $this->newPrivateConnection('private-channel'); + $this->newPrivateConnection('private-channel-2'); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } diff --git a/tests/PublicChannelTest.php b/tests/PublicChannelTest.php index 373f2f3..95d2f50 100644 --- a/tests/PublicChannelTest.php +++ b/tests/PublicChannelTest.php @@ -2,6 +2,8 @@ namespace BeyondCode\LaravelWebSockets\Test; +use Ratchet\ConnectionInterface; + class PublicChannelTest extends TestCase { public function test_connect_to_public_channel() @@ -114,4 +116,22 @@ class PublicChannelTest extends TestCase ], $statistic->toArray()); }); } + + public function test_local_connections_for_public_channels() + { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel-2']); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } }