laravel-websockets/src/Server/WebSocketHandler.php

286 lines
8.8 KiB
PHP
Raw Normal View History

2018-11-21 11:13:40 +00:00
<?php
2025-01-16 07:54:02 +00:00
namespace BlaxSoftware\LaravelWebSockets\Server;
2018-11-21 11:13:40 +00:00
2025-01-16 07:54:02 +00:00
use BlaxSoftware\LaravelWebSockets\Apps\App;
use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager;
use BlaxSoftware\LaravelWebSockets\DashboardLogger;
use BlaxSoftware\LaravelWebSockets\Events\ConnectionClosed;
use BlaxSoftware\LaravelWebSockets\Events\NewConnection;
use BlaxSoftware\LaravelWebSockets\Events\WebSocketMessageReceived;
use BlaxSoftware\LaravelWebSockets\Facades\StatisticsCollector;
use BlaxSoftware\LaravelWebSockets\Helpers;
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\WebSocketException;
2020-03-04 09:58:39 +00:00
use Exception;
use Ratchet\ConnectionInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
2020-09-10 19:59:49 +00:00
use Ratchet\WebSocket\MessageComponentInterface;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
2018-11-21 11:13:40 +00:00
2018-11-27 20:06:04 +00:00
class WebSocketHandler implements MessageComponentInterface
2018-11-21 11:13:40 +00:00
{
2020-08-18 17:21:22 +00:00
/**
 * Channel manager used by every callback of this handler
 * (subscriptions, connection counting, pong tracking).
 *
 * @var ChannelManager
 */
protected $channelManager;

/**
 * Build the handler around the given channel manager.
 *
 * @param  \BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager  $channelManager
 */
public function __construct(ChannelManager $channelManager)
{
    $this->channelManager = $channelManager;
}
2020-08-18 17:21:22 +00:00
/**
 * Handle a newly opened socket.
 *
 * The connection is closed right away when the channel manager does not
 * accept new connections. Otherwise the app key is verified and, once that
 * promise fulfils, the origin is checked, the capacity check is started,
 * a socket id is assigned and the "connection established" event is sent.
 * Any WebSocketException thrown on the way, and any rejection of the app
 * key promise, is handed to onError().
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return mixed  Whatever close() returns when the connection is refused; nothing otherwise.
 */
public function onOpen(ConnectionInterface $connection)
{
    if (! $this->connectionCanBeMade($connection)) {
        return $connection->close();
    }

    // Shared by the catch block and the rejection handler below.
    $reportFailure = function ($exception) use ($connection) {
        $this->onError($connection, $exception);
    };

    $onAppVerified = function () use ($connection, $reportFailure) {
        try {
            $this->verifyOrigin($connection)
                ->limitConcurrentConnections($connection)
                ->generateSocketId($connection)
                ->establishConnection($connection);

            if (! isset($connection->app)) {
                return;
            }

            /** @var \GuzzleHttp\Psr7\Request $request */
            $request = $connection->httpRequest;

            if ($connection->app->statisticsEnabled) {
                StatisticsCollector::connection($connection->app->id);
            }

            $this->channelManager->subscribeToApp($connection->app->id);
            $this->channelManager->connectionPonged($connection);

            $uri = $request->getUri();

            DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_CONNECTED, [
                'origin' => $uri->getScheme().'://'.$uri->getHost(),
                'socketId' => $connection->socketId,
            ]);

            NewConnection::dispatch($connection->app->id, $connection->socketId);
        } catch (WebSocketException $exception) {
            $reportFailure($exception);
        }
    };

    $this->verifyAppKey($connection)->then($onAppVerified, $reportFailure);
}
2020-08-18 17:21:22 +00:00
/**
 * Handle a message received on an open socket.
 *
 * Messages from connections that have no app attached are ignored.
 * Otherwise the message is turned into a Pusher message and answered,
 * the statistics collector is notified when the app has statistics
 * enabled, and a WebSocketMessageReceived event is dispatched.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  \Ratchet\RFC6455\Messaging\MessageInterface  $message
 * @return void
 */
public function onMessage(ConnectionInterface $connection, MessageInterface $message)
{
    if (! isset($connection->app)) {
        return;
    }

    $pusherMessage = Messages\PusherMessageFactory::createForMessage(
        $message,
        $connection,
        $this->channelManager
    );

    $pusherMessage->respond();

    // Read the app only after responding, exactly where the values are needed.
    $app = $connection->app;

    if ($app->statisticsEnabled) {
        StatisticsCollector::webSocketMessage($app->id);
    }

    WebSocketMessageReceived::dispatch($app->id, $connection->socketId, $message);
}
2020-08-18 17:21:22 +00:00
/**
 * Handle the socket closing.
 *
 * Unsubscribes the connection from all of its channels first. When an app
 * is attached to the connection, the disconnection is then counted (if
 * statistics are enabled), the connection is unsubscribed from the app,
 * and finally the dashboard is notified and ConnectionClosed is dispatched.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return PromiseInterface
 */
