diff --git a/config/websockets.php b/config/websockets.php index b369922..f5d9faf 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -42,21 +42,6 @@ return [ 'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class, - /* - |-------------------------------------------------------------------------- - | Channel Manager - |-------------------------------------------------------------------------- - | - | When users subscribe or unsubscribe from specific channels, - | the connections are stored to keep track of any interaction with the - | WebSocket server. - | You can however add your own implementation that will help the store - | of the channels alongside their connections. - | - */ - - 'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class, - ], /* @@ -191,6 +176,8 @@ return [ 'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class, + 'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class, + ], /* @@ -214,6 +201,8 @@ return [ 'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class, + 'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class, + ], ], diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index fe55715..7a4c2a5 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -66,6 +66,28 @@ class LocalClient implements ReplicationInterface return true; } + /** + * Subscribe to the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function subscribeToApp($appId): bool + { + return true; + } + + /** + * Unsubscribe from the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function unsubscribeFromApp($appId): bool + { + return true; + } + /** * Add a member to a channel. To be called when they have * subscribed to the channel. @@ -137,4 +159,15 @@ class LocalClient implements ReplicationInterface return new FulfilledPromise($results); } + + /** + * Get the amount of unique connections. + * + * @param mixed $appId + * @return null|int|\React\Promise\PromiseInterface + */ + public function appConnectionsCount($appId) + { + return null; + } } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 85dafdf..40ae12a 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -7,6 +7,7 @@ use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use Clue\React\Redis\Client; use Clue\React\Redis\Factory; +use Illuminate\Support\Facades\Cache; use Illuminate\Support\Str; use React\EventLoop\LoopInterface; use React\Promise\PromiseInterface; @@ -42,6 +43,13 @@ class RedisClient extends LocalClient */ protected $subscribeClient; + /** + * The Redis manager instance. + * + * @var \Illuminate\Redis\RedisManager + */ + protected $redis; + /** * Mapping of subscribed channels, where the key is the channel name, * and the value is the amount of connections which are subscribed to @@ -60,6 +68,7 @@ class RedisClient extends LocalClient public function __construct() { $this->serverId = Str::uuid()->toString(); + $this->redis = Cache::getRedis(); } /** @@ -175,6 +184,36 @@ class RedisClient extends LocalClient return true; } + /** + * Subscribe to the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function subscribeToApp($appId): bool + { + $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]); + + $this->redis->hincrby($this->getTopicName($appId), 'connections', 1); + + return true; + } + + /** + * Unsubscribe from the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function unsubscribeFromApp($appId): bool + { + $this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]); + + $this->redis->hincrby($this->getTopicName($appId), 'connections', -1); + + return true; + } + /** * Add a member to a channel. To be called when they have * subscribed to the channel. @@ -258,6 +297,19 @@ class RedisClient extends LocalClient }); } + /** + * Get the amount of unique connections. + * + * @param mixed $appId + * @return null|int|\React\Promise\PromiseInterface + */ + public function appConnectionsCount($appId) + { + // Use the in-built Redis manager to avoid async run. + + return $this->redis->hget($this->getTopicName($appId), 'connections') ?: 0; + } + /** * Handle a message received from Redis on a specific channel. * @@ -377,13 +429,19 @@ class RedisClient extends LocalClient * app ID and channel name. * * @param mixed $appId - * @param string $channel + * @param string|null $channel * @return string */ - protected function getTopicName($appId, string $channel): string + protected function getTopicName($appId, string $channel = null): string { $prefix = config('database.redis.options.prefix', null); - return "{$prefix}{$appId}:{$channel}"; + $hash = "{$prefix}{$appId}"; + + if ($channel) { + $hash .= ":{$channel}"; + } + + return $hash; } } diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index e0b39a8..7c50ae6 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -45,6 +45,22 @@ interface ReplicationInterface */ public function unsubscribe($appId, string $channel): bool; + /** + * Subscribe to the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function subscribeToApp($appId): bool; + + /** + * Unsubscribe from the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function unsubscribeFromApp($appId): bool; + /** * Add a member to a channel. To be called when they have * subscribed to the channel. @@ -85,4 +101,12 @@ interface ReplicationInterface * @return PromiseInterface */ public function channelMemberCounts($appId, array $channelNames): PromiseInterface; + + /** + * Get the amount of unique connections. + * + * @param mixed $appId + * @return null|int|\React\Promise\PromiseInterface + */ + public function appConnectionsCount($appId); } diff --git a/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php b/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php new file mode 100644 index 0000000..ed701dd --- /dev/null +++ b/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php @@ -0,0 +1,36 @@ +replicator = app(ReplicationInterface::class); + } + + /** + * Get the connections count on the app. + * + * @param mixed $appId + * @return int + */ + public function getConnectionCount($appId): int + { + return $this->replicator->appConnectionsCount($appId); + } +} diff --git a/src/WebSockets/WebSocketHandler.php b/src/WebSockets/WebSocketHandler.php index f99e0be..29f258a 100644 --- a/src/WebSockets/WebSocketHandler.php +++ b/src/WebSockets/WebSocketHandler.php @@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets; use BeyondCode\LaravelWebSockets\Apps\App; use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\QueryParameters; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity; @@ -26,6 +27,13 @@ class WebSocketHandler implements MessageComponentInterface */ protected $channelManager; + /** + * The replicator client. + * + * @var ReplicationInterface + */ + protected $replicator; + /** * Initialize a new handler. * @@ -35,6 +43,7 @@ class WebSocketHandler implements MessageComponentInterface public function __construct(ChannelManager $channelManager) { $this->channelManager = $channelManager; + $this->replicator = app(ReplicationInterface::class); } /** @@ -83,6 +92,8 @@ class WebSocketHandler implements MessageComponentInterface ]); StatisticsLogger::disconnection($connection->app->id); + + $this->replicator->unsubscribeFromApp($connection->app->id); } /** @@ -99,6 +110,8 @@ class WebSocketHandler implements MessageComponentInterface $exception->getPayload() )); } + + $this->replicator->unsubscribeFromApp($connection->app->id); } /** @@ -203,6 +216,8 @@ class WebSocketHandler implements MessageComponentInterface StatisticsLogger::connection($connection->app->id); + $this->replicator->subscribeToApp($connection->app->id); + return $this; } } diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 5530ecd..c7b6e31 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -59,9 +59,11 @@ class WebSocketsServiceProvider extends ServiceProvider }); $this->app->singleton(ChannelManager::class, function () { - $channelManager = config('websockets.managers.channel', ArrayChannelManager::class); + $replicationDriver = config('websockets.replication.driver', 'local'); - return new $channelManager; + $class = config("websockets.replication.{$replicationDriver}.channel_manager", ArrayChannelManager::class); + + return new $class; }); $this->app->singleton(AppManager::class, function () { diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index 3e17566..526bb07 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -7,6 +7,7 @@ use BeyondCode\LaravelWebSockets\Tests\Mocks\Message; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\OriginNotAllowed; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\UnknownAppKey; +use Illuminate\Support\Facades\Cache; class ConnectionTest extends TestCase { @@ -31,6 +32,25 @@ class ConnectionTest extends TestCase /** @test */ public function app_can_not_exceed_maximum_capacity() { + $this->runOnlyOnLocalReplication(); + + $this->app['config']->set('websockets.apps.0.capacity', 2); + + $this->getConnectedWebSocketConnection(['test-channel']); + $this->getConnectedWebSocketConnection(['test-channel']); + $this->expectException(ConnectionsOverCapacity::class); + $this->getConnectedWebSocketConnection(['test-channel']); + } + + /** @test */ + public function app_can_not_exceed_maximum_capacity_on_redis_replication() + { + $this->runOnlyOnRedisReplication(); + + $redis = Cache::getRedis(); + + $redis->hdel('laravel_database_1234', 'connections'); + $this->app['config']->set('websockets.apps.0.capacity', 2); $this->getConnectedWebSocketConnection(['test-channel']); diff --git a/tests/PubSub/RedisDriverTest.php b/tests/PubSub/RedisDriverTest.php index 361b30e..dae8f7c 100644 --- a/tests/PubSub/RedisDriverTest.php +++ b/tests/PubSub/RedisDriverTest.php @@ -5,10 +5,18 @@ namespace BeyondCode\LaravelWebSockets\Tests\PubSub; use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; use BeyondCode\LaravelWebSockets\Tests\Mocks\RedisFactory; use BeyondCode\LaravelWebSockets\Tests\TestCase; +use Illuminate\Support\Facades\Cache; use React\EventLoop\Factory as LoopFactory; class RedisDriverTest extends TestCase { + /** + * The Redis manager instance. + * + * @var \Illuminate\Redis\RedisManager + */ + protected $redis; + /** * {@inheritdoc} */ @@ -17,6 +25,10 @@ class RedisDriverTest extends TestCase parent::setUp(); $this->runOnlyOnRedisReplication(); + + $this->redis = Cache::getRedis(); + + $this->redis->hdel('laravel_database_1234', 'connections'); } /** @test */ @@ -80,4 +92,44 @@ class RedisDriverTest extends TestCase $client->getSubscribeClient() ->assertEventDispatched('message'); } + + /** @test */ + public function redis_tracks_app_connections_count() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $this->getSubscribeClient() + ->assertCalledWithArgs('subscribe', ['laravel_database_1234']); + + $this->getPublishClient() + ->assertNothingCalled(); + + $this->assertEquals(1, $this->redis->hget('laravel_database_1234', 'connections')); + } + + /** @test */ + public function redis_tracks_app_connections_count_on_disconnect() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $this->getSubscribeClient() + ->assertCalledWithArgs('subscribe', ['laravel_database_1234']) + ->assertNotCalledWithArgs('unsubscribe', ['laravel_database_1234']); + + $this->getPublishClient() + ->assertNothingCalled(); + + $this->assertEquals(1, $this->redis->hget('laravel_database_1234', 'connections')); + + $this->pusherServer->onClose($connection); + + $this->getPublishClient() + ->assertNothingCalled(); + + $this->assertEquals(0, $this->redis->hget('laravel_database_1234', 'connections')); + } } diff --git a/tests/TestCase.php b/tests/TestCase.php index ade4b52..9df4b29 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -50,7 +50,7 @@ abstract class TestCase extends BaseTestCase $this->withFactories(__DIR__.'/database/factories'); - $this->pusherServer = $this->app->make(config('websockets.handlers.websocket')); + $this->configurePubSub(); $this->channelManager = $this->app->make(ChannelManager::class); @@ -63,7 +63,7 @@ abstract class TestCase extends BaseTestCase $this->loadMigrationsFrom(__DIR__.'/../database/migrations'); - $this->configurePubSub(); + $this->pusherServer = $this->app->make(config('websockets.handlers.websocket')); } /** @@ -151,6 +151,7 @@ abstract class TestCase extends BaseTestCase if (in_array($replicationDriver, ['redis'])) { $app['config']->set('broadcasting.default', 'pusher'); + $app['config']->set('cache.default', 'redis'); } }