Merge pull request #497 from beyondcode/fix/app-connections-count

[fix] Redis connection counter didn't work properly
This commit is contained in:
rennokki 2020-09-06 11:02:11 +03:00 committed by GitHub
commit c078e5a2b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 880 additions and 158 deletions

View File

@ -36,11 +36,12 @@
"illuminate/routing": "^6.0|^7.0",
"illuminate/support": "^6.0|^7.0",
"pusher/pusher-php-server": "^3.0|^4.0",
"react/dns": "^1.1",
"react/promise": "^2.0",
"symfony/http-kernel": "^4.0|^5.0",
"symfony/psr-http-message-bridge": "^1.1|^2.0"
},
"require-dev": {
"clue/block-react": "^1.4",
"mockery/mockery": "^1.3",
"orchestra/testbench-browser-kit": "^4.0|^5.0",
"phpunit/phpunit": "^8.0|^9.0"

View File

@ -42,21 +42,6 @@ return [
'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class,
/*
|--------------------------------------------------------------------------
| Channel Manager
|--------------------------------------------------------------------------
|
| When users subscribe or unsubscribe from specific channels,
| the connections are stored to keep track of any interaction with the
| WebSocket server.
| You can however add your own implementation that will help the store
| of the channels alongside their connections.
|
*/
'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
],
/*
@ -191,6 +176,8 @@ return [
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class,
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
],
/*
@ -214,6 +201,8 @@ return [
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class,
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class,
],
],

View File

@ -66,6 +66,28 @@ class LocalClient implements ReplicationInterface
return true;
}
/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
return true;
}
/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
return true;
}
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
@ -137,4 +159,26 @@ class LocalClient implements ReplicationInterface
return new FulfilledPromise($results);
}
/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int
*/
public function getLocalConnectionsCount($appId)
{
return null;
}
/**
* Get the amount of connections aggregated on multiple instances.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId)
{
return null;
}
}

View File

@ -175,6 +175,36 @@ class RedisClient extends LocalClient
return true;
}
/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);
$this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', 1]);
return true;
}
/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
$this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);
$this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', -1]);
return true;
}
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
@ -258,6 +288,17 @@ class RedisClient extends LocalClient
});
}
/**
* Get the amount of connections aggregated on multiple instances.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId)
{
return $this->publishClient->hget($this->getTopicName($appId), 'connections');
}
/**
* Handle a message received from Redis on a specific channel.
*
@ -321,8 +362,8 @@ class RedisClient extends LocalClient
*/
protected function getConnectionUri()
{
$name = config('websockets.replication.redis.connection') ?: 'default';
$config = config('database.redis')[$name];
$name = config('websockets.replication.redis.connection', 'default');
$config = config("database.redis.{$name}");
$host = $config['host'];
$port = $config['port'] ?: 6379;
@ -377,13 +418,19 @@ class RedisClient extends LocalClient
* app ID and channel name.
*
* @param mixed $appId
* @param string $channel
* @param string|null $channel
* @return string
*/
protected function getTopicName($appId, string $channel): string
protected function getTopicName($appId, string $channel = null): string
{
$prefix = config('database.redis.options.prefix', null);
return "{$prefix}{$appId}:{$channel}";
$hash = "{$prefix}{$appId}";
if ($channel) {
$hash .= ":{$channel}";
}
return $hash;
}
}

View File

@ -45,6 +45,22 @@ interface ReplicationInterface
*/
public function unsubscribe($appId, string $channel): bool;
/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool;
/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool;
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
@ -85,4 +101,20 @@ interface ReplicationInterface
* @return PromiseInterface
*/
public function channelMemberCounts($appId, array $channelNames): PromiseInterface;
/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int
*/
public function getLocalConnectionsCount($appId);
/**
* Get the amount of connections aggregated on multiple instances.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId);
}

View File

@ -105,7 +105,7 @@ class MemoryStatisticsLogger implements StatisticsLogger
$this->createRecord($statistic, $appId);
$currentConnectionCount = $this->channelManager->getConnectionCount($appId);
$currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId);
$statistic->reset($currentConnectionCount);
}

View File

@ -3,10 +3,11 @@
namespace BeyondCode\LaravelWebSockets\Statistics\Logger;
use BeyondCode\LaravelWebSockets\Apps\App;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\Statistics\Drivers\StatisticsDriver;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Illuminate\Cache\RedisLock;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;
class RedisStatisticsLogger implements StatisticsLogger
{
@ -42,7 +43,11 @@ class RedisStatisticsLogger implements StatisticsLogger
{
$this->channelManager = $channelManager;
$this->driver = $driver;
$this->redis = Cache::getRedis();
$this->replicator = app(ReplicationInterface::class);
$this->redis = Redis::connection(
config('websockets.replication.redis.connection', 'default')
);
}
/**
@ -54,7 +59,7 @@ class RedisStatisticsLogger implements StatisticsLogger
public function webSocketMessage($appId)
{
$this->ensureAppIsSet($appId)
->hincrby($this->getHash($appId), 'websocket_message_count', 1);
->__call('hincrby', [$this->getHash($appId), 'websocket_message_count', 1]);
}
/**
@ -66,7 +71,7 @@ class RedisStatisticsLogger implements StatisticsLogger
public function apiMessage($appId)
{
$this->ensureAppIsSet($appId)
->hincrby($this->getHash($appId), 'api_message_count', 1);
->__call('hincrby', [$this->getHash($appId), 'api_message_count', 1]);
}
/**
@ -77,16 +82,30 @@ class RedisStatisticsLogger implements StatisticsLogger
*/
public function connection($appId)
{
$currentConnectionCount = $this->ensureAppIsSet($appId)
->hincrby($this->getHash($appId), 'current_connection_count', 1);
// Increment the current connections count by 1.
$incremented = $this->ensureAppIsSet($appId)
->__call('hincrby', [$this->getHash($appId), 'current_connection_count', 1]);
$currentPeakConnectionCount = $this->redis->hget($this->getHash($appId), 'peak_connection_count');
$incremented->then(function ($currentConnectionCount) use ($appId) {
// Get the peak connections count from Redis.
$peakConnectionCount = $this->replicator
->getPublishClient()
->__call('hget', [$this->getHash($appId), 'peak_connection_count']);
$peakConnectionCount = is_null($currentPeakConnectionCount)
? $currentConnectionCount
: max($currentPeakConnectionCount, $currentConnectionCount);
$peakConnectionCount->then(function ($currentPeakConnectionCount) use ($currentConnectionCount, $appId) {
// Extract the greatest number between the current peak connection count
// and the current connection number.
$this->redis->hset($this->getHash($appId), 'peak_connection_count', $peakConnectionCount);
$peakConnectionCount = is_null($currentPeakConnectionCount)
? $currentConnectionCount
: max($currentPeakConnectionCount, $currentConnectionCount);
// Then set it to the database.
$this->replicator
->getPublishClient()
->__call('hset', [$this->getHash($appId), 'peak_connection_count', $peakConnectionCount]);
});
});
}
/**
@ -97,16 +116,30 @@ class RedisStatisticsLogger implements StatisticsLogger
*/
public function disconnection($appId)
{
$currentConnectionCount = $this->ensureAppIsSet($appId)
->hincrby($this->getHash($appId), 'current_connection_count', -1);
// Decrement the current connections count by 1.
$decremented = $this->ensureAppIsSet($appId)
->__call('hincrby', [$this->getHash($appId), 'current_connection_count', -1]);
$currentPeakConnectionCount = $this->redis->hget($this->getHash($appId), 'peak_connection_count');
$decremented->then(function ($currentConnectionCount) use ($appId) {
// Get the peak connections count from Redis.
$peakConnectionCount = $this->replicator
->getPublishClient()
->__call('hget', [$this->getHash($appId), 'peak_connection_count']);
$peakConnectionCount = is_null($currentPeakConnectionCount)
? $currentConnectionCount
: max($currentPeakConnectionCount, $currentConnectionCount);
$peakConnectionCount->then(function ($currentPeakConnectionCount) use ($currentConnectionCount, $appId) {
// Extract the greatest number between the current peak connection count
// and the current connection number.
$this->redis->hset($this->getHash($appId), 'peak_connection_count', $peakConnectionCount);
$peakConnectionCount = is_null($currentPeakConnectionCount)
? $currentConnectionCount
: max($currentPeakConnectionCount, $currentConnectionCount);
// Then set it to the database.
$this->replicator
->getPublishClient()
->__call('hset', [$this->getHash($appId), 'peak_connection_count', $peakConnectionCount]);
});
});
}
/**
@ -117,19 +150,44 @@ class RedisStatisticsLogger implements StatisticsLogger
public function save()
{
$this->lock()->get(function () {
foreach ($this->redis->smembers('laravel-websockets:apps') as $appId) {
if (! $statistic = $this->redis->hgetall($this->getHash($appId))) {
continue;
$setMembers = $this->replicator
->getPublishClient()
->__call('smembers', ['laravel-websockets:apps']);
$setMembers->then(function ($members) {
foreach ($members as $appId) {
$member = $this->replicator
->getPublishClient()
->__call('hgetall', [$this->getHash($appId)]);
$member->then(function ($statistic) use ($appId) {
if (! $statistic) {
return;
}
// Statistics come into a list where the keys are on even indexes
// and the values are on odd indexes. This way, we know which
// ones are keys and which ones are values and their get combined
// later to form the key => value array
[$keys, $values] = collect($statistic)->partition(function ($value, $key) {
return $key % 2 === 0;
});
$statistic = array_combine($keys->all(), $values->all());
$this->createRecord($statistic, $appId);
$this->channelManager
->getGlobalConnectionsCount($appId)
->then(function ($currentConnectionCount) use ($appId) {
$currentConnectionCount === 0
? $this->resetAppTraces($appId)
: $this->resetStatistics($appId, $currentConnectionCount);
});
});
}
$this->createRecord($statistic, $appId);
$currentConnectionCount = $this->channelManager->getConnectionCount($appId);
$currentConnectionCount === 0
? $this->resetAppTraces($appId)
: $this->resetStatistics($appId, $currentConnectionCount);
}
});
});
}
@ -141,9 +199,11 @@ class RedisStatisticsLogger implements StatisticsLogger
*/
protected function ensureAppIsSet($appId)
{
$this->redis->sadd('laravel-websockets:apps', $appId);
$this->replicator
->getPublishClient()
->__call('sadd', ['laravel-websockets:apps', $appId]);
return $this->redis;
return $this->replicator->getPublishClient();
}
/**
@ -155,10 +215,21 @@ class RedisStatisticsLogger implements StatisticsLogger
*/
public function resetStatistics($appId, int $currentConnectionCount)
{
$this->redis->hset($this->getHash($appId), 'current_connection_count', $currentConnectionCount);
$this->redis->hset($this->getHash($appId), 'peak_connection_count', $currentConnectionCount);
$this->redis->hset($this->getHash($appId), 'websocket_message_count', 0);
$this->redis->hset($this->getHash($appId), 'api_message_count', 0);
$this->replicator
->getPublishClient()
->__call('hset', [$this->getHash($appId), 'current_connection_count', $currentConnectionCount]);
$this->replicator
->getPublishClient()
->__call('hset', [$this->getHash($appId), 'peak_connection_count', $currentConnectionCount]);
$this->replicator
->getPublishClient()
->__call('hset', [$this->getHash($appId), 'websocket_message_count', 0]);
$this->replicator
->getPublishClient()
->__call('hset', [$this->getHash($appId), 'api_message_count', 0]);
}
/**
@ -170,12 +241,25 @@ class RedisStatisticsLogger implements StatisticsLogger
*/
public function resetAppTraces($appId)
{
$this->redis->hdel($this->getHash($appId), 'current_connection_count');
$this->redis->hdel($this->getHash($appId), 'peak_connection_count');
$this->redis->hdel($this->getHash($appId), 'websocket_message_count');
$this->redis->hdel($this->getHash($appId), 'api_message_count');
$this->replicator
->getPublishClient()
->__call('hdel', [$this->getHash($appId), 'current_connection_count']);
$this->redis->srem('laravel-websockets:apps', $appId);
$this->replicator
->getPublishClient()
->__call('hdel', [$this->getHash($appId), 'peak_connection_count']);
$this->replicator
->getPublishClient()
->__call('hdel', [$this->getHash($appId), 'websocket_message_count']);
$this->replicator
->getPublishClient()
->__call('hdel', [$this->getHash($appId), 'api_message_count']);
$this->replicator
->getPublishClient()
->__call('srem', ['laravel-websockets:apps', $appId]);
}
/**

View File

@ -36,9 +36,17 @@ interface ChannelManager
* Get the connections count on the app.
*
* @param mixed $appId
* @return int
* @return int|\React\Promise\PromiseInterface
*/
public function getConnectionCount($appId): int;
public function getLocalConnectionsCount($appId): int;
/**
* Get the connections count across multiple servers.
*
* @param mixed $appId
* @return int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId);
/**
* Remove connection from all channels.

View File

@ -71,9 +71,9 @@ class ArrayChannelManager implements ChannelManager
* Get the connections count on the app.
*
* @param mixed $appId
* @return int
* @return int|\React\Promise\PromiseInterface
*/
public function getConnectionCount($appId): int
public function getLocalConnectionsCount($appId): int
{
return collect($this->getChannels($appId))
->flatMap(function (Channel $channel) {
@ -83,6 +83,17 @@ class ArrayChannelManager implements ChannelManager
->count();
}
/**
* Get the connections count across multiple servers.
*
* @param mixed $appId
* @return int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId)
{
return $this->getLocalConnectionsCount($appId);
}
/**
* Remove connection from all channels.
*

View File

@ -0,0 +1,36 @@
<?php
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
class RedisChannelManager extends ArrayChannelManager
{
/**
* The replicator driver.
*
* @var \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface
*/
protected $replicator;
/**
* Initialize the channel manager.
*
* @return void
*/
public function __construct()
{
$this->replicator = app(ReplicationInterface::class);
}
/**
* Get the connections count across multiple servers.
*
* @param mixed $appId
* @return int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId)
{
return $this->replicator->getGlobalConnectionsCount($appId);
}
}

View File

@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets;
use BeyondCode\LaravelWebSockets\Apps\App;
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\QueryParameters;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity;
@ -16,6 +17,7 @@ use Exception;
use Ratchet\ConnectionInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
use Ratchet\WebSocket\MessageComponentInterface;
use React\Promise\PromiseInterface;
class WebSocketHandler implements MessageComponentInterface
{
@ -26,6 +28,13 @@ class WebSocketHandler implements MessageComponentInterface
*/
protected $channelManager;
/**
* The replicator client.
*
* @var ReplicationInterface
*/
protected $replicator;
/**
* Initialize a new handler.
*
@ -35,6 +44,7 @@ class WebSocketHandler implements MessageComponentInterface
public function __construct(ChannelManager $channelManager)
{
$this->channelManager = $channelManager;
$this->replicator = app(ReplicationInterface::class);
}
/**
@ -83,6 +93,8 @@ class WebSocketHandler implements MessageComponentInterface
]);
StatisticsLogger::disconnection($connection->app->id);
$this->replicator->unsubscribeFromApp($connection->app->id);
}
/**
@ -99,6 +111,8 @@ class WebSocketHandler implements MessageComponentInterface
$exception->getPayload()
));
}
$this->replicator->unsubscribeFromApp($connection->app->id);
}
/**
@ -152,10 +166,14 @@ class WebSocketHandler implements MessageComponentInterface
protected function limitConcurrentConnections(ConnectionInterface $connection)
{
if (! is_null($capacity = $connection->app->capacity)) {
$connectionsCount = $this->channelManager->getConnectionCount($connection->app->id);
$connectionsCount = $this->channelManager->getGlobalConnectionsCount($connection->app->id);
if ($connectionsCount >= $capacity) {
throw new ConnectionsOverCapacity();
if ($connectionsCount instanceof PromiseInterface) {
$connectionsCount->then(function ($connectionsCount) use ($capacity, $connection) {
$this->sendExceptionIfOverCapacity($connectionsCount, $capacity, $connection);
});
} else {
$this->throwExceptionIfOverCapacity($connectionsCount, $capacity);
}
}
@ -203,6 +221,41 @@ class WebSocketHandler implements MessageComponentInterface
StatisticsLogger::connection($connection->app->id);
$this->replicator->subscribeToApp($connection->app->id);
return $this;
}
/**
* Throw a ConnectionsOverCapacity exception.
*
* @param int $connectionsCount
* @param int $capacity
* @return void
* @throws ConnectionsOverCapacity
*/
protected function throwExceptionIfOverCapacity(int $connectionsCount, int $capacity)
{
if ($connectionsCount >= $capacity) {
throw new ConnectionsOverCapacity;
}
}
/**
* Send the ConnectionsOverCapacity exception through
* the connection and close the channel.
*
* @param int $connectionsCount
* @param int $capacity
* @param ConnectionInterface $connection
* @return void
*/
protected function sendExceptionIfOverCapacity(int $connectionsCount, int $capacity, ConnectionInterface $connection)
{
if ($connectionsCount >= $capacity) {
$payload = json_encode((new ConnectionsOverCapacity)->getPayload());
tap($connection)->send($payload)->close();
}
}
}

View File

@ -59,9 +59,11 @@ class WebSocketsServiceProvider extends ServiceProvider
});
$this->app->singleton(ChannelManager::class, function () {
$channelManager = config('websockets.managers.channel', ArrayChannelManager::class);
$replicationDriver = config('websockets.replication.driver', 'local');
return new $channelManager;
$class = config("websockets.replication.{$replicationDriver}.channel_manager", ArrayChannelManager::class);
return new $class;
});
$this->app->singleton(AppManager::class, function () {
@ -69,12 +71,13 @@ class WebSocketsServiceProvider extends ServiceProvider
});
$this->app->singleton(StatisticsDriver::class, function () {
$driver = config('websockets.statistics.driver');
$driver = config('websockets.statistics.driver', 'local');
return $this->app->make(
config('websockets.statistics')[$driver]['driver']
??
\BeyondCode\LaravelWebSockets\Statistics\Drivers\DatabaseDriver::class
config(
"websockets.statistics.{$driver}.driver",
\BeyondCode\LaravelWebSockets\Statistics\Drivers\DatabaseDriver::class
)
);
});
}

