R removed polling, transitioned to kernel events

This commit is contained in:
Fabian @ Blax Software 2026-02-09 13:04:22 +01:00
parent cc78a9b4d5
commit 1eafa237a0
3 changed files with 4 additions and 261 deletions

View File

@ -510,7 +510,6 @@ class StartServer extends Command
$commonClasses = [ $commonClasses = [
\BlaxSoftware\LaravelWebSockets\Websocket\Controller::class, \BlaxSoftware\LaravelWebSockets\Websocket\Controller::class,
\BlaxSoftware\LaravelWebSockets\Websocket\ControllerResolver::class, \BlaxSoftware\LaravelWebSockets\Websocket\ControllerResolver::class,
\BlaxSoftware\LaravelWebSockets\Websocket\MockConnection::class,
\BlaxSoftware\LaravelWebSockets\Websocket\MockConnectionSocketPair::class, \BlaxSoftware\LaravelWebSockets\Websocket\MockConnectionSocketPair::class,
\BlaxSoftware\LaravelWebSockets\Ipc\SocketPairIpc::class, \BlaxSoftware\LaravelWebSockets\Ipc\SocketPairIpc::class,
\BlaxSoftware\LaravelWebSockets\Cache\IpcCache::class, \BlaxSoftware\LaravelWebSockets\Cache\IpcCache::class,

View File

@ -15,7 +15,7 @@ use Illuminate\Support\Facades\Log;
class Controller class Controller
{ {
protected bool $isMockConnection; protected bool $isMockConnection;
protected MockConnection|MockConnectionSocketPair|null $mockConnectionClone = null; protected MockConnectionSocketPair|null $mockConnectionClone = null;
final public function __construct( final public function __construct(
protected ConnectionInterface $connection, protected ConnectionInterface $connection,
@ -25,8 +25,7 @@ class Controller
) { ) {
// Cache class check to avoid repeated get_class() calls (reflection is slow) // Cache class check to avoid repeated get_class() calls (reflection is slow)
$connectionClass = get_class($connection); $connectionClass = get_class($connection);
$this->isMockConnection = $connectionClass === MockConnection::class $this->isMockConnection = $connectionClass === MockConnectionSocketPair::class;
|| $connectionClass === MockConnectionSocketPair::class;
// Pre-clone MockConnection once if needed (reuse across method calls) // Pre-clone MockConnection once if needed (reuse across method calls)
if ($this->isMockConnection) { if ($this->isMockConnection) {

View File

@ -5,7 +5,6 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket; namespace BlaxSoftware\LaravelWebSockets\Websocket;
use BlaxSoftware\LaravelWebSockets\Apps\App; use BlaxSoftware\LaravelWebSockets\Apps\App;
use BlaxSoftware\LaravelWebSockets\Cache\IpcCache;
use BlaxSoftware\LaravelWebSockets\Channels\Channel; use BlaxSoftware\LaravelWebSockets\Channels\Channel;
use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel; use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel;
use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel; use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel;
@ -31,12 +30,6 @@ use Ratchet\WebSocket\MessageComponentInterface;
class Handler implements MessageComponentInterface class Handler implements MessageComponentInterface
{ {
/**
* Whether to use event-driven socket pair IPC (true) or polling (false)
* Socket pairs are instant but require sockets extension
*/
private bool $useSocketPairIpc;
/** /**
* Track channel connections using associative arrays for O(1) lookup * Track channel connections using associative arrays for O(1) lookup
* Structure: [channel_name => [socket_id => true]] * Structure: [channel_name => [socket_id => true]]
@ -73,10 +66,7 @@ class Handler implements MessageComponentInterface
*/ */
public function __construct( public function __construct(
protected ChannelManager $channelManager protected ChannelManager $channelManager
) { ) {}
// Use socket pair IPC if available (instant), otherwise fall back to polling
$this->useSocketPairIpc = SocketPairIpc::isSupported();
}
/** /**
* Handle incoming WebSocket message with optimized fast path for ping/pong * Handle incoming WebSocket message with optimized fast path for ping/pong
@ -194,7 +184,7 @@ class Handler implements MessageComponentInterface
return; return;
} }
$this->forkAndProcessMessage($connection, $channel, $messageArray); $this->forkWithSocketPair($connection, $channel, $messageArray);
} }
/** /**
@ -354,18 +344,6 @@ class Handler implements MessageComponentInterface
return true; return true;
} }
protected function forkAndProcessMessage(
ConnectionInterface $connection,
Channel $channel,
array $message
): void {
if ($this->useSocketPairIpc) {
$this->forkWithSocketPair($connection, $channel, $message);
} else {
$this->forkWithPolling($connection, $channel, $message);
}
}
/** /**
* Check if hot reload mode is enabled * Check if hot reload mode is enabled
*/ */
@ -606,73 +584,6 @@ class Handler implements MessageComponentInterface
} }
} }
/**
* Fork with polling-based IPC (fallback when socket pairs unavailable)
*/
protected function forkWithPolling(
ConnectionInterface $connection,
Channel $channel,
array $message
): void {
// Generate unique request ID BEFORE forking to avoid race conditions
$requestId = uniqid('req_', true) . '_' . bin2hex(random_bytes(4));
$pid = pcntl_fork();
if ($pid === -1) {
Log::error('Fork error');
return;
}
if ($pid === 0) {
$this->processMessageInChild($connection, $channel, $message, $requestId);
exit(0);
}
$this->addDataCheckLoop($connection, $message, $requestId);
}
protected function processMessageInChild(
ConnectionInterface $connection,
Channel $channel,
array $message,
string $requestId
): void {
// Hot reload: clear all caches for fresh code loading (only in dev mode)
$this->hotReloadChild();
try {
// Lazy DB reconnect: disconnect now, reconnect only when first query runs
// This saves ~5-15ms for methods that don't use the database
DB::disconnect();
$this->setRequest($message, $connection);
$mock = new MockConnection($connection, $requestId);
Controller::controll_message(
$mock,
$channel,
$message,
$this->channelManager
);
\Illuminate\Container\Container::getInstance()
->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class)
->invokeWhen(fn($callback) => true);
} catch (Exception $e) {
$mock->send(json_encode([
'event' => $message['event'] . ':error',
'data' => [
'message' => $e->getMessage(),
],
]));
if (app()->bound('sentry')) {
app('sentry')->captureException($e);
}
}
}
protected function handleMessageError(\Throwable $e): void protected function handleMessageError(\Throwable $e): void
{ {
Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [ Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [
@ -1136,172 +1047,6 @@ class Handler implements MessageComponentInterface
$this->cacheBufferScheduled = false; $this->cacheBufferScheduled = false;
} }
/**
* IPC polling interval in seconds.
* Lower = faster response, higher CPU usage.
* 0.001 = 1ms, 0.002 = 2ms, 0.01 = 10ms
*/
private const IPC_POLL_INTERVAL = 0.002; // 2ms - good balance of speed and CPU
private function addDataCheckLoop(
$connection,
$message,
string $requestId,
$optional = false,
int $iteration = 0
) {
$iterationKey = $requestId . ($iteration > 0 ? '_' . $iteration : '');
$cacheKeyStart = 'dedicated_start_' . $iterationKey;
IpcCache::put($cacheKeyStart, microtime(true), 100);
$this->channelManager->loop->addPeriodicTimer(self::IPC_POLL_INTERVAL, function ($timer) use (
$cacheKeyStart,
$iterationKey,
$message,
$requestId,
$connection,
$optional,
$iteration
) {
$this->checkDataLoopIteration(
$timer,
$cacheKeyStart,
$message,
$iterationKey,
$requestId,
$connection,
$optional,
$iteration
);
pcntl_waitpid(-1, $status, WNOHANG);
});
}
protected function checkDataLoopIteration(
$timer,
string $cacheKeyStart,
array $message,
string $iterationKey,
string $requestId,
$connection,
bool $optional,
int $iteration
): void {
$cacheKeyData = 'dedicated_data_' . $iterationKey;
$cacheKeyDone = 'dedicated_data_' . $iterationKey . '_done';
$cacheKeyComplete = 'dedicated_data_' . $iterationKey . '_complete';
if ($this->handleTimeout($timer, $cacheKeyStart, $cacheKeyComplete, $message, $connection, $optional)) {
return;
}
if (!IpcCache::has($cacheKeyDone)) {
return;
}
// Clean up cache entries for this iteration before processing
// This prevents memory leaks and stale data issues
$this->cleanupIterationCache($iterationKey);
$this->scheduleNextIteration($connection, $message, $requestId, $iteration);
$this->processAndSendData($connection, $cacheKeyData);
$this->channelManager->loop->cancelTimer($timer);
}
/**
* Clean up cache entries for a completed iteration
*/
protected function cleanupIterationCache(string $iterationKey): void
{
$keysToDelete = [
'dedicated_start_' . $iterationKey,
'dedicated_data_' . $iterationKey . '_done',
// Note: We don't delete 'dedicated_data_' here as we need it for processAndSendData
// It will expire naturally after 60 seconds
];
IpcCache::forgetMultiple($keysToDelete);
}
protected function handleTimeout(
$timer,
string $cacheKeyStart,
string $cacheKeyComplete,
array $message,
$connection,
bool $optional
): bool {
$startTime = IpcCache::get($cacheKeyStart);
if ($startTime === null) {
return false;
}
$diff = microtime(true) - ((float) $startTime);
if ($diff <= 60) {
return false;
}
if (!$optional) {
$connection->send(json_encode([
'event' => $message['event'] . ':error',
'data' => [
'message' => $message['event'] . ' timeout',
'diff' => $diff,
],
]));
}
$this->channelManager->loop->cancelTimer($timer);
IpcCache::put($cacheKeyComplete, true, 360);
return true;
}
protected function scheduleNextIteration($connection, array $message, string $requestId, int $iteration): void
{
$nextIteration = $iteration + 1;
$this->addDataCheckLoop($connection, $message, $requestId, true, $nextIteration);
}
protected function processAndSendData($connection, string $cacheKeyData): void
{
$sending = IpcCache::get($cacheKeyData);
// Clean up the data cache key immediately after reading
IpcCache::forget($cacheKeyData);
if (!$sending) {
return;
}
$bm = json_decode($sending, true);
if (isset($bm['broadcast']) && $bm['broadcast']) {
$this->broadcast(
$connection->app->id,
$bm['data'] ?? null,
$bm['event'] ?? null,
$bm['channel'] ?? null,
$bm['including_self'] ?? false,
$connection
);
return;
}
if (isset($bm['whisper']) && $bm['whisper']) {
$this->whisper(
$connection->app->id,
$bm['data'] ?? null,
$bm['event'] ?? null,
$bm['socket_ids'] ?? [],
$bm['channel'] ?? null,
);
return;
}
$connection->send($sending);
}
public function broadcast( public function broadcast(
string $appId, string $appId,
mixed $payload, mixed $payload,