Implement presence channel storage in Redis

This commit is contained in:
Francis Lavoie 2019-03-29 15:33:46 -04:00
parent 87c00fb340
commit 4baac7ef00
No known key found for this signature in database
GPG Key ID: B9E0E04A76AF4692
8 changed files with 287 additions and 31 deletions

View File

@ -11,6 +11,7 @@ use Ratchet\ConnectionInterface;
use Illuminate\Http\JsonResponse; use Illuminate\Http\JsonResponse;
use GuzzleHttp\Psr7\ServerRequest; use GuzzleHttp\Psr7\ServerRequest;
use Illuminate\Support\Collection; use Illuminate\Support\Collection;
use React\Promise\PromiseInterface;
use Ratchet\Http\HttpServerInterface; use Ratchet\Http\HttpServerInterface;
use Psr\Http\Message\RequestInterface; use Psr\Http\Message\RequestInterface;
use BeyondCode\LaravelWebSockets\Apps\App; use BeyondCode\LaravelWebSockets\Apps\App;
@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface
/** @var int */ /** @var int */
protected $contentLength; protected $contentLength;
/** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */ /** @var ChannelManager */
protected $channelManager; protected $channelManager;
public function __construct(ChannelManager $channelManager) public function __construct(ChannelManager $channelManager)
@ -92,8 +93,23 @@ abstract class Controller implements HttpServerInterface
->ensureValidAppId($laravelRequest->appId) ->ensureValidAppId($laravelRequest->appId)
->ensureValidSignature($laravelRequest); ->ensureValidSignature($laravelRequest);
// Invoke the controller action
$response = $this($laravelRequest); $response = $this($laravelRequest);
// Allow for async IO in the controller action
if ($response instanceof PromiseInterface) {
$response->then(function ($response) use ($connection) {
$this->sendAndClose($connection, $response);
});
return;
}
$this->sendAndClose($connection, $response);
}
protected function sendAndClose(ConnectionInterface $connection, $response)
{
$connection->send(JsonResponse::create($response)); $connection->send(JsonResponse::create($response));
$connection->close(); $connection->close();
} }

View File