View File

@ -52,6 +52,10 @@ class PresenceChannelReplicationTest extends TestCase
])
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel'])
->assertCalled('publish');
$this->assertNotNull(
$this->redis->hget('laravel_database_1234:presence-channel', $connection->socketId)
);
}
/** @test */
@ -130,7 +134,7 @@ class PresenceChannelReplicationTest extends TestCase
$this->getPublishClient()
->assertCalled('hset')
->assertcalledWithArgs('hgetall', ['laravel_database_1234:presence-channel'])
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel'])
->assertCalled('publish');
}
}

View File

@ -31,6 +31,8 @@ class ConnectionTest extends TestCase
/** @test */
public function app_can_not_exceed_maximum_capacity()
{
$this->runOnlyOnLocalReplication();
$this->app['config']->set('websockets.apps.0.capacity', 2);
$this->getConnectedWebSocketConnection(['test-channel']);
@ -39,6 +41,28 @@ class ConnectionTest extends TestCase
$this->getConnectedWebSocketConnection(['test-channel']);
}
/** @test */
public function app_can_not_exceed_maximum_capacity_on_redis_replication()
{
$this->runOnlyOnRedisReplication();
$this->redis->hdel('laravel_database_1234', 'connections');
$this->app['config']->set('websockets.apps.0.capacity', 2);
$this->getConnectedWebSocketConnection(['test-channel']);
$this->getConnectedWebSocketConnection(['test-channel']);
$this->getPublishClient()
->assertCalledWithArgsCount(2, 'hincrby', ['laravel_database_1234', 'connections', 1]);
$failedConnection = $this->getConnectedWebSocketConnection(['test-channel']);
$failedConnection
->assertSentEvent('pusher:error', ['data' => ['message' => 'Over capacity', 'code' => 4100]])
->assertClosed();
}
/** @test */
public function successful_connections_have_the_app_attached()
{

View File

@ -0,0 +1,73 @@
<?php
namespace BeyondCode\LaravelWebSockets\Tests\Dashboard;
use BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger;
use BeyondCode\LaravelWebSockets\Tests\Models\User;
use BeyondCode\LaravelWebSockets\Tests\TestCase;
class RedisStatisticsTest extends TestCase
{
/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();
$this->runOnlyOnRedisReplication();
}
/** @test */
public function can_get_statistics()
{
$connection = $this->getConnectedWebSocketConnection(['channel-1']);
$logger = new RedisStatisticsLogger(
$this->channelManager,
$this->statisticsDriver
);
$logger->webSocketMessage($connection->app->id);
$logger->apiMessage($connection->app->id);
$logger->connection($connection->app->id);
$logger->disconnection($connection->app->id);
$logger->save();
$this->actingAs(factory(User::class)->create())
->json('GET', route('laravel-websockets.statistics', ['appId' => '1234']))
->assertResponseOk()
->seeJsonStructure([
'peak_connections' => ['x', 'y'],
'websocket_message_count' => ['x', 'y'],
'api_message_count' => ['x', 'y'],
]);
}
/** @test */
public function cant_get_statistics_for_invalid_app_id()
{
$connection = $this->getConnectedWebSocketConnection(['channel-1']);
$logger = new RedisStatisticsLogger(
$this->channelManager,
$this->statisticsDriver
);
$logger->webSocketMessage($connection->app->id);
$logger->apiMessage($connection->app->id);
$logger->connection($connection->app->id);
$logger->disconnection($connection->app->id);
$logger->save();
$this->actingAs(factory(User::class)->create())
->json('GET', route('laravel-websockets.statistics', ['appId' => 'not_found']))
->seeJson([
'peak_connections' => ['x' => [], 'y' => []],
'websocket_message_count' => ['x' => [], 'y' => []],
'api_message_count' => ['x' => [], 'y' => []],
]);
}
}

