From 00e8f3e1a8450900749dcb3a047280d48ee79ad8 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 29 Jul 2019 16:20:48 -0400 Subject: [PATCH] Add channel storage to LocalDriver to simplify PresenceChannel logic --- .../Controllers/FetchChannelsController.php | 36 +++--- src/PubSub/Drivers/LocalClient.php | 33 ++++- src/PubSub/Drivers/RedisClient.php | 2 +- src/WebSockets/Channels/PresenceChannel.php | 113 ++++++++---------- 4 files changed, 94 insertions(+), 90 deletions(-) diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index 0ea9681..96f7141 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -32,30 +32,24 @@ class FetchChannelsController extends Controller }); } - if (config('websockets.replication.enabled') === true) { - // We want to get the channel user count all in one shot when - // using a replication backend rather than doing individual queries. - // To do so, we first collect the list of channel names. - $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { - return $channel->getChannelName(); - })->toArray(); + // We want to get the channel user count all in one shot when + // using a replication backend rather than doing individual queries. + // To do so, we first collect the list of channel names. + $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { + return $channel->getChannelName(); + })->toArray(); - /** @var PromiseInterface $memberCounts */ - // We ask the replication backend to get us the member count per channel - $memberCounts = app(ReplicationInterface::class) - ->channelMemberCounts($request->appId, $channelNames); + /** @var PromiseInterface $memberCounts */ + // We ask the replication backend to get us the member count per channel + $memberCounts = app(ReplicationInterface::class) + ->channelMemberCounts($request->appId, $channelNames); - // We return a promise since the backend runs async. We get $counts back - // as a key-value array of channel names and their member count. - return $memberCounts->then(function (array $counts) use ($channels, $attributes) { - return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { - return $counts[$channel->getChannelName()]; - }); + // We return a promise since the backend runs async. We get $counts back + // as a key-value array of channel names and their member count. + return $memberCounts->then(function (array $counts) use ($channels, $attributes) { + return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { + return $counts[$channel->getChannelName()]; }); - } - - return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) { - return $channel->getUserCount(); }); } diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index f610a0f..2dfc1fa 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -10,6 +10,13 @@ use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; class LocalClient implements ReplicationInterface { + /** + * Mapping of the presence JSON data for users in each channel + * + * @var string[][] + */ + protected $channelData = []; + /** * Boot the pub/sub provider (open connections, initial subscriptions, etc). * @@ -31,6 +38,7 @@ class LocalClient implements ReplicationInterface */ public function publish(string $appId, string $channel, stdClass $payload) : bool { + // Nothing to do, nobody to publish to return true; } @@ -69,6 +77,7 @@ class LocalClient implements ReplicationInterface */ public function joinChannel(string $appId, string $channel, string $socketId, string $data) { + $this->channelData["$appId:$channel"][$socketId] = $data; } /** @@ -81,6 +90,10 @@ class LocalClient implements ReplicationInterface */ public function leaveChannel(string $appId, string $channel, string $socketId) { + unset($this->channelData["$appId:$channel"][$socketId]); + if (empty($this->channelData["$appId:$channel"])) { + unset($this->channelData["$appId:$channel"]); + } } /** @@ -92,7 +105,14 @@ class LocalClient implements ReplicationInterface */ public function channelMembers(string $appId, string $channel) : PromiseInterface { - return new FulfilledPromise(null); + $members = $this->channelData["$appId:$channel"] ?? []; + + // The data is expected as objects, so we need to JSON decode + $members = array_map(function ($user) { + return json_decode($user); + }, $members); + + return new FulfilledPromise($members); } /** @@ -104,6 +124,15 @@ class LocalClient implements ReplicationInterface */ public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface { - return new FulfilledPromise(null); + $results = []; + + // Count the number of users per channel + foreach ($channelNames as $channel) { + $results[$channel] = isset($this->channelData["$appId:$channel"]) + ? count($this->channelData["$appId:$channel"]) + : 0; + } + + return new FulfilledPromise($results); } } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index e4abe7c..ce9c8fb 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -97,7 +97,7 @@ class RedisClient implements ReplicationInterface // expect the channel name to not include the app ID. $payload->channel = Str::after($redisChannel, "$appId:"); - /* @var $channelManager ChannelManager */ + /* @var ChannelManager $channelManager */ $channelManager = app(ChannelManager::class); // Load the Channel instance, if any diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index b382bb6..895e96a 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -10,6 +10,16 @@ use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; class PresenceChannel extends Channel { + /** + * Data for the users connected to this channel + * + * Note: If replication is enabled, this will only contain entries + * for the users directly connected to this server instance. Requests + * for data for all users in the channel should be routed through + * ReplicationInterface. + * + * @var string[] + */ protected $users = []; /** @@ -18,21 +28,9 @@ class PresenceChannel extends Channel */ public function getUsers(string $appId) { - if (config('websockets.replication.enabled') === true) { - // Get the members list from the replication backend - return app(ReplicationInterface::class) - ->channelMembers($appId, $this->channelName); - } - - return $this->users; - } - - /** - * @return array - */ - public function getUserCount() - { - return count($this->users); + // Get the members list from the replication backend + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName); } /** @@ -51,36 +49,27 @@ class PresenceChannel extends Channel $channelData = json_decode($payload->channel_data); $this->users[$connection->socketId] = $channelData; - if (config('websockets.replication.enabled') === true) { - // Add the connection as a member of the channel - app(ReplicationInterface::class) - ->joinChannel( - $connection->app->id, - $this->channelName, - $connection->socketId, - json_encode($channelData) - ); + // Add the connection as a member of the channel + app(ReplicationInterface::class) + ->joinChannel( + $connection->app->id, + $this->channelName, + $connection->socketId, + json_encode($channelData) + ); - // We need to pull the channel data from the replication backend, - // otherwise we won't be sending the full details of the channel - app(ReplicationInterface::class) - ->channelMembers($connection->app->id, $this->channelName) - ->then(function ($users) use ($connection) { - // Send the success event - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData($users)), - ])); - }); - } else { - // Send the success event - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData($this->users)), - ])); - } + // We need to pull the channel data from the replication backend, + // otherwise we won't be sending the full details of the channel + app(ReplicationInterface::class) + ->channelMembers($connection->app->id, $this->channelName) + ->then(function ($users) use ($connection) { + // Send the success event + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($users)), + ])); + }); $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_added', @@ -97,15 +86,13 @@ class PresenceChannel extends Channel return; } - if (config('websockets.replication.enabled') === true) { - // Remove the connection as a member of the channel - app(ReplicationInterface::class) - ->leaveChannel( - $connection->app->id, - $this->channelName, - $connection->socketId - ); - } + // Remove the connection as a member of the channel + app(ReplicationInterface::class) + ->leaveChannel( + $connection->app->id, + $this->channelName, + $connection->socketId + ); $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_removed', @@ -124,19 +111,13 @@ class PresenceChannel extends Channel */ public function toArray(string $appId = null) { - if (config('websockets.replication.enabled') === true) { - return app(ReplicationInterface::class) - ->channelMembers($appId, $this->channelName) - ->then(function ($users) { - return array_merge(parent::toArray(), [ - 'user_count' => count($users), - ]); - }); - } - - return array_merge(parent::toArray(), [ - 'user_count' => count($this->users), - ]); + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName) + ->then(function ($users) { + return array_merge(parent::toArray(), [ + 'user_count' => count($users), + ]); + }); } protected function getChannelData(array $users): array