R cache, connection session, performance

This commit is contained in:
Fabian @ Blax Software 2026-02-09 13:50:42 +01:00
parent 7c1fca5c38
commit 2a47591907
5 changed files with 244 additions and 38 deletions

View File

@ -31,7 +31,7 @@ class StartServer extends Command
protected $signature = 'websockets:serve protected $signature = 'websockets:serve
{--host=0.0.0.0} {--host=0.0.0.0}
{--port=6001} {--port=6001}
{--cache-driver=file : The cache driver to use for the server. Redis will not work due to concurrency issues.} {--cache-driver= : Override cache driver (defaults to app config, typically redis). File driver available as fallback.}
{--disable-statistics=true : Disable the statistics tracking.} {--disable-statistics=true : Disable the statistics tracking.}
{--statistics-interval= : The amount of seconds to tick between statistics saving.} {--statistics-interval= : The amount of seconds to tick between statistics saving.}
{--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.} {--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.}
@ -116,9 +116,13 @@ class StartServer extends Command
define('LARAVEL_IS_WEBSOCKET', true); define('LARAVEL_IS_WEBSOCKET', true);
} }
// Fixes redis concurrency issues // Use app's configured cache driver (typically redis) unless explicitly overridden
config(['cache.default' => $this->option('cache-driver', 'file')]); // Previously forced to 'file' due to Redis concurrency issues after fork(),
\Log::channel('websocket')->debug('Cache driver configured', ['driver' => $this->option('cache-driver', 'file')]); // but now safe because: (1) cache() is only called in parent process,
// (2) child processes purge inherited Redis connections and get fresh ones
$cacheDriver = $this->option('cache-driver') ?: config('cache.default');
config(['cache.default' => $cacheDriver]);
\Log::channel('websocket')->debug('Cache driver configured', ['driver' => $cacheDriver]);
WebsocketService::resetAllTracking(); WebsocketService::resetAllTracking();
\Log::channel('websocket')->debug('WebsocketService tracking reset'); \Log::channel('websocket')->debug('WebsocketService tracking reset');

View File

@ -161,15 +161,12 @@ class WebsocketService
public static function resetAllTracking() public static function resetAllTracking()
{ {
$previousCache = config('cache.default');
config(['cache.default' => 'file']);
cache()->forget('ws_active_channels'); cache()->forget('ws_active_channels');
cache()->forget('ws_socket_auth'); cache()->forget('ws_socket_auth');
cache()->forget('ws_socket_auth_users'); cache()->forget('ws_socket_auth_users');
cache()->forget('ws_socket_authed_users'); cache()->forget('ws_socket_authed_users');
cache()->forget('ws_channel_connections'); cache()->forget('ws_channel_connections');
cache()->forget('ws_connection'); cache()->forget('ws_connection');
config(['cache.default' => $previousCache]);
return true; return true;
} }
@ -177,47 +174,27 @@ class WebsocketService
public static function getAuth(string $socketId) public static function getAuth(string $socketId)
{ {
$previousCache = config('cache.default'); return cache()->get('ws_socket_auth_' . str()->slug($socketId));
config(['cache.default' => 'file']);
$r = cache()->get('ws_socket_auth_' . str()->slug($socketId));
config(['cache.default' => $previousCache]);
return $r;
} }
public static function getChannelConnections(string $channelName): array public static function getChannelConnections(string $channelName): array
{ {
$previousCache = config('cache.default'); return cache()->get('ws_channel_connections_' . $channelName) ?? [];
config(['cache.default' => 'file']);
$r = cache()->get('ws_channel_connections_' . $channelName) ?? [];
config(['cache.default' => $previousCache]);
return $r;
} }
public static function getActiveChannels(): array public static function getActiveChannels(): array
{ {
$previousCache = config('cache.default'); return cache()->get('ws_active_channels') ?? [];
config(['cache.default' => 'file']);
$r = cache()->get('ws_active_channels') ?? [];
config(['cache.default' => $previousCache]);
return $r;
} }
public static function getConnection(string $socketId) public static function getConnection(string $socketId)
{ {
$previousCache = config('cache.default'); return cache()->get('ws_connection_' . str()->slug($socketId));
config(['cache.default' => 'file']);
$r = cache()->get('ws_connection_' . str()->slug($socketId));
config(['cache.default' => $previousCache]);
return $r;
} }
public static function getAuthedUsers(): array public static function getAuthedUsers(): array
{ {
$previousCache = config('cache.default'); return cache()->get('ws_socket_authed_users') ?? [];
config(['cache.default' => 'file']);
$r = cache()->get('ws_socket_authed_users') ?? [];
config(['cache.default' => $previousCache]);
return $r;
} }
public static function isUserConnected($userId) public static function isUserConnected($userId)
@ -243,11 +220,8 @@ class WebsocketService
$authed_users = static::getAuthedUsers(); $authed_users = static::getAuthedUsers();
$authed_users[$socketId] = $user->id; $authed_users[$socketId] = $user->id;
$previousCache = config('cache.default');
config(['cache.default' => 'file']);
cache()->forever('ws_socket_authed_users', $authed_users); cache()->forever('ws_socket_authed_users', $authed_users);
cache()->forever('ws_socket_auth_' . str()->slug($socketId), $user); cache()->forever('ws_socket_auth_' . str()->slug($socketId), $user);
config(['cache.default' => $previousCache]);
return static::getAuthedUsers(); return static::getAuthedUsers();
} }
@ -257,11 +231,8 @@ class WebsocketService
$authed_users = static::getAuthedUsers(); $authed_users = static::getAuthedUsers();
unset($authed_users[$socketId]); unset($authed_users[$socketId]);
$previousCache = config('cache.default');
config(['cache.default' => 'file']);
cache()->forever('ws_socket_authed_users', $authed_users); cache()->forever('ws_socket_authed_users', $authed_users);
cache()->forget('ws_socket_auth_' . str()->slug($socketId)); cache()->forget('ws_socket_auth_' . str()->slug($socketId));
config(['cache.default' => $previousCache]);
return static::getAuthedUsers(); return static::getAuthedUsers();
} }