@ -5,6 +5,9 @@ namespace BeyondCode\LaravelWebSockets\HttpApi\Controllers;
use Illuminate\Support\Str; use Illuminate\Support\Str;
use Illuminate\Http\Request; use Illuminate\Http\Request;
use Illuminate\Support\Collection; use Illuminate\Support\Collection;
use React\Promise\PromiseInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel;
use Symfony\Component\HttpKernel\Exception\HttpException; use Symfony\Component\HttpKernel\Exception\HttpException;
class FetchChannelsController extends Controller class FetchChannelsController extends Controller
@ -29,11 +32,40 @@ class FetchChannelsController extends Controller
}); });
} }
if (config('websockets.replication.enabled') === true) {
// We want to get the channel user count all in one shot when
// using a replication backend rather than doing individual queries.
// To do so, we first collect the list of channel names.
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
return $channel->getChannelName();
})->toArray();
/** @var PromiseInterface $memberCounts */
// We ask the replication backend to get us the member count per channel
$memberCounts = app(ReplicationInterface::class)
->channelMemberCounts($request->appId, $channelNames);
// We return a promise since the backend runs async. We get $counts back
// as a key-value array of channel names and their member count.
return $memberCounts->then(function (array $counts) use ($channels) {
return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) use ($counts) {
return $counts[$channel->getChannelName()];
});
});
}
return $this->collectUserCounts($channels, $attributes, function (PresenceChannel $channel) {
return $channel->getUserCount();
});
}
protected function collectUserCounts(Collection $channels, array $attributes, callable $transformer)
{
return [ return [
'channels' => $channels->map(function ($channel) use ($attributes) { 'channels' => $channels->map(function (PresenceChannel $channel) use ($transformer, $attributes) {
$info = new \stdClass; $info = new \stdClass;
if (in_array('user_count', $attributes)) { if (in_array('user_count', $attributes)) {
$info->user_count = count($channel->getUsers()); $info->user_count = $transformer($channel);
} }
return $info; return $info;

View File

@ -4,6 +4,7 @@ namespace BeyondCode\LaravelWebSockets\HttpApi\Controllers;
use Illuminate\Http\Request; use Illuminate\Http\Request;
use Illuminate\Support\Collection; use Illuminate\Support\Collection;
use React\Promise\PromiseInterface;
use Symfony\Component\HttpKernel\Exception\HttpException; use Symfony\Component\HttpKernel\Exception\HttpException;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel;
@ -21,8 +22,21 @@ class FetchUsersController extends Controller
throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"'); throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"');
} }
$users = $channel->getUsers($request->appId);
if ($users instanceof PromiseInterface) {
return $users->then(function (array $users) {
return $this->collectUsers($users);
});
}
return $this->collectUsers($users);
}
protected function collectUsers(array $users)
{
return [ return [
'users' => Collection::make($channel->getUsers())->map(function ($user) { 'users' => Collection::make($users)->map(function ($user) {
return ['id' => $user->user_id]; return ['id' => $user->user_id];
})->values(), })->values(),
]; ];

View File

@ -7,6 +7,7 @@ use Illuminate\Support\Str;
use Clue\React\Redis\Client; use Clue\React\Redis\Client;
use Clue\React\Redis\Factory; use Clue\React\Redis\Factory;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
@ -183,6 +184,72 @@ class RedisClient implements ReplicationInterface
return true; return true;
} }
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
*
* @param string $appId
* @param string $channel
* @param string $socketId
* @param string $data
*/
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
{
$this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]);
}
/**
* Remove a member from the channel. To be called when they have
* unsubscribed from the channel.
*
* @param string $appId
* @param string $channel
* @param string $socketId
*/
public function leaveChannel(string $appId, string $channel, string $socketId)
{
$this->publishClient->__call('hdel', ["$appId:$channel", $socketId]);
}
/**
* Retrieve the full information about the members in a presence channel.
*
* @param string $appId
* @param string $channel
* @return PromiseInterface
*/
public function channelMembers(string $appId, string $channel): PromiseInterface
{
return $this->publishClient->__call('hgetall', ["$appId:$channel"])
->then(function ($members) {
// The data is expected as objects, so we need to JSON decode
return array_walk($members, function ($user) {
return json_decode($user);
});
});
}
/**
* Get the amount of users subscribed for each presence channel.
*
* @param string $appId
* @param array $channelNames
* @return PromiseInterface
*/
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface
{
$this->publishClient->__call('multi', []);
foreach ($channelNames as $channel) {
$this->publishClient->__call('hlen', ["$appId:$channel"]);
}
return $this->publishClient->__call('exec', [])
->then(function ($data) use ($channelNames) {
return array_combine($channelNames, $data);
});
}
/** /**
* Build the Redis connection URL from Laravel database config. * Build the Redis connection URL from Laravel database config.
* *

View File

@ -4,6 +4,7 @@ namespace BeyondCode\LaravelWebSockets\PubSub;
use stdClass; use stdClass;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
interface ReplicationInterface interface ReplicationInterface
{ {
@ -40,4 +41,43 @@ interface ReplicationInterface
* @return bool * @return bool
*/ */
public function unsubscribe(string $appId, string $channel): bool; public function unsubscribe(string $appId, string $channel): bool;
/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
*
* @param string $appId
* @param string $channel
* @param string $socketId
* @param string $data
*/
public function joinChannel(string $appId, string $channel, string $socketId, string $data);
/**
* Remove a member from the channel. To be called when they have
* unsubscribed from the channel.
*
* @param string $appId
* @param string $channel
* @param string $socketId
*/
public function leaveChannel(string $appId, string $channel, string $socketId);
/**
* Retrieve the full information about the members in a presence channel.
*
* @param string $appId
* @param string $channel
* @return PromiseInterface
*/
public function channelMembers(string $appId, string $channel): PromiseInterface;
/**
* Get the amount of users subscribed for each presence channel.
*
* @param string $appId
* @param array $channelNames
* @return PromiseInterface
*/
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface;
} }

View File

