From 5997dd4df8bf5267910d8bb339f83f813228b7bd Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Fri, 14 Aug 2020 08:42:17 +0300 Subject: [PATCH] wip docblocks --- src/PubSub/Drivers/LocalClient.php | 42 +++++++++-------- src/PubSub/Drivers/RedisClient.php | 45 +++++++++--------- src/PubSub/ReplicationInterface.php | 40 ++++++++-------- src/WebSockets/Channels/Channel.php | 6 +-- src/WebSockets/Channels/PresenceChannel.php | 52 ++++++++++++++------- 5 files changed, 105 insertions(+), 80 deletions(-) diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index 22b2fe9..437ed98 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -20,7 +20,7 @@ class LocalClient implements ReplicationInterface /** * Boot the pub/sub provider (open connections, initial subscriptions, etc). * - * @param LoopInterface $loop + * @param LoopInterface $loop * @return self */ public function boot(LoopInterface $loop): ReplicationInterface @@ -31,9 +31,9 @@ class LocalClient implements ReplicationInterface /** * Publish a payload on a specific channel, for a specific app. * - * @param string $appId - * @param string $channel - * @param stdClass $payload + * @param string $appId + * @param string $channel + * @param stdClass $payload * @return bool */ public function publish(string $appId, string $channel, stdClass $payload): bool @@ -44,8 +44,8 @@ class LocalClient implements ReplicationInterface /** * Subscribe to receive messages for a channel. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return bool */ public function subscribe(string $appId, string $channel): bool @@ -56,8 +56,8 @@ class LocalClient implements ReplicationInterface /** * Unsubscribe from a channel. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return bool */ public function unsubscribe(string $appId, string $channel): bool @@ -69,10 +69,11 @@ class LocalClient implements ReplicationInterface * Add a member to a channel. To be called when they have * subscribed to the channel. * - * @param string $appId - * @param string $channel - * @param string $socketId - * @param string $data + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + * @return void */ public function joinChannel(string $appId, string $channel, string $socketId, string $data) { @@ -83,13 +84,15 @@ class LocalClient implements ReplicationInterface * Remove a member from the channel. To be called when they have * unsubscribed from the channel. * - * @param string $appId - * @param string $channel - * @param string $socketId + * @param string $appId + * @param string $channel + * @param string $socketId + * @return void */ public function leaveChannel(string $appId, string $channel, string $socketId) { unset($this->channelData["$appId:$channel"][$socketId]); + if (empty($this->channelData["$appId:$channel"])) { unset($this->channelData["$appId:$channel"]); } @@ -98,15 +101,14 @@ class LocalClient implements ReplicationInterface /** * Retrieve the full information about the members in a presence channel. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return PromiseInterface */ public function channelMembers(string $appId, string $channel): PromiseInterface { $members = $this->channelData["$appId:$channel"] ?? []; - // The data is expected as objects, so we need to JSON decode $members = array_map(function ($user) { return json_decode($user); }, $members); @@ -117,8 +119,8 @@ class LocalClient implements ReplicationInterface /** * Get the amount of users subscribed for each presence channel. * - * @param string $appId - * @param array $channelNames + * @param string $appId + * @param array $channelNames * @return PromiseInterface */ public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 672ce84..6d8aa28 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -54,7 +54,7 @@ class RedisClient implements ReplicationInterface /** * Boot the RedisClient, initializing the connections. * - * @param LoopInterface $loop + * @param LoopInterface $loop * @return ReplicationInterface */ public function boot(LoopInterface $loop): ReplicationInterface @@ -77,8 +77,9 @@ class RedisClient implements ReplicationInterface /** * Handle a message received from Redis on a specific channel. * - * @param string $redisChannel - * @param string $payload + * @param string $redisChannel + * @param string $payload + * @return void */ protected function onMessage(string $redisChannel, string $payload) { @@ -123,8 +124,8 @@ class RedisClient implements ReplicationInterface /** * Subscribe to a channel on behalf of websocket user. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return bool */ public function subscribe(string $appId, string $channel): bool @@ -144,8 +145,8 @@ class RedisClient implements ReplicationInterface /** * Unsubscribe from a channel on behalf of a websocket user. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return bool */ public function unsubscribe(string $appId, string $channel): bool @@ -169,9 +170,9 @@ class RedisClient implements ReplicationInterface /** * Publish a message to a channel on behalf of a websocket user. * - * @param string $appId - * @param string $channel - * @param stdClass $payload + * @param string $appId + * @param string $channel + * @param stdClass $payload * @return bool */ public function publish(string $appId, string $channel, stdClass $payload): bool @@ -188,10 +189,11 @@ class RedisClient implements ReplicationInterface * Add a member to a channel. To be called when they have * subscribed to the channel. * - * @param string $appId - * @param string $channel - * @param string $socketId - * @param string $data + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + * @return void */ public function joinChannel(string $appId, string $channel, string $socketId, string $data) { @@ -202,9 +204,10 @@ class RedisClient implements ReplicationInterface * Remove a member from the channel. To be called when they have * unsubscribed from the channel. * - * @param string $appId - * @param string $channel - * @param string $socketId + * @param string $appId + * @param string $channel + * @param string $socketId + * @return void */ public function leaveChannel(string $appId, string $channel, string $socketId) { @@ -214,8 +217,8 @@ class RedisClient implements ReplicationInterface /** * Retrieve the full information about the members in a presence channel. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return PromiseInterface */ public function channelMembers(string $appId, string $channel): PromiseInterface @@ -232,8 +235,8 @@ class RedisClient implements ReplicationInterface /** * Get the amount of users subscribed for each presence channel. * - * @param string $appId - * @param array $channelNames + * @param string $appId + * @param array $channelNames * @return PromiseInterface */ public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index cd1a50c..f40b445 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -11,7 +11,7 @@ interface ReplicationInterface /** * Boot the pub/sub provider (open connections, initial subscriptions, etc). * - * @param LoopInterface $loop + * @param LoopInterface $loop * @return self */ public function boot(LoopInterface $loop): self; @@ -19,9 +19,9 @@ interface ReplicationInterface /** * Publish a payload on a specific channel, for a specific app. * - * @param string $appId - * @param string $channel - * @param stdClass $payload + * @param string $appId + * @param string $channel + * @param stdClass $payload * @return bool */ public function publish(string $appId, string $channel, stdClass $payload): bool; @@ -29,8 +29,8 @@ interface ReplicationInterface /** * Subscribe to receive messages for a channel. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return bool */ public function subscribe(string $appId, string $channel): bool; @@ -38,8 +38,8 @@ interface ReplicationInterface /** * Unsubscribe from a channel. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return bool */ public function unsubscribe(string $appId, string $channel): bool; @@ -48,10 +48,11 @@ interface ReplicationInterface * Add a member to a channel. To be called when they have * subscribed to the channel. * - * @param string $appId - * @param string $channel - * @param string $socketId - * @param string $data + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + * @return void */ public function joinChannel(string $appId, string $channel, string $socketId, string $data); @@ -59,17 +60,18 @@ interface ReplicationInterface * Remove a member from the channel. To be called when they have * unsubscribed from the channel. * - * @param string $appId - * @param string $channel - * @param string $socketId + * @param string $appId + * @param string $channel + * @param string $socketId + * @return void */ public function leaveChannel(string $appId, string $channel, string $socketId); /** * Retrieve the full information about the members in a presence channel. * - * @param string $appId - * @param string $channel + * @param string $appId + * @param string $channel * @return PromiseInterface */ public function channelMembers(string $appId, string $channel): PromiseInterface; @@ -77,8 +79,8 @@ interface ReplicationInterface /** * Get the amount of users subscribed for each presence channel. * - * @param string $appId - * @param array $channelNames + * @param string $appId + * @param array $channelNames * @return PromiseInterface */ public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface; diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 9f26f16..8e301c1 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -67,20 +67,18 @@ class Channel { $this->saveConnection($connection); - // Subscribe to broadcasted messages from the pub/sub backend - $this->replicator->subscribe($connection->app->id, $this->channelName); - $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->channelName, ])); + + $this->replicator->subscribe($connection->app->id, $this->channelName); } public function unsubscribe(ConnectionInterface $connection) { unset($this->subscribedConnections[$connection->socketId]); - // Unsubscribe from the pub/sub backend $this->replicator->unsubscribe($connection->app->id, $this->channelName); if (! $this->hasConnections()) { diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index a4b94e9..3217566 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -22,22 +22,24 @@ class PresenceChannel extends Channel protected $users = []; /** - * @param string $appId + * Get the members in the presence channel. + * + * @param string $appId * @return PromiseInterface */ public function getUsers(string $appId) { - // Get the members list from the replication backend - return $this->replicator - ->channelMembers($appId, $this->channelName); + return $this->replicator->channelMembers($appId, $this->channelName); } /** - * @link https://pusher.com/docs/pusher_protocol#presence-channel-events + * Subscribe the connection to the channel. * - * @param ConnectionInterface $connection - * @param stdClass $payload + * @param ConnectionInterface $connection + * @param stdClass $payload + * @return void * @throws InvalidSignature + * @see https://pusher.com/docs/pusher_protocol#presence-channel-events */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { @@ -49,20 +51,18 @@ class PresenceChannel extends Channel $this->users[$connection->socketId] = $channelData; // Add the connection as a member of the channel - $this->replicator - ->joinChannel( - $connection->app->id, - $this->channelName, - $connection->socketId, - json_encode($channelData) - ); + $this->replicator->joinChannel( + $connection->app->id, + $this->channelName, + $connection->socketId, + json_encode($channelData) + ); // We need to pull the channel data from the replication backend, // otherwise we won't be sending the full details of the channel $this->replicator ->channelMembers($connection->app->id, $this->channelName) ->then(function ($users) use ($connection) { - // Send the success event $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->channelName, @@ -77,6 +77,12 @@ class PresenceChannel extends Channel ]); } + /** + * Unsubscribe the connection from the Presence channel. + * + * @param ConnectionInterface $connection + * @return void + */ public function unsubscribe(ConnectionInterface $connection) { parent::unsubscribe($connection); @@ -105,7 +111,9 @@ class PresenceChannel extends Channel } /** - * @param string|null $appId + * Get the Presence Channel to array. + * + * @param string|null $appId * @return PromiseInterface */ public function toArray(string $appId = null) @@ -119,6 +127,12 @@ class PresenceChannel extends Channel }); } + /** + * Get the Presence channel data. + * + * @param array $users + * @return array + */ protected function getChannelData(array $users): array { return [ @@ -130,6 +144,12 @@ class PresenceChannel extends Channel ]; } + /** + * Get the Presence Channel's users. + * + * @param array $users + * @return array + */ protected function getUserIds(array $users): array { $userIds = array_map(function ($channelData) {