Add channel storage to LocalDriver to simplify PresenceChannel logic

This commit is contained in:
Francis Lavoie 2019-07-29 16:20:48 -04:00
parent 373b993e64
commit 00e8f3e1a8
No known key found for this signature in database
GPG Key ID: B9E0E04A76AF4692
4 changed files with 94 additions and 90 deletions

View File

@ -32,30 +32,24 @@ class FetchChannelsController extends Controller
}); });
} }
if (config('websockets.replication.enabled') === true) { // We want to get the channel user count all in one shot when
// We want to get the channel user count all in one shot when // using a replication backend rather than doing individual queries.
// using a replication backend rather than doing individual queries. // To do so, we first collect the list of channel names.
// To do so, we first collect the list of channel names. $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { return $channel->getChannelName();
return $channel->getChannelName(); })->toArray();
})->toArray();
/** @var PromiseInterface $memberCounts */ /** @var PromiseInterface $memberCounts */
// We ask the replication backend to get us the member count per channel // We ask the replication backend to get us the member count per channel
$memberCounts = app(ReplicationInterface::class) $memberCounts = app(ReplicationInterface::class)
->channelMemberCounts($request->appId, $channelNames); ->channelMemberCounts($request->appId, $channelNames);
// We return a promise since the backend runs async. We get $counts back // We return a promise since the backend runs async. We get $counts back
// as a key-value array of channel names and their member count. // as a key-value array of channel names and their member count.
return $memberCounts->then(function (array $counts) use ($channels, $attributes) { return $memberCounts->then(function (array $counts) use ($channels, $attributes) {
return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) { return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) {
return $counts[$channel->getChannelName()]; return $counts[$channel->getChannelName()];
});
}); });
}
return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) {
return $channel->getUserCount();
}); });
} }

View File

@ -10,6 +10,13 @@ use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
class LocalClient implements ReplicationInterface class LocalClient implements ReplicationInterface
{ {
/**
* Mapping of the presence JSON data for users in each channel
*
* @var string[][]
*/
protected $channelData = [];
/** /**
* Boot the pub/sub provider (open connections, initial subscriptions, etc). * Boot the pub/sub provider (open connections, initial subscriptions, etc).
* *
@ -31,6 +38,7 @@ class LocalClient implements ReplicationInterface
*/ */
public function publish(string $appId, string $channel, stdClass $payload) : bool public function publish(string $appId, string $channel, stdClass $payload) : bool
{ {
// Nothing to do, nobody to publish to
return true; return true;
} }
@ -69,6 +77,7 @@ class LocalClient implements ReplicationInterface
*/ */
public function joinChannel(string $appId, string $channel, string $socketId, string $data) public function joinChannel(string $appId, string $channel, string $socketId, string $data)
{ {
$this->channelData["$appId:$channel"][$socketId] = $data;
} }
/** /**
@ -81,6 +90,10 @@ class LocalClient implements ReplicationInterface
*/ */
public function leaveChannel(string $appId, string $channel, string $socketId) public function leaveChannel(string $appId, string $channel, string $socketId)
{ {
unset($this->channelData["$appId:$channel"][$socketId]);
if (empty($this->channelData["$appId:$channel"])) {
unset($this->channelData["$appId:$channel"]);
}
} }
/** /**
@ -92,7 +105,14 @@ class LocalClient implements ReplicationInterface
*/ */
public function channelMembers(string $appId, string $channel) : PromiseInterface public function channelMembers(string $appId, string $channel) : PromiseInterface
{ {
return new FulfilledPromise(null); $members = $this->channelData["$appId:$channel"] ?? [];
// The data is expected as objects, so we need to JSON decode
$members = array_map(function ($user) {
return json_decode($user);
}, $members);
return new FulfilledPromise($members);
} }
/** /**
@ -104,6 +124,15 @@ class LocalClient implements ReplicationInterface
*/ */
public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface
{ {
return new FulfilledPromise(null); $results = [];
// Count the number of users per channel
foreach ($channelNames as $channel) {
$results[$channel] = isset($this->channelData["$appId:$channel"])
? count($this->channelData["$appId:$channel"])
: 0;
}
return new FulfilledPromise($results);
} }
} }

View File

