IA helpers

This commit is contained in:
Fabian @ Blax Software 2026-02-03 15:03:50 +01:00
parent 986ce76fb7
commit 826c198a77
7 changed files with 763 additions and 1 deletions

View File

@ -74,7 +74,10 @@
"autoload": { "autoload": {
"psr-4": { "psr-4": {
"BlaxSoftware\\LaravelWebSockets\\": "src/" "BlaxSoftware\\LaravelWebSockets\\": "src/"
} },
"files": [
"src/helpers_global.php"
]
}, },
"autoload-dev": { "autoload-dev": {
"psr-4": { "psr-4": {

View File

@ -24,6 +24,25 @@ return [
*/ */
'hot_reload' => env('WEBSOCKET_HOT_RELOAD', env('APP_DEBUG', false)), 'hot_reload' => env('WEBSOCKET_HOT_RELOAD', env('APP_DEBUG', false)),
/*
|--------------------------------------------------------------------------
| Broadcast Socket Settings
|--------------------------------------------------------------------------
|
| The broadcast socket allows external PHP processes (queue workers, HTTP
| requests, etc.) to send broadcasts to WebSocket clients efficiently via
| a Unix domain socket, without the overhead of creating new connections.
|
| This provides global helper functions:
| - ws_broadcast($event, $data, $channel) - Broadcast to all clients
| - ws_whisper($event, $data, $sockets, $channel) - Send to specific sockets
| - ws_broadcast_except($event, $data, $exclude, $channel) - Broadcast except some
| - ws_available() - Check if broadcast socket is available
|
*/
'broadcast_socket_enabled' => env('WEBSOCKET_BROADCAST_SOCKET', true),
'broadcast_socket' => env('WEBSOCKET_BROADCAST_SOCKET_PATH', '/tmp/laravel-websockets-broadcast.sock'),
/* /*
|-------------------------------------------------------------------------- |--------------------------------------------------------------------------
| Dashboard Settings | Dashboard Settings

View File

@ -0,0 +1,302 @@
<?php
declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Broadcast;
use Illuminate\Support\Facades\Log;
/**
* Client for sending broadcasts to the WebSocket server.
*
* Uses a persistent Unix socket connection to efficiently send
* multiple broadcast commands without connection overhead.
*
* This is a singleton - use BroadcastClient::instance() or the
* global ws_broadcast() helper.
*/
class BroadcastClient
{
/**
* Singleton instance
*/
protected static ?self $instance = null;
/**
* Socket connection to the broadcast server
* @var resource|null
*/
protected $socket = null;
/**
* Path to the Unix socket
*/
protected string $socketPath;
/**
* Whether we're currently connected
*/
protected bool $connected = false;
/**
* Maximum reconnection attempts
*/
protected int $maxReconnectAttempts = 3;
/**
* Buffer for reading responses
*/
protected string $readBuffer = '';
protected function __construct()
{
$this->socketPath = config('websockets.broadcast_socket', '/tmp/laravel-websockets-broadcast.sock');
}
/**
* Get the singleton instance
*/
public static function instance(): self
{
if (self::$instance === null) {
self::$instance = new self();
}
return self::$instance;
}
/**
* Reset the singleton (useful for testing or when socket path changes)
*/
public static function reset(): void
{
if (self::$instance !== null) {
self::$instance->disconnect();
self::$instance = null;
}
}
/**
* Connect to the broadcast socket server
*/
protected function connect(): bool
{
if ($this->connected && $this->socket !== null) {
// Check if socket is still valid
if ($this->isSocketValid()) {
return true;
}
// Socket became invalid, disconnect and reconnect
$this->disconnect();
}
if (!file_exists($this->socketPath)) {
Log::debug('[BroadcastClient] Socket file does not exist: ' . $this->socketPath);
return false;
}
$this->socket = @stream_socket_client(
'unix://' . $this->socketPath,
$errno,
$errstr,
1.0, // 1 second timeout for connection
STREAM_CLIENT_CONNECT
);
if ($this->socket === false) {
Log::warning('[BroadcastClient] Failed to connect: ' . $errstr . ' (' . $errno . ')');
$this->socket = null;
return false;
}
// Set socket options for efficiency
stream_set_blocking($this->socket, true);
stream_set_timeout($this->socket, 1); // 1 second read timeout
$this->connected = true;
$this->readBuffer = '';
return true;
}
/**
* Check if the socket is still valid
*/
protected function isSocketValid(): bool
{
if ($this->socket === null) {
return false;
}
// Check if socket is still open
$meta = @stream_get_meta_data($this->socket);
if ($meta === false || ($meta['eof'] ?? false)) {
return false;
}
return true;
}
/**
* Disconnect from the socket
*/
public function disconnect(): void
{
if ($this->socket !== null) {
@fclose($this->socket);
$this->socket = null;
}
$this->connected = false;
$this->readBuffer = '';
}
/**
* Send a broadcast command to the WebSocket server
*
* @param string $event Event name
* @param array $data Event data
* @param string $channel Channel name (default: 'websocket')
* @param array|null $sockets Target specific socket IDs (null = all)
* @param array $excludeSockets Socket IDs to exclude
* @return bool Success
*/
public function send(
string $event,
array $data,
string $channel = 'websocket',
?array $sockets = null,
array $excludeSockets = []
): bool {
$payload = [
'event' => $event,
'channel' => $channel,
'data' => $data,
];
if ($sockets !== null) {
$payload['sockets'] = $sockets;
}
if (!empty($excludeSockets)) {
$payload['exclude_sockets'] = $excludeSockets;
}
return $this->sendRaw($payload);
}
/**
* Send raw payload to the broadcast server
*/
protected function sendRaw(array $payload): bool
{
$message = json_encode($payload) . "\n";
for ($attempt = 0; $attempt < $this->maxReconnectAttempts; $attempt++) {
if (!$this->connect()) {
// Socket not available, try after small delay
if ($attempt < $this->maxReconnectAttempts - 1) {
usleep(10000); // 10ms
}
continue;
}
$written = @fwrite($this->socket, $message);
if ($written === false || $written !== strlen($message)) {
// Write failed, connection might be broken
$this->disconnect();
continue;
}
// Read response (optional, for confirmation)
$response = $this->readResponse();
if ($response !== null) {
return $response['success'] ?? false;
}
// No response but write succeeded - assume success
return true;
}
Log::warning('[BroadcastClient] Failed to send after ' . $this->maxReconnectAttempts . ' attempts');
return false;
}
/**
* Read a response from the socket
*/
protected function readResponse(): ?array
{
if ($this->socket === null) {
return null;
}
// Try to read with timeout
$data = @fgets($this->socket, 8192);
if ($data === false) {
// Check if it's a timeout or error
$meta = @stream_get_meta_data($this->socket);
if (($meta['timed_out'] ?? false) || ($meta['eof'] ?? false)) {
return null;
}
return null;
}
$data = trim($data);
if ($data === '') {
return null;
}
return json_decode($data, true);
}
/**
* Whisper (send to specific sockets only)
*/
public function whisper(
string $event,
array $data,
array $sockets,
string $channel = 'websocket'
): bool {
return $this->send($event, $data, $channel, $sockets);
}
/**
* Broadcast to all except specified sockets
*/
public function broadcastExcept(
string $event,
array $data,
array $excludeSockets,
string $channel = 'websocket'
): bool {
return $this->send($event, $data, $channel, null, $excludeSockets);
}
/**
* Check if the broadcast socket is available
*/
public function isAvailable(): bool
{
return file_exists($this->socketPath);
}
/**
* Get the socket path
*/
public function getSocketPath(): string
{
return $this->socketPath;
}
/**
* Destructor - clean up socket
*/
public function __destruct()
{
$this->disconnect();
}
}

View File

@ -0,0 +1,201 @@
<?php
declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Broadcast;
use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager;
use Illuminate\Support\Facades\Log;
use React\EventLoop\LoopInterface;
use React\Socket\ConnectionInterface;
use React\Socket\UnixServer;
/**
* Unix domain socket server for receiving broadcast commands.
*
* Runs inside the WebSocket server process and listens for broadcast
* commands from other PHP processes (queue workers, HTTP requests, etc.)
*
* Protocol:
* - Each message is newline-delimited JSON
* - Format: {"channel": "...", "event": "...", "data": {...}, "sockets": [...]}
*/
class BroadcastSocketServer
{
protected LoopInterface $loop;
protected ?UnixServer $server = null;
protected ChannelManager $channelManager;
protected string $socketPath;
/**
* Active client connections
* @var ConnectionInterface[]
*/
protected array $clients = [];
public function __construct(LoopInterface $loop, ChannelManager $channelManager)
{
$this->loop = $loop;
$this->channelManager = $channelManager;
$this->socketPath = config('websockets.broadcast_socket', '/tmp/laravel-websockets-broadcast.sock');
}
/**
* Start the broadcast socket server
*/
public function start(): void
{
// Remove stale socket file if exists
if (file_exists($this->socketPath)) {
unlink($this->socketPath);
}
try {
$this->server = new UnixServer($this->socketPath, $this->loop);
// Set permissions so other processes can connect
chmod($this->socketPath, 0666);
$this->server->on('connection', [$this, 'handleConnection']);
$this->server->on('error', function (\Exception $e) {
Log::error('[BroadcastSocket] Server error: ' . $e->getMessage());
});
Log::info('[BroadcastSocket] Listening on ' . $this->socketPath);
} catch (\Exception $e) {
Log::error('[BroadcastSocket] Failed to start: ' . $e->getMessage());
}
}
/**
* Handle a new client connection
*/
public function handleConnection(ConnectionInterface $connection): void
{
$clientId = spl_object_hash($connection);
$this->clients[$clientId] = $connection;
$buffer = '';
$connection->on('data', function ($data) use ($connection, &$buffer) {
$buffer .= $data;
// Process complete messages (newline-delimited)
while (($pos = strpos($buffer, "\n")) !== false) {
$message = substr($buffer, 0, $pos);
$buffer = substr($buffer, $pos + 1);
if ($message !== '') {
$this->handleMessage($connection, $message);
}
}
});
$connection->on('close', function () use ($clientId) {
unset($this->clients[$clientId]);
});
$connection->on('error', function (\Exception $e) use ($clientId) {
Log::warning('[BroadcastSocket] Client error: ' . $e->getMessage());
unset($this->clients[$clientId]);
});
}
/**
* Handle a broadcast message from a client
*/
protected function handleMessage(ConnectionInterface $connection, string $message): void
{
try {
$payload = json_decode($message, true);
if (!$payload || !isset($payload['event'])) {
$connection->write(json_encode(['success' => false, 'error' => 'Invalid payload']) . "\n");
return;
}
$channel = $payload['channel'] ?? 'websocket';
$event = $payload['event'];
$data = $payload['data'] ?? [];
$sockets = $payload['sockets'] ?? null; // Target specific sockets
$excludeSockets = $payload['exclude_sockets'] ?? []; // Exclude specific sockets
// Get channel instance and broadcast
$channelInstance = $this->channelManager->find('websockets', $channel);
if ($channelInstance) {
$this->broadcastToChannel($channelInstance, $event, $data, $sockets, $excludeSockets);
$connection->write(json_encode(['success' => true]) . "\n");
} else {
// Channel doesn't exist or no subscribers - still success
$connection->write(json_encode(['success' => true, 'warning' => 'No channel subscribers']) . "\n");
}
} catch (\Exception $e) {
Log::error('[BroadcastSocket] Error handling message: ' . $e->getMessage());
$connection->write(json_encode(['success' => false, 'error' => $e->getMessage()]) . "\n");
}
}
/**
* Broadcast to a channel
*/
protected function broadcastToChannel($channel, string $event, array $data, ?array $sockets, array $excludeSockets): void
{
$payload = json_encode([
'event' => $event,
'channel' => $channel->getName(),
'data' => $data,
]);
// Get subscribers
$subscribers = $channel->getSubscribedConnections();
foreach ($subscribers as $connection) {
$socketId = $connection->socketId ?? null;
// Filter by specific sockets if provided
if ($sockets !== null && !in_array($socketId, $sockets)) {
continue;
}
// Exclude specific sockets
if (in_array($socketId, $excludeSockets)) {
continue;
}
$connection->send($payload);
}
}
/**
* Stop the server
*/
public function stop(): void
{
if ($this->server) {
$this->server->close();
$this->server = null;
}
// Clean up socket file
if (file_exists($this->socketPath)) {
unlink($this->socketPath);
}
// Close all client connections
foreach ($this->clients as $client) {
$client->close();
}
$this->clients = [];
}
/**
* Get the socket path
*/
public function getSocketPath(): string
{
return $this->socketPath;
}
}

View File

@ -2,6 +2,7 @@
namespace BlaxSoftware\LaravelWebSockets\Console\Commands; namespace BlaxSoftware\LaravelWebSockets\Console\Commands;
use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastSocketServer;
use BlaxSoftware\LaravelWebSockets\Cache\IpcCache; use BlaxSoftware\LaravelWebSockets\Cache\IpcCache;
use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager; use BlaxSoftware\LaravelWebSockets\Contracts\ChannelManager;
use BlaxSoftware\LaravelWebSockets\Facades\StatisticsCollector as StatisticsCollectorFacade; use BlaxSoftware\LaravelWebSockets\Facades\StatisticsCollector as StatisticsCollectorFacade;
@ -151,6 +152,10 @@ class StartServer extends Command
$this->configurePcntlSignal(); $this->configurePcntlSignal();
\Log::channel('websocket')->debug('PCNTL signals configured'); \Log::channel('websocket')->debug('PCNTL signals configured');
\Log::channel('websocket')->debug('Configuring broadcast socket...');
$this->configureBroadcastSocket();
\Log::channel('websocket')->debug('Broadcast socket configured');
// $this->configurePongTracker(); // $this->configurePongTracker();
\Log::channel('websocket')->debug('Starting server...'); \Log::channel('websocket')->debug('Starting server...');
@ -317,6 +322,37 @@ class StartServer extends Command
\Log::channel('websocket')->debug('SIGINT handler registered'); \Log::channel('websocket')->debug('SIGINT handler registered');
} }
/**
* Configure the broadcast socket server for efficient broadcasting.
*
* This creates a Unix domain socket that external processes (queue workers,
* HTTP requests, etc.) can connect to for sending broadcasts without the
* overhead of creating new WebSocket connections.
*
* @return void
*/
protected function configureBroadcastSocket(): void
{
if (config('websockets.broadcast_socket_enabled', true) === false) {
\Log::channel('websocket')->debug('Broadcast socket disabled by config');
return;
}
try {
$channelManager = $this->laravel->make(ChannelManager::class);
$broadcastServer = new BroadcastSocketServer($this->loop, $channelManager);
$broadcastServer->start();
// Store reference for cleanup on shutdown
$this->laravel->instance(BroadcastSocketServer::class, $broadcastServer);
$this->components->info('Broadcast socket listening on ' . $broadcastServer->getSocketPath());
} catch (\Throwable $e) {
\Log::channel('websocket')->warning('Failed to start broadcast socket: ' . $e->getMessage());
$this->components->warn('Broadcast socket failed to start: ' . $e->getMessage());
}
}
/** /**
* Configure the tracker that will delete * Configure the tracker that will delete
* from the store the connections that. * from the store the connections that.
@ -666,6 +702,9 @@ class StartServer extends Command
\Log::channel('websocket')->info('Triggering hard shutdown...'); \Log::channel('websocket')->info('Triggering hard shutdown...');
$this->line('Hard shutdown initiated, stopping server immediately...'); $this->line('Hard shutdown initiated, stopping server immediately...');
// Stop the broadcast socket server
$this->stopBroadcastSocket();
$this->loop->stop(); $this->loop->stop();
} }
@ -702,7 +741,29 @@ class StartServer extends Command
}) })
->then(function () { ->then(function () {
\Log::channel('websocket')->debug('All connections closed, stopping loop...'); \Log::channel('websocket')->debug('All connections closed, stopping loop...');
// Stop the broadcast socket server
$this->stopBroadcastSocket();
$this->loop->stop(); $this->loop->stop();
}); });
} }
/**
* Stop the broadcast socket server if running.
*
* @return void
*/
protected function stopBroadcastSocket(): void
{
try {
if ($this->laravel->bound(BroadcastSocketServer::class)) {
$broadcastServer = $this->laravel->make(BroadcastSocketServer::class);
$broadcastServer->stop();
\Log::channel('websocket')->debug('Broadcast socket server stopped');
}
} catch (\Throwable $e) {
\Log::channel('websocket')->warning('Error stopping broadcast socket: ' . $e->getMessage());
}
}
} }

View File

@ -4,12 +4,86 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Services; namespace BlaxSoftware\LaravelWebSockets\Services;
use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastClient;
class WebsocketService class WebsocketService
{ {
/**
* Send a message via WebSocket.
*
* Automatically uses the efficient Unix socket broadcast when available,
* falling back to creating a new WebSocket connection when not.
*/
public static function send( public static function send(
string $event, string $event,
mixed $data, mixed $data,
$channel = 'websocket' $channel = 'websocket'
) {
// Try efficient broadcast socket first (Unix socket IPC)
if (ws_available()) {
$success = ws_broadcast($event, is_array($data) ? $data : ['data' => $data], $channel ?? 'websocket');
if ($success) {
return (object)['success' => true, 'method' => 'broadcast_socket'];
}
// Fall through to WebSocket client if broadcast socket fails
}
// Fallback: Create new WebSocket connection (slower, for when broadcast socket not available)
return static::sendViaWebSocket($event, $data, $channel);
}
/**
* Send a message to specific socket IDs only.
*
* @param string $event Event name
* @param mixed $data Event data
* @param array $sockets Target socket IDs
* @param string $channel Channel name
* @return bool Success
*/
public static function whisper(
string $event,
mixed $data,
array $sockets,
string $channel = 'websocket'
): bool {
if (!ws_available()) {
return false;
}
return ws_whisper($event, is_array($data) ? $data : ['data' => $data], $sockets, $channel);
}
/**
* Broadcast to all except specified socket IDs.
*
* @param string $event Event name
* @param mixed $data Event data
* @param array $excludeSockets Socket IDs to exclude
* @param string $channel Channel name
* @return bool Success
*/
public static function broadcastExcept(
string $event,
mixed $data,
array $excludeSockets,
string $channel = 'websocket'
): bool {
if (!ws_available()) {
return false;
}
return ws_broadcast_except($event, is_array($data) ? $data : ['data' => $data], $excludeSockets, $channel);
}
/**
* Send a message by creating a new WebSocket connection.
* This is the legacy method, kept for fallback when broadcast socket is unavailable.
*/
protected static function sendViaWebSocket(
string $event,
mixed $data,
$channel = 'websocket'
) { ) {
$client = new \WebSocket\Client('ws://0.0.0.0:6001/app/'.config('websockets.apps.0.id'), [ $client = new \WebSocket\Client('ws://0.0.0.0:6001/app/'.config('websockets.apps.0.id'), [
'timeout' => 5, 'timeout' => 5,

102
src/helpers_global.php Normal file
View File

@ -0,0 +1,102 @@
<?php
/**
* Global helper functions for WebSocket broadcasting.
*
* These functions provide an efficient way to broadcast messages
* to WebSocket clients from anywhere in your Laravel application.
*/
use BlaxSoftware\LaravelWebSockets\Broadcast\BroadcastClient;
if (!function_exists('ws_broadcast')) {
/**
* Broadcast a message to all clients on a channel.
*
* @param string $event Event name
* @param array $data Event data
* @param string $channel Channel name (default: 'websocket')
* @return bool Success
*
* @example
* // Broadcast to all clients on the default 'websocket' channel
* ws_broadcast('notification', ['message' => 'Hello!']);
*
* // Broadcast to a specific channel
* ws_broadcast('update', ['status' => 'complete'], 'private-user.123');
*/
function ws_broadcast(string $event, array $data, string $channel = 'websocket'): bool
{
return BroadcastClient::instance()->send($event, $data, $channel);
}
}
if (!function_exists('ws_whisper')) {
/**
* Send a message to specific socket IDs only.
*
* @param string $event Event name
* @param array $data Event data
* @param array $sockets Target socket IDs
* @param string $channel Channel name (default: 'websocket')
* @return bool Success
*
* @example
* // Send to specific sockets
* ws_whisper('typing', ['user' => 'John'], ['socket-123', 'socket-456']);
*/
function ws_whisper(string $event, array $data, array $sockets, string $channel = 'websocket'): bool
{
return BroadcastClient::instance()->whisper($event, $data, $sockets, $channel);
}
}
if (!function_exists('ws_broadcast_except')) {
/**
* Broadcast a message to all clients except specified socket IDs.
*
* @param string $event Event name
* @param array $data Event data
* @param array $excludeSockets Socket IDs to exclude
* @param string $channel Channel name (default: 'websocket')
* @return bool Success
*
* @example
* // Broadcast to all except the sender
* ws_broadcast_except('message', ['text' => 'Hi'], [$currentSocketId]);
*/
function ws_broadcast_except(string $event, array $data, array $excludeSockets, string $channel = 'websocket'): bool
{
return BroadcastClient::instance()->broadcastExcept($event, $data, $excludeSockets, $channel);
}
}
if (!function_exists('ws_client')) {
/**
* Get the WebSocket broadcast client instance.
*
* @return BroadcastClient
*
* @example
* // Check if WebSocket server is available
* if (ws_client()->isAvailable()) {
* ws_broadcast('event', $data);
* }
*/
function ws_client(): BroadcastClient
{
return BroadcastClient::instance();
}
}
if (!function_exists('ws_available')) {
/**
* Check if the WebSocket broadcast server is available.
*
* @return bool
*/
function ws_available(): bool
{
return BroadcastClient::instance()->isAvailable();
}
}