2025-01-16 07:54:02 +00:00
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace BlaxSoftware\LaravelWebSockets\Websocket;
|
|
|
|
|
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Apps\App;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Channels\Channel;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Events\ConnectionClosed;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Events\NewConnection;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Exceptions\WebSocketException;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\ConnectionsOverCapacity;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\OriginNotAllowed;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\UnknownAppKey;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Server\Exceptions\WebSocketException as ExceptionsWebSocketException;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Server\Messages\PusherMessageFactory;
|
|
|
|
|
use BlaxSoftware\LaravelWebSockets\Server\QueryParameters;
|
|
|
|
|
use Exception;
|
2025-05-08 08:54:11 +00:00
|
|
|
use Illuminate\Support\Facades\Auth;
|
2025-01-17 09:45:53 +00:00
|
|
|
use Illuminate\Support\Facades\DB;
|
|
|
|
|
use Illuminate\Support\Facades\Log;
|
2025-01-16 07:54:02 +00:00
|
|
|
use Ratchet\ConnectionInterface;
|
|
|
|
|
use Ratchet\RFC6455\Messaging\MessageInterface;
|
|
|
|
|
use Ratchet\WebSocket\MessageComponentInterface;
|
|
|
|
|
|
|
|
|
|
class Handler implements MessageComponentInterface
|
|
|
|
|
{
|
|
|
|
|
protected $channel_connections = [];
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Initialize a new handler.
|
|
|
|
|
*
|
|
|
|
|
* @return void
|
|
|
|
|
*/
|
|
|
|
|
public function __construct(
|
|
|
|
|
protected ChannelManager $channelManager
|
|
|
|
|
) {}
|
|
|
|
|
|
|
|
|
|
public function onOpen(ConnectionInterface $connection)
|
|
|
|
|
{
|
2025-06-12 14:16:07 +00:00
|
|
|
try{
|
|
|
|
|
if (! $this->connectionCanBeMade($connection)) {
|
|
|
|
|
return $connection->close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set IP to connection
|
|
|
|
|
$connection->remoteAddress = trim(
|
|
|
|
|
explode(
|
|
|
|
|
',',
|
|
|
|
|
$connection->httpRequest->getHeaderLine('X-Forwarded-For')
|
|
|
|
|
)[0] ?? $connection->remoteAddress
|
2025-01-16 07:54:02 +00:00
|
|
|
);
|
2025-06-12 14:16:07 +00:00
|
|
|
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
|
|
|
|
|
Log::channel('websocket')->info('WS onOpen IP: ' . $connection->remoteAddress);
|
|
|
|
|
|
|
|
|
|
$this->verifyAppKey($connection);
|
|
|
|
|
$this->verifyOrigin($connection);
|
|
|
|
|
$this->limitConcurrentConnections($connection);
|
|
|
|
|
$this->generateSocketId($connection);
|
|
|
|
|
$this->establishConnection($connection);
|
|
|
|
|
|
|
|
|
|
if (isset($connection->app)) {
|
|
|
|
|
$this->channelManager->subscribeToApp($connection->app->id);
|
|
|
|
|
$this->channelManager->connectionPonged($connection);
|
|
|
|
|
|
|
|
|
|
NewConnection::dispatch(
|
|
|
|
|
$connection->app->id,
|
|
|
|
|
$connection->socketId
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}catch (UnknownAppKey $e) {
|
|
|
|
|
Log::channel('websocket')->error('Root level error: '. $e->getMessage(), [
|
|
|
|
|
'file' => $e->getFile(),
|
|
|
|
|
'line' => $e->getLine(),
|
|
|
|
|
'trace' => $e->getTraceAsString(),
|
|
|
|
|
]);
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-19 08:01:22 +00:00
|
|
|
public function onMessage(
|
|
|
|
|
ConnectionInterface $connection,
|
|
|
|
|
MessageInterface $message
|
|
|
|
|
) {
|
2025-06-13 08:31:59 +00:00
|
|
|
try {
|
|
|
|
|
if (! isset($connection->app)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
PusherMessageFactory::createForMessage(
|
|
|
|
|
$message,
|
|
|
|
|
$connection,
|
|
|
|
|
$this->channelManager
|
|
|
|
|
)->respond();
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
// Payload json to array
|
|
|
|
|
$message = json_decode($message->getPayload(), true);
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
// Cut short for ping pong
|
|
|
|
|
if (strpos($message['event'], ':ping') !== false) {
|
|
|
|
|
return gc_collect_cycles();
|
|
|
|
|
}
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
$this->handleChannelSubscriptions($message, $connection);
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
if (! $channel = $this->get_connection_channel($connection, $message)) {
|
|
|
|
|
return $connection->send(json_encode([
|
|
|
|
|
'event' => $message['event'].':error',
|
|
|
|
|
'data' => [
|
|
|
|
|
'message' => 'Channel not found',
|
|
|
|
|
'meta' => $message,
|
|
|
|
|
],
|
|
|
|
|
]));
|
|
|
|
|
}
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
$this->authenticateConnection($connection, $channel, $message);
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
Log::channel('websocket')->info('Executing event: '.$message['event']);
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
if (strpos($message['event'], 'pusher') !== false) {
|
|
|
|
|
return $connection->send(json_encode([
|
|
|
|
|
'event' => $message['event'].':response',
|
2025-01-16 07:54:02 +00:00
|
|
|
'data' => [
|
2025-06-13 08:31:59 +00:00
|
|
|
'message' => 'Success',
|
2025-01-16 07:54:02 +00:00
|
|
|
],
|
|
|
|
|
]));
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-13 08:31:59 +00:00
|
|
|
$pid = pcntl_fork();
|
|
|
|
|
|
|
|
|
|
if ($pid == -1) {
|
|
|
|
|
Log::error('Fork error');
|
|
|
|
|
} elseif ($pid == 0) {
|
|
|
|
|
try {
|
|
|
|
|
DB::reconnect();
|
|
|
|
|
|
|
|
|
|
$this->setRequest($message, $connection);
|
|
|
|
|
$mock = new MockConnection($connection);
|
|
|
|
|
|
|
|
|
|
Controller::controll_message(
|
|
|
|
|
$mock,
|
|
|
|
|
$channel,
|
|
|
|
|
$message,
|
|
|
|
|
$this->channelManager
|
|
|
|
|
);
|
2025-09-01 12:16:30 +00:00
|
|
|
|
|
|
|
|
// Run deferred callbacks
|
|
|
|
|
\Illuminate\Container\Container::getInstance()
|
|
|
|
|
->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class)
|
|
|
|
|
->invokeWhen(fn($callback) => true);
|
2025-06-13 08:31:59 +00:00
|
|
|
} catch (Exception $e) {
|
|
|
|
|
$mock->send(json_encode([
|
|
|
|
|
'event' => $message['event'].':error',
|
|
|
|
|
'data' => [
|
|
|
|
|
'message' => $e->getMessage(),
|
|
|
|
|
],
|
|
|
|
|
]));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exit(0);
|
|
|
|
|
} else {
|
|
|
|
|
$this->addDataCheckLoop($connection, $message, $pid);
|
|
|
|
|
}
|
|
|
|
|
} catch (\Exception $e) {
|
|
|
|
|
Log::channel('websocket')->error('onMessage unhandled error: '. $e->getMessage(), [
|
|
|
|
|
'file' => $e->getFile(),
|
|
|
|
|
'line' => $e->getLine(),
|
|
|
|
|
'trace' => $e->getTraceAsString(),
|
|
|
|
|
]);
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle the websocket close.
|
|
|
|
|
*/
|
2025-01-17 09:45:53 +00:00
|
|
|
public function onClose(ConnectionInterface $connection): void
|
2025-01-16 07:54:02 +00:00
|
|
|
{
|
|
|
|
|
// remove connection from $channel_connections
|
|
|
|
|
foreach ($this->channel_connections as $channel => $connections) {
|
|
|
|
|
if (in_array($connection->socketId, $connections)) {
|
|
|
|
|
$this->channel_connections[$channel] = array_diff($connections, [$connection->socketId]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (empty(@$this->channel_connections[$channel])) {
|
|
|
|
|
unset($this->channel_connections[$channel]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cache()->forget(
|
2025-05-08 08:54:11 +00:00
|
|
|
'ws_socket_tenantable_' . $connection->socketId,
|
2025-01-16 07:54:02 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (@$this->channel_connections[$channel]) {
|
|
|
|
|
cache()->forever(
|
2025-05-08 08:54:11 +00:00
|
|
|
'ws_channel_connections_' . $channel,
|
2025-01-16 07:54:02 +00:00
|
|
|
@$this->channel_connections[$channel]
|
|
|
|
|
);
|
|
|
|
|
} else {
|
2025-05-08 08:54:11 +00:00
|
|
|
cache()->forget('ws_channel_connections_' . $channel);
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cache()->forever(
|
|
|
|
|
'ws_active_channels',
|
|
|
|
|
array_keys($this->channel_connections)
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$this->channelManager
|
|
|
|
|
->unsubscribeFromAllChannels($connection)
|
2025-01-17 09:45:53 +00:00
|
|
|
->then(function (bool $unsubscribed) use ($connection): void {
|
2025-01-16 07:54:02 +00:00
|
|
|
if (isset($connection->app)) {
|
|
|
|
|
$this->channelManager->unsubscribeFromApp($connection->app->id);
|
|
|
|
|
|
|
|
|
|
ConnectionClosed::dispatch($connection->app->id, $connection->socketId);
|
|
|
|
|
|
2025-05-08 08:54:11 +00:00
|
|
|
cache()->forget('ws_connection_' . $connection->socketId);
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle the websocket errors.
|
|
|
|
|
*
|
|
|
|
|
* @param WebSocketException $exception
|
|
|
|
|
*/
|
2025-01-17 09:45:53 +00:00
|
|
|
public function onError(ConnectionInterface $connection, Exception $exception): void
|
2025-01-16 07:54:02 +00:00
|
|
|
{
|
|
|
|
|
if ($exception instanceof ExceptionsWebSocketException) {
|
|
|
|
|
$connection->send(json_encode(
|
|
|
|
|
$exception->getPayload()
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Check if the connection can be made for the
|
|
|
|
|
* current server instance.
|
|
|
|
|
*/
|
2025-01-17 09:45:53 +00:00
|
|
|
protected function connectionCanBeMade(ConnectionInterface $connection): bool
|
2025-01-16 07:54:02 +00:00
|
|
|
{
|
|
|
|
|
return $this->channelManager->acceptsNewConnections();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Verify the app key validity.
|
|
|
|
|
*
|
|
|
|
|
* @return $this
|
|
|
|
|
*/
|
|
|
|
|
protected function verifyAppKey(ConnectionInterface $connection)
|
|
|
|
|
{
|
|
|
|
|
$query = QueryParameters::create($connection->httpRequest);
|
|
|
|
|
|
|
|
|
|
$appKey = $query->get('appKey');
|
|
|
|
|
|
|
|
|
|
if (! $app = App::findByKey($appKey)) {
|
|
|
|
|
throw new UnknownAppKey($appKey);
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-17 09:45:53 +00:00
|
|
|
$app->then(function ($app) use ($connection) {
|
|
|
|
|
$connection->app = $app;
|
|
|
|
|
});
|
2025-01-16 07:54:02 +00:00
|
|
|
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Verify the origin.
|
|
|
|
|
*
|
|
|
|
|
* @return $this
|
|
|
|
|
*/
|
|
|
|
|
protected function verifyOrigin(ConnectionInterface $connection)
|
|
|
|
|
{
|
|
|
|
|
if (! $connection->app->allowedOrigins) {
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$header = (string) ($connection->httpRequest->getHeader('Origin')[0] ?? null);
|
|
|
|
|
|
|
|
|
|
$origin = parse_url($header, PHP_URL_HOST) ?: $header;
|
|
|
|
|
|
|
|
|
|
if (! $header || ! in_array($origin, $connection->app->allowedOrigins)) {
|
|
|
|
|
throw new OriginNotAllowed($connection->app->key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Limit the connections count by the app.
|
|
|
|
|
*
|
|
|
|
|
* @return $this
|
|
|
|
|
*/
|
|
|
|
|
protected function limitConcurrentConnections(ConnectionInterface $connection)
|
|
|
|
|
{
|
|
|
|
|
if (! is_null($capacity = $connection->app->capacity)) {
|
|
|
|
|
$this->channelManager
|
|
|
|
|
->getGlobalConnectionsCount($connection->app->id)
|
2025-01-17 09:45:53 +00:00
|
|
|
->then(function ($connectionsCount) use ($capacity, $connection): void {
|
2025-01-16 07:54:02 +00:00
|
|
|
if ($connectionsCount >= $capacity) {
|
|
|
|
|
$exception = new ConnectionsOverCapacity;
|
|
|
|
|
|
|
|
|
|
$payload = json_encode($exception->getPayload());
|
|
|
|
|
|
|
|
|
|
tap($connection)->send($payload)->close();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Create a socket id.
|
|
|
|
|
*
|
|
|
|
|
* @return $this
|
|
|
|
|
*/
|
|
|
|
|
protected function generateSocketId(ConnectionInterface $connection)
|
|
|
|
|
{
|
|
|
|
|
$socketId = sprintf('%d.%d', random_int(1, 1000000000), random_int(1, 1000000000));
|
|
|
|
|
|
|
|
|
|
$connection->socketId = $socketId;
|
|
|
|
|
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Establish connection with the client.
|
|
|
|
|
*
|
|
|
|
|
* @return $this
|
|
|
|
|
*/
|
|
|
|
|
protected function establishConnection(ConnectionInterface $connection)
|
|
|
|
|
{
|
|
|
|
|
$connection->send(json_encode([
|
2025-01-18 16:06:52 +00:00
|
|
|
'event' => 'pusher.connection_established',
|
2025-01-16 07:54:02 +00:00
|
|
|
'data' => json_encode([
|
|
|
|
|
'socket_id' => $connection->socketId,
|
|
|
|
|
'activity_timeout' => 30,
|
|
|
|
|
]),
|
|
|
|
|
]));
|
|
|
|
|
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-17 09:45:53 +00:00
|
|
|
protected function get_connection_channel(&$connection, &$message): ?PrivateChannel
|
2025-01-16 07:54:02 +00:00
|
|
|
{
|
|
|
|
|
// Put channel on its place
|
|
|
|
|
if (! @$message['channel'] && $message['data'] && $message['data']['channel']) {
|
|
|
|
|
$message['channel'] = $message['data']['channel'];
|
|
|
|
|
unset($message['data']['channel']);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$this->channelManager->findOrCreate(
|
|
|
|
|
$connection->app->id,
|
|
|
|
|
$message['channel']
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
return $this->channelManager->find(
|
|
|
|
|
$connection->app->id,
|
|
|
|
|
$message['channel']
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected function handleChannelSubscriptions($message, $connection)
|
|
|
|
|
{
|
|
|
|
|
$channel_name = optional($this->get_connection_channel($connection, $message))->getName() ?? 'no-channel';
|
|
|
|
|
$socket_id = $connection->socketId;
|
|
|
|
|
|
|
|
|
|
// if not in $channel_connections add it
|
2025-01-19 08:01:22 +00:00
|
|
|
if (strpos($message['event'], '.subscribe') !== false) {
|
2025-01-16 07:54:02 +00:00
|
|
|
if (! isset($this->channel_connections[$channel_name])) {
|
|
|
|
|
$this->channel_connections[$channel_name] = [];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (! in_array($connection->socketId, $this->channel_connections[$this->get_connection_channel($connection, $message)->getName()])) {
|
|
|
|
|
$this->channel_connections[$channel_name][] = $connection->socketId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cache()->forever(
|
2025-01-17 09:45:53 +00:00
|
|
|
'ws_channel_connections_'.$channel_name,
|
2025-01-16 07:54:02 +00:00
|
|
|
$this->channel_connections[$channel_name]
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
cache()->forever(
|
|
|
|
|
'ws_active_channels',
|
|
|
|
|
array_keys($this->channel_connections)
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-19 08:01:22 +00:00
|
|
|
if (strpos($message['event'], '.unsubscribe') !== false) {
|
2025-01-16 07:54:02 +00:00
|
|
|
if (isset($this->channel_connections[$channel_name])) {
|
|
|
|
|
$this->channel_connections[$channel_name] = array_diff($this->channel_connections[$channel_name], [$socket_id]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (empty($this->channel_connections[$channel_name])) {
|
|
|
|
|
unset($this->channel_connections[$channel_name]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (@$this->channel_connections[$channel_name]) {
|
|
|
|
|
cache()->forever(
|
2025-01-17 09:45:53 +00:00
|
|
|
'ws_channel_connections_'.$channel_name,
|
2025-01-16 07:54:02 +00:00
|
|
|
$this->channel_connections[$channel_name]
|
|
|
|
|
);
|
|
|
|
|
} else {
|
2025-01-17 09:45:53 +00:00
|
|
|
cache()->forget('ws_channel_connections_'.$channel_name);
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cache()->forever(
|
|
|
|
|
'ws_active_channels',
|
|
|
|
|
array_keys($this->channel_connections)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Log::channel('websocket')->info('Tenant left', ['socketId' => $socket_id, 'channel' => $channel_name]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected function setRequest($message, $connection)
|
|
|
|
|
{
|
|
|
|
|
foreach (request()->keys() as $key) {
|
|
|
|
|
request()->offsetUnset($key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
request()->merge(@$message['data'] ?? []);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected function authenticateConnection(
|
|
|
|
|
ConnectionInterface $connection,
|
|
|
|
|
PrivateChannel|Channel|PresenceChannel|null $channel,
|
|
|
|
|
$message
|
|
|
|
|
) {
|
|
|
|
|
|
2025-01-17 09:45:53 +00:00
|
|
|
if (! optional($connection)->auth && $connection->socketId && cache()->get('socket_'.$connection->socketId)) {
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-01-17 09:45:53 +00:00
|
|
|
$cached_auth = cache()->get('socket_'.$connection->socketId);
|
2025-01-16 07:54:02 +00:00
|
|
|
|
2025-01-17 09:45:53 +00:00
|
|
|
$connection->user = @$cached_auth['type']::find($cached_auth['id']);
|
2025-01-16 07:54:02 +00:00
|
|
|
|
|
|
|
|
$channel->saveConnection($connection);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update last online of user if user
|
|
|
|
|
if (! optional($connection)->user) {
|
|
|
|
|
$connection->user = false;
|
|
|
|
|
$channel->saveConnection($connection);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set auth or logout
|
|
|
|
|
($connection->user)
|
2025-05-08 08:54:11 +00:00
|
|
|
? Auth::login($connection->user)
|
|
|
|
|
: Auth::logout();
|
|
|
|
|
|
|
|
|
|
if (Auth::user()) {
|
|
|
|
|
/** @var \App\Models\User */
|
|
|
|
|
$user = Auth::user();
|
|
|
|
|
$user->refresh();
|
|
|
|
|
}
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private function addDataCheckLoop(
|
|
|
|
|
$connection,
|
|
|
|
|
$message,
|
|
|
|
|
$pid,
|
|
|
|
|
$optional = false,
|
|
|
|
|
$iteration = false
|
|
|
|
|
) {
|
2025-01-17 09:45:53 +00:00
|
|
|
$pid = explode('_', $pid.'')[0];
|
2025-01-16 07:54:02 +00:00
|
|
|
|
|
|
|
|
if ($iteration >= 0 && $iteration !== false) {
|
2025-01-17 09:45:53 +00:00
|
|
|
$pid .= '_'.$iteration;
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set timeout start
|
2025-01-17 09:45:53 +00:00
|
|
|
$pidcache_start = 'dedicated_start_'.$pid;
|
2025-01-16 07:54:02 +00:00
|
|
|
cache()->put($pidcache_start, microtime(true), 100);
|
|
|
|
|
|
|
|
|
|
// Periodic check for data
|
|
|
|
|
$this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use (
|
|
|
|
|
$pidcache_start,
|
|
|
|
|
$message,
|
|
|
|
|
$pid,
|
|
|
|
|
$connection,
|
|
|
|
|
$optional,
|
|
|
|
|
$iteration
|
|
|
|
|
) {
|
2025-01-17 09:45:53 +00:00
|
|
|
$pidcache_data = 'dedicated_data_'.$pid;
|
|
|
|
|
$pidcache_done = 'dedicated_data_'.$pid.'_done';
|
2025-05-08 15:50:05 +00:00
|
|
|
$pidcache_complete = 'dedicated_data_'.$pid.'_complete';
|
2025-01-16 07:54:02 +00:00
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
cache()->has($pidcache_start)
|
2025-05-08 15:50:05 +00:00
|
|
|
&& ($diff = microtime(true) - ((int) cache()->get($pidcache_start))) > 60
|
2025-01-16 07:54:02 +00:00
|
|
|
) {
|
|
|
|
|
if (! $optional) {
|
|
|
|
|
$connection->send(json_encode([
|
2025-01-17 09:45:53 +00:00
|
|
|
'event' => $message['event'].':error',
|
2025-01-16 07:54:02 +00:00
|
|
|
'data' => [
|
|
|
|
|
'message' => 'Timeout',
|
|
|
|
|
'diff' => $diff,
|
|
|
|
|
],
|
|
|
|
|
]));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$this->channelManager->loop->cancelTimer($timer);
|
2025-05-08 15:50:05 +00:00
|
|
|
cache()->put($pidcache_complete, true, 360);
|
2025-01-16 07:54:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (cache()->has($pidcache_done)) {
|
|
|
|
|
// call self with pid + '_0' and optional
|
|
|
|
|
if ($iteration === false) {
|
|
|
|
|
$this->addDataCheckLoop($connection, $message, $pid, true, 0);
|
|
|
|
|
} else {
|
|
|
|
|
$this->addDataCheckLoop($connection, $message, $pid, true, $iteration + 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Retrieve cached data
|
|
|
|
|
$sending = @cache()->get($pidcache_data);
|
|
|
|
|
|
|
|
|
|
// Send the data to client
|
|
|
|
|
$connection->send($sending);
|
|
|
|
|
|
|
|
|
|
// Stop periodic check
|
|
|
|
|
$this->channelManager->loop->cancelTimer($timer);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Prevent zombie processes
|
|
|
|
|
pcntl_waitpid(-1, $status, WNOHANG);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|