diff --git a/composer.json b/composer.json index f34b96d..09c15b6 100644 --- a/composer.json +++ b/composer.json @@ -36,11 +36,12 @@ "illuminate/routing": "^6.0|^7.0", "illuminate/support": "^6.0|^7.0", "pusher/pusher-php-server": "^3.0|^4.0", - "react/dns": "^1.1", + "react/promise": "^2.0", "symfony/http-kernel": "^4.0|^5.0", "symfony/psr-http-message-bridge": "^1.1|^2.0" }, "require-dev": { + "clue/block-react": "^1.4", "mockery/mockery": "^1.3", "orchestra/testbench-browser-kit": "^4.0|^5.0", "phpunit/phpunit": "^8.0|^9.0" diff --git a/config/websockets.php b/config/websockets.php index b369922..f5d9faf 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -42,21 +42,6 @@ return [ 'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class, - /* - |-------------------------------------------------------------------------- - | Channel Manager - |-------------------------------------------------------------------------- - | - | When users subscribe or unsubscribe from specific channels, - | the connections are stored to keep track of any interaction with the - | WebSocket server. - | You can however add your own implementation that will help the store - | of the channels alongside their connections. - | - */ - - 'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class, - ], /* @@ -191,6 +176,8 @@ return [ 'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class, + 'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class, + ], /* @@ -214,6 +201,8 @@ return [ 'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class, + 'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class, + ], ], diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index fe55715..67a1d29 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -66,6 +66,28 @@ class LocalClient implements ReplicationInterface return true; } + /** + * Subscribe to the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function subscribeToApp($appId): bool + { + return true; + } + + /** + * Unsubscribe from the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function unsubscribeFromApp($appId): bool + { + return true; + } + /** * Add a member to a channel. To be called when they have * subscribed to the channel. @@ -137,4 +159,26 @@ class LocalClient implements ReplicationInterface return new FulfilledPromise($results); } + + /** + * Get the amount of unique connections. + * + * @param mixed $appId + * @return null|int + */ + public function getLocalConnectionsCount($appId) + { + return null; + } + + /** + * Get the amount of connections aggregated on multiple instances. + * + * @param mixed $appId + * @return null|int|\React\Promise\PromiseInterface + */ + public function getGlobalConnectionsCount($appId) + { + return null; + } } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 85dafdf..6d3fe7e 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -175,6 +175,36 @@ class RedisClient extends LocalClient return true; } + /** + * Subscribe to the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function subscribeToApp($appId): bool + { + $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]); + + $this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', 1]); + + return true; + } + + /** + * Unsubscribe from the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function unsubscribeFromApp($appId): bool + { + $this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]); + + $this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', -1]); + + return true; + } + /** * Add a member to a channel. To be called when they have * subscribed to the channel. @@ -258,6 +288,17 @@ class RedisClient extends LocalClient }); } + /** + * Get the amount of connections aggregated on multiple instances. + * + * @param mixed $appId + * @return null|int|\React\Promise\PromiseInterface + */ + public function getGlobalConnectionsCount($appId) + { + return $this->publishClient->hget($this->getTopicName($appId), 'connections'); + } + /** * Handle a message received from Redis on a specific channel. * @@ -321,8 +362,8 @@ class RedisClient extends LocalClient */ protected function getConnectionUri() { - $name = config('websockets.replication.redis.connection') ?: 'default'; - $config = config('database.redis')[$name]; + $name = config('websockets.replication.redis.connection', 'default'); + $config = config("database.redis.{$name}"); $host = $config['host']; $port = $config['port'] ?: 6379; @@ -377,13 +418,19 @@ class RedisClient extends LocalClient * app ID and channel name. * * @param mixed $appId - * @param string $channel + * @param string|null $channel * @return string */ - protected function getTopicName($appId, string $channel): string + protected function getTopicName($appId, string $channel = null): string { $prefix = config('database.redis.options.prefix', null); - return "{$prefix}{$appId}:{$channel}"; + $hash = "{$prefix}{$appId}"; + + if ($channel) { + $hash .= ":{$channel}"; + } + + return $hash; } } diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index e0b39a8..5ca3ee3 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -45,6 +45,22 @@ interface ReplicationInterface */ public function unsubscribe($appId, string $channel): bool; + /** + * Subscribe to the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function subscribeToApp($appId): bool; + + /** + * Unsubscribe from the app's pubsub keyspace. + * + * @param mixed $appId + * @return bool + */ + public function unsubscribeFromApp($appId): bool; + /** * Add a member to a channel. To be called when they have * subscribed to the channel. @@ -85,4 +101,20 @@ interface ReplicationInterface * @return PromiseInterface */ public function channelMemberCounts($appId, array $channelNames): PromiseInterface; + + /** + * Get the amount of unique connections. + * + * @param mixed $appId + * @return null|int + */ + public function getLocalConnectionsCount($appId); + + /** + * Get the amount of connections aggregated on multiple instances. + * + * @param mixed $appId + * @return null|int|\React\Promise\PromiseInterface + */ + public function getGlobalConnectionsCount($appId); } diff --git a/src/Statistics/Logger/MemoryStatisticsLogger.php b/src/Statistics/Logger/MemoryStatisticsLogger.php index c75fa33..f067502 100644 --- a/src/Statistics/Logger/MemoryStatisticsLogger.php +++ b/src/Statistics/Logger/MemoryStatisticsLogger.php @@ -105,7 +105,7 @@ class MemoryStatisticsLogger implements StatisticsLogger $this->createRecord($statistic, $appId); - $currentConnectionCount = $this->channelManager->getConnectionCount($appId); + $currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId); $statistic->reset($currentConnectionCount); } diff --git a/src/Statistics/Logger/RedisStatisticsLogger.php b/src/Statistics/Logger/RedisStatisticsLogger.php index 22ec483..5c070e8 100644 --- a/src/Statistics/Logger/RedisStatisticsLogger.php +++ b/src/Statistics/Logger/RedisStatisticsLogger.php @@ -3,10 +3,11 @@ namespace BeyondCode\LaravelWebSockets\Statistics\Logger; use BeyondCode\LaravelWebSockets\Apps\App; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Statistics\Drivers\StatisticsDriver; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use Illuminate\Cache\RedisLock; -use Illuminate\Support\Facades\Cache; +use Illuminate\Support\Facades\Redis; class RedisStatisticsLogger implements StatisticsLogger { @@ -42,7 +43,11 @@ class RedisStatisticsLogger implements StatisticsLogger { $this->channelManager = $channelManager; $this->driver = $driver; - $this->redis = Cache::getRedis(); + $this->replicator = app(ReplicationInterface::class); + + $this->redis = Redis::connection( + config('websockets.replication.redis.connection', 'default') + ); } /** @@ -54,7 +59,7 @@ class RedisStatisticsLogger implements StatisticsLogger public function webSocketMessage($appId) { $this->ensureAppIsSet($appId) - ->hincrby($this->getHash($appId), 'websocket_message_count', 1); + ->__call('hincrby', [$this->getHash($appId), 'websocket_message_count', 1]); } /** @@ -66,7 +71,7 @@ class RedisStatisticsLogger implements StatisticsLogger public function apiMessage($appId) { $this->ensureAppIsSet($appId) - ->hincrby($this->getHash($appId), 'api_message_count', 1); + ->__call('hincrby', [$this->getHash($appId), 'api_message_count', 1]); } /** @@ -77,16 +82,30 @@ class RedisStatisticsLogger implements StatisticsLogger */ public function connection($appId) { - $currentConnectionCount = $this->ensureAppIsSet($appId) - ->hincrby($this->getHash($appId), 'current_connection_count', 1); + // Increment the current connections count by 1. + $incremented = $this->ensureAppIsSet($appId) + ->__call('hincrby', [$this->getHash($appId), 'current_connection_count', 1]); - $currentPeakConnectionCount = $this->redis->hget($this->getHash($appId), 'peak_connection_count'); + $incremented->then(function ($currentConnectionCount) use ($appId) { + // Get the peak connections count from Redis. + $peakConnectionCount = $this->replicator + ->getPublishClient() + ->__call('hget', [$this->getHash($appId), 'peak_connection_count']); - $peakConnectionCount = is_null($currentPeakConnectionCount) - ? $currentConnectionCount - : max($currentPeakConnectionCount, $currentConnectionCount); + $peakConnectionCount->then(function ($currentPeakConnectionCount) use ($currentConnectionCount, $appId) { + // Extract the greatest number between the current peak connection count + // and the current connection number. - $this->redis->hset($this->getHash($appId), 'peak_connection_count', $peakConnectionCount); + $peakConnectionCount = is_null($currentPeakConnectionCount) + ? $currentConnectionCount + : max($currentPeakConnectionCount, $currentConnectionCount); + + // Then set it to the database. + $this->replicator + ->getPublishClient() + ->__call('hset', [$this->getHash($appId), 'peak_connection_count', $peakConnectionCount]); + }); + }); } /** @@ -97,16 +116,30 @@ class RedisStatisticsLogger implements StatisticsLogger */ public function disconnection($appId) { - $currentConnectionCount = $this->ensureAppIsSet($appId) - ->hincrby($this->getHash($appId), 'current_connection_count', -1); + // Decrement the current connections count by 1. + $decremented = $this->ensureAppIsSet($appId) + ->__call('hincrby', [$this->getHash($appId), 'current_connection_count', -1]); - $currentPeakConnectionCount = $this->redis->hget($this->getHash($appId), 'peak_connection_count'); + $decremented->then(function ($currentConnectionCount) use ($appId) { + // Get the peak connections count from Redis. + $peakConnectionCount = $this->replicator + ->getPublishClient() + ->__call('hget', [$this->getHash($appId), 'peak_connection_count']); - $peakConnectionCount = is_null($currentPeakConnectionCount) - ? $currentConnectionCount - : max($currentPeakConnectionCount, $currentConnectionCount); + $peakConnectionCount->then(function ($currentPeakConnectionCount) use ($currentConnectionCount, $appId) { + // Extract the greatest number between the current peak connection count + // and the current connection number. - $this->redis->hset($this->getHash($appId), 'peak_connection_count', $peakConnectionCount); + $peakConnectionCount = is_null($currentPeakConnectionCount) + ? $currentConnectionCount + : max($currentPeakConnectionCount, $currentConnectionCount); + + // Then set it to the database. + $this->replicator + ->getPublishClient() + ->__call('hset', [$this->getHash($appId), 'peak_connection_count', $peakConnectionCount]); + }); + }); } /** @@ -117,19 +150,44 @@ class RedisStatisticsLogger implements StatisticsLogger public function save() { $this->lock()->get(function () { - foreach ($this->redis->smembers('laravel-websockets:apps') as $appId) { - if (! $statistic = $this->redis->hgetall($this->getHash($appId))) { - continue; + $setMembers = $this->replicator + ->getPublishClient() + ->__call('smembers', ['laravel-websockets:apps']); + + $setMembers->then(function ($members) { + foreach ($members as $appId) { + $member = $this->replicator + ->getPublishClient() + ->__call('hgetall', [$this->getHash($appId)]); + + $member->then(function ($statistic) use ($appId) { + if (! $statistic) { + return; + } + + // Statistics come into a list where the keys are on even indexes + // and the values are on odd indexes. This way, we know which + // ones are keys and which ones are values and their get combined + // later to form the key => value array + + [$keys, $values] = collect($statistic)->partition(function ($value, $key) { + return $key % 2 === 0; + }); + + $statistic = array_combine($keys->all(), $values->all()); + + $this->createRecord($statistic, $appId); + + $this->channelManager + ->getGlobalConnectionsCount($appId) + ->then(function ($currentConnectionCount) use ($appId) { + $currentConnectionCount === 0 + ? $this->resetAppTraces($appId) + : $this->resetStatistics($appId, $currentConnectionCount); + }); + }); } - - $this->createRecord($statistic, $appId); - - $currentConnectionCount = $this->channelManager->getConnectionCount($appId); - - $currentConnectionCount === 0 - ? $this->resetAppTraces($appId) - : $this->resetStatistics($appId, $currentConnectionCount); - } + }); }); } @@ -141,9 +199,11 @@ class RedisStatisticsLogger implements StatisticsLogger */ protected function ensureAppIsSet($appId) { - $this->redis->sadd('laravel-websockets:apps', $appId); + $this->replicator + ->getPublishClient() + ->__call('sadd', ['laravel-websockets:apps', $appId]); - return $this->redis; + return $this->replicator->getPublishClient(); } /** @@ -155,10 +215,21 @@ class RedisStatisticsLogger implements StatisticsLogger */ public function resetStatistics($appId, int $currentConnectionCount) { - $this->redis->hset($this->getHash($appId), 'current_connection_count', $currentConnectionCount); - $this->redis->hset($this->getHash($appId), 'peak_connection_count', $currentConnectionCount); - $this->redis->hset($this->getHash($appId), 'websocket_message_count', 0); - $this->redis->hset($this->getHash($appId), 'api_message_count', 0); + $this->replicator + ->getPublishClient() + ->__call('hset', [$this->getHash($appId), 'current_connection_count', $currentConnectionCount]); + + $this->replicator + ->getPublishClient() + ->__call('hset', [$this->getHash($appId), 'peak_connection_count', $currentConnectionCount]); + + $this->replicator + ->getPublishClient() + ->__call('hset', [$this->getHash($appId), 'websocket_message_count', 0]); + + $this->replicator + ->getPublishClient() + ->__call('hset', [$this->getHash($appId), 'api_message_count', 0]); } /** @@ -170,12 +241,25 @@ class RedisStatisticsLogger implements StatisticsLogger */ public function resetAppTraces($appId) { - $this->redis->hdel($this->getHash($appId), 'current_connection_count'); - $this->redis->hdel($this->getHash($appId), 'peak_connection_count'); - $this->redis->hdel($this->getHash($appId), 'websocket_message_count'); - $this->redis->hdel($this->getHash($appId), 'api_message_count'); + $this->replicator + ->getPublishClient() + ->__call('hdel', [$this->getHash($appId), 'current_connection_count']); - $this->redis->srem('laravel-websockets:apps', $appId); + $this->replicator + ->getPublishClient() + ->__call('hdel', [$this->getHash($appId), 'peak_connection_count']); + + $this->replicator + ->getPublishClient() + ->__call('hdel', [$this->getHash($appId), 'websocket_message_count']); + + $this->replicator + ->getPublishClient() + ->__call('hdel', [$this->getHash($appId), 'api_message_count']); + + $this->replicator + ->getPublishClient() + ->__call('srem', ['laravel-websockets:apps', $appId]); } /** diff --git a/src/WebSockets/Channels/ChannelManager.php b/src/WebSockets/Channels/ChannelManager.php index fb1721a..2baedc3 100644 --- a/src/WebSockets/Channels/ChannelManager.php +++ b/src/WebSockets/Channels/ChannelManager.php @@ -36,9 +36,17 @@ interface ChannelManager * Get the connections count on the app. * * @param mixed $appId - * @return int + * @return int|\React\Promise\PromiseInterface */ - public function getConnectionCount($appId): int; + public function getLocalConnectionsCount($appId): int; + + /** + * Get the connections count across multiple servers. + * + * @param mixed $appId + * @return int|\React\Promise\PromiseInterface + */ + public function getGlobalConnectionsCount($appId); /** * Remove connection from all channels. diff --git a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php index 8635a46..8043e5e 100644 --- a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php +++ b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php @@ -71,9 +71,9 @@ class ArrayChannelManager implements ChannelManager * Get the connections count on the app. * * @param mixed $appId - * @return int + * @return int|\React\Promise\PromiseInterface */ - public function getConnectionCount($appId): int + public function getLocalConnectionsCount($appId): int { return collect($this->getChannels($appId)) ->flatMap(function (Channel $channel) { @@ -83,6 +83,17 @@ class ArrayChannelManager implements ChannelManager ->count(); } + /** + * Get the connections count across multiple servers. + * + * @param mixed $appId + * @return int|\React\Promise\PromiseInterface + */ + public function getGlobalConnectionsCount($appId) + { + return $this->getLocalConnectionsCount($appId); + } + /** * Remove connection from all channels. * diff --git a/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php b/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php new file mode 100644 index 0000000..cda98df --- /dev/null +++ b/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php @@ -0,0 +1,36 @@ +replicator = app(ReplicationInterface::class); + } + + /** + * Get the connections count across multiple servers. + * + * @param mixed $appId + * @return int|\React\Promise\PromiseInterface + */ + public function getGlobalConnectionsCount($appId) + { + return $this->replicator->getGlobalConnectionsCount($appId); + } +} diff --git a/src/WebSockets/WebSocketHandler.php b/src/WebSockets/WebSocketHandler.php index f99e0be..a10dc1f 100644 --- a/src/WebSockets/WebSocketHandler.php +++ b/src/WebSockets/WebSocketHandler.php @@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets; use BeyondCode\LaravelWebSockets\Apps\App; use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\QueryParameters; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity; @@ -16,6 +17,7 @@ use Exception; use Ratchet\ConnectionInterface; use Ratchet\RFC6455\Messaging\MessageInterface; use Ratchet\WebSocket\MessageComponentInterface; +use React\Promise\PromiseInterface; class WebSocketHandler implements MessageComponentInterface { @@ -26,6 +28,13 @@ class WebSocketHandler implements MessageComponentInterface */ protected $channelManager; + /** + * The replicator client. + * + * @var ReplicationInterface + */ + protected $replicator; + /** * Initialize a new handler. * @@ -35,6 +44,7 @@ class WebSocketHandler implements MessageComponentInterface public function __construct(ChannelManager $channelManager) { $this->channelManager = $channelManager; + $this->replicator = app(ReplicationInterface::class); } /** @@ -83,6 +93,8 @@ class WebSocketHandler implements MessageComponentInterface ]); StatisticsLogger::disconnection($connection->app->id); + + $this->replicator->unsubscribeFromApp($connection->app->id); } /** @@ -99,6 +111,8 @@ class WebSocketHandler implements MessageComponentInterface $exception->getPayload() )); } + + $this->replicator->unsubscribeFromApp($connection->app->id); } /** @@ -152,10 +166,14 @@ class WebSocketHandler implements MessageComponentInterface protected function limitConcurrentConnections(ConnectionInterface $connection) { if (! is_null($capacity = $connection->app->capacity)) { - $connectionsCount = $this->channelManager->getConnectionCount($connection->app->id); + $connectionsCount = $this->channelManager->getGlobalConnectionsCount($connection->app->id); - if ($connectionsCount >= $capacity) { - throw new ConnectionsOverCapacity(); + if ($connectionsCount instanceof PromiseInterface) { + $connectionsCount->then(function ($connectionsCount) use ($capacity, $connection) { + $this->sendExceptionIfOverCapacity($connectionsCount, $capacity, $connection); + }); + } else { + $this->throwExceptionIfOverCapacity($connectionsCount, $capacity); } } @@ -203,6 +221,41 @@ class WebSocketHandler implements MessageComponentInterface StatisticsLogger::connection($connection->app->id); + $this->replicator->subscribeToApp($connection->app->id); + return $this; } + + /** + * Throw a ConnectionsOverCapacity exception. + * + * @param int $connectionsCount + * @param int $capacity + * @return void + * @throws ConnectionsOverCapacity + */ + protected function throwExceptionIfOverCapacity(int $connectionsCount, int $capacity) + { + if ($connectionsCount >= $capacity) { + throw new ConnectionsOverCapacity; + } + } + + /** + * Send the ConnectionsOverCapacity exception through + * the connection and close the channel. + * + * @param int $connectionsCount + * @param int $capacity + * @param ConnectionInterface $connection + * @return void + */ + protected function sendExceptionIfOverCapacity(int $connectionsCount, int $capacity, ConnectionInterface $connection) + { + if ($connectionsCount >= $capacity) { + $payload = json_encode((new ConnectionsOverCapacity)->getPayload()); + + tap($connection)->send($payload)->close(); + } + } } diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 5530ecd..a2ca289 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -59,9 +59,11 @@ class WebSocketsServiceProvider extends ServiceProvider }); $this->app->singleton(ChannelManager::class, function () { - $channelManager = config('websockets.managers.channel', ArrayChannelManager::class); + $replicationDriver = config('websockets.replication.driver', 'local'); - return new $channelManager; + $class = config("websockets.replication.{$replicationDriver}.channel_manager", ArrayChannelManager::class); + + return new $class; }); $this->app->singleton(AppManager::class, function () { @@ -69,12 +71,13 @@ class WebSocketsServiceProvider extends ServiceProvider }); $this->app->singleton(StatisticsDriver::class, function () { - $driver = config('websockets.statistics.driver'); + $driver = config('websockets.statistics.driver', 'local'); return $this->app->make( - config('websockets.statistics')[$driver]['driver'] - ?? - \BeyondCode\LaravelWebSockets\Statistics\Drivers\DatabaseDriver::class + config( + "websockets.statistics.{$driver}.driver", + \BeyondCode\LaravelWebSockets\Statistics\Drivers\DatabaseDriver::class + ) ); }); } diff --git a/tests/Channels/PresenceChannelReplicationTest.php b/tests/Channels/PresenceChannelReplicationTest.php index e753f08..67ade9f 100644 --- a/tests/Channels/PresenceChannelReplicationTest.php +++ b/tests/Channels/PresenceChannelReplicationTest.php @@ -52,6 +52,10 @@ class PresenceChannelReplicationTest extends TestCase ]) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel']) ->assertCalled('publish'); + + $this->assertNotNull( + $this->redis->hget('laravel_database_1234:presence-channel', $connection->socketId) + ); } /** @test */ @@ -130,7 +134,7 @@ class PresenceChannelReplicationTest extends TestCase $this->getPublishClient() ->assertCalled('hset') - ->assertcalledWithArgs('hgetall', ['laravel_database_1234:presence-channel']) + ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel']) ->assertCalled('publish'); } } diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index 3e17566..60392d4 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -31,6 +31,8 @@ class ConnectionTest extends TestCase /** @test */ public function app_can_not_exceed_maximum_capacity() { + $this->runOnlyOnLocalReplication(); + $this->app['config']->set('websockets.apps.0.capacity', 2); $this->getConnectedWebSocketConnection(['test-channel']); @@ -39,6 +41,28 @@ class ConnectionTest extends TestCase $this->getConnectedWebSocketConnection(['test-channel']); } + /** @test */ + public function app_can_not_exceed_maximum_capacity_on_redis_replication() + { + $this->runOnlyOnRedisReplication(); + + $this->redis->hdel('laravel_database_1234', 'connections'); + + $this->app['config']->set('websockets.apps.0.capacity', 2); + + $this->getConnectedWebSocketConnection(['test-channel']); + $this->getConnectedWebSocketConnection(['test-channel']); + + $this->getPublishClient() + ->assertCalledWithArgsCount(2, 'hincrby', ['laravel_database_1234', 'connections', 1]); + + $failedConnection = $this->getConnectedWebSocketConnection(['test-channel']); + + $failedConnection + ->assertSentEvent('pusher:error', ['data' => ['message' => 'Over capacity', 'code' => 4100]]) + ->assertClosed(); + } + /** @test */ public function successful_connections_have_the_app_attached() { diff --git a/tests/Dashboard/RedisStatisticsTest.php b/tests/Dashboard/RedisStatisticsTest.php new file mode 100644 index 0000000..52b0148 --- /dev/null +++ b/tests/Dashboard/RedisStatisticsTest.php @@ -0,0 +1,73 @@ +runOnlyOnRedisReplication(); + } + + /** @test */ + public function can_get_statistics() + { + $connection = $this->getConnectedWebSocketConnection(['channel-1']); + + $logger = new RedisStatisticsLogger( + $this->channelManager, + $this->statisticsDriver + ); + + $logger->webSocketMessage($connection->app->id); + $logger->apiMessage($connection->app->id); + $logger->connection($connection->app->id); + $logger->disconnection($connection->app->id); + + $logger->save(); + + $this->actingAs(factory(User::class)->create()) + ->json('GET', route('laravel-websockets.statistics', ['appId' => '1234'])) + ->assertResponseOk() + ->seeJsonStructure([ + 'peak_connections' => ['x', 'y'], + 'websocket_message_count' => ['x', 'y'], + 'api_message_count' => ['x', 'y'], + ]); + } + + /** @test */ + public function cant_get_statistics_for_invalid_app_id() + { + $connection = $this->getConnectedWebSocketConnection(['channel-1']); + + $logger = new RedisStatisticsLogger( + $this->channelManager, + $this->statisticsDriver + ); + + $logger->webSocketMessage($connection->app->id); + $logger->apiMessage($connection->app->id); + $logger->connection($connection->app->id); + $logger->disconnection($connection->app->id); + + $logger->save(); + + $this->actingAs(factory(User::class)->create()) + ->json('GET', route('laravel-websockets.statistics', ['appId' => 'not_found'])) + ->seeJson([ + 'peak_connections' => ['x' => [], 'y' => []], + 'websocket_message_count' => ['x' => [], 'y' => []], + 'api_message_count' => ['x' => [], 'y' => []], + ]); + } +} diff --git a/tests/Dashboard/StatisticsTest.php b/tests/Dashboard/StatisticsTest.php index 94af6c5..9de6354 100644 --- a/tests/Dashboard/StatisticsTest.php +++ b/tests/Dashboard/StatisticsTest.php @@ -8,6 +8,16 @@ use BeyondCode\LaravelWebSockets\Tests\TestCase; class StatisticsTest extends TestCase { + /** + * {@inheritdoc} + */ + public function setUp(): void + { + parent::setUp(); + + $this->runOnlyOnLocalReplication(); + } + /** @test */ public function can_get_statistics() { diff --git a/tests/Mocks/Connection.php b/tests/Mocks/Connection.php index 904a7a6..f7fb5b4 100644 --- a/tests/Mocks/Connection.php +++ b/tests/Mocks/Connection.php @@ -63,7 +63,7 @@ class Connection implements ConnectionInterface * * @param string $name * @param array $additionalParameters - * @return void + * @return $this */ public function assertSentEvent(string $name, array $additionalParameters = []) { @@ -76,13 +76,15 @@ class Connection implements ConnectionInterface foreach ($additionalParameters as $parameter => $value) { PHPUnit::assertSame($event[$parameter], $value); } + + return $this; } /** * Assert that an event got not sent. * * @param string $name - * @return void + * @return $this */ public function assertNotSentEvent(string $name) { @@ -91,15 +93,19 @@ class Connection implements ConnectionInterface PHPUnit::assertTrue( is_null($event) ); + + return $this; } /** * Assert the connection is closed. * - * @return void + * @return $this */ public function assertClosed() { PHPUnit::assertTrue($this->closed); + + return $this; } } diff --git a/tests/Mocks/FakeMemoryStatisticsLogger.php b/tests/Mocks/FakeMemoryStatisticsLogger.php index 88f1e11..5cc3872 100644 --- a/tests/Mocks/FakeMemoryStatisticsLogger.php +++ b/tests/Mocks/FakeMemoryStatisticsLogger.php @@ -12,7 +12,8 @@ class FakeMemoryStatisticsLogger extends MemoryStatisticsLogger public function save() { foreach ($this->statistics as $appId => $statistic) { - $currentConnectionCount = $this->channelManager->getConnectionCount($appId); + $currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId); + $statistic->reset($currentConnectionCount); } } diff --git a/tests/Mocks/FakeRedisStatisticsLogger.php b/tests/Mocks/FakeRedisStatisticsLogger.php new file mode 100644 index 0000000..8fae00d --- /dev/null +++ b/tests/Mocks/FakeRedisStatisticsLogger.php @@ -0,0 +1,24 @@ + $appId, + 'peak_connection_count' => $this->redis->hget($this->getHash($appId), 'peak_connection_count') ?: 0, + 'websocket_message_count' => $this->redis->hget($this->getHash($appId), 'websocket_message_count') ?: 0, + 'api_message_count' => $this->redis->hget($this->getHash($appId), 'api_message_count') ?: 0, + ]; + } +} diff --git a/tests/Mocks/LazyClient.php b/tests/Mocks/LazyClient.php index ab3e224..0382a6f 100644 --- a/tests/Mocks/LazyClient.php +++ b/tests/Mocks/LazyClient.php @@ -2,8 +2,11 @@ namespace BeyondCode\LaravelWebSockets\Tests\Mocks; +use Clue\React\Redis\Factory; use Clue\React\Redis\LazyClient as BaseLazyClient; +use Illuminate\Support\Facades\Redis; use PHPUnit\Framework\Assert as PHPUnit; +use React\EventLoop\LoopInterface; class LazyClient extends BaseLazyClient { @@ -21,6 +24,31 @@ class LazyClient extends BaseLazyClient */ protected $events = []; + /** + * The Redis manager instance. + * + * @var \Illuminate\Redis\RedisManager + */ + protected $redis; + + /** + * The loop. + * + * @var \React\EventLoop\LoopInterface + */ + protected $loop; + + /** + * {@inheritdoc} + */ + public function __construct($target, Factory $factory, LoopInterface $loop) + { + parent::__construct($target, $factory, $loop); + + $this->loop = $loop; + $this->redis = Redis::connection(); + } + /** * {@inheritdoc} */ @@ -28,7 +56,13 @@ class LazyClient extends BaseLazyClient { $this->calls[] = [$name, $args]; - return parent::__call($name, $args); + if (! in_array($name, ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'onMessage'])) { + $this->redis->__call($name, $args); + } + + return new PromiseResolver( + parent::__call($name, $args), $this->loop + ); } /** @@ -88,6 +122,26 @@ class LazyClient extends BaseLazyClient return $this; } + /** + * Check if the method with args got called an amount of times. + * + * @param string $name + * @param array $args + * @return $this + */ + public function assertCalledWithArgsCount($times = 1, $name, array $args) + { + $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) { + [$calledName, $calledArgs] = $function; + + return $calledName === $name && $calledArgs === $args; + }); + + PHPUnit::assertCount($times, $total); + + return $this; + } + /** * Check if the method didn't call. * @@ -135,6 +189,26 @@ class LazyClient extends BaseLazyClient return $this; } + /** + * Check if the method with args got called an amount of times. + * + * @param string $name + * @param array $args + * @return $this + */ + public function assertNotCalledWithArgsCount($times = 1, $name, array $args) + { + $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) { + [$calledName, $calledArgs] = $function; + + return $calledName === $name && $calledArgs === $args; + }); + + PHPUnit::assertNotCount($times, $total); + + return $this; + } + /** * Check if no function got called. * diff --git a/tests/Mocks/PromiseResolver.php b/tests/Mocks/PromiseResolver.php new file mode 100644 index 0000000..e5d9aac --- /dev/null +++ b/tests/Mocks/PromiseResolver.php @@ -0,0 +1,67 @@ +promise = $promise; + $this->loop = $loop; + } + + /** + * Intercept the promise then() and run it in sync. + * + * @param callable|null $onFulfilled + * @param callable|null $onRejected + * @param callable|null $onProgress + * @return PromiseInterface + */ + public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null) + { + $result = Block\await( + $this->promise, $this->loop + ); + + $onFulfilled($result); + + return $this->promise; + } + + /** + * Pass the calls to the promise. + * + * @param string $method + * @param array $args + * @return mixed + */ + public function __call($method, $args) + { + return call_user_func([$this->promise, $method], $args); + } +} diff --git a/tests/PubSub/RedisDriverTest.php b/tests/PubSub/RedisDriverTest.php index 361b30e..b018fcc 100644 --- a/tests/PubSub/RedisDriverTest.php +++ b/tests/PubSub/RedisDriverTest.php @@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\Tests\PubSub; use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; use BeyondCode\LaravelWebSockets\Tests\Mocks\RedisFactory; use BeyondCode\LaravelWebSockets\Tests\TestCase; +use Illuminate\Support\Facades\Redis; use React\EventLoop\Factory as LoopFactory; class RedisDriverTest extends TestCase @@ -17,6 +18,8 @@ class RedisDriverTest extends TestCase parent::setUp(); $this->runOnlyOnRedisReplication(); + + Redis::hdel('laravel_database_1234', 'connections'); } /** @test */ @@ -80,4 +83,40 @@ class RedisDriverTest extends TestCase $client->getSubscribeClient() ->assertEventDispatched('message'); } + + /** @test */ + public function redis_tracks_app_connections_count() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $this->getSubscribeClient() + ->assertCalledWithArgs('subscribe', ['laravel_database_1234']); + + $this->getPublishClient() + ->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', 1]); + } + + /** @test */ + public function redis_tracks_app_connections_count_on_disconnect() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $this->getSubscribeClient() + ->assertCalledWithArgs('subscribe', ['laravel_database_1234']) + ->assertNotCalledWithArgs('unsubscribe', ['laravel_database_1234']); + + $this->getPublishClient() + ->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', 1]); + + $this->pusherServer->onClose($connection); + + $this->getPublishClient() + ->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', -1]); + + $this->assertEquals(0, Redis::hget('laravel_database_1234', 'connections')); + } } diff --git a/tests/Statistics/Logger/RedisStatisticsLoggerTest.php b/tests/Statistics/Logger/RedisStatisticsLoggerTest.php new file mode 100644 index 0000000..1b70b7f --- /dev/null +++ b/tests/Statistics/Logger/RedisStatisticsLoggerTest.php @@ -0,0 +1,102 @@ +runOnlyOnRedisReplication(); + + StatisticsLogger::resetStatistics('1234', 0); + StatisticsLogger::resetAppTraces('1234'); + + $this->redis->hdel('laravel_database_1234', 'connections'); + + $this->getPublishClient()->resetAssertions(); + } + + /** @test */ + public function it_counts_connections_on_redis_replication() + { + $connections = []; + + $connections[] = $this->getConnectedWebSocketConnection(['channel-1']); + $connections[] = $this->getConnectedWebSocketConnection(['channel-1']); + $connections[] = $this->getConnectedWebSocketConnection(['channel-1']); + + $this->getPublishClient() + ->assertCalledWithArgsCount(6, 'sadd', ['laravel-websockets:apps', '1234']) + ->assertCalledWithArgsCount(3, 'hincrby', ['laravel-websockets:app:1234', 'current_connection_count', 1]) + ->assertCalledWithArgsCount(3, 'hincrby', ['laravel-websockets:app:1234', 'websocket_message_count', 1]); + + $this->pusherServer->onClose(array_pop($connections)); + + StatisticsLogger::save(); + + $this->getPublishClient() + ->assertCalledWithArgs('hincrby', ['laravel-websockets:app:1234', 'current_connection_count', -1]) + ->assertCalledWithArgs('smembers', ['laravel-websockets:apps']); + } + + /** @test */ + public function it_counts_unique_connections_no_channel_subscriptions_on_redis() + { + $connections = []; + + $connections[] = $this->getConnectedWebSocketConnection(['channel-1', 'channel-2']); + $connections[] = $this->getConnectedWebSocketConnection(['channel-1', 'channel-2']); + $connections[] = $this->getConnectedWebSocketConnection(['channel-1']); + + $this->getPublishClient() + ->assertCalledWithArgsCount(3, 'hincrby', ['laravel-websockets:app:1234', 'current_connection_count', 1]) + ->assertCalledWithArgsCount(5, 'hincrby', ['laravel-websockets:app:1234', 'websocket_message_count', 1]); + + $this->pusherServer->onClose(array_pop($connections)); + $this->pusherServer->onClose(array_pop($connections)); + + StatisticsLogger::save(); + + $this->getPublishClient() + ->assertCalledWithArgsCount(2, 'hincrby', ['laravel-websockets:app:1234', 'current_connection_count', -1]) + ->assertCalledWithArgs('smembers', ['laravel-websockets:apps']); + } + + /** @test */ + public function it_counts_connections_with_redis_logger_with_no_data() + { + config(['cache.default' => 'redis']); + + $logger = new RedisStatisticsLogger( + $this->channelManager, + $this->statisticsDriver + ); + + $logger->resetAppTraces('1'); + $logger->resetAppTraces('1234'); + + $connection = $this->getConnectedWebSocketConnection(['channel-1']); + + $logger->apiMessage($connection->app->id); + + $logger->save(); + + $this->assertCount(1, WebSocketsStatisticsEntry::all()); + + $entry = WebSocketsStatisticsEntry::first(); + + $this->assertEquals(1, $entry->peak_connection_count); + $this->assertEquals(1, $entry->websocket_message_count); + $this->assertEquals(1, $entry->api_message_count); + } +} diff --git a/tests/Statistics/Logger/StatisticsLoggerTest.php b/tests/Statistics/Logger/StatisticsLoggerTest.php index 8374609..08a8039 100644 --- a/tests/Statistics/Logger/StatisticsLoggerTest.php +++ b/tests/Statistics/Logger/StatisticsLoggerTest.php @@ -5,12 +5,21 @@ namespace BeyondCode\LaravelWebSockets\Tests\Statistics\Controllers; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger; use BeyondCode\LaravelWebSockets\Statistics\Logger\NullStatisticsLogger; -use BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger; use BeyondCode\LaravelWebSockets\Statistics\Models\WebSocketsStatisticsEntry; use BeyondCode\LaravelWebSockets\Tests\TestCase; class StatisticsLoggerTest extends TestCase { + /** + * {@inheritdoc} + */ + public function setUp(): void + { + parent::setUp(); + + $this->runOnlyOnLocalReplication(); + } + /** @test */ public function it_counts_connections() { @@ -93,68 +102,4 @@ class StatisticsLoggerTest extends TestCase $this->assertCount(0, WebSocketsStatisticsEntry::all()); } - - /** @test */ - public function it_counts_connections_with_redis_logger_with_no_data() - { - $this->runOnlyOnRedisReplication(); - - config(['cache.default' => 'redis']); - - $connection = $this->getConnectedWebSocketConnection(['channel-1']); - - $logger = new RedisStatisticsLogger( - $this->channelManager, - $this->statisticsDriver - ); - - $logger->resetAppTraces('1234'); - - $logger->webSocketMessage($connection->app->id); - $logger->apiMessage($connection->app->id); - $logger->connection($connection->app->id); - $logger->disconnection($connection->app->id); - - $logger->save(); - - $this->assertCount(1, WebSocketsStatisticsEntry::all()); - - $entry = WebSocketsStatisticsEntry::first(); - - $this->assertEquals(1, $entry->peak_connection_count); - $this->assertEquals(1, $entry->websocket_message_count); - $this->assertEquals(1, $entry->api_message_count); - } - - /** @test */ - public function it_counts_connections_with_redis_logger_with_existing_data() - { - $this->runOnlyOnRedisReplication(); - - config(['cache.default' => 'redis']); - - $connection = $this->getConnectedWebSocketConnection(['channel-1']); - - $logger = new RedisStatisticsLogger( - $this->channelManager, - $this->statisticsDriver - ); - - $logger->resetStatistics('1234', 0); - - $logger->webSocketMessage($connection->app->id); - $logger->apiMessage($connection->app->id); - $logger->connection($connection->app->id); - $logger->disconnection($connection->app->id); - - $logger->save(); - - $this->assertCount(1, WebSocketsStatisticsEntry::all()); - - $entry = WebSocketsStatisticsEntry::first(); - - $this->assertEquals(1, $entry->peak_connection_count); - $this->assertEquals(1, $entry->websocket_message_count); - $this->assertEquals(1, $entry->api_message_count); - } } diff --git a/tests/TestCase.php b/tests/TestCase.php index ade4b52..d83bd9b 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -7,9 +7,11 @@ use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Statistics\Drivers\StatisticsDriver; use BeyondCode\LaravelWebSockets\Tests\Mocks\Connection; use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeMemoryStatisticsLogger; +use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeRedisStatisticsLogger; use BeyondCode\LaravelWebSockets\Tests\Mocks\Message; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use GuzzleHttp\Psr7\Request; +use Illuminate\Support\Facades\Redis; use Orchestra\Testbench\BrowserKit\TestCase as BaseTestCase; use Ratchet\ConnectionInterface; use React\EventLoop\Factory as LoopFactory; @@ -37,6 +39,20 @@ abstract class TestCase extends BaseTestCase */ protected $statisticsDriver; + /** + * The Redis manager instance. + * + * @var \Illuminate\Redis\RedisManager + */ + protected $redis; + + /** + * Get the loop instance. + * + * @var \React\EventLoop\LoopInterface + */ + protected $loop; + /** * {@inheritdoc} */ @@ -44,26 +60,25 @@ abstract class TestCase extends BaseTestCase { parent::setUp(); + $this->loop = LoopFactory::create(); + $this->resetDatabase(); $this->loadLaravelMigrations(['--database' => 'sqlite']); $this->withFactories(__DIR__.'/database/factories'); - $this->pusherServer = $this->app->make(config('websockets.handlers.websocket')); + $this->configurePubSub(); $this->channelManager = $this->app->make(ChannelManager::class); $this->statisticsDriver = $this->app->make(StatisticsDriver::class); - StatisticsLogger::swap(new FakeMemoryStatisticsLogger( - $this->channelManager, - app(StatisticsDriver::class) - )); + $this->configureStatisticsLogger(); $this->loadMigrationsFrom(__DIR__.'/../database/migrations'); - $this->configurePubSub(); + $this->pusherServer = $this->app->make(config('websockets.handlers.websocket')); } /** @@ -151,6 +166,7 @@ abstract class TestCase extends BaseTestCase if (in_array($replicationDriver, ['redis'])) { $app['config']->set('broadcasting.default', 'pusher'); + $app['config']->set('cache.default', 'redis'); } } @@ -254,20 +270,49 @@ abstract class TestCase extends BaseTestCase */ protected function configurePubSub() { + $replicationDriver = config('websockets.replication.driver', 'local'); + // Replace the publish and subscribe clients with a Mocked // factory lazy instance on boot. - $this->app->singleton(ReplicationInterface::class, function () { - $driver = config('websockets.replication.driver', 'local'); - + $this->app->singleton(ReplicationInterface::class, function () use ($replicationDriver) { $client = config( - "websockets.replication.{$driver}.client", + "websockets.replication.{$replicationDriver}.client", \BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient::class ); return (new $client)->boot( - LoopFactory::create(), Mocks\RedisFactory::class + $this->loop, Mocks\RedisFactory::class ); }); + + if ($replicationDriver === 'redis') { + $this->redis = Redis::connection(); + } + } + + /** + * Configure the statistics logger for the right driver. + * + * @return void + */ + protected function configureStatisticsLogger() + { + $replicationDriver = getenv('REPLICATION_DRIVER') ?: 'local'; + + if ($replicationDriver === 'local') { + StatisticsLogger::swap(new FakeMemoryStatisticsLogger( + $this->channelManager, + app(StatisticsDriver::class) + )); + } + + if ($replicationDriver === 'redis') { + StatisticsLogger::swap(new FakeRedisStatisticsLogger( + $this->channelManager, + app(StatisticsDriver::class), + $this->app->make(ReplicationInterface::class) + )); + } } protected function runOnlyOnRedisReplication()