Merge pull request #140 from francislavoie/redis-replication
Redis as a replication backend for scalability
This commit is contained in:
commit
8dc28561f9
|
|
@ -3,3 +3,4 @@ composer.lock
|
||||||
vendor
|
vendor
|
||||||
coverage
|
coverage
|
||||||
.phpunit.result.cache
|
.phpunit.result.cache
|
||||||
|
.idea/
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@
|
||||||
"ext-json": "*",
|
"ext-json": "*",
|
||||||
"cboden/ratchet": "^0.4.1",
|
"cboden/ratchet": "^0.4.1",
|
||||||
"clue/buzz-react": "^2.5",
|
"clue/buzz-react": "^2.5",
|
||||||
|
"clue/redis-react": "^2.3",
|
||||||
"facade/ignition-contracts": "^1.0",
|
"facade/ignition-contracts": "^1.0",
|
||||||
"guzzlehttp/psr7": "^1.5",
|
"guzzlehttp/psr7": "^1.5",
|
||||||
"illuminate/broadcasting": "5.8.*|^6.0|^7.0",
|
"illuminate/broadcasting": "5.8.*|^6.0|^7.0",
|
||||||
|
|
|
||||||
|
|
@ -141,6 +141,34 @@ return [
|
||||||
|
|
||||||
],
|
],
|
||||||
|
|
||||||
|
/*
|
||||||
|
|--------------------------------------------------------------------------
|
||||||
|
| Broadcasting Replication
|
||||||
|
|--------------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
| You can enable replication to publish and subscribe to
|
||||||
|
| messages across the driver.
|
||||||
|
|
|
||||||
|
| By default, it is disabled, but you can configure it to use drivers
|
||||||
|
| like Redis to ensure connection between multiple instances of
|
||||||
|
| WebSocket servers.
|
||||||
|
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
'replication' => [
|
||||||
|
|
||||||
|
'enabled' => false,
|
||||||
|
|
||||||
|
'driver' => 'redis',
|
||||||
|
|
||||||
|
'redis' => [
|
||||||
|
|
||||||
|
'connection' => 'default',
|
||||||
|
|
||||||
|
],
|
||||||
|
|
||||||
|
],
|
||||||
|
|
||||||
'statistics' => [
|
'statistics' => [
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ class ConfigAppProvider implements AppProvider
|
||||||
{
|
{
|
||||||
return $this->apps
|
return $this->apps
|
||||||
->map(function (array $appAttributes) {
|
->map(function (array $appAttributes) {
|
||||||
return $this->instanciate($appAttributes);
|
return $this->instantiate($appAttributes);
|
||||||
})
|
})
|
||||||
->toArray();
|
->toArray();
|
||||||
}
|
}
|
||||||
|
|
@ -30,7 +30,7 @@ class ConfigAppProvider implements AppProvider
|
||||||
->apps
|
->apps
|
||||||
->firstWhere('id', $appId);
|
->firstWhere('id', $appId);
|
||||||
|
|
||||||
return $this->instanciate($appAttributes);
|
return $this->instantiate($appAttributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function findByKey(string $appKey): ?App
|
public function findByKey(string $appKey): ?App
|
||||||
|
|
@ -39,7 +39,7 @@ class ConfigAppProvider implements AppProvider
|
||||||
->apps
|
->apps
|
||||||
->firstWhere('key', $appKey);
|
->firstWhere('key', $appKey);
|
||||||
|
|
||||||
return $this->instanciate($appAttributes);
|
return $this->instantiate($appAttributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function findBySecret(string $appSecret): ?App
|
public function findBySecret(string $appSecret): ?App
|
||||||
|
|
@ -48,10 +48,10 @@ class ConfigAppProvider implements AppProvider
|
||||||
->apps
|
->apps
|
||||||
->firstWhere('secret', $appSecret);
|
->firstWhere('secret', $appSecret);
|
||||||
|
|
||||||
return $this->instanciate($appAttributes);
|
return $this->instantiate($appAttributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function instanciate(?array $appAttributes): ?App
|
protected function instantiate(?array $appAttributes): ?App
|
||||||
{
|
{
|
||||||
if (! $appAttributes) {
|
if (! $appAttributes) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ namespace BeyondCode\LaravelWebSockets\Console;
|
||||||
|
|
||||||
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
|
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
|
||||||
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
|
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger;
|
use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger;
|
||||||
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
|
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
|
||||||
use BeyondCode\LaravelWebSockets\Server\Logger\WebsocketsLogger;
|
use BeyondCode\LaravelWebSockets\Server\Logger\WebsocketsLogger;
|
||||||
|
|
@ -49,6 +50,7 @@ class StartWebSocketServer extends Command
|
||||||
->configureRestartTimer()
|
->configureRestartTimer()
|
||||||
->registerEchoRoutes()
|
->registerEchoRoutes()
|
||||||
->registerCustomRoutes()
|
->registerCustomRoutes()
|
||||||
|
->configurePubSubReplication()
|
||||||
->startWebSocketServer();
|
->startWebSocketServer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -61,7 +63,7 @@ class StartWebSocketServer extends Command
|
||||||
|
|
||||||
$browser = new Browser($this->loop, $connector);
|
$browser = new Browser($this->loop, $connector);
|
||||||
|
|
||||||
app()->singleton(StatisticsLoggerInterface::class, function () use ($browser) {
|
$this->laravel->singleton(StatisticsLoggerInterface::class, function () use ($browser) {
|
||||||
$class = config('websockets.statistics.logger', \BeyondCode\LaravelWebSockets\Statistics\Logger::class);
|
$class = config('websockets.statistics.logger', \BeyondCode\LaravelWebSockets\Statistics\Logger::class);
|
||||||
|
|
||||||
return new $class(app(ChannelManager::class), $browser);
|
return new $class(app(ChannelManager::class), $browser);
|
||||||
|
|
@ -76,7 +78,7 @@ class StartWebSocketServer extends Command
|
||||||
|
|
||||||
protected function configureHttpLogger()
|
protected function configureHttpLogger()
|
||||||
{
|
{
|
||||||
app()->singleton(HttpLogger::class, function () {
|
$this->laravel->singleton(HttpLogger::class, function () {
|
||||||
return (new HttpLogger($this->output))
|
return (new HttpLogger($this->output))
|
||||||
->enable($this->option('debug') ?: config('app.debug'))
|
->enable($this->option('debug') ?: config('app.debug'))
|
||||||
->verbose($this->output->isVerbose());
|
->verbose($this->output->isVerbose());
|
||||||
|
|
@ -87,7 +89,7 @@ class StartWebSocketServer extends Command
|
||||||
|
|
||||||
protected function configureMessageLogger()
|
protected function configureMessageLogger()
|
||||||
{
|
{
|
||||||
app()->singleton(WebsocketsLogger::class, function () {
|
$this->laravel->singleton(WebsocketsLogger::class, function () {
|
||||||
return (new WebsocketsLogger($this->output))
|
return (new WebsocketsLogger($this->output))
|
||||||
->enable($this->option('debug') ?: config('app.debug'))
|
->enable($this->option('debug') ?: config('app.debug'))
|
||||||
->verbose($this->output->isVerbose());
|
->verbose($this->output->isVerbose());
|
||||||
|
|
@ -98,7 +100,7 @@ class StartWebSocketServer extends Command
|
||||||
|
|
||||||
protected function configureConnectionLogger()
|
protected function configureConnectionLogger()
|
||||||
{
|
{
|
||||||
app()->bind(ConnectionLogger::class, function () {
|
$this->laravel->bind(ConnectionLogger::class, function () {
|
||||||
return (new ConnectionLogger($this->output))
|
return (new ConnectionLogger($this->output))
|
||||||
->enable(config('app.debug'))
|
->enable(config('app.debug'))
|
||||||
->verbose($this->output->isVerbose());
|
->verbose($this->output->isVerbose());
|
||||||
|
|
@ -151,6 +153,13 @@ class StartWebSocketServer extends Command
|
||||||
->run();
|
->run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected function configurePubSubReplication()
|
||||||
|
{
|
||||||
|
$this->laravel->get(ReplicationInterface::class)->boot($this->loop);
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
protected function getDnsResolver(): ResolverInterface
|
protected function getDnsResolver(): ResolverInterface
|
||||||
{
|
{
|
||||||
if (! config('websockets.statistics.perform_dns_lookup')) {
|
if (! config('websockets.statistics.perform_dns_lookup')) {
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,10 @@ namespace BeyondCode\LaravelWebSockets\Facades;
|
||||||
use BeyondCode\LaravelWebSockets\Statistics\Logger\StatisticsLogger as StatisticsLoggerInterface;
|
use BeyondCode\LaravelWebSockets\Statistics\Logger\StatisticsLogger as StatisticsLoggerInterface;
|
||||||
use Illuminate\Support\Facades\Facade;
|
use Illuminate\Support\Facades\Facade;
|
||||||
|
|
||||||
/** @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger */
|
/**
|
||||||
|
* @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger
|
||||||
|
* @mixin \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger
|
||||||
|
*/
|
||||||
class StatisticsLogger extends Facade
|
class StatisticsLogger extends Facade
|
||||||
{
|
{
|
||||||
protected static function getFacadeAccessor()
|
protected static function getFacadeAccessor()
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,10 @@ namespace BeyondCode\LaravelWebSockets\Facades;
|
||||||
|
|
||||||
use Illuminate\Support\Facades\Facade;
|
use Illuminate\Support\Facades\Facade;
|
||||||
|
|
||||||
/** @see \BeyondCode\LaravelWebSockets\Server\Router */
|
/**
|
||||||
|
* @see \BeyondCode\LaravelWebSockets\Server\Router
|
||||||
|
* @mixin \BeyondCode\LaravelWebSockets\Server\Router
|
||||||
|
*/
|
||||||
class WebSocketsRouter extends Facade
|
class WebSocketsRouter extends Facade
|
||||||
{
|
{
|
||||||
protected static function getFacadeAccessor()
|
protected static function getFacadeAccessor()
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ use Psr\Http\Message\RequestInterface;
|
||||||
use Pusher\Pusher;
|
use Pusher\Pusher;
|
||||||
use Ratchet\ConnectionInterface;
|
use Ratchet\ConnectionInterface;
|
||||||
use Ratchet\Http\HttpServerInterface;
|
use Ratchet\Http\HttpServerInterface;
|
||||||
|
use React\Promise\PromiseInterface;
|
||||||
use Symfony\Bridge\PsrHttpMessage\Factory\HttpFoundationFactory;
|
use Symfony\Bridge\PsrHttpMessage\Factory\HttpFoundationFactory;
|
||||||
use Symfony\Component\HttpKernel\Exception\HttpException;
|
use Symfony\Component\HttpKernel\Exception\HttpException;
|
||||||
|
|
||||||
|
|
@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface
|
||||||
/** @var int */
|
/** @var int */
|
||||||
protected $contentLength;
|
protected $contentLength;
|
||||||
|
|
||||||
/** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */
|
/** @var ChannelManager */
|
||||||
protected $channelManager;
|
protected $channelManager;
|
||||||
|
|
||||||
public function __construct(ChannelManager $channelManager)
|
public function __construct(ChannelManager $channelManager)
|
||||||
|
|
@ -46,7 +47,11 @@ abstract class Controller implements HttpServerInterface
|
||||||
|
|
||||||
$this->requestBuffer = (string) $request->getBody();
|
$this->requestBuffer = (string) $request->getBody();
|
||||||
|
|
||||||
$this->checkContentLength($connection);
|
if (! $this->verifyContentLength()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->handleRequest($connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function findContentLength(array $headers): int
|
protected function findContentLength(array $headers): int
|
||||||
|
|
@ -60,31 +65,53 @@ abstract class Controller implements HttpServerInterface
|
||||||
{
|
{
|
||||||
$this->requestBuffer .= $msg;
|
$this->requestBuffer .= $msg;
|
||||||
|
|
||||||
$this->checkContentLength($from);
|
if (! $this->verifyContentLength()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->handleRequest($from);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function checkContentLength(ConnectionInterface $connection)
|
protected function verifyContentLength()
|
||||||
{
|
{
|
||||||
if (strlen($this->requestBuffer) === $this->contentLength) {
|
return strlen($this->requestBuffer) === $this->contentLength;
|
||||||
$serverRequest = (new ServerRequest(
|
}
|
||||||
$this->request->getMethod(),
|
|
||||||
$this->request->getUri(),
|
|
||||||
$this->request->getHeaders(),
|
|
||||||
$this->requestBuffer,
|
|
||||||
$this->request->getProtocolVersion()
|
|
||||||
))->withQueryParams(QueryParameters::create($this->request)->all());
|
|
||||||
|
|
||||||
$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
|
protected function handleRequest(ConnectionInterface $connection)
|
||||||
|
{
|
||||||
|
$serverRequest = (new ServerRequest(
|
||||||
|
$this->request->getMethod(),
|
||||||
|
$this->request->getUri(),
|
||||||
|
$this->request->getHeaders(),
|
||||||
|
$this->requestBuffer,
|
||||||
|
$this->request->getProtocolVersion()
|
||||||
|
))->withQueryParams(QueryParameters::create($this->request)->all());
|
||||||
|
|
||||||
$this
|
$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
|
||||||
->ensureValidAppId($laravelRequest->appId)
|
|
||||||
->ensureValidSignature($laravelRequest);
|
|
||||||
|
|
||||||
$response = $this($laravelRequest);
|
$this
|
||||||
|
->ensureValidAppId($laravelRequest->appId)
|
||||||
|
->ensureValidSignature($laravelRequest);
|
||||||
|
|
||||||
$connection->send(JsonResponse::create($response));
|
// Invoke the controller action
|
||||||
$connection->close();
|
$response = $this($laravelRequest);
|
||||||
|
|
||||||
|
// Allow for async IO in the controller action
|
||||||
|
if ($response instanceof PromiseInterface) {
|
||||||
|
$response->then(function ($response) use ($connection) {
|
||||||
|
$this->sendAndClose($connection, $response);
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->sendAndClose($connection, $response);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function sendAndClose(ConnectionInterface $connection, $response)
|
||||||
|
{
|
||||||
|
$connection->send(JsonResponse::create($response));
|
||||||
|
$connection->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function onClose(ConnectionInterface $connection)
|
public function onClose(ConnectionInterface $connection)
|
||||||
|
|
@ -122,7 +149,7 @@ abstract class Controller implements HttpServerInterface
|
||||||
/*
|
/*
|
||||||
* The `auth_signature` & `body_md5` parameters are not included when calculating the `auth_signature` value.
|
* The `auth_signature` & `body_md5` parameters are not included when calculating the `auth_signature` value.
|
||||||
*
|
*
|
||||||
* The `appId`, `appKey` & `channelName` parameters are actually route paramaters and are never supplied by the client.
|
* The `appId`, `appKey` & `channelName` parameters are actually route parameters and are never supplied by the client.
|
||||||
*/
|
*/
|
||||||
$params = Arr::except($request->query(), ['auth_signature', 'body_md5', 'appId', 'appKey', 'channelName']);
|
$params = Arr::except($request->query(), ['auth_signature', 'body_md5', 'appId', 'appKey', 'channelName']);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,6 @@ class FetchChannelController extends Controller
|
||||||
throw new HttpException(404, "Unknown channel `{$request->channelName}`.");
|
throw new HttpException(404, "Unknown channel `{$request->channelName}`.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return $channel->toArray();
|
return $channel->toArray($request->appId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,22 @@ use Illuminate\Http\Request;
|
||||||
use Illuminate\Support\Collection;
|
use Illuminate\Support\Collection;
|
||||||
use Illuminate\Support\Str;
|
use Illuminate\Support\Str;
|
||||||
use Symfony\Component\HttpKernel\Exception\HttpException;
|
use Symfony\Component\HttpKernel\Exception\HttpException;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
|
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
|
||||||
|
use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel;
|
||||||
|
|
||||||
class FetchChannelsController extends Controller
|
class FetchChannelsController extends Controller
|
||||||
{
|
{
|
||||||
|
/** @var ReplicationInterface */
|
||||||
|
protected $replication;
|
||||||
|
|
||||||
|
public function __construct(ChannelManager $channelManager, ReplicationInterface $replication)
|
||||||
|
{
|
||||||
|
parent::__construct($channelManager);
|
||||||
|
|
||||||
|
$this->replication = $replication;
|
||||||
|
}
|
||||||
|
|
||||||
public function __invoke(Request $request)
|
public function __invoke(Request $request)
|
||||||
{
|
{
|
||||||
$attributes = [];
|
$attributes = [];
|
||||||
|
|
@ -29,15 +42,28 @@ class FetchChannelsController extends Controller
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return [
|
// We want to get the channel user count all in one shot when
|
||||||
'channels' => $channels->map(function ($channel) use ($attributes) {
|
// using a replication backend rather than doing individual queries.
|
||||||
$info = new \stdClass;
|
// To do so, we first collect the list of channel names.
|
||||||
if (in_array('user_count', $attributes)) {
|
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
|
||||||
$info->user_count = count($channel->getUsers());
|
return $channel->getChannelName();
|
||||||
}
|
})->toArray();
|
||||||
|
|
||||||
return $info;
|
// We ask the replication backend to get us the member count per channel.
|
||||||
})->toArray() ?: new \stdClass,
|
// We get $counts back as a key-value array of channel names and their member count.
|
||||||
];
|
return $this->replication
|
||||||
|
->channelMemberCounts($request->appId, $channelNames)
|
||||||
|
->then(function (array $counts) use ($channels, $attributes) {
|
||||||
|
return [
|
||||||
|
'channels' => $channels->map(function (PresenceChannel $channel) use ($counts, $attributes) {
|
||||||
|
$info = new \stdClass;
|
||||||
|
if (in_array('user_count', $attributes)) {
|
||||||
|
$info->user_count = $counts[$channel->getChannelName()];
|
||||||
|
}
|
||||||
|
|
||||||
|
return $info;
|
||||||
|
})->toArray() ?: new \stdClass,
|
||||||
|
];
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,10 +21,14 @@ class FetchUsersController extends Controller
|
||||||
throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"');
|
throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"');
|
||||||
}
|
}
|
||||||
|
|
||||||
return [
|
return $channel
|
||||||
'users' => Collection::make($channel->getUsers())->map(function ($user) {
|
->getUsers($request->appId)
|
||||||
return ['id' => $user->user_id];
|
->then(function (array $users) {
|
||||||
})->values(),
|
return [
|
||||||
];
|
'users' => Collection::make($users)->map(function ($user) {
|
||||||
|
return ['id' => $user->user_id];
|
||||||
|
})->values(),
|
||||||
|
];
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ class TriggerEventController extends Controller
|
||||||
'channel' => $channelName,
|
'channel' => $channelName,
|
||||||
'event' => $request->json()->get('name'),
|
'event' => $request->json()->get('name'),
|
||||||
'data' => $request->json()->get('data'),
|
'data' => $request->json()->get('data'),
|
||||||
], $request->json()->get('socket_id'));
|
], $request->json()->get('socket_id'), $request->appId);
|
||||||
|
|
||||||
DashboardLogger::apiMessage(
|
DashboardLogger::apiMessage(
|
||||||
$request->appId,
|
$request->appId,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,150 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\PubSub\Broadcasters;
|
||||||
|
|
||||||
|
use Pusher\Pusher;
|
||||||
|
use Illuminate\Support\Arr;
|
||||||
|
use Illuminate\Support\Str;
|
||||||
|
use Illuminate\Contracts\Redis\Factory as Redis;
|
||||||
|
use Illuminate\Broadcasting\Broadcasters\Broadcaster;
|
||||||
|
use Illuminate\Broadcasting\Broadcasters\UsePusherChannelConventions;
|
||||||
|
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
|
||||||
|
|
||||||
|
class RedisPusherBroadcaster extends Broadcaster
|
||||||
|
{
|
||||||
|
use UsePusherChannelConventions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Pusher SDK instance.
|
||||||
|
*
|
||||||
|
* @var \Pusher\Pusher
|
||||||
|
*/
|
||||||
|
protected $pusher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Pusher app ID, to be passed in the payload.
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
protected $appId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Redis instance.
|
||||||
|
*
|
||||||
|
* @var \Illuminate\Contracts\Redis\Factory
|
||||||
|
*/
|
||||||
|
protected $redis;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Redis connection to use for broadcasting.
|
||||||
|
*
|
||||||
|
* @var string|null
|
||||||
|
*/
|
||||||
|
protected $connection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new broadcaster instance.
|
||||||
|
*
|
||||||
|
* @param Pusher $pusher
|
||||||
|
* @param string $appId
|
||||||
|
* @param \Illuminate\Contracts\Redis\Factory $redis
|
||||||
|
* @param string|null $connection
|
||||||
|
*/
|
||||||
|
public function __construct(Pusher $pusher, string $appId, Redis $redis, $connection = null)
|
||||||
|
{
|
||||||
|
$this->pusher = $pusher;
|
||||||
|
$this->appId = $appId;
|
||||||
|
$this->redis = $redis;
|
||||||
|
$this->connection = $connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Authenticate the incoming request for a given channel.
|
||||||
|
*
|
||||||
|
* @param \Illuminate\Http\Request $request
|
||||||
|
* @return mixed
|
||||||
|
*
|
||||||
|
* @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException
|
||||||
|
*/
|
||||||
|
public function auth($request)
|
||||||
|
{
|
||||||
|
$channelName = $this->normalizeChannelName($request->channel_name);
|
||||||
|
|
||||||
|
if ($this->isGuardedChannel($request->channel_name) &&
|
||||||
|
! $this->retrieveUser($request, $channelName)) {
|
||||||
|
throw new AccessDeniedHttpException;
|
||||||
|
}
|
||||||
|
|
||||||
|
return parent::verifyUserCanAccessChannel(
|
||||||
|
$request, $channelName
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the valid authentication response.
|
||||||
|
*
|
||||||
|
* @param \Illuminate\Http\Request $request
|
||||||
|
* @param mixed $result
|
||||||
|
* @return mixed
|
||||||
|
* @throws \Pusher\PusherException
|
||||||
|
*/
|
||||||
|
public function validAuthenticationResponse($request, $result)
|
||||||
|
{
|
||||||
|
if (Str::startsWith($request->channel_name, 'private')) {
|
||||||
|
return $this->decodePusherResponse(
|
||||||
|
$request, $this->pusher->socket_auth($request->channel_name, $request->socket_id)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
$channelName = $this->normalizeChannelName($request->channel_name);
|
||||||
|
|
||||||
|
return $this->decodePusherResponse(
|
||||||
|
$request,
|
||||||
|
$this->pusher->presence_auth(
|
||||||
|
$request->channel_name, $request->socket_id,
|
||||||
|
$this->retrieveUser($request, $channelName)->getAuthIdentifier(), $result
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decode the given Pusher response.
|
||||||
|
*
|
||||||
|
* @param \Illuminate\Http\Request $request
|
||||||
|
* @param mixed $response
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
protected function decodePusherResponse($request, $response)
|
||||||
|
{
|
||||||
|
if (! $request->input('callback', false)) {
|
||||||
|
return json_decode($response, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return response()->json(json_decode($response, true))
|
||||||
|
->withCallback($request->callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Broadcast the given event.
|
||||||
|
*
|
||||||
|
* @param array $channels
|
||||||
|
* @param string $event
|
||||||
|
* @param array $payload
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function broadcast(array $channels, $event, array $payload = [])
|
||||||
|
{
|
||||||
|
$connection = $this->redis->connection($this->connection);
|
||||||
|
|
||||||
|
$payload = json_encode([
|
||||||
|
'appId' => $this->appId,
|
||||||
|
'event' => $event,
|
||||||
|
'data' => $payload,
|
||||||
|
'socket' => Arr::pull($payload, 'socket'),
|
||||||
|
]);
|
||||||
|
|
||||||
|
foreach ($this->formatChannels($channels) as $channel) {
|
||||||
|
$connection->publish("{$this->appId}:$channel", $payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,138 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
|
||||||
|
|
||||||
|
use stdClass;
|
||||||
|
use React\EventLoop\LoopInterface;
|
||||||
|
use React\Promise\FulfilledPromise;
|
||||||
|
use React\Promise\PromiseInterface;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
|
|
||||||
|
class LocalClient implements ReplicationInterface
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Mapping of the presence JSON data for users in each channel.
|
||||||
|
*
|
||||||
|
* @var string[][]
|
||||||
|
*/
|
||||||
|
protected $channelData = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Boot the pub/sub provider (open connections, initial subscriptions, etc).
|
||||||
|
*
|
||||||
|
* @param LoopInterface $loop
|
||||||
|
* @return self
|
||||||
|
*/
|
||||||
|
public function boot(LoopInterface $loop) : ReplicationInterface
|
||||||
|
{
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a payload on a specific channel, for a specific app.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param stdClass $payload
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function publish(string $appId, string $channel, stdClass $payload) : bool
|
||||||
|
{
|
||||||
|
// Nothing to do, nobody to publish to
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe to receive messages for a channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function subscribe(string $appId, string $channel) : bool
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsubscribe from a channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function unsubscribe(string $appId, string $channel) : bool
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a member to a channel. To be called when they have
|
||||||
|
* subscribed to the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
* @param string $data
|
||||||
|
*/
|
||||||
|
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
|
||||||
|
{
|
||||||
|
$this->channelData["$appId:$channel"][$socketId] = $data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a member from the channel. To be called when they have
|
||||||
|
* unsubscribed from the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
*/
|
||||||
|
public function leaveChannel(string $appId, string $channel, string $socketId)
|
||||||
|
{
|
||||||
|
unset($this->channelData["$appId:$channel"][$socketId]);
|
||||||
|
if (empty($this->channelData["$appId:$channel"])) {
|
||||||
|
unset($this->channelData["$appId:$channel"]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the full information about the members in a presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMembers(string $appId, string $channel) : PromiseInterface
|
||||||
|
{
|
||||||
|
$members = $this->channelData["$appId:$channel"] ?? [];
|
||||||
|
|
||||||
|
// The data is expected as objects, so we need to JSON decode
|
||||||
|
$members = array_map(function ($user) {
|
||||||
|
return json_decode($user);
|
||||||
|
}, $members);
|
||||||
|
|
||||||
|
return new FulfilledPromise($members);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the amount of users subscribed for each presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param array $channelNames
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface
|
||||||
|
{
|
||||||
|
$results = [];
|
||||||
|
|
||||||
|
// Count the number of users per channel
|
||||||
|
foreach ($channelNames as $channel) {
|
||||||
|
$results[$channel] = isset($this->channelData["$appId:$channel"])
|
||||||
|
? count($this->channelData["$appId:$channel"])
|
||||||
|
: 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new FulfilledPromise($results);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,276 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
|
||||||
|
|
||||||
|
use stdClass;
|
||||||
|
use Illuminate\Support\Str;
|
||||||
|
use Clue\React\Redis\Client;
|
||||||
|
use Clue\React\Redis\Factory;
|
||||||
|
use React\EventLoop\LoopInterface;
|
||||||
|
use React\Promise\PromiseInterface;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
|
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
|
||||||
|
|
||||||
|
class RedisClient implements ReplicationInterface
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @var LoopInterface
|
||||||
|
*/
|
||||||
|
protected $loop;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
protected $serverId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var Client
|
||||||
|
*/
|
||||||
|
protected $publishClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var Client
|
||||||
|
*/
|
||||||
|
protected $subscribeClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mapping of subscribed channels, where the key is the channel name,
|
||||||
|
* and the value is the amount of connections which are subscribed to
|
||||||
|
* that channel. Used to keep track of whether we still need to stay
|
||||||
|
* subscribed to those channels with Redis.
|
||||||
|
*
|
||||||
|
* @var int[]
|
||||||
|
*/
|
||||||
|
protected $subscribedChannels = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RedisClient constructor.
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this->serverId = Str::uuid()->toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Boot the RedisClient, initializing the connections.
|
||||||
|
*
|
||||||
|
* @param LoopInterface $loop
|
||||||
|
* @return ReplicationInterface
|
||||||
|
*/
|
||||||
|
public function boot(LoopInterface $loop): ReplicationInterface
|
||||||
|
{
|
||||||
|
$this->loop = $loop;
|
||||||
|
|
||||||
|
$connectionUri = $this->getConnectionUri();
|
||||||
|
$factory = new Factory($this->loop);
|
||||||
|
|
||||||
|
$this->publishClient = $factory->createLazyClient($connectionUri);
|
||||||
|
$this->subscribeClient = $factory->createLazyClient($connectionUri);
|
||||||
|
|
||||||
|
$this->subscribeClient->on('message', function ($channel, $payload) {
|
||||||
|
$this->onMessage($channel, $payload);
|
||||||
|
});
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle a message received from Redis on a specific channel.
|
||||||
|
*
|
||||||
|
* @param string $redisChannel
|
||||||
|
* @param string $payload
|
||||||
|
*/
|
||||||
|
protected function onMessage(string $redisChannel, string $payload)
|
||||||
|
{
|
||||||
|
$payload = json_decode($payload);
|
||||||
|
|
||||||
|
// Ignore messages sent by ourselves
|
||||||
|
if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pull out the app ID. See RedisPusherBroadcaster
|
||||||
|
$appId = $payload->appId;
|
||||||
|
|
||||||
|
// We need to put the channel name in the payload.
|
||||||
|
// We strip the app ID from the channel name, websocket clients
|
||||||
|
// expect the channel name to not include the app ID.
|
||||||
|
$payload->channel = Str::after($redisChannel, "$appId:");
|
||||||
|
|
||||||
|
/* @var ChannelManager $channelManager */
|
||||||
|
$channelManager = app(ChannelManager::class);
|
||||||
|
|
||||||
|
// Load the Channel instance, if any
|
||||||
|
$channel = $channelManager->find($appId, $payload->channel);
|
||||||
|
|
||||||
|
// If no channel is found, none of our connections want to
|
||||||
|
// receive this message, so we ignore it.
|
||||||
|
if (! $channel) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$socket = $payload->socket ?? null;
|
||||||
|
|
||||||
|
// Remove fields intended for internal use from the payload
|
||||||
|
unset($payload->socket);
|
||||||
|
unset($payload->serverId);
|
||||||
|
unset($payload->appId);
|
||||||
|
|
||||||
|
// Push the message out to connected websocket clients
|
||||||
|
$channel->broadcastToEveryoneExcept($payload, $socket, $appId, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe to a channel on behalf of websocket user.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function subscribe(string $appId, string $channel): bool
|
||||||
|
{
|
||||||
|
if (! isset($this->subscribedChannels["$appId:$channel"])) {
|
||||||
|
// We're not subscribed to the channel yet, subscribe and set the count to 1
|
||||||
|
$this->subscribeClient->__call('subscribe', ["$appId:$channel"]);
|
||||||
|
$this->subscribedChannels["$appId:$channel"] = 1;
|
||||||
|
} else {
|
||||||
|
// Increment the subscribe count if we've already subscribed
|
||||||
|
$this->subscribedChannels["$appId:$channel"]++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsubscribe from a channel on behalf of a websocket user.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function unsubscribe(string $appId, string $channel): bool
|
||||||
|
{
|
||||||
|
if (! isset($this->subscribedChannels["$appId:$channel"])) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decrement the subscription count for this channel
|
||||||
|
$this->subscribedChannels["$appId:$channel"]--;
|
||||||
|
|
||||||
|
// If we no longer have subscriptions to that channel, unsubscribe
|
||||||
|
if ($this->subscribedChannels["$appId:$channel"] < 1) {
|
||||||
|
$this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]);
|
||||||
|
unset($this->subscribedChannels["$appId:$channel"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a message to a channel on behalf of a websocket user.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param stdClass $payload
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function publish(string $appId, string $channel, stdClass $payload): bool
|
||||||
|
{
|
||||||
|
$payload->appId = $appId;
|
||||||
|
$payload->serverId = $this->serverId;
|
||||||
|
|
||||||
|
$this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a member to a channel. To be called when they have
|
||||||
|
* subscribed to the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
* @param string $data
|
||||||
|
*/
|
||||||
|
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
|
||||||
|
{
|
||||||
|
$this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a member from the channel. To be called when they have
|
||||||
|
* unsubscribed from the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
*/
|
||||||
|
public function leaveChannel(string $appId, string $channel, string $socketId)
|
||||||
|
{
|
||||||
|
$this->publishClient->__call('hdel', ["$appId:$channel", $socketId]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the full information about the members in a presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMembers(string $appId, string $channel): PromiseInterface
|
||||||
|
{
|
||||||
|
return $this->publishClient->__call('hgetall', ["$appId:$channel"])
|
||||||
|
->then(function ($members) {
|
||||||
|
// The data is expected as objects, so we need to JSON decode
|
||||||
|
return array_map(function ($user) {
|
||||||
|
return json_decode($user);
|
||||||
|
}, $members);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the amount of users subscribed for each presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param array $channelNames
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface
|
||||||
|
{
|
||||||
|
$this->publishClient->__call('multi', []);
|
||||||
|
|
||||||
|
foreach ($channelNames as $channel) {
|
||||||
|
$this->publishClient->__call('hlen', ["$appId:$channel"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->publishClient->__call('exec', [])
|
||||||
|
->then(function ($data) use ($channelNames) {
|
||||||
|
return array_combine($channelNames, $data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the Redis connection URL from Laravel database config.
|
||||||
|
*
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
protected function getConnectionUri()
|
||||||
|
{
|
||||||
|
$name = config('websockets.replication.connection') ?? 'default';
|
||||||
|
$config = config("database.redis.$name");
|
||||||
|
$host = $config['host'];
|
||||||
|
$port = $config['port'] ? (':'.$config['port']) : ':6379';
|
||||||
|
|
||||||
|
$query = [];
|
||||||
|
if ($config['password']) {
|
||||||
|
$query['password'] = $config['password'];
|
||||||
|
}
|
||||||
|
if ($config['database']) {
|
||||||
|
$query['database'] = $config['database'];
|
||||||
|
}
|
||||||
|
$query = http_build_query($query);
|
||||||
|
|
||||||
|
return "redis://$host$port".($query ? '?'.$query : '');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,85 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\PubSub;
|
||||||
|
|
||||||
|
use stdClass;
|
||||||
|
use React\EventLoop\LoopInterface;
|
||||||
|
use React\Promise\PromiseInterface;
|
||||||
|
|
||||||
|
interface ReplicationInterface
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Boot the pub/sub provider (open connections, initial subscriptions, etc).
|
||||||
|
*
|
||||||
|
* @param LoopInterface $loop
|
||||||
|
* @return self
|
||||||
|
*/
|
||||||
|
public function boot(LoopInterface $loop): self;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a payload on a specific channel, for a specific app.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param stdClass $payload
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function publish(string $appId, string $channel, stdClass $payload): bool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe to receive messages for a channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function subscribe(string $appId, string $channel): bool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsubscribe from a channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function unsubscribe(string $appId, string $channel): bool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a member to a channel. To be called when they have
|
||||||
|
* subscribed to the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
* @param string $data
|
||||||
|
*/
|
||||||
|
public function joinChannel(string $appId, string $channel, string $socketId, string $data);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a member from the channel. To be called when they have
|
||||||
|
* unsubscribed from the channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param string $socketId
|
||||||
|
*/
|
||||||
|
public function leaveChannel(string $appId, string $channel, string $socketId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the full information about the members in a presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMembers(string $appId, string $channel): PromiseInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the amount of users subscribed for each presence channel.
|
||||||
|
*
|
||||||
|
* @param string $appId
|
||||||
|
* @param array $channelNames
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface;
|
||||||
|
}
|
||||||
|
|
@ -94,7 +94,7 @@ class Router
|
||||||
* If the given action is a class that handles WebSockets, then it's not a regular
|
* If the given action is a class that handles WebSockets, then it's not a regular
|
||||||
* controller but a WebSocketHandler that needs to converted to a WsServer.
|
* controller but a WebSocketHandler that needs to converted to a WsServer.
|
||||||
*
|
*
|
||||||
* If the given action is a regular controller we'll just instanciate it.
|
* If the given action is a regular controller we'll just instantiate it.
|
||||||
*/
|
*/
|
||||||
$action = is_subclass_of($action, MessageComponentInterface::class)
|
$action = is_subclass_of($action, MessageComponentInterface::class)
|
||||||
? $this->createWebSocketsServer($action)
|
? $this->createWebSocketsServer($action)
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
|
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
|
||||||
|
|
||||||
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
|
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
|
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
|
||||||
use Illuminate\Support\Str;
|
use Illuminate\Support\Str;
|
||||||
use Ratchet\ConnectionInterface;
|
use Ratchet\ConnectionInterface;
|
||||||
|
|
@ -13,12 +14,21 @@ class Channel
|
||||||
/** @var string */
|
/** @var string */
|
||||||
protected $channelName;
|
protected $channelName;
|
||||||
|
|
||||||
|
/** @var ReplicationInterface */
|
||||||
|
protected $replication;
|
||||||
|
|
||||||
/** @var \Ratchet\ConnectionInterface[] */
|
/** @var \Ratchet\ConnectionInterface[] */
|
||||||
protected $subscribedConnections = [];
|
protected $subscribedConnections = [];
|
||||||
|
|
||||||
public function __construct(string $channelName)
|
public function __construct(string $channelName)
|
||||||
{
|
{
|
||||||
$this->channelName = $channelName;
|
$this->channelName = $channelName;
|
||||||
|
$this->replication = app(ReplicationInterface::class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getChannelName(): string
|
||||||
|
{
|
||||||
|
return $this->channelName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function hasConnections(): bool
|
public function hasConnections(): bool
|
||||||
|
|
@ -31,6 +41,9 @@ class Channel
|
||||||
return $this->subscribedConnections;
|
return $this->subscribedConnections;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws InvalidSignature
|
||||||
|
*/
|
||||||
protected function verifySignature(ConnectionInterface $connection, stdClass $payload)
|
protected function verifySignature(ConnectionInterface $connection, stdClass $payload)
|
||||||
{
|
{
|
||||||
$signature = "{$connection->socketId}:{$this->channelName}";
|
$signature = "{$connection->socketId}:{$this->channelName}";
|
||||||
|
|
@ -39,18 +52,24 @@ class Channel
|
||||||
$signature .= ":{$payload->channel_data}";
|
$signature .= ":{$payload->channel_data}";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Str::after($payload->auth, ':') !== hash_hmac('sha256', $signature, $connection->app->secret)) {
|
if (! hash_equals(
|
||||||
|
hash_hmac('sha256', $signature, $connection->app->secret),
|
||||||
|
Str::after($payload->auth, ':'))
|
||||||
|
) {
|
||||||
throw new InvalidSignature();
|
throw new InvalidSignature();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
|
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
|
||||||
*/
|
*/
|
||||||
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
||||||
{
|
{
|
||||||
$this->saveConnection($connection);
|
$this->saveConnection($connection);
|
||||||
|
|
||||||
|
// Subscribe to broadcasted messages from the pub/sub backend
|
||||||
|
$this->replication->subscribe($connection->app->id, $this->channelName);
|
||||||
|
|
||||||
$connection->send(json_encode([
|
$connection->send(json_encode([
|
||||||
'event' => 'pusher_internal:subscription_succeeded',
|
'event' => 'pusher_internal:subscription_succeeded',
|
||||||
'channel' => $this->channelName,
|
'channel' => $this->channelName,
|
||||||
|
|
@ -61,6 +80,9 @@ class Channel
|
||||||
{
|
{
|
||||||
unset($this->subscribedConnections[$connection->socketId]);
|
unset($this->subscribedConnections[$connection->socketId]);
|
||||||
|
|
||||||
|
// Unsubscribe from the pub/sub backend
|
||||||
|
$this->replication->unsubscribe($connection->app->id, $this->channelName);
|
||||||
|
|
||||||
if (! $this->hasConnections()) {
|
if (! $this->hasConnections()) {
|
||||||
DashboardLogger::vacated($connection, $this->channelName);
|
DashboardLogger::vacated($connection, $this->channelName);
|
||||||
}
|
}
|
||||||
|
|
@ -88,13 +110,26 @@ class Channel
|
||||||
|
|
||||||
public function broadcastToOthers(ConnectionInterface $connection, $payload)
|
public function broadcastToOthers(ConnectionInterface $connection, $payload)
|
||||||
{
|
{
|
||||||
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
|
$this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
|
public function broadcastToEveryoneExcept($payload, ?string $socketId, string $appId, bool $publish = true)
|
||||||
{
|
{
|
||||||
|
// Also broadcast via the other websocket server instances.
|
||||||
|
// This is set false in the Redis client because we don't want to cause a loop
|
||||||
|
// in this case. If this came from TriggerEventController, then we still want
|
||||||
|
// to publish to get the message out to other server instances.
|
||||||
|
if ($publish) {
|
||||||
|
$this->replication->publish($appId, $this->channelName, $payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Performance optimization, if we don't have a socket ID,
|
||||||
|
// then we avoid running the if condition in the foreach loop below
|
||||||
|
// by calling broadcast() instead.
|
||||||
if (is_null($socketId)) {
|
if (is_null($socketId)) {
|
||||||
return $this->broadcast($payload);
|
$this->broadcast($payload);
|
||||||
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach ($this->subscribedConnections as $connection) {
|
foreach ($this->subscribedConnections as $connection) {
|
||||||
|
|
@ -104,7 +139,7 @@ class Channel
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function toArray(): array
|
public function toArray(string $appId = null)
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
'occupied' => count($this->subscribedConnections) > 0,
|
'occupied' => count($this->subscribedConnections) > 0,
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ class ArrayChannelManager implements ChannelManager
|
||||||
/** @var string */
|
/** @var string */
|
||||||
protected $appId;
|
protected $appId;
|
||||||
|
|
||||||
/** @var array */
|
/** @var Channel[][] */
|
||||||
protected $channels = [];
|
protected $channels = [];
|
||||||
|
|
||||||
public function findOrCreate(string $appId, string $channelName): Channel
|
public function findOrCreate(string $appId, string $channelName): Channel
|
||||||
|
|
|
||||||
|
|
@ -2,20 +2,42 @@
|
||||||
|
|
||||||
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
|
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
|
||||||
use Ratchet\ConnectionInterface;
|
use Ratchet\ConnectionInterface;
|
||||||
|
use React\Promise\PromiseInterface;
|
||||||
use stdClass;
|
use stdClass;
|
||||||
|
|
||||||
class PresenceChannel extends Channel
|
class PresenceChannel extends Channel
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Data for the users connected to this channel.
|
||||||
|
*
|
||||||
|
* Note: If replication is enabled, this will only contain entries
|
||||||
|
* for the users directly connected to this server instance. Requests
|
||||||
|
* for data for all users in the channel should be routed through
|
||||||
|
* ReplicationInterface.
|
||||||
|
*
|
||||||
|
* @var string[]
|
||||||
|
*/
|
||||||
protected $users = [];
|
protected $users = [];
|
||||||
|
|
||||||
public function getUsers(): array
|
/**
|
||||||
|
* @param string $appId
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function getUsers(string $appId)
|
||||||
{
|
{
|
||||||
return $this->users;
|
// Get the members list from the replication backend
|
||||||
|
return $this->replication
|
||||||
|
->channelMembers($appId, $this->channelName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
|
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
|
||||||
|
*
|
||||||
|
* @param ConnectionInterface $connection
|
||||||
|
* @param stdClass $payload
|
||||||
|
* @throws InvalidSignature
|
||||||
*/
|
*/
|
||||||
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
||||||
{
|
{
|
||||||
|
|
@ -26,14 +48,29 @@ class PresenceChannel extends Channel
|
||||||
$channelData = json_decode($payload->channel_data);
|
$channelData = json_decode($payload->channel_data);
|
||||||
$this->users[$connection->socketId] = $channelData;
|
$this->users[$connection->socketId] = $channelData;
|
||||||
|
|
||||||
// Send the success event
|
// Add the connection as a member of the channel
|
||||||
$connection->send(json_encode([
|
$this->replication
|
||||||
'event' => 'pusher_internal:subscription_succeeded',
|
->joinChannel(
|
||||||
'channel' => $this->channelName,
|
$connection->app->id,
|
||||||
'data' => json_encode($this->getChannelData()),
|
$this->channelName,
|
||||||
]));
|
$connection->socketId,
|
||||||
|
json_encode($channelData)
|
||||||
|
);
|
||||||
|
|
||||||
$this->broadcastToOthers($connection, [
|
// We need to pull the channel data from the replication backend,
|
||||||
|
// otherwise we won't be sending the full details of the channel
|
||||||
|
$this->replication
|
||||||
|
->channelMembers($connection->app->id, $this->channelName)
|
||||||
|
->then(function ($users) use ($connection) {
|
||||||
|
// Send the success event
|
||||||
|
$connection->send(json_encode([
|
||||||
|
'event' => 'pusher_internal:subscription_succeeded',
|
||||||
|
'channel' => $this->channelName,
|
||||||
|
'data' => json_encode($this->getChannelData($users)),
|
||||||
|
]));
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->broadcastToOthers($connection, (object) [
|
||||||
'event' => 'pusher_internal:member_added',
|
'event' => 'pusher_internal:member_added',
|
||||||
'channel' => $this->channelName,
|
'channel' => $this->channelName,
|
||||||
'data' => json_encode($channelData),
|
'data' => json_encode($channelData),
|
||||||
|
|
@ -48,7 +85,15 @@ class PresenceChannel extends Channel
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->broadcastToOthers($connection, [
|
// Remove the connection as a member of the channel
|
||||||
|
$this->replication
|
||||||
|
->leaveChannel(
|
||||||
|
$connection->app->id,
|
||||||
|
$this->channelName,
|
||||||
|
$connection->socketId
|
||||||
|
);
|
||||||
|
|
||||||
|
$this->broadcastToOthers($connection, (object) [
|
||||||
'event' => 'pusher_internal:member_removed',
|
'event' => 'pusher_internal:member_removed',
|
||||||
'channel' => $this->channelName,
|
'channel' => $this->channelName,
|
||||||
'data' => json_encode([
|
'data' => json_encode([
|
||||||
|
|
@ -59,38 +104,46 @@ class PresenceChannel extends Channel
|
||||||
unset($this->users[$connection->socketId]);
|
unset($this->users[$connection->socketId]);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function getChannelData(): array
|
/**
|
||||||
|
* @param string|null $appId
|
||||||
|
* @return PromiseInterface
|
||||||
|
*/
|
||||||
|
public function toArray(string $appId = null)
|
||||||
|
{
|
||||||
|
return $this->replication
|
||||||
|
->channelMembers($appId, $this->channelName)
|
||||||
|
->then(function ($users) {
|
||||||
|
return array_merge(parent::toArray(), [
|
||||||
|
'user_count' => count($users),
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function getChannelData(array $users): array
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
'presence' => [
|
'presence' => [
|
||||||
'ids' => $this->getUserIds(),
|
'ids' => $this->getUserIds($users),
|
||||||
'hash' => $this->getHash(),
|
'hash' => $this->getHash($users),
|
||||||
'count' => count($this->users),
|
'count' => count($users),
|
||||||
],
|
],
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
public function toArray(): array
|
protected function getUserIds(array $users): array
|
||||||
{
|
|
||||||
return array_merge(parent::toArray(), [
|
|
||||||
'user_count' => count($this->users),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected function getUserIds(): array
|
|
||||||
{
|
{
|
||||||
$userIds = array_map(function ($channelData) {
|
$userIds = array_map(function ($channelData) {
|
||||||
return (string) $channelData->user_id;
|
return (string) $channelData->user_id;
|
||||||
}, $this->users);
|
}, $users);
|
||||||
|
|
||||||
return array_values($userIds);
|
return array_values($userIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function getHash(): array
|
protected function getHash(array $users): array
|
||||||
{
|
{
|
||||||
$hash = [];
|
$hash = [];
|
||||||
|
|
||||||
foreach ($this->users as $socketId => $channelData) {
|
foreach ($users as $socketId => $channelData) {
|
||||||
$hash[$channelData->user_id] = $channelData->user_info;
|
$hash[$channelData->user_id] = $channelData->user_info;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,11 +2,15 @@
|
||||||
|
|
||||||
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
|
namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
|
||||||
use Ratchet\ConnectionInterface;
|
use Ratchet\ConnectionInterface;
|
||||||
use stdClass;
|
use stdClass;
|
||||||
|
|
||||||
class PrivateChannel extends Channel
|
class PrivateChannel extends Channel
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* @throws InvalidSignature
|
||||||
|
*/
|
||||||
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
||||||
{
|
{
|
||||||
$this->verifySignature($connection, $payload);
|
$this->verifySignature($connection, $payload);
|
||||||
|
|
|
||||||
|
|
@ -8,15 +8,22 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\DashboardApiControll
|
||||||
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage;
|
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage;
|
||||||
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard;
|
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard;
|
||||||
use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard;
|
use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
use BeyondCode\LaravelWebSockets\Server\Router;
|
use BeyondCode\LaravelWebSockets\Server\Router;
|
||||||
use BeyondCode\LaravelWebSockets\Statistics\Http\Controllers\WebSocketStatisticsEntriesController;
|
use BeyondCode\LaravelWebSockets\Statistics\Http\Controllers\WebSocketStatisticsEntriesController;
|
||||||
use BeyondCode\LaravelWebSockets\Statistics\Http\Middleware\Authorize as AuthorizeStatistics;
|
use BeyondCode\LaravelWebSockets\Statistics\Http\Middleware\Authorize as AuthorizeStatistics;
|
||||||
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
|
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
|
||||||
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager;
|
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager;
|
||||||
|
use Pusher\Pusher;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Illuminate\Broadcasting\BroadcastManager;
|
||||||
use Illuminate\Support\Facades\Gate;
|
use Illuminate\Support\Facades\Gate;
|
||||||
use Illuminate\Support\Facades\Route;
|
use Illuminate\Support\Facades\Route;
|
||||||
use Illuminate\Support\Facades\Schema;
|
|
||||||
use Illuminate\Support\ServiceProvider;
|
use Illuminate\Support\ServiceProvider;
|
||||||
|
use Illuminate\Support\Facades\Schema;
|
||||||
|
|
||||||
class WebSocketsServiceProvider extends ServiceProvider
|
class WebSocketsServiceProvider extends ServiceProvider
|
||||||
{
|
{
|
||||||
|
|
@ -43,6 +50,41 @@ class WebSocketsServiceProvider extends ServiceProvider
|
||||||
Console\CleanStatistics::class,
|
Console\CleanStatistics::class,
|
||||||
Console\RestartWebSocketServer::class,
|
Console\RestartWebSocketServer::class,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
$this->configurePubSub();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function configurePubSub()
|
||||||
|
{
|
||||||
|
if (config('websockets.replication.enabled') !== true || config('websockets.replication.driver') !== 'redis') {
|
||||||
|
$this->app->singleton(ReplicationInterface::class, function () {
|
||||||
|
return new LocalClient();
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->app->singleton(ReplicationInterface::class, function () {
|
||||||
|
return (new RedisClient())->boot($this->loop);
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->app->get(BroadcastManager::class)->extend('redis-pusher', function ($app, array $config) {
|
||||||
|
$pusher = new Pusher(
|
||||||
|
$config['key'], $config['secret'],
|
||||||
|
$config['app_id'], $config['options'] ?? []
|
||||||
|
);
|
||||||
|
|
||||||
|
if ($config['log'] ?? false) {
|
||||||
|
$pusher->setLogger($this->app->make(LoggerInterface::class));
|
||||||
|
}
|
||||||
|
|
||||||
|
return new RedisPusherBroadcaster(
|
||||||
|
$pusher,
|
||||||
|
$config['app_id'],
|
||||||
|
$this->app->make('redis'),
|
||||||
|
$config['connection'] ?? null
|
||||||
|
);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public function register()
|
public function register()
|
||||||
|
|
@ -60,7 +102,7 @@ class WebSocketsServiceProvider extends ServiceProvider
|
||||||
});
|
});
|
||||||
|
|
||||||
$this->app->singleton(AppProvider::class, function () {
|
$this->app->singleton(AppProvider::class, function () {
|
||||||
return app(config('websockets.managers.app'));
|
return $this->app->make(config('websockets.managers.app'));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -69,7 +111,7 @@ class WebSocketsServiceProvider extends ServiceProvider
|
||||||
Route::prefix(config('websockets.dashboard.path'))->group(function () {
|
Route::prefix(config('websockets.dashboard.path'))->group(function () {
|
||||||
Route::middleware(config('websockets.dashboard.middleware', [AuthorizeDashboard::class]))->group(function () {
|
Route::middleware(config('websockets.dashboard.middleware', [AuthorizeDashboard::class]))->group(function () {
|
||||||
Route::get('/', ShowDashboard::class);
|
Route::get('/', ShowDashboard::class);
|
||||||
Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']);
|
Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']);
|
||||||
Route::post('auth', AuthenticateDashboard::class);
|
Route::post('auth', AuthenticateDashboard::class);
|
||||||
Route::post('event', SendMessage::class);
|
Route::post('event', SendMessage::class);
|
||||||
});
|
});
|
||||||
|
|
@ -85,7 +127,7 @@ class WebSocketsServiceProvider extends ServiceProvider
|
||||||
protected function registerDashboardGate()
|
protected function registerDashboardGate()
|
||||||
{
|
{
|
||||||
Gate::define('viewWebSocketsDashboard', function ($user = null) {
|
Gate::define('viewWebSocketsDashboard', function ($user = null) {
|
||||||
return app()->environment('local');
|
return $this->app->environment('local');
|
||||||
});
|
});
|
||||||
|
|
||||||
return $this;
|
return $this;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\Channels;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class ChannelReplicationTest extends ChannelTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -123,7 +123,7 @@ class ChannelTest extends TestCase
|
||||||
|
|
||||||
$channel = $this->getChannel($connection1, 'test-channel');
|
$channel = $this->getChannel($connection1, 'test-channel');
|
||||||
|
|
||||||
$channel->broadcastToOthers($connection1, [
|
$channel->broadcastToOthers($connection1, (object) [
|
||||||
'event' => 'broadcasted-event',
|
'event' => 'broadcasted-event',
|
||||||
'channel' => 'test-channel',
|
'channel' => 'test-channel',
|
||||||
]);
|
]);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\Channels;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class PresenceChannelReplicationTest extends PresenceChannelTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -59,4 +59,75 @@ class PresenceChannelTest extends TestCase
|
||||||
'channel' => 'presence-channel',
|
'channel' => 'presence-channel',
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @test */
|
||||||
|
public function clients_with_valid_auth_signatures_can_leave_presence_channels()
|
||||||
|
{
|
||||||
|
$connection = $this->getWebSocketConnection();
|
||||||
|
|
||||||
|
$this->pusherServer->onOpen($connection);
|
||||||
|
|
||||||
|
$channelData = [
|
||||||
|
'user_id' => 1,
|
||||||
|
'user_info' => [
|
||||||
|
'name' => 'Marcel',
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
$signature = "{$connection->socketId}:presence-channel:".json_encode($channelData);
|
||||||
|
|
||||||
|
$message = new Message(json_encode([
|
||||||
|
'event' => 'pusher:subscribe',
|
||||||
|
'data' => [
|
||||||
|
'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret),
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
'channel_data' => json_encode($channelData),
|
||||||
|
],
|
||||||
|
]));
|
||||||
|
|
||||||
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
|
||||||
|
$connection->assertSentEvent('pusher_internal:subscription_succeeded', [
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
]);
|
||||||
|
|
||||||
|
$message = new Message(json_encode([
|
||||||
|
'event' => 'pusher:unsubscribe',
|
||||||
|
'data' => [
|
||||||
|
'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret),
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
],
|
||||||
|
]));
|
||||||
|
|
||||||
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @test */
|
||||||
|
public function clients_with_valid_auth_signatures_cannot_leave_channels_they_are_not_in()
|
||||||
|
{
|
||||||
|
$connection = $this->getWebSocketConnection();
|
||||||
|
|
||||||
|
$this->pusherServer->onOpen($connection);
|
||||||
|
|
||||||
|
$channelData = [
|
||||||
|
'user_id' => 1,
|
||||||
|
'user_info' => [
|
||||||
|
'name' => 'Marcel',
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
$signature = "{$connection->socketId}:presence-channel:".json_encode($channelData);
|
||||||
|
|
||||||
|
$message = new Message(json_encode([
|
||||||
|
'event' => 'pusher:unsubscribe',
|
||||||
|
'data' => [
|
||||||
|
'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret),
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
],
|
||||||
|
]));
|
||||||
|
|
||||||
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
|
||||||
|
$this->markTestAsPassed();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ class AppTest extends TestCase
|
||||||
/** @test */
|
/** @test */
|
||||||
public function it_can_create_a_client()
|
public function it_can_create_a_client()
|
||||||
{
|
{
|
||||||
new App(1, 'appKey', 'appSecret', 'new');
|
new App(1, 'appKey', 'appSecret');
|
||||||
|
|
||||||
$this->markTestAsPassed();
|
$this->markTestAsPassed();
|
||||||
}
|
}
|
||||||
|
|
@ -21,7 +21,7 @@ class AppTest extends TestCase
|
||||||
{
|
{
|
||||||
$this->expectException(InvalidApp::class);
|
$this->expectException(InvalidApp::class);
|
||||||
|
|
||||||
new App(1, '', 'appSecret', 'new');
|
new App(1, '', 'appSecret');
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @test */
|
/** @test */
|
||||||
|
|
@ -29,6 +29,6 @@ class AppTest extends TestCase
|
||||||
{
|
{
|
||||||
$this->expectException(InvalidApp::class);
|
$this->expectException(InvalidApp::class);
|
||||||
|
|
||||||
new App(1, 'appKey', '', 'new');
|
new App(1, 'appKey', '');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ class ConnectionTest extends TestCase
|
||||||
$this->pusherServer->onOpen($connection);
|
$this->pusherServer->onOpen($connection);
|
||||||
|
|
||||||
$this->assertInstanceOf(App::class, $connection->app);
|
$this->assertInstanceOf(App::class, $connection->app);
|
||||||
$this->assertSame(1234, $connection->app->id);
|
$this->assertSame('1234', $connection->app->id);
|
||||||
$this->assertSame('TestKey', $connection->app->key);
|
$this->assertSame('TestKey', $connection->app->key);
|
||||||
$this->assertSame('TestSecret', $connection->app->secret);
|
$this->assertSame('TestSecret', $connection->app->secret);
|
||||||
$this->assertSame('Test App', $connection->app->name);
|
$this->assertSame('Test App', $connection->app->name);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\HttpApi;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class FetchChannelReplicationTest extends FetchChannelTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -66,6 +66,38 @@ class FetchChannelTest extends TestCase
|
||||||
], json_decode($response->getContent(), true));
|
], json_decode($response->getContent(), true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @test */
|
||||||
|
public function it_returns_presence_channel_information()
|
||||||
|
{
|
||||||
|
$this->joinPresenceChannel('presence-channel');
|
||||||
|
$this->joinPresenceChannel('presence-channel');
|
||||||
|
|
||||||
|
$connection = new Connection();
|
||||||
|
|
||||||
|
$requestPath = '/apps/1234/channel/my-channel';
|
||||||
|
$routeParams = [
|
||||||
|
'appId' => '1234',
|
||||||
|
'channelName' => 'presence-channel',
|
||||||
|
];
|
||||||
|
|
||||||
|
$queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath);
|
||||||
|
|
||||||
|
$request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams));
|
||||||
|
|
||||||
|
$controller = app(FetchChannelController::class);
|
||||||
|
|
||||||
|
$controller->onOpen($connection, $request);
|
||||||
|
|
||||||
|
/** @var JsonResponse $response */
|
||||||
|
$response = array_pop($connection->sentRawData);
|
||||||
|
|
||||||
|
$this->assertSame([
|
||||||
|
'occupied' => true,
|
||||||
|
'subscription_count' => 2,
|
||||||
|
'user_count' => 2,
|
||||||
|
], json_decode($response->getContent(), true));
|
||||||
|
}
|
||||||
|
|
||||||
/** @test */
|
/** @test */
|
||||||
public function it_returns_404_for_invalid_channels()
|
public function it_returns_404_for_invalid_channels()
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\HttpApi;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class FetchChannelsReplicationTest extends FetchChannelsTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests\HttpApi;
|
||||||
|
|
||||||
|
use BeyondCode\LaravelWebSockets\Tests\TestsReplication;
|
||||||
|
|
||||||
|
class FetchUsersReplicationTest extends FetchUsersTest
|
||||||
|
{
|
||||||
|
use TestsReplication;
|
||||||
|
|
||||||
|
public function setUp() : void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
|
||||||
|
$this->setupReplication();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -22,16 +22,21 @@ class WebSocketsStatisticsControllerTest extends TestCase
|
||||||
|
|
||||||
$this->assertCount(1, $entries);
|
$this->assertCount(1, $entries);
|
||||||
|
|
||||||
$this->assertArrayHasKey('app_id', $entries->first()->attributesToArray());
|
$actual = $entries->first()->attributesToArray();
|
||||||
|
|
||||||
|
foreach ($this->payload() as $key => $value) {
|
||||||
|
$this->assertArrayHasKey($key, $actual);
|
||||||
|
$this->assertSame($value, $actual[$key]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function payload(): array
|
protected function payload(): array
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
'app_id' => config('websockets.apps.0.id'),
|
'app_id' => config('websockets.apps.0.id'),
|
||||||
'peak_connection_count' => 1,
|
'peak_connection_count' => '1',
|
||||||
'websocket_message_count' => 2,
|
'websocket_message_count' => '2',
|
||||||
'api_message_count' => 3,
|
'api_message_count' => '3',
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,9 +46,10 @@ abstract class TestCase extends \Orchestra\Testbench\TestCase
|
||||||
$app['config']->set('websockets.apps', [
|
$app['config']->set('websockets.apps', [
|
||||||
[
|
[
|
||||||
'name' => 'Test App',
|
'name' => 'Test App',
|
||||||
'id' => 1234,
|
'id' => '1234',
|
||||||
'key' => 'TestKey',
|
'key' => 'TestKey',
|
||||||
'secret' => 'TestSecret',
|
'secret' => 'TestSecret',
|
||||||
|
'host' => 'localhost',
|
||||||
'capacity' => null,
|
'capacity' => null,
|
||||||
'enable_client_messages' => false,
|
'enable_client_messages' => false,
|
||||||
'enable_statistics' => true,
|
'enable_statistics' => true,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace BeyondCode\LaravelWebSockets\Tests;
|
||||||
|
|
||||||
|
use Illuminate\Support\Facades\Config;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient;
|
||||||
|
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
|
||||||
|
|
||||||
|
trait TestsReplication
|
||||||
|
{
|
||||||
|
public function setupReplication()
|
||||||
|
{
|
||||||
|
app()->singleton(ReplicationInterface::class, function () {
|
||||||
|
return new LocalClient();
|
||||||
|
});
|
||||||
|
|
||||||
|
Config::set([
|
||||||
|
'websockets.replication.enabled' => true,
|
||||||
|
'websockets.replication.driver' => 'redis',
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue