RI websocket performance, BF race condition

This commit is contained in:
Fabian @ Blax Software 2026-01-24 12:17:46 +01:00
parent 71e0d44e30
commit 6ce8573fed
8 changed files with 682 additions and 229 deletions

212
src/Cache/IpcCache.php Normal file
View File

@ -0,0 +1,212 @@
<?php
declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Cache;
/**
* High-performance IPC cache using tmpfs (RAM-backed filesystem).
*
* This provides near-memory speeds for inter-process communication
* without the complexity of shared memory segments (shmop).
*
* On Linux, /dev/shm is mounted as tmpfs (RAM).
* Falls back to /tmp if /dev/shm is not available.
*/
class IpcCache
{
/**
* Base directory for IPC files
*/
private static ?string $baseDir = null;
/**
* Whether we're using tmpfs (RAM-backed)
*/
private static ?bool $isTmpfs = null;
/**
* Initialize the base directory
*/
private static function init(): void
{
if (self::$baseDir !== null) {
return;
}
// Prefer /dev/shm (RAM-backed on Linux)
if (is_dir('/dev/shm') && is_writable('/dev/shm')) {
self::$baseDir = '/dev/shm/laravel-ws-ipc';
self::$isTmpfs = true;
} else {
// Fall back to /tmp (may or may not be tmpfs)
self::$baseDir = '/tmp/laravel-ws-ipc';
self::$isTmpfs = false;
}
if (!is_dir(self::$baseDir)) {
@mkdir(self::$baseDir, 0755, true);
}
}
/**
* Get the file path for a cache key
*/
private static function getPath(string $key): string
{
self::init();
// Use hash to avoid filesystem issues with special characters
return self::$baseDir . '/' . md5($key);
}
/**
* Check if a key exists (file stat only - very fast)
*/
public static function has(string $key): bool
{
return file_exists(self::getPath($key));
}
/**
* Get a value from cache
*
* @return mixed|null Returns null if not found
*/
public static function get(string $key): mixed
{
$path = self::getPath($key);
if (!file_exists($path)) {
return null;
}
$content = @file_get_contents($path);
if ($content === false) {
return null;
}
// Check expiration (stored as first 10 bytes)
$expireAt = (int) substr($content, 0, 10);
if ($expireAt > 0 && $expireAt < time()) {
@unlink($path);
return null;
}
$data = substr($content, 10);
return $data === '' ? null : unserialize($data);
}
/**
* Set a value in cache
*
* @param string $key
* @param mixed $value
* @param int $ttl Time to live in seconds (0 = no expiry)
*/
public static function put(string $key, mixed $value, int $ttl = 60): bool
{
$path = self::getPath($key);
$expireAt = $ttl > 0 ? time() + $ttl : 0;
// Format: 10 bytes for expiry timestamp + serialized data
$content = sprintf('%010d', $expireAt) . serialize($value);
// Atomic write: write to temp file then rename
$tempPath = $path . '.' . getmypid();
if (@file_put_contents($tempPath, $content) === false) {
return false;
}
return @rename($tempPath, $path);
}
/**
* Delete a key from cache
*/
public static function forget(string $key): bool
{
$path = self::getPath($key);
if (file_exists($path)) {
return @unlink($path);
}
return true;
}
/**
* Delete multiple keys from cache
*/
public static function forgetMultiple(array $keys): void
{
foreach ($keys as $key) {
self::forget($key);
}
}
/**
* Set multiple values atomically
*
* @param array<string, mixed> $values Key => Value pairs
* @param int $ttl Time to live in seconds
*/
public static function putMultiple(array $values, int $ttl = 60): void
{
foreach ($values as $key => $value) {
self::put($key, $value, $ttl);
}
}
/**
* Clean up expired cache files (call periodically)
*/
public static function cleanup(): int
{
self::init();
$cleaned = 0;
$now = time();
$files = @scandir(self::$baseDir);
if ($files === false) {
return 0;
}
foreach ($files as $file) {
if ($file === '.' || $file === '..') {
continue;
}
$path = self::$baseDir . '/' . $file;
$content = @file_get_contents($path);
if ($content === false) {
continue;
}
$expireAt = (int) substr($content, 0, 10);
if ($expireAt > 0 && $expireAt < $now) {
@unlink($path);
$cleaned++;
}
}
return $cleaned;
}
/**
* Check if we're using RAM-backed storage
*/
public static function isTmpfs(): bool
{
self::init();
return self::$isTmpfs ?? false;
}
/**
* Reset (for testing)
*/
public static function reset(): void
{
self::$baseDir = null;
self::$isTmpfs = null;
}
}

View File

