updated pubsub messages

This commit is contained in:
Alex Renoki 2020-08-17 21:24:18 +03:00
parent c622f77351
commit 417c8322e0
2 changed files with 26 additions and 21 deletions

View File

@ -78,7 +78,7 @@ class LocalClient implements ReplicationInterface
*/ */
public function joinChannel(string $appId, string $channel, string $socketId, string $data) public function joinChannel(string $appId, string $channel, string $socketId, string $data)
{ {
$this->channelData["$appId:$channel"][$socketId] = $data; $this->channelData["{$appId}:{$channel}"][$socketId] = $data;
} }
/** /**
@ -92,10 +92,10 @@ class LocalClient implements ReplicationInterface
*/ */
public function leaveChannel(string $appId, string $channel, string $socketId) public function leaveChannel(string $appId, string $channel, string $socketId)
{ {
unset($this->channelData["$appId:$channel"][$socketId]); unset($this->channelData["{$appId}:{$channel}"][$socketId]);
if (empty($this->channelData["$appId:$channel"])) { if (empty($this->channelData["{$appId}:{$channel}"])) {
unset($this->channelData["$appId:$channel"]); unset($this->channelData["{$appId}:{$channel}"]);
} }
} }
@ -108,7 +108,7 @@ class LocalClient implements ReplicationInterface
*/ */
public function channelMembers(string $appId, string $channel): PromiseInterface public function channelMembers(string $appId, string $channel): PromiseInterface
{ {
$members = $this->channelData["$appId:$channel"] ?? []; $members = $this->channelData["{$appId}:{$channel}"] ?? [];
$members = array_map(function ($user) { $members = array_map(function ($user) {
return json_decode($user); return json_decode($user);
@ -130,8 +130,8 @@ class LocalClient implements ReplicationInterface
// Count the number of users per channel // Count the number of users per channel
foreach ($channelNames as $channel) { foreach ($channelNames as $channel) {
$results[$channel] = isset($this->channelData["$appId:$channel"]) $results[$channel] = isset($this->channelData["{$appId}:{$channel}"])
? count($this->channelData["$appId:$channel"]) ? count($this->channelData["{$appId}:{$channel}"])
: 0; : 0;
} }

View File

@ -104,12 +104,13 @@ class RedisClient extends LocalClient
$payload = json_encode($payload); $payload = json_encode($payload);
$this->publishClient->__call('publish', ["$appId:$channel", $payload]); $this->publishClient->__call('publish', ["{$appId}:{$channel}", $payload]);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
'channel' => $channel, 'channel' => $channel,
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
'payload' => $payload, 'payload' => $payload,
'pubsub' => "{$appId}:{$channel}",
]); ]);
return true; return true;
@ -124,18 +125,19 @@ class RedisClient extends LocalClient
*/ */
public function subscribe(string $appId, string $channel): bool public function subscribe(string $appId, string $channel): bool
{ {
if (! isset($this->subscribedChannels["$appId:$channel"])) { if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
// We're not subscribed to the channel yet, subscribe and set the count to 1 // We're not subscribed to the channel yet, subscribe and set the count to 1
$this->subscribeClient->__call('subscribe', ["$appId:$channel"]); $this->subscribeClient->__call('subscribe', ["{$appId}:{$channel}"]);
$this->subscribedChannels["$appId:$channel"] = 1; $this->subscribedChannels["{$appId}:{$channel}"] = 1;
} else { } else {
// Increment the subscribe count if we've already subscribed // Increment the subscribe count if we've already subscribed
$this->subscribedChannels["$appId:$channel"]++; $this->subscribedChannels["{$appId}:{$channel}"]++;
} }
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
'channel' => $channel, 'channel' => $channel,
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
'pubsub' => "{$appId}:{$channel}",
]); ]);
return true; return true;
@ -150,23 +152,24 @@ class RedisClient extends LocalClient
*/ */
public function unsubscribe(string $appId, string $channel): bool public function unsubscribe(string $appId, string $channel): bool
{ {
if (! isset($this->subscribedChannels["$appId:$channel"])) { if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
return false; return false;
} }
// Decrement the subscription count for this channel // Decrement the subscription count for this channel
$this->subscribedChannels["$appId:$channel"]--; $this->subscribedChannels["{$appId}:{$channel}"]--;
// If we no longer have subscriptions to that channel, unsubscribe // If we no longer have subscriptions to that channel, unsubscribe
if ($this->subscribedChannels["$appId:$channel"] < 1) { if ($this->subscribedChannels["{$appId}:{$channel}"] < 1) {
$this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]); $this->subscribeClient->__call('unsubscribe', ["{$appId}:{$channel}"]);
unset($this->subscribedChannels["$appId:$channel"]); unset($this->subscribedChannels["{$appId}:{$channel}"]);
} }
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
'channel' => $channel, 'channel' => $channel,
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
'pubsub' => "{$appId}:{$channel}",
]); ]);
return true; return true;
@ -184,13 +187,14 @@ class RedisClient extends LocalClient
*/ */
public function joinChannel(string $appId, string $channel, string $socketId, string $data) public function joinChannel(string $appId, string $channel, string $socketId, string $data)
{ {
$this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]); $this->publishClient->__call('hset', ["{$appId}:{$channel}", $socketId, $data]);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
'channel' => $channel, 'channel' => $channel,
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
'socketId' => $socketId, 'socketId' => $socketId,
'data' => $data, 'data' => $data,
'pubsub' => "{$appId}:{$channel}",
]); ]);
} }
@ -205,12 +209,13 @@ class RedisClient extends LocalClient
*/ */
public function leaveChannel(string $appId, string $channel, string $socketId) public function leaveChannel(string $appId, string $channel, string $socketId)
{ {
$this->publishClient->__call('hdel', ["$appId:$channel", $socketId]); $this->publishClient->__call('hdel', ["{$appId}:{$channel}", $socketId]);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
'channel' => $channel, 'channel' => $channel,
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
'socketId' => $socketId, 'socketId' => $socketId,
'pubsub' => "{$appId}:{$channel}",
]); ]);
} }
@ -223,7 +228,7 @@ class RedisClient extends LocalClient
*/ */
public function channelMembers(string $appId, string $channel): PromiseInterface public function channelMembers(string $appId, string $channel): PromiseInterface
{ {
return $this->publishClient->__call('hgetall', ["$appId:$channel"]) return $this->publishClient->__call('hgetall', ["{$appId}:{$channel}"])
->then(function ($members) { ->then(function ($members) {
// The data is expected as objects, so we need to JSON decode // The data is expected as objects, so we need to JSON decode
return array_map(function ($user) { return array_map(function ($user) {
@ -244,7 +249,7 @@ class RedisClient extends LocalClient
$this->publishClient->__call('multi', []); $this->publishClient->__call('multi', []);
foreach ($channelNames as $channel) { foreach ($channelNames as $channel) {
$this->publishClient->__call('hlen', ["$appId:$channel"]); $this->publishClient->__call('hlen', ["{$appId}:{$channel}"]);
} }
return $this->publishClient->__call('exec', []) return $this->publishClient->__call('exec', [])