Fixing PR #530
This commit is contained in:
parent
88cf779dd7
commit
97ab241fa3
|
|
@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager
|
||||||
*/
|
*/
|
||||||
protected $users = [];
|
protected $users = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The list of users by socket and their attached id.
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $userSockets = [];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wether the current instance accepts new connections.
|
* Wether the current instance accepts new connections.
|
||||||
*
|
*
|
||||||
|
|
@ -273,6 +280,7 @@ class LocalChannelManager implements ChannelManager
|
||||||
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload)
|
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload)
|
||||||
{
|
{
|
||||||
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
|
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
|
||||||
|
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -287,6 +295,19 @@ class LocalChannelManager implements ChannelManager
|
||||||
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel)
|
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel)
|
||||||
{
|
{
|
||||||
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);
|
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);
|
||||||
|
|
||||||
|
$deletableSocketKey = array_search(
|
||||||
|
$connection->socketId,
|
||||||
|
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]
|
||||||
|
);
|
||||||
|
|
||||||
|
if ($deletableSocketKey !== false) {
|
||||||
|
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][$deletableSocketKey]);
|
||||||
|
|
||||||
|
if (count($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]) === 0) {
|
||||||
|
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -342,6 +363,21 @@ class LocalChannelManager implements ChannelManager
|
||||||
return new FulfilledPromise($results);
|
return new FulfilledPromise($results);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the socket IDs for a presence channel member.
|
||||||
|
*
|
||||||
|
* @param string|int $userId
|
||||||
|
* @param string|int $appId
|
||||||
|
* @param string $channelName
|
||||||
|
* @return \React\Promise\PromiseInterface
|
||||||
|
*/
|
||||||
|
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
|
||||||
|
{
|
||||||
|
return new FulfilledPromise(
|
||||||
|
$this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? []
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Keep tracking the connections availability when they pong.
|
* Keep tracking the connections availability when they pong.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -313,6 +313,10 @@ class RedisChannelManager extends LocalChannelManager
|
||||||
$this->storeUserData(
|
$this->storeUserData(
|
||||||
$connection->app->id, $channel, $connection->socketId, json_encode($user)
|
$connection->app->id, $channel, $connection->socketId, json_encode($user)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
$this->addUserSocket(
|
||||||
|
$connection->app->id, $channel, $user, $connection->socketId
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -329,6 +333,10 @@ class RedisChannelManager extends LocalChannelManager
|
||||||
$this->removeUserData(
|
$this->removeUserData(
|
||||||
$connection->app->id, $channel, $connection->socketId
|
$connection->app->id, $channel, $connection->socketId
|
||||||
);
|
);
|
||||||
|
|
||||||
|
$this->removeUserSocket(
|
||||||
|
$connection->app->id, $channel, $user, $connection->socketId
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -389,6 +397,21 @@ class RedisChannelManager extends LocalChannelManager
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the socket IDs for a presence channel member.
|
||||||
|
*
|
||||||
|
* @param string|int $userId
|
||||||
|
* @param string|int $appId
|
||||||
|
* @param string $channelName
|
||||||
|
* @return \React\Promise\PromiseInterface
|
||||||
|
*/
|
||||||
|
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
|
||||||
|
{
|
||||||
|
return $this->publishClient->smembers(
|
||||||
|
$this->getRedisKey($appId, $channelName, [$userId, 'userSockets'])
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Keep tracking the connections availability when they pong.
|
* Keep tracking the connections availability when they pong.
|
||||||
*
|
*
|
||||||
|
|
@ -628,7 +651,7 @@ class RedisChannelManager extends LocalChannelManager
|
||||||
* @param string|int $appId
|
* @param string|int $appId
|
||||||
* @param string|null $channel
|
* @param string|null $channel
|
||||||
* @param string $key
|
* @param string $key
|
||||||
* @param mixed $data
|
* @param string $data
|
||||||
* @return PromiseInterface
|
* @return PromiseInterface
|
||||||
*/
|
*/
|
||||||
public function storeUserData($appId, string $channel = null, string $key, $data)
|
public function storeUserData($appId, string $channel = null, string $key, $data)
|
||||||
|
|
@ -681,6 +704,40 @@ class RedisChannelManager extends LocalChannelManager
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the Presence Channel's User's Socket ID to a list.
|
||||||
|
*
|
||||||
|
* @param string|int $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param stdClass $user
|
||||||
|
* @param string $socketId
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId)
|
||||||
|
{
|
||||||
|
$this->publishClient->sadd(
|
||||||
|
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
|
||||||
|
$socketId
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the Presence Channel's User's Socket ID from the list.
|
||||||
|
*
|
||||||
|
* @param string|int $appId
|
||||||
|
* @param string $channel
|
||||||
|
* @param stdClass $user
|
||||||
|
* @param string $socketId
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId)
|
||||||
|
{
|
||||||
|
$this->publishClient->srem(
|
||||||
|
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
|
||||||
|
$socketId
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the Redis Keyspace name to handle subscriptions
|
* Get the Redis Keyspace name to handle subscriptions
|
||||||
* and other key-value sets.
|
* and other key-value sets.
|
||||||
|
|
|
||||||
|
|
@ -55,6 +55,14 @@ class PresenceChannel extends PrivateChannel
|
||||||
]));
|
]));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
|
||||||
|
// It's quite possible that a user can have multiple connections to the same channel
|
||||||
|
// (for example by having multiple browser tabs open)
|
||||||
|
// and in this case the events will only be triggered when the first tab is opened.
|
||||||
|
$this->channelManager
|
||||||
|
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
|
||||||
|
->then(function ($sockets) use ($payload, $connection) {
|
||||||
|
if (count($sockets) === 1) {
|
||||||
$memberAddedPayload = [
|
$memberAddedPayload = [
|
||||||
'event' => 'pusher_internal:member_added',
|
'event' => 'pusher_internal:member_added',
|
||||||
'channel' => $this->getName(),
|
'channel' => $this->getName(),
|
||||||
|
|
@ -65,10 +73,13 @@ class PresenceChannel extends PrivateChannel
|
||||||
(object) $memberAddedPayload, $connection->socketId,
|
(object) $memberAddedPayload, $connection->socketId,
|
||||||
$connection->app->id
|
$connection->app->id
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
|
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
|
||||||
'socketId' => $connection->socketId,
|
'socketId' => $connection->socketId,
|
||||||
'channel' => $this->getName(),
|
'channel' => $this->getName(),
|
||||||
|
'multi-device' => isset($connection->duplicate),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -95,6 +106,14 @@ class PresenceChannel extends PrivateChannel
|
||||||
$connection, $user, $this->getName()
|
$connection, $user, $this->getName()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
|
||||||
|
// It's quite possible that a user can have multiple connections to the same channel
|
||||||
|
// (for example by having multiple browser tabs open)
|
||||||
|
// and in this case the events will only be triggered when the last one is closed.
|
||||||
|
$this->channelManager
|
||||||
|
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
|
||||||
|
->then(function ($sockets) use ($connection, $user) {
|
||||||
|
if (count($sockets) === 0) {
|
||||||
$memberRemovedPayload = [
|
$memberRemovedPayload = [
|
||||||
'event' => 'pusher_internal:member_removed',
|
'event' => 'pusher_internal:member_removed',
|
||||||
'channel' => $this->getName(),
|
'channel' => $this->getName(),
|
||||||
|
|
@ -107,6 +126,8 @@ class PresenceChannel extends PrivateChannel
|
||||||
(object) $memberRemovedPayload, $connection->socketId,
|
(object) $memberRemovedPayload, $connection->socketId,
|
||||||
$connection->app->id
|
$connection->app->id
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -186,6 +186,16 @@ interface ChannelManager
|
||||||
*/
|
*/
|
||||||
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface;
|
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the socket IDs for a presence channel member.
|
||||||
|
*
|
||||||
|
* @param string|int $userId
|
||||||
|
* @param string|int $appId
|
||||||
|
* @param string $channelName
|
||||||
|
* @return \React\Promise\PromiseInterface
|
||||||
|
*/
|
||||||
|
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Keep tracking the connections availability when they pong.
|
* Keep tracking the connections availability when they pong.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,32 @@ class Connection implements ConnectionInterface
|
||||||
$this->closed = true;
|
$this->closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the events for assertions.
|
||||||
|
*
|
||||||
|
* @return $this
|
||||||
|
*/
|
||||||
|
public function resetEvents()
|
||||||
|
{
|
||||||
|
$this->sentData = [];
|
||||||
|
$this->sentRawData = [];
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dump & stop execution.
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function dd()
|
||||||
|
{
|
||||||
|
dd([
|
||||||
|
'sentData' => $this->sentData,
|
||||||
|
'sentRawData' => $this->sentRawData,
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert that an event got sent.
|
* Assert that an event got sent.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -61,6 +61,31 @@ class PresenceChannelTest extends TestCase
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined()
|
||||||
|
{
|
||||||
|
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
|
||||||
|
$morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
|
||||||
|
$pickleRick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
|
||||||
|
|
||||||
|
foreach ([$rick, $morty, $pickleRick] as $connection) {
|
||||||
|
$connection->assertSentEvent('pusher_internal:subscription_succeeded', [
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->channelManager
|
||||||
|
->getGlobalConnectionsCount('1234', 'presence-channel')
|
||||||
|
->then(function ($total) {
|
||||||
|
$this->assertEquals(3, $total);
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->channelManager
|
||||||
|
->getChannelMembers('1234', 'presence-channel')
|
||||||
|
->then(function ($members) {
|
||||||
|
$this->assertCount(2, $members);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public function test_presence_channel_broadcast_member_events()
|
public function test_presence_channel_broadcast_member_events()
|
||||||
{
|
{
|
||||||
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
|
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
|
||||||
|
|
@ -200,4 +225,56 @@ class PresenceChannelTest extends TestCase
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
|
||||||
|
{
|
||||||
|
// Connect the `observer` user to the server
|
||||||
|
$observerConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 'observer']);
|
||||||
|
|
||||||
|
// Connect the first socket for user `1` to the server
|
||||||
|
$firstConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);
|
||||||
|
|
||||||
|
// Make sure the observer sees a `member_added` event for `user:1`
|
||||||
|
$observerConnection->assertSentEvent('pusher_internal:member_added', [
|
||||||
|
'event' => 'pusher_internal:member_added',
|
||||||
|
'channel' => 'presence-channel',
|
||||||
|
'data' => json_encode(['user_id' => '1']),
|
||||||
|
])->resetEvents();
|
||||||
|
|
||||||
|
// Connect the second socket for user `1` to the server
|
||||||
|
$secondConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);
|
||||||
|
|
||||||
|
// Make sure the observer was not notified of a `member_added` event (user was already connected)
|
||||||
|
$observerConnection->assertNotSentEvent('pusher_internal:member_added');
|
||||||
|
|
||||||
|
// Disconnect the first socket for user `1` on the server
|
||||||
|
$this->pusherServer->onClose($firstConnection);
|
||||||
|
|
||||||
|
// Make sure the observer was not notified of a `member_removed` event (user still connected on another socket)
|
||||||
|
$observerConnection->assertNotSentEvent('pusher_internal:member_removed');
|
||||||
|
|
||||||
|
// Disconnect the second (and last) socket for user `1` on the server
|
||||||
|
$this->pusherServer->onClose($secondConnection);
|
||||||
|
|
||||||
|
// Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected)
|
||||||
|
$observerConnection->assertSentEvent('pusher_internal:member_removed');
|
||||||
|
|
||||||
|
$this->channelManager
|
||||||
|
->getMemberSockets('1', '1234', 'presence-channel')
|
||||||
|
->then(function ($sockets) {
|
||||||
|
$this->assertCount(0, $sockets);
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->channelManager
|
||||||
|
->getMemberSockets('2', '1234', 'presence-channel')
|
||||||
|
->then(function ($sockets) {
|
||||||
|
$this->assertCount(0, $sockets);
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->channelManager
|
||||||
|
->getMemberSockets('observer', '1234', 'presence-channel')
|
||||||
|
->then(function ($sockets) {
|
||||||
|
$this->assertCount(1, $sockets);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue