diff --git a/src/HttpApi/Controllers/FetchChannelController.php b/src/HttpApi/Controllers/FetchChannelController.php index 6a24fd5..188e08c 100644 --- a/src/HttpApi/Controllers/FetchChannelController.php +++ b/src/HttpApi/Controllers/FetchChannelController.php @@ -15,6 +15,6 @@ class FetchChannelController extends Controller throw new HttpException(404, "Unknown channel `{$request->channelName}`."); } - return $channel->toArray(); + return $channel->toArray($request->appId); } } diff --git a/src/PubSub/Fake/FakeReplication.php b/src/PubSub/Fake/FakeReplication.php new file mode 100644 index 0000000..5b3e429 --- /dev/null +++ b/src/PubSub/Fake/FakeReplication.php @@ -0,0 +1,126 @@ +channels["$appId:$channel"][$socketId] = $data; + } + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId) + { + unset($this->channels["$appId:$channel"][$socketId]); + if (empty($this->channels["$appId:$channel"])) { + unset($this->channels["$appId:$channel"]); + } + } + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel) : PromiseInterface + { + $data = array_map(function ($user) { + return json_decode($user); + }, $this->channels["$appId:$channel"]); + + return new FulfilledPromise($data); + } + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface + { + $data = []; + + foreach ($channelNames as $channel) { + $data[$channel] = count($this->channels["$appId:$channel"]); + } + + return new FulfilledPromise($data); + } +} diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php index a2ea8db..4cc3e18 100644 --- a/src/PubSub/Redis/RedisClient.php +++ b/src/PubSub/Redis/RedisClient.php @@ -223,9 +223,9 @@ class RedisClient implements ReplicationInterface return $this->publishClient->__call('hgetall', ["$appId:$channel"]) ->then(function ($members) { // The data is expected as objects, so we need to JSON decode - return array_walk($members, function ($user) { + return array_map(function ($user) { return json_decode($user); - }); + }, $members); }); } diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index e515e5c..3e120af 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -29,6 +29,7 @@ interface ReplicationInterface /** * Subscribe to receive messages for a channel. * + * @param string $appId * @param string $channel * @return bool */ @@ -37,6 +38,7 @@ interface ReplicationInterface /** * Unsubscribe from a channel. * + * @param string $appId * @param string $channel * @return bool */ diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index b5c8413..87e81e0 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -115,7 +115,7 @@ class Channel if (config('websockets.replication.enabled') === true) { // Also broadcast via the other websocket servers app(ReplicationInterface::class) - ->publish($connection->app->id, $payload); + ->publish($connection->app->id, $this->channelName, $payload); } $this->broadcastToEveryoneExcept($payload, $connection->socketId); @@ -139,7 +139,7 @@ class Channel } } - public function toArray() + public function toArray(string $appId = null) { return [ 'occupied' => count($this->subscribedConnections) > 0, diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index 21cab87..b382bb6 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -82,7 +82,7 @@ class PresenceChannel extends Channel ])); } - $this->broadcastToOthers($connection, [ + $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_added', 'channel' => $this->channelName, 'data' => json_encode($channelData), @@ -107,7 +107,7 @@ class PresenceChannel extends Channel ); } - $this->broadcastToOthers($connection, [ + $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_removed', 'channel' => $this->channelName, 'data' => json_encode([ @@ -119,6 +119,7 @@ class PresenceChannel extends Channel } /** + * @param string|null $appId * @return PromiseInterface|array */ public function toArray(string $appId = null) diff --git a/tests/Channels/ChannelReplicationTest.php b/tests/Channels/ChannelReplicationTest.php new file mode 100644 index 0000000..f8e0872 --- /dev/null +++ b/tests/Channels/ChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/Channels/ChannelTest.php b/tests/Channels/ChannelTest.php index 41272fa..ebaac75 100644 --- a/tests/Channels/ChannelTest.php +++ b/tests/Channels/ChannelTest.php @@ -123,7 +123,7 @@ class ChannelTest extends TestCase $channel = $this->getChannel($connection1, 'test-channel'); - $channel->broadcastToOthers($connection1, [ + $channel->broadcastToOthers($connection1, (object) [ 'event' => 'broadcasted-event', 'channel' => 'test-channel', ]); diff --git a/tests/Channels/PresenceChannelReplicationTest.php b/tests/Channels/PresenceChannelReplicationTest.php new file mode 100644 index 0000000..7070271 --- /dev/null +++ b/tests/Channels/PresenceChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/Channels/PresenceChannelTest.php b/tests/Channels/PresenceChannelTest.php index 8a86560..6add602 100644 --- a/tests/Channels/PresenceChannelTest.php +++ b/tests/Channels/PresenceChannelTest.php @@ -59,4 +59,75 @@ class PresenceChannelTest extends TestCase 'channel' => 'presence-channel', ]); } + + /** @test */ + public function clients_with_valid_auth_signatures_can_leave_presence_channels() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $channelData = [ + 'user_id' => 1, + 'user_info' => [ + 'name' => 'Marcel', + ], + ]; + + $signature = "{$connection->socketId}:presence-channel:".json_encode($channelData); + + $message = new Message(json_encode([ + 'event' => 'pusher:subscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + 'channel_data' => json_encode($channelData), + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + + $connection->assertSentEvent('pusher_internal:subscription_succeeded', [ + 'channel' => 'presence-channel', + ]); + + $message = new Message(json_encode([ + 'event' => 'pusher:unsubscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + } + + /** @test */ + public function clients_with_valid_auth_signatures_cannot_leave_channels_they_are_not_in() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $channelData = [ + 'user_id' => 1, + 'user_info' => [ + 'name' => 'Marcel', + ], + ]; + + $signature = "{$connection->socketId}:presence-channel:".json_encode($channelData); + + $message = new Message(json_encode([ + 'event' => 'pusher:unsubscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + + $this->markTestAsPassed(); + } } diff --git a/tests/ClientProviders/AppTest.php b/tests/ClientProviders/AppTest.php index 71393d7..73345ac 100644 --- a/tests/ClientProviders/AppTest.php +++ b/tests/ClientProviders/AppTest.php @@ -11,7 +11,7 @@ class AppTest extends TestCase /** @test */ public function it_can_create_a_client() { - new App(1, 'appKey', 'appSecret', 'new'); + new App(1, 'appKey', 'appSecret'); $this->markTestAsPassed(); } @@ -21,7 +21,7 @@ class AppTest extends TestCase { $this->expectException(InvalidApp::class); - new App(1, '', 'appSecret', 'new'); + new App(1, '', 'appSecret'); } /** @test */ @@ -29,6 +29,6 @@ class AppTest extends TestCase { $this->expectException(InvalidApp::class); - new App(1, 'appKey', '', 'new'); + new App(1, 'appKey', ''); } } diff --git a/tests/HttpApi/FetchChannelReplicationTest.php b/tests/HttpApi/FetchChannelReplicationTest.php new file mode 100644 index 0000000..84f4c51 --- /dev/null +++ b/tests/HttpApi/FetchChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/HttpApi/FetchChannelTest.php b/tests/HttpApi/FetchChannelTest.php index 50fcaf1..dd4abf2 100644 --- a/tests/HttpApi/FetchChannelTest.php +++ b/tests/HttpApi/FetchChannelTest.php @@ -66,6 +66,38 @@ class FetchChannelTest extends TestCase ], json_decode($response->getContent(), true)); } + /** @test */ + public function it_returns_presence_channel_information() + { + $this->joinPresenceChannel('presence-channel'); + $this->joinPresenceChannel('presence-channel'); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/my-channel'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'presence-channel', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + + /** @var JsonResponse $response */ + $response = array_pop($connection->sentRawData); + + $this->assertSame([ + 'occupied' => true, + 'subscription_count' => 2, + 'user_count' => 2, + ], json_decode($response->getContent(), true)); + } + /** @test */ public function it_returns_404_for_invalid_channels() { diff --git a/tests/HttpApi/FetchChannelsReplicationTest.php b/tests/HttpApi/FetchChannelsReplicationTest.php new file mode 100644 index 0000000..24eb9b4 --- /dev/null +++ b/tests/HttpApi/FetchChannelsReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/HttpApi/FetchUsersReplicationTest.php b/tests/HttpApi/FetchUsersReplicationTest.php new file mode 100644 index 0000000..2d959a8 --- /dev/null +++ b/tests/HttpApi/FetchUsersReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 14d3655..7b00aed 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -49,6 +49,7 @@ abstract class TestCase extends \Orchestra\Testbench\TestCase 'id' => 1234, 'key' => 'TestKey', 'secret' => 'TestSecret', + 'host' => 'localhost', 'capacity' => null, 'enable_client_messages' => false, 'enable_statistics' => true, diff --git a/tests/TestsReplication.php b/tests/TestsReplication.php new file mode 100644 index 0000000..c0fa2f0 --- /dev/null +++ b/tests/TestsReplication.php @@ -0,0 +1,22 @@ +singleton(ReplicationInterface::class, function () { + return (new FakeReplication())->boot(Factory::create()); + }); + + config([ + 'websockets.replication.enabled' => true, + 'websockets.replication.driver' => 'fake', + ]); + } +}