BF whisper, broadcast

This commit is contained in:
Fabian @ Blax Software 2026-02-02 11:24:54 +01:00
parent 61d4eb282e
commit f7e08a337b
3 changed files with 72 additions and 19 deletions

View File

@ -15,7 +15,7 @@ use Illuminate\Support\Facades\Log;
class Controller class Controller
{ {
protected bool $isMockConnection; protected bool $isMockConnection;
protected ?MockConnection $mockConnectionClone = null; protected MockConnection|MockConnectionSocketPair|null $mockConnectionClone = null;
final public function __construct( final public function __construct(
protected ConnectionInterface $connection, protected ConnectionInterface $connection,
@ -24,7 +24,9 @@ class Controller
protected LocalChannelManager|RedisChannelManager $channelManager protected LocalChannelManager|RedisChannelManager $channelManager
) { ) {
// Cache class check to avoid repeated get_class() calls (reflection is slow) // Cache class check to avoid repeated get_class() calls (reflection is slow)
$this->isMockConnection = get_class($connection) === MockConnection::class; $connectionClass = get_class($connection);
$this->isMockConnection = $connectionClass === MockConnection::class
|| $connectionClass === MockConnectionSocketPair::class;
// Pre-clone MockConnection once if needed (reuse across method calls) // Pre-clone MockConnection once if needed (reuse across method calls)
if ($this->isMockConnection) { if ($this->isMockConnection) {
@ -319,22 +321,28 @@ class Controller
]; ];
if (!$this->isMockConnection) { if (!$this->isMockConnection) {
if (! $channel) {
$this->error('Channel not found');
return;
}
// Pre-encode ONCE for all matching sockets // Pre-encode ONCE for all matching sockets
$encoded = json_encode($p); $encoded = json_encode($p);
// Use array_flip for O(1) lookup instead of O(n) in_array // Use array_flip for O(1) lookup instead of O(n) in_array
$socketIdLookup = array_flip($socketIds); $socketIdLookup = array_flip($socketIds);
$sentTo = [];
foreach ($this->channel->getConnections() as $channel_conection) { // Search ALL connections across ALL channels to find target socket IDs
if (isset($socketIdLookup[$channel_conection->socketId])) { // This is necessary because whisper targets specific sockets regardless of channel
$channel_conection->send($encoded); $this->channelManager->getLocalConnections()->then(function ($connections) use ($socketIdLookup, $encoded, &$sentTo) {
foreach ($connections as $connection) {
// Skip if already sent to this socket (can appear in multiple channels)
if (isset($sentTo[$connection->socketId])) {
continue;
}
if (isset($socketIdLookup[$connection->socketId])) {
$connection->send($encoded);
$sentTo[$connection->socketId] = true;
} }
} }
});
} else { } else {
$this->mockConnectionClone->whisper( $this->mockConnectionClone->whisper(
$p, $p,

View File

@ -1210,21 +1210,32 @@ class Handler implements MessageComponentInterface
mixed $payload, mixed $payload,
?string $event = null, ?string $event = null,
array $socketIds = [], array $socketIds = [],
?string $channel = null ?string $channelName = null
): void { ): void {
$channel = $this->channelManager->findOrCreate($appId, $channel);
$p = [ $p = [
'event' => ($event ?? $event), 'event' => ($event ?? $event),
'data' => $payload, 'data' => $payload,
'channel' => $channel->getName(), 'channel' => $channelName,
]; ];
$socketIdLookup = array_flip($socketIds); $socketIdLookup = array_flip($socketIds);
foreach ($channel->getConnections() as $channel_conection) { $encoded = json_encode($p);
if (isset($socketIdLookup[$channel_conection->socketId])) { $sentTo = [];
$channel_conection->send(json_encode($p));
// Search ALL connections across ALL channels to find target socket IDs
// This is necessary because whisper targets specific sockets regardless of channel
$this->channelManager->getLocalConnections()->then(function ($connections) use ($socketIdLookup, $encoded, &$sentTo) {
foreach ($connections as $connection) {
// Skip if already sent to this socket (can appear in multiple channels)
if (isset($sentTo[$connection->socketId])) {
continue;
}
if (isset($socketIdLookup[$connection->socketId])) {
$connection->send($encoded);
$sentTo[$connection->socketId] = true;
} }
} }
});
} }
} }

View File

@ -34,6 +34,40 @@ class MockConnectionSocketPair implements ConnectionInterface
return $this; return $this;
} }
/**
* Broadcast a message to all connections in a channel.
* Serializes the data for the parent process to handle.
*/
public function broadcast(
$data,
?string $channel = null,
bool $including_self = false,
): self {
$data ??= [];
$data['broadcast'] = true;
$data['channel'] ??= $channel;
$data['including_self'] = $including_self;
return $this->send(json_encode($data));
}
/**
* Whisper a message to specific socket IDs.
* Serializes the data for the parent process to handle.
*/
public function whisper(
$data,
array $socketIds,
?string $channel = null,
): self {
$data ??= [];
$data['whisper'] = true;
$data['channel'] ??= $channel;
$data['socket_ids'] = $socketIds;
return $this->send(json_encode($data));
}
public function close(): void public function close(): void
{ {
// No-op for mock // No-op for mock