Added local removal for obsolete connections.

This commit is contained in:
Alex Renoki 2020-09-19 18:38:08 +03:00
parent 1f6e714873
commit 223a789b0d
5 changed files with 209 additions and 6 deletions

View File

@ -7,6 +7,9 @@ use BeyondCode\LaravelWebSockets\Channels\PresenceChannel;
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\Helpers;
use Carbon\Carbon;
use Illuminate\Cache\ArrayLock;
use Illuminate\Cache\ArrayStore;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface;
use React\EventLoop\LoopInterface;
@ -43,6 +46,14 @@ class LocalChannelManager implements ChannelManager
*/
protected $acceptsNewConnections = true;
/**
* The lock name to use on Array to avoid multiple
* actions that might lead to multiple processings.
*
* @var string
*/
protected static $lockName = 'laravel-websockets:channel-manager:lock';
/**
* Create a new channel manager instance.
*
@ -398,7 +409,9 @@ class LocalChannelManager implements ChannelManager
*/
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
return Helpers::createFulfilledPromise(true);
$connection->lastPongedAt = Carbon::now();
return $this->updateConnectionInChannels($connection);
}
/**
@ -408,7 +421,43 @@ class LocalChannelManager implements ChannelManager
*/
public function removeObsoleteConnections(): PromiseInterface
{
return Helpers::createFulfilledPromise(true);
if (! $this->lock()->acquire()) {
return Helpers::createFulfilledPromise(false);
}
$this->getLocalConnections()->then(function ($connections) {
foreach ($connections as $connection) {
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
if ($differenceInSeconds > 120) {
$this->unsubscribeFromAllChannels($connection);
}
}
});
return Helpers::createFulfilledPromise(
$this->lock()->release()
);
}
/**
* Update the connection in all channels.
*
* @param ConnectionInterface $connection
* @return PromiseInterface[bool]
*/
public function updateConnectionInChannels($connection): PromiseInterface
{
return $this->getLocalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
foreach ($channels as $channel) {
if ($channel->hasConnection($connection)) {
$channel->saveConnection($connection);
}
}
return true;
});
}
/**
@ -452,4 +501,14 @@ class LocalChannelManager implements ChannelManager
return Channel::class;
}
/**
* Get a new ArrayLock instance to avoid race conditions.
*
* @return \Illuminate\Cache\CacheLock
*/
protected function lock()
{
return new ArrayLock(new ArrayStore, static::$lockName, 0);
}
}

View File

@ -59,7 +59,7 @@ class RedisChannelManager extends LocalChannelManager
*
* @var string
*/
protected static $redisLockName = 'laravel-websockets:channel-manager:lock';
protected static $lockName = 'laravel-websockets:channel-manager:lock';
/**
* Create a new channel manager instance.
@ -768,7 +768,7 @@ class RedisChannelManager extends LocalChannelManager
*/
protected function lock()
{
return new RedisLock($this->redis, static::$redisLockName, 0);
return new RedisLock($this->redis, static::$lockName, 0);
}
/**

View File

@ -100,7 +100,7 @@ class Channel
*/
public function unsubscribe(ConnectionInterface $connection): bool
{
if (! isset($this->connections[$connection->socketId])) {
if (! $this->hasConnection($connection)) {
return false;
}
@ -109,13 +109,24 @@ class Channel
return true;
}
/**
* Check if the given connection exists.
*
* @param \Ratchet\ConnectionInterface $connection
* @return bool
*/
public function hasConnection(ConnectionInterface $connection): bool
{
return isset($this->connections[$connection->socketId]);
}
/**
* Store the connection to the subscribers list.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
*/
protected function saveConnection(ConnectionInterface $connection)
public function saveConnection(ConnectionInterface $connection)
{
$this->connections[$connection->socketId] = $connection;
}

View File

@ -57,6 +57,8 @@ class WebSocketHandler implements MessageComponentInterface
$this->channelManager->subscribeToApp($connection->app->id);
$this->channelManager->connectionPonged($connection);
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_CONNECTED, [
'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}",
'socketId' => $connection->socketId,

View File

@ -0,0 +1,131 @@
<?php
namespace BeyondCode\LaravelWebSockets\Test;
use Carbon\Carbon;
class LocalPongRemovalTest extends TestCase
{
public function test_not_ponged_connections_do_get_removed_on_local_for_public_channels()
{
$this->runOnlyOnLocalReplication();
$activeConnection = $this->newActiveConnection(['public-channel']);
$obsoleteConnection = $this->newActiveConnection(['public-channel']);
// The active connection just pinged, it should not be closed.
$activeConnection->lastPongedAt = Carbon::now();
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
$this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->removeObsoleteConnections();
$this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager
->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId);
});
}
public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels()
{
$this->runOnlyOnLocalReplication();
$activeConnection = $this->newPrivateConnection('private-channel');
$obsoleteConnection = $this->newPrivateConnection('private-channel');
// The active connection just pinged, it should not be closed.
$activeConnection->lastPongedAt = Carbon::now();
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
$this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->removeObsoleteConnections();
$this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager
->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId);
});
}
public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels()
{
$this->runOnlyOnLocalReplication();
$activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
// The active connection just pinged, it should not be closed.
$activeConnection->lastPongedAt = Carbon::now();
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
$this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(2, $members);
});
$this->channelManager->removeObsoleteConnections();
$this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager
->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId);
});
$this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(1, $members);
});
}
}