laravel-websockets/src/Websocket/Handler.php

689 lines
22 KiB
PHP
Raw Normal View History

2025-01-16 07:54:02 +00:00
<?php
declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket;
use BlaxSoftware\LaravelWebSockets\Apps\App;
use BlaxSoftware\LaravelWebSockets\Channels\Channel;
use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel;
use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel;
use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager;
use BlaxSoftware\LaravelWebSockets\Events\ConnectionClosed;
use BlaxSoftware\LaravelWebSockets\Events\NewConnection;
use BlaxSoftware\LaravelWebSockets\Exceptions\WebSocketException;
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\ConnectionsOverCapacity;
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\OriginNotAllowed;
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\UnknownAppKey;
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\WebSocketException as ExceptionsWebSocketException;
use BlaxSoftware\LaravelWebSockets\Server\Messages\PusherMessageFactory;
use BlaxSoftware\LaravelWebSockets\Server\QueryParameters;
use Exception;
2025-05-08 08:54:11 +00:00
use Illuminate\Support\Facades\Auth;
2025-01-17 09:45:53 +00:00
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
2025-01-16 07:54:02 +00:00
use Ratchet\ConnectionInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
use Ratchet\WebSocket\MessageComponentInterface;
class Handler implements MessageComponentInterface
{
protected $channel_connections = [];
/**
* Initialize a new handler.
*
* @return void
*/
public function __construct(
protected ChannelManager $channelManager
) {}
public function onOpen(ConnectionInterface $connection)
{
2025-09-14 13:00:27 +00:00
try {
2025-06-12 14:16:07 +00:00
if (! $this->connectionCanBeMade($connection)) {
return $connection->close();
}
// Set IP to connection
$connection->remoteAddress = trim(
explode(
',',
$connection->httpRequest->getHeaderLine('X-Forwarded-For')
)[0] ?? $connection->remoteAddress
2025-01-16 07:54:02 +00:00
);
2025-06-12 14:16:07 +00:00
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
Log::channel('websocket')->info('WS onOpen IP: ' . $connection->remoteAddress);
$this->verifyAppKey($connection);
$this->verifyOrigin($connection);
$this->limitConcurrentConnections($connection);
$this->generateSocketId($connection);
$this->establishConnection($connection);
if (isset($connection->app)) {
$this->channelManager->subscribeToApp($connection->app->id);
$this->channelManager->connectionPonged($connection);
NewConnection::dispatch(
$connection->app->id,
$connection->socketId
);
}
2025-09-14 13:00:27 +00:00
} catch (UnknownAppKey $e) {
Log::channel('websocket')->error('Root level error: ' . $e->getMessage(), [
2025-06-12 14:16:07 +00:00
'file' => $e->getFile(),
'line' => $e->getLine(),
'trace' => $e->getTraceAsString(),
]);
2025-01-16 07:54:02 +00:00
}
}
2025-01-19 08:01:22 +00:00
public function onMessage(
ConnectionInterface $connection,
MessageInterface $message
) {
2025-06-13 08:31:59 +00:00
try {
if (! isset($connection->app)) {
return;
}
2025-01-16 07:54:02 +00:00
2025-06-13 08:31:59 +00:00
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
2025-01-16 07:54:02 +00:00
2025-06-13 08:31:59 +00:00
PusherMessageFactory::createForMessage(
$message,
$connection,
$this->channelManager
)->respond();
2025-01-16 07:54:02 +00:00
2025-06-13 08:31:59 +00:00
// Payload json to array
$message = json_decode($message->getPayload(), true);
2025-01-16 07:54:02 +00:00
2025-06-13 08:31:59 +00:00
// Cut short for ping pong
2025-09-15 14:31:02 +00:00
if (
(strtolower($message['event']) === 'pusher:ping')
|| (strtolower($message['event']) === 'pusher.ping')
) {
2025-09-14 13:00:27 +00:00
$this->channelManager->connectionPonged($connection);
2025-06-13 08:31:59 +00:00
return gc_collect_cycles();
}
2025-01-16 07:54:02 +00:00
2025-09-15 14:22:59 +00:00
$channel = $this->handleChannelSubscriptions($message, $connection);
2025-01-16 07:54:02 +00:00
2025-09-18 15:55:52 +00:00
if (! optional($channel)->hasConnection($connection) && !(
2025-09-16 11:39:48 +00:00
$message['event'] !== 'pusher:unsubscribe'
&& $message['event'] !== 'pusher.unsubscribe'
)) {
2025-06-13 08:31:59 +00:00
return $connection->send(json_encode([
2025-09-14 13:00:27 +00:00
'event' => $message['event'] . ':error',
2025-06-13 08:31:59 +00:00
'data' => [
2025-09-15 14:22:59 +00:00
'message' => 'Subscription not established',
2025-06-13 08:31:59 +00:00
'meta' => $message,
],
]));
}
2025-01-16 07:54:02 +00:00
2025-09-18 16:07:15 +00:00
if (!$channel) {
return $connection->send(json_encode([
'event' => $message['event'] . ':error',
'data' => [
'message' => 'Channel not found',
'meta' => $message,
],
]));
}
2025-10-15 08:32:35 +00:00
$this->authenticateConnection($connection, $channel, $message);
2025-01-16 07:54:02 +00:00
2025-09-18 13:56:13 +00:00
\Log::channel('websocket')->info('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . json_encode($message));
2025-01-16 07:54:02 +00:00
2025-06-13 08:31:59 +00:00
if (strpos($message['event'], 'pusher') !== false) {
return $connection->send(json_encode([
2025-09-14 13:00:27 +00:00
'event' => $message['event'] . ':response',
2025-01-16 07:54:02 +00:00
'data' => [
2025-06-13 08:31:59 +00:00
'message' => 'Success',
2025-01-16 07:54:02 +00:00
],
]));
}
2025-06-13 08:31:59 +00:00
$pid = pcntl_fork();
if ($pid == -1) {
Log::error('Fork error');
} elseif ($pid == 0) {
try {
2025-09-19 10:18:40 +00:00
DB::disconnect();
2025-06-13 08:31:59 +00:00
DB::reconnect();
$this->setRequest($message, $connection);
$mock = new MockConnection($connection);
Controller::controll_message(
$mock,
$channel,
$message,
$this->channelManager
);
2025-09-01 12:16:30 +00:00
// Run deferred callbacks
\Illuminate\Container\Container::getInstance()
->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class)
->invokeWhen(fn($callback) => true);
2025-06-13 08:31:59 +00:00
} catch (Exception $e) {
$mock->send(json_encode([
2025-09-14 13:00:27 +00:00
'event' => $message['event'] . ':error',
2025-06-13 08:31:59 +00:00
'data' => [
'message' => $e->getMessage(),
],
]));
2025-12-02 17:33:52 +00:00
// if sentry is defined capture exception
if (app()->bound('sentry')) {
app('sentry')->captureException($e);
}
2025-06-13 08:31:59 +00:00
}
exit(0);
} else {
$this->addDataCheckLoop($connection, $message, $pid);
}
} catch (\Throwable $e) {
2025-09-14 13:00:27 +00:00
Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [
2025-06-13 08:31:59 +00:00
'file' => $e->getFile(),
'line' => $e->getLine(),
'trace' => $e->getTraceAsString(),
]);
2025-10-15 07:27:37 +00:00
2025-12-02 17:33:52 +00:00
// if sentry is defined capture exception
if (app()->bound('sentry')) {
app('sentry')->captureException($e);
}
}
2025-01-16 07:54:02 +00:00
}
/**
* Handle the websocket close.
*/
2025-01-17 09:45:53 +00:00
public function onClose(ConnectionInterface $connection): void
2025-01-16 07:54:02 +00:00
{
2025-10-15 07:27:37 +00:00
$this->authenticateConnection($connection, null);
if (@$connection?->remoteAddress) {
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
}
2025-01-16 07:54:02 +00:00
// remove connection from $channel_connections
foreach ($this->channel_connections as $channel => $connections) {
if (in_array($connection->socketId, $connections)) {
$this->channel_connections[$channel] = array_diff($connections, [$connection->socketId]);
}
if (empty(@$this->channel_connections[$channel])) {
unset($this->channel_connections[$channel]);
}
cache()->forget(
2025-09-15 08:20:13 +00:00
'ws_socket_auth_' . $connection->socketId,
2025-01-16 07:54:02 +00:00
);
if (@$this->channel_connections[$channel]) {
cache()->forever(
2025-05-08 08:54:11 +00:00
'ws_channel_connections_' . $channel,
2025-01-16 07:54:02 +00:00
@$this->channel_connections[$channel]
);
} else {
2025-05-08 08:54:11 +00:00
cache()->forget('ws_channel_connections_' . $channel);
2025-01-16 07:54:02 +00:00
}
cache()->forever(
'ws_active_channels',
array_keys($this->channel_connections)
);
2025-09-15 08:20:13 +00:00
$authed_users = cache()->get('ws_socket_authed_users') ?? [];
unset($authed_users[$connection->socketId]);
cache()->forever('ws_socket_authed_users', $authed_users);
2025-10-15 07:27:37 +00:00
\BlaxSoftware\LaravelWebSockets\Services\WebsocketService::clearUserAuthed(
$connection->socketId
);
2025-01-16 07:54:02 +00:00
}
$this->channelManager
->unsubscribeFromAllChannels($connection)
2025-01-17 09:45:53 +00:00
->then(function (bool $unsubscribed) use ($connection): void {
2025-01-16 07:54:02 +00:00
if (isset($connection->app)) {
$this->channelManager->unsubscribeFromApp($connection->app->id);
ConnectionClosed::dispatch($connection->app->id, $connection->socketId);
2025-05-08 08:54:11 +00:00
cache()->forget('ws_connection_' . $connection->socketId);
2025-01-16 07:54:02 +00:00
}
});
}
/**
* Handle the websocket errors.
*
* @param WebSocketException $exception
*/
2025-01-17 09:45:53 +00:00
public function onError(ConnectionInterface $connection, Exception $exception): void
2025-01-16 07:54:02 +00:00
{
if ($exception instanceof ExceptionsWebSocketException) {
$connection->send(json_encode(
$exception->getPayload()
));
}
}
/**
* Check if the connection can be made for the
* current server instance.
*/
2025-01-17 09:45:53 +00:00
protected function connectionCanBeMade(ConnectionInterface $connection): bool
2025-01-16 07:54:02 +00:00
{
return $this->channelManager->acceptsNewConnections();
}
/**
* Verify the app key validity.
*
* @return $this
*/
protected function verifyAppKey(ConnectionInterface $connection)
{
$query = QueryParameters::create($connection->httpRequest);
$appKey = $query->get('appKey');
if (! $app = App::findByKey($appKey)) {
throw new UnknownAppKey($appKey);
}
2025-01-17 09:45:53 +00:00
$app->then(function ($app) use ($connection) {
$connection->app = $app;
});
2025-01-16 07:54:02 +00:00
return $this;
}
/**
* Verify the origin.
*
* @return $this
*/
protected function verifyOrigin(ConnectionInterface $connection)
{
if (! $connection->app->allowedOrigins) {
return $this;
}
$header = (string) ($connection->httpRequest->getHeader('Origin')[0] ?? null);
$origin = parse_url($header, PHP_URL_HOST) ?: $header;
if (! $header || ! in_array($origin, $connection->app->allowedOrigins)) {
throw new OriginNotAllowed($connection->app->key);
}
return $this;
}
/**
* Limit the connections count by the app.
*
* @return $this
*/
protected function limitConcurrentConnections(ConnectionInterface $connection)
{
if (! is_null($capacity = $connection->app->capacity)) {
$this->channelManager
->getGlobalConnectionsCount($connection->app->id)
2025-01-17 09:45:53 +00:00
->then(function ($connectionsCount) use ($capacity, $connection): void {
2025-01-16 07:54:02 +00:00
if ($connectionsCount >= $capacity) {
$exception = new ConnectionsOverCapacity;
$payload = json_encode($exception->getPayload());
tap($connection)->send($payload)->close();
}
});
}
return $this;
}
/**
* Create a socket id.
*
* @return $this
*/
protected function generateSocketId(ConnectionInterface $connection)
{
$socketId = sprintf('%d.%d', random_int(1, 1000000000), random_int(1, 1000000000));
$connection->socketId = $socketId;
return $this;
}
/**
* Establish connection with the client.
*
* @return $this
*/
protected function establishConnection(ConnectionInterface $connection)
{
$connection->send(json_encode([
2025-01-18 16:06:52 +00:00
'event' => 'pusher.connection_established',
2025-01-16 07:54:02 +00:00
'data' => json_encode([
'socket_id' => $connection->socketId,
'activity_timeout' => 30,
]),
]));
return $this;
}
2025-09-14 13:00:27 +00:00
protected function get_connection_channel(&$connection, &$message): ?Channel
2025-01-16 07:54:02 +00:00
{
// Put channel on its place
if (! @$message['channel'] && $message['data'] && $message['data']['channel']) {
$message['channel'] = $message['data']['channel'];
unset($message['data']['channel']);
}
2025-09-16 06:54:13 +00:00
$this->channelManager->findOrCreate(
2025-01-16 07:54:02 +00:00
$connection->app->id,
$message['channel']
);
return $this->channelManager->find(
$connection->app->id,
$message['channel']
);
}
2025-09-18 13:56:13 +00:00
protected function handleChannelSubscriptions($message, $connection): ?Channel
2025-01-16 07:54:02 +00:00
{
2025-09-15 14:22:59 +00:00
$channel = $this->get_connection_channel($connection, $message);
$channel_name = optional($channel)->getName();
2025-01-16 07:54:02 +00:00
$socket_id = $connection->socketId;
2025-09-18 13:56:13 +00:00
if (! $channel_name || ! $channel) {
2025-09-15 14:22:59 +00:00
return null;
2025-09-15 12:29:07 +00:00
}
2025-01-16 07:54:02 +00:00
// if not in $channel_connections add it
2025-09-16 06:54:13 +00:00
if (
(strtolower($message['event']) === 'pusher.subscribe')
|| (strtolower($message['event']) === 'pusher:subscribe')
) {
2025-01-16 07:54:02 +00:00
if (! isset($this->channel_connections[$channel_name])) {
$this->channel_connections[$channel_name] = [];
}
if (! in_array($connection->socketId, $this->channel_connections[$this->get_connection_channel($connection, $message)->getName()])) {
$this->channel_connections[$channel_name][] = $connection->socketId;
}
cache()->forever(
2025-09-14 13:00:27 +00:00
'ws_channel_connections_' . $channel_name,
2025-01-16 07:54:02 +00:00
$this->channel_connections[$channel_name]
);
cache()->forever(
'ws_active_channels',
array_keys($this->channel_connections)
);
2025-09-15 12:29:07 +00:00
2025-09-15 14:22:59 +00:00
if (! $channel->hasConnection($connection)) {
2025-09-18 13:56:13 +00:00
try {
2025-09-15 14:22:59 +00:00
$channel->subscribe($connection, (object) $message);
} catch (\Throwable $e) {
return null;
}
}
2025-01-16 07:54:02 +00:00
}
2025-01-19 08:01:22 +00:00
if (strpos($message['event'], '.unsubscribe') !== false) {
2025-01-16 07:54:02 +00:00
if (isset($this->channel_connections[$channel_name])) {
$this->channel_connections[$channel_name] = array_diff($this->channel_connections[$channel_name], [$socket_id]);
}
if (empty($this->channel_connections[$channel_name])) {
unset($this->channel_connections[$channel_name]);
}
if (@$this->channel_connections[$channel_name]) {
cache()->forever(
2025-09-14 13:00:27 +00:00
'ws_channel_connections_' . $channel_name,
2025-01-16 07:54:02 +00:00
$this->channel_connections[$channel_name]
);
} else {
2025-09-14 13:00:27 +00:00
cache()->forget('ws_channel_connections_' . $channel_name);
2025-01-16 07:54:02 +00:00
}
cache()->forever(
'ws_active_channels',
array_keys($this->channel_connections)
);
2025-09-16 09:20:48 +00:00
$channel->unsubscribe($connection);
2025-01-16 07:54:02 +00:00
}
2025-09-15 14:22:59 +00:00
return $channel;
2025-01-16 07:54:02 +00:00
}
protected function setRequest($message, $connection)
{
foreach (request()->keys() as $key) {
request()->offsetUnset($key);
}
request()->merge(@$message['data'] ?? []);
}
protected function authenticateConnection(
ConnectionInterface $connection,
PrivateChannel|Channel|PresenceChannel|null $channel,
2025-10-15 07:45:04 +00:00
$message = []
2025-01-16 07:54:02 +00:00
) {
2025-10-15 08:32:35 +00:00
if (
!optional($connection)->auth
&& $connection->socketId
&& ($cached_auth = cache()->get('socket_' . $connection->socketId))
&& @$cached_auth['type']
) {
2025-01-17 09:45:53 +00:00
$connection->user = @$cached_auth['type']::find($cached_auth['id']);
2025-01-16 07:54:02 +00:00
2025-10-15 08:32:35 +00:00
if ($channel) {
2025-10-15 07:27:37 +00:00
$channel->saveConnection($connection);
}
2025-01-16 07:54:02 +00:00
}
// Update last online of user if user
if (! optional($connection)->user) {
$connection->user = false;
2025-12-02 17:33:52 +00:00
if ($channel) {
2025-10-15 07:27:37 +00:00
$channel->saveConnection($connection);
}
2025-01-16 07:54:02 +00:00
}
// Set auth or logout
($connection->user)
2025-05-08 08:54:11 +00:00
? Auth::login($connection->user)
: Auth::logout();
if (Auth::user()) {
/** @var \App\Models\User */
$user = Auth::user();
$user->refresh();
2025-09-15 08:20:13 +00:00
cache()->forever(
'ws_socket_auth_' . $connection->socketId,
$user,
);
$authed_users = cache()->get('ws_socket_authed_users') ?? [];
$authed_users[$connection->socketId] = $user->id;
cache()->forever('ws_socket_authed_users', $authed_users);
2025-10-15 07:35:07 +00:00
\BlaxSoftware\LaravelWebSockets\Services\WebsocketService::setUserAuthed(
$connection->socketId,
$user
);
2025-05-08 08:54:11 +00:00
}
2025-10-15 07:27:37 +00:00
// add next in loop logout
$this->channelManager->loop->futureTick(function () {
Auth::logout();
});
2025-01-16 07:54:02 +00:00
}
private function addDataCheckLoop(
$connection,
$message,
$pid,
$optional = false,
$iteration = false
) {
2025-09-14 13:00:27 +00:00
$pid = explode('_', $pid . '')[0];
2025-01-16 07:54:02 +00:00
if ($iteration >= 0 && $iteration !== false) {
2025-09-14 13:00:27 +00:00
$pid .= '_' . $iteration;
2025-01-16 07:54:02 +00:00
}
// Set timeout start
2025-09-14 13:00:27 +00:00
$pidcache_start = 'dedicated_start_' . $pid;
2025-01-16 07:54:02 +00:00
cache()->put($pidcache_start, microtime(true), 100);
// Periodic check for data
$this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use (
$pidcache_start,
$message,
$pid,
$connection,
$optional,
$iteration
) {
2025-09-14 13:00:27 +00:00
$pidcache_data = 'dedicated_data_' . $pid;
$pidcache_done = 'dedicated_data_' . $pid . '_done';
$pidcache_complete = 'dedicated_data_' . $pid . '_complete';
2025-01-16 07:54:02 +00:00
if (
cache()->has($pidcache_start)
2025-05-08 15:50:05 +00:00
&& ($diff = microtime(true) - ((int) cache()->get($pidcache_start))) > 60
2025-01-16 07:54:02 +00:00
) {
if (! $optional) {
$connection->send(json_encode([
2025-09-14 13:00:27 +00:00
'event' => $message['event'] . ':error',
2025-01-16 07:54:02 +00:00
'data' => [
2025-09-13 17:33:29 +00:00
'message' => $message['event'] . ' timeout',
2025-01-16 07:54:02 +00:00
'diff' => $diff,
],
]));
}
$this->channelManager->loop->cancelTimer($timer);
2025-05-08 15:50:05 +00:00
cache()->put($pidcache_complete, true, 360);
2025-01-16 07:54:02 +00:00
}
if (cache()->has($pidcache_done)) {
// call self with pid + '_0' and optional
if ($iteration === false) {
$this->addDataCheckLoop($connection, $message, $pid, true, 0);
} else {
$this->addDataCheckLoop($connection, $message, $pid, true, $iteration + 1);
}
// Retrieve cached data
$sending = @cache()->get($pidcache_data);
2025-09-14 13:00:27 +00:00
$bm = json_decode($sending, true);
2025-01-16 07:54:02 +00:00
2025-09-13 17:33:29 +00:00
2025-01-16 07:54:02 +00:00
// Send the data to client
2025-09-14 13:00:27 +00:00
if (@$bm['broadcast']) {
2025-09-13 17:33:29 +00:00
$this->broadcast(
$connection->app->id,
$bm['data'] ?? null,
$bm['event'] ?? null,
$bm['channel'] ?? null,
$bm['including_self'],
$connection
);
2025-09-15 12:29:07 +00:00
} elseif (@$bm['whisper']) {
$this->whisper(
$connection->app->id,
$bm['data'] ?? null,
$bm['event'] ?? null,
$bm['socket_ids'] ?? [],
$bm['channel'] ?? null,
);
2025-09-14 13:00:27 +00:00
} else {
2025-09-13 17:33:29 +00:00
$connection->send($sending);
}
2025-01-16 07:54:02 +00:00
// Stop periodic check
$this->channelManager->loop->cancelTimer($timer);
}
// Prevent zombie processes
pcntl_waitpid(-1, $status, WNOHANG);
});
}
2025-09-13 17:33:29 +00:00
public function broadcast(
string $appId,
mixed $payload,
?string $event = null,
?string $channel = null,
bool $including_self = false,
$connection = null
2025-09-14 13:00:27 +00:00
): void {
2025-09-13 17:33:29 +00:00
2025-09-14 13:00:27 +00:00
$channel = $this->channelManager->findOrCreate($appId, $channel);
$p = [
'event' => ($event ?? $event),
'data' => $payload,
'channel' => $channel->getName(),
];
2025-09-13 17:33:29 +00:00
foreach ($channel->getConnections() as $channel_conection) {
2025-09-16 08:58:54 +00:00
if ($channel_conection->socketId !== $connection->socketId) {
2025-09-14 13:00:27 +00:00
$channel_conection->send(json_encode($p));
2025-09-13 17:33:29 +00:00
}
if ($including_self) {
2025-09-14 13:00:27 +00:00
$connection->send(json_encode($p));
2025-09-13 17:33:29 +00:00
}
}
}
2025-09-15 12:29:07 +00:00
public function whisper(
string $appId,
mixed $payload,
?string $event = null,
array $socketIds = [],
?string $channel = null
): void {
$channel = $this->channelManager->findOrCreate($appId, $channel);
$p = [
'event' => ($event ?? $event),
'data' => $payload,
'channel' => $channel->getName(),
];
foreach ($channel->getConnections() as $channel_conection) {
if (in_array($channel_conection->socketId, $socketIds)) {
$channel_conection->send(json_encode($p));
}
}
}
2025-01-16 07:54:02 +00:00
}