BF handlers for l11

This commit is contained in:
Alexander Blasl 2025-01-17 10:45:53 +01:00
parent 3bb4cbab33
commit 03972f9e7d
4 changed files with 76 additions and 71 deletions

View File

@ -4,13 +4,13 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket; namespace BlaxSoftware\LaravelWebSockets\Websocket;
use Auth;
use BlaxSoftware\LaravelWebSockets\ChannelManagers\LocalChannelManager; use BlaxSoftware\LaravelWebSockets\ChannelManagers\LocalChannelManager;
use BlaxSoftware\LaravelWebSockets\ChannelManagers\RedisChannelManager; use BlaxSoftware\LaravelWebSockets\ChannelManagers\RedisChannelManager;
use BlaxSoftware\LaravelWebSockets\Channels\Channel; use BlaxSoftware\LaravelWebSockets\Channels\Channel;
use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel; use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel;
use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel; use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use Illuminate\Support\Facades\Log;
class Controller class Controller
{ {
@ -107,7 +107,7 @@ class Controller
'channel' => @$message['channel'], 'channel' => @$message['channel'],
'line' => $e->getFile() . ':' . $e->getLine(), 'line' => $e->getFile() . ':' . $e->getLine(),
]; ];
\Log::error($e->getMessage(), $reload); Log::error($e->getMessage(), $reload);
return $connection->send(json_encode([ return $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'] . ':error',
@ -222,7 +222,7 @@ class Controller
: null; : null;
// log // log
\Log::channel('websocket')->error('Send error: ' . $p['data']['message'], $p); Log::channel('websocket')->error('Send error: ' . $p['data']['message'], $p);
if (get_class($this->connection) === MockConnection::class) { if (get_class($this->connection) === MockConnection::class) {
$connection = clone $this->connection; $connection = clone $this->connection;

View File

@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket\Controllers;
class PusherController extends \BlaxSoftware\LaravelWebSockets\Websocket\Controller
{
public $need_auth = false;
public function unsubscribe($connection, $data, $channel)
{
// $this->broadcast(
// $this->get_users_in_channel(),
// 'channel:left',
// including_self: true
// );
return $this->success([], 'channel:joined');
}
public function subscribe($connection, $data, $channel)
{
// $this->broadcast(
// $this->get_users_in_channel(),
// 'channel:joined',
// including_self: true
// );
return $this->success([], 'channel:joined');
}
}

View File

@ -4,8 +4,6 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket; namespace BlaxSoftware\LaravelWebSockets\Websocket;
use App\Models\Tenant;
use Auth;
use BlaxSoftware\LaravelWebSockets\Apps\App; use BlaxSoftware\LaravelWebSockets\Apps\App;
use BlaxSoftware\LaravelWebSockets\Channels\Channel; use BlaxSoftware\LaravelWebSockets\Channels\Channel;
use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel; use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel;
@ -21,7 +19,8 @@ use BlaxSoftware\LaravelWebSockets\Server\Exceptions\WebSocketException as Excep
use BlaxSoftware\LaravelWebSockets\Server\Messages\PusherMessageFactory; use BlaxSoftware\LaravelWebSockets\Server\Messages\PusherMessageFactory;
use BlaxSoftware\LaravelWebSockets\Server\QueryParameters; use BlaxSoftware\LaravelWebSockets\Server\QueryParameters;
use Exception; use Exception;
use Log; use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use Ratchet\RFC6455\Messaging\MessageInterface; use Ratchet\RFC6455\Messaging\MessageInterface;
use Ratchet\WebSocket\MessageComponentInterface; use Ratchet\WebSocket\MessageComponentInterface;
@ -45,11 +44,11 @@ class Handler implements MessageComponentInterface
return $connection->close(); return $connection->close();
} }
$this->verifyAppKey($connection) $this->verifyAppKey($connection);
->verifyOrigin($connection) $this->verifyOrigin($connection);
->limitConcurrentConnections($connection) $this->limitConcurrentConnections($connection);
->generateSocketId($connection) $this->generateSocketId($connection);
->establishConnection($connection); $this->establishConnection($connection);
if (isset($connection->app)) { if (isset($connection->app)) {
$this->channelManager->subscribeToApp($connection->app->id); $this->channelManager->subscribeToApp($connection->app->id);
@ -86,7 +85,7 @@ class Handler implements MessageComponentInterface
if (! $channel = $this->get_connection_channel($connection, $message)) { if (! $channel = $this->get_connection_channel($connection, $message)) {
return $connection->send(json_encode([ return $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'].':error',
'data' => [ 'data' => [
'message' => 'Channel not found', 'message' => 'Channel not found',
'meta' => $message, 'meta' => $message,
@ -96,7 +95,7 @@ class Handler implements MessageComponentInterface
$this->authenticateConnection($connection, $channel, $message); $this->authenticateConnection($connection, $channel, $message);
Log::channel('websocket')->info('Executing event: ' . $message['event']); Log::channel('websocket')->info('Executing event: '.$message['event']);
if (strpos($message['event'], 'pusher') !== false) { if (strpos($message['event'], 'pusher') !== false) {
try { try {
@ -108,7 +107,7 @@ class Handler implements MessageComponentInterface
); );
} catch (Exception $e) { } catch (Exception $e) {
return $connection->send(json_encode([ return $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'].':error',
'data' => [ 'data' => [
'message' => $e->getMessage(), 'message' => $e->getMessage(),
], ],
@ -122,7 +121,7 @@ class Handler implements MessageComponentInterface
Log::error('Fork error'); Log::error('Fork error');
} elseif ($pid == 0) { } elseif ($pid == 0) {
try { try {
\DB::reconnect(); DB::reconnect();
$this->setRequest($message, $connection); $this->setRequest($message, $connection);
$mock = new MockConnection($connection); $mock = new MockConnection($connection);
@ -135,7 +134,7 @@ class Handler implements MessageComponentInterface
); );
} catch (Exception $e) { } catch (Exception $e) {
$mock->send(json_encode([ $mock->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'].':error',
'data' => [ 'data' => [
'message' => $e->getMessage(), 'message' => $e->getMessage(),
], ],
@ -151,7 +150,7 @@ class Handler implements MessageComponentInterface
/** /**
* Handle the websocket close. * Handle the websocket close.
*/ */
public function onClose(ConnectionInterface $connection) : void public function onClose(ConnectionInterface $connection): void
{ {
if (optional($connection)->tenant) { if (optional($connection)->tenant) {
if (optional($connection->tenant)->tenantable) { if (optional($connection->tenant)->tenantable) {
@ -172,16 +171,16 @@ class Handler implements MessageComponentInterface
} }
cache()->forget( cache()->forget(
'ws_socket_tenantable_' . $connection->socketId, 'ws_socket_tenantable_'.$connection->socketId,
); );
if (@$this->channel_connections[$channel]) { if (@$this->channel_connections[$channel]) {
cache()->forever( cache()->forever(
'ws_channel_connections_' . $channel, 'ws_channel_connections_'.$channel,
@$this->channel_connections[$channel] @$this->channel_connections[$channel]
); );
} else { } else {
cache()->forget('ws_channel_connections_' . $channel); cache()->forget('ws_channel_connections_'.$channel);
} }
cache()->forever( cache()->forever(
@ -192,13 +191,13 @@ class Handler implements MessageComponentInterface
$this->channelManager $this->channelManager
->unsubscribeFromAllChannels($connection) ->unsubscribeFromAllChannels($connection)
->then(function (bool $unsubscribed) use ($connection) : void { ->then(function (bool $unsubscribed) use ($connection): void {
if (isset($connection->app)) { if (isset($connection->app)) {
$this->channelManager->unsubscribeFromApp($connection->app->id); $this->channelManager->unsubscribeFromApp($connection->app->id);
ConnectionClosed::dispatch($connection->app->id, $connection->socketId); ConnectionClosed::dispatch($connection->app->id, $connection->socketId);
cache()->forget('ws_connection_' . $connection->socketId); cache()->forget('ws_connection_'.$connection->socketId);
} }
}); });
} }
@ -208,7 +207,7 @@ class Handler implements MessageComponentInterface
* *
* @param WebSocketException $exception * @param WebSocketException $exception
*/ */
public function onError(ConnectionInterface $connection, Exception $exception) : void public function onError(ConnectionInterface $connection, Exception $exception): void
{ {
if ($exception instanceof ExceptionsWebSocketException) { if ($exception instanceof ExceptionsWebSocketException) {
$connection->send(json_encode( $connection->send(json_encode(
@ -221,7 +220,7 @@ class Handler implements MessageComponentInterface
* Check if the connection can be made for the * Check if the connection can be made for the
* current server instance. * current server instance.
*/ */
protected function connectionCanBeMade(ConnectionInterface $connection) : bool protected function connectionCanBeMade(ConnectionInterface $connection): bool
{ {
return $this->channelManager->acceptsNewConnections(); return $this->channelManager->acceptsNewConnections();
} }
@ -241,7 +240,9 @@ class Handler implements MessageComponentInterface
throw new UnknownAppKey($appKey); throw new UnknownAppKey($appKey);
} }
$connection->app = $app; $app->then(function ($app) use ($connection) {
$connection->app = $app;
});
return $this; return $this;
} }
@ -278,7 +279,7 @@ class Handler implements MessageComponentInterface
if (! is_null($capacity = $connection->app->capacity)) { if (! is_null($capacity = $connection->app->capacity)) {
$this->channelManager $this->channelManager
->getGlobalConnectionsCount($connection->app->id) ->getGlobalConnectionsCount($connection->app->id)
->then(function ($connectionsCount) use ($capacity, $connection) : void { ->then(function ($connectionsCount) use ($capacity, $connection): void {
if ($connectionsCount >= $capacity) { if ($connectionsCount >= $capacity) {
$exception = new ConnectionsOverCapacity; $exception = new ConnectionsOverCapacity;
@ -324,7 +325,7 @@ class Handler implements MessageComponentInterface
return $this; return $this;
} }
protected function get_connection_channel(&$connection, &$message) : ?PrivateChannel protected function get_connection_channel(&$connection, &$message): ?PrivateChannel
{ {
// Put channel on its place // Put channel on its place
if (! @$message['channel'] && $message['data'] && $message['data']['channel']) { if (! @$message['channel'] && $message['data'] && $message['data']['channel']) {
@ -359,7 +360,7 @@ class Handler implements MessageComponentInterface
} }
cache()->forever( cache()->forever(
'ws_channel_connections_' . $channel_name, 'ws_channel_connections_'.$channel_name,
$this->channel_connections[$channel_name] $this->channel_connections[$channel_name]
); );
@ -380,11 +381,11 @@ class Handler implements MessageComponentInterface
if (@$this->channel_connections[$channel_name]) { if (@$this->channel_connections[$channel_name]) {
cache()->forever( cache()->forever(
'ws_channel_connections_' . $channel_name, 'ws_channel_connections_'.$channel_name,
$this->channel_connections[$channel_name] $this->channel_connections[$channel_name]
); );
} else { } else {
cache()->forget('ws_channel_connections_' . $channel_name); cache()->forget('ws_channel_connections_'.$channel_name);
} }
cache()->forever( cache()->forever(
@ -428,37 +429,12 @@ class Handler implements MessageComponentInterface
PrivateChannel|Channel|PresenceChannel|null $channel, PrivateChannel|Channel|PresenceChannel|null $channel,
$message $message
) { ) {
if (! optional($connection)->tenant && isset($message['data']['tenant'])) {
$tenant = Tenant::where('slug', $message['data']['tenant'])->first();
if (! $tenant) { if (! optional($connection)->auth && $connection->socketId && cache()->get('socket_'.$connection->socketId)) {
return $connection->send(json_encode([
'channel' => $message['channel'],
'event' => $message['event'] . ':error',
'data' => [
'message' => 'No tenant has been supplied',
],
]));
}
if (optional($tenant)->is_user) { $cached_auth = cache()->get('socket_'.$connection->socketId);
$connection->user = $tenant->tenantable;
cache()->forever( $connection->user = @$cached_auth['type']::find($cached_auth['id']);
'ws_socket_tenantable_' . $connection->socketId,
$tenant->tenantable
);
}
$connection->tenant = $tenant;
if (optional($connection)->tenant) {
if (optional($connection->tenant)->tenantable) {
$connection->tenant->tenantable->logActivity('Connected to websocket', $connection->tenant->tenantable, 'info', 'websocket');
} else {
$connection->tenant->logActivity('Connected to websocket', $connection->tenant, 'info', 'websocket');
}
}
$channel->saveConnection($connection); $channel->saveConnection($connection);
} }
@ -471,12 +447,8 @@ class Handler implements MessageComponentInterface
// Set auth or logout // Set auth or logout
($connection->user) ($connection->user)
? Auth::login($connection->user) ? auth()->login($connection->user)
: Auth::logout(); : auth()->logout();
if (Auth::user()) {
Auth::user()->update_last_online();
}
} }
private function addDataCheckLoop( private function addDataCheckLoop(
@ -486,14 +458,14 @@ class Handler implements MessageComponentInterface
$optional = false, $optional = false,
$iteration = false $iteration = false
) { ) {
$pid = explode('_', $pid . '')[0]; $pid = explode('_', $pid.'')[0];
if ($iteration >= 0 && $iteration !== false) { if ($iteration >= 0 && $iteration !== false) {
$pid .= '_' . $iteration; $pid .= '_'.$iteration;
} }
// Set timeout start // Set timeout start
$pidcache_start = 'dedicated_start_' . $pid; $pidcache_start = 'dedicated_start_'.$pid;
cache()->put($pidcache_start, microtime(true), 100); cache()->put($pidcache_start, microtime(true), 100);
// Periodic check for data // Periodic check for data
@ -505,8 +477,8 @@ class Handler implements MessageComponentInterface
$optional, $optional,
$iteration $iteration
) { ) {
$pidcache_data = 'dedicated_data_' . $pid; $pidcache_data = 'dedicated_data_'.$pid;
$pidcache_done = 'dedicated_data_' . $pid . '_done'; $pidcache_done = 'dedicated_data_'.$pid.'_done';
if ( if (
cache()->has($pidcache_start) cache()->has($pidcache_start)
@ -514,7 +486,7 @@ class Handler implements MessageComponentInterface
) { ) {
if (! $optional) { if (! $optional) {
$connection->send(json_encode([ $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'].':error',
'data' => [ 'data' => [
'message' => 'Timeout', 'message' => 'Timeout',
'diff' => $diff, 'diff' => $diff,

View File

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket; namespace BlaxSoftware\LaravelWebSockets\Websocket;
use React\Socket\Connection; use React\Socket\Connection;
use Illuminate\Support\Facades\Log;
class MockConnection extends Connection implements \Ratchet\ConnectionInterface class MockConnection extends Connection implements \Ratchet\ConnectionInterface
{ {
@ -50,7 +51,7 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface
public function send($data) public function send($data)
{ {
\Log::channel('websocket')->info('[MockConnection] Send for pid: ' . getmypid(), [ Log::channel('websocket')->info('[MockConnection] Send for pid: ' . getmypid(), [
'data' => $data, 'data' => $data,
]); ]);