public function onClose(ConnectionInterface $connection)
{
    // Step 2: runs once the channel unsubscriptions are done.
    $leaveApp = function (bool $wasUnsubscribed) use ($connection) {
        if (! isset($connection->app)) {
            return Helpers::createFulfilledPromise(true);
        }

        if ($connection->app->statisticsEnabled) {
            StatisticsCollector::disconnection($connection->app->id);
        }

        return $this->channelManager->unsubscribeFromApp($connection->app->id);
    };

    // Step 3: report the closed connection.
    $announceClosure = function () use ($connection) {
        if (! isset($connection->app)) {
            return;
        }

        DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [
            'socketId' => $connection->socketId,
        ]);

        ConnectionClosed::dispatch($connection->app->id, $connection->socketId);
    };

    return $this->channelManager
        ->unsubscribeFromAllChannels($connection)
        ->then($leaveApp)
        ->then($announceClosure);
}
2018-11-22 09:54:51 +00:00
2020-08-18 17:21:22 +00:00
/**
 * Handle an error raised for a connection.
 *
 * Only WebSocketException instances are reported back to the client, as
 * the JSON encoding of their payload. Every other exception is ignored
 * by this method.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  \Exception  $exception
 * @return void
 */
public function onError(ConnectionInterface $connection, Exception $exception)
{
    if (! $exception instanceof WebSocketException) {
        return;
    }

    $payload = $exception->getPayload();

    $connection->send(json_encode($payload));
}
2018-11-22 10:29:12 +00:00
/**
 * Tell whether this server instance currently takes new connections.
 *
 * The decision is delegated entirely to the channel manager; the
 * connection itself is not inspected here.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return bool
 */
protected function connectionCanBeMade(ConnectionInterface $connection): bool
{
    $accepting = $this->channelManager->acceptsNewConnections();

    return $accepting;
}
2020-08-18 17:21:22 +00:00
/**
 * Verify the app key validity.
 *
 * Reads the "appKey" query parameter of the HTTP request and looks the
 * app up. The returned promise is fulfilled (without a value) once the
 * app has been attached to the connection as `$connection->app`, and is
 * rejected with an UnknownAppKey exception when no app matches the key.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return PromiseInterface
 */
protected function verifyAppKey(ConnectionInterface $connection): PromiseInterface
{
    $deferred = new Deferred();

    $query = QueryParameters::create($connection->httpRequest);
    $appKey = $query->get('appKey');

    // NOTE(review): no rejection handler is attached here, so if
    // App::findByKey() itself rejects, the returned promise never settles.
    // Confirm whether findByKey() can reject.
    App::findByKey($appKey)
        ->then(function ($app) use ($appKey, $connection, $deferred) {
            if (! $app) {
                $deferred->reject(new Exceptions\UnknownAppKey($appKey));

                // Stop here: an unknown key must not attach anything to
                // the connection nor try to resolve the rejected deferred.
                return;
            }

            $connection->app = $app;

            $deferred->resolve();
        });

    return $deferred->promise();
}
2018-11-22 10:30:57 +00:00
2020-08-18 17:21:22 +00:00
/**
* Verify the origin.
*
* @param \Ratchet\ConnectionInterface $connection
* @return $this
*/
2020-08-18 13:04:52 +00:00
protected function verifyOrigin(ConnectionInterface $connection)
{
if (! $connection->app->allowedOrigins) {
return $this;
}
$header = (string) ($connection->httpRequest->getHeader('Origin')[0] ?? null);
$origin = parse_url($header, PHP_URL_HOST) ?: $header;
if (! $header || ! in_array($origin, $connection->app->allowedOrigins)) {
2020-09-10 19:59:26 +00:00
throw new Exceptions\OriginNotAllowed($connection->app->key);
2020-08-18 13:04:52 +00:00
}
return $this;
}
2020-08-18 17:21:22 +00:00
/**
 * Enforce the app's connection capacity.
 *
 * Nothing happens for apps whose capacity is null. Otherwise the global
 * connection count is requested from the channel manager and, once it is
 * known, a connection at or above capacity is sent the
 * ConnectionsOverCapacity payload and closed. The check is asynchronous:
 * this method returns immediately without waiting for the count.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return $this
 */
protected function limitConcurrentConnections(ConnectionInterface $connection)
{
    $capacity = $connection->app->capacity;

    if (is_null($capacity)) {
        return $this;
    }

    $closeWhenFull = function ($connectionsCount) use ($capacity, $connection) {
        if ($connectionsCount >= $capacity) {
            $overCapacity = new Exceptions\ConnectionsOverCapacity;

            $encoded = json_encode($overCapacity->getPayload());

            tap($connection)->send($encoded)->close();
        }
    };

    $this->channelManager
        ->getGlobalConnectionsCount($connection->app->id)
        ->then($closeWhenFull);

    return $this;
}
2020-08-18 17:21:22 +00:00
/**
 * Assign a fresh socket id to the connection.
 *
 * The id has the form "<int>.<int>", each part being a random integer
 * between 1 and 1000000000 inclusive, and is stored on
 * `$connection->socketId`.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return $this
 */
protected function generateSocketId(ConnectionInterface $connection)
{
    $firstPart = random_int(1, 1000000000);
    $secondPart = random_int(1, 1000000000);

    $connection->socketId = sprintf('%d.%d', $firstPart, $secondPart);

    return $this;
}
2020-08-18 17:21:22 +00:00
/**
 * Tell the client that the connection is established.
 *
 * Sends a "pusher:connection_established" event whose data is itself a
 * JSON-encoded string carrying the socket id and an activity timeout of 30.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return $this
 */
protected function establishConnection(ConnectionInterface $connection)
{
    $data = [
        'socket_id' => $connection->socketId,
        'activity_timeout' => 30,
    ];

    $frame = [
        'event' => 'pusher:connection_established',
        // The data is encoded on its own, so it travels as a JSON string
        // nested inside the outer JSON frame.
        'data' => json_encode($data),
    ];

    $connection->send(json_encode($frame));

    return $this;
}
2018-12-04 21:22:33 +00:00
}