laravel-websockets/tests/AsyncRedisQueueTest.php

209 lines
5.8 KiB
PHP

<?php
namespace BeyondCode\LaravelWebSockets\Test;
use BeyondCode\LaravelWebSockets\Queue\AsyncRedisConnector;
use Illuminate\Queue\Queue;
use Illuminate\Support\InteractsWithTime;
use Mockery as m;
class AsyncRedisQueueTest extends TestCase
{
use InteractsWithTime;
/**
* The testing queue for Redis.
*
* @var \Illuminate\Queue\RedisQueue
*/
private $queue;
/**
* {@inheritdoc}
*/
public function setUp(): void
{
parent::setUp();
$this->runOnlyOnRedisReplication();
$connector = new AsyncRedisConnector($this->app['redis'], 'default');
$this->queue = $connector->connect([
'queue' => 'default',
'retry_after' => 60,
'block_for' => null,
]);
$this->queue->setContainer($this->app);
}
/**
* {@inheritdoc}
*/
protected function tearDown(): void
{
parent::tearDown();
m::close();
}
public function test_expired_jobs_are_pushed_with_async_and_popped_with_sync()
{
$jobs = [
new RedisQueueIntegrationTestJob(0),
new RedisQueueIntegrationTestJob(1),
new RedisQueueIntegrationTestJob(2),
new RedisQueueIntegrationTestJob(3),
];
$this->queue->later(1000, $jobs[0]);
$this->queue->later(-200, $jobs[1]);
$this->queue->later(-300, $jobs[2]);
$this->queue->later(-100, $jobs[3]);
$this->getPublishClient()->zcard('queues:default:delayed')->then(function ($count) {
$this->assertEquals(4, $count);
});
$this->unregisterManagers();
$this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
$this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
$this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
$this->assertNull($this->queue->pop());
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
$this->assertEquals(3, $this->app['redis']->connection()->zcard('queues:default:reserved'));
}
public function test_jobs_are_pushed_with_async_and_released_with_sync()
{
$this->queue->push(
$job = new RedisQueueIntegrationTestJob(30)
);
$this->unregisterManagers();
$this->getPublishClient()->assertCalledCount(1, 'eval');
$redisJob = $this->queue->pop();
$before = $this->currentTime();
$redisJob->release(1000);
$after = $this->currentTime();
// check the content of delayed queue
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
$results = $this->app['redis']->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]);
$payload = array_keys($results)[0];
$score = $results[$payload];
$this->assertGreaterThanOrEqual($before + 1000, $score);
$this->assertLessThanOrEqual($after + 1000, $score);
$decoded = json_decode($payload);
$this->assertEquals(1, $decoded->attempts);
$this->assertEquals($job, unserialize($decoded->data->command));
$this->assertNull($this->queue->pop());
}
public function test_jobs_are_pushed_with_async_and_deleted_with_sync()
{
$this->queue->push(
$job = new RedisQueueIntegrationTestJob(30)
);
$this->unregisterManagers();
$this->getPublishClient()->assertCalledCount(1, 'eval');
$redisJob = $this->queue->pop();
$redisJob->delete();
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:delayed'));
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:reserved'));
$this->assertEquals(0, $this->app['redis']->connection()->llen('queues:default'));
$this->assertNull($this->queue->pop());
}
public function test_jobs_are_pushed_with_async_and_cleared_with_sync()
{
if (! method_exists($this->queue, 'clear')) {
$this->markTestSkipped('The Queue has no clear() method to test.');
}
$job1 = new RedisQueueIntegrationTestJob(30);
$job2 = new RedisQueueIntegrationTestJob(40);
$this->queue->push($job1);
$this->queue->push($job2);
$this->getPublishClient()->assertCalledCount(2, 'eval');
$this->unregisterManagers();
$this->assertEquals(2, $this->queue->clear(null));
$this->assertEquals(0, $this->queue->size());
}
public function test_jobs_are_pushed_with_async_and_size_reflects_in_async_size()
{
$this->queue->size()->then(function ($count) {
$this->assertEquals(0, $count);
});
$this->queue->push(new RedisQueueIntegrationTestJob(1));
$this->queue->size()->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->queue->later(60, new RedisQueueIntegrationTestJob(2));
$this->queue->size()->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->queue->push(new RedisQueueIntegrationTestJob(3));
$this->queue->size()->then(function ($count) {
$this->assertEquals(3, $count);
});
$this->unregisterManagers();
$job = $this->queue->pop();
$this->registerManagers();
$this->queue->size()->then(function ($count) {
$this->assertEquals(3, $count);
});
}
}
class RedisQueueIntegrationTestJob
{
public $i;
public function __construct($i)
{
$this->i = $i;
}
public function handle()
{
//
}
}