View File

@ -8,6 +8,16 @@ use BeyondCode\LaravelWebSockets\Tests\TestCase;
class StatisticsTest extends TestCase
{
/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();
$this->runOnlyOnLocalReplication();
}
/** @test */
public function can_get_statistics()
{

View File

@ -63,7 +63,7 @@ class Connection implements ConnectionInterface
*
* @param string $name
* @param array $additionalParameters
* @return void
* @return $this
*/
public function assertSentEvent(string $name, array $additionalParameters = [])
{
@ -76,13 +76,15 @@ class Connection implements ConnectionInterface
foreach ($additionalParameters as $parameter => $value) {
PHPUnit::assertSame($event[$parameter], $value);
}
return $this;
}
/**
* Assert that an event got not sent.
*
* @param string $name
* @return void
* @return $this
*/
public function assertNotSentEvent(string $name)
{
@ -91,15 +93,19 @@ class Connection implements ConnectionInterface
PHPUnit::assertTrue(
is_null($event)
);
return $this;
}
/**
* Assert the connection is closed.
*
* @return void
* @return $this
*/
public function assertClosed()
{
PHPUnit::assertTrue($this->closed);
return $this;
}
}

View File

@ -12,7 +12,8 @@ class FakeMemoryStatisticsLogger extends MemoryStatisticsLogger
public function save()
{
foreach ($this->statistics as $appId => $statistic) {
$currentConnectionCount = $this->channelManager->getConnectionCount($appId);
$currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId);
$statistic->reset($currentConnectionCount);
}
}

