Initial implementation of Redis as a pub/sub backend, WIP

TODO:
- Presence channels need the user lists stored in Redis (tricky, requires a lot of changes and async code in HTTP controllers)
- Channels in Redis should be scoped by the app ID
This commit is contained in:
Francis Lavoie 2019-03-25 18:00:54 -04:00
parent c203d24469
commit e454f53eaa
No known key found for this signature in database
GPG Key ID: B9E0E04A76AF4692
10 changed files with 448 additions and 151 deletions

View File

@ -1,12 +0,0 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub;
use React\EventLoop\LoopInterface;
interface PubSubInterface
{
public function publish(string $appId, array $payload): bool;
public function subscribe(LoopInterface $loop): PubSubInterface;
}

View File

@ -1,118 +0,0 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Clue\React\Block;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Support\Str;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
class RedisClient implements PubSubInterface
{
const REDIS_KEY = ':websockets:replication:';
protected $apps;
protected $loop;
protected $serverId;
protected $publishClient;
protected $subscribeClient;
public function __construct()
{
$this->apps = collect(config('websockets.apps'));
$this->serverId = Str::uuid()->toString();
}
public function publish(string $appId, array $payload): bool
{
$payload['appId'] = $appId;
$payload['serverId'] = $this->serverId;
$this->publishClient->publish(self::REDIS_KEY, json_encode($payload));
return true;
}
public function subscribe(LoopInterface $loop): PubSubInterface
{
$this->loop = $loop;
[$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop);
return $this->publishClient;
}
protected function publishConnection(): PromiseInterface
{
$connectionUri = $this->getConnectionUri();
$factory = new Factory($this->loop);
return $factory->createClient($connectionUri)->then(
function (Client $client) {
$this->publishClient = $client;
return $this;
}
);
}
protected function subscribeConnection(): PromiseInterface
{
$connectionUri = $this->getConnectionUri();
$factory = new Factory($this->loop);
return $factory->createClient($connectionUri)->then(
function (Client $client) {
$this->subscribeClient = $client;
$this->onConnected();
return $this;
}
);
}
protected function getConnectionUri()
{
$name = config('websockets.replication.connection') ?? 'default';
$config = config('database.redis.' . $name);
$host = $config['host'];
$port = $config['port'] ? (':' . $config['port']) : ':6379';
$query = [];
if ($config['password']) {
$query['password'] = $config['password'];
}
if ($config['database']) {
$query['database'] = $config['database'];
}
$query = http_build_query($query);
return "redis://$host$port" . ($query ? '?' . $query : '');
}
protected function onConnected()
{
$this->subscribeClient->subscribe(self::REDIS_KEY);
$this->subscribeClient->on('message', function ($channel, $payload) {
$this->onMessage($channel, $payload);
});
}
protected function onMessage($channel, $payload)
{
$payload = json_decode($payload);
if ($this->serverId === $payload->serverId) {
return false;
}
/* @var $channelManager ChannelManager */
$channelManager = app(ChannelManager::class);
$channelSearch = $channelManager->find($payload->appId, $payload->channel);
if ($channelSearch === null) {
return false;
}
$channel->broadcast($payload);
return true;
}
}

View File

@ -25,9 +25,8 @@
"php": "^7.1",
"ext-json": "*",
"cboden/ratchet": "^0.4.1",
"clue/block-react": "^1.3",
"clue/buzz-react": "^2.5",
"clue/redis-react": "^2.2",
"clue/redis-react": "^2.3",
"facade/ignition-contracts": "^1.0",
"guzzlehttp/psr7": "^1.5",
"illuminate/broadcasting": "5.7.* || 5.8.* || ^6.0",

View File

@ -9,8 +9,7 @@ use React\Dns\Config\Config as DnsConfig;
use React\Dns\Resolver\ResolverInterface;
use React\EventLoop\Factory as LoopFactory;
use React\Dns\Resolver\Factory as DnsFactory;
use BeyondCode\LaravelWebSockets\Statistics\DnsResolver;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\Statistics\DnsResolver;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
@ -146,13 +145,11 @@ class StartWebSocketServer extends Command
}
if (config('websockets.replication.driver') === 'redis') {
$connection = (new RedisClient())->subscribe($this->loop);
app()->singleton(ReplicationInterface::class, function () {
return (new RedisClient())->boot($this->loop);
});
}
app()->singleton(PubSubInterface::class, function () use ($connection) {
return $connection;
});
return $this;
}

