Added redis connection counter.

This commit is contained in:
Alex Renoki 2020-09-03 16:31:19 +03:00
parent e9b85bbfc7
commit fadb3fc123
10 changed files with 252 additions and 22 deletions

View File

@ -42,21 +42,6 @@ return [
'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class,
/*
|--------------------------------------------------------------------------
| Channel Manager
|--------------------------------------------------------------------------
|
| When users subscribe or unsubscribe from specific channels,
| the connections are stored to keep track of any interaction with the
| WebSocket server.
| You can however add your own implementation that will help the store
| of the channels alongside their connections.
|
*/
'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
],
/*
@ -191,6 +176,8 @@ return [
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class,
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
],
/*
@ -214,6 +201,8 @@ return [
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class,
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class,
],
],

View File

@ -66,6 +66,28 @@ class LocalClient implements ReplicationInterface
return true;
}
/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
return true;
}
/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
return true;
}
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
@ -137,4 +159,15 @@ class LocalClient implements ReplicationInterface
return new FulfilledPromise($results);
}
/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function appConnectionsCount($appId)
{
return null;
}
}

View File

@ -7,6 +7,7 @@ use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Str;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
@ -42,6 +43,13 @@ class RedisClient extends LocalClient
*/
protected $subscribeClient;
/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;
/**
* Mapping of subscribed channels, where the key is the channel name,
* and the value is the amount of connections which are subscribed to
@ -60,6 +68,7 @@ class RedisClient extends LocalClient
public function __construct()
{
$this->serverId = Str::uuid()->toString();
$this->redis = Cache::getRedis();
}
/**
@ -175,6 +184,36 @@ class RedisClient extends LocalClient
return true;
}
/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);
$this->redis->hincrby($this->getTopicName($appId), 'connections', 1);
return true;
}
/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
$this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);
$this->redis->hincrby($this->getTopicName($appId), 'connections', -1);
return true;
}
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
@ -258,6 +297,19 @@ class RedisClient extends LocalClient
});
}
/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function appConnectionsCount($appId)
{
// Use the in-built Redis manager to avoid async run.
return $this->redis->hget($this->getTopicName($appId), 'connections') ?: 0;
}
/**
* Handle a message received from Redis on a specific channel.
*
@ -377,13 +429,19 @@ class RedisClient extends LocalClient
* app ID and channel name.
*
* @param mixed $appId
* @param string $channel
* @param string|null $channel
* @return string
*/
protected function getTopicName($appId, string $channel): string
protected function getTopicName($appId, string $channel = null): string
{
$prefix = config('database.redis.options.prefix', null);
return "{$prefix}{$appId}:{$channel}";
$hash = "{$prefix}{$appId}";
if ($channel) {
$hash .= ":{$channel}";
}
return $hash;
}
}

View File

@ -45,6 +45,22 @@ interface ReplicationInterface
*/
public function unsubscribe($appId, string $channel): bool;
/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool;
/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool;
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
@ -85,4 +101,12 @@ interface ReplicationInterface
* @return PromiseInterface
*/
public function channelMemberCounts($appId, array $channelNames): PromiseInterface;
/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function appConnectionsCount($appId);
}

View File

@ -0,0 +1,36 @@
<?php
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
class RedisChannelManager extends ArrayChannelManager
{
/**
* The replicator driver.
*
* @var \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface
*/
protected $replicator;
/**
* Initialize the channel manager.
*
* @return void
*/
public function __construct()
{
$this->replicator = app(ReplicationInterface::class);
}
/**
* Get the connections count on the app.
*
* @param mixed $appId
* @return int
*/
public function getConnectionCount($appId): int
{
return $this->replicator->appConnectionsCount($appId);
}
}

View File

