cleanup & refactor of pubsub code

This commit is contained in:
anthony 2019-07-28 20:50:10 +02:00 committed by Francis Lavoie
parent ed5503407e
commit d7c30f3b0f
No known key found for this signature in database
GPG Key ID: B9E0E04A76AF4692
10 changed files with 170 additions and 47 deletions

View File

@ -42,7 +42,8 @@
"require-dev": { "require-dev": {
"mockery/mockery": "^1.2", "mockery/mockery": "^1.2",
"orchestra/testbench": "3.7.* || 3.8.* || ^4.0", "orchestra/testbench": "3.7.* || 3.8.* || ^4.0",
"phpunit/phpunit": "^7.0 || ^8.0" "phpunit/phpunit": "^7.0 || ^8.0",
"predis/predis": "^1.1"
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {

View File

@ -27,5 +27,6 @@
</logging> </logging>
<php> <php>
<env name="DB_CONNECTION" value="testing"/> <env name="DB_CONNECTION" value="testing"/>
<env name="REDIS_HOST" value="redis"/>
</php> </php>
</phpunit> </phpunit>

View File

@ -12,7 +12,6 @@ use React\Dns\Resolver\Factory as DnsFactory;
use BeyondCode\LaravelWebSockets\Statistics\DnsResolver; use BeyondCode\LaravelWebSockets\Statistics\DnsResolver;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisClient;
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\Server\WebSocketServerFactory; use BeyondCode\LaravelWebSockets\Server\WebSocketServerFactory;
@ -117,7 +116,6 @@ class StartWebSocketServer extends Command
protected function registerCustomRoutes() protected function registerCustomRoutes()
{ {
WebSocketsRouter::customRoutes(); WebSocketsRouter::customRoutes();
return $this; return $this;
} }
@ -140,15 +138,7 @@ class StartWebSocketServer extends Command
protected function configurePubSubReplication() protected function configurePubSubReplication()
{ {
if (config('websockets.replication.enabled') !== true) { app(ReplicationInterface::class)->boot($this->loop);
return $this;
}
if (config('websockets.replication.driver') === 'redis') {
$this->laravel->singleton(ReplicationInterface::class, function () {
return (new RedisClient())->boot($this->loop);
});
}
return $this; return $this;
} }

View File

@ -1,6 +1,6 @@
<?php <?php
namespace BeyondCode\LaravelWebSockets\PubSub\Redis; namespace BeyondCode\LaravelWebSockets\PubSub\Broadcasters;
use Pusher\Pusher; use Pusher\Pusher;
use Illuminate\Support\Arr; use Illuminate\Support\Arr;

View File

@ -0,0 +1,112 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
use stdClass;
use React\EventLoop\LoopInterface;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
class EmptyClient implements ReplicationInterface
{
/**
* Boot the pub/sub provider (open connections, initial subscriptions, etc).
*
* @param LoopInterface $loop
* @return self
*/
public function boot(LoopInterface $loop) : ReplicationInterface
{
return $this;
}
/**
* Publish a payload on a specific channel, for a specific app.
*
* @param string $appId
* @param string $channel
* @param stdClass $payload
* @return bool
*/
public function publish(string $appId, string $channel, stdClass $payload) : bool
{
return true;
}
/**
* Subscribe to receive messages for a channel.
*
* @param string $appId
* @param string $channel
* @return bool
*/
public function subscribe(string $appId, string $channel) : bool
{
return true;
}
/**
* Unsubscribe from a channel.
*
* @param string $appId
* @param string $channel
* @return bool
*/
public function unsubscribe(string $appId, string $channel) : bool
{
return true;
}
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
*
* @param string $appId
* @param string $channel
* @param string $socketId
* @param string $data
*/
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
{
}
/**
* Remove a member from the channel. To be called when they have
* unsubscribed from the channel.
*
* @param string $appId
* @param string $channel
* @param string $socketId
*/
public function leaveChannel(string $appId, string $channel, string $socketId)
{
}
/**
* Retrieve the full information about the members in a presence channel.
*
* @param string $appId
* @param string $channel
* @return PromiseInterface
*/
public function channelMembers(string $appId, string $channel) : PromiseInterface
{
return new FulfilledPromise(null);
}
/**
* Get the amount of users subscribed for each presence channel.
*
* @param string $appId
* @param array $channelNames
* @return PromiseInterface
*/
public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface
{
return new FulfilledPromise(null);
}
}

View File

@ -1,6 +1,6 @@
<?php <?php
namespace BeyondCode\LaravelWebSockets\PubSub\Redis; namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
use stdClass; use stdClass;
use Illuminate\Support\Str; use Illuminate\Support\Str;
@ -129,6 +129,7 @@ class RedisClient implements ReplicationInterface
*/ */
public function subscribe(string $appId, string $channel): bool public function subscribe(string $appId, string $channel): bool
{ {
$this->publishClient->__call('hset', ["$appId:$channel", 541561516, "qsgdqgsd"]);
if (! isset($this->subscribedChannels["$appId:$channel"])) { if (! isset($this->subscribedChannels["$appId:$channel"])) {
// We're not subscribed to the channel yet, subscribe and set the count to 1 // We're not subscribed to the channel yet, subscribe and set the count to 1
$this->subscribeClient->__call('subscribe', ["$appId:$channel"]); $this->subscribeClient->__call('subscribe', ["$appId:$channel"]);

View File

@ -14,12 +14,18 @@ class Channel
/** @var string */ /** @var string */
protected $channelName; protected $channelName;
/**
* @var ReplicationInterface
*/
protected $pubSub;
/** @var \Ratchet\ConnectionInterface[] */ /** @var \Ratchet\ConnectionInterface[] */
protected $subscribedConnections = []; protected $subscribedConnections = [];
public function __construct(string $channelName) public function __construct(string $channelName)
{ {
$this->channelName = $channelName; $this->channelName = $channelName;
$this->pubSub = app(ReplicationInterface::class);
} }
public function getChannelName(): string public function getChannelName(): string
@ -48,7 +54,7 @@ class Channel
$signature .= ":{$payload->channel_data}"; $signature .= ":{$payload->channel_data}";
} }
if (! hash_equals( if (!hash_equals(
hash_hmac('sha256', $signature, $connection->app->secret), hash_hmac('sha256', $signature, $connection->app->secret),
Str::after($payload->auth, ':')) Str::after($payload->auth, ':'))
) { ) {
@ -63,11 +69,8 @@ class Channel
{ {
$this->saveConnection($connection); $this->saveConnection($connection);
if (config('websockets.replication.enabled') === true) { // Subscribe to broadcasted messages from the pub/sub backend
// Subscribe for broadcasted messages from the pub/sub backend $this->pubSub->subscribe($connection->app->id, $this->channelName);
app(ReplicationInterface::class)
->subscribe($connection->app->id, $this->channelName);
}
$connection->send(json_encode([ $connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded', 'event' => 'pusher_internal:subscription_succeeded',
@ -79,13 +82,10 @@ class Channel
{ {
unset($this->subscribedConnections[$connection->socketId]); unset($this->subscribedConnections[$connection->socketId]);
if (config('websockets.replication.enabled') === true) { // Unsubscribe from the pub/sub backend
// Unsubscribe from the pub/sub backend $this->pubSub->unsubscribe($connection->app->id, $this->channelName);
app(ReplicationInterface::class)
->unsubscribe($connection->app->id, $this->channelName);
}
if (! $this->hasConnections()) { if (!$this->hasConnections()) {
DashboardLogger::vacated($connection, $this->channelName); DashboardLogger::vacated($connection, $this->channelName);
} }
} }
@ -96,7 +96,7 @@ class Channel
$this->subscribedConnections[$connection->socketId] = $connection; $this->subscribedConnections[$connection->socketId] = $connection;
if (! $hadConnectionsPreviously) { if (!$hadConnectionsPreviously) {
DashboardLogger::occupied($connection, $this->channelName); DashboardLogger::occupied($connection, $this->channelName);
} }
@ -112,11 +112,8 @@ class Channel
public function broadcastToOthers(ConnectionInterface $connection, $payload) public function broadcastToOthers(ConnectionInterface $connection, $payload)
{ {
if (config('websockets.replication.enabled') === true) { // Also broadcast via the other websocket servers
// Also broadcast via the other websocket servers $this->pubSub->publish($connection->app->id, $this->channelName, $payload);
app(ReplicationInterface::class)
->publish($connection->app->id, $this->channelName, $payload);
}
$this->broadcastToEveryoneExcept($payload, $connection->socketId); $this->broadcastToEveryoneExcept($payload, $connection->socketId);
} }

View File

@ -2,6 +2,10 @@
namespace BeyondCode\LaravelWebSockets; namespace BeyondCode\LaravelWebSockets;
use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster;
use BeyondCode\LaravelWebSockets\PubSub\Drivers\EmptyClient;
use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use Pusher\Pusher; use Pusher\Pusher;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Illuminate\Support\Facades\Gate; use Illuminate\Support\Facades\Gate;
@ -11,7 +15,6 @@ use Illuminate\Broadcasting\BroadcastManager;
use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Server\Router;
use BeyondCode\LaravelWebSockets\Apps\AppProvider; use BeyondCode\LaravelWebSockets\Apps\AppProvider;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use BeyondCode\LaravelWebSockets\PubSub\Redis\RedisPusherBroadcaster;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\AuthenticateDashboard;
@ -23,15 +26,15 @@ use BeyondCode\LaravelWebSockets\Statistics\Http\Controllers\WebSocketStatistics
class WebSocketsServiceProvider extends ServiceProvider class WebSocketsServiceProvider extends ServiceProvider
{ {
public function boot(BroadcastManager $broadcastManager) public function boot()
{ {
$this->publishes([ $this->publishes([
__DIR__.'/../config/websockets.php' => base_path('config/websockets.php'), __DIR__ . '/../config/websockets.php' => base_path('config/websockets.php'),
], 'config'); ], 'config');
if (! class_exists('CreateWebSocketsStatisticsEntries')) { if (!class_exists('CreateWebSocketsStatisticsEntries')) {
$this->publishes([ $this->publishes([
__DIR__.'/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/'.date('Y_m_d_His', time()).'_create_websockets_statistics_entries_table.php'), __DIR__ . '/../database/migrations/create_websockets_statistics_entries_table.php.stub' => database_path('migrations/' . date('Y_m_d_His', time()) . '_create_websockets_statistics_entries_table.php'),
], 'migrations'); ], 'migrations');
} }
@ -39,14 +42,31 @@ class WebSocketsServiceProvider extends ServiceProvider
->registerRoutes() ->registerRoutes()
->registerDashboardGate(); ->registerDashboardGate();
$this->loadViewsFrom(__DIR__.'/../resources/views/', 'websockets'); $this->loadViewsFrom(__DIR__ . '/../resources/views/', 'websockets');
$this->commands([ $this->commands([
Console\StartWebSocketServer::class, Console\StartWebSocketServer::class,
Console\CleanStatistics::class, Console\CleanStatistics::class,
]); ]);
$broadcastManager->extend('redis-pusher', function ($app, array $config) { $this->configurePubSub();
}
protected function configurePubSub()
{
if (config('websockets.replication.enabled') !== true || config('websockets.replication.driver') !== 'redis') {
$this->app->singleton(ReplicationInterface::class, function () {
return (new EmptyClient());
});
return;
}
$this->app->singleton(ReplicationInterface::class, function () {
return (new RedisClient())->boot($this->loop);
});
app(BroadcastManager::class)->extend('redis-pusher', function ($app, array $config) {
$pusher = new Pusher( $pusher = new Pusher(
$config['key'], $config['secret'], $config['key'], $config['secret'],
$config['app_id'], $config['options'] ?? [] $config['app_id'], $config['options'] ?? []
@ -67,7 +87,7 @@ class WebSocketsServiceProvider extends ServiceProvider
public function register() public function register()
{ {
$this->mergeConfigFrom(__DIR__.'/../config/websockets.php', 'websockets'); $this->mergeConfigFrom(__DIR__ . '/../config/websockets.php', 'websockets');
$this->app->singleton('websockets.router', function () { $this->app->singleton('websockets.router', function () {
return new Router(); return new Router();
@ -88,7 +108,7 @@ class WebSocketsServiceProvider extends ServiceProvider
Route::prefix(config('websockets.path'))->group(function () { Route::prefix(config('websockets.path'))->group(function () {
Route::middleware(config('websockets.middleware', [AuthorizeDashboard::class]))->group(function () { Route::middleware(config('websockets.middleware', [AuthorizeDashboard::class]))->group(function () {
Route::get('/', ShowDashboard::class); Route::get('/', ShowDashboard::class);
Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']); Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']);
Route::post('auth', AuthenticateDashboard::class); Route::post('auth', AuthenticateDashboard::class);
Route::post('event', SendMessage::class); Route::post('event', SendMessage::class);
}); });

View File

@ -1,6 +1,6 @@
<?php <?php
namespace BeyondCode\LaravelWebSockets\PubSub\Fake; namespace BeyondCode\LaravelWebSockets\Tests\Mocks;
use stdClass; use stdClass;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
@ -8,7 +8,7 @@ use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface; use React\Promise\PromiseInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
class FakeReplication implements ReplicationInterface class FakeReplicationClient implements ReplicationInterface
{ {
protected $channels = []; protected $channels = [];

View File

@ -2,8 +2,9 @@
namespace BeyondCode\LaravelWebSockets\Tests; namespace BeyondCode\LaravelWebSockets\Tests;
use BeyondCode\LaravelWebSockets\Tests\Mocks\FakeReplicationClient;
use Illuminate\Support\Facades\Config;
use React\EventLoop\Factory; use React\EventLoop\Factory;
use BeyondCode\LaravelWebSockets\PubSub\Fake\FakeReplication;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
trait TestsReplication trait TestsReplication
@ -11,12 +12,12 @@ trait TestsReplication
public function setupReplication() public function setupReplication()
{ {
app()->singleton(ReplicationInterface::class, function () { app()->singleton(ReplicationInterface::class, function () {
return (new FakeReplication())->boot(Factory::create()); return (new FakeReplicationClient())->boot(Factory::create());
}); });
config([ Config::set([
'websockets.replication.enabled' => true, 'websockets.replication.enabled' => true,
'websockets.replication.driver' => 'fake', 'websockets.replication.driver' => 'redis',
]); ]);
} }
} }