View File

@ -46,7 +46,7 @@ abstract class Controller implements HttpServerInterface
$this->requestBuffer = (string) $request->getBody();
if (! $this->checkContentLength()) {
if (! $this->verifyContentLength()) {
return;
}
@ -64,16 +64,16 @@ abstract class Controller implements HttpServerInterface
{
$this->requestBuffer .= $msg;
if (! $this->checkContentLength()) {
if (! $this->verifyContentLength()) {
return;
}
$this->handleRequest($from);
}
protected function checkContentLength()
protected function verifyContentLength()
{
return strlen($this->requestBuffer) !== $this->contentLength;
return strlen($this->requestBuffer) === $this->contentLength;
}
protected function handleRequest(ConnectionInterface $connection)

View File

@ -0,0 +1,204 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
use stdClass;
use Illuminate\Support\Str;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use React\EventLoop\LoopInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
class RedisClient implements ReplicationInterface
{
/**
* @var LoopInterface
*/
protected $loop;
/**
* @var string
*/
protected $serverId;
/**
* @var Client
*/
protected $publishClient;
/**
* @var Client
*/
protected $subscribeClient;
/**
* Mapping of subscribed channels, where the key is the channel name,
* and the value is the amount of connections which are subscribed to
* that channel. Used to keep track of whether we still need to stay
* subscribed to those channels with Redis.
*
* @var int[]
*/
protected $subscribedChannels = [];
/**
* RedisClient constructor.
*/
public function __construct()
{
$this->serverId = Str::uuid()->toString();
}
/**
* Boot the RedisClient, initializing the connections
*
* @param LoopInterface $loop
* @return ReplicationInterface
*/
public function boot(LoopInterface $loop): ReplicationInterface
{
$this->loop = $loop;
$connectionUri = $this->getConnectionUri();
$factory = new Factory($this->loop);
$this->publishClient = $factory->createLazyClient($connectionUri);
$this->subscribeClient = $factory->createLazyClient($connectionUri);
$this->subscribeClient->on('message', function ($channel, $payload) {
$this->onMessage($channel, $payload);
});
return $this;
}
/**
* Handle a message received from Redis on a specific channel
*
* @param string $redisChannel
* @param string $payload
* @return bool
*/
protected function onMessage(string $redisChannel, string $payload)
{
$payload = json_decode($payload);
// Ignore messages sent by ourselves
if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
return false;
}
// We need to put the channel name in the payload
$payload->channel = $redisChannel;
/* @var $channelManager ChannelManager */
$channelManager = app(ChannelManager::class);
// Load the Channel instance, if any
$channel = $channelManager->find($payload->appId, $payload->channel);
if ($channel === null) {
return false;
}
$socket = $payload->socket;
// Remove the internal keys from the payload
unset($payload->socket);
unset($payload->serverId);
unset($payload->appId);
// Push the message out to connected websocket clients
$channel->broadcastToEveryoneExcept($payload, $socket);
return true;
}
/**
* Subscribe to a channel on behalf of websocket user
*
* @param string $appId
* @param string $channel
* @return bool
*/
public function subscribe(string $appId, string $channel): bool
{
if (! isset($this->subscribedChannels[$channel])) {
// We're not subscribed to the channel yet, subscribe and set the count to 1
$this->subscribeClient->__call('subscribe', [$channel]);
$this->subscribedChannels[$channel] = 1;
} else {
// Increment the subscribe count if we've already subscribed
$this->subscribedChannels[$channel]++;
}
return true;
}
/**
* Unsubscribe from a channel on behalf of a websocket user
*
* @param string $appId
* @param string $channel
* @return bool
*/
public function unsubscribe(string $appId, string $channel): bool
{
if (! isset($this->subscribedChannels[$channel])) {
return false;
}
// Decrement the subscription count for this channel
$this->subscribedChannels[$channel]--;
// If we no longer have subscriptions to that channel, unsubscribe
if ($this->subscribedChannels[$channel] < 1) {
$this->subscribeClient->__call('unsubscribe', [$channel]);
unset($this->subscribedChannels[$channel]);
}
return true;
}
/**
* Publish a message to a channel on behalf of a websocket user
*
* @param string $appId
* @param string $channel
* @param stdClass $payload
* @return bool
*/
public function publish(string $appId, string $channel, stdClass $payload): bool
{
$payload->appId = $appId;
$payload->serverId = $this->serverId;
$this->publishClient->__call('publish', [$channel, json_encode($payload)]);
return true;
}
/**
* Build the Redis connection URL from Laravel database config
*
* @return string
*/
protected function getConnectionUri()
{
$name = config('websockets.replication.connection') ?? 'default';
$config = config("database.redis.$name");
$host = $config['host'];
$port = $config['port'] ? (':' . $config['port']) : ':6379';
$query = [];
if ($config['password']) {
$query['password'] = $config['password'];
}
if ($config['database']) {
$query['database'] = $config['database'];
}
$query = http_build_query($query);
return "redis://$host$port".($query ? '?'.$query : '');
}
}

View File

@ -0,0 +1,150 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
use Pusher\Pusher;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Broadcasting\Broadcasters\Broadcaster;
use Illuminate\Broadcasting\Broadcasters\UsePusherChannelConventions;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
class RedisPusherBroadcaster extends Broadcaster
{
use UsePusherChannelConventions;
/**
* The Pusher SDK instance.
*
* @var \Pusher\Pusher
*/
protected $pusher;
/**
* The Pusher app ID, to be passed in the payload.
*
* @var string
*/
protected $appId;
/**
* The Redis instance.
*
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;
/**
* The Redis connection to use for broadcasting.
*
* @var string|null
*/
protected $connection;
/**
* Create a new broadcaster instance.
*
* @param Pusher $pusher
* @param string $appId
* @param \Illuminate\Contracts\Redis\Factory $redis
* @param string|null $connection
*/
public function __construct(Pusher $pusher, string $appId, Redis $redis, $connection = null)
{
$this->pusher = $pusher;
$this->appId = $appId;
$this->redis = $redis;
$this->connection = $connection;
}
/**
* Authenticate the incoming request for a given channel.
*
* @param \Illuminate\Http\Request $request
* @return mixed
*
* @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException
*/
public function auth($request)
{
$channelName = $this->normalizeChannelName($request->channel_name);
if ($this->isGuardedChannel($request->channel_name) &&
! $this->retrieveUser($request, $channelName)) {
throw new AccessDeniedHttpException;
}
return parent::verifyUserCanAccessChannel(
$request, $channelName
);
}
/**
* Return the valid authentication response.
*
* @param \Illuminate\Http\Request $request
* @param mixed $result
* @return mixed
* @throws \Pusher\PusherException
*/
public function validAuthenticationResponse($request, $result)
{
if (Str::startsWith($request->channel_name, 'private')) {
return $this->decodePusherResponse(
$request, $this->pusher->socket_auth($request->channel_name, $request->socket_id)
);
}
$channelName = $this->normalizeChannelName($request->channel_name);
return $this->decodePusherResponse(
$request,
$this->pusher->presence_auth(
$request->channel_name, $request->socket_id,
$this->retrieveUser($request, $channelName)->getAuthIdentifier(), $result
)
);
}
/**
* Decode the given Pusher response.
*
* @param \Illuminate\Http\Request $request
* @param mixed $response
* @return array
*/
protected function decodePusherResponse($request, $response)
{
if (! $request->input('callback', false)) {
return json_decode($response, true);
}
return response()->json(json_decode($response, true))
->withCallback($request->callback);
}
/**
* Broadcast the given event.
*
* @param array $channels
* @param string $event
* @param array $payload
* @return void
*/
public function broadcast(array $channels, $event, array $payload = [])
{
$connection = $this->redis->connection($this->connection);
$payload = json_encode([
'appId' => $this->appId,
'event' => $event,
'data' => $payload,
'socket' => Arr::pull($payload, 'socket'),
]);
foreach ($this->formatChannels($channels) as $channel) {
$connection->publish($channel, $payload);
}
}
}

View File

@ -0,0 +1,43 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub;
use stdClass;
use React\EventLoop\LoopInterface;
interface ReplicationInterface
{
/**
* Boot the pub/sub provider (open connections, initial subscriptions, etc.)
*
* @param LoopInterface $loop
* @return self
*/
public function boot(LoopInterface $loop): self;
/**
* Publish a payload on a specific channel, for a specific app
*
* @param string $appId
* @param string $channel
* @param stdClass $payload
* @return bool
*/
public function publish(string $appId, string $channel, stdClass $payload): bool;
/**
* Subscribe to receive messages for a channel
*
* @param string $channel
* @return bool
*/
public function subscribe(string $appId, string $channel): bool;
/**
* Unsubscribe from a channel
*
* @param string $channel
* @return bool
*/
public function unsubscribe(string $appId, string $channel): bool;
}

View File

@ -5,7 +5,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
use stdClass;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface;
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
@ -52,6 +52,12 @@ class Channel
{
$this->saveConnection($connection);
if (config('websockets.replication.enabled') === true) {
// Subscribe for broadcasted messages from the pub/sub backend
app(ReplicationInterface::class)
->subscribe($connection->app->id, $this->channelName);
}
$connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->channelName,
@ -62,6 +68,12 @@ class Channel
{
unset($this->subscribedConnections[$connection->socketId]);
if (config('websockets.replication.enabled') === true) {
// Unsubscribe from the pub/sub backend
app(ReplicationInterface::class)
->unsubscribe($connection->app->id, $this->channelName);
}
if (! $this->hasConnections()) {
DashboardLogger::vacated($connection, $this->channelName);
}
@ -89,17 +101,17 @@ class Channel
public function broadcastToOthers(ConnectionInterface $connection, $payload)
{
if (config('websockets.replication.enabled') === true) {
// Also broadcast via the other websocket servers
app(ReplicationInterface::class)
->publish($connection->app->id, $payload);
}
$this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id);
}
public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null)
{
if (config('websockets.replication.enabled') === true) {
// Also broadcast via the other websocket instances
app()->get(PubSubInterface::class)
->publish($appId, $payload);
}
if (is_null($socketId)) {
$this->broadcast($payload);
return;

View File

@ -2,12 +2,16 @@
namespace BeyondCode\LaravelWebSockets;
use Pusher\Pusher;
use Psr\Log\LoggerInterface;
use Illuminate\Support\Facades\Gate;
use Illuminate\Support\Facades\Route;
use Illuminate\Support\ServiceProvider;
use Illuminate\Broadcasting\BroadcastManager;
use BeyondCode\LaravelWebSockets\Server\Router;
use BeyondCode\LaravelWebSockets\Apps\AppProvider;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisPusherBroadcaster;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard;
@ -19,7 +23,7 @@ use BeyondCode\LaravelWebSockets\Statistics\Http\Controllers\WebSocketStatistics
class WebSocketsServiceProvider extends ServiceProvider
{
public function boot()
public function boot(BroadcastManager $broadcastManager)
{
$this->publishes([
__DIR__.'/../config/websockets.php' => base_path('config/websockets.php'),
@ -41,6 +45,24 @@ class WebSocketsServiceProvider extends ServiceProvider
Console\StartWebSocketServer::class,
Console\CleanStatistics::class,
]);
$broadcastManager->extend('redis-pusher', function(array $config) {
$pusher = new Pusher(
$config['key'], $config['secret'],
$config['app_id'], $config['options'] ?? []
);
if ($config['log'] ?? false) {
$pusher->setLogger($this->app->make(LoggerInterface::class));
}
return new RedisPusherBroadcaster(
$pusher,
$config['app_id'],
$this->app->make('redis'),
$config['connection'] ?? null
);
});
}
public function register()