<?php

namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;

use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Support\Str;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
use stdClass;

/**
 * Replication driver that keeps several websocket server instances in sync
 * through Redis: broadcast messages travel over Redis Pub/Sub, and channel
 * membership is stored in Redis hashes keyed by the topic name.
 */
class RedisClient extends LocalClient
{
    /**
     * The running loop.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * The unique server identifier.
     *
     * @var string
     */
    protected $serverId;

    /**
     * The pub client. Also used for the hash commands (hset, hdel,
     * hgetall, hlen) and for multi/exec.
     *
     * @var Client
     */
    protected $publishClient;

    /**
     * The sub client.
     *
     * @var Client
     */
    protected $subscribeClient;

    /**
     * Mapping of subscribed channels, where the key is "{appId}:{channel}"
     * and the value is the amount of connections which are subscribed to
     * that channel. Used to keep track of whether we still need to stay
     * subscribed to those channels with Redis.
     *
     * @var int[]
     */
    protected $subscribedChannels = [];

    /**
     * Create a new Redis client and give it a fresh UUID as server ID.
     *
     * @return void
     */
    public function __construct()
    {
        $this->serverId = Str::uuid()->toString();
    }

    /**
     * Boot the RedisClient, initializing the connections.
     *
     * Two lazy clients are created from the same connection URI: one for
     * publishing and regular commands, one for subscriptions.
     *
     * @param  LoopInterface  $loop
     * @param  string|null  $factoryClass  Class instantiated with the loop and
     *                                     asked for the lazy clients; defaults
     *                                     to the clue/redis-react Factory.
     * @return ReplicationInterface
     */
    public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface
    {
        $factoryClass = $factoryClass ?: Factory::class;

        $this->loop = $loop;

        $connectionUri = $this->getConnectionUri();
        $factory = new $factoryClass($this->loop);

        $this->publishClient = $factory->createLazyClient($connectionUri);
        $this->subscribeClient = $factory->createLazyClient($connectionUri);

        // Every message received by the subscribe client is handed to onMessage().
        $this->subscribeClient->on('message', function ($channel, $payload) {
            $this->onMessage($channel, $payload);
        });

        return $this;
    }

