From 97e215b68eddc5263af26ccf863bc03cea4eb81d Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Wed, 2 Sep 2020 14:44:44 +0300 Subject: [PATCH] Making channels easily extendable by replacing contents with traits. --- src/Concerns/Channelable.php | 242 ++++++++++++++++++++ src/Concerns/PresencelyChannelable.php | 178 ++++++++++++++ src/Concerns/PrivatelyChannelable.php | 27 +++ src/WebSockets/Channels/Channel.php | 234 +------------------ src/WebSockets/Channels/PresenceChannel.php | 172 +------------- src/WebSockets/Channels/PrivateChannel.php | 20 +- 6 files changed, 453 insertions(+), 420 deletions(-) create mode 100644 src/Concerns/Channelable.php create mode 100644 src/Concerns/PresencelyChannelable.php create mode 100644 src/Concerns/PrivatelyChannelable.php diff --git a/src/Concerns/Channelable.php b/src/Concerns/Channelable.php new file mode 100644 index 0000000..979f2e8 --- /dev/null +++ b/src/Concerns/Channelable.php @@ -0,0 +1,242 @@ +channelName = $channelName; + $this->replicator = app(ReplicationInterface::class); + } + + /** + * Get the channel name. + * + * @return string + */ + public function getChannelName(): string + { + return $this->channelName; + } + + /** + * Check if the channel has connections. + * + * @return bool + */ + public function hasConnections(): bool + { + return count($this->subscribedConnections) > 0; + } + + /** + * Get all subscribed connections. + * + * @return array + */ + public function getSubscribedConnections(): array + { + return $this->subscribedConnections; + } + + /** + * Check if the signature for the payload is valid. + * + * @param \Ratchet\ConnectionInterface $connection + * @param \stdClass $payload + * @return void + * @throws InvalidSignature + */ + protected function verifySignature(ConnectionInterface $connection, stdClass $payload) + { + $signature = "{$connection->socketId}:{$this->channelName}"; + + if (isset($payload->channel_data)) { + $signature .= ":{$payload->channel_data}"; + } + + if (! hash_equals( + hash_hmac('sha256', $signature, $connection->app->secret), + Str::after($payload->auth, ':')) + ) { + throw new InvalidSignature(); + } + } + + /** + * Subscribe to the channel. + * + * @see https://pusher.com/docs/pusher_protocol#presence-channel-events + * @param \Ratchet\ConnectionInterface $connection + * @param \stdClass $payload + * @return void + */ + public function subscribe(ConnectionInterface $connection, stdClass $payload) + { + $this->saveConnection($connection); + + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + ])); + + $this->replicator->subscribe($connection->app->id, $this->channelName); + + event(new ) + } + + /** + * Unsubscribe connection from the channel. + * + * @param \Ratchet\ConnectionInterface $connection + * @return void + */ + public function unsubscribe(ConnectionInterface $connection) + { + unset($this->subscribedConnections[$connection->socketId]); + + $this->replicator->unsubscribe($connection->app->id, $this->channelName); + + if (! $this->hasConnections()) { + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_VACATED, [ + 'socketId' => $connection->socketId, + 'channel' => $this->channelName, + ]); + } + } + + /** + * Store the connection to the subscribers list. + * + * @param \Ratchet\ConnectionInterface $connection + * @return void + */ + protected function saveConnection(ConnectionInterface $connection) + { + $hadConnectionsPreviously = $this->hasConnections(); + + $this->subscribedConnections[$connection->socketId] = $connection; + + if (! $hadConnectionsPreviously) { + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_OCCUPIED, [ + 'channel' => $this->channelName, + ]); + } + + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ + 'socketId' => $connection->socketId, + 'channel' => $this->channelName, + ]); + } + + /** + * Broadcast a payload to the subscribed connections. + * + * @param \stdClass $payload + * @return void + */ + public function broadcast($payload) + { + foreach ($this->subscribedConnections as $connection) { + $connection->send(json_encode($payload)); + } + } + + /** + * Broadcast the payload, but exclude the current connection. + * + * @param \Ratchet\ConnectionInterface $connection + * @param \stdClass $payload + * @return void + */ + public function broadcastToOthers(ConnectionInterface $connection, stdClass $payload) + { + $this->broadcastToEveryoneExcept( + $payload, $connection->socketId, $connection->app->id + ); + } + + /** + * Broadcast the payload, but exclude a specific socket id. + * + * @param \stdClass $payload + * @param string|null $socketId + * @param mixed $appId + * @param bool $publish + * @return void + */ + public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, $appId, bool $publish = true) + { + // Also broadcast via the other websocket server instances. + // This is set false in the Redis client because we don't want to cause a loop + // in this case. If this came from TriggerEventController, then we still want + // to publish to get the message out to other server instances. + if ($publish) { + $this->replicator->publish($appId, $this->channelName, $payload); + } + + // Performance optimization, if we don't have a socket ID, + // then we avoid running the if condition in the foreach loop below + // by calling broadcast() instead. + if (is_null($socketId)) { + $this->broadcast($payload); + + return; + } + + foreach ($this->subscribedConnections as $connection) { + if ($connection->socketId !== $socketId) { + $connection->send(json_encode($payload)); + } + } + } + + /** + * Convert the channel to array. + * + * @param mixed $appId + * @return array + */ + public function toArray($appId = null) + { + return [ + 'occupied' => count($this->subscribedConnections) > 0, + 'subscription_count' => count($this->subscribedConnections), + ]; + } +} diff --git a/src/Concerns/PresencelyChannelable.php b/src/Concerns/PresencelyChannelable.php new file mode 100644 index 0000000..08cc497 --- /dev/null +++ b/src/Concerns/PresencelyChannelable.php @@ -0,0 +1,178 @@ +replicator->channelMembers($appId, $this->channelName); + } + + /** + * Subscribe the connection to the channel. + * + * @param ConnectionInterface $connection + * @param stdClass $payload + * @return void + * @throws InvalidSignature + * @see https://pusher.com/docs/pusher_protocol#presence-channel-events + */ + public function subscribe(ConnectionInterface $connection, stdClass $payload) + { + $this->verifySignature($connection, $payload); + + $this->saveConnection($connection); + + $channelData = json_decode($payload->channel_data); + $this->users[$connection->socketId] = $channelData; + + // Add the connection as a member of the channel + $this->replicator->joinChannel( + $connection->app->id, + $this->channelName, + $connection->socketId, + json_encode($channelData) + ); + + // We need to pull the channel data from the replication backend, + // otherwise we won't be sending the full details of the channel + $this->replicator + ->channelMembers($connection->app->id, $this->channelName) + ->then(function ($users) use ($connection) { + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($users)), + ])); + }); + + $this->broadcastToOthers($connection, (object) [ + 'event' => 'pusher_internal:member_added', + 'channel' => $this->channelName, + 'data' => json_encode($channelData), + ]); + } + + /** + * Unsubscribe the connection from the Presence channel. + * + * @param ConnectionInterface $connection + * @return void + */ + public function unsubscribe(ConnectionInterface $connection) + { + parent::unsubscribe($connection); + + if (! isset($this->users[$connection->socketId])) { + return; + } + + // Remove the connection as a member of the channel + $this->replicator + ->leaveChannel( + $connection->app->id, + $this->channelName, + $connection->socketId + ); + + $this->broadcastToOthers($connection, (object) [ + 'event' => 'pusher_internal:member_removed', + 'channel' => $this->channelName, + 'data' => json_encode([ + 'user_id' => $this->users[$connection->socketId]->user_id, + ]), + ]); + + unset($this->users[$connection->socketId]); + } + + /** + * Get the Presence Channel to array. + * + * @param string|null $appId + * @return PromiseInterface + */ + public function toArray($appId = null) + { + return $this->replicator + ->channelMembers($appId, $this->channelName) + ->then(function ($users) { + return array_merge(parent::toArray(), [ + 'user_count' => count($users), + ]); + }); + } + + /** + * Get the Presence channel data. + * + * @param array $users + * @return array + */ + protected function getChannelData(array $users): array + { + return [ + 'presence' => [ + 'ids' => $this->getUserIds($users), + 'hash' => $this->getHash($users), + 'count' => count($users), + ], + ]; + } + + /** + * Get the Presence Channel's users. + * + * @param array $users + * @return array + */ + protected function getUserIds(array $users): array + { + $userIds = array_map(function ($channelData) { + return (string) $channelData->user_id; + }, $users); + + return array_values($userIds); + } + + /** + * Compute the hash for the presence channel integrity. + * + * @param array $users + * @return array + */ + protected function getHash(array $users): array + { + $hash = []; + + foreach ($users as $socketId => $channelData) { + $hash[$channelData->user_id] = $channelData->user_info ?? []; + } + + return $hash; + } +} diff --git a/src/Concerns/PrivatelyChannelable.php b/src/Concerns/PrivatelyChannelable.php new file mode 100644 index 0000000..d4fbdaa --- /dev/null +++ b/src/Concerns/PrivatelyChannelable.php @@ -0,0 +1,27 @@ +verifySignature($connection, $payload); + + parent::subscribe($connection, $payload); + } +} diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 2828d8a..302e151 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -2,239 +2,9 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; -use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; -use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; -use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; -use Illuminate\Support\Str; -use Ratchet\ConnectionInterface; -use stdClass; +use BeyondCode\LaravelWebSockets\Concerns\Channelable; class Channel { - /** - * The channel name. - * - * @var string - */ - protected $channelName; - - /** - * The replicator client. - * - * @var ReplicationInterface - */ - protected $replicator; - - /** - * The connections that got subscribed. - * - * @var array - */ - protected $subscribedConnections = []; - - /** - * Create a new instance. - * - * @param string $channelName - * @return void - */ - public function __construct(string $channelName) - { - $this->channelName = $channelName; - $this->replicator = app(ReplicationInterface::class); - } - - /** - * Get the channel name. - * - * @return string - */ - public function getChannelName(): string - { - return $this->channelName; - } - - /** - * Check if the channel has connections. - * - * @return bool - */ - public function hasConnections(): bool - { - return count($this->subscribedConnections) > 0; - } - - /** - * Get all subscribed connections. - * - * @return array - */ - public function getSubscribedConnections(): array - { - return $this->subscribedConnections; - } - - /** - * Check if the signature for the payload is valid. - * - * @param \Ratchet\ConnectionInterface $connection - * @param \stdClass $payload - * @return void - * @throws InvalidSignature - */ - protected function verifySignature(ConnectionInterface $connection, stdClass $payload) - { - $signature = "{$connection->socketId}:{$this->channelName}"; - - if (isset($payload->channel_data)) { - $signature .= ":{$payload->channel_data}"; - } - - if (! hash_equals( - hash_hmac('sha256', $signature, $connection->app->secret), - Str::after($payload->auth, ':')) - ) { - throw new InvalidSignature(); - } - } - - /** - * Subscribe to the channel. - * - * @see https://pusher.com/docs/pusher_protocol#presence-channel-events - * @param \Ratchet\ConnectionInterface $connection - * @param \stdClass $payload - * @return void - */ - public function subscribe(ConnectionInterface $connection, stdClass $payload) - { - $this->saveConnection($connection); - - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - ])); - - $this->replicator->subscribe($connection->app->id, $this->channelName); - } - - /** - * Unsubscribe connection from the channel. - * - * @param \Ratchet\ConnectionInterface $connection - * @return void - */ - public function unsubscribe(ConnectionInterface $connection) - { - unset($this->subscribedConnections[$connection->socketId]); - - $this->replicator->unsubscribe($connection->app->id, $this->channelName); - - if (! $this->hasConnections()) { - DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_VACATED, [ - 'socketId' => $connection->socketId, - 'channel' => $this->channelName, - ]); - } - } - - /** - * Store the connection to the subscribers list. - * - * @param \Ratchet\ConnectionInterface $connection - * @return void - */ - protected function saveConnection(ConnectionInterface $connection) - { - $hadConnectionsPreviously = $this->hasConnections(); - - $this->subscribedConnections[$connection->socketId] = $connection; - - if (! $hadConnectionsPreviously) { - DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_OCCUPIED, [ - 'channel' => $this->channelName, - ]); - } - - DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ - 'socketId' => $connection->socketId, - 'channel' => $this->channelName, - ]); - } - - /** - * Broadcast a payload to the subscribed connections. - * - * @param \stdClass $payload - * @return void - */ - public function broadcast($payload) - { - foreach ($this->subscribedConnections as $connection) { - $connection->send(json_encode($payload)); - } - } - - /** - * Broadcast the payload, but exclude the current connection. - * - * @param \Ratchet\ConnectionInterface $connection - * @param \stdClass $payload - * @return void - */ - public function broadcastToOthers(ConnectionInterface $connection, stdClass $payload) - { - $this->broadcastToEveryoneExcept( - $payload, $connection->socketId, $connection->app->id - ); - } - - /** - * Broadcast the payload, but exclude a specific socket id. - * - * @param \stdClass $payload - * @param string|null $socketId - * @param mixed $appId - * @param bool $publish - * @return void - */ - public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, $appId, bool $publish = true) - { - // Also broadcast via the other websocket server instances. - // This is set false in the Redis client because we don't want to cause a loop - // in this case. If this came from TriggerEventController, then we still want - // to publish to get the message out to other server instances. - if ($publish) { - $this->replicator->publish($appId, $this->channelName, $payload); - } - - // Performance optimization, if we don't have a socket ID, - // then we avoid running the if condition in the foreach loop below - // by calling broadcast() instead. - if (is_null($socketId)) { - $this->broadcast($payload); - - return; - } - - foreach ($this->subscribedConnections as $connection) { - if ($connection->socketId !== $socketId) { - $connection->send(json_encode($payload)); - } - } - } - - /** - * Convert the channel to array. - * - * @param mixed $appId - * @return array - */ - public function toArray($appId = null) - { - return [ - 'occupied' => count($this->subscribedConnections) > 0, - 'subscription_count' => count($this->subscribedConnections), - ]; - } + use Channelable; } diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index a3e58aa..a29e75d 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -2,177 +2,9 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; -use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; -use Ratchet\ConnectionInterface; -use React\Promise\PromiseInterface; -use stdClass; +use BeyondCode\LaravelWebSockets\Concerns\PresencelyChannelable; class PresenceChannel extends Channel { - /** - * Data for the users connected to this channel. - * - * Note: If replication is enabled, this will only contain entries - * for the users directly connected to this server instance. Requests - * for data for all users in the channel should be routed through - * ReplicationInterface. - * - * @var string[] - */ - protected $users = []; - - /** - * Get the members in the presence channel. - * - * @param string $appId - * @return PromiseInterface - */ - public function getUsers($appId) - { - return $this->replicator->channelMembers($appId, $this->channelName); - } - - /** - * Subscribe the connection to the channel. - * - * @param ConnectionInterface $connection - * @param stdClass $payload - * @return void - * @throws InvalidSignature - * @see https://pusher.com/docs/pusher_protocol#presence-channel-events - */ - public function subscribe(ConnectionInterface $connection, stdClass $payload) - { - $this->verifySignature($connection, $payload); - - $this->saveConnection($connection); - - $channelData = json_decode($payload->channel_data); - $this->users[$connection->socketId] = $channelData; - - // Add the connection as a member of the channel - $this->replicator->joinChannel( - $connection->app->id, - $this->channelName, - $connection->socketId, - json_encode($channelData) - ); - - // We need to pull the channel data from the replication backend, - // otherwise we won't be sending the full details of the channel - $this->replicator - ->channelMembers($connection->app->id, $this->channelName) - ->then(function ($users) use ($connection) { - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData($users)), - ])); - }); - - $this->broadcastToOthers($connection, (object) [ - 'event' => 'pusher_internal:member_added', - 'channel' => $this->channelName, - 'data' => json_encode($channelData), - ]); - } - - /** - * Unsubscribe the connection from the Presence channel. - * - * @param ConnectionInterface $connection - * @return void - */ - public function unsubscribe(ConnectionInterface $connection) - { - parent::unsubscribe($connection); - - if (! isset($this->users[$connection->socketId])) { - return; - } - - // Remove the connection as a member of the channel - $this->replicator - ->leaveChannel( - $connection->app->id, - $this->channelName, - $connection->socketId - ); - - $this->broadcastToOthers($connection, (object) [ - 'event' => 'pusher_internal:member_removed', - 'channel' => $this->channelName, - 'data' => json_encode([ - 'user_id' => $this->users[$connection->socketId]->user_id, - ]), - ]); - - unset($this->users[$connection->socketId]); - } - - /** - * Get the Presence Channel to array. - * - * @param string|null $appId - * @return PromiseInterface - */ - public function toArray($appId = null) - { - return $this->replicator - ->channelMembers($appId, $this->channelName) - ->then(function ($users) { - return array_merge(parent::toArray(), [ - 'user_count' => count($users), - ]); - }); - } - - /** - * Get the Presence channel data. - * - * @param array $users - * @return array - */ - protected function getChannelData(array $users): array - { - return [ - 'presence' => [ - 'ids' => $this->getUserIds($users), - 'hash' => $this->getHash($users), - 'count' => count($users), - ], - ]; - } - - /** - * Get the Presence Channel's users. - * - * @param array $users - * @return array - */ - protected function getUserIds(array $users): array - { - $userIds = array_map(function ($channelData) { - return (string) $channelData->user_id; - }, $users); - - return array_values($userIds); - } - - /** - * Compute the hash for the presence channel integrity. - * - * @param array $users - * @return array - */ - protected function getHash(array $users): array - { - $hash = []; - - foreach ($users as $socketId => $channelData) { - $hash[$channelData->user_id] = $channelData->user_info ?? []; - } - - return $hash; - } + use PresencelyChannelable; } diff --git a/src/WebSockets/Channels/PrivateChannel.php b/src/WebSockets/Channels/PrivateChannel.php index 5f84308..dfa7d30 100644 --- a/src/WebSockets/Channels/PrivateChannel.php +++ b/src/WebSockets/Channels/PrivateChannel.php @@ -2,25 +2,9 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; -use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; -use Ratchet\ConnectionInterface; -use stdClass; +use BeyondCode\LaravelWebSockets\Concerns\PrivatelyChannelable; class PrivateChannel extends Channel { - /** - * Subscribe to the channel. - * - * @see https://pusher.com/docs/pusher_protocol#presence-channel-events - * @param \Ratchet\ConnectionInterface $connection - * @param \stdClass $payload - * @return void - * @throws InvalidSignature - */ - public function subscribe(ConnectionInterface $connection, stdClass $payload) - { - $this->verifySignature($connection, $payload); - - parent::subscribe($connection, $payload); - } + use PrivatelyChannelable; }