Merge pull request #552 from beyondcode/feature/async-queue
[2.x] Async Redis queue driver
This commit is contained in:
commit
9cb83dccfd
|
|
@ -41,6 +41,7 @@
|
|||
"illuminate/broadcasting": "^6.3|^7.0|^8.0",
|
||||
"illuminate/console": "^6.3|^7.0|^8.0",
|
||||
"illuminate/http": "^6.3|^7.0|^8.0",
|
||||
"illuminate/queue": "^6.3|^7.0|^8.0",
|
||||
"illuminate/routing": "^6.3|^7.0|^8.0",
|
||||
"illuminate/support": "^6.3|^7.0|^8.0",
|
||||
"pusher/pusher-php-server": "^3.0|^4.0",
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ return [
|
|||
|
||||
'redis' => [
|
||||
|
||||
'connection' => 'default',
|
||||
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
|
||||
|
||||
/*
|
||||
|--------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -40,3 +40,29 @@ You can set the connection name to the Redis database under `redis`:
|
|||
```
|
||||
|
||||
The connections can be found in your `config/database.php` file, under the `redis` key.
|
||||
|
||||
## Async Redis Queue
|
||||
|
||||
The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server.
|
||||
|
||||
To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example.
|
||||
|
||||
Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver:
|
||||
|
||||
```php
|
||||
'connections' => [
|
||||
'async-redis' => [
|
||||
'driver' => 'async-redis',
|
||||
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
|
||||
'queue' => env('REDIS_QUEUE', 'default'),
|
||||
'retry_after' => 90,
|
||||
'block_for' => null,
|
||||
],
|
||||
]
|
||||
```
|
||||
|
||||
Also, make sure that the default queue driver is set to `async-redis`:
|
||||
|
||||
```
|
||||
QUEUE_CONNECTION=async-redis
|
||||
```
|
||||
|
|
|
|||
|
|
@ -519,6 +519,16 @@ class RedisChannelManager extends LocalChannelManager
|
|||
return $this->publishClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Redis client used by other classes.
|
||||
*
|
||||
* @return Client
|
||||
*/
|
||||
public function getRedisClient()
|
||||
{
|
||||
return $this->getPublishClient();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the unique identifier for the server.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -0,0 +1,24 @@
|
|||
<?php
|
||||
|
||||
namespace BeyondCode\LaravelWebSockets\Queue;
|
||||
|
||||
use Illuminate\Queue\Connectors\RedisConnector;
|
||||
|
||||
class AsyncRedisConnector extends RedisConnector
|
||||
{
|
||||
/**
|
||||
* Establish a queue connection.
|
||||
*
|
||||
* @param array $config
|
||||
* @return \Illuminate\Contracts\Queue\Queue
|
||||
*/
|
||||
public function connect(array $config)
|
||||
{
|
||||
return new AsyncRedisQueue(
|
||||
$this->redis, $config['queue'],
|
||||
$config['connection'] ?? $this->connection,
|
||||
$config['retry_after'] ?? 60,
|
||||
$config['block_for'] ?? null
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
<?php
|
||||
|
||||
namespace BeyondCode\LaravelWebSockets\Queue;
|
||||
|
||||
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
|
||||
use Illuminate\Queue\RedisQueue;
|
||||
|
||||
class AsyncRedisQueue extends RedisQueue
|
||||
{
|
||||
/**
|
||||
* Get the connection for the queue.
|
||||
*
|
||||
* @return \BeyondCode\LaravelWebSockets\Contracts\ChannelManager|\Illuminate\Redis\Connections\Connection
|
||||
*/
|
||||
public function getConnection()
|
||||
{
|
||||
$channelManager = $this->container->bound(ChannelManager::class)
|
||||
? $this->container->make(ChannelManager::class)
|
||||
: null;
|
||||
|
||||
return $channelManager && method_exists($channelManager, 'getRedisClient')
|
||||
? $channelManager->getRedisClient()
|
||||
: parent::getConnection();
|
||||
}
|
||||
}
|
||||
|
|
@ -11,6 +11,7 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowStatistics;
|
|||
use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard;
|
||||
use BeyondCode\LaravelWebSockets\Server\Router;
|
||||
use Illuminate\Support\Facades\Gate;
|
||||
use Illuminate\Support\Facades\Queue;
|
||||
use Illuminate\Support\Facades\Route;
|
||||
use Illuminate\Support\ServiceProvider;
|
||||
|
||||
|
|
@ -36,6 +37,12 @@ class WebSocketsServiceProvider extends ServiceProvider
|
|||
__DIR__.'/../database/migrations/0000_00_00_000000_rename_statistics_counters.php' => database_path('migrations/0000_00_00_000000_rename_statistics_counters.php'),
|
||||
], 'migrations');
|
||||
|
||||
$this->registerAsyncRedisQueueDriver();
|
||||
|
||||
$this->registerRouter();
|
||||
|
||||
$this->registerManagers();
|
||||
|
||||
$this->registerStatistics();
|
||||
|
||||
$this->registerDashboard();
|
||||
|
|
@ -50,8 +57,19 @@ class WebSocketsServiceProvider extends ServiceProvider
|
|||
*/
|
||||
public function register()
|
||||
{
|
||||
$this->registerRouter();
|
||||
$this->registerManagers();
|
||||
//
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the async, non-blocking Redis queue driver.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
protected function registerAsyncRedisQueueDriver()
|
||||
{
|
||||
Queue::extend('async-redis', function () {
|
||||
return new Queue\AsyncRedisConnector($this->app['redis']);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,213 @@
|
|||
<?php
|
||||
|
||||
namespace BeyondCode\LaravelWebSockets\Test;
|
||||
|
||||
use BeyondCode\LaravelWebSockets\Queue\AsyncRedisConnector;
|
||||
use Illuminate\Queue\Queue;
|
||||
use Illuminate\Support\InteractsWithTime;
|
||||
use Mockery as m;
|
||||
|
||||
class AsyncRedisQueueTest extends TestCase
|
||||
{
|
||||
use InteractsWithTime;
|
||||
|
||||
/**
|
||||
* The testing queue for Redis.
|
||||
*
|
||||
* @var \Illuminate\Queue\RedisQueue
|
||||
*/
|
||||
private $queue;
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function setUp(): void
|
||||
{
|
||||
parent::setUp();
|
||||
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$connector = new AsyncRedisConnector($this->app['redis'], 'default');
|
||||
|
||||
$this->queue = $connector->connect([
|
||||
'queue' => 'default',
|
||||
'retry_after' => 60,
|
||||
'block_for' => null,
|
||||
]);
|
||||
|
||||
$this->queue->setContainer($this->app);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function tearDown(): void
|
||||
{
|
||||
parent::tearDown();
|
||||
|
||||
m::close();
|
||||
}
|
||||
|
||||
public function test_expired_jobs_are_pushed_with_async_and_popped_with_sync()
|
||||
{
|
||||
$jobs = [
|
||||
new RedisQueueIntegrationTestJob(0),
|
||||
new RedisQueueIntegrationTestJob(1),
|
||||
new RedisQueueIntegrationTestJob(2),
|
||||
new RedisQueueIntegrationTestJob(3),
|
||||
];
|
||||
|
||||
$this->queue->later(1000, $jobs[0]);
|
||||
$this->queue->later(-200, $jobs[1]);
|
||||
$this->queue->later(-300, $jobs[2]);
|
||||
$this->queue->later(-100, $jobs[3]);
|
||||
|
||||
$this->getPublishClient()
|
||||
->zcard('queues:default:delayed')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(4, $count);
|
||||
});
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
|
||||
$this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
|
||||
$this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
|
||||
$this->assertNull($this->queue->pop());
|
||||
|
||||
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
|
||||
$this->assertEquals(3, $this->app['redis']->connection()->zcard('queues:default:reserved'));
|
||||
}
|
||||
|
||||
public function test_jobs_are_pushed_with_async_and_released_with_sync()
|
||||
{
|
||||
$this->queue->push(
|
||||
$job = new RedisQueueIntegrationTestJob(30)
|
||||
);
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$this->getPublishClient()
|
||||
->assertCalledCount(1, 'eval');
|
||||
|
||||
$redisJob = $this->queue->pop();
|
||||
|
||||
$before = $this->currentTime();
|
||||
|
||||
$redisJob->release(1000);
|
||||
|
||||
$after = $this->currentTime();
|
||||
|
||||
// check the content of delayed queue
|
||||
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
|
||||
|
||||
$results = $this->app['redis']->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]);
|
||||
|
||||
$payload = array_keys($results)[0];
|
||||
|
||||
$score = $results[$payload];
|
||||
|
||||
$this->assertGreaterThanOrEqual($before + 1000, $score);
|
||||
$this->assertLessThanOrEqual($after + 1000, $score);
|
||||
|
||||
$decoded = json_decode($payload);
|
||||
|
||||
$this->assertEquals(1, $decoded->attempts);
|
||||
$this->assertEquals($job, unserialize($decoded->data->command));
|
||||
|
||||
$this->assertNull($this->queue->pop());
|
||||
}
|
||||
|
||||
public function test_jobs_are_pushed_with_async_and_deleted_with_sync()
|
||||
{
|
||||
$this->queue->push(
|
||||
$job = new RedisQueueIntegrationTestJob(30)
|
||||
);
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$this->getPublishClient()
|
||||
->assertCalledCount(1, 'eval');
|
||||
|
||||
$redisJob = $this->queue->pop();
|
||||
|
||||
$redisJob->delete();
|
||||
|
||||
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:delayed'));
|
||||
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:reserved'));
|
||||
$this->assertEquals(0, $this->app['redis']->connection()->llen('queues:default'));
|
||||
|
||||
$this->assertNull($this->queue->pop());
|
||||
}
|
||||
|
||||
public function test_jobs_are_pushed_with_async_and_cleared_with_sync()
|
||||
{
|
||||
if (! method_exists($this->queue, 'clear')) {
|
||||
$this->markTestSkipped('The Queue has no clear() method to test.');
|
||||
}
|
||||
|
||||
$job1 = new RedisQueueIntegrationTestJob(30);
|
||||
$job2 = new RedisQueueIntegrationTestJob(40);
|
||||
|
||||
$this->queue->push($job1);
|
||||
$this->queue->push($job2);
|
||||
|
||||
$this->getPublishClient()
|
||||
->assertCalledCount(2, 'eval');
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$this->assertEquals(2, $this->queue->clear(null));
|
||||
$this->assertEquals(0, $this->queue->size());
|
||||
}
|
||||
|
||||
public function test_jobs_are_pushed_with_async_and_size_reflects_in_async_size()
|
||||
{
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(0, $count);
|
||||
});
|
||||
|
||||
$this->queue->push(new RedisQueueIntegrationTestJob(1));
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->queue->later(60, new RedisQueueIntegrationTestJob(2));
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->queue->push(new RedisQueueIntegrationTestJob(3));
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(3, $count);
|
||||
});
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$job = $this->queue->pop();
|
||||
|
||||
$this->registerManagers();
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(3, $count);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class RedisQueueIntegrationTestJob
|
||||
{
|
||||
public $i;
|
||||
|
||||
public function __construct($i)
|
||||
{
|
||||
$this->i = $i;
|
||||
}
|
||||
|
||||
public function handle()
|
||||
{
|
||||
//
|
||||
}
|
||||
}
|
||||
|
|
@ -57,7 +57,11 @@ class LazyClient extends BaseLazyClient
|
|||
$this->calls[] = [$name, $args];
|
||||
|
||||
if (! in_array($name, ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'onMessage'])) {
|
||||
$this->redis->__call($name, $args);
|
||||
if ($name === 'eval') {
|
||||
$this->redis->{$name}(...$args);
|
||||
} else {
|
||||
$this->redis->__call($name, $args);
|
||||
}
|
||||
}
|
||||
|
||||
return new PromiseResolver(
|
||||
|
|
@ -98,6 +102,26 @@ class LazyClient extends BaseLazyClient
|
|||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the method got called.
|
||||
*
|
||||
* @param int $times
|
||||
* @param string $name
|
||||
* @return $this
|
||||
*/
|
||||
public function assertCalledCount(int $times, string $name)
|
||||
{
|
||||
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name) {
|
||||
[$calledName, ] = $function;
|
||||
|
||||
return $calledName === $name;
|
||||
});
|
||||
|
||||
PHPUnit::assertCount($times, $total);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the method with args got called.
|
||||
*
|
||||
|
|
@ -105,7 +129,7 @@ class LazyClient extends BaseLazyClient
|
|||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertCalledWithArgs($name, array $args)
|
||||
public function assertCalledWithArgs(string $name, array $args)
|
||||
{
|
||||
foreach ($this->getCalledFunctions() as $function) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
@ -125,11 +149,12 @@ class LazyClient extends BaseLazyClient
|
|||
/**
|
||||
* Check if the method with args got called an amount of times.
|
||||
*
|
||||
* @param int $times
|
||||
* @param string $name
|
||||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertCalledWithArgsCount($times = 1, $name, array $args)
|
||||
public function assertCalledWithArgsCount(int $times, string $name, array $args)
|
||||
{
|
||||
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
@ -148,7 +173,7 @@ class LazyClient extends BaseLazyClient
|
|||
* @param string $name
|
||||
* @return $this
|
||||
*/
|
||||
public function assertNotCalled($name)
|
||||
public function assertNotCalled(string $name)
|
||||
{
|
||||
foreach ($this->getCalledFunctions() as $function) {
|
||||
[$calledName, ] = $function;
|
||||
|
|
@ -172,7 +197,7 @@ class LazyClient extends BaseLazyClient
|
|||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertNotCalledWithArgs($name, array $args)
|
||||
public function assertNotCalledWithArgs(string $name, array $args)
|
||||
{
|
||||
foreach ($this->getCalledFunctions() as $function) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
@ -192,11 +217,12 @@ class LazyClient extends BaseLazyClient
|
|||
/**
|
||||
* Check if the method with args got called an amount of times.
|
||||
*
|
||||
* @param int $times
|
||||
* @param string $name
|
||||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertNotCalledWithArgsCount($times = 1, $name, array $args)
|
||||
public function assertNotCalledWithArgsCount(int $times, string $name, array $args)
|
||||
{
|
||||
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
|
|||
|
|
@ -124,21 +124,29 @@ abstract class TestCase extends Orchestra
|
|||
'prefix' => '',
|
||||
]);
|
||||
|
||||
$app['config']->set(
|
||||
'broadcasting.connections.websockets', [
|
||||
'driver' => 'pusher',
|
||||
'key' => 'TestKey',
|
||||
'secret' => 'TestSecret',
|
||||
'app_id' => '1234',
|
||||
'options' => [
|
||||
'cluster' => 'mt1',
|
||||
'encrypted' => true,
|
||||
'host' => '127.0.0.1',
|
||||
'port' => 6001,
|
||||
'scheme' => 'http',
|
||||
],
|
||||
]
|
||||
);
|
||||
$app['config']->set('broadcasting.connections.websockets', [
|
||||
'driver' => 'pusher',
|
||||
'key' => 'TestKey',
|
||||
'secret' => 'TestSecret',
|
||||
'app_id' => '1234',
|
||||
'options' => [
|
||||
'cluster' => 'mt1',
|
||||
'encrypted' => true,
|
||||
'host' => '127.0.0.1',
|
||||
'port' => 6001,
|
||||
'scheme' => 'http',
|
||||
],
|
||||
]);
|
||||
|
||||
$app['config']->set('queue.default', 'async-redis');
|
||||
|
||||
$app['config']->set('queue.connections.async-redis', [
|
||||
'driver' => 'async-redis',
|
||||
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
|
||||
'queue' => env('REDIS_QUEUE', 'default'),
|
||||
'retry_after' => 90,
|
||||
'block_for' => null,
|
||||
]);
|
||||
|
||||
$app['config']->set('auth.providers.users.model', Models\User::class);
|
||||
|
||||
|
|
@ -244,6 +252,16 @@ abstract class TestCase extends Orchestra
|
|||
$this->channelManager = $this->app->make(ChannelManager::class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister the managers for testing purposes.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
protected function unregisterManagers()
|
||||
{
|
||||
$this->app->offsetUnset(ChannelManager::class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the statistics collectors.
|
||||
*
|
||||
|
|
|
|||
Loading…
Reference in New Issue