Added integrations for Redis
This commit is contained in:
parent
6c8c748b58
commit
fd1a459047
|
|
@ -57,8 +57,12 @@ class LazyClient extends BaseLazyClient
|
|||
$this->calls[] = [$name, $args];
|
||||
|
||||
if (! in_array($name, ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'onMessage'])) {
|
||||
if ($name === 'eval') {
|
||||
$this->redis->{$name}(...$args);
|
||||
} else {
|
||||
$this->redis->__call($name, $args);
|
||||
}
|
||||
}
|
||||
|
||||
return new PromiseResolver(
|
||||
parent::__call($name, $args), $this->loop
|
||||
|
|
@ -98,6 +102,26 @@ class LazyClient extends BaseLazyClient
|
|||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the method got called.
|
||||
*
|
||||
* @param int $times
|
||||
* @param string $name
|
||||
* @return $this
|
||||
*/
|
||||
public function assertCalledCount(int $times, string $name)
|
||||
{
|
||||
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name) {
|
||||
[$calledName, ] = $function;
|
||||
|
||||
return $calledName === $name;
|
||||
});
|
||||
|
||||
PHPUnit::assertCount($times, $total);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the method with args got called.
|
||||
*
|
||||
|
|
@ -105,7 +129,7 @@ class LazyClient extends BaseLazyClient
|
|||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertCalledWithArgs($name, array $args)
|
||||
public function assertCalledWithArgs(string $name, array $args)
|
||||
{
|
||||
foreach ($this->getCalledFunctions() as $function) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
@ -125,11 +149,12 @@ class LazyClient extends BaseLazyClient
|
|||
/**
|
||||
* Check if the method with args got called an amount of times.
|
||||
*
|
||||
* @param int $times
|
||||
* @param string $name
|
||||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertCalledWithArgsCount($times = 1, $name, array $args)
|
||||
public function assertCalledWithArgsCount(int $times, string $name, array $args)
|
||||
{
|
||||
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
@ -148,7 +173,7 @@ class LazyClient extends BaseLazyClient
|
|||
* @param string $name
|
||||
* @return $this
|
||||
*/
|
||||
public function assertNotCalled($name)
|
||||
public function assertNotCalled(string $name)
|
||||
{
|
||||
foreach ($this->getCalledFunctions() as $function) {
|
||||
[$calledName, ] = $function;
|
||||
|
|
@ -172,7 +197,7 @@ class LazyClient extends BaseLazyClient
|
|||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertNotCalledWithArgs($name, array $args)
|
||||
public function assertNotCalledWithArgs(string $name, array $args)
|
||||
{
|
||||
foreach ($this->getCalledFunctions() as $function) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
@ -192,11 +217,12 @@ class LazyClient extends BaseLazyClient
|
|||
/**
|
||||
* Check if the method with args got called an amount of times.
|
||||
*
|
||||
* @param int $times
|
||||
* @param string $name
|
||||
* @param array $args
|
||||
* @return $this
|
||||
*/
|
||||
public function assertNotCalledWithArgsCount($times = 1, $name, array $args)
|
||||
public function assertNotCalledWithArgsCount(int $times, string $name, array $args)
|
||||
{
|
||||
$total = collect($this->getCalledFunctions())->filter(function ($function) use ($name, $args) {
|
||||
[$calledName, $calledArgs] = $function;
|
||||
|
|
|
|||
|
|
@ -2,18 +2,28 @@
|
|||
|
||||
namespace BeyondCode\LaravelWebSockets\Test;
|
||||
|
||||
use BeyondCode\LaravelWebSockets\Queue\AsyncRedisQueue;
|
||||
use Illuminate\Container\Container;
|
||||
use Illuminate\Contracts\Redis\Factory;
|
||||
use Illuminate\Queue\Jobs\RedisJob;
|
||||
use Illuminate\Queue\Queue;
|
||||
use Illuminate\Queue\RedisQueue;
|
||||
use Illuminate\Support\Carbon;
|
||||
use Illuminate\Support\InteractsWithTime;
|
||||
use Illuminate\Support\Str;
|
||||
use Mockery as m;
|
||||
use stdClass;
|
||||
|
||||
class RedisQueueTest extends TestCase
|
||||
{
|
||||
use InteractsWithTime;
|
||||
|
||||
/**
|
||||
* The testing queue for Redis.
|
||||
*
|
||||
* @var \Illuminate\Queue\RedisQueue
|
||||
*/
|
||||
private $queue;
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
|
|
@ -22,6 +32,12 @@ class RedisQueueTest extends TestCase
|
|||
parent::setUp();
|
||||
|
||||
$this->runOnlyOnRedisReplication();
|
||||
|
||||
$this->queue = new AsyncRedisQueue(
|
||||
$this->app['redis'], 'default', null, 60, null
|
||||
);
|
||||
|
||||
$this->queue->setContainer($this->app);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -29,234 +45,167 @@ class RedisQueueTest extends TestCase
|
|||
*/
|
||||
protected function tearDown(): void
|
||||
{
|
||||
parent::tearDown();
|
||||
|
||||
m::close();
|
||||
}
|
||||
|
||||
public function testPushProperlyPushesJobOntoRedis()
|
||||
public function test_expired_jobs_are_popped()
|
||||
{
|
||||
$uuid = Str::uuid();
|
||||
$jobs = [
|
||||
new RedisQueueIntegrationTestJob(0),
|
||||
new RedisQueueIntegrationTestJob(1),
|
||||
new RedisQueueIntegrationTestJob(2),
|
||||
new RedisQueueIntegrationTestJob(3),
|
||||
];
|
||||
|
||||
Str::createUuidsUsing(function () use ($uuid) {
|
||||
return $uuid;
|
||||
$this->queue->later(1000, $jobs[0]);
|
||||
$this->queue->later(-200, $jobs[1]);
|
||||
$this->queue->later(-300, $jobs[2]);
|
||||
$this->queue->later(-100, $jobs[3]);
|
||||
|
||||
$this->getPublishClient()
|
||||
->zcard('queues:default:delayed')
|
||||
->then(function ($count) {
|
||||
$this->assertEquals(4, $count);
|
||||
});
|
||||
|
||||
$queue = $this->getMockBuilder(RedisQueue::class)
|
||||
->setMethods(['getRandomId'])
|
||||
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
|
||||
->getMock();
|
||||
$this->unregisterManagers();
|
||||
|
||||
$queue->expects($this->once())
|
||||
->method('getRandomId')
|
||||
->willReturn('foo');
|
||||
$this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
|
||||
$this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
|
||||
$this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
|
||||
$this->assertNull($this->queue->pop());
|
||||
|
||||
$redis->shouldReceive('connection')
|
||||
->once()
|
||||
->andReturn($redis);
|
||||
|
||||
$redis->shouldReceive('eval')->once();
|
||||
|
||||
$id = $queue->push('foo', ['data']);
|
||||
|
||||
$this->assertSame('foo', $id);
|
||||
|
||||
Str::createUuidsNormally();
|
||||
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
|
||||
$this->assertEquals(3, $this->app['redis']->connection()->zcard('queues:default:reserved'));
|
||||
}
|
||||
|
||||
public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()
|
||||
public function test_release_job()
|
||||
{
|
||||
$uuid = Str::uuid();
|
||||
|
||||
Str::createUuidsUsing(function () use ($uuid) {
|
||||
return $uuid;
|
||||
});
|
||||
|
||||
$queue = $this->getMockBuilder(RedisQueue::class)
|
||||
->setMethods(['getRandomId'])
|
||||
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
|
||||
->getMock();
|
||||
|
||||
$queue->expects($this->once())
|
||||
->method('getRandomId')
|
||||
->willReturn('foo');
|
||||
|
||||
$redis->shouldReceive('connection')
|
||||
->once()
|
||||
->andReturn($redis);
|
||||
|
||||
$redis->shouldReceive('eval')->once();
|
||||
|
||||
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
|
||||
return ['custom' => 'taylor'];
|
||||
});
|
||||
|
||||
$id = $queue->push('foo', ['data']);
|
||||
|
||||
$this->assertSame('foo', $id);
|
||||
|
||||
Queue::createPayloadUsing(null);
|
||||
|
||||
Str::createUuidsNormally();
|
||||
}
|
||||
|
||||
public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()
|
||||
{
|
||||
$uuid = Str::uuid();
|
||||
|
||||
Str::createUuidsUsing(function () use ($uuid) {
|
||||
return $uuid;
|
||||
});
|
||||
|
||||
$queue = $this->getMockBuilder(RedisQueue::class)
|
||||
->setMethods(['getRandomId'])
|
||||
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
|
||||
->getMock();
|
||||
|
||||
$queue->expects($this->once())
|
||||
->method('getRandomId')
|
||||
->willReturn('foo');
|
||||
|
||||
$redis->shouldReceive('connection')
|
||||
->once()
|
||||
->andReturn($redis);
|
||||
|
||||
$redis->shouldReceive('eval')->once();
|
||||
|
||||
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
|
||||
return ['custom' => 'taylor'];
|
||||
});
|
||||
|
||||
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
|
||||
return ['bar' => 'foo'];
|
||||
});
|
||||
|
||||
$id = $queue->push('foo', ['data']);
|
||||
|
||||
$this->assertSame('foo', $id);
|
||||
|
||||
Queue::createPayloadUsing(null);
|
||||
|
||||
Str::createUuidsNormally();
|
||||
}
|
||||
|
||||
public function testDelayedPushProperlyPushesJobOntoRedis()
|
||||
{
|
||||
$uuid = Str::uuid();
|
||||
|
||||
Str::createUuidsUsing(function () use ($uuid) {
|
||||
return $uuid;
|
||||
});
|
||||
|
||||
$queue = $this->getMockBuilder(RedisQueue::class)
|
||||
->setMethods(['availableAt', 'getRandomId'])
|
||||
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
|
||||
->getMock();
|
||||
|
||||
$queue->expects($this->once())
|
||||
->method('getRandomId')
|
||||
->willReturn('foo');
|
||||
|
||||
$queue->expects($this->once())
|
||||
->method('availableAt')
|
||||
->with(1)
|
||||
->willReturn(2);
|
||||
|
||||
$redis->shouldReceive('connection')
|
||||
->once()
|
||||
->andReturn($redis);
|
||||
|
||||
$redis->shouldReceive('zadd')->once();
|
||||
|
||||
$id = $queue->later(1, 'foo', ['data']);
|
||||
|
||||
$this->assertSame('foo', $id);
|
||||
|
||||
Str::createUuidsNormally();
|
||||
}
|
||||
|
||||
public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
|
||||
{
|
||||
$uuid = Str::uuid();
|
||||
|
||||
Str::createUuidsUsing(function () use ($uuid) {
|
||||
return $uuid;
|
||||
});
|
||||
|
||||
$date = Carbon::now();
|
||||
|
||||
$queue = $this->getMockBuilder(RedisQueue::class)
|
||||
->setMethods(['availableAt', 'getRandomId'])
|
||||
->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])
|
||||
->getMock();
|
||||
|
||||
$queue->expects($this->once())
|
||||
->method('getRandomId')
|
||||
->willReturn('foo');
|
||||
|
||||
$queue->expects($this->once())
|
||||
->method('availableAt')
|
||||
->with($date)
|
||||
->willReturn(2);
|
||||
|
||||
$redis->shouldReceive('connection')
|
||||
->once()
|
||||
->andReturn($redis);
|
||||
|
||||
$redis->shouldReceive('zadd')->once();
|
||||
|
||||
$queue->later($date, 'foo', ['data']);
|
||||
|
||||
Str::createUuidsNormally();
|
||||
}
|
||||
|
||||
public function testFireProperlyCallsTheJobHandler()
|
||||
{
|
||||
$job = $this->getJob();
|
||||
|
||||
$job->getContainer()
|
||||
->shouldReceive('make')
|
||||
->once()->with('foo')
|
||||
->andReturn($handler = m::mock(stdClass::class));
|
||||
|
||||
$handler->shouldReceive('fire')
|
||||
->once()
|
||||
->with($job, ['data']);
|
||||
|
||||
$job->fire();
|
||||
}
|
||||
|
||||
public function testDeleteRemovesTheJobFromRedis()
|
||||
{
|
||||
$job = $this->getJob();
|
||||
|
||||
$job->getRedisQueue()
|
||||
->shouldReceive('deleteReserved')
|
||||
->once()
|
||||
->with('default', $job);
|
||||
|
||||
$job->delete();
|
||||
}
|
||||
|
||||
public function testReleaseProperlyReleasesJobOntoRedis()
|
||||
{
|
||||
$job = $this->getJob();
|
||||
|
||||
$job->getRedisQueue()
|
||||
->shouldReceive('deleteAndRelease')
|
||||
->once()
|
||||
->with('default', $job, 1);
|
||||
|
||||
$job->release(1);
|
||||
}
|
||||
|
||||
protected function getJob()
|
||||
{
|
||||
return new RedisJob(
|
||||
m::mock(Container::class),
|
||||
m::mock(RedisQueue::class),
|
||||
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 1]),
|
||||
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]),
|
||||
'connection-name',
|
||||
'default'
|
||||
$this->queue->push(
|
||||
$job = new RedisQueueIntegrationTestJob(30)
|
||||
);
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$this->getPublishClient()
|
||||
->assertCalledCount(1, 'eval');
|
||||
|
||||
$redisJob = $this->queue->pop();
|
||||
|
||||
$before = $this->currentTime();
|
||||
|
||||
$redisJob->release(1000);
|
||||
|
||||
$after = $this->currentTime();
|
||||
|
||||
// check the content of delayed queue
|
||||
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
|
||||
|
||||
$results = $this->app['redis']->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]);
|
||||
|
||||
$payload = array_keys($results)[0];
|
||||
|
||||
$score = $results[$payload];
|
||||
|
||||
$this->assertGreaterThanOrEqual($before + 1000, $score);
|
||||
$this->assertLessThanOrEqual($after + 1000, $score);
|
||||
|
||||
$decoded = json_decode($payload);
|
||||
|
||||
$this->assertEquals(1, $decoded->attempts);
|
||||
$this->assertEquals($job, unserialize($decoded->data->command));
|
||||
|
||||
$this->assertNull($this->queue->pop());
|
||||
}
|
||||
|
||||
public function test_delete_job()
|
||||
{
|
||||
$this->queue->push(
|
||||
$job = new RedisQueueIntegrationTestJob(30)
|
||||
);
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$this->getPublishClient()
|
||||
->assertCalledCount(1, 'eval');
|
||||
|
||||
$redisJob = $this->queue->pop();
|
||||
|
||||
$redisJob->delete();
|
||||
|
||||
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:delayed'));
|
||||
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:reserved'));
|
||||
$this->assertEquals(0, $this->app['redis']->connection()->llen('queues:default'));
|
||||
|
||||
$this->assertNull($this->queue->pop());
|
||||
}
|
||||
|
||||
public function test_clear_job()
|
||||
{
|
||||
$job1 = new RedisQueueIntegrationTestJob(30);
|
||||
$job2 = new RedisQueueIntegrationTestJob(40);
|
||||
|
||||
$this->queue->push($job1);
|
||||
$this->queue->push($job2);
|
||||
|
||||
$this->getPublishClient()
|
||||
->assertCalledCount(2, 'eval');
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$this->assertEquals(2, $this->queue->clear(null));
|
||||
$this->assertEquals(0, $this->queue->size());
|
||||
}
|
||||
|
||||
public function test_size_job()
|
||||
{
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(0, $count);
|
||||
});
|
||||
|
||||
$this->queue->push(new RedisQueueIntegrationTestJob(1));
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(1, $count);
|
||||
});
|
||||
|
||||
$this->queue->later(60, new RedisQueueIntegrationTestJob(2));
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(2, $count);
|
||||
});
|
||||
|
||||
$this->queue->push(new RedisQueueIntegrationTestJob(3));
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(3, $count);
|
||||
});
|
||||
|
||||
$this->unregisterManagers();
|
||||
|
||||
$job = $this->queue->pop();
|
||||
|
||||
$this->registerManagers();
|
||||
|
||||
$this->queue->size()->then(function ($count) {
|
||||
$this->assertEquals(3, $count);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class RedisQueueIntegrationTestJob
|
||||
{
|
||||
public $i;
|
||||
|
||||
public function __construct($i)
|
||||
{
|
||||
$this->i = $i;
|
||||
}
|
||||
|
||||
public function handle()
|
||||
{
|
||||
//
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -252,6 +252,16 @@ abstract class TestCase extends Orchestra
|
|||
$this->channelManager = $this->app->make(ChannelManager::class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister the managers for testing purposes.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
protected function unregisterManagers()
|
||||
{
|
||||
$this->app->offsetUnset(ChannelManager::class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the statistics collectors.
|
||||
*
|
||||
|
|
|
|||
Loading…
Reference in New Issue