From c79bac07c4eecfb9390faa1f67f16057b50e85fe Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Mon, 24 Aug 2020 14:06:58 +0300 Subject: [PATCH] Fixed the subscribed topic names --- src/PubSub/Drivers/RedisClient.php | 35 +++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 14b9357..0d91e72 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -104,13 +104,13 @@ class RedisClient extends LocalClient $payload = json_encode($payload); - $this->publishClient->__call('publish', ["{$appId}:{$channel}", $payload]); + $this->publishClient->__call('publish', [$this->getTopicName($appId, $channel), $payload]); DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [ 'channel' => $channel, 'serverId' => $this->getServerId(), 'payload' => $payload, - 'pubsub' => "{$appId}:{$channel}", + 'pubsub' => $this->getTopicName($appId, $channel), ]); return true; @@ -127,7 +127,7 @@ class RedisClient extends LocalClient { if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) { // We're not subscribed to the channel yet, subscribe and set the count to 1 - $this->subscribeClient->__call('subscribe', ["{$appId}:{$channel}"]); + $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId, $channel)]); $this->subscribedChannels["{$appId}:{$channel}"] = 1; } else { // Increment the subscribe count if we've already subscribed @@ -137,7 +137,7 @@ class RedisClient extends LocalClient DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [ 'channel' => $channel, 'serverId' => $this->getServerId(), - 'pubsub' => "{$appId}:{$channel}", + 'pubsub' => $this->getTopicName($appId, $channel), ]); return true; @@ -169,7 +169,7 @@ class RedisClient extends LocalClient DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [ 'channel' => $channel, 'serverId' => $this->getServerId(), - 'pubsub' => "{$appId}:{$channel}", + 'pubsub' => $this->getTopicName($appId, $channel), ]); return true; @@ -194,7 +194,7 @@ class RedisClient extends LocalClient 'serverId' => $this->getServerId(), 'socketId' => $socketId, 'data' => $data, - 'pubsub' => "{$appId}:{$channel}", + 'pubsub' => $this->getTopicName($appId, $channel), ]); } @@ -209,13 +209,13 @@ class RedisClient extends LocalClient */ public function leaveChannel($appId, string $channel, string $socketId) { - $this->publishClient->__call('hdel', ["{$appId}:{$channel}", $socketId]); + $this->publishClient->__call('hdel', [$this->getTopicName($appId, $channel), $socketId]); DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [ 'channel' => $channel, 'serverId' => $this->getServerId(), 'socketId' => $socketId, - 'pubsub' => "{$appId}:{$channel}", + 'pubsub' => $this->getTopicName($appId, $channel), ]); } @@ -228,7 +228,7 @@ class RedisClient extends LocalClient */ public function channelMembers($appId, string $channel): PromiseInterface { - return $this->publishClient->__call('hgetall', ["{$appId}:{$channel}"]) + return $this->publishClient->__call('hgetall', [$this->getTopicName($appId, $channel)]) ->then(function ($members) { // The data is expected as objects, so we need to JSON decode return array_map(function ($user) { @@ -249,7 +249,7 @@ class RedisClient extends LocalClient $this->publishClient->__call('multi', []); foreach ($channelNames as $channel) { - $this->publishClient->__call('hlen', ["{$appId}:{$channel}"]); + $this->publishClient->__call('hlen', [$this->getTopicName($appId, $channel)]); } return $this->publishClient->__call('exec', []) @@ -371,4 +371,19 @@ class RedisClient extends LocalClient { return $this->serverId; } + + /** + * Get the Pub/Sub Topic name to subscribe based on the + * app ID and channel name. + * + * @param mixed $appId + * @param string $channel + * @return string + */ + protected function getTopicName($appId, string $channel): string + { + $prefix = config('database.redis.options.prefix', null); + + return "{$prefix}{$appId}:{$channel}"; + } }