diff --git a/config/websockets.php b/config/websockets.php
index d39a3f2..aa53564 100644
--- a/config/websockets.php
+++ b/config/websockets.php
@@ -24,6 +24,28 @@ return [
     */
     'hot_reload' => env('WEBSOCKET_HOT_RELOAD', env('APP_DEBUG', false)),
 
+    /*
+    |--------------------------------------------------------------------------
+    | Max Concurrent Children (Fork Limit)
+    |--------------------------------------------------------------------------
+    |
+    | Each WebSocket message is processed in a forked child process.
+    | Each child may open its own MySQL connection. To prevent exhausting
+    | MySQL's max_connections under message bursts, this limits how many
+    | child processes can run concurrently.
+    |
+    | Messages arriving when the limit is reached are queued in memory
+    | and processed as child slots free up. This provides backpressure
+    | instead of crashing MySQL. The queue itself has no size limit.
+    |
+    | Recommended: Set to ~60-70% of MySQL's max_connections minus
+    | connections used by PHP-FPM, queue workers, and other services.
+    | For MySQL default of 151: 50 is a safe default.
+    | Values below 1 are treated as 1 so messages can never be queued forever.
+    |
+    */
+    'max_concurrent_children' => (int) env('WEBSOCKET_MAX_CHILDREN', 50),
+
     /*
     |--------------------------------------------------------------------------
     | Broadcast Socket Settings
diff --git a/src/Console/Commands/RestartServer.php b/src/Console/Commands/RestartServer.php
index 01251fb..277c7f2 100644
--- a/src/Console/Commands/RestartServer.php
+++ b/src/Console/Commands/RestartServer.php
@@ -16,7 +16,7 @@ class RestartServer extends Command
      * @var string
      */
     protected $signature = 'websockets:restart
-        {--cache-driver=file : The cache driver to use for the server. Redis will not work due to concurrency issues.}
+        {--cache-driver= : The cache driver to use for signaling (defaults to app cache driver).}
         {--soft : Use soft shutdown (gracefully close connections) instead of hard shutdown.}';
 
     /**
@@ -40,8 +40,9 @@ class RestartServer extends Command
 
         \Log::channel('websocket')->info('WebSocket restart server command called ...');
 
-        config(['cache.default' => $this->option('cache-driver', 'file')]);
-        \Log::channel('websocket')->debug('Cache driver configured', ['driver' => $this->option('cache-driver', 'file')]);
+        $cacheDriver = $this->option('cache-driver') ?: config('cache.default');
+        config(['cache.default' => $cacheDriver]);
+        \Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]);
 
         $restartData = [
             'time' => $this->currentTime(),
diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php
index 6301548..ccfc642 100644
--- a/src/Console/Commands/StartServer.php
+++ b/src/Console/Commands/StartServer.php
@@ -81,6 +81,13 @@ class StartServer extends Command
      */
     protected $lastSteer = null;
 
+    /**
+     * The resolved cache store name for signaling.
+     *
+     * @var string
+     */
+    protected string $cacheStore = 'file';
+
     /**
      * Initialize the command.
      *
@@ -129,6 +136,7 @@ class StartServer extends Command
         // (2) child processes purge inherited Redis connections and get fresh ones
         $cacheDriver = $this->option('cache-driver') ?: config('cache.default');
         config(['cache.default' => $cacheDriver]);
+        $this->cacheStore = $cacheDriver;
         \Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]);
 
         WebsocketService::resetAllTracking();
@@ -308,7 +316,7 @@ class StartServer extends Command
      */
     public function configureSteerTimer(): void
     {
-        $steerData = Cache::store('file')->get('blax:websockets:steer');
+        $steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer');
         $this->lastSteer = $steerData['time'] ?? null;
 
         \Log::channel('websocket')->debug('Steer timer configured', [
@@ -316,7 +324,7 @@ class StartServer extends Command
         ]);
 
         $this->loop->addPeriodicTimer(5, function () {
-            $steerData = Cache::store('file')->get('blax:websockets:steer');
+            $steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer');
             $currentSteer = $steerData['time'] ?? null;
 
             if ($currentSteer !== null && $currentSteer !== $this->lastSteer) {
@@ -751,7 +759,7 @@ class StartServer extends Command
      */
     protected function getLastRestartData()
     {
-        $data = Cache::get('blax:websockets:restart');
+        $data = Cache::store($this->cacheStore)->get('blax:websockets:restart');
 
         // Handle legacy format (just timestamp) for backwards compatibility
         if (is_numeric($data)) {
diff --git a/src/Console/Commands/SteerServer.php b/src/Console/Commands/SteerServer.php
index 63c8162..ad5378e 100644
--- a/src/Console/Commands/SteerServer.php
+++ b/src/Console/Commands/SteerServer.php
@@ -12,7 +12,7 @@ class SteerServer extends Command
 
     protected $signature = 'websocket:steer
         {action : The action to send (cache:clear, restart, restart:soft)}
-        {--cache-driver=file : The cache driver to use for signaling.}';
+        {--cache-driver= : The cache driver to use for signaling (defaults to app cache driver).}';
 
     protected $description = 'Send a steering command to the running WebSocket server.';
 
@@ -37,7 +37,7 @@ class SteerServer extends Command
             return self::FAILURE;
         }
 
-        $store = $this->option('cache-driver') ?: 'file';
+        $store = $this->option('cache-driver') ?: config('cache.default');
 
         Cache::store($store)->forever('blax:websockets:steer', [
             'action' => $action,
diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php
index d617021..e90b5f0 100644
--- a/src/Websocket/Handler.php
+++ b/src/Websocket/Handler.php
@@ -64,12 +64,34 @@ class Handler implements MessageComponentInterface
      */
     private static ?bool $debug = null;
 
+    /**
+     * Track active child processes to limit concurrent DB connections.
+     * Each forked child may open its own MySQL connection, so we must
+     * cap concurrency to avoid exhausting MySQL's max_connections.
+     */
+    private int $activeChildCount = 0;
+
+    /**
+     * Maximum concurrent child processes (and thus DB connections).
+     * Configurable via websockets.max_concurrent_children (clamped to >= 1).
+     * Default 50 leaves headroom for PHP-FPM, queue workers, etc.
+     */
+    private int $maxConcurrentChildren = 50;
+
+    /**
+     * Queue of deferred messages waiting for a child slot.
+     * Each entry is [ConnectionInterface, Channel, array $message].
+     */
+    private array $deferredMessages = [];
+
     /**
      * Initialize a new handler.
      */
     public function __construct(
         protected ChannelManager $channelManager
-    ) {}
+    ) {
+        $this->maxConcurrentChildren = max(1, (int) config('websockets.max_concurrent_children', 50));
+    }
 
     /**
      * Handle incoming WebSocket message with optimized fast path for ping/pong
@@ -467,19 +489,42 @@ class Handler implements MessageComponentInterface
     /**
      * Fork with event-driven socket pair IPC (no polling!)
      * Parent is notified INSTANTLY when child sends data
+     *
+     * Includes a concurrency limiter: if max_concurrent_children is reached,
+     * the message is queued and processed when a child slot frees up.
+     * This prevents exhausting MySQL's max_connections under load.
      */
     protected function forkWithSocketPair(
         ConnectionInterface $connection,
         Channel $channel,
         array $message
     ): void {
+        // Check concurrency limit before forking
+        if ($this->activeChildCount >= $this->maxConcurrentChildren) {
+            // Queue the message for later processing
+            $this->deferredMessages[] = [$connection, $channel, $message];
+
+            if (count($this->deferredMessages) === 1) {
+                // Log only on first deferral to avoid log spam
+                Log::channel('websocket')->warning('Fork concurrency limit reached (' . $this->maxConcurrentChildren . '), queueing message', [
+                    'active_children' => $this->activeChildCount,
+                    'queued' => count($this->deferredMessages),
+                ]);
+            }
+            return;
+        }
+
+        $this->activeChildCount++;
+
         // Create socket pair BEFORE fork
         $ipc = SocketPairIpc::create($this->channelManager->loop);
 
         $pid = pcntl_fork();
 
         if ($pid === -1) {
+            $this->activeChildCount--;
             Log::error('Fork error');
+            // Don't drain the queue here: processDeferredMessages() keeps looping after a failed fork.
             return;
         }
 
@@ -512,7 +557,7 @@ class Handler implements MessageComponentInterface
                 // Create mock that sends via socket pair
                 $mock = new MockConnectionSocketPair($connection, $ipc);
 
-                Controller::controll_message(
+                $this->executeControllerWithDbResilience(
                     $mock,
                     $channel,
                     $message,
@@ -532,6 +577,14 @@ class Handler implements MessageComponentInterface
                     'data' => ['message' => $e->getMessage()],
                 ]));
 
+                // Log DB connection failures specifically for monitoring
+                if ($this->isDbConnectionError($e)) {
+                    Log::channel('websocket')->error('DB connection failure in child process', [
+                        'error' => $e->getMessage(),
+                        'event' => $message['event'] ?? 'unknown',
+                    ]);
+                }
+
                 if (app()->bound('sentry')) {
                     app('sentry')->captureException($e);
                 }
@@ -560,10 +613,115 @@ class Handler implements MessageComponentInterface
             function () {
                 // Cleanup zombie process
                 pcntl_waitpid(-1, $status, WNOHANG);
+
+                // Free up a child slot and process any queued messages
+                $this->activeChildCount = max(0, $this->activeChildCount - 1);
+                $this->processDeferredMessages();
             }
         );
     }
 
+    /**
+     * Process queued messages that were deferred due to concurrency limits.
+     * Called when a child process exits, freeing a slot.
+     */
+    protected function processDeferredMessages(): void
+    {
+        while (!empty($this->deferredMessages) && $this->activeChildCount < $this->maxConcurrentChildren) {
+            [$connection, $channel, $message] = array_shift($this->deferredMessages);
+
+            // Verify the connection is still open before processing
+            if (!isset($connection->socketId) || !isset($connection->app)) {
+                continue;
+            }
+
+            $this->forkWithSocketPair($connection, $channel, $message);
+        }
+
+        if (!empty($this->deferredMessages)) {
+            Log::channel('websocket')->info('Deferred message queue: ' . count($this->deferredMessages) . ' remaining');
+        }
+    }
+
+    /**
+     * Execute the controller with DB connection resilience.
+     * If the first attempt fails with a DB connection error (e.g., "Too many connections",
+     * "server has gone away"), waits briefly and retries once with a fresh connection.
+     *
+     * NOTE: the retry replays the whole controller call, so any side effects the
+     * first attempt completed before the connection error will happen again.
+     */
+    protected function executeControllerWithDbResilience(
+        $mock,
+        Channel $channel,
+        array $message,
+        ChannelManager $channelManager
+    ): void {
+        try {
+            Controller::controll_message($mock, $channel, $message, $channelManager);
+        } catch (\Throwable $e) {
+            if (!$this->isDbConnectionError($e)) {
+                throw $e;
+            }
+
+            // DB connection error — wait briefly for connections to free up, then retry
+            Log::channel('websocket')->warning('DB connection error, retrying after 500ms', [
+                'error' => $e->getMessage(),
+                'event' => $message['event'] ?? 'unknown',
+            ]);
+
+            usleep(500_000); // 500ms backoff
+
+            // Force a completely fresh DB connection (facade fully qualified: no `use` needed)
+            try {
+                \Illuminate\Support\Facades\DB::disconnect();
+                \Illuminate\Support\Facades\DB::reconnect();
+            } catch (\Throwable $reconnectError) {
+                // If reconnect also fails, throw the original error
+                Log::channel('websocket')->error('DB reconnect failed after retry', [
+                    'error' => $reconnectError->getMessage(),
+                ]);
+                throw $e;
+            }
+
+            // Retry the controller execution once
+            Controller::controll_message($mock, $channel, $message, $channelManager);
+        }
+    }
+
+    /**
+     * Check if an exception is a DB connection error (too many connections, gone away, etc.)
+     *
+     * Every exception in the "previous" chain is inspected, because the driver
+     * error may be wrapped more than once (e.g. PDOException inside a
+     * QueryException inside an application-level exception).
+     */
+    protected function isDbConnectionError(\Throwable $e): bool
+    {
+        $dbErrorPatterns = [
+            'Too many connections',
+            'SQLSTATE[08004]',
+            'SQLSTATE[HY000] [1040]',
+            'server has gone away',
+            'SQLSTATE[HY000] [2006]',
+            'Lost connection to MySQL',
+            'SQLSTATE[HY000] [2002]',
+            'Connection refused',
+            'SQLSTATE[08S01]',
+            'no connection to the server',
+        ];
+
+        for ($current = $e; $current !== null; $current = $current->getPrevious()) {
+            foreach ($dbErrorPatterns as $pattern) {
+                if (stripos($current->getMessage(), $pattern) !== false) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     /**
      * Handle data received from child via socket pair
      */