BF channel subscription

This commit is contained in:
a6a2f5842 2025-09-15 16:22:59 +02:00
parent 4e7629e1d7
commit fc23dff3d9
1 changed files with 16 additions and 17 deletions

View File

@ -106,13 +106,13 @@ class Handler implements MessageComponentInterface
return gc_collect_cycles(); return gc_collect_cycles();
} }
$this->handleChannelSubscriptions($message, $connection); $channel = $this->handleChannelSubscriptions($message, $connection);
if (! $channel = $this->get_connection_channel($connection, $message)) { if (! $channel->hasConnection($connection) ) {
return $connection->send(json_encode([ return $connection->send(json_encode([
'event' => $message['event'] . ':error', 'event' => $message['event'] . ':error',
'data' => [ 'data' => [
'message' => 'Channel not established', 'message' => 'Subscription not established',
'meta' => $message, 'meta' => $message,
], ],
])); ]));
@ -362,27 +362,20 @@ class Handler implements MessageComponentInterface
$message['channel'] $message['channel']
); );
if (! $channel->hasConnection($connection)) {
try{
$channel->subscribe($connection, (object) $message);
} catch (\Throwable $e) {
return null;
}
}
return $this->channelManager->find( return $this->channelManager->find(
$connection->app->id, $connection->app->id,
$message['channel'] $message['channel']
); );
} }
protected function handleChannelSubscriptions($message, $connection) protected function handleChannelSubscriptions($message, $connection) : ?Channel
{ {
$channel_name = optional($this->get_connection_channel($connection, $message))->getName(); $channel = $this->get_connection_channel($connection, $message);
$channel_name = optional($channel)->getName();
$socket_id = $connection->socketId; $socket_id = $connection->socketId;
if(! $channel_name) { if(! $channel_name || ! $channel) {
return; return null;
} }
// if not in $channel_connections add it // if not in $channel_connections add it
@ -405,7 +398,13 @@ class Handler implements MessageComponentInterface
array_keys($this->channel_connections) array_keys($this->channel_connections)
); );
$channel->subscribe($connection, $message); if (! $channel->hasConnection($connection)) {
try{
$channel->subscribe($connection, (object) $message);
} catch (\Throwable $e) {
return null;
}
}
} }
if (strpos($message['event'], '.unsubscribe') !== false) { if (strpos($message['event'], '.unsubscribe') !== false) {
@ -434,7 +433,7 @@ class Handler implements MessageComponentInterface
Log::channel('websocket')->info('Tenant left', ['socketId' => $socket_id, 'channel' => $channel_name]); Log::channel('websocket')->info('Tenant left', ['socketId' => $socket_id, 'channel' => $channel_name]);
} }
return $this; return $channel;
} }
protected function setRequest($message, $connection) protected function setRequest($message, $connection)