From e454f53eaaaaaa4f2e42fab65460eb556f626d47 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 25 Mar 2019 18:00:54 -0400 Subject: [PATCH] Initial implementation of Redis as a pub/sub backend, WIP TODO: - Presence channels need the user lists stored in Redis (tricky, requires a lot of changes and async code in HTTP controllers) - Channels in Redis should be scoped by the app ID --- PubSub/PubSubInterface.php | 12 -- PubSub/Redis/RedisClient.php | 118 ----------- composer.json | 3 +- src/Console/StartWebSocketServer.php | 11 +- src/HttpApi/Controllers/Controller.php | 8 +- src/PubSub/Redis/RedisClient.php | 204 ++++++++++++++++++++ src/PubSub/Redis/RedisPusherBroadcaster.php | 150 ++++++++++++++ src/PubSub/ReplicationInterface.php | 43 +++++ src/WebSockets/Channels/Channel.php | 26 ++- src/WebSocketsServiceProvider.php | 24 ++- 10 files changed, 448 insertions(+), 151 deletions(-) delete mode 100644 PubSub/PubSubInterface.php delete mode 100644 PubSub/Redis/RedisClient.php create mode 100644 src/PubSub/Redis/RedisClient.php create mode 100644 src/PubSub/Redis/RedisPusherBroadcaster.php create mode 100644 src/PubSub/ReplicationInterface.php diff --git a/PubSub/PubSubInterface.php b/PubSub/PubSubInterface.php deleted file mode 100644 index 3a28ffc..0000000 --- a/PubSub/PubSubInterface.php +++ /dev/null @@ -1,12 +0,0 @@ -apps = collect(config('websockets.apps')); - $this->serverId = Str::uuid()->toString(); - } - - public function publish(string $appId, array $payload): bool - { - $payload['appId'] = $appId; - $payload['serverId'] = $this->serverId; - $this->publishClient->publish(self::REDIS_KEY, json_encode($payload)); - return true; - } - - public function subscribe(LoopInterface $loop): PubSubInterface - { - $this->loop = $loop; - [$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop); - return $this->publishClient; - } - - protected function publishConnection(): PromiseInterface - { - $connectionUri = $this->getConnectionUri(); - $factory = new Factory($this->loop); - return $factory->createClient($connectionUri)->then( - function (Client $client) { - $this->publishClient = $client; - return $this; - } - ); - } - - - protected function subscribeConnection(): PromiseInterface - { - $connectionUri = $this->getConnectionUri(); - $factory = new Factory($this->loop); - return $factory->createClient($connectionUri)->then( - function (Client $client) { - $this->subscribeClient = $client; - $this->onConnected(); - return $this; - } - ); - } - - protected function getConnectionUri() - { - $name = config('websockets.replication.connection') ?? 'default'; - $config = config('database.redis.' . $name); - $host = $config['host']; - $port = $config['port'] ? (':' . $config['port']) : ':6379'; - - $query = []; - if ($config['password']) { - $query['password'] = $config['password']; - } - if ($config['database']) { - $query['database'] = $config['database']; - } - $query = http_build_query($query); - - return "redis://$host$port" . ($query ? '?' . $query : ''); - } - - protected function onConnected() - { - $this->subscribeClient->subscribe(self::REDIS_KEY); - $this->subscribeClient->on('message', function ($channel, $payload) { - $this->onMessage($channel, $payload); - }); - } - - protected function onMessage($channel, $payload) - { - $payload = json_decode($payload); - - if ($this->serverId === $payload->serverId) { - return false; - } - - /* @var $channelManager ChannelManager */ - $channelManager = app(ChannelManager::class); - $channelSearch = $channelManager->find($payload->appId, $payload->channel); - - if ($channelSearch === null) { - return false; - } - - $channel->broadcast($payload); - return true; - } - -} \ No newline at end of file diff --git a/composer.json b/composer.json index 8a2133e..e21a3fc 100644 --- a/composer.json +++ b/composer.json @@ -25,9 +25,8 @@ "php": "^7.1", "ext-json": "*", "cboden/ratchet": "^0.4.1", - "clue/block-react": "^1.3", "clue/buzz-react": "^2.5", - "clue/redis-react": "^2.2", + "clue/redis-react": "^2.3", "facade/ignition-contracts": "^1.0", "guzzlehttp/psr7": "^1.5", "illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0", diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index 8a882a5..f92039c 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -9,8 +9,7 @@ use React\Dns\Config\Config as DnsConfig; use React\Dns\Resolver\ResolverInterface; use React\EventLoop\Factory as LoopFactory; use React\Dns\Resolver\Factory as DnsFactory; -use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; -use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; @@ -146,13 +145,11 @@ class StartWebSocketServer extends Command } if (config('websockets.replication.driver') === 'redis') { - $connection = (new RedisClient())->subscribe($this->loop); + app()->singleton(ReplicationInterface::class, function () { + return (new RedisClient())->boot($this->loop); + }); } - app()->singleton(PubSubInterface::class, function () use ($connection) { - return $connection; - }); - return $this; } diff --git a/src/HttpApi/Controllers/Controller.php b/src/HttpApi/Controllers/Controller.php index 48ecb5d..863a507 100644 --- a/src/HttpApi/Controllers/Controller.php +++ b/src/HttpApi/Controllers/Controller.php @@ -46,7 +46,7 @@ abstract class Controller implements HttpServerInterface $this->requestBuffer = (string) $request->getBody(); - if (! $this->checkContentLength()) { + if (! $this->verifyContentLength()) { return; } @@ -64,16 +64,16 @@ abstract class Controller implements HttpServerInterface { $this->requestBuffer .= $msg; - if (! $this->checkContentLength()) { + if (! $this->verifyContentLength()) { return; } $this->handleRequest($from); } - protected function checkContentLength() + protected function verifyContentLength() { - return strlen($this->requestBuffer) !== $this->contentLength; + return strlen($this->requestBuffer) === $this->contentLength; } protected function handleRequest(ConnectionInterface $connection) diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php new file mode 100644 index 0000000..a393ac1 --- /dev/null +++ b/src/PubSub/Redis/RedisClient.php @@ -0,0 +1,204 @@ +serverId = Str::uuid()->toString(); + } + + /** + * Boot the RedisClient, initializing the connections + * + * @param LoopInterface $loop + * @return ReplicationInterface + */ + public function boot(LoopInterface $loop): ReplicationInterface + { + $this->loop = $loop; + + $connectionUri = $this->getConnectionUri(); + $factory = new Factory($this->loop); + + $this->publishClient = $factory->createLazyClient($connectionUri); + $this->subscribeClient = $factory->createLazyClient($connectionUri); + + $this->subscribeClient->on('message', function ($channel, $payload) { + $this->onMessage($channel, $payload); + }); + + return $this; + } + + /** + * Handle a message received from Redis on a specific channel + * + * @param string $redisChannel + * @param string $payload + * @return bool + */ + protected function onMessage(string $redisChannel, string $payload) + { + $payload = json_decode($payload); + + // Ignore messages sent by ourselves + if (isset($payload->serverId) && $this->serverId === $payload->serverId) { + return false; + } + + // We need to put the channel name in the payload + $payload->channel = $redisChannel; + + /* @var $channelManager ChannelManager */ + $channelManager = app(ChannelManager::class); + + // Load the Channel instance, if any + $channel = $channelManager->find($payload->appId, $payload->channel); + if ($channel === null) { + return false; + } + + $socket = $payload->socket; + + // Remove the internal keys from the payload + unset($payload->socket); + unset($payload->serverId); + unset($payload->appId); + + // Push the message out to connected websocket clients + $channel->broadcastToEveryoneExcept($payload, $socket); + + return true; + } + + /** + * Subscribe to a channel on behalf of websocket user + * + * @param string $appId + * @param string $channel + * @return bool + */ + public function subscribe(string $appId, string $channel): bool + { + if (! isset($this->subscribedChannels[$channel])) { + // We're not subscribed to the channel yet, subscribe and set the count to 1 + $this->subscribeClient->__call('subscribe', [$channel]); + $this->subscribedChannels[$channel] = 1; + } else { + // Increment the subscribe count if we've already subscribed + $this->subscribedChannels[$channel]++; + } + + return true; + } + + /** + * Unsubscribe from a channel on behalf of a websocket user + * + * @param string $appId + * @param string $channel + * @return bool + */ + public function unsubscribe(string $appId, string $channel): bool + { + if (! isset($this->subscribedChannels[$channel])) { + return false; + } + + // Decrement the subscription count for this channel + $this->subscribedChannels[$channel]--; + + // If we no longer have subscriptions to that channel, unsubscribe + if ($this->subscribedChannels[$channel] < 1) { + $this->subscribeClient->__call('unsubscribe', [$channel]); + unset($this->subscribedChannels[$channel]); + } + + return true; + } + + /** + * Publish a message to a channel on behalf of a websocket user + * + * @param string $appId + * @param string $channel + * @param stdClass $payload + * @return bool + */ + public function publish(string $appId, string $channel, stdClass $payload): bool + { + $payload->appId = $appId; + $payload->serverId = $this->serverId; + + $this->publishClient->__call('publish', [$channel, json_encode($payload)]); + + return true; + } + + /** + * Build the Redis connection URL from Laravel database config + * + * @return string + */ + protected function getConnectionUri() + { + $name = config('websockets.replication.connection') ?? 'default'; + $config = config("database.redis.$name"); + $host = $config['host']; + $port = $config['port'] ? (':' . $config['port']) : ':6379'; + + $query = []; + if ($config['password']) { + $query['password'] = $config['password']; + } + if ($config['database']) { + $query['database'] = $config['database']; + } + $query = http_build_query($query); + + return "redis://$host$port".($query ? '?'.$query : ''); + } +} diff --git a/src/PubSub/Redis/RedisPusherBroadcaster.php b/src/PubSub/Redis/RedisPusherBroadcaster.php new file mode 100644 index 0000000..6f88179 --- /dev/null +++ b/src/PubSub/Redis/RedisPusherBroadcaster.php @@ -0,0 +1,150 @@ +pusher = $pusher; + $this->appId = $appId; + $this->redis = $redis; + $this->connection = $connection; + } + + /** + * Authenticate the incoming request for a given channel. + * + * @param \Illuminate\Http\Request $request + * @return mixed + * + * @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException + */ + public function auth($request) + { + $channelName = $this->normalizeChannelName($request->channel_name); + + if ($this->isGuardedChannel($request->channel_name) && + ! $this->retrieveUser($request, $channelName)) { + throw new AccessDeniedHttpException; + } + + return parent::verifyUserCanAccessChannel( + $request, $channelName + ); + } + + /** + * Return the valid authentication response. + * + * @param \Illuminate\Http\Request $request + * @param mixed $result + * @return mixed + * @throws \Pusher\PusherException + */ + public function validAuthenticationResponse($request, $result) + { + if (Str::startsWith($request->channel_name, 'private')) { + return $this->decodePusherResponse( + $request, $this->pusher->socket_auth($request->channel_name, $request->socket_id) + ); + } + + $channelName = $this->normalizeChannelName($request->channel_name); + + return $this->decodePusherResponse( + $request, + $this->pusher->presence_auth( + $request->channel_name, $request->socket_id, + $this->retrieveUser($request, $channelName)->getAuthIdentifier(), $result + ) + ); + } + + /** + * Decode the given Pusher response. + * + * @param \Illuminate\Http\Request $request + * @param mixed $response + * @return array + */ + protected function decodePusherResponse($request, $response) + { + if (! $request->input('callback', false)) { + return json_decode($response, true); + } + + return response()->json(json_decode($response, true)) + ->withCallback($request->callback); + } + + /** + * Broadcast the given event. + * + * @param array $channels + * @param string $event + * @param array $payload + * @return void + */ + public function broadcast(array $channels, $event, array $payload = []) + { + $connection = $this->redis->connection($this->connection); + + $payload = json_encode([ + 'appId' => $this->appId, + 'event' => $event, + 'data' => $payload, + 'socket' => Arr::pull($payload, 'socket'), + ]); + + foreach ($this->formatChannels($channels) as $channel) { + $connection->publish($channel, $payload); + } + } +} diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php new file mode 100644 index 0000000..f1049c4 --- /dev/null +++ b/src/PubSub/ReplicationInterface.php @@ -0,0 +1,43 @@ +saveConnection($connection); + if (config('websockets.replication.enabled') === true) { + // Subscribe for broadcasted messages from the pub/sub backend + app(ReplicationInterface::class) + ->subscribe($connection->app->id, $this->channelName); + } + $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->channelName, @@ -62,6 +68,12 @@ class Channel { unset($this->subscribedConnections[$connection->socketId]); + if (config('websockets.replication.enabled') === true) { + // Unsubscribe from the pub/sub backend + app(ReplicationInterface::class) + ->unsubscribe($connection->app->id, $this->channelName); + } + if (! $this->hasConnections()) { DashboardLogger::vacated($connection, $this->channelName); } @@ -89,17 +101,17 @@ class Channel public function broadcastToOthers(ConnectionInterface $connection, $payload) { + if (config('websockets.replication.enabled') === true) { + // Also broadcast via the other websocket servers + app(ReplicationInterface::class) + ->publish($connection->app->id, $payload); + } + $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); } public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) { - if (config('websockets.replication.enabled') === true) { - // Also broadcast via the other websocket instances - app()->get(PubSubInterface::class) - ->publish($appId, $payload); - } - if (is_null($socketId)) { $this->broadcast($payload); return; diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 9c57842..9057a48 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -2,12 +2,16 @@ namespace BeyondCode\LaravelWebSockets; +use Pusher\Pusher; +use Psr\Log\LoggerInterface; use Illuminate\Support\Facades\Gate; use Illuminate\Support\Facades\Route; use Illuminate\Support\ServiceProvider; +use Illuminate\Broadcasting\BroadcastManager; use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Apps\AppProvider; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; +use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisPusherBroadcaster; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard; @@ -19,7 +23,7 @@ use BeyondCode\LaravelWebSockets\Statistics\Http\Controllers\WebSocketStatistics class WebSocketsServiceProvider extends ServiceProvider { - public function boot() + public function boot(BroadcastManager $broadcastManager) { $this->publishes([ __DIR__.'/../config/websockets.php' => base_path('config/websockets.php'), @@ -41,6 +45,24 @@ class WebSocketsServiceProvider extends ServiceProvider Console\StartWebSocketServer::class, Console\CleanStatistics::class, ]); + + $broadcastManager->extend('redis-pusher', function(array $config) { + $pusher = new Pusher( + $config['key'], $config['secret'], + $config['app_id'], $config['options'] ?? [] + ); + + if ($config['log'] ?? false) { + $pusher->setLogger($this->app->make(LoggerInterface::class)); + } + + return new RedisPusherBroadcaster( + $pusher, + $config['app_id'], + $this->app->make('redis'), + $config['connection'] ?? null + ); + }); } public function register()