diff --git a/composer.json b/composer.json index 9df2218..06a9d11 100644 --- a/composer.json +++ b/composer.json @@ -38,11 +38,11 @@ "evenement/evenement": "^2.0|^3.0", "facade/ignition-contracts": "^1.0", "guzzlehttp/psr7": "^1.5", - "illuminate/broadcasting": "^6.0|^7.0|^8.0", - "illuminate/console": "^6.0|^7.0|^8.0", - "illuminate/http": "^6.0|^7.0|^8.0", - "illuminate/routing": "^6.0|^7.0|^8.0", - "illuminate/support": "^6.0|^7.0|^8.0", + "illuminate/broadcasting": "^6.3|^7.0|^8.0", + "illuminate/console": "^6.3|^7.0|^8.0", + "illuminate/http": "^6.3|^7.0|^8.0", + "illuminate/routing": "^6.3|^7.0|^8.0", + "illuminate/support": "^6.3|^7.0|^8.0", "pusher/pusher-php-server": "^3.0|^4.0", "react/promise": "^2.0", "symfony/http-kernel": "^4.0|^5.0", diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 4d4c835..ad01f7a 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -7,6 +7,9 @@ use BeyondCode\LaravelWebSockets\Channels\PresenceChannel; use BeyondCode\LaravelWebSockets\Channels\PrivateChannel; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Helpers; +use Carbon\Carbon; +use Illuminate\Cache\ArrayLock; +use Illuminate\Cache\ArrayStore; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use React\EventLoop\LoopInterface; @@ -43,6 +46,21 @@ class LocalChannelManager implements ChannelManager */ protected $acceptsNewConnections = true; + /** + * The ArrayStore instance of locks. + * + * @var \Illuminate\Cache\ArrayStore + */ + protected $store; + + /** + * The lock name to use on Array to avoid multiple + * actions that might lead to multiple processings. + * + * @var string + */ + protected static $lockName = 'laravel-websockets:channel-manager:lock'; + /** * Create a new channel manager instance. * @@ -52,7 +70,7 @@ class LocalChannelManager implements ChannelManager */ public function __construct(LoopInterface $loop, $factoryClass = null) { - // + $this->store = new ArrayStore; } /** @@ -398,7 +416,9 @@ class LocalChannelManager implements ChannelManager */ public function connectionPonged(ConnectionInterface $connection): PromiseInterface { - return Helpers::createFulfilledPromise(true); + $connection->lastPongedAt = Carbon::now(); + + return $this->updateConnectionInChannels($connection); } /** @@ -408,7 +428,43 @@ class LocalChannelManager implements ChannelManager */ public function removeObsoleteConnections(): PromiseInterface { - return Helpers::createFulfilledPromise(true); + if (! $this->lock()->acquire()) { + return Helpers::createFulfilledPromise(false); + } + + $this->getLocalConnections()->then(function ($connections) { + foreach ($connections as $connection) { + $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); + + if ($differenceInSeconds > 120) { + $this->unsubscribeFromAllChannels($connection); + } + } + }); + + return Helpers::createFulfilledPromise( + $this->lock()->release() + ); + } + + /** + * Update the connection in all channels. + * + * @param ConnectionInterface $connection + * @return PromiseInterface[bool] + */ + public function updateConnectionInChannels($connection): PromiseInterface + { + return $this->getLocalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + foreach ($channels as $channel) { + if ($channel->hasConnection($connection)) { + $channel->saveConnection($connection); + } + } + + return true; + }); } /** @@ -452,4 +508,14 @@ class LocalChannelManager implements ChannelManager return Channel::class; } + + /** + * Get a new ArrayLock instance to avoid race conditions. + * + * @return \Illuminate\Cache\CacheLock + */ + protected function lock() + { + return new ArrayLock($this->store, static::$lockName, 0); + } } diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 58ae6d4..c099bbf 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -59,7 +59,7 @@ class RedisChannelManager extends LocalChannelManager * * @var string */ - protected static $redisLockName = 'laravel-websockets:channel-manager:lock'; + protected static $lockName = 'laravel-websockets:channel-manager:lock'; /** * Create a new channel manager instance. @@ -768,7 +768,7 @@ class RedisChannelManager extends LocalChannelManager */ protected function lock() { - return new RedisLock($this->redis, static::$redisLockName, 0); + return new RedisLock($this->redis, static::$lockName, 0); } /** diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 2abf150..e64a4d1 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -100,7 +100,7 @@ class Channel */ public function unsubscribe(ConnectionInterface $connection): bool { - if (! isset($this->connections[$connection->socketId])) { + if (! $this->hasConnection($connection)) { return false; } @@ -109,13 +109,24 @@ class Channel return true; } + /** + * Check if the given connection exists. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function hasConnection(ConnectionInterface $connection): bool + { + return isset($this->connections[$connection->socketId]); + } + /** * Store the connection to the subscribers list. * * @param \Ratchet\ConnectionInterface $connection * @return void */ - protected function saveConnection(ConnectionInterface $connection) + public function saveConnection(ConnectionInterface $connection) { $this->connections[$connection->socketId] = $connection; } diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 4b7f7bc..9fd3fe2 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -57,6 +57,8 @@ class WebSocketHandler implements MessageComponentInterface $this->channelManager->subscribeToApp($connection->app->id); + $this->channelManager->connectionPonged($connection); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_CONNECTED, [ 'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}", 'socketId' => $connection->socketId, diff --git a/tests/LocalPongRemovalTest.php b/tests/LocalPongRemovalTest.php new file mode 100644 index 0000000..fa643e4 --- /dev/null +++ b/tests/LocalPongRemovalTest.php @@ -0,0 +1,131 @@ +runOnlyOnLocalReplication(); + + $activeConnection = $this->newActiveConnection(['public-channel']); + $obsoleteConnection = $this->newActiveConnection(['public-channel']); + + // The active connection just pinged, it should not be closed. + $activeConnection->lastPongedAt = Carbon::now(); + $obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1); + + $this->channelManager->updateConnectionInChannels($activeConnection); + $this->channelManager->updateConnectionInChannels($obsoleteConnection); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) use ($activeConnection) { + $connection = $connections[$activeConnection->socketId]; + + $this->assertEquals($activeConnection->socketId, $connection->socketId); + }); + } + + public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels() + { + $this->runOnlyOnLocalReplication(); + + $activeConnection = $this->newPrivateConnection('private-channel'); + $obsoleteConnection = $this->newPrivateConnection('private-channel'); + + // The active connection just pinged, it should not be closed. + $activeConnection->lastPongedAt = Carbon::now(); + $obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1); + + $this->channelManager->updateConnectionInChannels($activeConnection); + $this->channelManager->updateConnectionInChannels($obsoleteConnection); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) use ($activeConnection) { + $connection = $connections[$activeConnection->socketId]; + + $this->assertEquals($activeConnection->socketId, $connection->socketId); + }); + } + + public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels() + { + $this->runOnlyOnLocalReplication(); + + $activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); + $obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]); + + // The active connection just pinged, it should not be closed. + $activeConnection->lastPongedAt = Carbon::now(); + $obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1); + + $this->channelManager->updateConnectionInChannels($activeConnection); + $this->channelManager->updateConnectionInChannels($obsoleteConnection); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(2, $members); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) use ($activeConnection) { + $connection = $connections[$activeConnection->socketId]; + + $this->assertEquals($activeConnection->socketId, $connection->socketId); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(1, $members); + }); + } +} diff --git a/tests/PresenceChannelTest.php b/tests/PresenceChannelTest.php index 9234ab8..d983c78 100644 --- a/tests/PresenceChannelTest.php +++ b/tests/PresenceChannelTest.php @@ -4,7 +4,6 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\API\TriggerEvent; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; -use Carbon\Carbon; use GuzzleHttp\Psr7\Request; use Illuminate\Http\JsonResponse; use Pusher\Pusher; @@ -318,58 +317,6 @@ class PresenceChannelTest extends TestCase }); } - public function test_not_ponged_connections_do_get_removed_for_presence_channels() - { - $this->runOnlyOnRedisReplication(); - - $activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); - $obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]); - - // The active connection just pinged, it should not be closed. - $this->channelManager->addConnectionToSet($activeConnection, Carbon::now()); - - // Make the connection look like it was lost 1 day ago. - $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); - - $this->channelManager - ->getGlobalConnectionsCount('1234', 'presence-channel') - ->then(function ($count) { - $this->assertEquals(2, $count); - }); - - $this->channelManager - ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) - ->then(function ($expiredConnections) { - $this->assertCount(1, $expiredConnections); - }); - - $this->channelManager - ->getChannelMembers('1234', 'presence-channel') - ->then(function ($members) { - $this->assertCount(2, $members); - }); - - $this->channelManager->removeObsoleteConnections(); - - $this->channelManager - ->getGlobalConnectionsCount('1234', 'presence-channel') - ->then(function ($count) { - $this->assertEquals(1, $count); - }); - - $this->channelManager - ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) - ->then(function ($expiredConnections) { - $this->assertCount(0, $expiredConnections); - }); - - $this->channelManager - ->getChannelMembers('1234', 'presence-channel') - ->then(function ($members) { - $this->assertCount(1, $members); - }); - } - public function test_events_are_processed_by_on_message_on_presence_channels() { $this->runOnlyOnRedisReplication(); diff --git a/tests/PrivateChannelTest.php b/tests/PrivateChannelTest.php index 8708bda..90efa6d 100644 --- a/tests/PrivateChannelTest.php +++ b/tests/PrivateChannelTest.php @@ -4,7 +4,6 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\API\TriggerEvent; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; -use Carbon\Carbon; use GuzzleHttp\Psr7\Request; use Illuminate\Http\JsonResponse; use Pusher\Pusher; @@ -159,46 +158,6 @@ class PrivateChannelTest extends TestCase }); } - public function test_not_ponged_connections_do_get_removed_for_private_channels() - { - $this->runOnlyOnRedisReplication(); - - $activeConnection = $this->newPrivateConnection('private-channel'); - $obsoleteConnection = $this->newPrivateConnection('private-channel'); - - // The active connection just pinged, it should not be closed. - $this->channelManager->addConnectionToSet($activeConnection, Carbon::now()); - - // Make the connection look like it was lost 1 day ago. - $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); - - $this->channelManager - ->getGlobalConnectionsCount('1234', 'private-channel') - ->then(function ($count) { - $this->assertEquals(2, $count); - }); - - $this->channelManager - ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) - ->then(function ($expiredConnections) { - $this->assertCount(1, $expiredConnections); - }); - - $this->channelManager->removeObsoleteConnections(); - - $this->channelManager - ->getGlobalConnectionsCount('1234', 'private-channel') - ->then(function ($count) { - $this->assertEquals(1, $count); - }); - - $this->channelManager - ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) - ->then(function ($expiredConnections) { - $this->assertCount(0, $expiredConnections); - }); - } - public function test_events_are_processed_by_on_message_on_private_channels() { $this->runOnlyOnRedisReplication(); diff --git a/tests/PublicChannelTest.php b/tests/PublicChannelTest.php index 84f0d17..b16498d 100644 --- a/tests/PublicChannelTest.php +++ b/tests/PublicChannelTest.php @@ -3,7 +3,6 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\API\TriggerEvent; -use Carbon\Carbon; use GuzzleHttp\Psr7\Request; use Illuminate\Http\JsonResponse; use Pusher\Pusher; @@ -140,46 +139,6 @@ class PublicChannelTest extends TestCase }); } - public function test_not_ponged_connections_do_get_removed_for_public_channels() - { - $this->runOnlyOnRedisReplication(); - - $activeConnection = $this->newActiveConnection(['public-channel']); - $obsoleteConnection = $this->newActiveConnection(['public-channel']); - - // The active connection just pinged, it should not be closed. - $this->channelManager->addConnectionToSet($activeConnection, Carbon::now()); - - // Make the connection look like it was lost 1 day ago. - $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); - - $this->channelManager - ->getGlobalConnectionsCount('1234', 'public-channel') - ->then(function ($count) { - $this->assertEquals(2, $count); - }); - - $this->channelManager - ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) - ->then(function ($expiredConnections) { - $this->assertCount(1, $expiredConnections); - }); - - $this->channelManager->removeObsoleteConnections(); - - $this->channelManager - ->getGlobalConnectionsCount('1234', 'public-channel') - ->then(function ($count) { - $this->assertEquals(1, $count); - }); - - $this->channelManager - ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) - ->then(function ($expiredConnections) { - $this->assertCount(0, $expiredConnections); - }); - } - public function test_events_are_processed_by_on_message_on_public_channels() { $this->runOnlyOnRedisReplication(); diff --git a/tests/RedisPongRemovalTest.php b/tests/RedisPongRemovalTest.php new file mode 100644 index 0000000..14410fb --- /dev/null +++ b/tests/RedisPongRemovalTest.php @@ -0,0 +1,140 @@ +runOnlyOnRedisReplication(); + + $activeConnection = $this->newActiveConnection(['public-channel']); + $obsoleteConnection = $this->newActiveConnection(['public-channel']); + + // The active connection just pinged, it should not be closed. + $this->channelManager->addConnectionToSet($activeConnection, Carbon::now()); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_on_redis_for_private_channels() + { + $this->runOnlyOnRedisReplication(); + + $activeConnection = $this->newPrivateConnection('private-channel'); + $obsoleteConnection = $this->newPrivateConnection('private-channel'); + + // The active connection just pinged, it should not be closed. + $this->channelManager->addConnectionToSet($activeConnection, Carbon::now()); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_on_redis_for_presence_channels() + { + $this->runOnlyOnRedisReplication(); + + $activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); + $obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]); + + // The active connection just pinged, it should not be closed. + $this->channelManager->addConnectionToSet($activeConnection, Carbon::now()); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(2, $members); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(1, $members); + }); + } +}