@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets;
use BeyondCode\LaravelWebSockets\Apps\App;
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\QueryParameters;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity;
@ -26,6 +27,13 @@ class WebSocketHandler implements MessageComponentInterface
*/
protected $channelManager;
/**
* The replicator client.
*
* @var ReplicationInterface
*/
protected $replicator;
/**
* Initialize a new handler.
*
@ -35,6 +43,7 @@ class WebSocketHandler implements MessageComponentInterface
public function __construct(ChannelManager $channelManager)
{
$this->channelManager = $channelManager;
$this->replicator = app(ReplicationInterface::class);
}
/**
@ -83,6 +92,8 @@ class WebSocketHandler implements MessageComponentInterface
]);
StatisticsLogger::disconnection($connection->app->id);
$this->replicator->unsubscribeFromApp($connection->app->id);
}
/**
@ -99,6 +110,8 @@ class WebSocketHandler implements MessageComponentInterface
$exception->getPayload()
));
}
$this->replicator->unsubscribeFromApp($connection->app->id);
}
/**
@ -203,6 +216,8 @@ class WebSocketHandler implements MessageComponentInterface
StatisticsLogger::connection($connection->app->id);
$this->replicator->subscribeToApp($connection->app->id);
return $this;
}
}

View File

@ -59,9 +59,11 @@ class WebSocketsServiceProvider extends ServiceProvider
});
$this->app->singleton(ChannelManager::class, function () {
$channelManager = config('websockets.managers.channel', ArrayChannelManager::class);
$replicationDriver = config('websockets.replication.driver', 'local');
return new $channelManager;
$class = config("websockets.replication.{$replicationDriver}.channel_manager", ArrayChannelManager::class);
return new $class;
});
$this->app->singleton(AppManager::class, function () {

View File

@ -7,6 +7,7 @@ use BeyondCode\LaravelWebSockets\Tests\Mocks\Message;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\OriginNotAllowed;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\UnknownAppKey;
use Illuminate\Support\Facades\Cache;
class ConnectionTest extends TestCase
{
@ -31,6 +32,25 @@ class ConnectionTest extends TestCase
/** @test */
public function app_can_not_exceed_maximum_capacity()
{
$this->runOnlyOnLocalReplication();
$this->app['config']->set('websockets.apps.0.capacity', 2);
$this->getConnectedWebSocketConnection(['test-channel']);
$this->getConnectedWebSocketConnection(['test-channel']);
$this->expectException(ConnectionsOverCapacity::class);
$this->getConnectedWebSocketConnection(['test-channel']);
}
/** @test */
public function app_can_not_exceed_maximum_capacity_on_redis_replication()
{
$this->runOnlyOnRedisReplication();
$redis = Cache::getRedis();
$redis->hdel('laravel_database_1234', 'connections');
$this->app['config']->set('websockets.apps.0.capacity', 2);
$this->getConnectedWebSocketConnection(['test-channel']);

View File

@ -5,10 +5,18 @@ namespace BeyondCode\LaravelWebSockets\Tests\PubSub;
use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient;
use BeyondCode\LaravelWebSockets\Tests\Mocks\RedisFactory;
use BeyondCode\LaravelWebSockets\Tests\TestCase;
use Illuminate\Support\Facades\Cache;
use React\EventLoop\Factory as LoopFactory;
class RedisDriverTest extends TestCase
{
/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;
/**
* {@inheritdoc}
*/
@ -17,6 +25,10 @@ class RedisDriverTest extends TestCase
parent::setUp();
$this->runOnlyOnRedisReplication();
$this->redis = Cache::getRedis();
$this->redis->hdel('laravel_database_1234', 'connections');
}
/** @test */
@ -80,4 +92,44 @@ class RedisDriverTest extends TestCase
$client->getSubscribeClient()
->assertEventDispatched('message');
}
/** @test */
public function redis_tracks_app_connections_count()
{
$connection = $this->getWebSocketConnection();
$this->pusherServer->onOpen($connection);
$this->getSubscribeClient()
->assertCalledWithArgs('subscribe', ['laravel_database_1234']);
$this->getPublishClient()
->assertNothingCalled();
$this->assertEquals(1, $this->redis->hget('laravel_database_1234', 'connections'));
}
/** @test */
public function redis_tracks_app_connections_count_on_disconnect()
{
$connection = $this->getWebSocketConnection();
$this->pusherServer->onOpen($connection);
$this->getSubscribeClient()
->assertCalledWithArgs('subscribe', ['laravel_database_1234'])
->assertNotCalledWithArgs('unsubscribe', ['laravel_database_1234']);
$this->getPublishClient()
->assertNothingCalled();
$this->assertEquals(1, $this->redis->hget('laravel_database_1234', 'connections'));
$this->pusherServer->onClose($connection);
$this->getPublishClient()
->assertNothingCalled();
$this->assertEquals(0, $this->redis->hget('laravel_database_1234', 'connections'));
}
}

View File

@ -50,7 +50,7 @@ abstract class TestCase extends BaseTestCase
$this->withFactories(__DIR__.'/database/factories');
$this->pusherServer = $this->app->make(config('websockets.handlers.websocket'));
$this->configurePubSub();
$this->channelManager = $this->app->make(ChannelManager::class);
@ -63,7 +63,7 @@ abstract class TestCase extends BaseTestCase
$this->loadMigrationsFrom(__DIR__.'/../database/migrations');
$this->configurePubSub();
$this->pusherServer = $this->app->make(config('websockets.handlers.websocket'));
}
/**
@ -151,6 +151,7 @@ abstract class TestCase extends BaseTestCase
if (in_array($replicationDriver, ['redis'])) {
$app['config']->set('broadcasting.default', 'pusher');
$app['config']->set('cache.default', 'redis');
}
}