View File

@ -0,0 +1,24 @@
<?php
namespace BeyondCode\LaravelWebSockets\Tests\Mocks;
use BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger;
class FakeRedisStatisticsLogger extends RedisStatisticsLogger
{
/**
* Get app by id.
*
* @param mixed $appId
* @return array
*/
public function getForAppId($appId): array
{
return [
'app_id' => $appId,
'peak_connection_count' => $this->redis->hget($this->getHash($appId), 'peak_connection_count') ?: 0,
'websocket_message_count' => $this->redis->hget($this->getHash($appId), 'websocket_message_count') ?: 0,
'api_message_count' => $this->redis->hget($this->getHash($appId), 'api_message_count') ?: 0,
];
}
}

View File

@ -2,8 +2,11 @@
namespace BeyondCode\LaravelWebSockets\Tests\Mocks;
use Clue\React\Redis\Factory;
use Clue\React\Redis\LazyClient as BaseLazyClient;
use Illuminate\Support\Facades\Redis;
use PHPUnit\Framework\Assert as PHPUnit;
use React\EventLoop\LoopInterface;
class LazyClient extends BaseLazyClient
{
@ -21,6 +24,31 @@ class LazyClient extends BaseLazyClient
*/
protected $events = [];
/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;
/**
* The loop.
*
* @var \React\EventLoop\LoopInterface
*/
protected $loop;
/**
* {@inheritdoc}
*/
public function __construct($target, Factory $factory, LoopInterface $loop)
{
parent::__construct($target, $factory, $loop);
$this->loop = $loop;
$this->redis = Redis::connection();
}
/**
* {@inheritdoc}
*/
@ -28,7 +56,13 @@ class LazyClient extends BaseLazyClient
{
$this->calls[] = [$name, $args];
return parent::__call($name, $args);
if (! in_array($name, ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'onMessage'])) {
$this->redis->__call($name, $args);
}
return new PromiseResolver(
parent::__call($name, $args), $this->loop
);
}
/**
@ -88,6 +122,26 @@ class LazyClient extends BaseLazyClient
return $this;
}
/**
* Check if the method with args got called an amount of times.
*
* @param string $name
* @param array $args
* @return $this
*/
public function assertCalledWithArgsCount($times = 1, $name, array $args)
{
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
[$calledName, $calledArgs] = $function;
return $calledName === $name && $calledArgs === $args;
});
PHPUnit::assertCount($times, $total);
return $this;
}
/**
* Check if the method didn't call.
*
@ -135,6 +189,26 @@ class LazyClient extends BaseLazyClient
return $this;
}
/**
* Check if the method with args got called an amount of times.
*
* @param string $name
* @param array $args
* @return $this
*/
public function assertNotCalledWithArgsCount($times = 1, $name, array $args)
{
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
[$calledName, $calledArgs] = $function;
return $calledName === $name && $calledArgs === $args;
});
PHPUnit::assertNotCount($times, $total);
return $this;
}
/**
* Check if no function got called.
*

View File

@ -0,0 +1,67 @@
<?php
namespace BeyondCode\LaravelWebSockets\Tests\Mocks;
use Clue\React\Block;
use React\Promise\PromiseInterface;
class PromiseResolver implements PromiseInterface
{
/**
* The promise to resolve.
*
* @var \React\Promise\PromiseInterface
*/
protected $promise;
/**
* The loop.
*
* @var \React\EventLoop\LoopInterface
*/
protected $loop;
/**
* Initialize the promise resolver.
*
* @param PromiseInterface $promise
* @param LoopInterface $loop
* @return void
*/
public function __construct($promise, $loop)
{
$this->promise = $promise;
$this->loop = $loop;
}
/**
* Intercept the promise then() and run it in sync.
*
* @param callable|null $onFulfilled
* @param callable|null $onRejected
* @param callable|null $onProgress
* @return PromiseInterface
*/
public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null)
{
$result = Block\await(
$this->promise, $this->loop
);
$onFulfilled($result);
return $this->promise;
}
/**
* Pass the calls to the promise.
*
* @param string $method
* @param array $args
* @return mixed
*/
public function __call($method, $args)
{
return call_user_func([$this->promise, $method], $args);
}
}

