Scope pub/sub channels in Redis by appId to avoid crosstalk between apps

This commit is contained in:
Francis Lavoie 2019-03-29 10:22:36 -04:00
parent 668cd29df0
commit eca8c7b846
No known key found for this signature in database
GPG Key ID: B9E0E04A76AF4692
3 changed files with 32 additions and 24 deletions

View File

@ -78,7 +78,6 @@ class RedisClient implements ReplicationInterface
* *
* @param string $redisChannel * @param string $redisChannel
* @param string $payload * @param string $payload
* @return bool
*/ */
protected function onMessage(string $redisChannel, string $payload) protected function onMessage(string $redisChannel, string $payload)
{ {
@ -86,32 +85,38 @@ class RedisClient implements ReplicationInterface
// Ignore messages sent by ourselves // Ignore messages sent by ourselves
if (isset($payload->serverId) && $this->serverId === $payload->serverId) { if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
return false; return;
} }
// We need to put the channel name in the payload // Pull out the app ID. See RedisPusherBroadcaster
$payload->channel = $redisChannel; $appId = $payload->appId;
// We need to put the channel name in the payload.
// We strip the app ID from the channel name, websocket clients
// expect the channel name to not include the app ID.
$payload->channel = Str::after($redisChannel, "$appId:");
/* @var $channelManager ChannelManager */ /* @var $channelManager ChannelManager */
$channelManager = app(ChannelManager::class); $channelManager = app(ChannelManager::class);
// Load the Channel instance, if any // Load the Channel instance, if any
$channel = $channelManager->find($payload->appId, $payload->channel); $channel = $channelManager->find($appId, $payload->channel);
if ($channel === null) {
return false; // If no channel is found, none of our connections want to
// receive this message, so we ignore it.
if (! $channel) {
return;
} }
$socket = $payload->socket; $socket = $payload->socket ?? null;
// Remove the internal keys from the payload // Remove fields intended for internal use from the payload
unset($payload->socket); unset($payload->socket);
unset($payload->serverId); unset($payload->serverId);
unset($payload->appId); unset($payload->appId);
// Push the message out to connected websocket clients // Push the message out to connected websocket clients
$channel->broadcastToEveryoneExcept($payload, $socket); $channel->broadcastToEveryoneExcept($payload, $socket);
return true;
} }
/** /**
@ -123,13 +128,13 @@ class RedisClient implements ReplicationInterface
*/ */
public function subscribe(string $appId, string $channel): bool public function subscribe(string $appId, string $channel): bool
{ {
if (! isset($this->subscribedChannels[$channel])) { if (! isset($this->subscribedChannels["$appId:$channel"])) {
// We're not subscribed to the channel yet, subscribe and set the count to 1 // We're not subscribed to the channel yet, subscribe and set the count to 1
$this->subscribeClient->__call('subscribe', [$channel]); $this->subscribeClient->__call('subscribe', ["$appId:$channel"]);
$this->subscribedChannels[$channel] = 1; $this->subscribedChannels["$appId:$channel"] = 1;
} else { } else {
// Increment the subscribe count if we've already subscribed // Increment the subscribe count if we've already subscribed
$this->subscribedChannels[$channel]++; $this->subscribedChannels["$appId:$channel"]++;
} }
return true; return true;
@ -144,17 +149,17 @@ class RedisClient implements ReplicationInterface
*/ */
public function unsubscribe(string $appId, string $channel): bool public function unsubscribe(string $appId, string $channel): bool
{ {
if (! isset($this->subscribedChannels[$channel])) { if (! isset($this->subscribedChannels["$appId:$channel"])) {
return false; return false;
} }
// Decrement the subscription count for this channel // Decrement the subscription count for this channel
$this->subscribedChannels[$channel]--; $this->subscribedChannels["$appId:$channel"]--;
// If we no longer have subscriptions to that channel, unsubscribe // If we no longer have subscriptions to that channel, unsubscribe
if ($this->subscribedChannels[$channel] < 1) { if ($this->subscribedChannels["$appId:$channel"] < 1) {
$this->subscribeClient->__call('unsubscribe', [$channel]); $this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]);
unset($this->subscribedChannels[$channel]); unset($this->subscribedChannels["$appId:$channel"]);
} }
return true; return true;
@ -173,7 +178,7 @@ class RedisClient implements ReplicationInterface
$payload->appId = $appId; $payload->appId = $appId;
$payload->serverId = $this->serverId; $payload->serverId = $this->serverId;
$this->publishClient->__call('publish', [$channel, json_encode($payload)]); $this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]);
return true; return true;
} }

View File

@ -144,7 +144,7 @@ class RedisPusherBroadcaster extends Broadcaster
]); ]);
foreach ($this->formatChannels($channels) as $channel) { foreach ($this->formatChannels($channels) as $channel) {
$connection->publish($channel, $payload); $connection->publish("{$this->appId}:$channel", $payload);
} }
} }
} }

View File

@ -107,11 +107,14 @@ class Channel
->publish($connection->app->id, $payload); ->publish($connection->app->id, $payload);
} }
$this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); $this->broadcastToEveryoneExcept($payload, $connection->socketId);
} }
public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
{ {
// Performance optimization, if we don't have a socket ID,
// then we avoid running the if condition in the foreach loop below
// by calling broadcast() instead.
if (is_null($socketId)) { if (is_null($socketId)) {
$this->broadcast($payload); $this->broadcast($payload);