diff --git a/src/HttpApi/Controllers/Controller.php b/src/HttpApi/Controllers/Controller.php index 863a507..7be3d89 100644 --- a/src/HttpApi/Controllers/Controller.php +++ b/src/HttpApi/Controllers/Controller.php @@ -11,6 +11,7 @@ use Ratchet\ConnectionInterface; use Illuminate\Http\JsonResponse; use GuzzleHttp\Psr7\ServerRequest; use Illuminate\Support\Collection; +use React\Promise\PromiseInterface; use Ratchet\Http\HttpServerInterface; use Psr\Http\Message\RequestInterface; use BeyondCode\LaravelWebSockets\Apps\App; @@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface /** @var int */ protected $contentLength; - /** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */ + /** @var ChannelManager */ protected $channelManager; public function __construct(ChannelManager $channelManager) @@ -92,8 +93,23 @@ abstract class Controller implements HttpServerInterface ->ensureValidAppId($laravelRequest->appId) ->ensureValidSignature($laravelRequest); + // Invoke the controller action $response = $this($laravelRequest); + // Allow for async IO in the controller action + if ($response instanceof PromiseInterface) { + $response->then(function ($response) use ($connection) { + $this->sendAndClose($connection, $response); + }); + + return; + } + + $this->sendAndClose($connection, $response); + } + + protected function sendAndClose(ConnectionInterface $connection, $response) + { $connection->send(JsonResponse::create($response)); $connection->close(); } diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index c57efe7..73a8289 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -5,6 +5,9 @@ namespace BeyondCode\LaravelWebSockets\HttpApi\Controllers; use Illuminate\Support\Str; use Illuminate\Http\Request; use Illuminate\Support\Collection; +use React\Promise\PromiseInterface; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; use Symfony\Component\HttpKernel\Exception\HttpException; class FetchChannelsController extends Controller @@ -29,13 +32,42 @@ class FetchChannelsController extends Controller }); } + if (config('websockets.replication.enabled') === true) { + // We want to get the channel user count all in one shot when + // using a replication backend rather than doing individual queries. + // To do so, we first collect the list of channel names. + $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { + return $channel->getChannelName(); + })->toArray(); + + /** @var PromiseInterface $memberCounts */ + // We ask the replication backend to get us the member count per channel + $memberCounts = app(ReplicationInterface::class) + ->channelMemberCounts($request->appId, $channelNames); + + // We return a promise since the backend runs async. We get $counts back + // as a key-value array of channel names and their member count. + return $memberCounts->then(function (array $counts) use ($channels) { + return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { + return $counts[$channel->getChannelName()]; + }); + }); + } + + return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) { + return $channel->getUserCount(); + }); + } + + protected function collectUserCounts(Collection $channels, array $attributes, callable $transformer) + { return [ - 'channels' => $channels->map(function ($channel) use ($attributes) { + 'channels' => $channels->map(function (PresenceChannel $channel) use ($transformer, $attributes) { $info = new \stdClass; if (in_array('user_count', $attributes)) { - $info->user_count = count($channel->getUsers()); + $info->user_count = $transformer($channel); } - + return $info; })->toArray() ?: new \stdClass, ]; diff --git a/src/HttpApi/Controllers/FetchUsersController.php b/src/HttpApi/Controllers/FetchUsersController.php index 87960e4..3d7ced7 100644 --- a/src/HttpApi/Controllers/FetchUsersController.php +++ b/src/HttpApi/Controllers/FetchUsersController.php @@ -4,6 +4,7 @@ namespace BeyondCode\LaravelWebSockets\HttpApi\Controllers; use Illuminate\Http\Request; use Illuminate\Support\Collection; +use React\Promise\PromiseInterface; use Symfony\Component\HttpKernel\Exception\HttpException; use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; @@ -21,8 +22,21 @@ class FetchUsersController extends Controller throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"'); } + $users = $channel->getUsers($request->appId); + + if ($users instanceof PromiseInterface) { + return $users->then(function (array $users) { + return $this->collectUsers($users); + }); + } + + return $this->collectUsers($users); + } + + protected function collectUsers(array $users) + { return [ - 'users' => Collection::make($channel->getUsers())->map(function ($user) { + 'users' => Collection::make($users)->map(function ($user) { return ['id' => $user->user_id]; })->values(), ]; diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php index 9e6048c..a2ea8db 100644 --- a/src/PubSub/Redis/RedisClient.php +++ b/src/PubSub/Redis/RedisClient.php @@ -7,6 +7,7 @@ use Illuminate\Support\Str; use Clue\React\Redis\Client; use Clue\React\Redis\Factory; use React\EventLoop\LoopInterface; +use React\Promise\PromiseInterface; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; @@ -183,6 +184,72 @@ class RedisClient implements ReplicationInterface return true; } + /** + * Add a member to a channel. To be called when they have + * subscribed to the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + */ + public function joinChannel(string $appId, string $channel, string $socketId, string $data) + { + $this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]); + } + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId) + { + $this->publishClient->__call('hdel', ["$appId:$channel", $socketId]); + } + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel): PromiseInterface + { + return $this->publishClient->__call('hgetall', ["$appId:$channel"]) + ->then(function ($members) { + // The data is expected as objects, so we need to JSON decode + return array_walk($members, function ($user) { + return json_decode($user); + }); + }); + } + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface + { + $this->publishClient->__call('multi', []); + + foreach ($channelNames as $channel) { + $this->publishClient->__call('hlen', ["$appId:$channel"]); + } + + return $this->publishClient->__call('exec', []) + ->then(function ($data) use ($channelNames) { + return array_combine($channelNames, $data); + }); + } + /** * Build the Redis connection URL from Laravel database config. * diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index 5131ea3..e515e5c 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -4,6 +4,7 @@ namespace BeyondCode\LaravelWebSockets\PubSub; use stdClass; use React\EventLoop\LoopInterface; +use React\Promise\PromiseInterface; interface ReplicationInterface { @@ -40,4 +41,43 @@ interface ReplicationInterface * @return bool */ public function unsubscribe(string $appId, string $channel): bool; + + /** + * Add a member to a channel. To be called when they have + * subscribed to the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + */ + public function joinChannel(string $appId, string $channel, string $socketId, string $data); + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId); + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel): PromiseInterface; + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface; } diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 9db18ad..b5c8413 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -22,6 +22,11 @@ class Channel $this->channelName = $channelName; } + public function getChannelName(): string + { + return $this->channelName; + } + public function hasConnections(): bool { return count($this->subscribedConnections) > 0; @@ -32,6 +37,9 @@ class Channel return $this->subscribedConnections; } + /** + * @throws InvalidSignature + */ protected function verifySignature(ConnectionInterface $connection, stdClass $payload) { $signature = "{$connection->socketId}:{$this->channelName}"; @@ -40,12 +48,15 @@ class Channel $signature .= ":{$payload->channel_data}"; } - if (Str::after($payload->auth, ':') !== hash_hmac('sha256', $signature, $connection->app->secret)) { + if (! hash_equals( + hash_hmac('sha256', $signature, $connection->app->secret), + Str::after($payload->auth, ':')) + ) { throw new InvalidSignature(); } } - /* + /** * @link https://pusher.com/docs/pusher_protocol#presence-channel-events */ public function subscribe(ConnectionInterface $connection, stdClass $payload) @@ -128,7 +139,7 @@ class Channel } } - public function toArray(): array + public function toArray() { return [ 'occupied' => count($this->subscribedConnections) > 0, diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index bb6ec45..21cab87 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -4,18 +4,43 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; use stdClass; use Ratchet\ConnectionInterface; +use React\Promise\PromiseInterface; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; class PresenceChannel extends Channel { protected $users = []; - public function getUsers(): array + /** + * @param string $appId + * @return array|PromiseInterface + */ + public function getUsers(string $appId) { + if (config('websockets.replication.enabled') === true) { + // Get the members list from the replication backend + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName); + } + return $this->users; } - /* + /** + * @return array + */ + public function getUserCount() + { + return count($this->users); + } + + /** * @link https://pusher.com/docs/pusher_protocol#presence-channel-events + * + * @param ConnectionInterface $connection + * @param stdClass $payload + * @throws InvalidSignature */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { @@ -26,12 +51,36 @@ class PresenceChannel extends Channel $channelData = json_decode($payload->channel_data); $this->users[$connection->socketId] = $channelData; - // Send the success event - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData()), - ])); + if (config('websockets.replication.enabled') === true) { + // Add the connection as a member of the channel + app(ReplicationInterface::class) + ->joinChannel( + $connection->app->id, + $this->channelName, + $connection->socketId, + json_encode($channelData) + ); + + // We need to pull the channel data from the replication backend, + // otherwise we won't be sending the full details of the channel + app(ReplicationInterface::class) + ->channelMembers($connection->app->id, $this->channelName) + ->then(function ($users) use ($connection) { + // Send the success event + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($users)), + ])); + }); + } else { + // Send the success event + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($this->users)), + ])); + } $this->broadcastToOthers($connection, [ 'event' => 'pusher_internal:member_added', @@ -48,6 +97,16 @@ class PresenceChannel extends Channel return; } + if (config('websockets.replication.enabled') === true) { + // Remove the connection as a member of the channel + app(ReplicationInterface::class) + ->leaveChannel( + $connection->app->id, + $this->channelName, + $connection->socketId + ); + } + $this->broadcastToOthers($connection, [ 'event' => 'pusher_internal:member_removed', 'channel' => $this->channelName, @@ -59,38 +118,51 @@ class PresenceChannel extends Channel unset($this->users[$connection->socketId]); } - protected function getChannelData(): array + /** + * @return PromiseInterface|array + */ + public function toArray(string $appId = null) { - return [ - 'presence' => [ - 'ids' => $this->getUserIds(), - 'hash' => $this->getHash(), - 'count' => count($this->users), - ], - ]; - } + if (config('websockets.replication.enabled') === true) { + return app(ReplicationInterface::class) + ->channelMembers($appId, $this->channelName) + ->then(function ($users) { + return array_merge(parent::toArray(), [ + 'user_count' => count($users), + ]); + }); + } - public function toArray(): array - { return array_merge(parent::toArray(), [ 'user_count' => count($this->users), ]); } - protected function getUserIds(): array + protected function getChannelData(array $users): array + { + return [ + 'presence' => [ + 'ids' => $this->getUserIds($users), + 'hash' => $this->getHash($users), + 'count' => count($users), + ], + ]; + } + + protected function getUserIds(array $users): array { $userIds = array_map(function ($channelData) { return (string) $channelData->user_id; - }, $this->users); + }, $users); return array_values($userIds); } - protected function getHash(): array + protected function getHash(array $users): array { $hash = []; - foreach ($this->users as $socketId => $channelData) { + foreach ($users as $socketId => $channelData) { $hash[$channelData->user_id] = $channelData->user_info; } diff --git a/src/WebSockets/Channels/PrivateChannel.php b/src/WebSockets/Channels/PrivateChannel.php index 34f3ac0..03d8e42 100644 --- a/src/WebSockets/Channels/PrivateChannel.php +++ b/src/WebSockets/Channels/PrivateChannel.php @@ -4,9 +4,13 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; use stdClass; use Ratchet\ConnectionInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; class PrivateChannel extends Channel { + /** + * @throws InvalidSignature + */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { $this->verifySignature($connection, $payload);