BF broadcasting server

This commit is contained in:
Fabian @ Blax Software 2026-02-03 15:38:57 +01:00
parent 826c198a77
commit b621a69015
1 changed files with 65 additions and 32 deletions

View File

@ -13,6 +13,12 @@ class WebsocketService
* *
* Automatically uses the efficient Unix socket broadcast when available, * Automatically uses the efficient Unix socket broadcast when available,
* falling back to creating a new WebSocket connection when not. * falling back to creating a new WebSocket connection when not.
*
* Supports the legacy 'app.whisp' pattern where data contains:
* - 'event': The actual event name to broadcast
* - 'data': The actual data payload
* - 'sockets': Target socket IDs (optional)
* - 'channel': Target channel (optional)
*/ */
public static function send( public static function send(
string $event, string $event,
@ -21,11 +27,33 @@ class WebsocketService
) { ) {
// Try efficient broadcast socket first (Unix socket IPC) // Try efficient broadcast socket first (Unix socket IPC)
if (ws_available()) { if (ws_available()) {
$success = ws_broadcast($event, is_array($data) ? $data : ['data' => $data], $channel ?? 'websocket'); // Handle legacy 'app.whisp' pattern - extract inner event, data, and sockets
if ($success) { if ($event === 'app.whisp' && is_array($data)) {
return (object)['success' => true, 'method' => 'broadcast_socket']; $innerEvent = $data['event'] ?? 'info:message';
$innerData = $data['data'] ?? [];
$innerChannel = $data['channel'] ?? $channel ?? 'websocket';
$targetSockets = $data['sockets'] ?? null;
if (!empty($targetSockets) && is_array($targetSockets)) {
// Whisper to specific sockets
$success = ws_whisper($innerEvent, $innerData, $targetSockets, $innerChannel);
} else {
// Broadcast to all
$success = ws_broadcast($innerEvent, $innerData, $innerChannel);
}
if ($success) {
return (object)['success' => true, 'method' => 'broadcast_socket'];
}
// Fall through to WebSocket client if broadcast socket fails
} else {
// Regular broadcast
$success = ws_broadcast($event, is_array($data) ? $data : ['data' => $data], $channel ?? 'websocket');
if ($success) {
return (object)['success' => true, 'method' => 'broadcast_socket'];
}
// Fall through to WebSocket client if broadcast socket fails
} }
// Fall through to WebSocket client if broadcast socket fails
} }
// Fallback: Create new WebSocket connection (slower, for when broadcast socket not available) // Fallback: Create new WebSocket connection (slower, for when broadcast socket not available)
@ -85,36 +113,41 @@ class WebsocketService
mixed $data, mixed $data,
$channel = 'websocket' $channel = 'websocket'
) { ) {
$client = new \WebSocket\Client('ws://0.0.0.0:6001/app/'.config('websockets.apps.0.id'), [ try {
'timeout' => 5, $client = new \WebSocket\Client('ws://0.0.0.0:6001/app/' . config('websockets.apps.0.id'), [
'headers' => [], 'timeout' => 5,
]); 'headers' => [],
]);
// Read connection_established // Read connection_established
$client->receive(); $client->receive();
// Subscribe (public channel) // Subscribe (public channel)
$client->send(json_encode([ $client->send(json_encode([
'event' => 'pusher:subscribe', 'event' => 'pusher:subscribe',
'data' => ['channel' => 'websocket'], 'data' => ['channel' => 'websocket'],
])); ]));
// (Optionally read subscription_succeeded) // (Optionally read subscription_succeeded)
$client->receive(); $client->receive();
// Send event to be processed by Handler // Send event to be processed by Handler
$client->send(json_encode([ $client->send(json_encode([
'event' => $event, 'event' => $event,
'channel' => $channel ?? 'websocket', 'channel' => $channel ?? 'websocket',
'data' => $data, 'data' => $data,
])); ]));
// Read any response your controller might send (optional) // Read any response your controller might send (optional)
$response = $client->receive(); $response = $client->receive();
$client->close(); $client->close();
return json_decode($response); return json_decode($response);
} catch (\Exception $e) {
\Log::warning('[WebsocketService] sendViaWebSocket failed: ' . $e->getMessage());
return (object)['success' => false, 'error' => $e->getMessage()];
}
} }
public static function resetAllTracking() public static function resetAllTracking()
@ -142,20 +175,20 @@ class WebsocketService
return $r; return $r;
} }
public static function getChannelConnections(string $channelName) public static function getChannelConnections(string $channelName): array
{ {
$previousCache = config('cache.default'); $previousCache = config('cache.default');
config(['cache.default' => 'file']); config(['cache.default' => 'file']);
$r = cache()->get('ws_channel_connections_' . $channelName); $r = cache()->get('ws_channel_connections_' . $channelName) ?? [];
config(['cache.default' => $previousCache]); config(['cache.default' => $previousCache]);
return $r; return $r;
} }
public static function getActiveChannels() public static function getActiveChannels(): array
{ {
$previousCache = config('cache.default'); $previousCache = config('cache.default');
config(['cache.default' => 'file']); config(['cache.default' => 'file']);
$r = cache()->get('ws_active_channels'); $r = cache()->get('ws_active_channels') ?? [];
config(['cache.default' => $previousCache]); config(['cache.default' => $previousCache]);
return $r; return $r;
} }
@ -169,7 +202,7 @@ class WebsocketService
return $r; return $r;
} }
public static function getAuthedUsers() public static function getAuthedUsers(): array
{ {
$previousCache = config('cache.default'); $previousCache = config('cache.default');
config(['cache.default' => 'file']); config(['cache.default' => 'file']);