@ -97,7 +97,7 @@ class RedisClient implements ReplicationInterface
// expect the channel name to not include the app ID. // expect the channel name to not include the app ID.
$payload->channel = Str::after($redisChannel, "$appId:"); $payload->channel = Str::after($redisChannel, "$appId:");
/* @var $channelManager ChannelManager */ /* @var ChannelManager $channelManager */
$channelManager = app(ChannelManager::class); $channelManager = app(ChannelManager::class);
// Load the Channel instance, if any // Load the Channel instance, if any

View File

@ -10,6 +10,16 @@ use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
class PresenceChannel extends Channel class PresenceChannel extends Channel
{ {
/**
* Data for the users connected to this channel
*
* Note: If replication is enabled, this will only contain entries
* for the users directly connected to this server instance. Requests
* for data for all users in the channel should be routed through
* ReplicationInterface.
*
* @var string[]
*/
protected $users = []; protected $users = [];
/** /**
@ -18,21 +28,9 @@ class PresenceChannel extends Channel
*/ */
public function getUsers(string $appId) public function getUsers(string $appId)
{ {
if (config('websockets.replication.enabled') === true) { // Get the members list from the replication backend
// Get the members list from the replication backend return app(ReplicationInterface::class)
return app(ReplicationInterface::class) ->channelMembers($appId, $this->channelName);
->channelMembers($appId, $this->channelName);
}
return $this->users;
}
/**
* @return array
*/
public function getUserCount()
{
return count($this->users);
} }
/** /**
@ -51,36 +49,27 @@ class PresenceChannel extends Channel
$channelData = json_decode($payload->channel_data); $channelData = json_decode($payload->channel_data);
$this->users[$connection->socketId] = $channelData; $this->users[$connection->socketId] = $channelData;
if (config('websockets.replication.enabled') === true) { // Add the connection as a member of the channel
// Add the connection as a member of the channel app(ReplicationInterface::class)
app(ReplicationInterface::class) ->joinChannel(
->joinChannel( $connection->app->id,
$connection->app->id, $this->channelName,
$this->channelName, $connection->socketId,
$connection->socketId, json_encode($channelData)
json_encode($channelData) );
);
// We need to pull the channel data from the replication backend, // We need to pull the channel data from the replication backend,
// otherwise we won't be sending the full details of the channel // otherwise we won't be sending the full details of the channel
app(ReplicationInterface::class) app(ReplicationInterface::class)
->channelMembers($connection->app->id, $this->channelName) ->channelMembers($connection->app->id, $this->channelName)
->then(function ($users) use ($connection) { ->then(function ($users) use ($connection) {
// Send the success event // Send the success event
$connection->send(json_encode([ $connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded', 'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->channelName, 'channel' => $this->channelName,
'data' => json_encode($this->getChannelData($users)), 'data' => json_encode($this->getChannelData($users)),
])); ]));
}); });
} else {
// Send the success event
$connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->channelName,
'data' => json_encode($this->getChannelData($this->users)),
]));
}
$this->broadcastToOthers($connection, (object) [ $this->broadcastToOthers($connection, (object) [
'event' => 'pusher_internal:member_added', 'event' => 'pusher_internal:member_added',
@ -97,15 +86,13 @@ class PresenceChannel extends Channel
return; return;
} }
if (config('websockets.replication.enabled') === true) { // Remove the connection as a member of the channel
// Remove the connection as a member of the channel app(ReplicationInterface::class)
app(ReplicationInterface::class) ->leaveChannel(
->leaveChannel( $connection->app->id,
$connection->app->id, $this->channelName,
$this->channelName, $connection->socketId
$connection->socketId );
);
}
$this->broadcastToOthers($connection, (object) [ $this->broadcastToOthers($connection, (object) [
'event' => 'pusher_internal:member_removed', 'event' => 'pusher_internal:member_removed',
@ -124,19 +111,13 @@ class PresenceChannel extends Channel
*/ */
public function toArray(string $appId = null) public function toArray(string $appId = null)
{ {
if (config('websockets.replication.enabled') === true) { return app(ReplicationInterface::class)
return app(ReplicationInterface::class) ->channelMembers($appId, $this->channelName)
->channelMembers($appId, $this->channelName) ->then(function ($users) {
->then(function ($users) { return array_merge(parent::toArray(), [
return array_merge(parent::toArray(), [ 'user_count' => count($users),
'user_count' => count($users), ]);
]); });
});
}
return array_merge(parent::toArray(), [
'user_count' => count($this->users),
]);
} }
protected function getChannelData(array $users): array protected function getChannelData(array $users): array