From 5c7d6a1c1bbf1081c799fdc1eb2a37ed07bad42d Mon Sep 17 00:00:00 2001 From: a6a2f5842 Date: Sat, 13 Sep 2025 19:33:29 +0200 Subject: [PATCH] A broadcasting --- src/Websocket/Controller.php | 30 +++++++++++++++++++ src/Websocket/Handler.php | 50 ++++++++++++++++++++++++++++++-- src/Websocket/MockConnection.php | 13 +++++++++ 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/src/Websocket/Controller.php b/src/Websocket/Controller.php index 075ea8b..c3460d9 100644 --- a/src/Websocket/Controller.php +++ b/src/Websocket/Controller.php @@ -268,6 +268,36 @@ class Controller return true; } + final public function broadcasting( + array|string|null $payload = null, + ?string $event = null, + ?string $channel = null, + bool $including_self = false + ){ + if (is_string($payload)) { + $payload = [ + 'message' => $payload, + ]; + } + + $p = [ + 'event' => ($event ?? $this->event), + 'data' => $payload, + 'channel' => $channel ?? ($this->channel ? $this->channel->getName() : null), + ]; + + if (get_class($this->connection) !== MockConnection::class) { + throw new \Exception('This method is only available in async mode'); + } + + $connection = clone $this->connection; + $connection->broadcast( + $p, + $channel, + $including_self + ); + } + protected static function get_uniquifyer($event) { preg_match('/[\[].*[\]]/', $event, $matches); diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index 1591113..9769efb 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -498,7 +498,7 @@ class Handler implements MessageComponentInterface $connection->send(json_encode([ 'event' => $message['event'].':error', 'data' => [ - 'message' => 'Timeout', + 'message' => $message['event'] . ' timeout', 'diff' => $diff, ], ])); @@ -519,8 +519,23 @@ class Handler implements MessageComponentInterface // Retrieve cached data $sending = @cache()->get($pidcache_data); + // Send the data to client - $connection->send($sending); + if(@$message['broadcast']){ + + $bm = json_decode($sending, true); + + $this->broadcast( + $connection->app->id, + $bm['data'] ?? null, + $bm['event'] ?? null, + $bm['channel'] ?? null, + $bm['including_self'], + $connection + ); + } else{ + $connection->send($sending); + } // Stop periodic check $this->channelManager->loop->cancelTimer($timer); @@ -530,4 +545,35 @@ class Handler implements MessageComponentInterface pcntl_waitpid(-1, $status, WNOHANG); }); } + + public function broadcast( + string $appId, + mixed $payload, + ?string $event = null, + ?string $channel = null, + bool $including_self = false, + $connection = null + ) : void { + + $channel = $this->channelManager->findOrCreate($appId,$channel); + + foreach ($channel->getConnections() as $channel_conection) { + if ($channel_conection !== $connection) { + $channel_conection->send(json_encode([ + 'event' => ($event ?? $event), + 'data' => $payload, + 'channel' => $channel ?? $channel->getName(), + ])); + } + + if ($including_self) { + $connection->send(json_encode([ + 'event' => ($event ?? $event), + 'data' => $payload, + 'channel' => $channel ?? $channel->getName(), + ])); + } + } + } + } diff --git a/src/Websocket/MockConnection.php b/src/Websocket/MockConnection.php index 7673e31..af4fd43 100644 --- a/src/Websocket/MockConnection.php +++ b/src/Websocket/MockConnection.php @@ -69,6 +69,19 @@ class MockConnection extends Connection implements \Ratchet\ConnectionInterface return $this; } + public function broadcast( + $data, + ?string $channel = null, + bool $including_self = false, + ){ + $data ??= []; + $data['broadcast'] = true; + $data['channel'] ??= $channel; + $data['including_self'] = $including_self; + + return $this->send(json_encode($data)); + } + private static function getDataKey() { $key = 'dedicated_data_' . getmypid();