From a3194247c455c0c9cfe9927ac922c60e855dff7c Mon Sep 17 00:00:00 2001 From: "Fabian @ Blax Software" Date: Thu, 23 Apr 2026 11:56:50 +0200 Subject: [PATCH] BF nullable deprecations --- src/ChannelManagers/LocalChannelManager.php | 6 +- src/Console/Commands/StartServer.php | 100 +++++++++++++++++--- src/Contracts/ChannelManager.php | 6 +- src/Websocket/ChannelManager.php | 17 +++- 4 files changed, 108 insertions(+), 21 deletions(-) diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index dda246b..60ef395 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -272,7 +272,7 @@ class LocalChannelManager implements ChannelManager * @param string|null $channelName * @return PromiseInterface[int] */ - public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface + public function getLocalConnectionsCount($appId, ?string $channelName = null): PromiseInterface { return $this->getLocalChannels($appId) ->then(function ($channels) use ($channelName) { @@ -296,7 +296,7 @@ class LocalChannelManager implements ChannelManager * @param string|null $channelName * @return PromiseInterface[int] */ - public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface + public function getGlobalConnectionsCount($appId, ?string $channelName = null): PromiseInterface { return $this->getLocalConnectionsCount($appId, $channelName); } @@ -311,7 +311,7 @@ class LocalChannelManager implements ChannelManager * @param string|null $serverId * @return PromiseInterface[bool] */ - public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface + public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null): PromiseInterface { return Helpers::createFulfilledPromise(true); } diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 87ae550..3dd58b7 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -4,6 +4,7 @@ namespace BlaxSoftware\LaravelWebSockets\Console\Commands; use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastSocketServer; use BlaxSoftware\LaravelWebSockets\Cache\IpcCache; +use BlaxSoftware\LaravelWebSockets\ChannelManagers\LocalChannelManager; use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager; use BlaxSoftware\LaravelWebSockets\Facades\WebSocketRouter; use BlaxSoftware\LaravelWebSockets\Ipc\SocketPairIpc; @@ -101,10 +102,8 @@ class StartServer extends Command /** * Run the command. - * - * @return void */ - public function handle() + public function handle(): int { try { \Log::channel('websocket')->debug('websockets:serve command started', [ @@ -134,9 +133,13 @@ class StartServer extends Command // but now safe because: (1) cache() is only called in parent process, // (2) child processes purge inherited Redis connections and get fresh ones $cacheDriver = $this->option('cache-driver') ?: config('cache.default'); - config(['cache.default' => $cacheDriver]); - $this->cacheStore = $cacheDriver; - \Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]); + $resolvedCacheDriver = $this->resolveCacheStore($cacheDriver); + config(['cache.default' => $resolvedCacheDriver]); + $this->cacheStore = $resolvedCacheDriver; + \Log::channel('websocket')->debug('Cache driver configured', [ + 'driver' => $cacheDriver, + 'resolved_driver' => $resolvedCacheDriver, + ]); WebsocketService::resetAllTracking(); \Log::channel('websocket')->debug('WebsocketService tracking reset'); @@ -182,6 +185,8 @@ class StartServer extends Command \Log::channel('websocket')->debug('Starting server...'); $this->startServer(); + + return self::SUCCESS; } catch (\Throwable $e) { $this->error('Error starting the WebSocket server: ' . $e->getMessage()); @@ -193,6 +198,8 @@ class StartServer extends Command if (app()->bound('sentry')) { app('sentry')->captureException($e); } + + return self::FAILURE; } } @@ -223,17 +230,60 @@ class StartServer extends Command $config = $app['config']['websockets']; $mode = $config['replication']['mode'] ?? 'local'; - $class = $config['replication']['modes'][$mode]['channel_manager']; + $class = $config['replication']['modes'][$mode]['channel_manager'] ?? LocalChannelManager::class; \Log::channel('websocket')->debug('Creating ChannelManager', [ 'mode' => $mode, 'class' => $class, ]); - return new $class($this->loop); + try { + return new $class($this->loop); + } catch (\Throwable $e) { + if ($class === LocalChannelManager::class) { + throw $e; + } + + $connectionName = $config['replication']['modes'][$mode]['connection'] ?? 'default'; + $redisHost = config("database.redis.{$connectionName}.host", 'unknown'); + $redisPort = config("database.redis.{$connectionName}.port", 6379); + + \Log::channel('websocket')->error('Failed to initialize replication channel manager; falling back to local mode.', [ + 'mode' => $mode, + 'class' => $class, + 'connection' => $connectionName, + 'redis_host' => $redisHost, + 'redis_port' => $redisPort, + 'error' => $e->getMessage(), + ]); + + $this->components->warn('Replication manager unavailable; using local channel manager fallback.'); + + return new LocalChannelManager($this->loop); + } }); } + /** + * Resolve cache store and gracefully fallback to file if unavailable. + */ + protected function resolveCacheStore(string $driver): string + { + try { + Cache::store($driver)->get('blax:websockets:cache-probe'); + + return $driver; + } catch (\Throwable $e) { + \Log::channel('websocket')->warning('Configured cache store unavailable, falling back to file.', [ + 'configured_store' => $driver, + 'fallback_store' => 'file', + 'error' => $e->getMessage(), + ]); + + return 'file'; + } + } + /** * Register the Statistics Collectors that * are not resolved in the package service provider. @@ -302,7 +352,7 @@ class StartServer extends Command */ public function configureSteerTimer(): void { - $steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer'); + $steerData = $this->getSteerData(); $this->lastSteer = $steerData['time'] ?? null; \Log::channel('websocket')->debug('Steer timer configured', [ @@ -310,7 +360,7 @@ class StartServer extends Command ]); $this->loop->addPeriodicTimer(5, function () { - $steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer'); + $steerData = $this->getSteerData(); $currentSteer = $steerData['time'] ?? null; if ($currentSteer !== null && $currentSteer !== $this->lastSteer) { @@ -323,6 +373,25 @@ class StartServer extends Command }); } + /** + * Read steer signal from cache without crashing the loop on transient outages. + */ + protected function getSteerData(): array + { + try { + $data = Cache::store($this->cacheStore)->get('blax:websockets:steer'); + + return is_array($data) ? $data : []; + } catch (\Throwable $e) { + \Log::channel('websocket')->warning('Unable to read websocket steer signal from cache store.', [ + 'store' => $this->cacheStore, + 'error' => $e->getMessage(), + ]); + + return []; + } + } + /** * Execute a steer action received via the cache signal. */ @@ -745,7 +814,16 @@ class StartServer extends Command */ protected function getLastRestartData() { - $data = Cache::store($this->cacheStore)->get('blax:websockets:restart'); + try { + $data = Cache::store($this->cacheStore)->get('blax:websockets:restart'); + } catch (\Throwable $e) { + \Log::channel('websocket')->warning('Unable to read websocket restart signal from cache store.', [ + 'store' => $this->cacheStore, + 'error' => $e->getMessage(), + ]); + + return ['time' => null, 'soft' => false]; + } // Handle legacy format (just timestamp) for backwards compatibility if (is_numeric($data)) { diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index 8c4a52b..b22d5c6 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -116,7 +116,7 @@ interface ChannelManager * @param string|null $channelName * @return PromiseInterface[int] */ - public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface; + public function getLocalConnectionsCount($appId, ?string $channelName = null): PromiseInterface; /** * Get the connections count @@ -126,7 +126,7 @@ interface ChannelManager * @param string|null $channelName * @return PromiseInterface[int] */ - public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface; + public function getGlobalConnectionsCount($appId, ?string $channelName = null): PromiseInterface; /** * Broadcast the message across multiple servers. @@ -138,7 +138,7 @@ interface ChannelManager * @param string|null $serverId * @return PromiseInterface[bool] */ - public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface; + public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null): PromiseInterface; /** * Handle the user when it joined a presence channel. diff --git a/src/Websocket/ChannelManager.php b/src/Websocket/ChannelManager.php index 174abad..d09c14e 100644 --- a/src/Websocket/ChannelManager.php +++ b/src/Websocket/ChannelManager.php @@ -60,9 +60,7 @@ class ChannelManager extends LocalChannelManager $this->loop = $loop; - $this->redis = Redis::connection( - config('websockets.replication.modes.redis.connection', 'default') - ); + $this->redis = Redis::connection($this->getReplicationConnectionName()); $connectionUri = $this->getConnectionUri(); @@ -727,7 +725,7 @@ class ChannelManager extends LocalChannelManager */ protected function getConnectionUri() { - $name = config('websockets.replication.modes.redis.connection', 'default'); + $name = $this->getReplicationConnectionName(); $config = config("database.redis.{$name}"); $host = $config['host']; @@ -748,6 +746,17 @@ class ChannelManager extends LocalChannelManager return "redis://{$host}:{$port}" . ($query ? "?{$query}" : ''); } + /** + * Resolve the Redis connection name used by the active replication mode. + */ + protected function getReplicationConnectionName(): string + { + $mode = config('websockets.replication.mode', 'redis'); + + return config("websockets.replication.modes.{$mode}.connection") + ?? config('websockets.replication.modes.redis.connection', 'default'); + } + /** * Add the Presence Channel's User's Socket ID to a list. *