Added soft closes for connections on SIGTERM/SIGINT

This commit is contained in:
Alex Renoki 2020-09-12 17:45:07 +03:00
parent 86fbf76a0e
commit ec47925c71
12 changed files with 278 additions and 12 deletions

View File

@ -56,6 +56,9 @@
"orchestra/database": "^4.0|^5.0|^6.0",
"phpunit/phpunit": "^8.0|^9.0"
},
"suggest": {
"ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown."
},
"autoload": {
"psr-4": {
"BeyondCode\\LaravelWebSockets\\": "src/"

View File

@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager
*/
protected $users = [];
/**
* Wether the current instance accepts new connections.
*
* @var bool
*/
protected $acceptsNewConnections = true;
/**
* Create a new channel manager instance.
*
@ -71,6 +78,28 @@ class LocalChannelManager implements ChannelManager
return $this->channels[$appId][$channel];
}
/**
* Get the local connections, regardless of the channel
* they are connected to.
*
* @return \React\Promise\PromiseInterface
*/
public function getLocalConnections(): PromiseInterface
{
$connections = collect($this->channels)
->map(function ($channelsWithConnections, $appId) {
return collect($channelsWithConnections)->values();
})
->values()->collapse()
->map(function ($channel) {
return collect($channel->getConnections());
})
->values()->collapse()
->toArray();
return new FulfilledPromise($connections);
}
/**
* Get all channels for a specific app
* for the current instance.
@ -313,6 +342,29 @@ class LocalChannelManager implements ChannelManager
return new FulfilledPromise($results);
}
/**
* Mark the current instance as unable to accept new connections.
*
* @return $this
*/
public function declineNewConnections()
{
$this->acceptsNewConnections = false;
return $this;
}
/**
* Check if the current server instance
* accepts new connections.
*
* @return bool
*/
public function acceptsNewConnections(): bool
{
return $this->acceptsNewConnections;
}
/**
* Get the channel class by the channel name.
*

View File

@ -67,6 +67,17 @@ class RedisChannelManager extends LocalChannelManager
$this->serverId = Str::uuid()->toString();
}
/**
* Get the local connections, regardless of the channel
* they are connected to.
*
* @return \React\Promise\PromiseInterface
*/
public function getLocalConnections(): PromiseInterface
{
return parent::getLocalConnections();
}
/**
* Get all channels for a specific app
* for the current instance.
@ -108,9 +119,9 @@ class RedisChannelManager extends LocalChannelManager
$connection, $channel, new stdClass
);
}
});
})->then(function () use ($connection) {
parent::unsubscribeFromAllChannels($connection);
});
}
/**

View File

@ -26,7 +26,7 @@ class StartServer extends Command
{--disable-statistics : Disable the statistics tracking.}
{--statistics-interval= : The amount of seconds to tick between statistics saving.}
{--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.}
{--test : Prepare the server, but do not start it.}
{--loop : Programatically inject the loop.}
';
/**
@ -79,6 +79,8 @@ class StartServer extends Command
$this->configureRoutes();
$this->configurePcntlSignal();
$this->startServer();
}
@ -156,6 +158,31 @@ class StartServer extends Command
WebSocketRouter::routes();
}
/**
* Configure the PCNTL signals for soft shutdown.
*
* @return void
*/
protected function configurePcntlSignal()
{
// When the process receives a SIGTERM or a SIGINT
// signal, it should mark the server as unavailable
// to receive new connections, close the current connections,
// then stopping the loop.
$this->loop->addSignal(SIGTERM, function () {
$this->line('Closing existing connections...');
$this->triggerSoftShutdown();
});
$this->loop->addSignal(SIGINT, function () {
$this->line('Closing existing connections...');
$this->triggerSoftShutdown();
});
}
/**
* Configure the HTTP logger class.
*
@ -209,14 +236,6 @@ class StartServer extends Command
$this->buildServer();
// For testing, just boot up the server, run it
// but exit after the next tick.
if ($this->option('test')) {
$this->loop->futureTick(function () {
$this->loop->stop();
});
}
$this->server->run();
}
@ -231,6 +250,10 @@ class StartServer extends Command
$this->option('host'), $this->option('port')
);
if ($loop = $this->option('loop')) {
$this->loop = $loop;
}
$this->server = $this->server
->setLoop($this->loop)
->withRoutes(WebSocketRouter::getRoutes())
@ -249,4 +272,29 @@ class StartServer extends Command
'beyondcode:websockets:restart', 0
);
}
/**
* Trigger a soft shutdown for the process.
*
* @return void
*/
protected function triggerSoftShutdown()
{
$channelManager = $this->laravel->make(ChannelManager::class);
// Close the new connections allowance on this server.
$channelManager->declineNewConnections();
// Get all local connections and close them. They will
// be automatically be unsubscribed from all channels.
$channelManager->getLocalConnections()
->then(function ($connections) use ($channelManager) {
foreach ($connections as $connection) {
$connection->close();
}
})
->then(function () {
$this->loop->stop();
});
}
}

View File

@ -36,6 +36,14 @@ interface ChannelManager
*/
public function findOrCreate($appId, string $channel);
/**
* Get the local connections, regardless of the channel
* they are connected to.
*
* @return \React\Promise\PromiseInterface
*/
public function getLocalConnections(): PromiseInterface;
/**
* Get all channels for a specific app
* for the current instance.

View File

@ -39,6 +39,10 @@ class WebSocketHandler implements MessageComponentInterface
*/
public function onOpen(ConnectionInterface $connection)
{
if (! $this->connectionCanBeMade($connection)) {
return $connection->close();
}
$this->verifyAppKey($connection)
->verifyOrigin($connection)
->limitConcurrentConnections($connection)
@ -69,6 +73,10 @@ class WebSocketHandler implements MessageComponentInterface
*/
public function onMessage(ConnectionInterface $connection, MessageInterface $message)
{
if (! isset($connection->app)) {
return;
}
Messages\PusherMessageFactory::createForMessage(
$message, $connection, $this->channelManager
)->respond();
@ -113,6 +121,18 @@ class WebSocketHandler implements MessageComponentInterface
}
}
/**
* Check if the connection can be made for the
* current server instance.
*
* @param \Ratchet\ConnectionInterface $connection
* @return bool
*/
protected function connectionCanBeMade(ConnectionInterface $connection): bool
{
return $this->channelManager->acceptsNewConnections();
}
/**
* Verify the app key validity.
*

View File

@ -8,7 +8,43 @@ class StartServerTest extends TestCase
{
public function test_does_not_fail_if_building_up()
{
$this->artisan('websockets:serve', ['--test' => true, '--debug' => true]);
$this->loop->futureTick(function () {
$this->loop->stop();
});
$this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6001]);
$this->assertTrue(true);
}
public function test_pcntl_sigint_signal()
{
$this->loop->futureTick(function () {
$this->newActiveConnection(['public-channel']);
$this->newActiveConnection(['public-channel']);
posix_kill(posix_getpid(), SIGINT);
$this->loop->stop();
});
$this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6002]);
$this->assertTrue(true);
}
public function test_pcntl_sigterm_signal()
{
$this->loop->futureTick(function () {
$this->newActiveConnection(['public-channel']);
$this->newActiveConnection(['public-channel']);
posix_kill(posix_getpid(), SIGTERM);
$this->loop->stop();
});
$this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6003]);
$this->assertTrue(true);
}

View File

@ -108,4 +108,22 @@ class ConnectionTest extends TestCase
->assertSentEvent('pusher:error', ['data' => ['message' => 'Over capacity', 'code' => 4100]])
->assertClosed();
}
public function test_close_all_new_connections_after_stating_the_server_does_not_accept_new_connections()
{
$allowedConnection = $this->newActiveConnection(['test-channel']);
$allowedConnection->assertSentEvent('pusher:connection_established')
->assertSentEvent('pusher_internal:subscription_succeeded');
$this->channelManager->declineNewConnections();
$this->assertFalse(
$this->channelManager->acceptsNewConnections()
);
$this->newActiveConnection(['test-channel'])
->assertNothingSent()
->assertClosed();
}
}

View File

@ -97,6 +97,18 @@ class Connection implements ConnectionInterface
return $this;
}
/**
* Assert that no events occured within the connection.
*
* @return $this
*/
public function assertNothingSent()
{
PHPUnit::assertEquals([], $this->sentData);
return $this;
}
/**
* Assert the connection is closed.
*

View File

@ -3,6 +3,7 @@
namespace BeyondCode\LaravelWebSockets\Test;
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
use Ratchet\ConnectionInterface;
class PresenceChannelTest extends TestCase
{
@ -185,4 +186,22 @@ class PresenceChannelTest extends TestCase
], $statistic->toArray());
});
}
public function test_local_connections_for_private_channels()
{
$this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$this->newPresenceConnection('presence-channel-2', ['user_id' => 2]);
$this->channelManager
->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections);
foreach ($connections as $connection) {
$this->assertInstanceOf(
ConnectionInterface::class, $connection
);
}
});
}
}

View File

@ -3,6 +3,7 @@
namespace BeyondCode\LaravelWebSockets\Test;
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
use Ratchet\ConnectionInterface;
class PrivateChannelTest extends TestCase
{
@ -138,4 +139,22 @@ class PrivateChannelTest extends TestCase
], $statistic->toArray());
});
}
public function test_local_connections_for_private_channels()
{
$this->newPrivateConnection('private-channel');
$this->newPrivateConnection('private-channel-2');
$this->channelManager
->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections);
foreach ($connections as $connection) {
$this->assertInstanceOf(
ConnectionInterface::class, $connection
);
}
});
}
}

View File

@ -2,6 +2,8 @@
namespace BeyondCode\LaravelWebSockets\Test;
use Ratchet\ConnectionInterface;
class PublicChannelTest extends TestCase
{
public function test_connect_to_public_channel()
@ -114,4 +116,22 @@ class PublicChannelTest extends TestCase
], $statistic->toArray());
});
}
public function test_local_connections_for_public_channels()
{
$this->newActiveConnection(['public-channel']);
$this->newActiveConnection(['public-channel-2']);
$this->channelManager
->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections);
foreach ($connections as $connection) {
$this->assertInstanceOf(
ConnectionInterface::class, $connection
);
}
});
}
}