View File

@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\Tests\PubSub;
use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient;
use BeyondCode\LaravelWebSockets\Tests\Mocks\RedisFactory;
use BeyondCode\LaravelWebSockets\Tests\TestCase;
use Illuminate\Support\Facades\Redis;
use React\EventLoop\Factory as LoopFactory;
class RedisDriverTest extends TestCase
@ -17,6 +18,8 @@ class RedisDriverTest extends TestCase
parent::setUp();
$this->runOnlyOnRedisReplication();
Redis::hdel('laravel_database_1234', 'connections');
}
/** @test */
@ -80,4 +83,40 @@ class RedisDriverTest extends TestCase
$client->getSubscribeClient()
->assertEventDispatched('message');
}
/** @test */
public function redis_tracks_app_connections_count()
{
$connection = $this->getWebSocketConnection();
$this->pusherServer->onOpen($connection);
$this->getSubscribeClient()
->assertCalledWithArgs('subscribe', ['laravel_database_1234']);
$this->getPublishClient()
->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', 1]);
}
/** @test */
public function redis_tracks_app_connections_count_on_disconnect()
{
$connection = $this->getWebSocketConnection();
$this->pusherServer->onOpen($connection);
$this->getSubscribeClient()
->assertCalledWithArgs('subscribe', ['laravel_database_1234'])
->assertNotCalledWithArgs('unsubscribe', ['laravel_database_1234']);
$this->getPublishClient()
->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', 1]);
$this->pusherServer->onClose($connection);
$this->getPublishClient()
->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', -1]);
$this->assertEquals(0, Redis::hget('laravel_database_1234', 'connections'));
}
}

