I support default Channel

This commit is contained in:
a6a2f5842 2025-09-14 15:00:27 +02:00
parent 6031cdf9c5
commit bc6abf6841
3 changed files with 60 additions and 40 deletions

View File

@ -19,4 +19,28 @@ class WebsocketService
// : ['data' => $d['data']] // : ['data' => $d['data']]
// ); // );
} }
function getTenantable(string $socketId)
{
config(['cache.default' => 'file']);
return cache()->get('ws_socket_tenantable_' . $socketId);
}
public static function getChannelConnections(string $channelName)
{
config(['cache.default' => 'file']);
return cache()->get('ws_channel_connections_' . $channelName);
}
public static function getActiveChannels()
{
config(['cache.default' => 'file']);
return cache()->get('ws_active_channels');
}
public static function getConnection(string $socketId)
{
config(['cache.default' => 'file']);
return cache()->get('ws_connection_' . $socketId);
}
} }

View File

@ -23,7 +23,7 @@ class Controller
public static function controll_message( public static function controll_message(
ConnectionInterface $connection, ConnectionInterface $connection,
PrivateChannel $channel, PrivateChannel|Channel|PresenceChannel $channel,
array $message, array $message,
LocalChannelManager|RedisChannelManager $channelManager LocalChannelManager|RedisChannelManager $channelManager
) { ) {

View File

@ -41,7 +41,7 @@ class Handler implements MessageComponentInterface
public function onOpen(ConnectionInterface $connection) public function onOpen(ConnectionInterface $connection)
{ {
try{ try {
if (! $this->connectionCanBeMade($connection)) { if (! $this->connectionCanBeMade($connection)) {
return $connection->close(); return $connection->close();
} }
@ -71,8 +71,8 @@ class Handler implements MessageComponentInterface
$connection->socketId $connection->socketId
); );
} }
}catch (UnknownAppKey $e) { } catch (UnknownAppKey $e) {
Log::channel('websocket')->error('Root level error: '. $e->getMessage(), [ Log::channel('websocket')->error('Root level error: ' . $e->getMessage(), [
'file' => $e->getFile(), 'file' => $e->getFile(),
'line' => $e->getLine(), 'line' => $e->getLine(),
'trace' => $e->getTraceAsString(), 'trace' => $e->getTraceAsString(),
@ -102,6 +102,7 @@ class Handler implements MessageComponentInterface
// Cut short for ping pong // Cut short for ping pong
if (strpos($message['event'], ':ping') !== false) { if (strpos($message['event'], ':ping') !== false) {
$this->channelManager->connectionPonged($connection);
return gc_collect_cycles(); return gc_collect_cycles();
} }
@ -109,7 +110,7 @@ class Handler implements MessageComponentInterface
if (! $channel = $this->get_connection_channel($connection, $message)) { if (! $channel = $this->get_connection_channel($connection, $message)) {
return $connection->send(json_encode([ return $connection->send(json_encode([
'event' => $message['event'].':error', 'event' => $message['event'] . ':error',
'data' => [ 'data' => [
'message' => 'Channel not found', 'message' => 'Channel not found',
'meta' => $message, 'meta' => $message,
@ -119,11 +120,11 @@ class Handler implements MessageComponentInterface
$this->authenticateConnection($connection, $channel, $message); $this->authenticateConnection($connection, $channel, $message);
Log::channel('websocket')->info('Executing event: '.$message['event']); Log::channel('websocket')->info('Executing event: ' . $message['event']);
if (strpos($message['event'], 'pusher') !== false) { if (strpos($message['event'], 'pusher') !== false) {
return $connection->send(json_encode([ return $connection->send(json_encode([
'event' => $message['event'].':response', 'event' => $message['event'] . ':response',
'data' => [ 'data' => [
'message' => 'Success', 'message' => 'Success',
], ],
@ -154,7 +155,7 @@ class Handler implements MessageComponentInterface
->invokeWhen(fn($callback) => true); ->invokeWhen(fn($callback) => true);
} catch (Exception $e) { } catch (Exception $e) {
$mock->send(json_encode([ $mock->send(json_encode([
'event' => $message['event'].':error', 'event' => $message['event'] . ':error',
'data' => [ 'data' => [
'message' => $e->getMessage(), 'message' => $e->getMessage(),
], ],
@ -166,7 +167,7 @@ class Handler implements MessageComponentInterface
$this->addDataCheckLoop($connection, $message, $pid); $this->addDataCheckLoop($connection, $message, $pid);
} }
} catch (\Throwable $e) { } catch (\Throwable $e) {
Log::channel('websocket')->error('onMessage unhandled error: '. $e->getMessage(), [ Log::channel('websocket')->error('onMessage unhandled error: ' . $e->getMessage(), [
'file' => $e->getFile(), 'file' => $e->getFile(),
'line' => $e->getLine(), 'line' => $e->getLine(),
'trace' => $e->getTraceAsString(), 'trace' => $e->getTraceAsString(),
@ -344,7 +345,7 @@ class Handler implements MessageComponentInterface
return $this; return $this;
} }
protected function get_connection_channel(&$connection, &$message): ?PrivateChannel protected function get_connection_channel(&$connection, &$message): ?Channel
{ {
// Put channel on its place // Put channel on its place
if (! @$message['channel'] && $message['data'] && $message['data']['channel']) { if (! @$message['channel'] && $message['data'] && $message['data']['channel']) {
@ -379,7 +380,7 @@ class Handler implements MessageComponentInterface
} }
cache()->forever( cache()->forever(
'ws_channel_connections_'.$channel_name, 'ws_channel_connections_' . $channel_name,
$this->channel_connections[$channel_name] $this->channel_connections[$channel_name]
); );
@ -400,11 +401,11 @@ class Handler implements MessageComponentInterface
if (@$this->channel_connections[$channel_name]) { if (@$this->channel_connections[$channel_name]) {
cache()->forever( cache()->forever(
'ws_channel_connections_'.$channel_name, 'ws_channel_connections_' . $channel_name,
$this->channel_connections[$channel_name] $this->channel_connections[$channel_name]
); );
} else { } else {
cache()->forget('ws_channel_connections_'.$channel_name); cache()->forget('ws_channel_connections_' . $channel_name);
} }
cache()->forever( cache()->forever(
@ -433,9 +434,9 @@ class Handler implements MessageComponentInterface
$message $message
) { ) {
if (! optional($connection)->auth && $connection->socketId && cache()->get('socket_'.$connection->socketId)) { if (! optional($connection)->auth && $connection->socketId && cache()->get('socket_' . $connection->socketId)) {
$cached_auth = cache()->get('socket_'.$connection->socketId); $cached_auth = cache()->get('socket_' . $connection->socketId);
$connection->user = @$cached_auth['type']::find($cached_auth['id']); $connection->user = @$cached_auth['type']::find($cached_auth['id']);
@ -467,14 +468,14 @@ class Handler implements MessageComponentInterface
$optional = false, $optional = false,
$iteration = false $iteration = false
) { ) {
$pid = explode('_', $pid.'')[0]; $pid = explode('_', $pid . '')[0];
if ($iteration >= 0 && $iteration !== false) { if ($iteration >= 0 && $iteration !== false) {
$pid .= '_'.$iteration; $pid .= '_' . $iteration;
} }
// Set timeout start // Set timeout start
$pidcache_start = 'dedicated_start_'.$pid; $pidcache_start = 'dedicated_start_' . $pid;
cache()->put($pidcache_start, microtime(true), 100); cache()->put($pidcache_start, microtime(true), 100);
// Periodic check for data // Periodic check for data
@ -486,9 +487,9 @@ class Handler implements MessageComponentInterface
$optional, $optional,
$iteration $iteration
) { ) {
$pidcache_data = 'dedicated_data_'.$pid; $pidcache_data = 'dedicated_data_' . $pid;
$pidcache_done = 'dedicated_data_'.$pid.'_done'; $pidcache_done = 'dedicated_data_' . $pid . '_done';
$pidcache_complete = 'dedicated_data_'.$pid.'_complete'; $pidcache_complete = 'dedicated_data_' . $pid . '_complete';
if ( if (
cache()->has($pidcache_start) cache()->has($pidcache_start)
@ -496,7 +497,7 @@ class Handler implements MessageComponentInterface
) { ) {
if (! $optional) { if (! $optional) {
$connection->send(json_encode([ $connection->send(json_encode([
'event' => $message['event'].':error', 'event' => $message['event'] . ':error',
'data' => [ 'data' => [
'message' => $message['event'] . ' timeout', 'message' => $message['event'] . ' timeout',
'diff' => $diff, 'diff' => $diff,
@ -518,13 +519,11 @@ class Handler implements MessageComponentInterface
// Retrieve cached data // Retrieve cached data
$sending = @cache()->get($pidcache_data); $sending = @cache()->get($pidcache_data);
$bm = json_decode($sending, true);
// Send the data to client // Send the data to client
if(@$message['broadcast']){ if (@$bm['broadcast']) {
$bm = json_decode($sending, true);
$this->broadcast( $this->broadcast(
$connection->app->id, $connection->app->id,
$bm['data'] ?? null, $bm['data'] ?? null,
@ -533,7 +532,7 @@ class Handler implements MessageComponentInterface
$bm['including_self'], $bm['including_self'],
$connection $connection
); );
} else{ } else {
$connection->send($sending); $connection->send($sending);
} }
@ -553,27 +552,24 @@ class Handler implements MessageComponentInterface
?string $channel = null, ?string $channel = null,
bool $including_self = false, bool $including_self = false,
$connection = null $connection = null
) : void { ): void {
$channel = $this->channelManager->findOrCreate($appId,$channel); $channel = $this->channelManager->findOrCreate($appId, $channel);
$p = [
'event' => ($event ?? $event),
'data' => $payload,
'channel' => $channel->getName(),
];
foreach ($channel->getConnections() as $channel_conection) { foreach ($channel->getConnections() as $channel_conection) {
if ($channel_conection !== $connection) { if ($channel_conection !== $connection) {
$channel_conection->send(json_encode([ $channel_conection->send(json_encode($p));
'event' => ($event ?? $event),
'data' => $payload,
'channel' => $channel->getName(),
]));
} }
if ($including_self) { if ($including_self) {
$connection->send(json_encode([ $connection->send(json_encode($p));
'event' => ($event ?? $event),
'data' => $payload,
'channel' => $channel->getName(),
]));
} }
} }
} }
} }