View File

@ -0,0 +1,180 @@
<?php
declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket;
/**
* Per-connection session storage backed by Redis cache.
*
* Provides key-value storage scoped to a WebSocket connection.
* Data persists across forked child processes (each child loads
* from Redis, modifies, saves back next child sees changes).
*
* Usage in controllers:
* wsSession()->put('key', $value);
* wsSession()->get('key', 'default');
* wsSession()->forget('key');
* wsSession()->all();
*
* Lifecycle:
* - Created on connection open (empty)
* - Loaded from Redis in each child process
* - Auto-saved before child exits (or explicitly via save())
* - Flushed on connection close
*/
class ConnectionSession
{
private string $cacheKey;
private array $data = [];
private bool $loaded = false;
private bool $dirty = false;
/** @var int TTL in seconds (24 hours — safety net if onClose cleanup misses) */
private const TTL = 86400;
public function __construct(
private readonly string $socketId
) {
$this->cacheKey = 'ws_session_' . $socketId;
}
/**
* Lazy-load session data from Redis on first access.
*/
private function load(): void
{
if ($this->loaded) {
return;
}
$this->data = cache()->get($this->cacheKey) ?? [];
$this->loaded = true;
}
/**
* Get a value from the session.
*/
public function get(string $key, mixed $default = null): mixed
{
$this->load();
return $this->data[$key] ?? $default;
}
/**
* Store a value in the session.
*/
public function put(string $key, mixed $value): static
{
$this->load();
$this->data[$key] = $value;
$this->dirty = true;
return $this;
}
/**
* Check if a key exists in the session.
*/
public function has(string $key): bool
{
$this->load();
return array_key_exists($key, $this->data);
}
/**
* Remove a key from the session.
*/
public function forget(string $key): static
{
$this->load();
if (array_key_exists($key, $this->data)) {
unset($this->data[$key]);
$this->dirty = true;
}
return $this;
}
/**
* Get all session data.
*/
public function all(): array
{
$this->load();
return $this->data;
}
/**
* Replace all session data.
*/
public function replace(array $data): static
{
$this->load();
$this->data = $data;
$this->dirty = true;
return $this;
}
/**
* Increment a numeric value.
*/
public function increment(string $key, int $amount = 1): int
{
$value = (int) $this->get($key, 0) + $amount;
$this->put($key, $value);
return $value;
}
/**
* Save session data to Redis (only if modified).
*/
public function save(): void
{
if (!$this->dirty) {
return;
}
if (empty($this->data)) {
cache()->forget($this->cacheKey);
} else {
cache()->put($this->cacheKey, $this->data, self::TTL);
}
$this->dirty = false;
}
/**
* Flush all session data and remove from Redis.
*/
public function flush(): void
{
$this->data = [];
$this->dirty = false;
$this->loaded = true;
cache()->forget($this->cacheKey);
}
/**
* Check if session has unsaved changes.
*/
public function isDirty(): bool
{
return $this->dirty;
}
/**
* Check if session is empty.
*/
public function isEmpty(): bool
{
$this->load();
return empty($this->data);
}
/**
* Get the socket ID this session belongs to.
*/
public function getSocketId(): string
{
return $this->socketId;
}
}