View File

@ -0,0 +1,102 @@
<?php
namespace BeyondCode\LaravelWebSockets\Tests\Statistics\Controllers;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Models\WebSocketsStatisticsEntry;
use BeyondCode\LaravelWebSockets\Tests\TestCase;
class RedisStatisticsLoggerTest extends TestCase
{
/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();
$this->runOnlyOnRedisReplication();
StatisticsLogger::resetStatistics('1234', 0);
StatisticsLogger::resetAppTraces('1234');
$this->redis->hdel('laravel_database_1234', 'connections');
$this->getPublishClient()->resetAssertions();
}
/** @test */
public function it_counts_connections_on_redis_replication()
{
$connections = [];
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
$this->getPublishClient()
->assertCalledWithArgsCount(6, 'sadd', ['laravel-websockets:apps', '1234'])
->assertCalledWithArgsCount(3, 'hincrby', ['laravel-websockets:app:1234', 'current_connection_count', 1])
->assertCalledWithArgsCount(3, 'hincrby', ['laravel-websockets:app:1234', 'websocket_message_count', 1]);
$this->pusherServer->onClose(array_pop($connections));
StatisticsLogger::save();
$this->getPublishClient()
->assertCalledWithArgs('hincrby', ['laravel-websockets:app:1234', 'current_connection_count', -1])
->assertCalledWithArgs('smembers', ['laravel-websockets:apps']);
}
/** @test */
public function it_counts_unique_connections_no_channel_subscriptions_on_redis()
{
$connections = [];
$connections[] = $this->getConnectedWebSocketConnection(['channel-1', 'channel-2']);
$connections[] = $this->getConnectedWebSocketConnection(['channel-1', 'channel-2']);
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
$this->getPublishClient()
->assertCalledWithArgsCount(3, 'hincrby', ['laravel-websockets:app:1234', 'current_connection_count', 1])
->assertCalledWithArgsCount(5, 'hincrby', ['laravel-websockets:app:1234', 'websocket_message_count', 1]);
$this->pusherServer->onClose(array_pop($connections));
$this->pusherServer->onClose(array_pop($connections));
StatisticsLogger::save();
$this->getPublishClient()
->assertCalledWithArgsCount(2, 'hincrby', ['laravel-websockets:app:1234', 'current_connection_count', -1])
->assertCalledWithArgs('smembers', ['laravel-websockets:apps']);
}
/** @test */
public function it_counts_connections_with_redis_logger_with_no_data()
{
config(['cache.default' => 'redis']);
$logger = new RedisStatisticsLogger(
$this->channelManager,
$this->statisticsDriver
);
$logger->resetAppTraces('1');
$logger->resetAppTraces('1234');
$connection = $this->getConnectedWebSocketConnection(['channel-1']);
$logger->apiMessage($connection->app->id);
$logger->save();
$this->assertCount(1, WebSocketsStatisticsEntry::all());
$entry = WebSocketsStatisticsEntry::first();
$this->assertEquals(1, $entry->peak_connection_count);
$this->assertEquals(1, $entry->websocket_message_count);
$this->assertEquals(1, $entry->api_message_count);
}
}

