BF nullable deprecations

This commit is contained in:
Fabian @ Blax Software 2026-04-23 11:56:50 +02:00
parent 90e51c1bdf
commit a3194247c4
4 changed files with 108 additions and 21 deletions

View File

@ -272,7 +272,7 @@ class LocalChannelManager implements ChannelManager
* @param string|null $channelName * @param string|null $channelName
* @return PromiseInterface[int] * @return PromiseInterface[int]
*/ */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getLocalConnectionsCount($appId, ?string $channelName = null): PromiseInterface
{ {
return $this->getLocalChannels($appId) return $this->getLocalChannels($appId)
->then(function ($channels) use ($channelName) { ->then(function ($channels) use ($channelName) {
@ -296,7 +296,7 @@ class LocalChannelManager implements ChannelManager
* @param string|null $channelName * @param string|null $channelName
* @return PromiseInterface[int] * @return PromiseInterface[int]
*/ */
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getGlobalConnectionsCount($appId, ?string $channelName = null): PromiseInterface
{ {
return $this->getLocalConnectionsCount($appId, $channelName); return $this->getLocalConnectionsCount($appId, $channelName);
} }
@ -311,7 +311,7 @@ class LocalChannelManager implements ChannelManager
* @param string|null $serverId * @param string|null $serverId
* @return PromiseInterface[bool] * @return PromiseInterface[bool]
*/ */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null): PromiseInterface
{ {
return Helpers::createFulfilledPromise(true); return Helpers::createFulfilledPromise(true);
} }

View File

@ -4,6 +4,7 @@ namespace BlaxSoftware\LaravelWebSockets\Console\Commands;
use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastSocketServer; use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastSocketServer;
use BlaxSoftware\LaravelWebSockets\Cache\IpcCache; use BlaxSoftware\LaravelWebSockets\Cache\IpcCache;
use BlaxSoftware\LaravelWebSockets\ChannelManagers\LocalChannelManager;
use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager; use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager;
use BlaxSoftware\LaravelWebSockets\Facades\WebSocketRouter; use BlaxSoftware\LaravelWebSockets\Facades\WebSocketRouter;
use BlaxSoftware\LaravelWebSockets\Ipc\SocketPairIpc; use BlaxSoftware\LaravelWebSockets\Ipc\SocketPairIpc;
@ -101,10 +102,8 @@ class StartServer extends Command
/** /**
* Run the command. * Run the command.
*
* @return void
*/ */
public function handle() public function handle(): int
{ {
try { try {
\Log::channel('websocket')->debug('websockets:serve command started', [ \Log::channel('websocket')->debug('websockets:serve command started', [
@ -134,9 +133,13 @@ class StartServer extends Command
// but now safe because: (1) cache() is only called in parent process, // but now safe because: (1) cache() is only called in parent process,
// (2) child processes purge inherited Redis connections and get fresh ones // (2) child processes purge inherited Redis connections and get fresh ones
$cacheDriver = $this->option('cache-driver') ?: config('cache.default'); $cacheDriver = $this->option('cache-driver') ?: config('cache.default');
config(['cache.default' => $cacheDriver]); $resolvedCacheDriver = $this->resolveCacheStore($cacheDriver);
$this->cacheStore = $cacheDriver; config(['cache.default' => $resolvedCacheDriver]);
\Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]); $this->cacheStore = $resolvedCacheDriver;
\Log::channel('websocket')->debug('Cache driver configured', [
'driver' => $cacheDriver,
'resolved_driver' => $resolvedCacheDriver,
]);
WebsocketService::resetAllTracking(); WebsocketService::resetAllTracking();
\Log::channel('websocket')->debug('WebsocketService tracking reset'); \Log::channel('websocket')->debug('WebsocketService tracking reset');
@ -182,6 +185,8 @@ class StartServer extends Command
\Log::channel('websocket')->debug('Starting server...'); \Log::channel('websocket')->debug('Starting server...');
$this->startServer(); $this->startServer();
return self::SUCCESS;
} catch (\Throwable $e) { } catch (\Throwable $e) {
$this->error('Error starting the WebSocket server: ' . $e->getMessage()); $this->error('Error starting the WebSocket server: ' . $e->getMessage());
@ -193,6 +198,8 @@ class StartServer extends Command
if (app()->bound('sentry')) { if (app()->bound('sentry')) {
app('sentry')->captureException($e); app('sentry')->captureException($e);
} }
return self::FAILURE;
} }
} }
@ -223,17 +230,60 @@ class StartServer extends Command
$config = $app['config']['websockets']; $config = $app['config']['websockets'];
$mode = $config['replication']['mode'] ?? 'local'; $mode = $config['replication']['mode'] ?? 'local';
$class = $config['replication']['modes'][$mode]['channel_manager']; $class = $config['replication']['modes'][$mode]['channel_manager'] ?? LocalChannelManager::class;
\Log::channel('websocket')->debug('Creating ChannelManager', [ \Log::channel('websocket')->debug('Creating ChannelManager', [
'mode' => $mode, 'mode' => $mode,
'class' => $class, 'class' => $class,
]); ]);
return new $class($this->loop); try {
return new $class($this->loop);
} catch (\Throwable $e) {
if ($class === LocalChannelManager::class) {
throw $e;
}
$connectionName = $config['replication']['modes'][$mode]['connection'] ?? 'default';
$redisHost = config("database.redis.{$connectionName}.host", 'unknown');
$redisPort = config("database.redis.{$connectionName}.port", 6379);
\Log::channel('websocket')->error('Failed to initialize replication channel manager; falling back to local mode.', [
'mode' => $mode,
'class' => $class,
'connection' => $connectionName,
'redis_host' => $redisHost,
'redis_port' => $redisPort,
'error' => $e->getMessage(),
]);
$this->components->warn('Replication manager unavailable; using local channel manager fallback.');
return new LocalChannelManager($this->loop);
}
}); });
} }
/**
* Resolve cache store and gracefully fallback to file if unavailable.
*/
protected function resolveCacheStore(string $driver): string
{
try {
Cache::store($driver)->get('blax:websockets:cache-probe');
return $driver;
} catch (\Throwable $e) {
\Log::channel('websocket')->warning('Configured cache store unavailable, falling back to file.', [
'configured_store' => $driver,
'fallback_store' => 'file',
'error' => $e->getMessage(),
]);
return 'file';
}
}
/** /**
* Register the Statistics Collectors that * Register the Statistics Collectors that
* are not resolved in the package service provider. * are not resolved in the package service provider.
@ -302,7 +352,7 @@ class StartServer extends Command
*/ */
public function configureSteerTimer(): void public function configureSteerTimer(): void
{ {
$steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer'); $steerData = $this->getSteerData();
$this->lastSteer = $steerData['time'] ?? null; $this->lastSteer = $steerData['time'] ?? null;
\Log::channel('websocket')->debug('Steer timer configured', [ \Log::channel('websocket')->debug('Steer timer configured', [
@ -310,7 +360,7 @@ class StartServer extends Command
]); ]);
$this->loop->addPeriodicTimer(5, function () { $this->loop->addPeriodicTimer(5, function () {
$steerData = Cache::store($this->cacheStore)->get('blax:websockets:steer'); $steerData = $this->getSteerData();
$currentSteer = $steerData['time'] ?? null; $currentSteer = $steerData['time'] ?? null;
if ($currentSteer !== null && $currentSteer !== $this->lastSteer) { if ($currentSteer !== null && $currentSteer !== $this->lastSteer) {
@ -323,6 +373,25 @@ class StartServer extends Command
}); });
} }
/**
* Read steer signal from cache without crashing the loop on transient outages.
*/
protected function getSteerData(): array
{
try {
$data = Cache::store($this->cacheStore)->get('blax:websockets:steer');
return is_array($data) ? $data : [];
} catch (\Throwable $e) {
\Log::channel('websocket')->warning('Unable to read websocket steer signal from cache store.', [
'store' => $this->cacheStore,
'error' => $e->getMessage(),
]);
return [];
}
}
/** /**
* Execute a steer action received via the cache signal. * Execute a steer action received via the cache signal.
*/ */
@ -745,7 +814,16 @@ class StartServer extends Command
*/ */
protected function getLastRestartData() protected function getLastRestartData()
{ {
$data = Cache::store($this->cacheStore)->get('blax:websockets:restart'); try {
$data = Cache::store($this->cacheStore)->get('blax:websockets:restart');
} catch (\Throwable $e) {
\Log::channel('websocket')->warning('Unable to read websocket restart signal from cache store.', [
'store' => $this->cacheStore,
'error' => $e->getMessage(),
]);
return ['time' => null, 'soft' => false];
}
// Handle legacy format (just timestamp) for backwards compatibility // Handle legacy format (just timestamp) for backwards compatibility
if (is_numeric($data)) { if (is_numeric($data)) {

View File

@ -116,7 +116,7 @@ interface ChannelManager
* @param string|null $channelName * @param string|null $channelName
* @return PromiseInterface[int] * @return PromiseInterface[int]
*/ */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface; public function getLocalConnectionsCount($appId, ?string $channelName = null): PromiseInterface;
/** /**
* Get the connections count * Get the connections count
@ -126,7 +126,7 @@ interface ChannelManager
* @param string|null $channelName * @param string|null $channelName
* @return PromiseInterface[int] * @return PromiseInterface[int]
*/ */
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface; public function getGlobalConnectionsCount($appId, ?string $channelName = null): PromiseInterface;
/** /**
* Broadcast the message across multiple servers. * Broadcast the message across multiple servers.
@ -138,7 +138,7 @@ interface ChannelManager
* @param string|null $serverId * @param string|null $serverId
* @return PromiseInterface[bool] * @return PromiseInterface[bool]
*/ */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface; public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null): PromiseInterface;
/** /**
* Handle the user when it joined a presence channel. * Handle the user when it joined a presence channel.

View File

@ -60,9 +60,7 @@ class ChannelManager extends LocalChannelManager
$this->loop = $loop; $this->loop = $loop;
$this->redis = Redis::connection( $this->redis = Redis::connection($this->getReplicationConnectionName());
config('websockets.replication.modes.redis.connection', 'default')
);
$connectionUri = $this->getConnectionUri(); $connectionUri = $this->getConnectionUri();
@ -727,7 +725,7 @@ class ChannelManager extends LocalChannelManager
*/ */
protected function getConnectionUri() protected function getConnectionUri()
{ {
$name = config('websockets.replication.modes.redis.connection', 'default'); $name = $this->getReplicationConnectionName();
$config = config("database.redis.{$name}"); $config = config("database.redis.{$name}");
$host = $config['host']; $host = $config['host'];
@ -748,6 +746,17 @@ class ChannelManager extends LocalChannelManager
return "redis://{$host}:{$port}" . ($query ? "?{$query}" : ''); return "redis://{$host}:{$port}" . ($query ? "?{$query}" : '');
} }
/**
* Resolve the Redis connection name used by the active replication mode.
*/
protected function getReplicationConnectionName(): string
{
$mode = config('websockets.replication.mode', 'redis');
return config("websockets.replication.modes.{$mode}.connection")
?? config('websockets.replication.modes.redis.connection', 'default');
}
/** /**
* Add the Presence Channel's User's Socket ID to a list. * Add the Presence Channel's User's Socket ID to a list.
* *