diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index 3e24c73..8209e83 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -78,7 +78,7 @@ class LocalClient implements ReplicationInterface */ public function joinChannel(string $appId, string $channel, string $socketId, string $data) { - $this->channelData["$appId:$channel"][$socketId] = $data; + $this->channelData["{$appId}:{$channel}"][$socketId] = $data; } /** @@ -92,10 +92,10 @@ class LocalClient implements ReplicationInterface */ public function leaveChannel(string $appId, string $channel, string $socketId) { - unset($this->channelData["$appId:$channel"][$socketId]); + unset($this->channelData["{$appId}:{$channel}"][$socketId]); - if (empty($this->channelData["$appId:$channel"])) { - unset($this->channelData["$appId:$channel"]); + if (empty($this->channelData["{$appId}:{$channel}"])) { + unset($this->channelData["{$appId}:{$channel}"]); } } @@ -108,7 +108,7 @@ class LocalClient implements ReplicationInterface */ public function channelMembers(string $appId, string $channel): PromiseInterface { - $members = $this->channelData["$appId:$channel"] ?? []; + $members = $this->channelData["{$appId}:{$channel}"] ?? []; $members = array_map(function ($user) { return json_decode($user); @@ -130,8 +130,8 @@ class LocalClient implements ReplicationInterface // Count the number of users per channel foreach ($channelNames as $channel) { - $results[$channel] = isset($this->channelData["$appId:$channel"]) - ? count($this->channelData["$appId:$channel"]) + $results[$channel] = isset($this->channelData["{$appId}:{$channel}"]) + ? count($this->channelData["{$appId}:{$channel}"]) : 0; } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 7b730c3..11a479e 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -104,12 +104,13 @@ class RedisClient extends LocalClient $payload = json_encode($payload); - $this->publishClient->__call('publish', ["$appId:$channel", $payload]); + $this->publishClient->__call('publish', ["{$appId}:{$channel}", $payload]); DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [ 'channel' => $channel, 'serverId' => $this->getServerId(), 'payload' => $payload, + 'pubsub' => "{$appId}:{$channel}", ]); return true; @@ -124,18 +125,19 @@ class RedisClient extends LocalClient */ public function subscribe(string $appId, string $channel): bool { - if (! isset($this->subscribedChannels["$appId:$channel"])) { + if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) { // We're not subscribed to the channel yet, subscribe and set the count to 1 - $this->subscribeClient->__call('subscribe', ["$appId:$channel"]); - $this->subscribedChannels["$appId:$channel"] = 1; + $this->subscribeClient->__call('subscribe', ["{$appId}:{$channel}"]); + $this->subscribedChannels["{$appId}:{$channel}"] = 1; } else { // Increment the subscribe count if we've already subscribed - $this->subscribedChannels["$appId:$channel"]++; + $this->subscribedChannels["{$appId}:{$channel}"]++; } DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [ 'channel' => $channel, 'serverId' => $this->getServerId(), + 'pubsub' => "{$appId}:{$channel}", ]); return true; @@ -150,23 +152,24 @@ class RedisClient extends LocalClient */ public function unsubscribe(string $appId, string $channel): bool { - if (! isset($this->subscribedChannels["$appId:$channel"])) { + if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) { return false; } // Decrement the subscription count for this channel - $this->subscribedChannels["$appId:$channel"]--; + $this->subscribedChannels["{$appId}:{$channel}"]--; // If we no longer have subscriptions to that channel, unsubscribe - if ($this->subscribedChannels["$appId:$channel"] < 1) { - $this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]); + if ($this->subscribedChannels["{$appId}:{$channel}"] < 1) { + $this->subscribeClient->__call('unsubscribe', ["{$appId}:{$channel}"]); - unset($this->subscribedChannels["$appId:$channel"]); + unset($this->subscribedChannels["{$appId}:{$channel}"]); } DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [ 'channel' => $channel, 'serverId' => $this->getServerId(), + 'pubsub' => "{$appId}:{$channel}", ]); return true; @@ -184,13 +187,14 @@ class RedisClient extends LocalClient */ public function joinChannel(string $appId, string $channel, string $socketId, string $data) { - $this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]); + $this->publishClient->__call('hset', ["{$appId}:{$channel}", $socketId, $data]); DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [ 'channel' => $channel, 'serverId' => $this->getServerId(), 'socketId' => $socketId, 'data' => $data, + 'pubsub' => "{$appId}:{$channel}", ]); } @@ -205,12 +209,13 @@ class RedisClient extends LocalClient */ public function leaveChannel(string $appId, string $channel, string $socketId) { - $this->publishClient->__call('hdel', ["$appId:$channel", $socketId]); + $this->publishClient->__call('hdel', ["{$appId}:{$channel}", $socketId]); DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [ 'channel' => $channel, 'serverId' => $this->getServerId(), 'socketId' => $socketId, + 'pubsub' => "{$appId}:{$channel}", ]); } @@ -223,7 +228,7 @@ class RedisClient extends LocalClient */ public function channelMembers(string $appId, string $channel): PromiseInterface { - return $this->publishClient->__call('hgetall', ["$appId:$channel"]) + return $this->publishClient->__call('hgetall', ["{$appId}:{$channel}"]) ->then(function ($members) { // The data is expected as objects, so we need to JSON decode return array_map(function ($user) { @@ -244,7 +249,7 @@ class RedisClient extends LocalClient $this->publishClient->__call('multi', []); foreach ($channelNames as $channel) { - $this->publishClient->__call('hlen', ["$appId:$channel"]); + $this->publishClient->__call('hlen', ["{$appId}:{$channel}"]); } return $this->publishClient->__call('exec', [])