Update with pub sub replication and redis driver

This commit is contained in:
Sam Snelling 2018-12-17 06:12:53 -06:00 committed by Francis Lavoie
parent 8e422cbc5b
commit b584d0cacb
No known key found for this signature in database
GPG Key ID: B9E0E04A76AF4692
7 changed files with 174 additions and 3 deletions

View File

@ -0,0 +1,12 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub;
use React\EventLoop\LoopInterface;
interface PubSubInterface
{
public function publish(string $appId, array $payload): bool;
public function subscribe(LoopInterface $loop): PubSubInterface;
}

View File

@ -0,0 +1,118 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Clue\React\Block;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Support\Str;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
class RedisClient implements PubSubInterface
{
const REDIS_KEY = ':websockets:replication:';
protected $apps;
protected $loop;
protected $serverId;
protected $publishClient;
protected $subscribeClient;
public function __construct()
{
$this->apps = collect(config('websockets.apps'));
$this->serverId = Str::uuid()->toString();
}
public function publish(string $appId, array $payload): bool
{
$payload['appId'] = $appId;
$payload['serverId'] = $this->serverId;
$this->publishClient->publish(self::REDIS_KEY, json_encode($payload));
return true;
}
public function subscribe(LoopInterface $loop): PubSubInterface
{
$this->loop = $loop;
[$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop);
return $this->publishClient;
}
protected function publishConnection(): PromiseInterface
{
$connectionUri = $this->getConnectionUri();
$factory = new Factory($this->loop);
return $factory->createClient($connectionUri)->then(
function (Client $client) {
$this->publishClient = $client;
return $this;
}
);
}
protected function subscribeConnection(): PromiseInterface
{
$connectionUri = $this->getConnectionUri();
$factory = new Factory($this->loop);
return $factory->createClient($connectionUri)->then(
function (Client $client) {
$this->subscribeClient = $client;
$this->onConnected();
return $this;
}
);
}
protected function getConnectionUri()
{
$name = config('websockets.replication.connection') ?? 'default';
$config = config('database.redis.' . $name);
$host = $config['host'];
$port = $config['port'] ? (':' . $config['port']) : ':6379';
$query = [];
if ($config['password']) {
$query['password'] = $config['password'];
}
if ($config['database']) {
$query['database'] = $config['database'];
}
$query = http_build_query($query);
return "redis://$host$port" . ($query ? '?' . $query : '');
}
protected function onConnected()
{
$this->subscribeClient->subscribe(self::REDIS_KEY);
$this->subscribeClient->on('message', function ($channel, $payload) {
$this->onMessage($channel, $payload);
});
}
protected function onMessage($channel, $payload)
{
$payload = json_decode($payload);
if ($this->serverId === $payload->serverId) {
return false;
}
/* @var $channelManager ChannelManager */
$channelManager = app(ChannelManager::class);
$channelSearch = $channelManager->find($payload->appId, $payload->channel);
if ($channelSearch === null) {
return false;
}
$channel->broadcast($payload);
return true;
}
}

View File

@ -25,7 +25,9 @@
"php": "^7.1", "php": "^7.1",
"ext-json": "*", "ext-json": "*",
"cboden/ratchet": "^0.4.1", "cboden/ratchet": "^0.4.1",
"clue/block-react": "^1.3",
"clue/buzz-react": "^2.5", "clue/buzz-react": "^2.5",
"clue/redis-react": "^2.2",
"facade/ignition-contracts": "^1.0", "facade/ignition-contracts": "^1.0",
"guzzlehttp/psr7": "^1.5", "guzzlehttp/psr7": "^1.5",
"illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0", "illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0",

View File

@ -124,6 +124,20 @@ return [
'passphrase' => env('LARAVEL_WEBSOCKETS_SSL_PASSPHRASE', null), 'passphrase' => env('LARAVEL_WEBSOCKETS_SSL_PASSPHRASE', null),
], ],
/*
* You can enable replication to publish and subscribe to messages across the driver
*/
'replication' => [
'enabled' => false,
'driver' => 'redis',
'redis' => [
'connection' => 'default',
],
],
/* /*
* Channel Manager * Channel Manager
* This class handles how channel persistence is handled. * This class handles how channel persistence is handled.

View File

@ -10,6 +10,8 @@ use React\Dns\Resolver\ResolverInterface;
use React\EventLoop\Factory as LoopFactory; use React\EventLoop\Factory as LoopFactory;
use React\Dns\Resolver\Factory as DnsFactory; use React\Dns\Resolver\Factory as DnsFactory;
use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
@ -45,6 +47,7 @@ class StartWebSocketServer extends Command
->configureConnectionLogger() ->configureConnectionLogger()
->registerEchoRoutes() ->registerEchoRoutes()
->registerCustomRoutes() ->registerCustomRoutes()
->configurePubSubReplication()
->startWebSocketServer(); ->startWebSocketServer();
} }
@ -135,6 +138,23 @@ class StartWebSocketServer extends Command
->run(); ->run();
} }
protected function configurePubSubReplication()
{
if (config('websockets.replication.enabled') !== true) {
return $this;
}
if (config('websockets.replication.driver') === 'redis') {
$connection = (new RedisClient())->subscribe($this->loop);
}
app()->singleton(PubSubInterface::class, function () use ($connection) {
return $connection;
});
return $this;
}
protected function getDnsResolver(): ResolverInterface protected function getDnsResolver(): ResolverInterface
{ {
if (! config('websockets.statistics.perform_dns_lookup')) { if (! config('websockets.statistics.perform_dns_lookup')) {

View File

@ -19,7 +19,7 @@ class TriggerEventController extends Controller
'channel' => $channelName, 'channel' => $channelName,
'event' => $request->json()->get('name'), 'event' => $request->json()->get('name'),
'data' => $request->json()->get('data'), 'data' => $request->json()->get('data'),
], $request->json()->get('socket_id')); ], $request->json()->get('socket_id'), $request->appId);
DashboardLogger::apiMessage( DashboardLogger::apiMessage(
$request->appId, $request->appId,

View File

@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
use stdClass; use stdClass;
use Illuminate\Support\Str; use Illuminate\Support\Str;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
@ -88,11 +89,15 @@ class Channel
public function broadcastToOthers(ConnectionInterface $connection, $payload) public function broadcastToOthers(ConnectionInterface $connection, $payload)
{ {
$this->broadcastToEveryoneExcept($payload, $connection->socketId); $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id);
} }
public function broadcastToEveryoneExcept($payload, ?string $socketId = null) public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null)
{ {
if (config('websockets.replication.enabled') === true) {
app()->get(PubSubInterface::class)->publish($appId, $payload);
}
if (is_null($socketId)) { if (is_null($socketId)) {
return $this->broadcast($payload); return $this->broadcast($payload);
} }