View File

@ -5,12 +5,21 @@ namespace BeyondCode\LaravelWebSockets\Tests\Statistics\Controllers;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Logger\NullStatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Models\WebSocketsStatisticsEntry;
use BeyondCode\LaravelWebSockets\Tests\TestCase;
class StatisticsLoggerTest extends TestCase
{
/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();
$this->runOnlyOnLocalReplication();
}
/** @test */
public function it_counts_connections()
{
@ -93,68 +102,4 @@ class StatisticsLoggerTest extends TestCase
$this->assertCount(0, WebSocketsStatisticsEntry::all());
}
/** @test */
public function it_counts_connections_with_redis_logger_with_no_data()
{
$this->runOnlyOnRedisReplication();
config(['cache.default' => 'redis']);
$connection = $this->getConnectedWebSocketConnection(['channel-1']);
$logger = new RedisStatisticsLogger(
$this->channelManager,
$this->statisticsDriver
);
$logger->resetAppTraces('1234');
$logger->webSocketMessage($connection->app->id);
$logger->apiMessage($connection->app->id);
$logger->connection($connection->app->id);
$logger->disconnection($connection->app->id);
$logger->save();
$this->assertCount(1, WebSocketsStatisticsEntry::all());
$entry = WebSocketsStatisticsEntry::first();
$this->assertEquals(1, $entry->peak_connection_count);
$this->assertEquals(1, $entry->websocket_message_count);
$this->assertEquals(1, $entry->api_message_count);
}
/** @test */
public function it_counts_connections_with_redis_logger_with_existing_data()
{
$this->runOnlyOnRedisReplication();
config(['cache.default' => 'redis']);
$connection = $this->getConnectedWebSocketConnection(['channel-1']);
$logger = new RedisStatisticsLogger(
$this->channelManager,
$this->statisticsDriver
);
$logger->resetStatistics('1234', 0);
$logger->webSocketMessage($connection->app->id);
$logger->apiMessage($connection->app->id);
$logger->connection($connection->app->id);
$logger->disconnection($connection->app->id);
$logger->save();
$this->assertCount(1, WebSocketsStatisticsEntry::all());
$entry = WebSocketsStatisticsEntry::first();
$this->assertEquals(1, $entry->peak_connection_count);
$this->assertEquals(1, $entry->websocket_message_count);
$this->assertEquals(1, $entry->api_message_count);
}
}