View File

@ -270,6 +270,9 @@ class Handler implements MessageComponentInterface
$this->cleanupChannelConnections($connection); $this->cleanupChannelConnections($connection);
$this->finalizeConnectionClose($connection); $this->finalizeConnectionClose($connection);
// Clean up per-connection session from Redis
cache()->forget('ws_session_' . $connection->socketId);
} }
@ -492,8 +495,20 @@ class Handler implements MessageComponentInterface
// This saves ~5-15ms for methods that don't use the database // This saves ~5-15ms for methods that don't use the database
DB::disconnect(); DB::disconnect();
// Purge inherited Redis/cache connections from parent process.
// After fork(), child inherits parent's Redis socket fd — using it
// would corrupt parent's protocol state. Purging forces fresh
// connections on next cache() call (predis connects lazily).
app()->forgetInstance('cache');
app()->forgetInstance('cache.store');
app()->forgetInstance('redis');
$this->setRequest($message, $connection); $this->setRequest($message, $connection);
// Set up per-connection session (backed by Redis)
$session = new ConnectionSession($connection->socketId);
app()->instance('ws.session', $session);
// Create mock that sends via socket pair // Create mock that sends via socket pair
$mock = new MockConnectionSocketPair($connection, $ipc); $mock = new MockConnectionSocketPair($connection, $ipc);
@ -507,6 +522,9 @@ class Handler implements MessageComponentInterface
\Illuminate\Container\Container::getInstance() \Illuminate\Container\Container::getInstance()
->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class) ->make(\Illuminate\Support\Defer\DeferredCallbackCollection::class)
->invokeWhen(fn($callback) => true); ->invokeWhen(fn($callback) => true);
// Persist session changes to Redis before exit
$session->save();
} catch (Exception $e) { } catch (Exception $e) {
// Send error via socket pair // Send error via socket pair
$ipc->sendToParent(json_encode([ $ipc->sendToParent(json_encode([

View File

@ -100,3 +100,36 @@ if (!function_exists('ws_available')) {
return BroadcastClient::instance()->isAvailable(); return BroadcastClient::instance()->isAvailable();
} }
} }
if (!function_exists('wsSession')) {
/**
* Get the current WebSocket connection's session.
*
* Returns a per-connection key-value store backed by Redis.
* Available in child processes during message handling.
* Data persists across messages for the same connection.
*
* @return \BlaxSoftware\LaravelWebSockets\Websocket\ConnectionSession|null
*
* @example
* // Store data
* wsSession()->put('last_action', 'transmitted');
* wsSession()->put('transmit_count', 0);
*
* // Retrieve data (persists across messages)
* $count = wsSession()->get('transmit_count', 0);
* wsSession()->put('transmit_count', $count + 1);
*
* // Or use increment helper
* wsSession()->increment('transmit_count');
*
* // Check & remove
* if (wsSession()->has('pending_action')) {
* wsSession()->forget('pending_action');
* }
*/
function wsSession(): ?\BlaxSoftware\LaravelWebSockets\Websocket\ConnectionSession
{
return app()->bound('ws.session') ? app('ws.session') : null;
}
}