diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 75b1b7c..9ec36e2 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -31,7 +31,7 @@ class StartServer extends Command protected $signature = 'websockets:serve {--host=0.0.0.0} {--port=6001} - {--cache-driver=file : The cache driver to use for the server. Redis will not work due to concurrency issues.} + {--cache-driver= : Override cache driver (defaults to app config, typically redis). File driver available as fallback.} {--disable-statistics=true : Disable the statistics tracking.} {--statistics-interval= : The amount of seconds to tick between statistics saving.} {--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.} @@ -116,9 +116,13 @@ class StartServer extends Command define('LARAVEL_IS_WEBSOCKET', true); } - // Fixes redis concurrency issues - config(['cache.default' => $this->option('cache-driver', 'file')]); - \Log::channel('websocket')->debug('Cache driver configured', ['driver' => $this->option('cache-driver', 'file')]); + // Use app's configured cache driver (typically redis) unless explicitly overridden + // Previously forced to 'file' due to Redis concurrency issues after fork(), + // but now safe because: (1) cache() is only called in parent process, + // (2) child processes purge inherited Redis connections and get fresh ones + $cacheDriver = $this->option('cache-driver') ?: config('cache.default'); + config(['cache.default' => $cacheDriver]); + \Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]); WebsocketService::resetAllTracking(); \Log::channel('websocket')->debug('WebsocketService tracking reset'); diff --git a/src/Services/WebsocketService.php b/src/Services/WebsocketService.php index ac2d265..a3cf0d2 100644 --- a/src/Services/WebsocketService.php +++ b/src/Services/WebsocketService.php @@ -161,15 +161,12 @@ class WebsocketService public static function resetAllTracking() { - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); cache()->forget('ws_active_channels'); cache()->forget('ws_socket_auth'); cache()->forget('ws_socket_auth_users'); cache()->forget('ws_socket_authed_users'); cache()->forget('ws_channel_connections'); cache()->forget('ws_connection'); - config(['cache.default' => $previousCache]); return true; } @@ -177,47 +174,27 @@ class WebsocketService public static function getAuth(string $socketId) { - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); - $r = cache()->get('ws_socket_auth_' . str()->slug($socketId)); - config(['cache.default' => $previousCache]); - return $r; + return cache()->get('ws_socket_auth_' . str()->slug($socketId)); } public static function getChannelConnections(string $channelName): array { - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); - $r = cache()->get('ws_channel_connections_' . $channelName) ?? []; - config(['cache.default' => $previousCache]); - return $r; + return cache()->get('ws_channel_connections_' . $channelName) ?? []; } public static function getActiveChannels(): array { - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); - $r = cache()->get('ws_active_channels') ?? []; - config(['cache.default' => $previousCache]); - return $r; + return cache()->get('ws_active_channels') ?? []; } public static function getConnection(string $socketId) { - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); - $r = cache()->get('ws_connection_' . str()->slug($socketId)); - config(['cache.default' => $previousCache]); - return $r; + return cache()->get('ws_connection_' . str()->slug($socketId)); } public static function getAuthedUsers(): array { - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); - $r = cache()->get('ws_socket_authed_users') ?? []; - config(['cache.default' => $previousCache]); - return $r; + return cache()->get('ws_socket_authed_users') ?? []; } public static function isUserConnected($userId) @@ -243,11 +220,8 @@ class WebsocketService $authed_users = static::getAuthedUsers(); $authed_users[$socketId] = $user->id; - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); cache()->forever('ws_socket_authed_users', $authed_users); cache()->forever('ws_socket_auth_' . str()->slug($socketId), $user); - config(['cache.default' => $previousCache]); return static::getAuthedUsers(); } @@ -257,11 +231,8 @@ class WebsocketService $authed_users = static::getAuthedUsers(); unset($authed_users[$socketId]); - $previousCache = config('cache.default'); - config(['cache.default' => 'file']); cache()->forever('ws_socket_authed_users', $authed_users); cache()->forget('ws_socket_auth_' . str()->slug($socketId)); - config(['cache.default' => $previousCache]); return static::getAuthedUsers(); } diff --git a/src/Websocket/ConnectionSession.php b/src/Websocket/ConnectionSession.php new file mode 100644 index 0000000..f6bc3f7 --- /dev/null +++ b/src/Websocket/ConnectionSession.php @@ -0,0 +1,180 @@ +put('key', $value); + * wsSession()->get('key', 'default'); + * wsSession()->forget('key'); + * wsSession()->all(); + * + * Lifecycle: + * - Created on connection open (empty) + * - Loaded from Redis in each child process + * - Auto-saved before child exits (or explicitly via save()) + * - Flushed on connection close + */ +class ConnectionSession +{ + private string $cacheKey; + private array $data = []; + private bool $loaded = false; + private bool $dirty = false; + + /** @var int TTL in seconds (24 hours — safety net if onClose cleanup misses) */ + private const TTL = 86400; + + public function __construct( + private readonly string $socketId + ) { + $this->cacheKey = 'ws_session_' . $socketId; + } + + /** + * Lazy-load session data from Redis on first access. + */ + private function load(): void + { + if ($this->loaded) { + return; + } + + $this->data = cache()->get($this->cacheKey) ?? []; + $this->loaded = true; + } + + /** + * Get a value from the session. + */ + public function get(string $key, mixed $default = null): mixed + { + $this->load(); + return $this->data[$key] ?? $default; + } + + /** + * Store a value in the session. + */ + public function put(string $key, mixed $value): static + { + $this->load(); + $this->data[$key] = $value; + $this->dirty = true; + return $this; + } + + /** + * Check if a key exists in the session. + */ + public function has(string $key): bool + { + $this->load(); + return array_key_exists($key, $this->data); + } + + /** + * Remove a key from the session. + */ + public function forget(string $key): static + { + $this->load(); + if (array_key_exists($key, $this->data)) { + unset($this->data[$key]); + $this->dirty = true; + } + return $this; + } + + /** + * Get all session data. + */ + public function all(): array + { + $this->load(); + return $this->data; + } + + /** + * Replace all session data. + */ + public function replace(array $data): static + { + $this->load(); + $this->data = $data; + $this->dirty = true; + return $this; + } + + /** + * Increment a numeric value. + */ + public function increment(string $key, int $amount = 1): int + { + $value = (int) $this->get($key, 0) + $amount; + $this->put($key, $value); + return $value; + } + + /** + * Save session data to Redis (only if modified). + */ + public function save(): void + { + if (!$this->dirty) { + return; + } + + if (empty($this->data)) { + cache()->forget($this->cacheKey); + } else { + cache()->put($this->cacheKey, $this->data, self::TTL); + } + + $this->dirty = false; + } + + /** + * Flush all session data and remove from Redis. + */ + public function flush(): void + { + $this->data = []; + $this->dirty = false; + $this->loaded = true; + cache()->forget($this->cacheKey); + } + + /** + * Check if session has unsaved changes. + */ + public function isDirty(): bool + { + return $this->dirty; + } + + /** + * Check if session is empty. + */ + public function isEmpty(): bool + { + $this->load(); + return empty($this->data); + } + + /** + * Get the socket ID this session belongs to. + */ + public function getSocketId(): string + { + return $this->socketId; + } +} diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index 34a81d5..4121195 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -270,6 +270,9 @@ class Handler implements MessageComponentInterface $this->cleanupChannelConnections($connection); $this->finalizeConnectionClose($connection); + + // Clean up per-connection session from Redis + cache()->forget('ws_session_' . $connection->socketId); } @@ -492,8 +495,20 @@ class Handler implements MessageComponentInterface // This saves ~5-15ms for methods that don't use the database DB::disconnect(); + // Purge inherited Redis/cache connections from parent process. + // After fork(), child inherits parent's Redis socket fd — using it + // would corrupt parent's protocol state. Purging forces fresh + // connections on next cache() call (predis connects lazily). + app()->forgetInstance('cache'); + app()->forgetInstance('cache.store'); + app()->forgetInstance('redis'); + $this->setRequest($message, $connection); + // Set up per-connection session (backed by Redis) + $session = new ConnectionSession($connection->socketId); + app()->instance('ws.session', $session); + // Create mock that sends via socket pair $mock = new MockConnectionSocketPair($connection, $ipc); @@ -507,6 +522,9 @@ class Handler implements MessageComponentInterface \Illuminate\Container\Container::getInstance() ->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class) ->invokeWhen(fn($callback) => true); + + // Persist session changes to Redis before exit + $session->save(); } catch (Exception $e) { // Send error via socket pair $ipc->sendToParent(json_encode([ diff --git a/src/helpers_global.php b/src/helpers_global.php index b7ddcdf..b57dad8 100644 --- a/src/helpers_global.php +++ b/src/helpers_global.php @@ -100,3 +100,36 @@ if (!function_exists('ws_available')) { return BroadcastClient::instance()->isAvailable(); } } + +if (!function_exists('wsSession')) { + /** + * Get the current WebSocket connection's session. + * + * Returns a per-connection key-value store backed by Redis. + * Available in child processes during message handling. + * Data persists across messages for the same connection. + * + * @return \BlaxSoftware\LaravelWebSockets\Websocket\ConnectionSession|null + * + * @example + * // Store data + * wsSession()->put('last_action', 'transmitted'); + * wsSession()->put('transmit_count', 0); + * + * // Retrieve data (persists across messages) + * $count = wsSession()->get('transmit_count', 0); + * wsSession()->put('transmit_count', $count + 1); + * + * // Or use increment helper + * wsSession()->increment('transmit_count'); + * + * // Check & remove + * if (wsSession()->has('pending_action')) { + * wsSession()->forget('pending_action'); + * } + */ + function wsSession(): ?\BlaxSoftware\LaravelWebSockets\Websocket\ConnectionSession + { + return app()->bound('ws.session') ? app('ws.session') : null; + } +}