Fixed backend streaming

This commit is contained in:
Alex Renoki 2020-08-27 14:13:17 +03:00
parent 1877819edc
commit a897ce2cd3
11 changed files with 46 additions and 252 deletions

View File

@ -27,43 +27,8 @@ To enable the replication, simply change the `replication.driver` name in the `w
], ],
``` ```
Now, when your app broadcasts the message, it will make sure the connection reaches other servers which are under the same load balancer.
The available drivers for replication are: The available drivers for replication are:
- [Redis](redis) - [Redis](redis)
## Configure the Broadcasting driver
Laravel WebSockets comes with an additional `websockets` broadcaster driver that accepts configurations like the Pusher driver, but will make sure the broadcasting will work across all websocket servers:
```php
'connections' => [
'pusher' => [
...
],
'websockets' => [
'driver' => 'websockets',
'key' => env('PUSHER_APP_KEY'),
'secret' => env('PUSHER_APP_SECRET'),
'app_id' => env('PUSHER_APP_ID'),
'options' => [
'cluster' => env('PUSHER_APP_CLUSTER'),
'encrypted' => true,
'host' => env('PUSHER_APP_HOST', '127.0.0.1'),
'port' => env('PUSHER_APP_PORT', 6001),
'scheme' => env('PUSHER_APP_SCHEME', 'http'),
'curl_options' => [
CURLOPT_SSL_VERIFYHOST => 0,
CURLOPT_SSL_VERIFYPEER => 0,
],
],
],
```
Make sure to change the `BROADCAST_DRIVER`:
```
BROADCAST_DRIVER=websockets
```
Now, when your app broadcasts the message, it will make sure the connection reaches other servers which are under the same load balancer.

View File

@ -16,16 +16,13 @@ trait PushesToPusher
*/ */
public function getPusherBroadcaster(array $app) public function getPusherBroadcaster(array $app)
{ {
if (config('websockets.replication.driver') === 'redis') {
return new RedisPusherBroadcaster(
new Pusher($app['key'], $app['secret'], $app['id'], config('broadcasting.connections.websockets.options', [])),
$app['id'],
app('redis')
);
}
return new PusherBroadcaster( return new PusherBroadcaster(
new Pusher($app['key'], $app['secret'], $app['id'], config('broadcasting.connections.pusher.options', [])) new Pusher(
$app['key'],
$app['secret'],
$app['id'],
config('broadcasting.connections.pusher.options', [])
)
); );
} }
} }

View File

@ -19,6 +19,7 @@ use Ratchet\Http\HttpServerInterface;
use React\Promise\PromiseInterface; use React\Promise\PromiseInterface;
use Symfony\Bridge\PsrHttpMessage\Factory\HttpFoundationFactory; use Symfony\Bridge\PsrHttpMessage\Factory\HttpFoundationFactory;
use Symfony\Component\HttpKernel\Exception\HttpException; use Symfony\Component\HttpKernel\Exception\HttpException;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
abstract class Controller implements HttpServerInterface abstract class Controller implements HttpServerInterface
{ {
@ -51,15 +52,24 @@ abstract class Controller implements HttpServerInterface
*/ */
protected $channelManager; protected $channelManager;
/**
* The replicator driver.
*
* @var \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface
*/
protected $replicator;
/** /**
* Initialize the request. * Initialize the request.
* *
* @param \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager $channelManager * @param \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager $channelManager
* @param \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface $replicator
* @return void * @return void
*/ */
public function __construct(ChannelManager $channelManager) public function __construct(ChannelManager $channelManager, ReplicationInterface $replicator)
{ {
$this->channelManager = $channelManager; $this->channelManager = $channelManager;
$this->replicator = $replicator;
} }
/** /**

View File

@ -13,27 +13,6 @@ use Symfony\Component\HttpKernel\Exception\HttpException;
class FetchChannelsController extends Controller class FetchChannelsController extends Controller
{ {
/**
* The replicator driver.
*
* @var \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface
*/
protected $replicator;
/**
* Initialize the class.
*
* @param \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager $channelManager
* @param \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface $replicator
* @return void
*/
public function __construct(ChannelManager $channelManager, ReplicationInterface $replicator)
{
parent::__construct($channelManager);
$this->replicator = $replicator;
}
/** /**
* Handle the incoming request. * Handle the incoming request.
* *

View File

@ -18,14 +18,26 @@ class TriggerEventController extends Controller
{ {
$this->ensureValidSignature($request); $this->ensureValidSignature($request);
foreach ($request->json()->get('channels', []) as $channelName) { $channels = $request->channels ?: [];
foreach ($channels as $channelName) {
$channel = $this->channelManager->find($request->appId, $channelName); $channel = $this->channelManager->find($request->appId, $channelName);
optional($channel)->broadcastToEveryoneExcept((object) [ $payload = (object) [
'channel' => $channelName, 'channel' => $channelName,
'event' => $request->json()->get('name'), 'event' => $request->name,
'data' => $request->json()->get('data'), 'data' => $request->data,
], $request->json()->get('socket_id'), $request->appId); ];
optional($channel)->broadcastToEveryoneExcept($payload, $request->socket_id, $request->appId);
// If the setup is horizontally-scaled using the Redis Pub/Sub,
// then we're going to make sure it gets streamed to the other
// servers as well that are subscribed to the Pub/Sub topics
// attached to the current iterated app & channel.
// For local setups, the local driver will ignore the publishes.
$this->replicator->publish($request->appId, $channelName, $payload);
DashboardLogger::log($request->appId, DashboardLogger::TYPE_API_MESSAGE, [ DashboardLogger::log($request->appId, DashboardLogger::TYPE_API_MESSAGE, [
'channel' => $channelName, 'channel' => $channelName,

View File

@ -1,142 +0,0 @@
<?php
namespace BeyondCode\LaravelWebSockets\PubSub\Broadcasters;
use Illuminate\Broadcasting\Broadcasters\Broadcaster;
use Illuminate\Broadcasting\Broadcasters\UsePusherChannelConventions;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Pusher\Pusher;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
class RedisPusherBroadcaster extends Broadcaster
{
use UsePusherChannelConventions;
/**
* The Pusher SDK instance.
*
* @var \Pusher\Pusher
*/
protected $pusher;
/**
* The Pusher app ID, to be passed in the payload.
*
* @var string
*/
protected $appId;
/**
* The Redis instance.
*
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;
/**
* Create a new broadcaster instance.
*
* @param Pusher $pusher
* @param mixed $appId
* @param \Illuminate\Contracts\Redis\Factory $redis
*/
public function __construct(Pusher $pusher, $appId, Redis $redis)
{
$this->pusher = $pusher;
$this->appId = $appId;
$this->redis = $redis;
}
/**
* Authenticate the incoming request for a given channel.
*
* @param \Illuminate\Http\Request $request
* @return mixed
* @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException
*/
public function auth($request)
{
$channelName = $this->normalizeChannelName($request->channel_name);
if ($this->isGuardedChannel($request->channel_name) &&
! $this->retrieveUser($request, $channelName)) {
throw new AccessDeniedHttpException;
}
return parent::verifyUserCanAccessChannel(
$request, $channelName
);
}
/**
* Return the valid authentication response.
*
* @param \Illuminate\Http\Request $request
* @param mixed $result
* @return mixed
* @throws \Pusher\PusherException
*/
public function validAuthenticationResponse($request, $result)
{
if (Str::startsWith($request->channel_name, 'private')) {
return $this->decodePusherResponse(
$request, $this->pusher->socket_auth($request->channel_name, $request->socket_id)
);
}
$channelName = $this->normalizeChannelName($request->channel_name);
return $this->decodePusherResponse(
$request,
$this->pusher->presence_auth(
$request->channel_name, $request->socket_id,
$this->retrieveUser($request, $channelName)->getAuthIdentifier(), $result
)
);
}
/**
* Decode the given Pusher response.
*
* @param \Illuminate\Http\Request $request
* @param mixed $response
* @return array
*/
protected function decodePusherResponse($request, $response)
{
if (! $request->input('callback', false)) {
return json_decode($response, true);
}
return response()->json(json_decode($response, true))
->withCallback($request->callback);
}
/**
* Broadcast the given event.
*
* @param array $channels
* @param string $event
* @param array $payload
* @return void
*/
public function broadcast(array $channels, $event, array $payload = [])
{
$connection = $this->redis->connection(
config('websockets.replication.redis.connection') ?: 'default'
);
$payload = json_encode([
'appId' => $this->appId,
'event' => $event,
'data' => $payload,
'socket' => Arr::pull($payload, 'socket'),
]);
foreach ($this->formatChannels($channels) as $channel) {
$connection->publish("{$this->appId}:{$channel}", $payload);
}
}
}

View File

@ -293,23 +293,23 @@ class RedisClient extends LocalClient
return; return;
} }
$socket = $payload->socket ?? null; $socketId = $payload->socketId ?? null;
$serverId = $payload->serverId ?? null; $serverId = $payload->serverId ?? null;
// Remove fields intended for internal use from the payload. // Remove fields intended for internal use from the payload.
unset($payload->socket); unset($payload->socketId);
unset($payload->serverId); unset($payload->serverId);
unset($payload->appId); unset($payload->appId);
// Push the message out to connected websocket clients. // Push the message out to connected websocket clients.
$channel->broadcastToEveryoneExcept($payload, $socket, $appId, false); $channel->broadcastToEveryoneExcept($payload, $socketId, $appId, false);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
'channel' => $channel->getChannelName(), 'channel' => $channel->getChannelName(),
'redisChannel' => $redisChannel, 'redisChannel' => $redisChannel,
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
'incomingServerId' => $serverId, 'incomingServerId' => $serverId,
'incomingSocketId' => $socket, 'incomingSocketId' => $socketId,
'payload' => $payload, 'payload' => $payload,
]); ]);
} }

View File

@ -8,7 +8,6 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowStatistics; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowStatistics;
use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard;
use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster;
use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Server\Router;
use BeyondCode\LaravelWebSockets\Statistics\Drivers\StatisticsDriver; use BeyondCode\LaravelWebSockets\Statistics\Drivers\StatisticsDriver;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
@ -47,8 +46,6 @@ class WebSocketsServiceProvider extends ServiceProvider
Console\CleanStatistics::class, Console\CleanStatistics::class,
Console\RestartWebSocketServer::class, Console\RestartWebSocketServer::class,
]); ]);
$this->configurePubSub();
} }
/** /**
@ -85,31 +82,6 @@ class WebSocketsServiceProvider extends ServiceProvider
}); });
} }
/**
* Configure the PubSub replication.
*
* @return void
*/
protected function configurePubSub()
{
$this->app->make(BroadcastManager::class)->extend('websockets', function ($app, array $config) {
$pusher = new Pusher(
$config['key'], $config['secret'],
$config['app_id'], $config['options'] ?? []
);
if ($config['log'] ?? false) {
$pusher->setLogger($this->app->make(LoggerInterface::class));
}
return new RedisPusherBroadcaster(
$pusher,
$config['app_id'],
$this->app->make('redis')
);
});
}
/** /**
* Register the dashboard routes. * Register the dashboard routes.
* *

View File

@ -50,7 +50,8 @@ class SendMessageTest extends TestCase
'data' => json_encode(['data' => 'yes']), 'data' => json_encode(['data' => 'yes']),
]) ])
->seeJson([ ->seeJson([
'ok' => true, 'exception' => 'Failed to connect to Pusher.',
'ok' => false,
]); ]);
} }

View File

@ -37,7 +37,7 @@ class RedisDriverTest extends TestCase
'appId' => '1234', 'appId' => '1234',
'event' => 'test', 'event' => 'test',
'data' => $channelData, 'data' => $channelData,
'socket' => $connection->socketId, 'socketId' => $connection->socketId,
]); ]);
$this->getSubscribeClient()->onMessage('1234:test-channel', $payload); $this->getSubscribeClient()->onMessage('1234:test-channel', $payload);
@ -68,7 +68,7 @@ class RedisDriverTest extends TestCase
'appId' => '1234', 'appId' => '1234',
'event' => 'test', 'event' => 'test',
'data' => $channelData, 'data' => $channelData,
'socket' => $connection->socketId, 'socketId' => $connection->socketId,
]); ]);
$client = (new RedisClient)->boot( $client = (new RedisClient)->boot(

View File

@ -137,7 +137,7 @@ abstract class TestCase extends BaseTestCase
$app['config']->set( $app['config']->set(
'broadcasting.connections.websockets', [ 'broadcasting.connections.websockets', [
'driver' => 'websockets', 'driver' => 'pusher',
'key' => 'TestKey', 'key' => 'TestKey',
'secret' => 'TestSecret', 'secret' => 'TestSecret',
'app_id' => '1234', 'app_id' => '1234',
@ -152,7 +152,7 @@ abstract class TestCase extends BaseTestCase
); );
if (in_array($replicationDriver, ['redis'])) { if (in_array($replicationDriver, ['redis'])) {
$app['config']->set('broadcasting.default', 'websockets'); $app['config']->set('broadcasting.default', 'pusher');
} }
} }