Merge pull request #530 from stayallive/feature/fix-presence-channels
Fix presence channels emitting too much events
This commit is contained in:
commit
805fd5e02c
|
|
@ -22,8 +22,8 @@ class FetchUsersController extends Controller
|
||||||
}
|
}
|
||||||
|
|
||||||
return [
|
return [
|
||||||
'users' => Collection::make($channel->getUsers())->map(function ($user) {
|
'users' => Collection::make($channel->getUsers())->keys()->map(function ($userId) {
|
||||||
return ['id' => $user->user_id];
|
return ['id' => $userId];
|
||||||
})->values(),
|
})->values(),
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,26 +5,44 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
|
||||||
use Ratchet\ConnectionInterface;
|
use Ratchet\ConnectionInterface;
|
||||||
use stdClass;
|
use stdClass;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
|
||||||
|
*/
|
||||||
class PresenceChannel extends Channel
|
class PresenceChannel extends Channel
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* List of users in the channel keyed by their user ID with their info as value.
|
||||||
|
*
|
||||||
|
* @var array<string, array>
|
||||||
|
*/
|
||||||
protected $users = [];
|
protected $users = [];
|
||||||
|
|
||||||
public function getUsers(): array
|
/**
|
||||||
{
|
* List of sockets keyed by their ID with the value pointing to a user ID.
|
||||||
return $this->users;
|
*
|
||||||
}
|
* @var array<string, string>
|
||||||
|
|
||||||
/*
|
|
||||||
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
|
|
||||||
*/
|
*/
|
||||||
|
protected $sockets = [];
|
||||||
|
|
||||||
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
public function subscribe(ConnectionInterface $connection, stdClass $payload)
|
||||||
{
|
{
|
||||||
$this->verifySignature($connection, $payload);
|
$this->verifySignature($connection, $payload);
|
||||||
|
|
||||||
$this->saveConnection($connection);
|
$this->saveConnection($connection);
|
||||||
|
|
||||||
$channelData = json_decode($payload->channel_data);
|
$channelData = json_decode($payload->channel_data, true);
|
||||||
$this->users[$connection->socketId] = $channelData;
|
|
||||||
|
// The ID of the user connecting
|
||||||
|
$userId = (string) $channelData['user_id'];
|
||||||
|
|
||||||
|
// Check if the user was already connected to the channel before storing the connection in the state
|
||||||
|
$userFirstConnection = ! isset($this->users[$userId]);
|
||||||
|
|
||||||
|
// Add or replace the user info in the state
|
||||||
|
$this->users[$userId] = $channelData['user_info'] ?? [];
|
||||||
|
|
||||||
|
// Add the socket ID to user ID map in the state
|
||||||
|
$this->sockets[$connection->socketId] = $userId;
|
||||||
|
|
||||||
// Send the success event
|
// Send the success event
|
||||||
$connection->send(json_encode([
|
$connection->send(json_encode([
|
||||||
|
|
@ -33,72 +51,74 @@ class PresenceChannel extends Channel
|
||||||
'data' => json_encode($this->getChannelData()),
|
'data' => json_encode($this->getChannelData()),
|
||||||
]));
|
]));
|
||||||
|
|
||||||
$this->broadcastToOthers($connection, [
|
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
|
||||||
'event' => 'pusher_internal:member_added',
|
// It's quite possible that a user can have multiple connections to the same channel
|
||||||
'channel' => $this->channelName,
|
// (for example by having multiple browser tabs open)
|
||||||
'data' => json_encode($channelData),
|
// and in this case the events will only be triggered when the first tab is opened.
|
||||||
]);
|
if ($userFirstConnection) {
|
||||||
|
$this->broadcastToOthers($connection, [
|
||||||
|
'event' => 'pusher_internal:member_added',
|
||||||
|
'channel' => $this->channelName,
|
||||||
|
'data' => json_encode($channelData),
|
||||||
|
]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function unsubscribe(ConnectionInterface $connection)
|
public function unsubscribe(ConnectionInterface $connection)
|
||||||
{
|
{
|
||||||
parent::unsubscribe($connection);
|
parent::unsubscribe($connection);
|
||||||
|
|
||||||
if (! isset($this->users[$connection->socketId])) {
|
if (! isset($this->sockets[$connection->socketId])) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->broadcastToOthers($connection, [
|
// Find the user ID belonging to this socket
|
||||||
'event' => 'pusher_internal:member_removed',
|
$userId = $this->sockets[$connection->socketId];
|
||||||
'channel' => $this->channelName,
|
|
||||||
'data' => json_encode([
|
|
||||||
'user_id' => $this->users[$connection->socketId]->user_id,
|
|
||||||
]),
|
|
||||||
]);
|
|
||||||
|
|
||||||
unset($this->users[$connection->socketId]);
|
// Remove the socket from the state
|
||||||
|
unset($this->sockets[$connection->socketId]);
|
||||||
|
|
||||||
|
// Test if the user still has open sockets to this channel
|
||||||
|
$userHasOpenConnections = (array_flip($this->sockets)[$userId] ?? null) !== null;
|
||||||
|
|
||||||
|
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
|
||||||
|
// It's quite possible that a user can have multiple connections to the same channel
|
||||||
|
// (for example by having multiple browser tabs open)
|
||||||
|
// and in this case the events will only be triggered when the last one is closed.
|
||||||
|
if (! $userHasOpenConnections) {
|
||||||
|
$this->broadcastToOthers($connection, [
|
||||||
|
'event' => 'pusher_internal:member_removed',
|
||||||
|
'channel' => $this->channelName,
|
||||||
|
'data' => json_encode([
|
||||||
|
'user_id' => $userId,
|
||||||
|
]),
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Remove the user info from the state
|
||||||
|
unset($this->users[$userId]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function getChannelData(): array
|
protected function getChannelData(): array
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
'presence' => [
|
'presence' => [
|
||||||
'ids' => $userIds = $this->getUserIds(),
|
'ids' => array_keys($this->users),
|
||||||
'hash' => $this->getHash(),
|
'hash' => $this->users,
|
||||||
'count' => count($userIds),
|
'count' => count($this->users),
|
||||||
],
|
],
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function getUsers(): array
|
||||||
|
{
|
||||||
|
return $this->users;
|
||||||
|
}
|
||||||
|
|
||||||
public function toArray(): array
|
public function toArray(): array
|
||||||
{
|
{
|
||||||
return array_merge(parent::toArray(), [
|
return array_merge(parent::toArray(), [
|
||||||
'user_count' => count($this->getUserIds()),
|
'user_count' => count($this->users),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function getUserIds(): array
|
|
||||||
{
|
|
||||||
$userIds = array_map(function ($channelData) {
|
|
||||||
return (string) $channelData->user_id;
|
|
||||||
}, $this->users);
|
|
||||||
|
|
||||||
return array_values(array_unique($userIds));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compute the hash for the presence channel integrity.
|
|
||||||
*
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
protected function getHash(): array
|
|
||||||
{
|
|
||||||
$hash = [];
|
|
||||||
|
|
||||||
foreach ($this->users as $socketId => $channelData) {
|
|
||||||
$hash[$channelData->user_id] = $channelData->user_info ?? [];
|
|
||||||
}
|
|
||||||
|
|
||||||
return $hash;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ class PresenceChannelTest extends TestCase
|
||||||
],
|
],
|
||||||
];
|
];
|
||||||
|
|
||||||
$message = $this->getSignedMessage($connection, 'presence-channel', $channelData);
|
$message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData);
|
||||||
|
|
||||||
$this->pusherServer->onMessage($connection, $message);
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
|
||||||
|
|
@ -63,7 +63,7 @@ class PresenceChannelTest extends TestCase
|
||||||
'user_id' => 1,
|
'user_id' => 1,
|
||||||
];
|
];
|
||||||
|
|
||||||
$message = $this->getSignedMessage($connection, 'presence-channel', $channelData);
|
$message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData);
|
||||||
|
|
||||||
$this->pusherServer->onMessage($connection, $message);
|
$this->pusherServer->onMessage($connection, $message);
|
||||||
|
|
||||||
|
|
@ -80,19 +80,19 @@ class PresenceChannelTest extends TestCase
|
||||||
|
|
||||||
$channelName = 'presence-channel';
|
$channelName = 'presence-channel';
|
||||||
$channelData = [
|
$channelData = [
|
||||||
'user_id' => $userId = 1,
|
'user_id' => $userId = 'user:1',
|
||||||
];
|
];
|
||||||
|
|
||||||
$this->pusherServer->onMessage($connection, $this->getSignedMessage($connection, $channelName, $channelData));
|
$this->pusherServer->onMessage($connection, $this->getSignedSubscribeMessage($connection, $channelName, $channelData));
|
||||||
$this->pusherServer->onMessage($connection2, $this->getSignedMessage($connection2, $channelName, $channelData));
|
$this->pusherServer->onMessage($connection2, $this->getSignedSubscribeMessage($connection2, $channelName, $channelData));
|
||||||
|
|
||||||
$connection2->assertSentEvent('pusher_internal:subscription_succeeded', [
|
$connection2->assertSentEvent('pusher_internal:subscription_succeeded', [
|
||||||
'channel' => $channelName,
|
'channel' => $channelName,
|
||||||
'data' => json_encode([
|
'data' => json_encode([
|
||||||
'presence' => [
|
'presence' => [
|
||||||
'ids' => [(string) $userId],
|
'ids' => [$userId],
|
||||||
'hash' => [
|
'hash' => [
|
||||||
(string) $userId => [],
|
$userId => [],
|
||||||
],
|
],
|
||||||
'count' => 1,
|
'count' => 1,
|
||||||
],
|
],
|
||||||
|
|
@ -100,7 +100,44 @@ class PresenceChannelTest extends TestCase
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getSignedMessage(Connection $connection, string $channelName, array $channelData): Message
|
/** @test */
|
||||||
|
public function multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
|
||||||
|
{
|
||||||
|
$channelName = 'presence-channel';
|
||||||
|
|
||||||
|
// Connect the `observer` user to the server
|
||||||
|
$this->pusherServer->onOpen($observerConnection = $this->getWebSocketConnection());
|
||||||
|
$this->pusherServer->onMessage($observerConnection, $this->getSignedSubscribeMessage($observerConnection, $channelName, ['user_id' => 'observer']));
|
||||||
|
|
||||||
|
// Connect the first socket for user `user:1` to the server
|
||||||
|
$this->pusherServer->onOpen($firstConnection = $this->getWebSocketConnection());
|
||||||
|
$this->pusherServer->onMessage($firstConnection, $this->getSignedSubscribeMessage($firstConnection, $channelName, ['user_id' => 'user:1']));
|
||||||
|
|
||||||
|
// Make sure the observer sees a `member_added` event for `user:1`
|
||||||
|
$observerConnection->assertSentEvent('pusher_internal:member_added');
|
||||||
|
$observerConnection->resetEvents();
|
||||||
|
|
||||||
|
// Connect the second socket for user `user:1` to the server
|
||||||
|
$this->pusherServer->onOpen($secondConnection = $this->getWebSocketConnection());
|
||||||
|
$this->pusherServer->onMessage($secondConnection, $this->getSignedSubscribeMessage($secondConnection, $channelName, ['user_id' => 'user:1']));
|
||||||
|
|
||||||
|
// Make sure the observer was not notified of a `member_added` event (user was already connected)
|
||||||
|
$observerConnection->assertNotSentEvent('pusher_internal:member_added');
|
||||||
|
|
||||||
|
// Disconnect the first socket for user `user:1` on the server
|
||||||
|
$this->pusherServer->onClose($firstConnection);
|
||||||
|
|
||||||
|
// Make sure the observer was not notified of a `member_removed` event (user still connected on another socket)
|
||||||
|
$observerConnection->assertNotSentEvent('pusher_internal:member_removed');
|
||||||
|
|
||||||
|
// Disconnect the second (and last) socket for user `user:1` on the server
|
||||||
|
$this->pusherServer->onClose($secondConnection);
|
||||||
|
|
||||||
|
// Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected)
|
||||||
|
$observerConnection->assertSentEvent('pusher_internal:member_removed');
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getSignedSubscribeMessage(Connection $connection, string $channelName, array $channelData): Message
|
||||||
{
|
{
|
||||||
$signature = "{$connection->socketId}:{$channelName}:".json_encode($channelData);
|
$signature = "{$connection->socketId}:{$channelName}:".json_encode($channelData);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -66,6 +66,39 @@ class FetchChannelTest extends TestCase
|
||||||
], json_decode($response->getContent(), true));
|
], json_decode($response->getContent(), true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @test */
|
||||||
|
public function it_returns_the_channel_information_for_presence_channel()
|
||||||
|
{
|
||||||
|
$this->joinPresenceChannel('presence-global', 'user:1');
|
||||||
|
$this->joinPresenceChannel('presence-global', 'user:2');
|
||||||
|
$this->joinPresenceChannel('presence-global', 'user:2');
|
||||||
|
|
||||||
|
$connection = new Connection();
|
||||||
|
|
||||||
|
$requestPath = '/apps/1234/channel/presence-global';
|
||||||
|
$routeParams = [
|
||||||
|
'appId' => '1234',
|
||||||
|
'channelName' => 'presence-global',
|
||||||
|
];
|
||||||
|
|
||||||
|
$queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath);
|
||||||
|
|
||||||
|
$request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams));
|
||||||
|
|
||||||
|
$controller = app(FetchChannelController::class);
|
||||||
|
|
||||||
|
$controller->onOpen($connection, $request);
|
||||||
|
|
||||||
|
/** @var JsonResponse $response */
|
||||||
|
$response = array_pop($connection->sentRawData);
|
||||||
|
|
||||||
|
$this->assertSame([
|
||||||
|
'occupied' => true,
|
||||||
|
'subscription_count' => 3,
|
||||||
|
'user_count' => 2,
|
||||||
|
], json_decode($response->getContent(), true));
|
||||||
|
}
|
||||||
|
|
||||||
/** @test */
|
/** @test */
|
||||||
public function it_returns_404_for_invalid_channels()
|
public function it_returns_404_for_invalid_channels()
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -103,10 +103,10 @@ class FetchChannelsTest extends TestCase
|
||||||
/** @test */
|
/** @test */
|
||||||
public function it_returns_the_channel_information_for_prefix_with_user_count()
|
public function it_returns_the_channel_information_for_prefix_with_user_count()
|
||||||
{
|
{
|
||||||
$this->joinPresenceChannel('presence-global.1');
|
$this->joinPresenceChannel('presence-global.1', 'user:1');
|
||||||
$this->joinPresenceChannel('presence-global.1');
|
$this->joinPresenceChannel('presence-global.1', 'user:2');
|
||||||
$this->joinPresenceChannel('presence-global.2');
|
$this->joinPresenceChannel('presence-global.2', 'user:3');
|
||||||
$this->joinPresenceChannel('presence-notglobal.2');
|
$this->joinPresenceChannel('presence-notglobal.2', 'user:4');
|
||||||
|
|
||||||
$connection = new Connection();
|
$connection = new Connection();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,12 @@ class Connection implements ConnectionInterface
|
||||||
$this->closed = true;
|
$this->closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function resetEvents()
|
||||||
|
{
|
||||||
|
$this->sentData = [];
|
||||||
|
$this->sentRawData = [];
|
||||||
|
}
|
||||||
|
|
||||||
public function assertSentEvent(string $name, array $additionalParameters = [])
|
public function assertSentEvent(string $name, array $additionalParameters = [])
|
||||||
{
|
{
|
||||||
$event = collect($this->sentData)->firstWhere('event', '=', $name);
|
$event = collect($this->sentData)->firstWhere('event', '=', $name);
|
||||||
|
|
|
||||||
|
|
@ -89,14 +89,14 @@ abstract class TestCase extends \Orchestra\Testbench\TestCase
|
||||||
return $connection;
|
return $connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function joinPresenceChannel($channel): Connection
|
protected function joinPresenceChannel($channel, $userId = null): Connection
|
||||||
{
|
{
|
||||||
$connection = $this->getWebSocketConnection();
|
$connection = $this->getWebSocketConnection();
|
||||||
|
|
||||||
$this->pusherServer->onOpen($connection);
|
$this->pusherServer->onOpen($connection);
|
||||||
|
|
||||||
$channelData = [
|
$channelData = [
|
||||||
'user_id' => 1,
|
'user_id' => $userId ?? 1,
|
||||||
'user_info' => [
|
'user_info' => [
|
||||||
'name' => 'Marcel',
|
'name' => 'Marcel',
|
||||||
],
|
],
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue