Merge pull request #547 from beyondcode/feature/remove-obsolete-connections-after-120-seconds
[2.x] Remove obsolete connections older than 120 seconds for local channel manager
This commit is contained in:
commit
40ee5fb805
|
|
@ -38,11 +38,11 @@
|
|||
"evenement/evenement": "^2.0|^3.0",
|
||||
"facade/ignition-contracts": "^1.0",
|
||||
"guzzlehttp/psr7": "^1.5",
|
||||
"illuminate/broadcasting": "^6.0|^7.0|^8.0",
|
||||
"illuminate/console": "^6.0|^7.0|^8.0",
|
||||
"illuminate/http": "^6.0|^7.0|^8.0",
|
||||
"illuminate/routing": "^6.0|^7.0|^8.0",
|
||||
"illuminate/support": "^6.0|^7.0|^8.0",
|
||||
"illuminate/broadcasting": "^6.3|^7.0|^8.0",
|
||||
"illuminate/console": "^6.3|^7.0|^8.0",
|
||||
"illuminate/http": "^6.3|^7.0|^8.0",
|
||||
"illuminate/routing": "^6.3|^7.0|^8.0",
|
||||
"illuminate/support": "^6.3|^7.0|^8.0",
|
||||
"pusher/pusher-php-server": "^3.0|^4.0",
|
||||
"react/promise": "^2.0",
|
||||
"symfony/http-kernel": "^4.0|^5.0",
|
||||
|
|
|
|||
|
|
@ -7,6 +7,9 @@ use BeyondCode\LaravelWebSockets\Channels\PresenceChannel;
|
|||
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
|
||||
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
|
||||
use BeyondCode\LaravelWebSockets\Helpers;
|
||||
use Carbon\Carbon;
|
||||
use Illuminate\Cache\ArrayLock;
|
||||
use Illuminate\Cache\ArrayStore;
|
||||
use Illuminate\Support\Str;
|
||||
use Ratchet\ConnectionInterface;
|
||||
use React\EventLoop\LoopInterface;
|
||||
|
|
@ -43,6 +46,21 @@ class LocalChannelManager implements ChannelManager
|
|||
*/
|
||||
protected $acceptsNewConnections = true;
|
||||
|
||||
/**
|
||||
* The ArrayStore instance of locks.
|
||||
*
|
||||
* @var \Illuminate\Cache\ArrayStore
|
||||
*/
|
||||
protected $store;
|
||||
|
||||
/**
|
||||
* The lock name to use on Array to avoid multiple
|
||||
* actions that might lead to multiple processings.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected static $lockName = 'laravel-websockets:channel-manager:lock';
|
||||
|
||||
/**
|
||||
* Create a new channel manager instance.
|
||||
*
|
||||
|
|
@ -52,7 +70,7 @@ class LocalChannelManager implements ChannelManager
|
|||
*/
|
||||
public function __construct(LoopInterface $loop, $factoryClass = null)
|
||||
{
|
||||
//
|
||||
$this->store = new ArrayStore;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -398,7 +416,9 @@ class LocalChannelManager implements ChannelManager
|
|||
*/
|
||||
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
|
||||
{
|
||||
return Helpers::createFulfilledPromise(true);
|
||||
$connection->lastPongedAt = Carbon::now();
|
||||
|
||||
return $this->updateConnectionInChannels($connection);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -408,7 +428,43 @@ class LocalChannelManager implements ChannelManager
|
|||
*/
|
||||
public function removeObsoleteConnections(): PromiseInterface
|
||||
{
|
||||
return Helpers::createFulfilledPromise(true);
|
||||
if (! $this->lock()->acquire()) {
|
||||
return Helpers::createFulfilledPromise(false);
|
||||
}
|
||||
|
||||
$this->getLocalConnections()->then(function ($connections) {
|
||||
foreach ($connections as $connection) {
|
||||
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
|
||||
|
||||
if ($differenceInSeconds > 120) {
|
||||
$this->unsubscribeFromAllChannels($connection);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return Helpers::createFulfilledPromise(
|
||||
$this->lock()->release()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the connection in all channels.
|
||||
*
|
||||
* @param ConnectionInterface $connection
|
||||
* @return PromiseInterface[bool]
|
||||
*/
|
||||
public function updateConnectionInChannels($connection): PromiseInterface
|
||||
{
|
||||
return $this->getLocalChannels($connection->app->id)
|
||||
->then(function ($channels) use ($connection) {
|
||||
foreach ($channels as $channel) {
|
||||
if ($channel->hasConnection($connection)) {
|
||||
$channel->saveConnection($connection);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -452,4 +508,14 @@ class LocalChannelManager implements ChannelManager
|
|||
|
||||
return Channel::class;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new ArrayLock instance to avoid race conditions.
|
||||
*
|
||||
* @return \Illuminate\Cache\CacheLock
|
||||
*/
|
||||
protected function lock()
|
||||
{
|
||||
return new ArrayLock($this->store, static::$lockName, 0);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ class RedisChannelManager extends LocalChannelManager
|
|||
*
|
||||
* @var string
|
||||
*/
|
||||
protected static $redisLockName = 'laravel-websockets:channel-manager:lock';
|
||||
protected static $lockName = 'laravel-websockets:channel-manager:lock';
|
||||
|
||||
/**
|
||||
* Create a new channel manager instance.
|
||||
|
|
@ -768,7 +768,7 @@ class RedisChannelManager extends LocalChannelManager
|
|||
*/
|
||||
protected function lock()
|
||||
{
|
||||
return new RedisLock($this->redis, static::$redisLockName, 0);
|
||||
return new RedisLock($this->redis, static::$lockName, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ class Channel
|
|||
*/
|
||||
public function unsubscribe(ConnectionInterface $connection): bool
|
||||
{
|
||||
if (! isset($this->connections[$connection->socketId])) {
|
||||
if (! $this->hasConnection($connection)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -109,13 +109,24 @@ class Channel
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the given connection exists.
|
||||
*
|
||||
* @param \Ratchet\ConnectionInterface $connection
|
||||
* @return bool
|
||||
*/
|
||||
public function hasConnection(ConnectionInterface $connection): bool
|
||||
{
|
||||
return isset($this->connections[$connection->socketId]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the connection to the subscribers list.
|
||||
*
|
||||
* @param \Ratchet\ConnectionInterface $connection
|
||||
* @return void
|
||||
*/
|
||||
protected function saveConnection(ConnectionInterface $connection)
|
||||
public function saveConnection(ConnectionInterface $connection)
|
||||
{
|
||||
$this->connections[$connection->socketId] = $connection;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,6 +57,8 @@ class WebSocketHandler implements MessageComponentInterface
|
|||
|
||||
$this->channelManager->subscribeToApp($connection->app->id);
|
||||
|
||||
$this->channelManager->connectionPonged($connection);
|
||||
|
||||
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_CONNECTED, [
|
||||
'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}",
|
||||
'socketId' => $connection->socketId,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,131 @@
|
|||
<?php
|
||||
|
||||
namespace BeyondCode\LaravelWebSockets\Test;
|
||||
|
||||
use Carbon\Carbon;
|
||||
|
||||
class LocalPongRemovalTest extends TestCase
|
||||
{
|
||||
public function test_not_ponged_connections_do_get_removed_on_local_for_public_channels()
|
||||
{
|
||||
$this->runOnlyOnLocalReplication();
|
||||
|
||||
$activeConnection = $this->newActiveConnection(['public-channel']);
|
||||
$obsoleteConnection = $this->newActiveConnection(['public-channel']);
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$activeConnection->lastPongedAt = Carbon::now();
|
||||
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
|
||||
|
||||
$this->channelManager->updateConnectionInChannels($activeConnection);
|
||||
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'public-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'public-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getLocalConnections()
|
||||
->then(function ($connections) use ($activeConnection) {
|
||||
$connection = $connections[$activeConnection->socketId];
|
||||
|
||||
$this->assertEquals($activeConnection->socketId, $connection->socketId);
|
||||
});
|
||||
}
|
||||
|
||||
public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels()
|
||||
{
|
||||
$this->runOnlyOnLocalReplication();
|
||||
|
||||
$activeConnection = $this->newPrivateConnection('private-channel');
|
||||
$obsoleteConnection = $this->newPrivateConnection('private-channel');
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$activeConnection->lastPongedAt = Carbon::now();
|
||||
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
|
||||
|
||||
$this->channelManager->updateConnectionInChannels($activeConnection);
|
||||
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'private-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'private-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getLocalConnections()
|
||||
->then(function ($connections) use ($activeConnection) {
|
||||
$connection = $connections[$activeConnection->socketId];
|
||||
|
||||
$this->assertEquals($activeConnection->socketId, $connection->socketId);
|
||||
});
|
||||
}
|
||||
|
||||
public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels()
|
||||
{
|
||||
$this->runOnlyOnLocalReplication();
|
||||
|
||||
$activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
|
||||
$obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$activeConnection->lastPongedAt = Carbon::now();
|
||||
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
|
||||
|
||||
$this->channelManager->updateConnectionInChannels($activeConnection);
|
||||
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'presence-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getChannelMembers('1234', 'presence-channel')
|
||||
->then(function ($members) {
|
||||
$this->assertCount(2, $members);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'presence-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getLocalConnections()
|
||||
->then(function ($connections) use ($activeConnection) {
|
||||
$connection = $connections[$activeConnection->socketId];
|
||||
|
||||
$this->assertEquals($activeConnection->socketId, $connection->socketId);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getChannelMembers('1234', 'presence-channel')
|
||||
->then(function ($members) {
|
||||
$this->assertCount(1, $members);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -4,7 +4,6 @@ namespace BeyondCode\LaravelWebSockets\Test;
|
|||
|
||||
use BeyondCode\LaravelWebSockets\API\TriggerEvent;
|
||||
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
|
||||
use Carbon\Carbon;
|
||||
use GuzzleHttp\Psr7\Request;
|
||||
use Illuminate\Http\JsonResponse;
|
||||
use Pusher\Pusher;
|
||||
|
|
@ -318,58 +317,6 @@ class PresenceChannelTest extends TestCase
|
|||
});
|
||||
}
|
||||
|
||||
public function test_not_ponged_connections_do_get_removed_for_presence_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
|
||||
$obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$this->channelManager->addConnectionToSet($activeConnection, Carbon::now());
|
||||
|
||||
// Make the connection look like it was lost 1 day ago.
|
||||
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'presence-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(1, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getChannelMembers('1234', 'presence-channel')
|
||||
->then(function ($members) {
|
||||
$this->assertCount(2, $members);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'presence-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(0, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getChannelMembers('1234', 'presence-channel')
|
||||
->then(function ($members) {
|
||||
$this->assertCount(1, $members);
|
||||
});
|
||||
}
|
||||
|
||||
public function test_events_are_processed_by_on_message_on_presence_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ namespace BeyondCode\LaravelWebSockets\Test;
|
|||
|
||||
use BeyondCode\LaravelWebSockets\API\TriggerEvent;
|
||||
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
|
||||
use Carbon\Carbon;
|
||||
use GuzzleHttp\Psr7\Request;
|
||||
use Illuminate\Http\JsonResponse;
|
||||
use Pusher\Pusher;
|
||||
|
|
@ -159,46 +158,6 @@ class PrivateChannelTest extends TestCase
|
|||
});
|
||||
}
|
||||
|
||||
public function test_not_ponged_connections_do_get_removed_for_private_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$activeConnection = $this->newPrivateConnection('private-channel');
|
||||
$obsoleteConnection = $this->newPrivateConnection('private-channel');
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$this->channelManager->addConnectionToSet($activeConnection, Carbon::now());
|
||||
|
||||
// Make the connection look like it was lost 1 day ago.
|
||||
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'private-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(1, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'private-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(0, $expiredConnections);
|
||||
});
|
||||
}
|
||||
|
||||
public function test_events_are_processed_by_on_message_on_private_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
namespace BeyondCode\LaravelWebSockets\Test;
|
||||
|
||||
use BeyondCode\LaravelWebSockets\API\TriggerEvent;
|
||||
use Carbon\Carbon;
|
||||
use GuzzleHttp\Psr7\Request;
|
||||
use Illuminate\Http\JsonResponse;
|
||||
use Pusher\Pusher;
|
||||
|
|
@ -140,46 +139,6 @@ class PublicChannelTest extends TestCase
|
|||
});
|
||||
}
|
||||
|
||||
public function test_not_ponged_connections_do_get_removed_for_public_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$activeConnection = $this->newActiveConnection(['public-channel']);
|
||||
$obsoleteConnection = $this->newActiveConnection(['public-channel']);
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$this->channelManager->addConnectionToSet($activeConnection, Carbon::now());
|
||||
|
||||
// Make the connection look like it was lost 1 day ago.
|
||||
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'public-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(1, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'public-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(0, $expiredConnections);
|
||||
});
|
||||
}
|
||||
|
||||
public function test_events_are_processed_by_on_message_on_public_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,140 @@
|
|||
<?php
|
||||
|
||||
namespace BeyondCode\LaravelWebSockets\Test;
|
||||
|
||||
use Carbon\Carbon;
|
||||
|
||||
class RedisPongRemovalTest extends TestCase
|
||||
{
|
||||
public function test_not_ponged_connections_do_get_removed_on_redis_for_public_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$activeConnection = $this->newActiveConnection(['public-channel']);
|
||||
$obsoleteConnection = $this->newActiveConnection(['public-channel']);
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$this->channelManager->addConnectionToSet($activeConnection, Carbon::now());
|
||||
|
||||
// Make the connection look like it was lost 1 day ago.
|
||||
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'public-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(1, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'public-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(0, $expiredConnections);
|
||||
});
|
||||
}
|
||||
|
||||
public function test_not_ponged_connections_do_get_removed_on_redis_for_private_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$activeConnection = $this->newPrivateConnection('private-channel');
|
||||
$obsoleteConnection = $this->newPrivateConnection('private-channel');
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$this->channelManager->addConnectionToSet($activeConnection, Carbon::now());
|
||||
|
||||
// Make the connection look like it was lost 1 day ago.
|
||||
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'private-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(1, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'private-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(0, $expiredConnections);
|
||||
});
|
||||
}
|
||||
|
||||
public function test_not_ponged_connections_do_get_removed_on_redis_for_presence_channels()
|
||||
{
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
|
||||
$obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
|
||||
|
||||
// The active connection just pinged, it should not be closed.
|
||||
$this->channelManager->addConnectionToSet($activeConnection, Carbon::now());
|
||||
|
||||
// Make the connection look like it was lost 1 day ago.
|
||||
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'presence-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(1, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getChannelMembers('1234', 'presence-channel')
|
||||
->then(function ($members) {
|
||||
$this->assertCount(2, $members);
|
||||
});
|
||||
|
||||
$this->channelManager->removeObsoleteConnections();
|
||||
|
||||
$this->channelManager
|
||||
->getGlobalConnectionsCount('1234', 'presence-channel')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
|
||||
->then(function ($expiredConnections) {
|
||||
$this->assertCount(0, $expiredConnections);
|
||||
});
|
||||
|
||||
$this->channelManager
|
||||
->getChannelMembers('1234', 'presence-channel')
|
||||
->then(function ($members) {
|
||||
$this->assertCount(1, $members);
|
||||
});
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue