A whisper

This commit is contained in:
a6a2f5842 2025-09-15 14:29:07 +02:00
parent ba53960551
commit 69584c8fe2
5 changed files with 137 additions and 16 deletions

View File

@ -66,7 +66,8 @@
}, },
"suggest": { "suggest": {
"ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown.", "ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown.",
"doctrine/dbal": "Required to run database migrations (^2.9|^3.0)." "doctrine/dbal": "Required to run database migrations (^2.9|^3.0).",
"textalk/websocket": "Required if you want to send data to the server like a client"
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {

View File

@ -4,20 +4,43 @@ declare(strict_types=1);
namespace BlaxSoftware\LaravelWebSockets\Services; namespace BlaxSoftware\LaravelWebSockets\Services;
use BlaxSoftware\LaravelWebSockets\Events\WebsocketMessageEvent;
class WebsocketService class WebsocketService
{ {
public static function send($data) public static function send(
{ string $event,
// TODO make work to send via websocket from anywhere mixed $data,
// WebsocketMessageEvent::dispatch( $channel = 'websocket'
// optional(optional(tenant())->tenantable)->public_id, ) {
// $d['event'], $client = new \WebSocket\Client('ws://0.0.0.0:6001/app/'.config('websockets.apps.0.id'), [
// (is_array($d['data'])) 'timeout' => 5,
// ? $d['data'] 'headers' => [],
// : ['data' => $d['data']] ]);
// );
// Read connection_established
$client->receive();
// Subscribe (public channel)
$client->send(json_encode([
'event' => 'pusher:subscribe',
'data' => ['channel' => 'websocket'],
]));
// (Optionally read subscription_succeeded)
$client->receive();
// Send event to be processed by Handler
$client->send(json_encode([
'event' => $event,
'channel' => $channel ?? 'websocket',
'data' => $data,
]));
// Read any response your controller might send (optional)
$response = $client->receive();
$client->close();
return json_decode($response);
} }
public static function resetAllTracking() public static function resetAllTracking()

View File

@ -280,7 +280,47 @@ class Controller
$including_self $including_self
); );
} }
}
final public function whisper(
array|string|null $payload = null,
?string $event = null,
array $socketIds,
?string $channel = null
){
if (is_string($payload)) {
$payload = [
'message' => $payload,
];
}
$channel ??= ($this->channel ? $this->channel->getName() : null);
$p = [
'event' => ($event ?? $this->event),
'data' => $payload,
'channel' => $channel,
];
if (get_class($this->connection) !== MockConnection::class) {
if (! $channel) {
$this->error('Channel not found');
return;
}
foreach ($this->channel->getConnections() as $channel_conection) {
if (in_array($channel_conection->socketId, $socketIds)) {
$channel_conection->send(json_encode($p));
}
}
}else{
$connection = clone $this->connection;
$connection->whisper(
$p,
$socketIds,
$channel
);
}
} }
protected static function get_uniquifyer($event) protected static function get_uniquifyer($event)

View File

@ -112,7 +112,7 @@ class Handler implements MessageComponentInterface
return $connection->send(json_encode([ return $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'] . ':error',
'data' => [ 'data' => [
'message' => 'Channel not found', 'message' => 'Channel not established',
'meta' => $message, 'meta' => $message,
], ],
])); ]));
@ -357,11 +357,19 @@ class Handler implements MessageComponentInterface
unset($message['data']['channel']); unset($message['data']['channel']);
} }
$this->channelManager->findOrCreate( $channel = $this->channelManager->findOrCreate(
$connection->app->id, $connection->app->id,
$message['channel'] $message['channel']
); );
if (! $channel->hasConnection($connection)) {
try{
$channel->subscribe($connection, (object) $message);
} catch (\Throwable $e) {
return null;
}
}
return $this->channelManager->find( return $this->channelManager->find(
$connection->app->id, $connection->app->id,
$message['channel'] $message['channel']
@ -370,9 +378,13 @@ class Handler implements MessageComponentInterface
protected function handleChannelSubscriptions($message, $connection) protected function handleChannelSubscriptions($message, $connection)
{ {
$channel_name = optional($this->get_connection_channel($connection, $message))->getName() ?? 'no-channel'; $channel_name = optional($this->get_connection_channel($connection, $message))->getName();
$socket_id = $connection->socketId; $socket_id = $connection->socketId;
if(! $channel_name) {
return;
}
// if not in $channel_connections add it // if not in $channel_connections add it
if (strpos($message['event'], '.subscribe') !== false) { if (strpos($message['event'], '.subscribe') !== false) {
if (! isset($this->channel_connections[$channel_name])) { if (! isset($this->channel_connections[$channel_name])) {
@ -392,6 +404,8 @@ class Handler implements MessageComponentInterface
'ws_active_channels', 'ws_active_channels',
array_keys($this->channel_connections) array_keys($this->channel_connections)
); );
$channel->subscribe($connection, $message);
} }
if (strpos($message['event'], '.unsubscribe') !== false) { if (strpos($message['event'], '.unsubscribe') !== false) {
@ -546,6 +560,14 @@ class Handler implements MessageComponentInterface
$bm['including_self'], $bm['including_self'],
$connection $connection
); );
} elseif (@$bm['whisper']) {
$this->whisper(
$connection->app->id,
$bm['data'] ?? null,
$bm['event'] ?? null,
$bm['socket_ids'] ?? [],
$bm['channel'] ?? null,
);
} else { } else {
$connection->send($sending); $connection->send($sending);
} }
@ -586,4 +608,26 @@ class Handler implements MessageComponentInterface
} }
} }
} }
public function whisper(
string $appId,
mixed $payload,
?string $event = null,
array $socketIds = [],
?string $channel = null
): void {
$channel = $this->channelManager->findOrCreate($appId, $channel);
$p = [
'event' => ($event ?? $event),
'data' => $payload,
'channel' => $channel->getName(),
];
foreach ($channel->getConnections() as $channel_conection) {
if (in_array($channel_conection->socketId, $socketIds)) {
$channel_conection->send(json_encode($p));
}
}
}
} }

View File

@ -82,6 +82,19 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface
return $this->send(json_encode($data)); return $this->send(json_encode($data));
} }
public function whisper(
$data,
array $socketIds,
?string $channel = null,
) {
$data ??= [];
$data['whisper'] = true;
$data['channel'] ??= $channel;
$data['socket_ids'] = $socketIds;
return $this->send(json_encode($data));
}
private static function getDataKey() private static function getDataKey()
{ {
$key = 'dedicated_data_' . getmypid(); $key = 'dedicated_data_' . getmypid();