From b9dfecab6857c63104bcacf495cf513305f36b6a Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Fri, 4 Sep 2020 11:34:33 +0300 Subject: [PATCH] Using separate connection counts for global & local. --- src/PubSub/Drivers/LocalClient.php | 13 ++++++++++++- src/PubSub/Drivers/RedisClient.php | 17 +++++++++++++---- src/PubSub/ReplicationInterface.php | 10 +++++++++- .../Logger/MemoryStatisticsLogger.php | 2 +- src/Statistics/Logger/RedisStatisticsLogger.php | 2 +- src/WebSockets/Channels/ChannelManager.php | 10 +++++++++- .../ChannelManagers/ArrayChannelManager.php | 13 ++++++++++++- .../ChannelManagers/RedisChannelManager.php | 6 +++--- src/WebSockets/WebSocketHandler.php | 2 +- tests/Mocks/FakeMemoryStatisticsLogger.php | 2 +- .../Statistics/Logger/StatisticsLoggerTest.php | 4 ++-- 11 files changed, 64 insertions(+), 17 deletions(-) diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index 7a4c2a5..67a1d29 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -164,9 +164,20 @@ class LocalClient implements ReplicationInterface * Get the amount of unique connections. * * @param mixed $appId + * @return null|int + */ + public function getLocalConnectionsCount($appId) + { + return null; + } + + /** + * Get the amount of connections aggregated on multiple instances. + * + * @param mixed $appId * @return null|int|\React\Promise\PromiseInterface */ - public function appConnectionsCount($appId) + public function getGlobalConnectionsCount($appId) { return null; } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 0cc626c..182e458 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -292,13 +292,22 @@ class RedisClient extends LocalClient * Get the amount of unique connections. * * @param mixed $appId + * @return null|int + */ + public function getLocalConnectionsCount($appId) + { + return null; + } + + /** + * Get the amount of connections aggregated on multiple instances. + * + * @param mixed $appId * @return null|int|\React\Promise\PromiseInterface */ - public function appConnectionsCount($appId) + public function getGlobalConnectionsCount($appId) { - // Use the in-built Redis manager to avoid async run. - - return $this->publishClient->hget($this->getTopicName($appId), 'connections') ?: 0; + return $this->publishClient->hget($this->getTopicName($appId), 'connections'); } /** diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index 7c50ae6..5ca3ee3 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -106,7 +106,15 @@ interface ReplicationInterface * Get the amount of unique connections. * * @param mixed $appId + * @return null|int + */ + public function getLocalConnectionsCount($appId); + + /** + * Get the amount of connections aggregated on multiple instances. + * + * @param mixed $appId * @return null|int|\React\Promise\PromiseInterface */ - public function appConnectionsCount($appId); + public function getGlobalConnectionsCount($appId); } diff --git a/src/Statistics/Logger/MemoryStatisticsLogger.php b/src/Statistics/Logger/MemoryStatisticsLogger.php index c75fa33..f067502 100644 --- a/src/Statistics/Logger/MemoryStatisticsLogger.php +++ b/src/Statistics/Logger/MemoryStatisticsLogger.php @@ -105,7 +105,7 @@ class MemoryStatisticsLogger implements StatisticsLogger $this->createRecord($statistic, $appId); - $currentConnectionCount = $this->channelManager->getConnectionCount($appId); + $currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId); $statistic->reset($currentConnectionCount); } diff --git a/src/Statistics/Logger/RedisStatisticsLogger.php b/src/Statistics/Logger/RedisStatisticsLogger.php index 22ec483..ccab93e 100644 --- a/src/Statistics/Logger/RedisStatisticsLogger.php +++ b/src/Statistics/Logger/RedisStatisticsLogger.php @@ -124,7 +124,7 @@ class RedisStatisticsLogger implements StatisticsLogger $this->createRecord($statistic, $appId); - $currentConnectionCount = $this->channelManager->getConnectionCount($appId); + $currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId); $currentConnectionCount === 0 ? $this->resetAppTraces($appId) diff --git a/src/WebSockets/Channels/ChannelManager.php b/src/WebSockets/Channels/ChannelManager.php index fb1721a..7e67a64 100644 --- a/src/WebSockets/Channels/ChannelManager.php +++ b/src/WebSockets/Channels/ChannelManager.php @@ -38,7 +38,15 @@ interface ChannelManager * @param mixed $appId * @return int */ - public function getConnectionCount($appId): int; + public function getLocalConnectionsCount($appId): int; + + /** + * Get the connections count across multiple servers. + * + * @param mixed $appId + * @return int + */ + public function getGlobalConnectionsCount($appId): int; /** * Remove connection from all channels. diff --git a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php index 8635a46..40a576c 100644 --- a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php +++ b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php @@ -73,7 +73,7 @@ class ArrayChannelManager implements ChannelManager * @param mixed $appId * @return int */ - public function getConnectionCount($appId): int + public function getLocalConnectionsCount($appId): int { return collect($this->getChannels($appId)) ->flatMap(function (Channel $channel) { @@ -83,6 +83,17 @@ class ArrayChannelManager implements ChannelManager ->count(); } + /** + * Get the connections count across multiple servers. + * + * @param mixed $appId + * @return int + */ + public function getGlobalConnectionsCount($appId): int + { + return $this->getLocalConnectionsCount($appId); + } + /** * Remove connection from all channels. * diff --git a/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php b/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php index ed701dd..0a9f030 100644 --- a/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php +++ b/src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php @@ -24,13 +24,13 @@ class RedisChannelManager extends ArrayChannelManager } /** - * Get the connections count on the app. + * Get the connections count across multiple servers. * * @param mixed $appId * @return int */ - public function getConnectionCount($appId): int + public function getGlobalConnectionsCount($appId): int { - return $this->replicator->appConnectionsCount($appId); + return $this->replicator->getGlobalConnectionsCount($appId); } } diff --git a/src/WebSockets/WebSocketHandler.php b/src/WebSockets/WebSocketHandler.php index 29f258a..b251ac0 100644 --- a/src/WebSockets/WebSocketHandler.php +++ b/src/WebSockets/WebSocketHandler.php @@ -165,7 +165,7 @@ class WebSocketHandler implements MessageComponentInterface protected function limitConcurrentConnections(ConnectionInterface $connection) { if (! is_null($capacity = $connection->app->capacity)) { - $connectionsCount = $this->channelManager->getConnectionCount($connection->app->id); + $connectionsCount = $this->channelManager->getGlobalConnectionsCount($connection->app->id); if ($connectionsCount >= $capacity) { throw new ConnectionsOverCapacity(); diff --git a/tests/Mocks/FakeMemoryStatisticsLogger.php b/tests/Mocks/FakeMemoryStatisticsLogger.php index 88f1e11..142c29c 100644 --- a/tests/Mocks/FakeMemoryStatisticsLogger.php +++ b/tests/Mocks/FakeMemoryStatisticsLogger.php @@ -12,7 +12,7 @@ class FakeMemoryStatisticsLogger extends MemoryStatisticsLogger public function save() { foreach ($this->statistics as $appId => $statistic) { - $currentConnectionCount = $this->channelManager->getConnectionCount($appId); + $currentConnectionCount = $this->channelManager->getLocalConnectionsCount($appId); $statistic->reset($currentConnectionCount); } } diff --git a/tests/Statistics/Logger/StatisticsLoggerTest.php b/tests/Statistics/Logger/StatisticsLoggerTest.php index a2b1e7b..196e589 100644 --- a/tests/Statistics/Logger/StatisticsLoggerTest.php +++ b/tests/Statistics/Logger/StatisticsLoggerTest.php @@ -49,7 +49,7 @@ class StatisticsLoggerTest extends TestCase StatisticsLogger::save(); - $this->assertEquals(3, StatisticsLogger::getForAppId(1234)['peak_connection_count']); + $this->assertEquals(2, StatisticsLogger::getForAppId(1234)['peak_connection_count']); } /** @test */ @@ -93,7 +93,7 @@ class StatisticsLoggerTest extends TestCase StatisticsLogger::save(); - $this->assertEquals(3, StatisticsLogger::getForAppId(1234)['peak_connection_count']); + $this->assertEquals(1, StatisticsLogger::getForAppId(1234)['peak_connection_count']); } /** @test */