I max concurrency handling, BF restart

This commit is contained in:
Fabian @ Blax Software 2026-03-20 13:44:51 +01:00
parent a2a5524637
commit 35b2731349
5 changed files with 205 additions and 10 deletions

View File

@ -24,6 +24,27 @@ return [
*/
'hot_reload' => env('WEBSOCKET_HOT_RELOAD', env('APP_DEBUG', false)),
/*
|--------------------------------------------------------------------------
| Max Concurrent Children (Fork Limit)
|--------------------------------------------------------------------------
|
| Each WebSocket message is processed in a forked child process.
| Each child may open its own MySQL connection. To prevent exhausting
| MySQL's max_connections under message bursts, this limits how many
| child processes can run concurrently.
|
| Messages arriving when the limit is reached are queued in memory
| and processed as child slots free up. This provides backpressure
| instead of crashing MySQL.
|
| Recommended: Set to ~60-70% of MySQL's max_connections minus
| connections used by PHP-FPM, queue workers, and other services.
| For MySQL default of 151: 50 is a safe default.
|
*/
'max_concurrent_children' => (int) env('WEBSOCKET_MAX_CHILDREN', 50),
/*
|--------------------------------------------------------------------------
| Broadcast Socket Settings

View File

@ -16,7 +16,7 @@ class RestartServer extends Command
* @var string
*/
protected $signature = 'websockets:restart
{--cache-driver=file : The cache driver to use for the server. Redis will not work due to concurrency issues.}
{--cache-driver= : The cache driver to use for signaling (defaults to app cache driver).}
{--soft : Use soft shutdown (gracefully close connections) instead of hard shutdown.}';
/**
@ -40,8 +40,9 @@ class RestartServer extends Command
\Log::channel('websocket')->info('WebSocket restart server command called ...');
config(['cache.default' => $this->option('cache-driver', 'file')]);
\Log::channel('websocket')->debug('Cache driver configured', ['driver' => $this->option('cache-driver', 'file')]);
$cacheDriver = $this->option('cache-driver') ?: config('cache.default');
config(['cache.default' => $cacheDriver]);
\Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]);
$restartData = [
'time' => $this->currentTime(),

View File

@ -81,6 +81,13 @@ class StartServer extends Command
*/
protected $lastSteer = null;
/**
* The resolved cache store name for signaling.
*
* @var string
*/
protected string $cacheStore = 'file';
/**
* Initialize the command.
*
@ -129,6 +136,7 @@ class StartServer extends Command
// (2) child processes purge inherited Redis connections and get fresh ones
$cacheDriver = $this->option('cache-driver') ?: config('cache.default');
config(['cache.default' => $cacheDriver]);
$this->cacheStore = $cacheDriver;
\Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]);
WebsocketService::resetAllTracking();
@ -308,7 +316,7 @@ class StartServer extends Command
*/
public function configureSteerTimer(): void
{
$steerData = Cache::store('file')->get('blax:websockets:steer');
$steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer');
$this->lastSteer = $steerData['time'] ?? null;
\Log::channel('websocket')->debug('Steer timer configured', [
@ -316,7 +324,7 @@ class StartServer extends Command
]);
$this->loop->addPeriodicTimer(5, function () {
$steerData = Cache::store('file')->get('blax:websockets:steer');
$steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer');
$currentSteer = $steerData['time'] ?? null;
if ($currentSteer !== null && $currentSteer !== $this->lastSteer) {
@ -751,7 +759,7 @@ class StartServer extends Command
*/
protected function getLastRestartData()
{
$data = Cache::get('blax:websockets:restart');
$data = Cache::store($this->cacheStore)->get('blax:websockets:restart');
// Handle legacy format (just timestamp) for backwards compatibility
if (is_numeric($data)) {

View File

@ -12,7 +12,7 @@ class SteerServer extends Command
protected $signature = 'websocket:steer
{action : The action to send (cache:clear, restart, restart:soft)}
{--cache-driver=file : The cache driver to use for signaling.}';
{--cache-driver= : The cache driver to use for signaling (defaults to app cache driver).}';
protected $description = 'Send a steering command to the running WebSocket server.';
@ -37,7 +37,7 @@ class SteerServer extends Command
return self::FAILURE;
}
$store = $this->option('cache-driver') ?: 'file';
$store = $this->option('cache-driver') ?: config('cache.default');
Cache::store($store)->forever('blax:websockets:steer', [
'action' => $action,

View File

@ -64,12 +64,34 @@ class Handler implements MessageComponentInterface
*/
private static ?bool $debug = null;
/**
* Track active child processes to limit concurrent DB connections.
* Each forked child may open its own MySQL connection, so we must
* cap concurrency to avoid exhausting MySQL's max_connections.
*/
private int $activeChildCount = 0;
/**
* Maximum concurrent child processes (and thus DB connections).
* Configurable via websockets.max_concurrent_children config.
* Default 50 leaves headroom for PHP-FPM, queue workers, etc.
*/
private int $maxConcurrentChildren = 50;
/**
* Queue of deferred messages waiting for a child slot.
* Each entry is [ConnectionInterface, Channel, array $message].
*/
private array $deferredMessages = [];
/**
* Initialize a new handler.
*/
public function __construct(
protected ChannelManager $channelManager
) {}
) {
$this->maxConcurrentChildren = (int) config('websockets.max_concurrent_children', 50);
}
/**
* Handle incoming WebSocket message with optimized fast path for ping/pong
@ -467,19 +489,42 @@ class Handler implements MessageComponentInterface
/**
* Fork with event-driven socket pair IPC (no polling!)
* Parent is notified INSTANTLY when child sends data
*
* Includes a concurrency limiter: if max_concurrent_children is reached,
* the message is queued and processed when a child slot frees up.
* This prevents exhausting MySQL's max_connections under load.
*/
protected function forkWithSocketPair(
ConnectionInterface $connection,
Channel $channel,
array $message
): void {
// Check concurrency limit before forking
if ($this->activeChildCount >= $this->maxConcurrentChildren) {
// Queue the message for later processing
$this->deferredMessages[] = [$connection, $channel, $message];
if (count($this->deferredMessages) === 1) {
// Log only on first deferral to avoid log spam
Log::channel('websocket')->warning('Fork concurrency limit reached (' . $this->maxConcurrentChildren . '), queueing message', [
'active_children' => $this->activeChildCount,
'queued' => count($this->deferredMessages),
]);
}
return;
}
$this->activeChildCount++;
// Create socket pair BEFORE fork
$ipc = SocketPairIpc::create($this->channelManager->loop);
$pid = pcntl_fork();
if ($pid === -1) {
$this->activeChildCount--;
Log::error('Fork error');
$this->processDeferredMessages();
return;
}
@ -503,6 +548,18 @@ class Handler implements MessageComponentInterface
app()->forgetInstance('cache.store');
app()->forgetInstance('redis');
// Configure DB reconnect-on-lost-connection for this child.
// If MySQL returns "Too many connections" or "server has gone away",
// Laravel will retry the query once after reconnecting.
try {
$dbConfig = config('database.connections.' . config('database.default'), []);
if (empty($dbConfig['retry_on_connection_loss'] ?? null)) {
config(['database.connections.' . config('database.default') . '.retry_on_connection_loss' => true]);
}
} catch (\Throwable $e) {
// Non-critical, continue without retry config
}
$this->setRequest($message, $connection);
// Set up per-connection session (backed by Redis)
@ -512,7 +569,7 @@ class Handler implements MessageComponentInterface
// Create mock that sends via socket pair
$mock = new MockConnectionSocketPair($connection, $ipc);
Controller::controll_message(
$this->executeControllerWithDbResilience(
$mock,
$channel,
$message,
@ -532,6 +589,14 @@ class Handler implements MessageComponentInterface
'data' => ['message' => $e->getMessage()],
]));
// Log DB connection failures specifically for monitoring
if ($this->isDbConnectionError($e)) {
Log::channel('websocket')->error('DB connection failure in child process', [
'error' => $e->getMessage(),
'event' => $message['event'] ?? 'unknown',
]);
}
if (app()->bound('sentry')) {
app('sentry')->captureException($e);
}
@ -560,10 +625,110 @@ class Handler implements MessageComponentInterface
function () {
// Cleanup zombie process
pcntl_waitpid(-1, $status, WNOHANG);
// Free up a child slot and process any queued messages
$this->activeChildCount = max(0, $this->activeChildCount - 1);
$this->processDeferredMessages();
}
);
}
/**
 * Drain the backlog of messages that were parked because the fork limit was hit.
 *
 * Entries are taken oldest-first for as long as the number of active children
 * stays below the configured maximum. Each surviving entry is handed back to
 * forkWithSocketPair(). Invoked after a child slot has been released and after
 * a failed fork attempt.
 */
protected function processDeferredMessages(): void
{
    while ($this->deferredMessages !== [] && $this->activeChildCount < $this->maxConcurrentChildren) {
        $entry = array_shift($this->deferredMessages);
        [$connection, $channel, $message] = $entry;

        // Entries whose connection no longer exposes both properties are
        // dropped silently; everything else goes through the normal fork path.
        if (isset($connection->socketId, $connection->app)) {
            $this->forkWithSocketPair($connection, $channel, $message);
        }
    }

    $remaining = count($this->deferredMessages);
    if ($remaining === 0) {
        return;
    }

    // Something is still waiting: report the backlog size.
    Log::channel('websocket')->info('Deferred message queue: ' . $remaining . ' remaining');
}
/**
 * Run the controller for a message, retrying exactly once on a DB connection failure.
 *
 * Any failure that isDbConnectionError() does not recognise is rethrown untouched.
 * For a recognised failure the method logs a warning, sleeps 500ms, forces a
 * disconnect + reconnect of the DB connection and then invokes the controller a
 * second time. If the reconnect itself throws, that error is logged and the
 * ORIGINAL failure is rethrown. A failure of the second controller call
 * propagates to the caller as-is.
 *
 * @param mixed          $mock           Connection stand-in passed through to the controller.
 * @param Channel        $channel        Channel the message belongs to.
 * @param array          $message        Decoded message; `event` is read for log context only.
 * @param ChannelManager $channelManager Passed through to the controller.
 *
 * @throws \Throwable The first failure (non-DB error or failed reconnect) or the retry's failure.
 */
protected function executeControllerWithDbResilience(
    $mock,
    Channel $channel,
    array $message,
    ChannelManager $channelManager
): void {
    try {
        Controller::controll_message($mock, $channel, $message, $channelManager);

        return;
    } catch (\Throwable $firstFailure) {
        // Only connection-level DB problems are worth a second attempt.
        if (!$this->isDbConnectionError($firstFailure)) {
            throw $firstFailure;
        }
    }

    Log::channel('websocket')->warning('DB connection error, retrying after 500ms', [
        'error' => $firstFailure->getMessage(),
        'event' => $message['event'] ?? 'unknown',
    ]);

    // Short pause so that other processes get a chance to release connections.
    usleep(500_000);

    try {
        // Throw away the current connection and open a fresh one.
        DB::disconnect();
        DB::reconnect();
    } catch (\Throwable $reconnectFailure) {
        Log::channel('websocket')->error('DB reconnect failed after retry', [
            'error' => $reconnectFailure->getMessage(),
        ]);

        // Surface the root problem, not the reconnect error.
        throw $firstFailure;
    }

    // Second and final attempt.
    Controller::controll_message($mock, $channel, $message, $channelManager);
}
/**
 * Decide whether a failure was caused by a DB connection problem
 * (too many connections, server gone away, connection refused, ...).
 *
 * The message of the given throwable AND of every throwable in its
 * getPrevious() chain is inspected, so a connection error is still
 * recognised when it has been wrapped several times on its way up.
 * Matching is a case-insensitive substring search.
 *
 * @param \Throwable $e The failure to classify.
 *
 * @return bool True if any message in the chain contains a known connection-error marker.
 */
protected function isDbConnectionError(\Throwable $e): bool
{
    $dbErrorPatterns = [
        'Too many connections',
        'SQLSTATE[08004]',
        'SQLSTATE[HY000] [1040]',
        'server has gone away',
        'SQLSTATE[HY000] [2006]',
        'Lost connection to MySQL',
        'SQLSTATE[HY000] [2002]',
        'Connection refused',
        'SQLSTATE[08S01]',
        'no connection to the server',
    ];

    // Walk the whole cause chain; the original error may sit more than one level down.
    for ($current = $e; $current !== null; $current = $current->getPrevious()) {
        $text = $current->getMessage();

        foreach ($dbErrorPatterns as $pattern) {
            if (stripos($text, $pattern) !== false) {
                return true;
            }
        }
    }

    return false;
}
/**
* Handle data received from child via socket pair
*/