diff --git a/composer.json b/composer.json index 06a9d11..33c4550 100644 --- a/composer.json +++ b/composer.json @@ -41,6 +41,7 @@ "illuminate/broadcasting": "^6.3|^7.0|^8.0", "illuminate/console": "^6.3|^7.0|^8.0", "illuminate/http": "^6.3|^7.0|^8.0", + "illuminate/queue": "^6.3|^7.0|^8.0", "illuminate/routing": "^6.3|^7.0|^8.0", "illuminate/support": "^6.3|^7.0|^8.0", "pusher/pusher-php-server": "^3.0|^4.0", diff --git a/config/websockets.php b/config/websockets.php index 9dcd4f6..9bb34b4 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -137,7 +137,7 @@ return [ 'redis' => [ - 'connection' => 'default', + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), /* |-------------------------------------------------------------------------- diff --git a/docs/horizontal-scaling/redis.md b/docs/horizontal-scaling/redis.md index 4f63835..86759db 100644 --- a/docs/horizontal-scaling/redis.md +++ b/docs/horizontal-scaling/redis.md @@ -40,3 +40,29 @@ You can set the connection name to the Redis database under `redis`: ``` The connections can be found in your `config/database.php` file, under the `redis` key. + +## Async Redis Queue + +The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server. + +To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example. + +Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver: + +```php +'connections' => [ + 'async-redis' => [ + 'driver' => 'async-redis', + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), + 'queue' => env('REDIS_QUEUE', 'default'), + 'retry_after' => 90, + 'block_for' => null, + ], +] +``` + +Also, make sure that the default queue driver is set to `async-redis`: + +``` +QUEUE_CONNECTION=async-redis +``` diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index c099bbf..51a6d59 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -519,6 +519,16 @@ class RedisChannelManager extends LocalChannelManager return $this->publishClient; } + /** + * Get the Redis client used by other classes. + * + * @return Client + */ + public function getRedisClient() + { + return $this->getPublishClient(); + } + /** * Get the unique identifier for the server. * diff --git a/src/Queue/AsyncRedisConnector.php b/src/Queue/AsyncRedisConnector.php new file mode 100644 index 0000000..ac730c3 --- /dev/null +++ b/src/Queue/AsyncRedisConnector.php @@ -0,0 +1,24 @@ +redis, $config['queue'], + $config['connection'] ?? $this->connection, + $config['retry_after'] ?? 60, + $config['block_for'] ?? null + ); + } +} diff --git a/src/Queue/AsyncRedisQueue.php b/src/Queue/AsyncRedisQueue.php new file mode 100644 index 0000000..6f9874d --- /dev/null +++ b/src/Queue/AsyncRedisQueue.php @@ -0,0 +1,25 @@ +container->bound(ChannelManager::class) + ? $this->container->make(ChannelManager::class) + : null; + + return $channelManager && method_exists($channelManager, 'getRedisClient') + ? $channelManager->getRedisClient() + : parent::getConnection(); + } +} diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index e498c11..f513caa 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -11,6 +11,7 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowStatistics; use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard; use BeyondCode\LaravelWebSockets\Server\Router; use Illuminate\Support\Facades\Gate; +use Illuminate\Support\Facades\Queue; use Illuminate\Support\Facades\Route; use Illuminate\Support\ServiceProvider; @@ -36,6 +37,12 @@ class WebSocketsServiceProvider extends ServiceProvider __DIR__.'/../database/migrations/0000_00_00_000000_rename_statistics_counters.php' => database_path('migrations/0000_00_00_000000_rename_statistics_counters.php'), ], 'migrations'); + $this->registerAsyncRedisQueueDriver(); + + $this->registerRouter(); + + $this->registerManagers(); + $this->registerStatistics(); $this->registerDashboard(); @@ -50,8 +57,19 @@ class WebSocketsServiceProvider extends ServiceProvider */ public function register() { - $this->registerRouter(); - $this->registerManagers(); + // + } + + /** + * Register the async, non-blocking Redis queue driver. + * + * @return void + */ + protected function registerAsyncRedisQueueDriver() + { + Queue::extend('async-redis', function () { + return new Queue\AsyncRedisConnector($this->app['redis']); + }); } /** diff --git a/tests/AsyncRedisQueueTest.php b/tests/AsyncRedisQueueTest.php new file mode 100644 index 0000000..89db9cd --- /dev/null +++ b/tests/AsyncRedisQueueTest.php @@ -0,0 +1,213 @@ +runOnlyOnRedisReplication(); + + $connector = new AsyncRedisConnector($this->app['redis'], 'default'); + + $this->queue = $connector->connect([ + 'queue' => 'default', + 'retry_after' => 60, + 'block_for' => null, + ]); + + $this->queue->setContainer($this->app); + } + + /** + * {@inheritdoc} + */ + protected function tearDown(): void + { + parent::tearDown(); + + m::close(); + } + + public function test_expired_jobs_are_pushed_with_async_and_popped_with_sync() + { + $jobs = [ + new RedisQueueIntegrationTestJob(0), + new RedisQueueIntegrationTestJob(1), + new RedisQueueIntegrationTestJob(2), + new RedisQueueIntegrationTestJob(3), + ]; + + $this->queue->later(1000, $jobs[0]); + $this->queue->later(-200, $jobs[1]); + $this->queue->later(-300, $jobs[2]); + $this->queue->later(-100, $jobs[3]); + + $this->getPublishClient() + ->zcard('queues:default:delayed') + ->then(function ($count) { + $this->assertEquals(4, $count); + }); + + $this->unregisterManagers(); + + $this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertNull($this->queue->pop()); + + $this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed')); + $this->assertEquals(3, $this->app['redis']->connection()->zcard('queues:default:reserved')); + } + + public function test_jobs_are_pushed_with_async_and_released_with_sync() + { + $this->queue->push( + $job = new RedisQueueIntegrationTestJob(30) + ); + + $this->unregisterManagers(); + + $this->getPublishClient() + ->assertCalledCount(1, 'eval'); + + $redisJob = $this->queue->pop(); + + $before = $this->currentTime(); + + $redisJob->release(1000); + + $after = $this->currentTime(); + + // check the content of delayed queue + $this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed')); + + $results = $this->app['redis']->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]); + + $payload = array_keys($results)[0]; + + $score = $results[$payload]; + + $this->assertGreaterThanOrEqual($before + 1000, $score); + $this->assertLessThanOrEqual($after + 1000, $score); + + $decoded = json_decode($payload); + + $this->assertEquals(1, $decoded->attempts); + $this->assertEquals($job, unserialize($decoded->data->command)); + + $this->assertNull($this->queue->pop()); + } + + public function test_jobs_are_pushed_with_async_and_deleted_with_sync() + { + $this->queue->push( + $job = new RedisQueueIntegrationTestJob(30) + ); + + $this->unregisterManagers(); + + $this->getPublishClient() + ->assertCalledCount(1, 'eval'); + + $redisJob = $this->queue->pop(); + + $redisJob->delete(); + + $this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:delayed')); + $this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:reserved')); + $this->assertEquals(0, $this->app['redis']->connection()->llen('queues:default')); + + $this->assertNull($this->queue->pop()); + } + + public function test_jobs_are_pushed_with_async_and_cleared_with_sync() + { + if (! method_exists($this->queue, 'clear')) { + $this->markTestSkipped('The Queue has no clear() method to test.'); + } + + $job1 = new RedisQueueIntegrationTestJob(30); + $job2 = new RedisQueueIntegrationTestJob(40); + + $this->queue->push($job1); + $this->queue->push($job2); + + $this->getPublishClient() + ->assertCalledCount(2, 'eval'); + + $this->unregisterManagers(); + + $this->assertEquals(2, $this->queue->clear(null)); + $this->assertEquals(0, $this->queue->size()); + } + + public function test_jobs_are_pushed_with_async_and_size_reflects_in_async_size() + { + $this->queue->size()->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->queue->push(new RedisQueueIntegrationTestJob(1)); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(1, $count); + }); + + $this->queue->later(60, new RedisQueueIntegrationTestJob(2)); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(2, $count); + }); + + $this->queue->push(new RedisQueueIntegrationTestJob(3)); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(3, $count); + }); + + $this->unregisterManagers(); + + $job = $this->queue->pop(); + + $this->registerManagers(); + + $this->queue->size()->then(function ($count) { + $this->assertEquals(3, $count); + }); + } +} + +class RedisQueueIntegrationTestJob +{ + public $i; + + public function __construct($i) + { + $this->i = $i; + } + + public function handle() + { + // + } +} diff --git a/tests/Mocks/LazyClient.php b/tests/Mocks/LazyClient.php index abd07ce..539e7db 100644 --- a/tests/Mocks/LazyClient.php +++ b/tests/Mocks/LazyClient.php @@ -57,7 +57,11 @@ class LazyClient extends BaseLazyClient $this->calls[] = [$name, $args]; if (! in_array($name, ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'onMessage'])) { - $this->redis->__call($name, $args); + if ($name === 'eval') { + $this->redis->{$name}(...$args); + } else { + $this->redis->__call($name, $args); + } } return new PromiseResolver( @@ -98,6 +102,26 @@ class LazyClient extends BaseLazyClient return $this; } + /** + * Check if the method got called. + * + * @param int $times + * @param string $name + * @return $this + */ + public function assertCalledCount(int $times, string $name) + { + $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name) { + [$calledName, ] = $function; + + return $calledName === $name; + }); + + PHPUnit::assertCount($times, $total); + + return $this; + } + /** * Check if the method with args got called. * @@ -105,7 +129,7 @@ class LazyClient extends BaseLazyClient * @param array $args * @return $this */ - public function assertCalledWithArgs($name, array $args) + public function assertCalledWithArgs(string $name, array $args) { foreach ($this->getCalledFunctions() as $function) { [$calledName, $calledArgs] = $function; @@ -125,11 +149,12 @@ class LazyClient extends BaseLazyClient /** * Check if the method with args got called an amount of times. * + * @param int $times * @param string $name * @param array $args * @return $this */ - public function assertCalledWithArgsCount($times = 1, $name, array $args) + public function assertCalledWithArgsCount(int $times, string $name, array $args) { $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) { [$calledName, $calledArgs] = $function; @@ -148,7 +173,7 @@ class LazyClient extends BaseLazyClient * @param string $name * @return $this */ - public function assertNotCalled($name) + public function assertNotCalled(string $name) { foreach ($this->getCalledFunctions() as $function) { [$calledName, ] = $function; @@ -172,7 +197,7 @@ class LazyClient extends BaseLazyClient * @param array $args * @return $this */ - public function assertNotCalledWithArgs($name, array $args) + public function assertNotCalledWithArgs(string $name, array $args) { foreach ($this->getCalledFunctions() as $function) { [$calledName, $calledArgs] = $function; @@ -192,11 +217,12 @@ class LazyClient extends BaseLazyClient /** * Check if the method with args got called an amount of times. * + * @param int $times * @param string $name * @param array $args * @return $this */ - public function assertNotCalledWithArgsCount($times = 1, $name, array $args) + public function assertNotCalledWithArgsCount(int $times, string $name, array $args) { $total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) { [$calledName, $calledArgs] = $function; diff --git a/tests/TestCase.php b/tests/TestCase.php index 6544731..bcf7e28 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -124,21 +124,29 @@ abstract class TestCase extends Orchestra 'prefix' => '', ]); - $app['config']->set( - 'broadcasting.connections.websockets', [ - 'driver' => 'pusher', - 'key' => 'TestKey', - 'secret' => 'TestSecret', - 'app_id' => '1234', - 'options' => [ - 'cluster' => 'mt1', - 'encrypted' => true, - 'host' => '127.0.0.1', - 'port' => 6001, - 'scheme' => 'http', - ], - ] - ); + $app['config']->set('broadcasting.connections.websockets', [ + 'driver' => 'pusher', + 'key' => 'TestKey', + 'secret' => 'TestSecret', + 'app_id' => '1234', + 'options' => [ + 'cluster' => 'mt1', + 'encrypted' => true, + 'host' => '127.0.0.1', + 'port' => 6001, + 'scheme' => 'http', + ], + ]); + + $app['config']->set('queue.default', 'async-redis'); + + $app['config']->set('queue.connections.async-redis', [ + 'driver' => 'async-redis', + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), + 'queue' => env('REDIS_QUEUE', 'default'), + 'retry_after' => 90, + 'block_for' => null, + ]); $app['config']->set('auth.providers.users.model', Models\User::class); @@ -244,6 +252,16 @@ abstract class TestCase extends Orchestra $this->channelManager = $this->app->make(ChannelManager::class); } + /** + * Unregister the managers for testing purposes. + * + * @return void + */ + protected function unregisterManagers() + { + $this->app->offsetUnset(ChannelManager::class); + } + /** * Register the statistics collectors. *