From bc6abf6841f327fc18029f8f4a89d74e3e511927 Mon Sep 17 00:00:00 2001 From: a6a2f5842 Date: Sun, 14 Sep 2025 15:00:27 +0200 Subject: [PATCH] I support default Channel --- src/Services/WebsocketService.php | 24 ++++++++++ src/Websocket/Controller.php | 2 +- src/Websocket/Handler.php | 74 +++++++++++++++---------------- 3 files changed, 60 insertions(+), 40 deletions(-) diff --git a/src/Services/WebsocketService.php b/src/Services/WebsocketService.php index e76e5e3..3b990c2 100644 --- a/src/Services/WebsocketService.php +++ b/src/Services/WebsocketService.php @@ -19,4 +19,28 @@ class WebsocketService // : ['data' => $d['data']] // ); } + + function getTenantable(string $socketId) + { + config(['cache.default' => 'file']); + return cache()->get('ws_socket_tenantable_' . $socketId); + } + + public static function getChannelConnections(string $channelName) + { + config(['cache.default' => 'file']); + return cache()->get('ws_channel_connections_' . $channelName); + } + + public static function getActiveChannels() + { + config(['cache.default' => 'file']); + return cache()->get('ws_active_channels'); + } + + public static function getConnection(string $socketId) + { + config(['cache.default' => 'file']); + return cache()->get('ws_connection_' . $socketId); + } } diff --git a/src/Websocket/Controller.php b/src/Websocket/Controller.php index 6e5d6ce..d0f7567 100644 --- a/src/Websocket/Controller.php +++ b/src/Websocket/Controller.php @@ -23,7 +23,7 @@ class Controller public static function controll_message( ConnectionInterface $connection, - PrivateChannel $channel, + PrivateChannel|Channel|PresenceChannel $channel, array $message, LocalChannelManager|RedisChannelManager $channelManager ) { diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php index eb5b907..595ceb0 100644 --- a/src/Websocket/Handler.php +++ b/src/Websocket/Handler.php @@ -41,7 +41,7 @@ class Handler implements MessageComponentInterface public function onOpen(ConnectionInterface $connection) { - try{ + try { if (! $this->connectionCanBeMade($connection)) { return $connection->close(); } @@ -71,8 +71,8 @@ class Handler implements MessageComponentInterface $connection->socketId ); } - }catch (UnknownAppKey $e) { - Log::channel('websocket')->error('Root level error: '. $e->getMessage(), [ + } catch (UnknownAppKey $e) { + Log::channel('websocket')->error('Root level error: ' . $e->getMessage(), [ 'file' => $e->getFile(), 'line' => $e->getLine(), 'trace' => $e->getTraceAsString(), @@ -102,6 +102,7 @@ class Handler implements MessageComponentInterface // Cut short for ping pong if (strpos($message['event'], ':ping') !== false) { + $this->channelManager->connectionPonged($connection); return gc_collect_cycles(); } @@ -109,7 +110,7 @@ class Handler implements MessageComponentInterface if (! $channel = $this->get_connection_channel($connection, $message)) { return $connection->send(json_encode([ - 'event' => $message['event'].':error', + 'event' => $message['event'] . ':error', 'data' => [ 'message' => 'Channel not found', 'meta' => $message, @@ -119,11 +120,11 @@ class Handler implements MessageComponentInterface $this->authenticateConnection($connection, $channel, $message); - Log::channel('websocket')->info('Executing event: '.$message['event']); + Log::channel('websocket')->info('Executing event: ' . $message['event']); if (strpos($message['event'], 'pusher') !== false) { return $connection->send(json_encode([ - 'event' => $message['event'].':response', + 'event' => $message['event'] . ':response', 'data' => [ 'message' => 'Success', ], @@ -154,7 +155,7 @@ class Handler implements MessageComponentInterface ->invokeWhen(fn($callback) => true); } catch (Exception $e) { $mock->send(json_encode([ - 'event' => $message['event'].':error', + 'event' => $message['event'] . ':error', 'data' => [ 'message' => $e->getMessage(), ], @@ -166,7 +167,7 @@ class Handler implements MessageComponentInterface $this->addDataCheckLoop($connection, $message, $pid); } } catch (\Throwable $e) { - Log::channel('websocket')->error('onMessage unhandled error: '. $e->getMessage(), [ + Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [ 'file' => $e->getFile(), 'line' => $e->getLine(), 'trace' => $e->getTraceAsString(), @@ -344,7 +345,7 @@ class Handler implements MessageComponentInterface return $this; } - protected function get_connection_channel(&$connection, &$message): ?PrivateChannel + protected function get_connection_channel(&$connection, &$message): ?Channel { // Put channel on its place if (! @$message['channel'] && $message['data'] && $message['data']['channel']) { @@ -379,7 +380,7 @@ class Handler implements MessageComponentInterface } cache()->forever( - 'ws_channel_connections_'.$channel_name, + 'ws_channel_connections_' . $channel_name, $this->channel_connections[$channel_name] ); @@ -400,11 +401,11 @@ class Handler implements MessageComponentInterface if (@$this->channel_connections[$channel_name]) { cache()->forever( - 'ws_channel_connections_'.$channel_name, + 'ws_channel_connections_' . $channel_name, $this->channel_connections[$channel_name] ); } else { - cache()->forget('ws_channel_connections_'.$channel_name); + cache()->forget('ws_channel_connections_' . $channel_name); } cache()->forever( @@ -433,9 +434,9 @@ class Handler implements MessageComponentInterface $message ) { - if (! optional($connection)->auth && $connection->socketId && cache()->get('socket_'.$connection->socketId)) { + if (! optional($connection)->auth && $connection->socketId && cache()->get('socket_' . $connection->socketId)) { - $cached_auth = cache()->get('socket_'.$connection->socketId); + $cached_auth = cache()->get('socket_' . $connection->socketId); $connection->user = @$cached_auth['type']::find($cached_auth['id']); @@ -467,14 +468,14 @@ class Handler implements MessageComponentInterface $optional = false, $iteration = false ) { - $pid = explode('_', $pid.'')[0]; + $pid = explode('_', $pid . '')[0]; if ($iteration >= 0 && $iteration !== false) { - $pid .= '_'.$iteration; + $pid .= '_' . $iteration; } // Set timeout start - $pidcache_start = 'dedicated_start_'.$pid; + $pidcache_start = 'dedicated_start_' . $pid; cache()->put($pidcache_start, microtime(true), 100); // Periodic check for data @@ -486,9 +487,9 @@ class Handler implements MessageComponentInterface $optional, $iteration ) { - $pidcache_data = 'dedicated_data_'.$pid; - $pidcache_done = 'dedicated_data_'.$pid.'_done'; - $pidcache_complete = 'dedicated_data_'.$pid.'_complete'; + $pidcache_data = 'dedicated_data_' . $pid; + $pidcache_done = 'dedicated_data_' . $pid . '_done'; + $pidcache_complete = 'dedicated_data_' . $pid . '_complete'; if ( cache()->has($pidcache_start) @@ -496,7 +497,7 @@ class Handler implements MessageComponentInterface ) { if (! $optional) { $connection->send(json_encode([ - 'event' => $message['event'].':error', + 'event' => $message['event'] . ':error', 'data' => [ 'message' => $message['event'] . ' timeout', 'diff' => $diff, @@ -518,13 +519,11 @@ class Handler implements MessageComponentInterface // Retrieve cached data $sending = @cache()->get($pidcache_data); + $bm = json_decode($sending, true); // Send the data to client - if(@$message['broadcast']){ - - $bm = json_decode($sending, true); - + if (@$bm['broadcast']) { $this->broadcast( $connection->app->id, $bm['data'] ?? null, @@ -533,7 +532,7 @@ class Handler implements MessageComponentInterface $bm['including_self'], $connection ); - } else{ + } else { $connection->send($sending); } @@ -553,27 +552,24 @@ class Handler implements MessageComponentInterface ?string $channel = null, bool $including_self = false, $connection = null - ) : void { + ): void { - $channel = $this->channelManager->findOrCreate($appId,$channel); + $channel = $this->channelManager->findOrCreate($appId, $channel); + + $p = [ + 'event' => ($event ?? $event), + 'data' => $payload, + 'channel' => $channel->getName(), + ]; foreach ($channel->getConnections() as $channel_conection) { if ($channel_conection !== $connection) { - $channel_conection->send(json_encode([ - 'event' => ($event ?? $event), - 'data' => $payload, - 'channel' => $channel->getName(), - ])); + $channel_conection->send(json_encode($p)); } if ($including_self) { - $connection->send(json_encode([ - 'event' => ($event ?? $event), - 'data' => $payload, - 'channel' => $channel->getName(), - ])); + $connection->send(json_encode($p)); } } } - }