    /**
     * Publish a message to a channel on behalf of a websocket user.
     *
     * The given payload object is tagged with the app ID and this server's
     * ID (the object passed by the caller is modified), JSON-encoded and
     * published on the topic for the app/channel pair.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  stdClass  $payload
     * @return bool
     */
    public function publish($appId, string $channel, stdClass $payload): bool
    {
        $payload->appId = $appId;
        $payload->serverId = $this->getServerId();

        $payload = json_encode($payload);
        $topic = $this->getTopicName($appId, $channel);

        $this->publishClient->__call('publish', [$topic, $payload]);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'payload' => $payload,
            'pubsub' => $topic,
        ]);

        return true;
    }

    /**
     * Subscribe to a channel on behalf of websocket user.
     *
     * Redis is only asked to subscribe for the first local subscriber of an
     * app/channel pair; later calls just increase the local counter.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return bool
     */
    public function subscribe($appId, string $channel): bool
    {
        $key = "{$appId}:{$channel}";
        $topic = $this->getTopicName($appId, $channel);

        if (! isset($this->subscribedChannels[$key])) {
            // We're not subscribed to the channel yet, subscribe and set the count to 1
            $this->subscribeClient->__call('subscribe', [$topic]);
            $this->subscribedChannels[$key] = 1;
        } else {
            // Increment the subscribe count if we've already subscribed
            $this->subscribedChannels[$key]++;
        }

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'pubsub' => $topic,
        ]);

        return true;
    }

    /**
     * Unsubscribe from a channel on behalf of a websocket user.
     *
     * Returns false when there is no local subscription for the app/channel
     * pair. Otherwise the local counter is decreased and, once it drops
     * below one, the Redis subscription is released.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return bool
     */
    public function unsubscribe($appId, string $channel): bool
    {
        $key = "{$appId}:{$channel}";

        if (! isset($this->subscribedChannels[$key])) {
            return false;
        }

        $topic = $this->getTopicName($appId, $channel);

        // Decrement the subscription count for this channel
        $this->subscribedChannels[$key]--;

        // If we no longer have subscriptions to that channel, unsubscribe
        if ($this->subscribedChannels[$key] < 1) {
            // Must be the same topic name subscribe() used (it includes the
            // configured prefix), otherwise the subscription is never dropped.
            $this->subscribeClient->__call('unsubscribe', [$topic]);

            unset($this->subscribedChannels[$key]);
        }

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'pubsub' => $topic,
        ]);

        return true;
    }

    /**
     * Add a member to a channel. To be called when they have
     * subscribed to the channel.
     *
     * The member data is stored in the Redis hash named after the topic,
     * under the socket ID.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  string  $socketId
     * @param  string  $data
     * @return void
     */
    public function joinChannel($appId, string $channel, string $socketId, string $data)
    {
        $topic = $this->getTopicName($appId, $channel);

        // Same hash key that leaveChannel(), channelMembers() and
        // channelMemberCounts() use, so the member can be found again
        // when a prefix is configured.
        $this->publishClient->__call('hset', [$topic, $socketId, $data]);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'socketId' => $socketId,
            'data' => $data,
            'pubsub' => $topic,
        ]);
    }

    /**
     * Remove a member from the channel. To be called when they have
     * unsubscribed from the channel.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  string  $socketId
     * @return void
     */
    public function leaveChannel($appId, string $channel, string $socketId)
    {
        $topic = $this->getTopicName($appId, $channel);

        $this->publishClient->__call('hdel', [$topic, $socketId]);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'socketId' => $socketId,
            'pubsub' => $topic,
        ]);
    }

    /**
     * Retrieve the full information about the members in a presence channel.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return PromiseInterface  Resolves with the hash entries, each value JSON-decoded.
     */
    public function channelMembers($appId, string $channel): PromiseInterface
    {
        return $this->publishClient->__call('hgetall', [$this->getTopicName($appId, $channel)])
            ->then(function ($members) {
                // The data is expected as objects, so we need to JSON decode
                return array_map(function ($user) {
                    return json_decode($user);
                }, $members);
            });
    }

    /**
     * Get the amount of users subscribed for each presence channel.
     *
     * One hlen per channel is queued between multi and exec; the exec
     * results are paired with the channel names by position.
     *
     * @param  string  $appId
     * @param  array  $channelNames
     * @return PromiseInterface  Resolves with an array keyed by channel name.
     */
    public function channelMemberCounts($appId, array $channelNames): PromiseInterface
    {
        $this->publishClient->__call('multi', []);

        foreach ($channelNames as $channel) {
            $this->publishClient->__call('hlen', [$this->getTopicName($appId, $channel)]);
        }

        return $this->publishClient->__call('exec', [])
            ->then(function ($data) use ($channelNames) {
                return array_combine($channelNames, $data);
            });
    }

    /**
     * Handle a message received from Redis on a specific channel.
     *
     * Messages that cannot be decoded into an object, that carry no app ID,
     * or that were published by this very server are dropped. Everything
     * else is broadcast to the local channel, if one exists.
     *
     * @param  string  $redisChannel
     * @param  string  $payload
     * @return void
     */
    public function onMessage(string $redisChannel, string $payload)
    {
        $payload = json_decode($payload);

        // Without a decoded object and an app ID there is nothing to route;
        // bail out instead of reading properties off null or an array.
        if (! is_object($payload) || ! isset($payload->appId)) {
            return;
        }

        // Ignore messages sent by ourselves.
        if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
            return;
        }

        // Pull out the app ID. See RedisPusherBroadcaster
        $appId = $payload->appId;

        // We need to put the channel name in the payload.
        // We strip the app ID from the channel name, websocket clients
        // expect the channel name to not include the app ID.
        $payload->channel = Str::after($redisChannel, "{$appId}:");

        $channelManager = app(ChannelManager::class);

        // Load the Channel instance to sync.
        $channel = $channelManager->find($appId, $payload->channel);

        // If no channel is found, none of our connections want to
        // receive this message, so we ignore it.
        if (! $channel) {
            return;
        }

        $socket = $payload->socket ?? null;
        $serverId = $payload->serverId ?? null;

        // Remove fields intended for internal use from the payload.
        unset($payload->socket);
        unset($payload->serverId);
        unset($payload->appId);

        // Push the message out to connected websocket clients.
        $channel->broadcastToEveryoneExcept($payload, $socket, $appId, false);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
            'channel' => $channel->getChannelName(),
            'redisChannel' => $redisChannel,
            'serverId' => $this->getServerId(),
            'incomingServerId' => $serverId,
            'incomingSocketId' => $socket,
            'payload' => $payload,
        ]);
    }

    /**
     * Build the Redis connection URL from Laravel database config.
     *
     * The connection name comes from `websockets.replication.redis.connection`
     * (falling back to "default"). The `port`, `password` and `database`
     * entries are optional: a missing or empty port becomes 6379, and a
     * missing or empty password/database is left out of the query string.
     *
     * @return string
     */
    protected function getConnectionUri()
    {
        $name = config('websockets.replication.redis.connection') ?: 'default';
        $config = config('database.redis')[$name];

        $host = $config['host'];
        $port = ($config['port'] ?? null) ?: 6379;

        $query = [];

        // empty() is false only for keys that exist and are truthy, so
        // optional keys may be omitted without raising a notice.
        if (! empty($config['password'])) {
            $query['password'] = $config['password'];
        }

        if (! empty($config['database'])) {
            $query['database'] = $config['database'];
        }

        $query = http_build_query($query);

        return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
    }

    /**
     * Get the Subscribe client instance.
     *
     * @return Client
     */
    public function getSubscribeClient()
    {
        return $this->subscribeClient;
    }

    /**
     * Get the Publish client instance.
     *
     * @return Client
     */
    public function getPublishClient()
    {
        return $this->publishClient;
    }

    /**
     * Get the unique identifier for the server.
     *
     * @return string
     */
    public function getServerId()
    {
        return $this->serverId;
    }

    /**
     * Get the Pub/Sub Topic name to subscribe based on the
     * app ID and channel name.
     *
     * The value of `database.redis.options.prefix` (if any) is put in
     * front of "{appId}:{channel}".
     *
     * @param  mixed  $appId
     * @param  string  $channel
     * @return string
     */
    protected function getTopicName($appId, string $channel): string
    {
        $prefix = config('database.redis.options.prefix', null);

        return "{$prefix}{$appId}:{$channel}";
    }
}