Add tests for replication, fix bugs in the implementation
This commit is contained in:
parent
4baac7ef00
commit
b7ae9bac4a
|
|
@ -15,6 +15,6 @@ class FetchChannelController extends Controller
|
||||||
throw new HttpException(404, "Unknown channel `{$request->channelName}`.");
|
throw new HttpException(404, "Unknown channel `{$request->channelName}`.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return $channel->toArray();
|
return $channel->toArray($request->appId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,126 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\PubSub\Fake;
|
||||||
|
|
||||||
|
use stdClass;
|
||||||
|
use React\EventLoop\LoopInterface;
|
||||||
|
use React\Promise\FulfilledPromise;
|
||||||
|
use React\Promise\PromiseInterface;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
|
|
||||||
|
class FakeReplication implements ReplicationInterface
|
||||||
|
{
|
||||||
|
protected $channels = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Boot the pub/sub provider (open connections, initial subscriptions, etc).
|
||||||
|
*
|
||||||
|
* @param LoopInterface $loop
|
||||||
|
* @return self
|
||||||
|
*/
|
||||||
|
public function boot(LoopInterface $loop) : ReplicationInterface
|
||||||
|
{
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a payload on a specific channel, for a specific app.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param stdClass $payload
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function publish(string $appId, string $channel, stdClass $payload) : bool
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe to receive messages for a channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function subscribe(string $appId, string $channel) : bool
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsubscribe from a channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function unsubscribe(string $appId, string $channel) : bool
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a member to a channel. To be called when they have
|
||||||
|
* subscribed to the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
* @param string $data
|
||||||
|
*/
|
||||||
|
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
|
||||||
|
{
|
||||||
|
$this->channels["$appId:$channel"][$socketId] = $data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a member from the channel. To be called when they have
|
||||||
|
* unsubscribed from the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
*/
|
||||||
|
public function leaveChannel(string $appId, string $channel, string $socketId)
|
||||||
|
{
|
||||||
|
unset($this->channels["$appId:$channel"][$socketId]);
|
||||||
|
if (empty($this->channels["$appId:$channel"])) {
|
||||||
|
unset($this->channels["$appId:$channel"]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the full information about the members in a presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMembers(string $appId, string $channel) : PromiseInterface
|
||||||
|
{
|
||||||
|
$data = array_map(function ($user) {
|
||||||
|
return json_decode($user);
|
||||||
|
}, $this->channels["$appId:$channel"]);
|
||||||
|
|
||||||
|
return new FulfilledPromise($data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the amount of users subscribed for each presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param array $channelNames
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface
|
||||||
|
{
|
||||||
|
$data = [];
|
||||||
|
|
||||||
|
foreach ($channelNames as $channel) {
|
||||||
|
$data[$channel] = count($this->channels["$appId:$channel"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new FulfilledPromise($data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -223,9 +223,9 @@ class RedisClient implements ReplicationInterface
|
||||||
return $this->publishClient->__call('hgetall', ["$appId:$channel"])
|
return $this->publishClient->__call('hgetall', ["$appId:$channel"])
|
||||||
->then(function ($members) {
|
->then(function ($members) {
|
||||||
// The data is expected as objects, so we need to JSON decode
|
// The data is expected as objects, so we need to JSON decode
|
||||||
return array_walk($members, function ($user) {
|
return array_map(function ($user) {
|
||||||
return json_decode($user);
|
return json_decode($user);
|
||||||
});
|
}, $members);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ interface ReplicationInterface
|
||||||
/**
|
/**
|
||||||
* Subscribe to receive messages for a channel.
|
* Subscribe to receive messages for a channel.
|
||||||
*
|
*
|
||||||
|
* @param string $appId
|
||||||
* @param string $channel
|
* @param string $channel
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
|
|
@ -37,6 +38,7 @@ interface ReplicationInterface
|
||||||
/**
|
/**
|
||||||
* Unsubscribe from a channel.
|
* Unsubscribe from a channel.
|
||||||
*
|
*
|
||||||
|
* @param string $appId
|
||||||
* @param string $channel
|
* @param string $channel
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ class Channel
|
||||||
if (config('websockets.replication.enabled') === true) {
|
if (config('websockets.replication.enabled') === true) {
|
||||||
// Also broadcast via the other websocket servers
|
// Also broadcast via the other websocket servers
|
||||||
app(ReplicationInterface::class)
|
app(ReplicationInterface::class)
|
||||||
->publish($connection->app->id, $payload);
|
->publish($connection->app->id, $this->channelName, $payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
|
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
|
||||||
|
|
@ -139,7 +139,7 @@ class Channel
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function toArray()
|
public function toArray(string $appId = null)
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
'occupied' => count($this->subscribedConnections) > 0,
|
'occupied' => count($this->subscribedConnections) > 0,
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,7 @@ class PresenceChannel extends Channel
|
||||||
]));
|
]));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->broadcastToOthers($connection, [
|
$this->broadcastToOthers($connection, (object) [
|
||||||
'event' => 'pusher_internal:member_added',
|
'event' => 'pusher_internal:member_added',
|
||||||
'channel' => $this->channelName,
|
'channel' => $this->channelName,
|
||||||
'data' => json_encode($channelData),
|
'data' => json_encode($channelData),
|
||||||
|
|
@ -107,7 +107,7 @@ class PresenceChannel extends Channel
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->broadcastToOthers($connection, [
|
$this->broadcastToOthers($connection, (object) [
|
||||||
'event' => 'pusher_internal:member_removed',
|
'event' => 'pusher_internal:member_removed',
|
||||||
'channel' => $this->channelName,
|
'channel' => $this->channelName,
|
||||||
'data' => json_encode([
|
'data' => json_encode([
|
||||||
|
|
@ -119,6 +119,7 @@ class PresenceChannel extends Channel
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @param string|null $appId
|
||||||
* @return PromiseInterface|array
|
* @return PromiseInterface|array
|
||||||
*/
|
*/
|
||||||
public function toArray(string $appId = null)
|
public function toArray(string $appId = null)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\Channels;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class ChannelReplicationTest extends ChannelTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -123,7 +123,7 @@ class ChannelTest extends TestCase
|
||||||
|
|
||||||
$channel = $this->getChannel($connection1, 'test-channel');
|
$channel = $this->getChannel($connection1, 'test-channel');
|
||||||
|
|
||||||
$channel->broadcastToOthers($connection1, [
|
$channel->broadcastToOthers($connection1, (object) [
|
||||||
'event' => 'broadcasted-event',
|
'event' => 'broadcasted-event',
|
||||||
'channel' => 'test-channel',
|
'channel' => 'test-channel',
|
||||||
]);
|
]);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\Channels;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class PresenceChannelReplicationTest extends PresenceChannelTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -59,4 +59,75 @@ class PresenceChannelTest extends TestCase
|
||||||
'channel' => 'presence-channel',
|
'channel' => 'presence-channel',
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @test */
|
||||||
|
public function clients_with_valid_auth_signatures_can_leave_presence_channels()
|
||||||
|
{
|
||||||
|
$connection = $this->getWebSocketConnection();
|
||||||
|
|
||||||
|
$this->pusherServer->onOpen($connection);
|
||||||
|
|
||||||
|
$channelData = [
|
||||||
|
'user_id' => 1,
|
||||||
|
'user_info' => [
|
||||||
|
'name' => 'Marcel',
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
$signature = "{$connection->socketId}:presence-channel:".json_encode($channelData);
|
||||||
|
|
||||||
|
$message = new Message(json_encode([
|
||||||
|
'event' => 'pusher:subscribe',
|
||||||
|
'data' => [
|
||||||
|
'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret),
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
'channel_data' => json_encode($channelData),
|
||||||
|
],
|
||||||
|
]));
|
||||||
|
|
||||||
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
|
||||||
|
$connection->assertSentEvent('pusher_internal:subscription_succeeded', [
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
]);
|
||||||
|
|
||||||
|
$message = new Message(json_encode([
|
||||||
|
'event' => 'pusher:unsubscribe',
|
||||||
|
'data' => [
|
||||||
|
'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret),
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
],
|
||||||
|
]));
|
||||||
|
|
||||||
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @test */
|
||||||
|
public function clients_with_valid_auth_signatures_cannot_leave_channels_they_are_not_in()
|
||||||
|
{
|
||||||
|
$connection = $this->getWebSocketConnection();
|
||||||
|
|
||||||
|
$this->pusherServer->onOpen($connection);
|
||||||
|
|
||||||
|
$channelData = [
|
||||||
|
'user_id' => 1,
|
||||||
|
'user_info' => [
|
||||||
|
'name' => 'Marcel',
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
$signature = "{$connection->socketId}:presence-channel:".json_encode($channelData);
|
||||||
|
|
||||||
|
$message = new Message(json_encode([
|
||||||
|
'event' => 'pusher:unsubscribe',
|
||||||
|
'data' => [
|
||||||
|
'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret),
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
],
|
||||||
|
]));
|
||||||
|
|
||||||
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
|
||||||
|
$this->markTestAsPassed();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ class AppTest extends TestCase
|
||||||
/** @test */
|
/** @test */
|
||||||
public function it_can_create_a_client()
|
public function it_can_create_a_client()
|
||||||
{
|
{
|
||||||
new App(1, 'appKey', 'appSecret', 'new');
|
new App(1, 'appKey', 'appSecret');
|
||||||
|
|
||||||
$this->markTestAsPassed();
|
$this->markTestAsPassed();
|
||||||
}
|
}
|
||||||
|
|
@ -21,7 +21,7 @@ class AppTest extends TestCase
|
||||||
{
|
{
|
||||||
$this->expectException(InvalidApp::class);
|
$this->expectException(InvalidApp::class);
|
||||||
|
|
||||||
new App(1, '', 'appSecret', 'new');
|
new App(1, '', 'appSecret');
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @test */
|
/** @test */
|
||||||
|
|
@ -29,6 +29,6 @@ class AppTest extends TestCase
|
||||||
{
|
{
|
||||||
$this->expectException(InvalidApp::class);
|
$this->expectException(InvalidApp::class);
|
||||||
|
|
||||||
new App(1, 'appKey', '', 'new');
|
new App(1, 'appKey', '');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\HttpApi;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class FetchChannelReplicationTest extends FetchChannelTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -66,6 +66,38 @@ class FetchChannelTest extends TestCase
|
||||||
], json_decode($response->getContent(), true));
|
], json_decode($response->getContent(), true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @test */
|
||||||
|
public function it_returns_presence_channel_information()
|
||||||
|
{
|
||||||
|
$this->joinPresenceChannel('presence-channel');
|
||||||
|
$this->joinPresenceChannel('presence-channel');
|
||||||
|
|
||||||
|
$connection = new Connection();
|
||||||
|
|
||||||
|
$requestPath = '/apps/1234/channel/my-channel';
|
||||||
|
$routeParams = [
|
||||||
|
'appId' => '1234',
|
||||||
|
'channelName' => 'presence-channel',
|
||||||
|
];
|
||||||
|
|
||||||
|
$queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath);
|
||||||
|
|
||||||
|
$request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams));
|
||||||
|
|
||||||
|
$controller = app(FetchChannelController::class);
|
||||||
|
|
||||||
|
$controller->onOpen($connection, $request);
|
||||||
|
|
||||||
|
/** @var JsonResponse $response */
|
||||||
|
$response = array_pop($connection->sentRawData);
|
||||||
|
|
||||||
|
$this->assertSame([
|
||||||
|
'occupied' => true,
|
||||||
|
'subscription_count' => 2,
|
||||||
|
'user_count' => 2,
|
||||||
|
], json_decode($response->getContent(), true));
|
||||||
|
}
|
||||||
|
|
||||||
/** @test */
|
/** @test */
|
||||||
public function it_returns_404_for_invalid_channels()
|
public function it_returns_404_for_invalid_channels()
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\HttpApi;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class FetchChannelsReplicationTest extends FetchChannelsTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\HttpApi;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class FetchUsersReplicationTest extends FetchUsersTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -49,6 +49,7 @@ abstract class TestCase extends \Orchestra\Testbench\TestCase
|
||||||
'id' => 1234,
|
'id' => 1234,
|
||||||
'key' => 'TestKey',
|
'key' => 'TestKey',
|
||||||
'secret' => 'TestSecret',
|
'secret' => 'TestSecret',
|
||||||
|
'host' => 'localhost',
|
||||||
'capacity' => null,
|
'capacity' => null,
|
||||||
'enable_client_messages' => false,
|
'enable_client_messages' => false,
|
||||||
'enable_statistics' => true,
|
'enable_statistics' => true,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests;
|
||||||
|
|
||||||
|
use React\EventLoop\Factory;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\Fake\FakeReplication;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
|
|
||||||
|
trait TestsReplication
|
||||||
|
{
|
||||||
|
public function setupReplication()
|
||||||
|
{
|
||||||
|
app()->singleton(ReplicationInterface::class, function () {
|
||||||
|
return (new FakeReplication())->boot(Factory::create());
|
||||||
|
});
|
||||||
|
|
||||||
|
config([
|
||||||
|
'websockets.replication.enabled' => true,
|
||||||
|
'websockets.replication.driver' => 'fake',
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue