Added async redis connection

This commit is contained in:
Alex Renoki 2020-09-25 22:16:06 +03:00
parent 40ee5fb805
commit c1bef4db5b
10 changed files with 677 additions and 18 deletions

View File

@ -41,6 +41,7 @@
"illuminate/broadcasting": "^6.3|^7.0|^8.0", "illuminate/broadcasting": "^6.3|^7.0|^8.0",
"illuminate/console": "^6.3|^7.0|^8.0", "illuminate/console": "^6.3|^7.0|^8.0",
"illuminate/http": "^6.3|^7.0|^8.0", "illuminate/http": "^6.3|^7.0|^8.0",
"illuminate/queue": "^6.3|^7.0|^8.0",
"illuminate/routing": "^6.3|^7.0|^8.0", "illuminate/routing": "^6.3|^7.0|^8.0",
"illuminate/support": "^6.3|^7.0|^8.0", "illuminate/support": "^6.3|^7.0|^8.0",
"pusher/pusher-php-server": "^3.0|^4.0", "pusher/pusher-php-server": "^3.0|^4.0",

View File

@ -137,7 +137,7 @@ return [
'redis' => [ 'redis' => [
'connection' => 'default', 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
/* /*
|-------------------------------------------------------------------------- |--------------------------------------------------------------------------

View File

@ -40,3 +40,29 @@ You can set the connection name to the Redis database under `redis`:
``` ```
The connections can be found in your `config/database.php` file, under the `redis` key. The connections can be found in your `config/database.php` file, under the `redis` key.
## Async Redis Queue
The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server.
To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example.
Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver:
```php
'connections' => [
'async-redis' => [
'driver' => 'async-redis',
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
],
]
```
Also, make sure that the default queue driver is set to `async-redis`:
```
QUEUE_CONNECTION=async-redis
```

View File

@ -519,6 +519,16 @@ class RedisChannelManager extends LocalChannelManager
return $this->publishClient; return $this->publishClient;
} }
/**
* Get the Redis client used by other classes.
*
* @return Client
*/
public function getRedisClient()
{
return $this->getPublishClient();
}
/** /**
* Get the unique identifier for the server. * Get the unique identifier for the server.
* *

View File

@ -0,0 +1,24 @@
<?php
namespace BeyondCode\LaravelWebSockets\Queue;
use Illuminate\Queue\Connectors\RedisConnector;
class AsyncRedisConnector extends RedisConnector
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new AsyncRedisQueue(
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null
);
}
}

View File

@ -0,0 +1,26 @@
<?php
namespace BeyondCode\LaravelWebSockets\Queue;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Queue\RedisQueue;
class AsyncRedisQueue extends RedisQueue
{
/**
* Get the connection for the queue.
*
* @return \BeyondCode\LaravelWebSockets\Contracts\ChannelManager|\Illuminate\Redis\Connections\Connection
*/
public function getConnection()
{
$channelManager = $this->container->bound(ChannelManager::Class)
? $this->container->make(ChannelManager::class)
: null;
return $channelManager && method_exists($channelManager, 'getRedisClient')
? $channelManager->getRedisClient()
: parent::getConnection();
}
}

View File

@ -11,6 +11,7 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowStatistics;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard;
use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Server\Router;
use Illuminate\Support\Facades\Gate; use Illuminate\Support\Facades\Gate;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\Facades\Route; use Illuminate\Support\Facades\Route;
use Illuminate\Support\ServiceProvider; use Illuminate\Support\ServiceProvider;
@ -36,6 +37,12 @@ class WebSocketsServiceProvider extends ServiceProvider
__DIR__.'/../database/migrations/0000_00_00_000000_rename_statistics_counters.php' => database_path('migrations/0000_00_00_000000_rename_statistics_counters.php'), __DIR__.'/../database/migrations/0000_00_00_000000_rename_statistics_counters.php' => database_path('migrations/0000_00_00_000000_rename_statistics_counters.php'),
], 'migrations'); ], 'migrations');
$this->registerAsyncRedisQueueDriver();
$this->registerRouter();
$this->registerManagers();
$this->registerStatistics(); $this->registerStatistics();
$this->registerDashboard(); $this->registerDashboard();
@ -50,8 +57,19 @@ class WebSocketsServiceProvider extends ServiceProvider
*/ */
public function register() public function register()
{ {
$this->registerRouter(); //
$this->registerManagers(); }
/**
* Register the async, non-blocking Redis queue driver.
*
* @return void
*/
protected function registerAsyncRedisQueueDriver()
{
Queue::extend('async-redis', function () {
return new Queue\AsyncRedisConnector($this->app['redis']);
});
} }
/** /**

273
tests/LocalQueueTest.php Normal file
View File

@ -0,0 +1,273 @@
<?php
namespace BeyondCode\LaravelWebSockets\Test;
use Illuminate\Container\Container;
use Illuminate\Contracts\Redis\Factory;
use Illuminate\Queue\LuaScripts;
use Illuminate\Queue\Queue;
use Illuminate\Queue\RedisQueue;
use Illuminate\Queue\Jobs\RedisJob;
use Illuminate\Support\Carbon;
use Illuminate\Support\Str;
use Mockery as m;
use stdClass;
class LocalQueueTest extends TestCase
{
/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();
$this->runOnlyOnLocalReplication();
}
/**
* {@inheritdoc}
*/
protected function tearDown(): void
{
m::close();
}
public function testPushProperlyPushesJobOntoRedis()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('eval')
->once()
->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));
$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
Str::createUuidsNormally();
}
public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('eval')
->once()
->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'id' => 'foo', 'attempts' => 0]));
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
return ['custom' => 'taylor'];
});
$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
Queue::createPayloadUsing(null);
Str::createUuidsNormally();
}
public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('eval')
->once()
->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'bar' => 'foo', 'id' => 'foo', 'attempts' => 0]));
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
return ['custom' => 'taylor'];
});
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
return ['bar' => 'foo'];
});
$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
Queue::createPayloadUsing(null);
Str::createUuidsNormally();
}
public function testDelayedPushProperlyPushesJobOntoRedis()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['availableAt', 'getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$queue->expects($this->once())
->method('availableAt')
->with(1)
->willReturn(2);
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('zadd')
->once()
->with('queues:default:delayed', 2, json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));
$id = $queue->later(1, 'foo', ['data']);
$this->assertSame('foo', $id);
Str::createUuidsNormally();
}
public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$date = Carbon::now();
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['availableAt', 'getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$queue->expects($this->once())
->method('availableAt')
->with($date)
->willReturn(2);
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('zadd')
->once()
->with('queues:default:delayed', 2, json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));
$queue->later($date, 'foo', ['data']);
Str::createUuidsNormally();
}
public function testFireProperlyCallsTheJobHandler()
{
$job = $this->getJob();
$job->getContainer()
->shouldReceive('make')
->once()->with('foo')
->andReturn($handler = m::mock(stdClass::class));
$handler->shouldReceive('fire')
->once()
->with($job, ['data']);
$job->fire();
}
public function testDeleteRemovesTheJobFromRedis()
{
$job = $this->getJob();
$job->getRedisQueue()
->shouldReceive('deleteReserved')
->once()
->with('default', $job);
$job->delete();
}
public function testReleaseProperlyReleasesJobOntoRedis()
{
$job = $this->getJob();
$job->getRedisQueue()
->shouldReceive('deleteAndRelease')
->once()
->with('default', $job, 1);
$job->release(1);
}
protected function getJob()
{
return new RedisJob(
m::mock(Container::class),
m::mock(RedisQueue::class),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 1]),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]),
'connection-name',
'default'
);
}
}