@ -424,17 +424,23 @@ class LocalChannelManager implements ChannelManager
/** /**
* Keep tracking the connections availability when they pong. * Keep tracking the connections availability when they pong.
* Optimized: Uses unix timestamp directly instead of Carbon for performance.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return PromiseInterface[bool] * @return PromiseInterface[bool]
*/ */
public function connectionPonged(ConnectionInterface $connection): PromiseInterface public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{ {
return $this->pongConnectionInChannels($connection); // Direct timestamp update on connection object - no channel iteration needed
// The connection object is passed by reference, so this updates it everywhere
$connection->lastPongedAt = time();
return Helpers::createFulfilledPromise(true);
} }
/** /**
* Remove the obsolete connections that didn't ponged in a while. * Remove the obsolete connections that didn't ponged in a while.
* Optimized: Uses unix timestamp comparison instead of Carbon.
* *
* @return PromiseInterface[bool] * @return PromiseInterface[bool]
*/ */
@ -444,9 +450,16 @@ class LocalChannelManager implements ChannelManager
return $this->getLocalConnections() return $this->getLocalConnections()
->then(function ($connections) { ->then(function ($connections) {
$promises = []; $promises = [];
$now = time();
foreach ($connections as $connection) { foreach ($connections as $connection) {
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); // Handle both Carbon objects (legacy) and unix timestamps (optimized)
$lastPong = $connection->lastPongedAt ?? 0;
if (is_object($lastPong)) {
$differenceInSeconds = $lastPong->diffInSeconds(Carbon::now());
} else {
$differenceInSeconds = $now - (int) $lastPong;
}
if ($differenceInSeconds > 120) { if ($differenceInSeconds > 120) {
$promises[] = $this->unsubscribeFromAllChannels($connection); $promises[] = $this->unsubscribeFromAllChannels($connection);
@ -462,23 +475,17 @@ class LocalChannelManager implements ChannelManager
/** /**
* Pong connection in channels. * Pong connection in channels.
* Optimized: No longer iterates through channels - timestamp is on connection object.
* *
* @param ConnectionInterface $connection * @param ConnectionInterface $connection
* @return PromiseInterface[bool] * @return PromiseInterface[bool]
*/ */
public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface
{ {
return $this->getLocalChannels($connection->app->id) // Simply update timestamp on the connection object directly
->then(function ($channels) use ($connection) { $connection->lastPongedAt = time();
foreach ($channels as $channel) {
if ($conn = $channel->getConnection($connection->socketId)) {
$conn->lastPongedAt = Carbon::now();
$channel->saveConnection($conn);
}
}
return true; return Helpers::createFulfilledPromise(true);
});
} }
/** /**
@ -526,21 +533,23 @@ class LocalChannelManager implements ChannelManager
/** /**
* Get the channel class by the channel name. * Get the channel class by the channel name.
* Optimized: Direct string comparison instead of Str::startsWith
* *
* @param string $channelName * @param string $channelName
* @return string * @return string
*/ */
protected function getChannelClassName(string $channelName): string protected function getChannelClassName(string $channelName): string
{ {
if (Str::startsWith($channelName, 'private-')) { // Direct strncmp is faster than Str::startsWith for fixed prefixes
if (strncmp($channelName, 'private-', 8) === 0) {
return PrivateChannel::class; return PrivateChannel::class;
} }
if (Str::startsWith($channelName, 'presence-')) { if (strncmp($channelName, 'presence-', 9) === 0) {
return PresenceChannel::class; return PresenceChannel::class;
} }
if (Str::startsWith($channelName, 'openpresence-')) { if (strncmp($channelName, 'openpresence-', 13) === 0) {
return OpenPresenceChannel::class; return OpenPresenceChannel::class;
} }

View File

@ -9,27 +9,18 @@ class DashboardLogger
const LOG_CHANNEL_PREFIX = 'private-websockets-dashboard-'; const LOG_CHANNEL_PREFIX = 'private-websockets-dashboard-';
const TYPE_DISCONNECTED = 'disconnected'; const TYPE_DISCONNECTED = 'disconnected';
const TYPE_CONNECTED = 'connected'; const TYPE_CONNECTED = 'connected';
const TYPE_SUBSCRIBED = 'subscribed'; const TYPE_SUBSCRIBED = 'subscribed';
const TYPE_WS_MESSAGE = 'ws-message'; const TYPE_WS_MESSAGE = 'ws-message';
const TYPE_API_MESSAGE = 'api-message'; const TYPE_API_MESSAGE = 'api-message';
const TYPE_REPLICATOR_SUBSCRIBED = 'replicator-subscribed'; const TYPE_REPLICATOR_SUBSCRIBED = 'replicator-subscribed';
const TYPE_REPLICATOR_UNSUBSCRIBED = 'replicator-unsubscribed'; const TYPE_REPLICATOR_UNSUBSCRIBED = 'replicator-unsubscribed';
const TYPE_REPLICATOR_MESSAGE_RECEIVED = 'replicator-message-received'; const TYPE_REPLICATOR_MESSAGE_RECEIVED = 'replicator-message-received';
/** /**
* The list of all channels. * The list of all channels.
*
* @var array
*/ */
public static $channels = [ public static array $channels = [
self::TYPE_DISCONNECTED, self::TYPE_DISCONNECTED,
self::TYPE_CONNECTED, self::TYPE_CONNECTED,
self::TYPE_SUBSCRIBED, self::TYPE_SUBSCRIBED,
@ -40,44 +31,77 @@ class DashboardLogger
self::TYPE_REPLICATOR_MESSAGE_RECEIVED, self::TYPE_REPLICATOR_MESSAGE_RECEIVED,
]; ];
/**
* Whether dashboard logging is enabled.
* Cached to avoid repeated config lookups.
*/
private static ?bool $enabled = null;
/**
* Cached channel manager instance.
*/
private static ?ChannelManager $channelManager = null;
/** /**
* Log an event for an app. * Log an event for an app.
* Optimized: Early exit if disabled, cached config lookups.
* *
* @param mixed $appId * @param mixed $appId
* @param string $type * @param string $type
* @param array $details * @param array $details
* @return void
*/ */
public static function log($appId, string $type, array $details = []) public static function log($appId, string $type, array $details = []): void
{ {
$channelManager = app(ChannelManager::class); // Cache enabled check
if (self::$enabled === null) {
self::$enabled = config('websockets.dashboard.enabled', true);
}
$channelName = static::LOG_CHANNEL_PREFIX.$type; // Skip if dashboard is disabled
if (!self::$enabled) {
return;
}
$payload = [ // Cache channel manager
if (self::$channelManager === null) {
self::$channelManager = app(ChannelManager::class);
}
$channelName = static::LOG_CHANNEL_PREFIX . $type;
// Build payload - use date() instead of deprecated strftime()
$payload = (object) [
'event' => 'log-message', 'event' => 'log-message',
'channel' => $channelName, 'channel' => $channelName,
'data' => [ 'data' => [
'type' => $type, 'type' => $type,
'time' => strftime('%H:%M:%S'), 'time' => date('H:i:s'),
'details' => $details, 'details' => $details,
], ],
]; ];
// Here you can use the ->find(), even if the channel // Check if channel exists locally and broadcast
// does not exist on the server. If it does not exist, $channel = self::$channelManager->find($appId, $channelName);
// then the message simply will get broadcasted
// across the other servers.
$channel = $channelManager->find($appId, $channelName);
if ($channel) { if ($channel) {
$channel->broadcastLocally( $channel->broadcastLocally($appId, $payload);
$appId, (object) $payload
);
} }
$channelManager->broadcastAcrossServers( // Always broadcast across servers (preserving original behavior)
$appId, null, $channelName, (object) $payload // The channel manager handles the replication logic
self::$channelManager->broadcastAcrossServers(
$appId,
null,
$channelName,
$payload
); );
} }
/**
* Reset cached state (useful for testing)
*/
public static function reset(): void
{
self::$enabled = null;
self::$channelManager = null;
}
} }

View File

@ -2,6 +2,7 @@
namespace BlaxSoftware\LaravelWebSockets; namespace BlaxSoftware\LaravelWebSockets;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface; use React\Promise\PromiseInterface;
class Helpers class Helpers
@ -13,38 +14,66 @@ class Helpers
*/ */
public static $loop = null; public static $loop = null;
/**
* Cached promise resolver class to avoid repeated config() calls
*
* @var string|null
*/
private static ?string $resolverClass = null;
/** /**
* Transform the Redis' list of key after value * Transform the Redis' list of key after value
* to key-value pairs. * to key-value pairs.
* Optimized: Uses array_chunk instead of partition with modulo.
* *
* @param array $list * @param array $list
* @return array * @return array
*/ */
public static function redisListToArray(array $list) public static function redisListToArray(array $list): array
{ {
// Redis lists come into a format where the keys are on even indexes if (empty($list)) {
// and the values are on odd indexes. This way, we know which return [];
// ones are keys and which ones are values and their get combined }
// later to form the key => value array.
[$keys, $values] = collect($list)->partition(function ($value, $key) {
return $key % 2 === 0;
});
return array_combine($keys->all(), $values->all()); // Faster approach: chunk into pairs and combine
$result = [];
$count = count($list);
for ($i = 0; $i < $count; $i += 2) {
if (isset($list[$i + 1])) {
$result[$list[$i]] = $list[$i + 1];
}
}
return $result;
} }
/** /**
* Create a new fulfilled promise with a value. * Create a new fulfilled promise with a value.
* Optimized: Caches the resolver class to avoid repeated config() lookups.
* *
* @param mixed $value * @param mixed $value
* @return \React\Promise\PromiseInterface * @return \React\Promise\PromiseInterface
*/ */
public static function createFulfilledPromise($value): PromiseInterface public static function createFulfilledPromise($value): PromiseInterface
{ {
$resolver = config( // Cache the resolver class on first call
'websockets.promise_resolver', \React\Promise\FulfilledPromise::class if (self::$resolverClass === null) {
); self::$resolverClass = config(
'websockets.promise_resolver',
FulfilledPromise::class
);
}
return new $resolver($value, static::$loop); // PHP 8.0+ dynamic class instantiation
$class = self::$resolverClass;
return new $class($value, static::$loop);
}
/**
* Reset the cached resolver class (useful for testing)
*/
public static function resetResolverCache(): void
{
self::$resolverClass = null;
} }
} }

View File

@ -3,67 +3,87 @@
namespace BlaxSoftware\LaravelWebSockets\Server\Messages; namespace BlaxSoftware\LaravelWebSockets\Server\Messages;
use BlaxSoftware\LaravelWebSockets\Events\ConnectionPonged; use BlaxSoftware\LaravelWebSockets\Events\ConnectionPonged;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use stdClass; use stdClass;
class PusherChannelProtocolMessage extends PusherClientMessage class PusherChannelProtocolMessage extends PusherClientMessage
{ {
/** /**
* Respond with the payload. * Pre-encoded pong response for performance
*
* @return void
*/ */
public function respond() private const PONG_RESPONSE = '{"event":"pusher.pong"}';
{
$eventName = Str::camel(Str::after($this->payload->event, ':'));
if (method_exists($this, $eventName) && $eventName !== 'respond') { /**
call_user_func([$this, $eventName], $this->connection, $this->payload->data ?? new stdClass()); * Respond with the payload.
* Optimized: Uses direct method dispatch instead of reflection.
*/
public function respond(): void
{
$event = $this->payload->event ?? '';
// Fast path for ping - most common pusher protocol message
if ($event === 'pusher:ping' || $event === 'pusher.ping') {
$this->pingFast($this->connection);
return;
}
// Extract method name from event (e.g., 'pusher:subscribe' -> 'subscribe')
$colonPos = strpos($event, ':');
if ($colonPos !== false) {
$eventName = substr($event, $colonPos + 1);
} else {
$dotPos = strpos($event, '.');
$eventName = $dotPos !== false ? substr($event, $dotPos + 1) : '';
}
// Convert to camelCase if needed (e.g., 'channel-name' -> 'channelName')
if (strpos($eventName, '-') !== false) {
$eventName = lcfirst(str_replace('-', '', ucwords($eventName, '-')));
}
if ($eventName && $eventName !== 'respond' && method_exists($this, $eventName)) {
$this->$eventName($this->connection, $this->payload->data ?? new stdClass());
} }
} }
/** /**
* Ping the connection. * Fast ping handler - avoids promise chain and event dispatch
*
* @see https://pusher.com/docs/pusher_protocol#ping-pong
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
*/ */
protected function ping(ConnectionInterface $connection) protected function pingFast(ConnectionInterface $connection): void
{ {
$this->channelManager // Update timestamp directly on connection (no promise chain)
->connectionPonged($connection) $connection->lastPongedAt = time();
->then(function () use ($connection) {
$connection->send(json_encode(['event' => 'pusher.pong']));
ConnectionPonged::dispatch($connection->app->id, $connection->socketId); // Send pre-encoded response (no json_encode overhead)
}); $connection->send(self::PONG_RESPONSE);
// Skip event dispatch for ping - it's high frequency and events are expensive
// If you need ping events, use: ConnectionPonged::dispatch($connection->app->id, $connection->socketId);
}
/**
* Legacy ping handler - kept for compatibility
* @deprecated Use pingFast instead
*/
protected function ping(ConnectionInterface $connection): void
{
$this->pingFast($connection);
} }
/** /**
* Subscribe to channel. * Subscribe to channel.
* *
* @see https://pusher.com/docs/pusher_protocol#pusher-subscribe * @see https://pusher.com/docs/pusher_protocol#pusher-subscribe
*
* @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload
* @return void
*/ */
protected function subscribe(ConnectionInterface $connection, stdClass $payload) protected function subscribe(ConnectionInterface $connection, stdClass $payload): void
{ {
$this->channelManager->subscribeToChannel($connection, $payload->channel, $payload); $this->channelManager->subscribeToChannel($connection, $payload->channel, $payload);
} }
/** /**
* Unsubscribe from the channel. * Unsubscribe from the channel.
*
* @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload
* @return void
*/ */
public function unsubscribe(ConnectionInterface $connection, stdClass $payload) public function unsubscribe(ConnectionInterface $connection, stdClass $payload): void
{ {
$this->channelManager->unsubscribeFromChannel($connection, $payload->channel, $payload); $this->channelManager->unsubscribeFromChannel($connection, $payload->channel, $payload);
} }

View File

@ -4,7 +4,6 @@ namespace BlaxSoftware\LaravelWebSockets\Server\Messages;
use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager; use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager;
use BlaxSoftware\LaravelWebSockets\Contracts\PusherMessage; use BlaxSoftware\LaravelWebSockets\Contracts\PusherMessage;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use Ratchet\RFC6455\Messaging\MessageInterface; use Ratchet\RFC6455\Messaging\MessageInterface;
@ -12,6 +11,7 @@ class PusherMessageFactory
{ {
/** /**
* Create a new message. * Create a new message.
* Optimized: Uses direct string comparison instead of Str::startsWith.
* *
* @param \Ratchet\RFC6455\Messaging\MessageInterface $message * @param \Ratchet\RFC6455\Messaging\MessageInterface $message
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
@ -21,14 +21,25 @@ class PusherMessageFactory
public static function createForMessage( public static function createForMessage(
MessageInterface $message, MessageInterface $message,
ConnectionInterface $connection, ConnectionInterface $connection,
ChannelManager $channelManager): PusherMessage ChannelManager $channelManager
{ ): PusherMessage {
$payload = json_decode($message->getPayload()); $payload = json_decode($message->getPayload());
$event = $payload->event ?? '';
return ( // Fast string prefix check (faster than Str::startsWith)
Str::startsWith($payload->event, 'pusher.') // Check first 7 chars for 'pusher.' or 'pusher:'
|| Str::startsWith($payload->event, 'pusher:') $isPusherEvent = (
) isset($event[6]) &&
$event[0] === 'p' &&
$event[1] === 'u' &&
$event[2] === 's' &&
$event[3] === 'h' &&
$event[4] === 'e' &&
$event[5] === 'r' &&
($event[6] === '.' || $event[6] === ':')
);
return $isPusherEvent
? new PusherChannelProtocolMessage($payload, $connection, $channelManager) ? new PusherChannelProtocolMessage($payload, $connection, $channelManager)
: new PusherClientMessage($payload, $connection, $channelManager); : new PusherClientMessage($payload, $connection, $channelManager);
} }

View File

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket; namespace BlaxSoftware\LaravelWebSockets\Websocket;
use BlaxSoftware\LaravelWebSockets\Apps\App; use BlaxSoftware\LaravelWebSockets\Apps\App;
use BlaxSoftware\LaravelWebSockets\Cache\IpcCache;
use BlaxSoftware\LaravelWebSockets\Channels\Channel; use BlaxSoftware\LaravelWebSockets\Channels\Channel;
use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel; use BlaxSoftware\LaravelWebSockets\Channels\PresenceChannel;
use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel; use BlaxSoftware\LaravelWebSockets\Channels\PrivateChannel;
@ -32,29 +33,190 @@ class Handler implements MessageComponentInterface
* Track channel connections using associative arrays for O(1) lookup * Track channel connections using associative arrays for O(1) lookup
* Structure: [channel_name => [socket_id => true]] * Structure: [channel_name => [socket_id => true]]
*/ */
protected $channel_connections = []; protected array $channel_connections = [];
/** /**
* Cache write buffer for batching operations * Cache write buffer for batching operations
* Reduces file I/O when multiple rapid requests occur * Reduces file I/O when multiple rapid requests occur
*/ */
protected $cacheWriteBuffer = []; protected array $cacheWriteBuffer = [];
protected $cacheDeleteBuffer = []; protected array $cacheDeleteBuffer = [];
protected $cacheBufferScheduled = false; protected bool $cacheBufferScheduled = false;
/**
* Pre-encoded static JSON responses for performance
* Encoding once at startup is faster than encoding every time
*/
private static string $PONG_RESPONSE = '{"event":"pusher.pong"}';
/**
* GC collection counter - only collect every N pings
*/
private int $gcCounter = 0;
private const GC_INTERVAL = 100;
/** /**
* Initialize a new handler. * Initialize a new handler.
*
* @return void
*/ */
public function __construct( public function __construct(
protected ChannelManager $channelManager protected ChannelManager $channelManager
) {} ) {}
public function onOpen(ConnectionInterface $connection) /**
* Handle incoming WebSocket message with optimized fast path for ping/pong
*/
public function onMessage(
ConnectionInterface $connection,
MessageInterface $message
): void {
if (!isset($connection->app)) {
return;
}
// FAST PATH: Check for ping before any heavy processing
// Use raw string comparison on payload to avoid JSON decode overhead
$payload = $message->getPayload();
// Quick ping detection using strpos (faster than json_decode + array access)
if ($this->tryHandlePingFast($payload, $connection)) {
return;
}
// SLOW PATH: Full message processing
try {
$this->processFullMessage($connection, $message, $payload);
} catch (\Throwable $e) {
$this->handleMessageError($e);
}
}
/**
* Fast path for ping/pong - avoids JSON decode, object creation, promises
* Target: < 1ms processing time
*/
private function tryHandlePingFast(string $payload, ConnectionInterface $connection): bool
{ {
if (! $this->connectionCanBeMade($connection)) { // Quick string check - if doesn't contain "ping", skip fast path
return $connection->close(); // strpos is O(n) but very fast for short strings
if (strpos($payload, 'ping') === false) {
return false;
}
// Now do minimal JSON decode to confirm it's a ping
$data = json_decode($payload, true);
if ($data === null) {
return false;
}
$event = $data['event'] ?? '';
// Direct string comparison (faster than strtolower + comparison)
if ($event !== 'pusher:ping' && $event !== 'pusher.ping') {
return false;
}
// Update connection timestamp directly on connection object (no promise chain)
$connection->lastPongedAt = time();
// Send pre-encoded pong response immediately
$connection->send(self::$PONG_RESPONSE);
// Periodic GC instead of every ping
if (++$this->gcCounter >= self::GC_INTERVAL) {
$this->gcCounter = 0;
gc_collect_cycles();
}
return true;
}
/**
* Full message processing for non-ping messages
*/
private function processFullMessage(
ConnectionInterface $connection,
MessageInterface $message,
string $payload
): void {
// Set remote address once (moved from per-message to reduce overhead)
if (isset($connection->remoteAddress)) {
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
}
// Decode message (we already have payload string)
$messageArray = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
// Handle pusher protocol messages (subscribe, unsubscribe, etc.)
$this->handlePusherProtocolMessage($message, $connection, $messageArray);
$channel = $this->handleChannelSubscriptions($messageArray, $connection);
if ($this->shouldRejectMessage($channel, $connection, $messageArray)) {
return;
}
$this->authenticateConnection($connection, $channel, $messageArray);
// Only log in debug mode to reduce I/O
if (config('app.debug')) {
Log::channel('websocket')->debug('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . $payload);
}
if ($this->handlePusherEvent($messageArray, $connection)) {
return;
}
$this->forkAndProcessMessage($connection, $channel, $messageArray);
}
/**
* Handle pusher protocol messages (formerly in PusherMessageFactory)
* Inlined for performance - avoids object creation
*/
private function handlePusherProtocolMessage(
MessageInterface $message,
ConnectionInterface $connection,
array $messageArray
): void {
$event = $messageArray['event'] ?? '';
// Fast check - most messages don't start with 'pusher' or 'client-'
$firstChar = $event[0] ?? '';
if ($firstChar !== 'p' && $firstChar !== 'c') {
return;
}
// Check for client- messages
if (strpos($event, 'client-') === 0) {
if (!$connection->app->clientMessagesEnabled) {
return;
}
$channelName = $messageArray['channel'] ?? null;
if (!$channelName) {
return;
}
$channel = $this->channelManager->find($connection->app->id, $channelName);
if ($channel) {
$channel->broadcastToEveryoneExcept(
(object) $messageArray,
$connection->socketId,
$connection->app->id
);
}
return;
}
// Check for pusher: or pusher. messages (subscribe/unsubscribe handled elsewhere)
// This is handled by handleChannelSubscriptions for subscribe/unsubscribe
}
public function onOpen(ConnectionInterface $connection): void
{
if (!$this->connectionCanBeMade($connection)) {
$connection->close();
return;
} }
try { try {
@ -74,48 +236,6 @@ class Handler implements MessageComponentInterface
} }
} }
public function onMessage(
ConnectionInterface $connection,
MessageInterface $message
) {
if (!isset($connection->app)) {
return;
}
try {
request()->server->set('REMOTE_ADDR', $connection->remoteAddress);
PusherMessageFactory::createForMessage(
$message,
$connection,
$this->channelManager
)->respond();
$message = json_decode($message->getPayload(), true, 512, JSON_THROW_ON_ERROR);
if ($this->handlePingPong($message, $connection)) {
return;
}
$channel = $this->handleChannelSubscriptions($message, $connection);
if ($this->shouldRejectMessage($channel, $connection, $message)) {
return;
}
$this->authenticateConnection($connection, $channel, $message);
\Log::channel('websocket')->info('[' . $connection->socketId . ']@' . $channel->getName() . ' | ' . json_encode($message));
if ($this->handlePusherEvent($message, $connection)) {
return;
}
$this->forkAndProcessMessage($connection, $channel, $message);
} catch (\Throwable $e) {
$this->handleMessageError($e);
}
}
/** /**
* Handle the websocket close. * Handle the websocket close.
*/ */
@ -150,8 +270,10 @@ class Handler implements MessageComponentInterface
return; return;
} }
// Initialize lastPongedAt with unix timestamp (faster than Carbon)
$connection->lastPongedAt = time();
$this->channelManager->subscribeToApp($connection->app->id); $this->channelManager->subscribeToApp($connection->app->id);
$this->channelManager->connectionPonged($connection);
NewConnection::dispatch( NewConnection::dispatch(
$connection->app->id, $connection->app->id,
@ -159,25 +281,14 @@ class Handler implements MessageComponentInterface
); );
} }
protected function handlePingPong(array $message, ConnectionInterface $connection): bool
{
$eventLower = strtolower($message['event']);
if ($eventLower !== 'pusher:ping' && $eventLower !== 'pusher.ping') {
return false;
}
$this->channelManager->connectionPonged($connection);
gc_collect_cycles();
return true;
}
protected function shouldRejectMessage(?Channel $channel, ConnectionInterface $connection, array $message): bool protected function shouldRejectMessage(?Channel $channel, ConnectionInterface $connection, array $message): bool
{ {
$isUnsubscribe = $message['event'] === 'pusher:unsubscribe' || $message['event'] === 'pusher.unsubscribe'; $event = $message['event'] ?? '';
$isUnsubscribe = $event === 'pusher:unsubscribe' || $event === 'pusher.unsubscribe';
if (!$channel?->hasConnection($connection) && !$isUnsubscribe) { if (!$channel?->hasConnection($connection) && !$isUnsubscribe) {
$connection->send(json_encode([ $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $event . ':error',
'data' => [ 'data' => [
'message' => 'Subscription not established', 'message' => 'Subscription not established',
'meta' => $message, 'meta' => $message,
@ -220,6 +331,10 @@ class Handler implements MessageComponentInterface
Channel $channel, Channel $channel,
array $message array $message
): void { ): void {
// Generate unique request ID BEFORE forking to avoid race conditions
// Using uniqid with more_entropy + random_bytes for guaranteed uniqueness
$requestId = uniqid('req_', true) . '_' . bin2hex(random_bytes(4));
$pid = pcntl_fork(); $pid = pcntl_fork();
if ($pid === -1) { if ($pid === -1) {
@ -228,24 +343,25 @@ class Handler implements MessageComponentInterface
} }
if ($pid === 0) { if ($pid === 0) {
$this->processMessageInChild($connection, $channel, $message); $this->processMessageInChild($connection, $channel, $message, $requestId);
exit(0); exit(0);
} }
$this->addDataCheckLoop($connection, $message, $pid); $this->addDataCheckLoop($connection, $message, $requestId);
} }
protected function processMessageInChild( protected function processMessageInChild(
ConnectionInterface $connection, ConnectionInterface $connection,
Channel $channel, Channel $channel,
array $message array $message,
string $requestId
): void { ): void {
try { try {
DB::disconnect(); DB::disconnect();
DB::reconnect(); DB::reconnect();
$this->setRequest($message, $connection); $this->setRequest($message, $connection);
$mock = new MockConnection($connection); $mock = new MockConnection($connection, $requestId);
Controller::controll_message( Controller::controll_message(
$mock, $mock,
@ -737,27 +853,29 @@ class Handler implements MessageComponentInterface
private function addDataCheckLoop( private function addDataCheckLoop(
$connection, $connection,
$message, $message,
$pid, string $requestId,
$optional = false, $optional = false,
$iteration = false int $iteration = 0
) { ) {
$pid = $this->preparePid($pid, $iteration); $iterationKey = $requestId . ($iteration > 0 ? '_' . $iteration : '');
$pidcache_start = 'dedicated_start_' . $pid; $cacheKeyStart = 'dedicated_start_' . $iterationKey;
cache()->put($pidcache_start, microtime(true), 100); IpcCache::put($cacheKeyStart, microtime(true), 100);
$this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use ( $this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use (
$pidcache_start, $cacheKeyStart,
$iterationKey,
$message, $message,
$pid, $requestId,
$connection, $connection,
$optional, $optional,
$iteration $iteration
) { ) {
$this->checkDataLoopIteration( $this->checkDataLoopIteration(
$timer, $timer,
$pidcache_start, $cacheKeyStart,
$message, $message,
$pid, $iterationKey,
$requestId,
$connection, $connection,
$optional, $optional,
$iteration $iteration
@ -767,56 +885,66 @@ class Handler implements MessageComponentInterface
}); });
} }
protected function preparePid($pid, $iteration): string
{
$pid = explode('_', $pid . '')[0];
if ($iteration >= 0 && $iteration !== false) {
$pid .= '_' . $iteration;
}
return $pid;
}
protected function checkDataLoopIteration( protected function checkDataLoopIteration(
$timer, $timer,
string $pidcache_start, string $cacheKeyStart,
array $message, array $message,
string $pid, string $iterationKey,
string $requestId,
$connection, $connection,
bool $optional, bool $optional,
$iteration int $iteration
): void { ): void {
$pidcache_data = 'dedicated_data_' . $pid; $cacheKeyData = 'dedicated_data_' . $iterationKey;
$pidcache_done = 'dedicated_data_' . $pid . '_done'; $cacheKeyDone = 'dedicated_data_' . $iterationKey . '_done';
$pidcache_complete = 'dedicated_data_' . $pid . '_complete'; $cacheKeyComplete = 'dedicated_data_' . $iterationKey . '_complete';
if ($this->handleTimeout($timer, $pidcache_start, $pidcache_complete, $message, $connection, $optional)) { if ($this->handleTimeout($timer, $cacheKeyStart, $cacheKeyComplete, $message, $connection, $optional)) {
return; return;
} }
if (!cache()->has($pidcache_done)) { if (!IpcCache::has($cacheKeyDone)) {
return; return;
} }
$this->scheduleNextIteration($connection, $message, $pid, $iteration); // Clean up cache entries for this iteration before processing
$this->processAndSendData($connection, $pidcache_data); // This prevents memory leaks and stale data issues
$this->cleanupIterationCache($iterationKey);
$this->scheduleNextIteration($connection, $message, $requestId, $iteration);
$this->processAndSendData($connection, $cacheKeyData);
$this->channelManager->loop->cancelTimer($timer); $this->channelManager->loop->cancelTimer($timer);
} }
/**
* Clean up cache entries for a completed iteration
*/
protected function cleanupIterationCache(string $iterationKey): void
{
$keysToDelete = [
'dedicated_start_' . $iterationKey,
'dedicated_data_' . $iterationKey . '_done',
// Note: We don't delete 'dedicated_data_' here as we need it for processAndSendData
// It will expire naturally after 60 seconds
];
IpcCache::forgetMultiple($keysToDelete);
}
protected function handleTimeout( protected function handleTimeout(
$timer, $timer,
string $pidcache_start, string $cacheKeyStart,
string $pidcache_complete, string $cacheKeyComplete,
array $message, array $message,
$connection, $connection,
bool $optional bool $optional
): bool { ): bool {
if (!cache()->has($pidcache_start)) { $startTime = IpcCache::get($cacheKeyStart);
if ($startTime === null) {
return false; return false;
} }
$diff = microtime(true) - ((int) cache()->get($pidcache_start)); $diff = microtime(true) - ((float) $startTime);
if ($diff <= 60) { if ($diff <= 60) {
return false; return false;
} }
@ -832,19 +960,27 @@ class Handler implements MessageComponentInterface
} }
$this->channelManager->loop->cancelTimer($timer); $this->channelManager->loop->cancelTimer($timer);
cache()->put($pidcache_complete, true, 360); IpcCache::put($cacheKeyComplete, true, 360);
return true; return true;
} }
protected function scheduleNextIteration($connection, array $message, string $pid, $iteration): void protected function scheduleNextIteration($connection, array $message, string $requestId, int $iteration): void
{ {
$nextIteration = ($iteration === false) ? 0 : $iteration + 1; $nextIteration = $iteration + 1;
$this->addDataCheckLoop($connection, $message, $pid, true, $nextIteration); $this->addDataCheckLoop($connection, $message, $requestId, true, $nextIteration);
} }
protected function processAndSendData($connection, string $pidcache_data): void protected function processAndSendData($connection, string $cacheKeyData): void
{ {
$sending = cache()->get($pidcache_data); $sending = IpcCache::get($cacheKeyData);
// Clean up the data cache key immediately after reading
IpcCache::forget($cacheKeyData);
if (!$sending) {
return;
}
$bm = json_decode($sending, true); $bm = json_decode($sending, true);
if (isset($bm['broadcast']) && $bm['broadcast']) { if (isset($bm['broadcast']) && $bm['broadcast']) {

View File

@ -4,6 +4,7 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Websocket; namespace BlaxSoftware\LaravelWebSockets\Websocket;
use BlaxSoftware\LaravelWebSockets\Cache\IpcCache;
use React\Socket\Connection; use React\Socket\Connection;
use Illuminate\Support\Facades\Log; use Illuminate\Support\Facades\Log;
@ -17,8 +18,22 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface
public $ip; public $ip;
public $app; public $app;
public function __construct($original_connection) /**
* Unique request ID for cache-based communication
* Used instead of PID to avoid race conditions from PID reuse
*/
protected string $requestId;
/**
* Track current iteration for multi-response scenarios
*/
protected int $currentIteration = 0;
public function __construct($original_connection, ?string $requestId = null)
{ {
// Generate fallback requestId if not provided (for backward compatibility)
$this->requestId = $requestId ?? uniqid('req_', true) . '_' . bin2hex(random_bytes(4));
// create an indisdinctable copy of the original connection // create an indisdinctable copy of the original connection
foreach (get_object_vars($original_connection) as $key => $value) { foreach (get_object_vars($original_connection) as $key => $value) {
$this->{$key} = $value; $this->{$key} = $value;
@ -52,8 +67,11 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface
public function send($data) public function send($data)
{ {
if(cache()->get('dedicated_data_'.getmypid().'_complete')){ $key = $this->getDataKey();
Log::error('[MockConnection] Send for pid: ' . getmypid() . ' which is already completed and does not check for new data', [ $completeKey = $key . '_complete';
if (IpcCache::get($completeKey)) {
Log::error('[MockConnection] Send for request: ' . $this->requestId . ' which is already completed and does not check for new data', [
'data' => $data, 'data' => $data,
]); ]);
return $this; return $this;
@ -64,14 +82,16 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface
throw new \InvalidArgumentException('Data must be a string or an object that can be converted to a string.'); throw new \InvalidArgumentException('Data must be a string or an object that can be converted to a string.');
} }
Log::channel('websocket')->info('[MockConnection] Send for pid: ' . getmypid(), [ Log::channel('websocket')->info('[MockConnection] Send for request: ' . $this->requestId . ' iteration: ' . $this->currentIteration, [
'data' => $data, 'data' => $data,
]); ]);
$key = static::getDataKey(); // Use atomic set to avoid race conditions - IpcCache uses tmpfs for speed
IpcCache::put($key, $data, 60);
IpcCache::put($key . '_done', true, 60);
cache()->put($key, $data, 60); // Increment iteration for next send call
cache()->put($key . '_done', true, 60); $this->currentIteration++;
return $this; return $this;
} }
@ -80,7 +100,7 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface
$data, $data,
?string $channel = null, ?string $channel = null,
bool $including_self = false, bool $including_self = false,
){ ) {
$data ??= []; $data ??= [];
$data['broadcast'] = true; $data['broadcast'] = true;
$data['channel'] ??= $channel; $data['channel'] ??= $channel;
@ -102,26 +122,18 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface
return $this->send(json_encode($data)); return $this->send(json_encode($data));
} }
private static function getDataKey() /**
* Get the data key for the current request and iteration
* Now uses the unique requestId instead of PID to avoid race conditions
*/
private function getDataKey(): string
{ {
$key = 'dedicated_data_' . getmypid(); $baseKey = 'dedicated_data_' . $this->requestId;
$i = '';
while (cache()->has($key . ($i !== '' ? '_' . $i : ''))) { if ($this->currentIteration > 0) {
if ($i === '') { return $baseKey . '_' . $this->currentIteration;
$i = 0;
} else {
$i = (int) $i;
$i++;
}
} }
if ($i !== '') { return $baseKey;
$i = '_' . $i;
}
$key .= $i;
return $key;
} }
} }