@ -22,6 +22,11 @@ class Channel
$this->channelName = $channelName; $this->channelName = $channelName;
} }
public function getChannelName(): string
{
return $this->channelName;
}
public function hasConnections(): bool public function hasConnections(): bool
{ {
return count($this->subscribedConnections) > 0; return count($this->subscribedConnections) > 0;
@ -32,6 +37,9 @@ class Channel
return $this->subscribedConnections; return $this->subscribedConnections;
} }
/**
* @throws InvalidSignature
*/
protected function verifySignature(ConnectionInterface $connection, stdClass $payload) protected function verifySignature(ConnectionInterface $connection, stdClass $payload)
{ {
$signature = "{$connection->socketId}:{$this->channelName}"; $signature = "{$connection->socketId}:{$this->channelName}";
@ -40,12 +48,15 @@ class Channel
$signature .= ":{$payload->channel_data}"; $signature .= ":{$payload->channel_data}";
} }
if (Str::after($payload->auth, ':') !== hash_hmac('sha256', $signature, $connection->app->secret)) { if (! hash_equals(
hash_hmac('sha256', $signature, $connection->app->secret),
Str::after($payload->auth, ':'))
) {
throw new InvalidSignature(); throw new InvalidSignature();
} }
} }
/* /**
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events * @link https://pusher.com/docs/pusher_protocol#presence-channel-events
*/ */
public function subscribe(ConnectionInterface $connection, stdClass $payload) public function subscribe(ConnectionInterface $connection, stdClass $payload)
@ -128,7 +139,7 @@ class Channel
} }
} }
public function toArray(): array public function toArray()
{ {
return [ return [
'occupied' => count($this->subscribedConnections) > 0, 'occupied' => count($this->subscribedConnections) > 0,

View File

@ -4,18 +4,43 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
use stdClass; use stdClass;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use React\Promise\PromiseInterface;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
class PresenceChannel extends Channel class PresenceChannel extends Channel
{ {
protected $users = []; protected $users = [];
public function getUsers(): array /**
* @param string $appId
* @return array|PromiseInterface
*/
public function getUsers(string $appId)
{ {
if (config('websockets.replication.enabled') === true) {
// Get the members list from the replication backend
return app(ReplicationInterface::class)
->channelMembers($appId, $this->channelName);
}
return $this->users; return $this->users;
} }
/* /**
* @return array
*/
public function getUserCount()
{
return count($this->users);
}
/**
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events * @link https://pusher.com/docs/pusher_protocol#presence-channel-events
*
* @param ConnectionInterface $connection
* @param stdClass $payload
* @throws InvalidSignature
*/ */
public function subscribe(ConnectionInterface $connection, stdClass $payload) public function subscribe(ConnectionInterface $connection, stdClass $payload)
{ {
@ -26,12 +51,36 @@ class PresenceChannel extends Channel
$channelData = json_decode($payload->channel_data); $channelData = json_decode($payload->channel_data);
$this->users[$connection->socketId] = $channelData; $this->users[$connection->socketId] = $channelData;
// Send the success event if (config('websockets.replication.enabled') === true) {
$connection->send(json_encode([ // Add the connection as a member of the channel
'event' => 'pusher_internal:subscription_succeeded', app(ReplicationInterface::class)
'channel' => $this->channelName, ->joinChannel(
'data' => json_encode($this->getChannelData()), $connection->app->id,
])); $this->channelName,
$connection->socketId,
json_encode($channelData)
);
// We need to pull the channel data from the replication backend,
// otherwise we won't be sending the full details of the channel
app(ReplicationInterface::class)
->channelMembers($connection->app->id, $this->channelName)
->then(function ($users) use ($connection) {
// Send the success event
$connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->channelName,
'data' => json_encode($this->getChannelData($users)),
]));
});
} else {
// Send the success event
$connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->channelName,
'data' => json_encode($this->getChannelData($this->users)),
]));
}
$this->broadcastToOthers($connection, [ $this->broadcastToOthers($connection, [
'event' => 'pusher_internal:member_added', 'event' => 'pusher_internal:member_added',
@ -48,6 +97,16 @@ class PresenceChannel extends Channel
return; return;
} }
if (config('websockets.replication.enabled') === true) {
// Remove the connection as a member of the channel
app(ReplicationInterface::class)
->leaveChannel(
$connection->app->id,
$this->channelName,
$connection->socketId
);
}
$this->broadcastToOthers($connection, [ $this->broadcastToOthers($connection, [
'event' => 'pusher_internal:member_removed', 'event' => 'pusher_internal:member_removed',
'channel' => $this->channelName, 'channel' => $this->channelName,
@ -59,38 +118,51 @@ class PresenceChannel extends Channel
unset($this->users[$connection->socketId]); unset($this->users[$connection->socketId]);
} }
protected function getChannelData(): array /**
* @return PromiseInterface|array
*/
public function toArray(string $appId = null)
{ {
return [ if (config('websockets.replication.enabled') === true) {
'presence' => [ return app(ReplicationInterface::class)
'ids' => $this->getUserIds(), ->channelMembers($appId, $this->channelName)
'hash' => $this->getHash(), ->then(function ($users) {
'count' => count($this->users), return array_merge(parent::toArray(), [
], 'user_count' => count($users),
]; ]);
} });
}
public function toArray(): array
{
return array_merge(parent::toArray(), [ return array_merge(parent::toArray(), [
'user_count' => count($this->users), 'user_count' => count($this->users),
]); ]);
} }
protected function getUserIds(): array protected function getChannelData(array $users): array
{
return [
'presence' => [
'ids' => $this->getUserIds($users),
'hash' => $this->getHash($users),
'count' => count($users),
],
];
}
protected function getUserIds(array $users): array
{ {
$userIds = array_map(function ($channelData) { $userIds = array_map(function ($channelData) {
return (string) $channelData->user_id; return (string) $channelData->user_id;
}, $this->users); }, $users);
return array_values($userIds); return array_values($userIds);
} }
protected function getHash(): array protected function getHash(array $users): array
{ {
$hash = []; $hash = [];
foreach ($this->users as $socketId => $channelData) { foreach ($users as $socketId => $channelData) {
$hash[$channelData->user_id] = $channelData->user_info; $hash[$channelData->user_id] = $channelData->user_info;
} }

View File

@ -4,9 +4,13 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;
use stdClass; use stdClass;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature;
class PrivateChannel extends Channel class PrivateChannel extends Channel
{ {
/**
* @throws InvalidSignature
*/
public function subscribe(ConnectionInterface $connection, stdClass $payload) public function subscribe(ConnectionInterface $connection, stdClass $payload)
{ {
$this->verifySignature($connection, $payload); $this->verifySignature($connection, $payload);