Using the Redis non-blocking client.

This commit is contained in:
Alex Renoki 2020-09-04 09:47:23 +03:00
parent 21db4b3252
commit a45c0bf9cc
7 changed files with 117 additions and 52 deletions

View File

@ -194,7 +194,7 @@ class RedisClient extends LocalClient
{ {
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]); $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);
$this->redis->hincrby($this->getTopicName($appId), 'connections', 1); $this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', 1]);
return true; return true;
} }
@ -209,7 +209,7 @@ class RedisClient extends LocalClient
{ {
$this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]); $this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);
$this->redis->hincrby($this->getTopicName($appId), 'connections', -1); $this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', -1]);
return true; return true;
} }
@ -226,7 +226,7 @@ class RedisClient extends LocalClient
*/ */
public function joinChannel($appId, string $channel, string $socketId, string $data) public function joinChannel($appId, string $channel, string $socketId, string $data)
{ {
$this->redis->hset($this->getTopicName($appId, $channel), $socketId, $data); $this->publishClient->__call('hset', [$this->getTopicName($appId, $channel), $socketId, $data]);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
'channel' => $channel, 'channel' => $channel,
@ -248,7 +248,7 @@ class RedisClient extends LocalClient
*/ */
public function leaveChannel($appId, string $channel, string $socketId) public function leaveChannel($appId, string $channel, string $socketId)
{ {
$this->redis->hdel($this->getTopicName($appId, $channel), $socketId); $this->publishClient->__call('hdel', [$this->getTopicName($appId, $channel), $socketId]);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
'channel' => $channel, 'channel' => $channel,
@ -307,7 +307,7 @@ class RedisClient extends LocalClient
{ {
// Use the in-built Redis manager to avoid async run. // Use the in-built Redis manager to avoid async run.
return $this->redis->hget($this->getTopicName($appId), 'connections') ?: 0; return $this->publishClient->hget($this->getTopicName($appId), 'connections') ?: 0;
} }
/** /**

View File

@ -4,17 +4,10 @@ namespace BeyondCode\LaravelWebSockets\Tests\Channels;
use BeyondCode\LaravelWebSockets\Tests\Mocks\Message; use BeyondCode\LaravelWebSockets\Tests\Mocks\Message;
use BeyondCode\LaravelWebSockets\Tests\TestCase; use BeyondCode\LaravelWebSockets\Tests\TestCase;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Redis;
class PresenceChannelReplicationTest extends TestCase class PresenceChannelReplicationTest extends TestCase
{ {
/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
@ -23,8 +16,6 @@ class PresenceChannelReplicationTest extends TestCase
parent::setUp(); parent::setUp();
$this->runOnlyOnRedisReplication(); $this->runOnlyOnRedisReplication();
$this->redis = Cache::getRedis();
} }
/** @test */ /** @test */
@ -55,7 +46,7 @@ class PresenceChannelReplicationTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->getPublishClient() $this->getPublishClient()
->assertNotCalledWithArgs('hset', [ ->assertCalledWithArgs('hset', [
'laravel_database_1234:presence-channel', 'laravel_database_1234:presence-channel',
$connection->socketId, $connection->socketId,
json_encode($channelData), json_encode($channelData),
@ -64,7 +55,7 @@ class PresenceChannelReplicationTest extends TestCase
->assertCalled('publish'); ->assertCalled('publish');
$this->assertNotNull( $this->assertNotNull(
$this->redis->hget('laravel_database_1234:presence-channel', $connection->socketId) Redis::hget('laravel_database_1234:presence-channel', $connection->socketId)
); );
} }
@ -96,7 +87,7 @@ class PresenceChannelReplicationTest extends TestCase
->assertEventDispatched('message'); ->assertEventDispatched('message');
$this->getPublishClient() $this->getPublishClient()
->assertNotCalled('hset') ->assertCalled('hset')
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel'])
->assertCalled('publish'); ->assertCalled('publish');
@ -114,7 +105,7 @@ class PresenceChannelReplicationTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->getPublishClient() $this->getPublishClient()
->assertNotCalled('hdel') ->assertCalled('hdel')
->assertCalled('publish'); ->assertCalled('publish');
} }
@ -143,8 +134,8 @@ class PresenceChannelReplicationTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->getPublishClient() $this->getPublishClient()
->assertNotCalled('hset') ->assertCalled('hset')
->assertcalledWithArgs('hgetall', ['laravel_database_1234:presence-channel']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel'])
->assertCalled('publish'); ->assertCalled('publish');
} }
} }

View File

@ -7,7 +7,7 @@ use BeyondCode\LaravelWebSockets\Tests\Mocks\Message;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\OriginNotAllowed; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\OriginNotAllowed;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\UnknownAppKey; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\UnknownAppKey;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Redis;
class ConnectionTest extends TestCase class ConnectionTest extends TestCase
{ {
@ -47,15 +47,18 @@ class ConnectionTest extends TestCase
{ {
$this->runOnlyOnRedisReplication(); $this->runOnlyOnRedisReplication();
$redis = Cache::getRedis(); Redis::hdel('laravel_database_1234', 'connections');
$redis->hdel('laravel_database_1234', 'connections');
$this->app['config']->set('websockets.apps.0.capacity', 2); $this->app['config']->set('websockets.apps.0.capacity', 2);
$this->getConnectedWebSocketConnection(['test-channel']); $this->getConnectedWebSocketConnection(['test-channel']);
$this->getConnectedWebSocketConnection(['test-channel']); $this->getConnectedWebSocketConnection(['test-channel']);
$this->getPublishClient()
->assertCalledWithArgsCount(2, 'hincrby', ['laravel_database_1234', 'connections', 1]);
$this->expectException(ConnectionsOverCapacity::class); $this->expectException(ConnectionsOverCapacity::class);
$this->getConnectedWebSocketConnection(['test-channel']); $this->getConnectedWebSocketConnection(['test-channel']);
} }

View File

@ -48,7 +48,7 @@ class FetchChannelsReplicationTest extends TestCase
->assertEventDispatched('message'); ->assertEventDispatched('message');
$this->getPublishClient() $this->getPublishClient()
->assertNotCalled('hset') ->assertCalled('hset')
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-channel'])
->assertCalled('publish') ->assertCalled('publish')
->assertCalled('multi') ->assertCalled('multi')
@ -88,7 +88,7 @@ class FetchChannelsReplicationTest extends TestCase
->assertEventDispatched('message'); ->assertEventDispatched('message');
$this->getPublishClient() $this->getPublishClient()
->assertNotCalled('hset') ->assertCalled('hset')
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.1']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.1'])
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.2']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.2'])
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-notglobal.2']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-notglobal.2'])
@ -133,7 +133,7 @@ class FetchChannelsReplicationTest extends TestCase
->assertEventDispatched('message'); ->assertEventDispatched('message');
$this->getPublishClient() $this->getPublishClient()
->assertNotCalled('hset') ->assertCalled('hset')
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.1']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.1'])
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.2']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-global.2'])
->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-notglobal.2']) ->assertCalledWithArgs('hgetall', ['laravel_database_1234:presence-notglobal.2'])

