diff --git a/composer.json b/composer.json index ab463cf..b3a6ac6 100644 --- a/composer.json +++ b/composer.json @@ -74,7 +74,10 @@ "autoload": { "psr-4": { "BlaxSoftware\\LaravelWebSockets\\": "src/" - } + }, + "files": [ + "src/helpers_global.php" + ] }, "autoload-dev": { "psr-4": { diff --git a/config/websockets.php b/config/websockets.php index 3226717..d39a3f2 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -24,6 +24,25 @@ return [ */ 'hot_reload' => env('WEBSOCKET_HOT_RELOAD', env('APP_DEBUG', false)), + /* + |-------------------------------------------------------------------------- + | Broadcast Socket Settings + |-------------------------------------------------------------------------- + | + | The broadcast socket allows external PHP processes (queue workers, HTTP + | requests, etc.) to send broadcasts to WebSocket clients efficiently via + | a Unix domain socket, without the overhead of creating new connections. + | + | This provides global helper functions: + | - ws_broadcast($event, $data, $channel) - Broadcast to all clients + | - ws_whisper($event, $data, $sockets, $channel) - Send to specific sockets + | - ws_broadcast_except($event, $data, $exclude, $channel) - Broadcast except some + | - ws_available() - Check if broadcast socket is available + | + */ + 'broadcast_socket_enabled' => env('WEBSOCKET_BROADCAST_SOCKET', true), + 'broadcast_socket' => env('WEBSOCKET_BROADCAST_SOCKET_PATH', '/tmp/laravel-websockets-broadcast.sock'), + /* |-------------------------------------------------------------------------- | Dashboard Settings diff --git a/src/Broadcast/BroadcastClient.php b/src/Broadcast/BroadcastClient.php new file mode 100644 index 0000000..4dbb187 --- /dev/null +++ b/src/Broadcast/BroadcastClient.php @@ -0,0 +1,302 @@ +socketPath = config('websockets.broadcast_socket', '/tmp/laravel-websockets-broadcast.sock'); + } + + /** + * Get the singleton instance + */ + public static function instance(): self + { + if (self::$instance === null) { + self::$instance = new self(); + } + + return self::$instance; + } + + /** + * Reset the singleton (useful for testing or when socket path changes) + */ + public static function reset(): void + { + if (self::$instance !== null) { + self::$instance->disconnect(); + self::$instance = null; + } + } + + /** + * Connect to the broadcast socket server + */ + protected function connect(): bool + { + if ($this->connected && $this->socket !== null) { + // Check if socket is still valid + if ($this->isSocketValid()) { + return true; + } + // Socket became invalid, disconnect and reconnect + $this->disconnect(); + } + + if (!file_exists($this->socketPath)) { + Log::debug('[BroadcastClient] Socket file does not exist: ' . $this->socketPath); + return false; + } + + $this->socket = @stream_socket_client( + 'unix://' . $this->socketPath, + $errno, + $errstr, + 1.0, // 1 second timeout for connection + STREAM_CLIENT_CONNECT + ); + + if ($this->socket === false) { + Log::warning('[BroadcastClient] Failed to connect: ' . $errstr . ' (' . $errno . ')'); + $this->socket = null; + return false; + } + + // Set socket options for efficiency + stream_set_blocking($this->socket, true); + stream_set_timeout($this->socket, 1); // 1 second read timeout + + $this->connected = true; + $this->readBuffer = ''; + + return true; + } + + /** + * Check if the socket is still valid + */ + protected function isSocketValid(): bool + { + if ($this->socket === null) { + return false; + } + + // Check if socket is still open + $meta = @stream_get_meta_data($this->socket); + if ($meta === false || ($meta['eof'] ?? false)) { + return false; + } + + return true; + } + + /** + * Disconnect from the socket + */ + public function disconnect(): void + { + if ($this->socket !== null) { + @fclose($this->socket); + $this->socket = null; + } + $this->connected = false; + $this->readBuffer = ''; + } + + /** + * Send a broadcast command to the WebSocket server + * + * @param string $event Event name + * @param array $data Event data + * @param string $channel Channel name (default: 'websocket') + * @param array|null $sockets Target specific socket IDs (null = all) + * @param array $excludeSockets Socket IDs to exclude + * @return bool Success + */ + public function send( + string $event, + array $data, + string $channel = 'websocket', + ?array $sockets = null, + array $excludeSockets = [] + ): bool { + $payload = [ + 'event' => $event, + 'channel' => $channel, + 'data' => $data, + ]; + + if ($sockets !== null) { + $payload['sockets'] = $sockets; + } + + if (!empty($excludeSockets)) { + $payload['exclude_sockets'] = $excludeSockets; + } + + return $this->sendRaw($payload); + } + + /** + * Send raw payload to the broadcast server + */ + protected function sendRaw(array $payload): bool + { + $message = json_encode($payload) . "\n"; + + for ($attempt = 0; $attempt < $this->maxReconnectAttempts; $attempt++) { + if (!$this->connect()) { + // Socket not available, try after small delay + if ($attempt < $this->maxReconnectAttempts - 1) { + usleep(10000); // 10ms + } + continue; + } + + $written = @fwrite($this->socket, $message); + + if ($written === false || $written !== strlen($message)) { + // Write failed, connection might be broken + $this->disconnect(); + continue; + } + + // Read response (optional, for confirmation) + $response = $this->readResponse(); + + if ($response !== null) { + return $response['success'] ?? false; + } + + // No response but write succeeded - assume success + return true; + } + + Log::warning('[BroadcastClient] Failed to send after ' . $this->maxReconnectAttempts . ' attempts'); + return false; + } + + /** + * Read a response from the socket + */ + protected function readResponse(): ?array + { + if ($this->socket === null) { + return null; + } + + // Try to read with timeout + $data = @fgets($this->socket, 8192); + + if ($data === false) { + // Check if it's a timeout or error + $meta = @stream_get_meta_data($this->socket); + if (($meta['timed_out'] ?? false) || ($meta['eof'] ?? false)) { + return null; + } + return null; + } + + $data = trim($data); + if ($data === '') { + return null; + } + + return json_decode($data, true); + } + + /** + * Whisper (send to specific sockets only) + */ + public function whisper( + string $event, + array $data, + array $sockets, + string $channel = 'websocket' + ): bool { + return $this->send($event, $data, $channel, $sockets); + } + + /** + * Broadcast to all except specified sockets + */ + public function broadcastExcept( + string $event, + array $data, + array $excludeSockets, + string $channel = 'websocket' + ): bool { + return $this->send($event, $data, $channel, null, $excludeSockets); + } + + /** + * Check if the broadcast socket is available + */ + public function isAvailable(): bool + { + return file_exists($this->socketPath); + } + + /** + * Get the socket path + */ + public function getSocketPath(): string + { + return $this->socketPath; + } + + /** + * Destructor - clean up socket + */ + public function __destruct() + { + $this->disconnect(); + } +} diff --git a/src/Broadcast/BroadcastSocketServer.php b/src/Broadcast/BroadcastSocketServer.php new file mode 100644 index 0000000..ef73431 --- /dev/null +++ b/src/Broadcast/BroadcastSocketServer.php @@ -0,0 +1,201 @@ +loop = $loop; + $this->channelManager = $channelManager; + $this->socketPath = config('websockets.broadcast_socket', '/tmp/laravel-websockets-broadcast.sock'); + } + + /** + * Start the broadcast socket server + */ + public function start(): void + { + // Remove stale socket file if exists + if (file_exists($this->socketPath)) { + unlink($this->socketPath); + } + + try { + $this->server = new UnixServer($this->socketPath, $this->loop); + + // Set permissions so other processes can connect + chmod($this->socketPath, 0666); + + $this->server->on('connection', [$this, 'handleConnection']); + $this->server->on('error', function (\Exception $e) { + Log::error('[BroadcastSocket] Server error: ' . $e->getMessage()); + }); + + Log::info('[BroadcastSocket] Listening on ' . $this->socketPath); + } catch (\Exception $e) { + Log::error('[BroadcastSocket] Failed to start: ' . $e->getMessage()); + } + } + + /** + * Handle a new client connection + */ + public function handleConnection(ConnectionInterface $connection): void + { + $clientId = spl_object_hash($connection); + $this->clients[$clientId] = $connection; + $buffer = ''; + + $connection->on('data', function ($data) use ($connection, &$buffer) { + $buffer .= $data; + + // Process complete messages (newline-delimited) + while (($pos = strpos($buffer, "\n")) !== false) { + $message = substr($buffer, 0, $pos); + $buffer = substr($buffer, $pos + 1); + + if ($message !== '') { + $this->handleMessage($connection, $message); + } + } + }); + + $connection->on('close', function () use ($clientId) { + unset($this->clients[$clientId]); + }); + + $connection->on('error', function (\Exception $e) use ($clientId) { + Log::warning('[BroadcastSocket] Client error: ' . $e->getMessage()); + unset($this->clients[$clientId]); + }); + } + + /** + * Handle a broadcast message from a client + */ + protected function handleMessage(ConnectionInterface $connection, string $message): void + { + try { + $payload = json_decode($message, true); + + if (!$payload || !isset($payload['event'])) { + $connection->write(json_encode(['success' => false, 'error' => 'Invalid payload']) . "\n"); + return; + } + + $channel = $payload['channel'] ?? 'websocket'; + $event = $payload['event']; + $data = $payload['data'] ?? []; + $sockets = $payload['sockets'] ?? null; // Target specific sockets + $excludeSockets = $payload['exclude_sockets'] ?? []; // Exclude specific sockets + + // Get channel instance and broadcast + $channelInstance = $this->channelManager->find('websockets', $channel); + + if ($channelInstance) { + $this->broadcastToChannel($channelInstance, $event, $data, $sockets, $excludeSockets); + $connection->write(json_encode(['success' => true]) . "\n"); + } else { + // Channel doesn't exist or no subscribers - still success + $connection->write(json_encode(['success' => true, 'warning' => 'No channel subscribers']) . "\n"); + } + } catch (\Exception $e) { + Log::error('[BroadcastSocket] Error handling message: ' . $e->getMessage()); + $connection->write(json_encode(['success' => false, 'error' => $e->getMessage()]) . "\n"); + } + } + + /** + * Broadcast to a channel + */ + protected function broadcastToChannel($channel, string $event, array $data, ?array $sockets, array $excludeSockets): void + { + $payload = json_encode([ + 'event' => $event, + 'channel' => $channel->getName(), + 'data' => $data, + ]); + + // Get subscribers + $subscribers = $channel->getSubscribedConnections(); + + foreach ($subscribers as $connection) { + $socketId = $connection->socketId ?? null; + + // Filter by specific sockets if provided + if ($sockets !== null && !in_array($socketId, $sockets)) { + continue; + } + + // Exclude specific sockets + if (in_array($socketId, $excludeSockets)) { + continue; + } + + $connection->send($payload); + } + } + + /** + * Stop the server + */ + public function stop(): void + { + if ($this->server) { + $this->server->close(); + $this->server = null; + } + + // Clean up socket file + if (file_exists($this->socketPath)) { + unlink($this->socketPath); + } + + // Close all client connections + foreach ($this->clients as $client) { + $client->close(); + } + $this->clients = []; + } + + /** + * Get the socket path + */ + public function getSocketPath(): string + { + return $this->socketPath; + } +} diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 122aa7c..0db8768 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -2,6 +2,7 @@ namespace BlaxSoftware\LaravelWebSockets\Console\Commands; +use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastSocketServer; use BlaxSoftware\LaravelWebSockets\Cache\IpcCache; use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager; use BlaxSoftware\LaravelWebSockets\Facades\StatisticsCollector as StatisticsCollectorFacade; @@ -151,6 +152,10 @@ class StartServer extends Command $this->configurePcntlSignal(); \Log::channel('websocket')->debug('PCNTL signals configured'); + \Log::channel('websocket')->debug('Configuring broadcast socket...'); + $this->configureBroadcastSocket(); + \Log::channel('websocket')->debug('Broadcast socket configured'); + // $this->configurePongTracker(); \Log::channel('websocket')->debug('Starting server...'); @@ -317,6 +322,37 @@ class StartServer extends Command \Log::channel('websocket')->debug('SIGINT handler registered'); } + /** + * Configure the broadcast socket server for efficient broadcasting. + * + * This creates a Unix domain socket that external processes (queue workers, + * HTTP requests, etc.) can connect to for sending broadcasts without the + * overhead of creating new WebSocket connections. + * + * @return void + */ + protected function configureBroadcastSocket(): void + { + if (config('websockets.broadcast_socket_enabled', true) === false) { + \Log::channel('websocket')->debug('Broadcast socket disabled by config'); + return; + } + + try { + $channelManager = $this->laravel->make(ChannelManager::class); + $broadcastServer = new BroadcastSocketServer($this->loop, $channelManager); + $broadcastServer->start(); + + // Store reference for cleanup on shutdown + $this->laravel->instance(BroadcastSocketServer::class, $broadcastServer); + + $this->components->info('Broadcast socket listening on ' . $broadcastServer->getSocketPath()); + } catch (\Throwable $e) { + \Log::channel('websocket')->warning('Failed to start broadcast socket: ' . $e->getMessage()); + $this->components->warn('Broadcast socket failed to start: ' . $e->getMessage()); + } + } + /** * Configure the tracker that will delete * from the store the connections that. @@ -666,6 +702,9 @@ class StartServer extends Command \Log::channel('websocket')->info('Triggering hard shutdown...'); $this->line('Hard shutdown initiated, stopping server immediately...'); + // Stop the broadcast socket server + $this->stopBroadcastSocket(); + $this->loop->stop(); } @@ -702,7 +741,29 @@ class StartServer extends Command }) ->then(function () { \Log::channel('websocket')->debug('All connections closed, stopping loop...'); + + // Stop the broadcast socket server + $this->stopBroadcastSocket(); + $this->loop->stop(); }); } + + /** + * Stop the broadcast socket server if running. + * + * @return void + */ + protected function stopBroadcastSocket(): void + { + try { + if ($this->laravel->bound(BroadcastSocketServer::class)) { + $broadcastServer = $this->laravel->make(BroadcastSocketServer::class); + $broadcastServer->stop(); + \Log::channel('websocket')->debug('Broadcast socket server stopped'); + } + } catch (\Throwable $e) { + \Log::channel('websocket')->warning('Error stopping broadcast socket: ' . $e->getMessage()); + } + } } diff --git a/src/Services/WebsocketService.php b/src/Services/WebsocketService.php index dc6eaf5..f437c16 100644 --- a/src/Services/WebsocketService.php +++ b/src/Services/WebsocketService.php @@ -4,12 +4,86 @@ declare(strict_types=1); namespace BlaxSoftware\LaravelWebSockets\Services; +use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastClient; + class WebsocketService { + /** + * Send a message via WebSocket. + * + * Automatically uses the efficient Unix socket broadcast when available, + * falling back to creating a new WebSocket connection when not. + */ public static function send( string $event, mixed $data, $channel = 'websocket' + ) { + // Try efficient broadcast socket first (Unix socket IPC) + if (ws_available()) { + $success = ws_broadcast($event, is_array($data) ? $data : ['data' => $data], $channel ?? 'websocket'); + if ($success) { + return (object)['success' => true, 'method' => 'broadcast_socket']; + } + // Fall through to WebSocket client if broadcast socket fails + } + + // Fallback: Create new WebSocket connection (slower, for when broadcast socket not available) + return static::sendViaWebSocket($event, $data, $channel); + } + + /** + * Send a message to specific socket IDs only. + * + * @param string $event Event name + * @param mixed $data Event data + * @param array $sockets Target socket IDs + * @param string $channel Channel name + * @return bool Success + */ + public static function whisper( + string $event, + mixed $data, + array $sockets, + string $channel = 'websocket' + ): bool { + if (!ws_available()) { + return false; + } + + return ws_whisper($event, is_array($data) ? $data : ['data' => $data], $sockets, $channel); + } + + /** + * Broadcast to all except specified socket IDs. + * + * @param string $event Event name + * @param mixed $data Event data + * @param array $excludeSockets Socket IDs to exclude + * @param string $channel Channel name + * @return bool Success + */ + public static function broadcastExcept( + string $event, + mixed $data, + array $excludeSockets, + string $channel = 'websocket' + ): bool { + if (!ws_available()) { + return false; + } + + return ws_broadcast_except($event, is_array($data) ? $data : ['data' => $data], $excludeSockets, $channel); + } + + /** + * Send a message by creating a new WebSocket connection. + * This is the legacy method, kept for fallback when broadcast socket is unavailable. + */ + protected static function sendViaWebSocket( + string $event, + mixed $data, + $channel = 'websocket' ) { $client = new \WebSocket\Client('ws://0.0.0.0:6001/app/'.config('websockets.apps.0.id'), [ 'timeout' => 5, diff --git a/src/helpers_global.php b/src/helpers_global.php new file mode 100644 index 0000000..b7ddcdf --- /dev/null +++ b/src/helpers_global.php @@ -0,0 +1,102 @@ + 'Hello!']); + * + * // Broadcast to a specific channel + * ws_broadcast('update', ['status' => 'complete'], 'private-user.123'); + */ + function ws_broadcast(string $event, array $data, string $channel = 'websocket'): bool + { + return BroadcastClient::instance()->send($event, $data, $channel); + } +} + +if (!function_exists('ws_whisper')) { + /** + * Send a message to specific socket IDs only. + * + * @param string $event Event name + * @param array $data Event data + * @param array $sockets Target socket IDs + * @param string $channel Channel name (default: 'websocket') + * @return bool Success + * + * @example + * // Send to specific sockets + * ws_whisper('typing', ['user' => 'John'], ['socket-123', 'socket-456']); + */ + function ws_whisper(string $event, array $data, array $sockets, string $channel = 'websocket'): bool + { + return BroadcastClient::instance()->whisper($event, $data, $sockets, $channel); + } +} + +if (!function_exists('ws_broadcast_except')) { + /** + * Broadcast a message to all clients except specified socket IDs. + * + * @param string $event Event name + * @param array $data Event data + * @param array $excludeSockets Socket IDs to exclude + * @param string $channel Channel name (default: 'websocket') + * @return bool Success + * + * @example + * // Broadcast to all except the sender + * ws_broadcast_except('message', ['text' => 'Hi'], [$currentSocketId]); + */ + function ws_broadcast_except(string $event, array $data, array $excludeSockets, string $channel = 'websocket'): bool + { + return BroadcastClient::instance()->broadcastExcept($event, $data, $excludeSockets, $channel); + } +} + +if (!function_exists('ws_client')) { + /** + * Get the WebSocket broadcast client instance. + * + * @return BroadcastClient + * + * @example + * // Check if WebSocket server is available + * if (ws_client()->isAvailable()) { + * ws_broadcast('event', $data); + * } + */ + function ws_client(): BroadcastClient + { + return BroadcastClient::instance(); + } +} + +if (!function_exists('ws_available')) { + /** + * Check if the WebSocket broadcast server is available. + * + * @return bool + */ + function ws_available(): bool + { + return BroadcastClient::instance()->isAvailable(); + } +}