diff --git a/PubSub/PubSubInterface.php b/PubSub/PubSubInterface.php new file mode 100644 index 0000000..3a28ffc --- /dev/null +++ b/PubSub/PubSubInterface.php @@ -0,0 +1,12 @@ +apps = collect(config('websockets.apps')); + $this->serverId = Str::uuid()->toString(); + } + + public function publish(string $appId, array $payload): bool + { + $payload['appId'] = $appId; + $payload['serverId'] = $this->serverId; + $this->publishClient->publish(self::REDIS_KEY, json_encode($payload)); + return true; + } + + public function subscribe(LoopInterface $loop): PubSubInterface + { + $this->loop = $loop; + [$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop); + return $this->publishClient; + } + + protected function publishConnection(): PromiseInterface + { + $connectionUri = $this->getConnectionUri(); + $factory = new Factory($this->loop); + return $factory->createClient($connectionUri)->then( + function (Client $client) { + $this->publishClient = $client; + return $this; + } + ); + } + + + protected function subscribeConnection(): PromiseInterface + { + $connectionUri = $this->getConnectionUri(); + $factory = new Factory($this->loop); + return $factory->createClient($connectionUri)->then( + function (Client $client) { + $this->subscribeClient = $client; + $this->onConnected(); + return $this; + } + ); + } + + protected function getConnectionUri() + { + $name = config('websockets.replication.connection') ?? 'default'; + $config = config('database.redis.' . $name); + $host = $config['host']; + $port = $config['port'] ? (':' . $config['port']) : ':6379'; + + $query = []; + if ($config['password']) { + $query['password'] = $config['password']; + } + if ($config['database']) { + $query['database'] = $config['database']; + } + $query = http_build_query($query); + + return "redis://$host$port" . ($query ? '?' . $query : ''); + } + + protected function onConnected() + { + $this->subscribeClient->subscribe(self::REDIS_KEY); + $this->subscribeClient->on('message', function ($channel, $payload) { + $this->onMessage($channel, $payload); + }); + } + + protected function onMessage($channel, $payload) + { + $payload = json_decode($payload); + + if ($this->serverId === $payload->serverId) { + return false; + } + + /* @var $channelManager ChannelManager */ + $channelManager = app(ChannelManager::class); + $channelSearch = $channelManager->find($payload->appId, $payload->channel); + + if ($channelSearch === null) { + return false; + } + + $channel->broadcast($payload); + return true; + } + +} \ No newline at end of file diff --git a/composer.json b/composer.json index 87aa7ba..8a2133e 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,9 @@ "php": "^7.1", "ext-json": "*", "cboden/ratchet": "^0.4.1", + "clue/block-react": "^1.3", "clue/buzz-react": "^2.5", + "clue/redis-react": "^2.2", "facade/ignition-contracts": "^1.0", "guzzlehttp/psr7": "^1.5", "illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0", diff --git a/config/websockets.php b/config/websockets.php index 6a2e7f0..f5b43e3 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -124,6 +124,20 @@ return [ 'passphrase' => env('LARAVEL_WEBSOCKETS_SSL_PASSPHRASE', null), ], + /* + * You can enable replication to publish and subscribe to messages across the driver + */ + 'replication' => [ + 'enabled' => false, + + 'driver' => 'redis', + + 'redis' => [ + 'connection' => 'default', + ], + ], + + /* * Channel Manager * This class handles how channel persistence is handled. diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index a4e8ff2..e014e29 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -10,6 +10,8 @@ use React\Dns\Resolver\ResolverInterface; use React\EventLoop\Factory as LoopFactory; use React\Dns\Resolver\Factory as DnsFactory; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; +use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface; +use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; @@ -45,6 +47,7 @@ class StartWebSocketServer extends Command ->configureConnectionLogger() ->registerEchoRoutes() ->registerCustomRoutes() + ->configurePubSubReplication() ->startWebSocketServer(); } @@ -135,6 +138,23 @@ class StartWebSocketServer extends Command ->run(); } + protected function configurePubSubReplication() + { + if (config('websockets.replication.enabled') !== true) { + return $this; + } + + if (config('websockets.replication.driver') === 'redis') { + $connection = (new RedisClient())->subscribe($this->loop); + } + + app()->singleton(PubSubInterface::class, function () use ($connection) { + return $connection; + }); + + return $this; + } + protected function getDnsResolver(): ResolverInterface { if (! config('websockets.statistics.perform_dns_lookup')) { diff --git a/src/HttpApi/Controllers/TriggerEventController.php b/src/HttpApi/Controllers/TriggerEventController.php index 0764071..ee8bcb3 100644 --- a/src/HttpApi/Controllers/TriggerEventController.php +++ b/src/HttpApi/Controllers/TriggerEventController.php @@ -19,7 +19,7 @@ class TriggerEventController extends Controller 'channel' => $channelName, 'event' => $request->json()->get('name'), 'data' => $request->json()->get('data'), - ], $request->json()->get('socket_id')); + ], $request->json()->get('socket_id'), $request->appId); DashboardLogger::apiMessage( $request->appId, diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 9415b0b..a6136d4 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; use stdClass; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; +use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface; use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; @@ -88,11 +89,15 @@ class Channel public function broadcastToOthers(ConnectionInterface $connection, $payload) { - $this->broadcastToEveryoneExcept($payload, $connection->socketId); + $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); } - public function broadcastToEveryoneExcept($payload, ?string $socketId = null) + public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) { + if (config('websockets.replication.enabled') === true) { + app()->get(PubSubInterface::class)->publish($appId, $payload); + } + if (is_null($socketId)) { return $this->broadcast($payload); }