View File

@ -3,7 +3,10 @@
namespace BeyondCode\LaravelWebSockets\Tests\Mocks; namespace BeyondCode\LaravelWebSockets\Tests\Mocks;
use Clue\React\Redis\LazyClient as BaseLazyClient; use Clue\React\Redis\LazyClient as BaseLazyClient;
use Clue\React\Redis\Factory;
use Illuminate\Support\Facades\Cache;
use PHPUnit\Framework\Assert as PHPUnit; use PHPUnit\Framework\Assert as PHPUnit;
use React\EventLoop\LoopInterface;
class LazyClient extends BaseLazyClient class LazyClient extends BaseLazyClient
{ {
@ -21,6 +24,23 @@ class LazyClient extends BaseLazyClient
*/ */
protected $events = []; protected $events = [];
/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;
/**
* {@inheritdoc}
*/
public function __construct($target, Factory $factory, LoopInterface $loop)
{
parent::__construct($target, $factory, $loop);
$this->redis = Cache::getRedis();
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
@ -28,6 +48,10 @@ class LazyClient extends BaseLazyClient
{ {
$this->calls[] = [$name, $args]; $this->calls[] = [$name, $args];
if (! in_array($name, ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'onMessage'])) {
$this->redis->__call($name, $args);
}
return parent::__call($name, $args); return parent::__call($name, $args);
} }
@ -88,6 +112,26 @@ class LazyClient extends BaseLazyClient
return $this; return $this;
} }
/**
* Check if the method with args got called an amount of times.
*
* @param string $name
* @param array $args
* @return $this
*/
public function assertCalledWithArgsCount($times = 1, $name, array $args)
{
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
[$calledName, $calledArgs] = $function;
return $calledName === $name && $calledArgs === $args;
});
PHPUnit::assertCount($times, $total);
return $this;
}
/** /**
* Check if the method didn't call. * Check if the method didn't call.
* *
@ -135,6 +179,26 @@ class LazyClient extends BaseLazyClient
return $this; return $this;
} }
/**
* Check if the method with args got called an amount of times.
*
* @param string $name
* @param array $args
* @return $this
*/
public function assertNotCalledWithArgsCount($times = 1, $name, array $args)
{
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
[$calledName, $calledArgs] = $function;
return $calledName === $name && $calledArgs === $args;
});
PHPUnit::assertNotCount($times, $total);
return $this;
}
/** /**
* Check if no function got called. * Check if no function got called.
* *

View File

@ -5,18 +5,11 @@ namespace BeyondCode\LaravelWebSockets\Tests\PubSub;
use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient;
use BeyondCode\LaravelWebSockets\Tests\Mocks\RedisFactory; use BeyondCode\LaravelWebSockets\Tests\Mocks\RedisFactory;
use BeyondCode\LaravelWebSockets\Tests\TestCase; use BeyondCode\LaravelWebSockets\Tests\TestCase;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Redis;
use React\EventLoop\Factory as LoopFactory; use React\EventLoop\Factory as LoopFactory;
class RedisDriverTest extends TestCase class RedisDriverTest extends TestCase
{ {
/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
@ -26,9 +19,7 @@ class RedisDriverTest extends TestCase
$this->runOnlyOnRedisReplication(); $this->runOnlyOnRedisReplication();
$this->redis = Cache::getRedis(); Redis::hdel('laravel_database_1234', 'connections');
$this->redis->hdel('laravel_database_1234', 'connections');
} }
/** @test */ /** @test */
@ -104,9 +95,7 @@ class RedisDriverTest extends TestCase
->assertCalledWithArgs('subscribe', ['laravel_database_1234']); ->assertCalledWithArgs('subscribe', ['laravel_database_1234']);
$this->getPublishClient() $this->getPublishClient()
->assertNothingCalled(); ->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', 1]);
$this->assertEquals(1, $this->redis->hget('laravel_database_1234', 'connections'));
} }
/** @test */ /** @test */
@ -121,15 +110,13 @@ class RedisDriverTest extends TestCase
->assertNotCalledWithArgs('unsubscribe', ['laravel_database_1234']); ->assertNotCalledWithArgs('unsubscribe', ['laravel_database_1234']);
$this->getPublishClient() $this->getPublishClient()
->assertNothingCalled(); ->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', 1]);
$this->assertEquals(1, $this->redis->hget('laravel_database_1234', 'connections'));
$this->pusherServer->onClose($connection); $this->pusherServer->onClose($connection);
$this->getPublishClient() $this->getPublishClient()
->assertNothingCalled(); ->assertCalledWithArgs('hincrby', ['laravel_database_1234', 'connections', -1]);
$this->assertEquals(0, $this->redis->hget('laravel_database_1234', 'connections')); $this->assertEquals(0, Redis::hget('laravel_database_1234', 'connections'));
} }
} }

