Merge pull request #536 from beyondcode/fix/multiple-channel-members

Presence Channels emit added or removed events more than once for same users
This commit is contained in:
rennokki 2020-09-16 08:16:12 +00:00 committed by GitHub
commit 40f024d031
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 252 additions and 25 deletions

View File

@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager
*/
protected $users = [];
/**
* The list of users by socket and their attached id.
*
* @var array
*/
protected $userSockets = [];
/**
* Wether the current instance accepts new connections.
*
@ -273,6 +280,7 @@ class LocalChannelManager implements ChannelManager
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload)
{
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId;
}
/**
@ -287,6 +295,19 @@ class LocalChannelManager implements ChannelManager
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel)
{
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);
$deletableSocketKey = array_search(
$connection->socketId,
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]
);
if ($deletableSocketKey !== false) {
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][$deletableSocketKey]);
if (count($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]) === 0) {
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]);
}
}
}
/**
@ -342,6 +363,21 @@ class LocalChannelManager implements ChannelManager
return new FulfilledPromise($results);
}
/**
* Get the socket IDs for a presence channel member.
*
* @param string|int $userId
* @param string|int $appId
* @param string $channelName
* @return \React\Promise\PromiseInterface
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
return new FulfilledPromise(
$this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? []
);
}
/**
* Keep tracking the connections availability when they pong.
*

View File

@ -313,6 +313,10 @@ class RedisChannelManager extends LocalChannelManager
$this->storeUserData(
$connection->app->id, $channel, $connection->socketId, json_encode($user)
);
$this->addUserSocket(
$connection->app->id, $channel, $user, $connection->socketId
);
}
/**
@ -329,6 +333,10 @@ class RedisChannelManager extends LocalChannelManager
$this->removeUserData(
$connection->app->id, $channel, $connection->socketId
);
$this->removeUserSocket(
$connection->app->id, $channel, $user, $connection->socketId
);
}
/**
@ -389,6 +397,21 @@ class RedisChannelManager extends LocalChannelManager
});
}
/**
* Get the socket IDs for a presence channel member.
*
* @param string|int $userId
* @param string|int $appId
* @param string $channelName
* @return \React\Promise\PromiseInterface
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
return $this->publishClient->smembers(
$this->getRedisKey($appId, $channelName, [$userId, 'userSockets'])
);
}
/**
* Keep tracking the connections availability when they pong.
*
@ -628,7 +651,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string|int $appId
* @param string|null $channel
* @param string $key
* @param mixed $data
* @param string $data
* @return PromiseInterface
*/
public function storeUserData($appId, string $channel = null, string $key, $data)
@ -681,6 +704,40 @@ class RedisChannelManager extends LocalChannelManager
);
}
/**
* Add the Presence Channel's User's Socket ID to a list.
*
* @param string|int $appId
* @param string $channel
* @param stdClass $user
* @param string $socketId
* @return void
*/
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId)
{
$this->publishClient->sadd(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
$socketId
);
}
/**
* Remove the Presence Channel's User's Socket ID from the list.
*
* @param string|int $appId
* @param string $channel
* @param stdClass $user
* @param string $socketId
* @return void
*/
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId)
{
$this->publishClient->srem(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
$socketId
);
}
/**
* Get the Redis Keyspace name to handle subscriptions
* and other key-value sets.

View File

@ -55,6 +55,14 @@ class PresenceChannel extends PrivateChannel
]));
});
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the first tab is opened.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($payload, $connection) {
if (count($sockets) === 1) {
$memberAddedPayload = [
'event' => 'pusher_internal:member_added',
'channel' => $this->getName(),
@ -65,11 +73,14 @@ class PresenceChannel extends PrivateChannel
(object) $memberAddedPayload, $connection->socketId,
$connection->app->id
);
}
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
'socketId' => $connection->socketId,
'channel' => $this->getName(),
'duplicate-connection' => count($sockets) > 1,
]);
});
}
/**
@ -95,6 +106,14 @@ class PresenceChannel extends PrivateChannel
$connection, $user, $this->getName()
);
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the last one is closed.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($connection, $user) {
if (count($sockets) === 0) {
$memberRemovedPayload = [
'event' => 'pusher_internal:member_removed',
'channel' => $this->getName(),
@ -107,6 +126,8 @@ class PresenceChannel extends PrivateChannel
(object) $memberRemovedPayload, $connection->socketId,
$connection->app->id
);
}
});
});
}
}

View File

@ -186,6 +186,16 @@ interface ChannelManager
*/
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface;
/**
* Get the socket IDs for a presence channel member.
*
* @param string|int $userId
* @param string|int $appId
* @param string $channelName
* @return \React\Promise\PromiseInterface
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface;
/**
* Keep tracking the connections availability when they pong.
*

View File

@ -58,6 +58,32 @@ class Connection implements ConnectionInterface
$this->closed = true;
}
/**
* Reset the events for assertions.
*
* @return $this
*/
public function resetEvents()
{
$this->sentData = [];
$this->sentRawData = [];
return $this;
}
/**
* Dump & stop execution.
*
* @return void
*/
public function dd()
{
dd([
'sentData' => $this->sentData,
'sentRawData' => $this->sentRawData,
]);
}
/**
* Assert that an event got sent.
*

View File

@ -61,6 +61,31 @@ class PresenceChannelTest extends TestCase
});
}
public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined()
{
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
$pickleRick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
foreach ([$rick, $morty, $pickleRick] as $connection) {
$connection->assertSentEvent('pusher_internal:subscription_succeeded', [
'channel' => 'presence-channel',
]);
}
$this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($total) {
$this->assertEquals(3, $total);
});
$this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(2, $members);
});
}
public function test_presence_channel_broadcast_member_events()
{
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
@ -200,4 +225,56 @@ class PresenceChannelTest extends TestCase
}
});
}
public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
{
// Connect the `observer` user to the server
$observerConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 'observer']);
// Connect the first socket for user `1` to the server
$firstConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);
// Make sure the observer sees a `member_added` event for `user:1`
$observerConnection->assertSentEvent('pusher_internal:member_added', [
'event' => 'pusher_internal:member_added',
'channel' => 'presence-channel',
'data' => json_encode(['user_id' => '1']),
])->resetEvents();
// Connect the second socket for user `1` to the server
$secondConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);
// Make sure the observer was not notified of a `member_added` event (user was already connected)
$observerConnection->assertNotSentEvent('pusher_internal:member_added');
// Disconnect the first socket for user `1` on the server
$this->pusherServer->onClose($firstConnection);
// Make sure the observer was not notified of a `member_removed` event (user still connected on another socket)
$observerConnection->assertNotSentEvent('pusher_internal:member_removed');
// Disconnect the second (and last) socket for user `1` on the server
$this->pusherServer->onClose($secondConnection);
// Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected)
$observerConnection->assertSentEvent('pusher_internal:member_removed');
$this->channelManager
->getMemberSockets('1', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(0, $sockets);
});
$this->channelManager
->getMemberSockets('2', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(0, $sockets);
});
$this->channelManager
->getMemberSockets('observer', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(1, $sockets);
});
}
}