273
tests/RedisQueueTest.php Normal file
View File

@ -0,0 +1,273 @@
<?php
namespace BeyondCode\LaravelWebSockets\Test;
use Illuminate\Container\Container;
use Illuminate\Contracts\Redis\Factory;
use Illuminate\Queue\LuaScripts;
use Illuminate\Queue\Queue;
use Illuminate\Queue\RedisQueue;
use Illuminate\Queue\Jobs\RedisJob;
use Illuminate\Support\Carbon;
use Illuminate\Support\Str;
use Mockery as m;
use stdClass;
class RedisQueueTest extends TestCase
{
/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();
$this->runOnlyOnRedisReplication();
}
/**
* {@inheritdoc}
*/
protected function tearDown(): void
{
m::close();
}
public function testPushProperlyPushesJobOntoRedis()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('eval')
->once()
->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));
$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
Str::createUuidsNormally();
}
public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('eval')
->once()
->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'id' => 'foo', 'attempts' => 0]));
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
return ['custom' => 'taylor'];
});
$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
Queue::createPayloadUsing(null);
Str::createUuidsNormally();
}
public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('eval')
->once()
->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'bar' => 'foo', 'id' => 'foo', 'attempts' => 0]));
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
return ['custom' => 'taylor'];
});
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
return ['bar' => 'foo'];
});
$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
Queue::createPayloadUsing(null);
Str::createUuidsNormally();
}
public function testDelayedPushProperlyPushesJobOntoRedis()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['availableAt', 'getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$queue->expects($this->once())
->method('availableAt')
->with(1)
->willReturn(2);
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('zadd')
->once()
->with('queues:default:delayed', 2, json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));
$id = $queue->later(1, 'foo', ['data']);
$this->assertSame('foo', $id);
Str::createUuidsNormally();
}
public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
{
$uuid = Str::uuid();
Str::createUuidsUsing(function () use ($uuid) {
return $uuid;
});
$date = Carbon::now();
$queue = $this->getMockBuilder(RedisQueue::class)
->setMethods(['availableAt', 'getRandomId'])
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
->getMock();
$queue->expects($this->once())
->method('getRandomId')
->willReturn('foo');
$queue->expects($this->once())
->method('availableAt')
->with($date)
->willReturn(2);
$redis->shouldReceive('connection')
->once()
->andReturn($redis);
$redis->shouldReceive('zadd')
->once()
->with('queues:default:delayed', 2, json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));
$queue->later($date, 'foo', ['data']);
Str::createUuidsNormally();
}
public function testFireProperlyCallsTheJobHandler()
{
$job = $this->getJob();
$job->getContainer()
->shouldReceive('make')
->once()->with('foo')
->andReturn($handler = m::mock(stdClass::class));
$handler->shouldReceive('fire')
->once()
->with($job, ['data']);
$job->fire();
}
public function testDeleteRemovesTheJobFromRedis()
{
$job = $this->getJob();
$job->getRedisQueue()
->shouldReceive('deleteReserved')
->once()
->with('default', $job);
$job->delete();
}
public function testReleaseProperlyReleasesJobOntoRedis()
{
$job = $this->getJob();
$job->getRedisQueue()
->shouldReceive('deleteAndRelease')
->once()
->with('default', $job, 1);
$job->release(1);
}
protected function getJob()
{
return new RedisJob(
m::mock(Container::class),
m::mock(RedisQueue::class),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 1]),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]),
'connection-name',
'default'
);
}
}

View File

@ -124,21 +124,29 @@ abstract class TestCase extends Orchestra
'prefix' => '', 'prefix' => '',
]); ]);
$app['config']->set( $app['config']->set('broadcasting.connections.websockets', [
'broadcasting.connections.websockets', [ 'driver' => 'pusher',
'driver' => 'pusher', 'key' => 'TestKey',
'key' => 'TestKey', 'secret' => 'TestSecret',
'secret' => 'TestSecret', 'app_id' => '1234',
'app_id' => '1234', 'options' => [
'options' => [ 'cluster' => 'mt1',
'cluster' => 'mt1', 'encrypted' => true,
'encrypted' => true, 'host' => '127.0.0.1',
'host' => '127.0.0.1', 'port' => 6001,
'port' => 6001, 'scheme' => 'http',
'scheme' => 'http', ],
], ]);
]
); $app['config']->set('queue.default', 'async-redis');
$app['config']->set('queue.connections.async-redis', [
'driver' => 'async-redis',
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
]);
$app['config']->set('auth.providers.users.model', Models\User::class); $app['config']->set('auth.providers.users.model', Models\User::class);