View File

@ -8,13 +8,15 @@ use BeyondCode\LaravelWebSockets\Statistics\Logger\NullStatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger; use BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger;
use BeyondCode\LaravelWebSockets\Statistics\Models\WebSocketsStatisticsEntry; use BeyondCode\LaravelWebSockets\Statistics\Models\WebSocketsStatisticsEntry;
use BeyondCode\LaravelWebSockets\Tests\TestCase; use BeyondCode\LaravelWebSockets\Tests\TestCase;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Redis;
class StatisticsLoggerTest extends TestCase class StatisticsLoggerTest extends TestCase
{ {
/** @test */ /** @test */
public function it_counts_connections() public function it_counts_connections()
{ {
$this->runOnlyOnLocalReplication();
$connections = []; $connections = [];
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']); $connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
@ -30,6 +32,26 @@ class StatisticsLoggerTest extends TestCase
$this->assertEquals(2, StatisticsLogger::getForAppId(1234)['peak_connection_count']); $this->assertEquals(2, StatisticsLogger::getForAppId(1234)['peak_connection_count']);
} }
/** @test */
public function it_counts_connections_on_redis_replication()
{
$this->runOnlyOnRedisReplication();
$connections = [];
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
$connections[] = $this->getConnectedWebSocketConnection(['channel-1']);
$this->assertEquals(3, StatisticsLogger::getForAppId(1234)['peak_connection_count']);
$this->pusherServer->onClose(array_pop($connections));
StatisticsLogger::save();
$this->assertEquals(3, StatisticsLogger::getForAppId(1234)['peak_connection_count']);
}
/** @test */ /** @test */
public function it_counts_unique_connections_no_channel_subscriptions() public function it_counts_unique_connections_no_channel_subscriptions()
{ {
@ -56,9 +78,7 @@ class StatisticsLoggerTest extends TestCase
{ {
$this->runOnlyOnRedisReplication(); $this->runOnlyOnRedisReplication();
$redis = Cache::getRedis(); Redis::hdel('laravel_database_1234', 'connections');
$redis->hdel('laravel_database_1234', 'connections');
$connections = []; $connections = [];
@ -73,7 +93,7 @@ class StatisticsLoggerTest extends TestCase
StatisticsLogger::save(); StatisticsLogger::save();
$this->assertEquals(1, StatisticsLogger::getForAppId(1234)['peak_connection_count']); $this->assertEquals(3, StatisticsLogger::getForAppId(1234)['peak_connection_count']);
} }
/** @test */ /** @test */