View File

@ -7,9 +7,11 @@ use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\Statistics\Drivers\StatisticsDriver;
use BeyondCode\LaravelWebSockets\Tests\Mocks\Connection;
use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeMemoryStatisticsLogger;
use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeRedisStatisticsLogger;
use BeyondCode\LaravelWebSockets\Tests\Mocks\Message;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use GuzzleHttp\Psr7\Request;
use Illuminate\Support\Facades\Redis;
use Orchestra\Testbench\BrowserKit\TestCase as BaseTestCase;
use Ratchet\ConnectionInterface;
use React\EventLoop\Factory as LoopFactory;
@ -37,6 +39,20 @@ abstract class TestCase extends BaseTestCase
*/
protected $statisticsDriver;
/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;
/**
* Get the loop instance.
*
* @var \React\EventLoop\LoopInterface
*/
protected $loop;
/**
* {@inheritdoc}
*/
@ -44,26 +60,25 @@ abstract class TestCase extends BaseTestCase
{
parent::setUp();
$this->loop = LoopFactory::create();
$this->resetDatabase();
$this->loadLaravelMigrations(['--database' => 'sqlite']);
$this->withFactories(__DIR__.'/database/factories');
$this->pusherServer = $this->app->make(config('websockets.handlers.websocket'));
$this->configurePubSub();
$this->channelManager = $this->app->make(ChannelManager::class);
$this->statisticsDriver = $this->app->make(StatisticsDriver::class);
StatisticsLogger::swap(new FakeMemoryStatisticsLogger(
$this->channelManager,
app(StatisticsDriver::class)
));
$this->configureStatisticsLogger();
$this->loadMigrationsFrom(__DIR__.'/../database/migrations');
$this->configurePubSub();
$this->pusherServer = $this->app->make(config('websockets.handlers.websocket'));
}
/**
@ -151,6 +166,7 @@ abstract class TestCase extends BaseTestCase
if (in_array($replicationDriver, ['redis'])) {
$app['config']->set('broadcasting.default', 'pusher');
$app['config']->set('cache.default', 'redis');
}
}
@ -254,20 +270,49 @@ abstract class TestCase extends BaseTestCase
*/
protected function configurePubSub()
{
$replicationDriver = config('websockets.replication.driver', 'local');
// Replace the publish and subscribe clients with a Mocked
// factory lazy instance on boot.
$this->app->singleton(ReplicationInterface::class, function () {
$driver = config('websockets.replication.driver', 'local');
$this->app->singleton(ReplicationInterface::class, function () use ($replicationDriver) {
$client = config(
"websockets.replication.{$driver}.client",
"websockets.replication.{$replicationDriver}.client",
\BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient::class
);
return (new $client)->boot(
LoopFactory::create(), Mocks\RedisFactory::class
$this->loop, Mocks\RedisFactory::class
);
});
if ($replicationDriver === 'redis') {
$this->redis = Redis::connection();
}
}
/**
* Configure the statistics logger for the right driver.
*
* @return void
*/
protected function configureStatisticsLogger()
{
$replicationDriver = getenv('REPLICATION_DRIVER') ?: 'local';
if ($replicationDriver === 'local') {
StatisticsLogger::swap(new FakeMemoryStatisticsLogger(
$this->channelManager,
app(StatisticsDriver::class)
));
}
if ($replicationDriver === 'redis') {
StatisticsLogger::swap(new FakeRedisStatisticsLogger(
$this->channelManager,
app(StatisticsDriver::class),
$this->app->make(ReplicationInterface::class)
));
}
}
protected function runOnlyOnRedisReplication()