From 69584c8fe23f165d212a39d370e096592011fecf Mon Sep 17 00:00:00 2001 From: a6a2f5842 Date: Mon, 15 Sep 2025 14:29:07 +0200 Subject: [PATCH] A whisper --- composer.json | 3 +- src/Services/WebsocketService.php | 47 +++++++++++++++++++++-------- src/Websocket/Controller.php | 40 +++++++++++++++++++++++++ src/Websocket/Handler.php | 50 +++++++++++++++++++++++++++++-- src/Websocket/MockConnection.php | 13 ++++++++ 5 files changed, 137 insertions(+), 16 deletions(-) diff --git a/composer.json b/composer.json index 92dcbeb..bd11402 100644 --- a/composer.json +++ b/composer.json @@ -66,7 +66,8 @@ }, "suggest": { "ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown.", - "doctrine/dbal": "Required to run database migrations (^2.9|^3.0)." + "doctrine/dbal": "Required to run database migrations (^2.9|^3.0).", + "textalk/websocket": "Required if you want to send data to the server like a client" }, "autoload": { "psr-4": { diff --git a/src/Services/WebsocketService.php b/src/Services/WebsocketService.php index a7147a7..bb123cf 100644 --- a/src/Services/WebsocketService.php +++ b/src/Services/WebsocketService.php @@ -4,20 +4,43 @@ declare(strict_types=1); namespace BlaxSoftware\LaravelWebSockets\Services; -use BlaxSoftware\LaravelWebSockets\Events\WebsocketMessageEvent; - class WebsocketService { - public static function send($data) - { - // TODO make work to send via websocket from anywhere - // WebsocketMessageEvent::dispatch( - // optional(optional(tenant())->tenantable)->public_id, - // $d['event'], - // (is_array($d['data'])) - // ? $d['data'] - // : ['data' => $d['data']] - // ); + public static function send( + string $event, + mixed $data, + $channel = 'websocket' + ) { + $client = new \WebSocket\Client('ws://0.0.0.0:6001/app/'.config('websockets.apps.0.id'), [ + 'timeout' => 5, + 'headers' => [], + ]); + + // Read connection_established + $client->receive(); + + // Subscribe (public channel) + $client->send(json_encode([ + 'event' => 'pusher:subscribe', + 'data' => ['channel' => 'websocket'], + ])); + + // (Optionally read subscription_succeeded) + $client->receive(); + + // Send event to be processed by Handler + $client->send(json_encode([ + 'event' => $event, + 'channel' => $channel ?? 'websocket', + 'data' => $data, + ])); + + // Read any response your controller might send (optional) + $response = $client->receive(); + + $client->close(); + + return json_decode($response); } public static function resetAllTracking() diff --git a/src/Websocket/Controller.php b/src/Websocket/Controller.php index d0f7567..8787af9 100644 --- a/src/Websocket/Controller.php +++ b/src/Websocket/Controller.php @@ -280,7 +280,47 @@ class Controller $including_self ); } + } + final public function whisper( + array|string|null $payload = null, + ?string $event = null, + array $socketIds, + ?string $channel = null + ){ + if (is_string($payload)) { + $payload = [ + 'message' => $payload, + ]; + } + + $channel ??= ($this->channel ? $this->channel->getName() : null); + + $p = [ + 'event' => ($event ?? $this->event), + 'data' => $payload, + 'channel' => $channel, + ]; + + if (get_class($this->connection) !== MockConnection::class) { + if (! $channel) { + $this->error('Channel not found'); + return; + } + + foreach ($this->channel->getConnections() as $channel_conection) { + if (in_array($channel_conection->socketId, $socketIds)) { + $channel_conection->send(json_encode($p)); + } + } + }else{ + $connection = clone $this->connection; + $connection->whisper( + $p, + $socketIds, + $channel + ); + } } protected static function get_uniquifyer($event) diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index cb8d8ad..0bd1dcb 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -112,7 +112,7 @@ class Handler implements MessageComponentInterface return $connection->send(json_encode([ 'event' => $message['event'] . ':error', 'data' => [ - 'message' => 'Channel not found', + 'message' => 'Channel not established', 'meta' => $message, ], ])); @@ -357,11 +357,19 @@ class Handler implements MessageComponentInterface unset($message['data']['channel']); } - $this->channelManager->findOrCreate( + $channel = $this->channelManager->findOrCreate( $connection->app->id, $message['channel'] ); + if (! $channel->hasConnection($connection)) { + try{ + $channel->subscribe($connection, (object) $message); + } catch (\Throwable $e) { + return null; + } + } + return $this->channelManager->find( $connection->app->id, $message['channel'] @@ -370,9 +378,13 @@ class Handler implements MessageComponentInterface protected function handleChannelSubscriptions($message, $connection) { - $channel_name = optional($this->get_connection_channel($connection, $message))->getName() ?? 'no-channel'; + $channel_name = optional($this->get_connection_channel($connection, $message))->getName(); $socket_id = $connection->socketId; + if(! $channel_name) { + return; + } + // if not in $channel_connections add it if (strpos($message['event'], '.subscribe') !== false) { if (! isset($this->channel_connections[$channel_name])) { @@ -392,6 +404,8 @@ class Handler implements MessageComponentInterface 'ws_active_channels', array_keys($this->channel_connections) ); + + $channel->subscribe($connection, $message); } if (strpos($message['event'], '.unsubscribe') !== false) { @@ -546,6 +560,14 @@ class Handler implements MessageComponentInterface $bm['including_self'], $connection ); + } elseif (@$bm['whisper']) { + $this->whisper( + $connection->app->id, + $bm['data'] ?? null, + $bm['event'] ?? null, + $bm['socket_ids'] ?? [], + $bm['channel'] ?? null, + ); } else { $connection->send($sending); } @@ -586,4 +608,26 @@ class Handler implements MessageComponentInterface } } } + + public function whisper( + string $appId, + mixed $payload, + ?string $event = null, + array $socketIds = [], + ?string $channel = null + ): void { + $channel = $this->channelManager->findOrCreate($appId, $channel); + + $p = [ + 'event' => ($event ?? $event), + 'data' => $payload, + 'channel' => $channel->getName(), + ]; + + foreach ($channel->getConnections() as $channel_conection) { + if (in_array($channel_conection->socketId, $socketIds)) { + $channel_conection->send(json_encode($p)); + } + } + } } diff --git a/src/Websocket/MockConnection.php b/src/Websocket/MockConnection.php index af4fd43..7bc6ded 100644 --- a/src/Websocket/MockConnection.php +++ b/src/Websocket/MockConnection.php @@ -82,6 +82,19 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface return $this->send(json_encode($data)); } + public function whisper( + $data, + array $socketIds, + ?string $channel = null, + ) { + $data ??= []; + $data['whisper'] = true; + $data['channel'] ??= $channel; + $data['socket_ids'] = $socketIds; + + return $this->send(json_encode($data)); + } + private static function getDataKey() { $key = 'dedicated_data_' . getmypid();