diff --git a/src/PubSub/Redis/RedisClient.php b/src/PubSub/Redis/RedisClient.php index 6634ecd..9e6048c 100644 --- a/src/PubSub/Redis/RedisClient.php +++ b/src/PubSub/Redis/RedisClient.php @@ -78,7 +78,6 @@ class RedisClient implements ReplicationInterface * * @param string $redisChannel * @param string $payload - * @return bool */ protected function onMessage(string $redisChannel, string $payload) { @@ -86,32 +85,38 @@ class RedisClient implements ReplicationInterface // Ignore messages sent by ourselves if (isset($payload->serverId) && $this->serverId === $payload->serverId) { - return false; + return; } - // We need to put the channel name in the payload - $payload->channel = $redisChannel; + // Pull out the app ID. See RedisPusherBroadcaster + $appId = $payload->appId; + + // We need to put the channel name in the payload. + // We strip the app ID from the channel name, websocket clients + // expect the channel name to not include the app ID. + $payload->channel = Str::after($redisChannel, "$appId:"); /* @var $channelManager ChannelManager */ $channelManager = app(ChannelManager::class); // Load the Channel instance, if any - $channel = $channelManager->find($payload->appId, $payload->channel); - if ($channel === null) { - return false; + $channel = $channelManager->find($appId, $payload->channel); + + // If no channel is found, none of our connections want to + // receive this message, so we ignore it. + if (! $channel) { + return; } - $socket = $payload->socket; + $socket = $payload->socket ?? null; - // Remove the internal keys from the payload + // Remove fields intended for internal use from the payload unset($payload->socket); unset($payload->serverId); unset($payload->appId); // Push the message out to connected websocket clients $channel->broadcastToEveryoneExcept($payload, $socket); - - return true; } /** @@ -123,13 +128,13 @@ class RedisClient implements ReplicationInterface */ public function subscribe(string $appId, string $channel): bool { - if (! isset($this->subscribedChannels[$channel])) { + if (! isset($this->subscribedChannels["$appId:$channel"])) { // We're not subscribed to the channel yet, subscribe and set the count to 1 - $this->subscribeClient->__call('subscribe', [$channel]); - $this->subscribedChannels[$channel] = 1; + $this->subscribeClient->__call('subscribe', ["$appId:$channel"]); + $this->subscribedChannels["$appId:$channel"] = 1; } else { // Increment the subscribe count if we've already subscribed - $this->subscribedChannels[$channel]++; + $this->subscribedChannels["$appId:$channel"]++; } return true; @@ -144,17 +149,17 @@ class RedisClient implements ReplicationInterface */ public function unsubscribe(string $appId, string $channel): bool { - if (! isset($this->subscribedChannels[$channel])) { + if (! isset($this->subscribedChannels["$appId:$channel"])) { return false; } // Decrement the subscription count for this channel - $this->subscribedChannels[$channel]--; + $this->subscribedChannels["$appId:$channel"]--; // If we no longer have subscriptions to that channel, unsubscribe - if ($this->subscribedChannels[$channel] < 1) { - $this->subscribeClient->__call('unsubscribe', [$channel]); - unset($this->subscribedChannels[$channel]); + if ($this->subscribedChannels["$appId:$channel"] < 1) { + $this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]); + unset($this->subscribedChannels["$appId:$channel"]); } return true; @@ -173,7 +178,7 @@ class RedisClient implements ReplicationInterface $payload->appId = $appId; $payload->serverId = $this->serverId; - $this->publishClient->__call('publish', [$channel, json_encode($payload)]); + $this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]); return true; } diff --git a/src/PubSub/Redis/RedisPusherBroadcaster.php b/src/PubSub/Redis/RedisPusherBroadcaster.php index 6f88179..9905914 100644 --- a/src/PubSub/Redis/RedisPusherBroadcaster.php +++ b/src/PubSub/Redis/RedisPusherBroadcaster.php @@ -144,7 +144,7 @@ class RedisPusherBroadcaster extends Broadcaster ]); foreach ($this->formatChannels($channels) as $channel) { - $connection->publish($channel, $payload); + $connection->publish("{$this->appId}:$channel", $payload); } } } diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 605a7db..9db18ad 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -107,11 +107,14 @@ class Channel ->publish($connection->app->id, $payload); } - $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); + $this->broadcastToEveryoneExcept($payload, $connection->socketId); } - public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) + public function broadcastToEveryoneExcept($payload, ?string $socketId = null) { + // Performance optimization, if we don't have a socket ID, + // then we avoid running the if condition in the foreach loop below + // by calling broadcast() instead. if (is_null($socketId)) { $this->broadcast($payload);