2020-09-10 19:59:26 +00:00
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
namespace BeyondCode\LaravelWebSockets\ChannelManagers;
|
|
|
|
|
|
|
|
|
|
use BeyondCode\LaravelWebSockets\Channels\Channel;
|
2020-10-05 06:25:20 +00:00
|
|
|
use BeyondCode\LaravelWebSockets\DashboardLogger;
|
2020-09-15 09:30:17 +00:00
|
|
|
use BeyondCode\LaravelWebSockets\Helpers;
|
|
|
|
|
use BeyondCode\LaravelWebSockets\Server\MockableConnection;
|
|
|
|
|
use Carbon\Carbon;
|
2020-09-10 19:59:26 +00:00
|
|
|
use Clue\React\Redis\Client;
|
|
|
|
|
use Clue\React\Redis\Factory;
|
2020-09-15 09:30:17 +00:00
|
|
|
use Illuminate\Cache\RedisLock;
|
|
|
|
|
use Illuminate\Support\Facades\Redis;
|
2020-09-10 19:59:49 +00:00
|
|
|
use Illuminate\Support\Str;
|
2020-09-10 19:59:26 +00:00
|
|
|
use Ratchet\ConnectionInterface;
|
|
|
|
|
use React\EventLoop\LoopInterface;
|
2020-09-10 19:59:49 +00:00
|
|
|
use React\Promise\PromiseInterface;
|
|
|
|
|
use stdClass;
|
2020-09-10 19:59:26 +00:00
|
|
|
|
|
|
|
|
class RedisChannelManager extends LocalChannelManager
|
|
|
|
|
{
|
|
|
|
|
/**
 * The event loop the asynchronous Redis clients run on.
 *
 * @var LoopInterface
 */
protected $loop;

/**
 * The unique identifier of this server instance. It is attached to every
 * message published by this server and used to skip its own messages.
 *
 * @var string
 */
protected $serverId;

/**
 * The asynchronous client used for publishing and for regular commands.
 *
 * @var Client
 */
protected $publishClient;

/**
 * The asynchronous client used for Pub/Sub subscriptions.
 *
 * @var Client
 */
protected $subscribeClient;

/**
 * The Laravel Redis connection resolved through the Redis facade.
 * It is the connection handed to the RedisLock instances.
 *
 * @var \Illuminate\Redis\Connections\Connection
 */
protected $redis;

/**
 * The lock name to use on Redis to avoid multiple
 * actions that might lead to multiple processings.
 *
 * @var string
 */
protected static $lockName = 'laravel-websockets:channel-manager:lock';
|
2020-09-15 09:30:17 +00:00
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
 * Build the manager: resolve the Laravel Redis connection, create the
 * two lazy asynchronous clients and assign a fresh server identifier.
 *
 * @param  LoopInterface  $loop
 * @param  string|null  $factoryClass  Client factory class; defaults to the Clue Redis factory.
 * @return void
 */
public function __construct(LoopInterface $loop, $factoryClass = null)
{
    $this->loop = $loop;

    $connectionName = config('websockets.replication.modes.redis.connection', 'default');
    $this->redis = Redis::connection($connectionName);

    $uri = $this->getConnectionUri();

    if (! $factoryClass) {
        $factoryClass = Factory::class;
    }

    $clientFactory = new $factoryClass($this->loop);

    // Two separate clients: one for commands/publishing, one for subscriptions.
    $this->publishClient = $clientFactory->createLazyClient($uri);
    $this->subscribeClient = $clientFactory->createLazyClient($uri);

    // Forward every Pub/Sub message to the handler of this manager.
    $this->subscribeClient->on('message', function ($topic, $message) {
        $this->onMessage($topic, $message);
    });

    $this->serverId = Str::uuid()->toString();
}
|
|
|
|
|
|
2020-09-12 14:45:07 +00:00
|
|
|
/**
 * Get the connections held by this server instance, whatever
 * channel they belong to. Delegates to the local manager.
 *
 * @return \React\Promise\PromiseInterface
 */
public function getLocalConnections(): PromiseInterface
{
    $connections = parent::getLocalConnections();

    return $connections;
}
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
 * Get the channels of an app that exist on this server
 * instance. Delegates to the local manager.
 *
 * @param  string|int  $appId
 * @return \React\Promise\PromiseInterface[array]
 */
public function getLocalChannels($appId): PromiseInterface
{
    $channels = parent::getLocalChannels($appId);

    return $channels;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the channels of an app across all servers by reading
 * the members of the app's "channels" set in Redis.
 *
 * @param  string|int  $appId
 * @return \React\Promise\PromiseInterface[array]
 */
public function getGlobalChannels($appId): PromiseInterface
{
    $channelsKey = $this->getRedisKey($appId, null, ['channels']);

    return $this->publishClient->smembers($channelsKey);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Remove the connection from all channels: first from every channel
 * listed globally for its app, then through the local manager.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return PromiseInterface[bool]
 */
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{
    // The individual unsubscriptions are started but not awaited.
    $unsubscribeFromEach = function ($channels) use ($connection) {
        foreach ($channels as $channelName) {
            $this->unsubscribeFromChannel($connection, $channelName, new stdClass);
        }
    };

    $unsubscribeLocally = function () use ($connection) {
        return parent::unsubscribeFromAllChannels($connection);
    };

    return $this->getGlobalChannels($connection->app->id)
        ->then($unsubscribeFromEach)
        ->then($unsubscribeLocally);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Subscribe the connection to a channel. The steps run in sequence:
 * subscribe to the Pub/Sub topic, track the connection, track the
 * channel, bump the subscriptions counter, then subscribe locally.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  string  $channelName
 * @param  stdClass  $payload
 * @return PromiseInterface[bool]
 */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
    $trackConnection = function () use ($connection) {
        return $this->addConnectionToSet($connection, Carbon::now());
    };

    $trackChannel = function () use ($connection, $channelName) {
        return $this->addChannelToSet($connection->app->id, $channelName);
    };

    $bumpCounter = function () use ($connection, $channelName) {
        return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
    };

    $subscribeLocally = function () use ($connection, $channelName, $payload) {
        return parent::subscribeToChannel($connection, $channelName, $payload);
    };

    return $this->subscribeToTopic($connection->app->id, $channelName)
        ->then($trackConnection)
        ->then($trackChannel)
        ->then($bumpCounter)
        ->then($subscribeLocally);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Unsubscribe the connection from the channel.
 *
 * The global subscriptions counter is decremented, the Pub/Sub topic is
 * dropped when nobody is left, the channel and the connection are removed
 * from their Redis sets and, finally, the local manager is notified.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  string  $channelName
 * @param  stdClass  $payload
 * @return PromiseInterface[bool]
 */
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
    return $this->getGlobalConnectionsCount($connection->app->id, $channelName)
        ->then(function ($count) use ($connection, $channelName) {
            if ($count === 0) {
                // Make sure to not stay subscribed to the PubSub topic
                // if there are no connections.
                $this->unsubscribeFromTopic($connection->app->id, $channelName);
            }

            // Return the nested promise so the outer chain waits for the
            // decrement and a failure rejects the chain instead of being lost.
            return $this->decrementSubscriptionsCount($connection->app->id, $channelName)
                ->then(function ($count) use ($connection, $channelName) {
                    // If the total connections count gets to 0 after unsubscribe,
                    // try again to check & unsubscribe from the PubSub topic if needed.
                    if ($count < 1) {
                        $this->unsubscribeFromTopic($connection->app->id, $channelName);
                    }
                });
        })
        ->then(function () use ($connection, $channelName) {
            return $this->removeChannelFromSet($connection->app->id, $channelName);
        })
        ->then(function () use ($connection) {
            return $this->removeConnectionFromSet($connection);
        })
        ->then(function () use ($connection, $channelName, $payload) {
            return parent::unsubscribeFromChannel($connection, $channelName, $payload);
        });
}
|
|
|
|
|
|
|
|
|
|
/**
 * Subscribe to the app-wide Pub/Sub topic, then increment the app's
 * subscriptions counter. The promise carries the counter value
 * returned by the increment.
 *
 * @param  string|int  $appId
 * @return PromiseInterface[int]
 */
public function subscribeToApp($appId): PromiseInterface
{
    $bumpCounter = function () use ($appId) {
        return $this->incrementSubscriptionsCount($appId);
    };

    return $this->subscribeToTopic($appId)->then($bumpCounter);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Unsubscribe from the app-wide Pub/Sub topic, then decrement the app's
 * subscriptions counter. The promise carries the counter value
 * returned by the decrement.
 *
 * @param  string|int  $appId
 * @return PromiseInterface[int]
 */
public function unsubscribeFromApp($appId): PromiseInterface
{
    $lowerCounter = function () use ($appId) {
        return $this->decrementSubscriptionsCount($appId);
    };

    return $this->unsubscribeFromTopic($appId)->then($lowerCounter);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the connections count of the app (optionally limited to one
 * channel) on this server instance. Delegates to the local manager.
 *
 * @param  string|int  $appId
 * @param  string|null  $channelName
 * @return PromiseInterface[int]
 */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
{
    $count = parent::getLocalConnectionsCount($appId, $channelName);

    return $count;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the connections count across all servers, read from the
 * "connections" field of the stats hash. A missing field counts as 0.
 *
 * @param  string|int  $appId
 * @param  string|null  $channelName
 * @return PromiseInterface[int]
 */
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
{
    $statsKey = $this->getRedisKey($appId, $channelName, ['stats']);

    return $this->publishClient
        ->hget($statsKey, 'connections')
        ->then(function ($stored) {
            if ($stored === null) {
                return 0;
            }

            return (int) $stored;
        });
}
|
|
|
|
|
|
|
|
|
|
/**
 * Broadcast the message across multiple servers. The payload is tagged
 * with the app, socket and server IDs, published on the channel's
 * Redis topic, and then handed to the local manager.
 *
 * @param  string|int  $appId
 * @param  string|null  $socketId
 * @param  string  $channel
 * @param  stdClass  $payload
 * @param  string|null  $serverId  Falls back to this server's ID inside the payload.
 * @return PromiseInterface[bool]
 */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface
{
    $payload->appId = $appId;
    $payload->socketId = $socketId;
    $payload->serverId = $serverId ?: $this->getServerId();

    $topic = $this->getRedisKey($appId, $channel);
    $message = json_encode($payload);

    // The parent receives the original $serverId argument, not the fallback.
    $broadcastLocally = function () use ($appId, $socketId, $channel, $payload, $serverId) {
        return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
    };

    return $this->publishClient
        ->publish($topic, $message)
        ->then($broadcastLocally);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Handle a user joining a presence channel: store the JSON-encoded user
 * under the socket ID, register the socket for that user, then let the
 * local manager handle the join.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  stdClass  $user
 * @param  string  $channel
 * @param  stdClass  $payload
 * @return PromiseInterface
 */
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
{
    $encodedUser = json_encode($user);

    $registerSocket = function () use ($connection, $channel, $user) {
        return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
    };

    $joinLocally = function () use ($connection, $user, $channel, $payload) {
        return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
    };

    return $this->storeUserData($connection->app->id, $channel, $connection->socketId, $encodedUser)
        ->then($registerSocket)
        ->then($joinLocally);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Handle a user leaving a presence channel: delete the data stored under
 * the socket ID, unregister the socket for that user, then let the local
 * manager handle the leave.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  stdClass  $user
 * @param  string  $channel
 * @return PromiseInterface[bool]
 */
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
{
    $unregisterSocket = function () use ($connection, $channel, $user) {
        return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
    };

    $leaveLocally = function () use ($connection, $user, $channel) {
        return parent::userLeftPresenceChannel($connection, $user, $channel);
    };

    return $this->removeUserData($connection->app->id, $channel, $connection->socketId)
        ->then($unregisterSocket)
        ->then($leaveLocally);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the presence channel members. Every entry of the channel's "users"
 * hash is JSON-decoded and the result is de-duplicated by "user_id".
 *
 * @param  string|int  $appId
 * @param  string  $channel
 * @return \React\Promise\PromiseInterface[array]
 */
public function getChannelMembers($appId, string $channel): PromiseInterface
{
    $usersKey = $this->getRedisKey($appId, $channel, ['users']);

    return $this->publishClient
        ->hgetall($usersKey)
        ->then(function ($list) {
            $members = collect(Helpers::redisListToArray($list))
                ->map(function ($encodedUser) {
                    return json_decode($encodedUser);
                });

            return $members->unique('user_id')->toArray();
        });
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get a member of a presence channel based on the connection. The promise
 * carries whatever is stored under the connection's socket ID in the
 * channel's "users" hash; the value is returned as stored, not decoded.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  string  $channel
 * @return \React\Promise\PromiseInterface[null|string]
 */
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
{
    $usersKey = $this->getRedisKey($connection->app->id, $channel, ['users']);

    return $this->publishClient->hget($usersKey, $connection->socketId);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the members count of several presence channels. One HLEN per
 * channel is queued inside a MULTI/EXEC transaction and the replies
 * are keyed by channel name.
 *
 * @param  string|int  $appId
 * @param  array  $channelNames
 * @return \React\Promise\PromiseInterface[array]
 */
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{
    $client = $this->publishClient;

    $client->multi();

    foreach ($channelNames as $name) {
        $usersKey = $this->getRedisKey($appId, $name, ['users']);

        $client->hlen($usersKey);
    }

    return $client->exec()->then(function ($counts) use ($channelNames) {
        // Replies come back in the order the commands were queued.
        return array_combine($channelNames, $counts);
    });
}
|
|
|
|
|
|
2020-09-16 08:02:58 +00:00
|
|
|
/**
 * Get the socket IDs registered for a presence channel member,
 * read from the user's "userSockets" set.
 *
 * @param  string|int  $userId
 * @param  string|int  $appId
 * @param  string  $channelName
 * @return \React\Promise\PromiseInterface[array]
 */
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
    $socketsKey = $this->getRedisKey($appId, $channelName, [$userId, 'userSockets']);

    return $this->publishClient->smembers($socketsKey);
}
|
|
|
|
|
|
2020-09-15 09:30:17 +00:00
|
|
|
/**
 * Keep tracking the connection availability when it pongs: refresh its
 * score in the sockets sorted set, then notify the local manager.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return PromiseInterface[bool]
 */
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
    $pongLocally = function () use ($connection) {
        return parent::connectionPonged($connection);
    };

    // Re-adding the member updates its score to the current timestamp.
    return $this->addConnectionToSet($connection, Carbon::now())->then($pongLocally);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Remove the obsolete connections that did not pong in the last two
 * minutes. The cleanup is started under the Redis lock and is not
 * awaited; the returned promise is the local manager's.
 *
 * @return PromiseInterface[bool]
 */
public function removeObsoleteConnections(): PromiseInterface
{
    $this->lock()->get(function () {
        $threshold = now()->subMinutes(2)->format('U');

        $this->getConnectionsFromSet(0, $threshold)
            ->then(function ($staleConnections) {
                foreach ($staleConnections as $socketId => $appId) {
                    // Only app and socket IDs are known, so a stand-in connection is used.
                    $stale = $this->fakeConnectionForApp($appId, $socketId);

                    $this->unsubscribeFromAllChannels($stale);
                }
            });
    });

    return parent::removeObsoleteConnections();
}
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
 * Handle a message received from Redis on a specific channel.
 *
 * Messages that are not a JSON object carrying an app ID, messages that
 * were published by this very server and messages for channels unknown
 * locally are ignored. Everything else is logged and broadcast to the
 * local connections of the channel, except the originating socket.
 *
 * @param  string  $redisChannel
 * @param  string  $payload  The JSON-encoded message.
 * @return void
 */
public function onMessage(string $redisChannel, string $payload)
{
    $payload = json_decode($payload);

    // A malformed message must not throw inside the event-loop callback:
    // json_decode() gives null/scalars/arrays for anything but a JSON object,
    // and the app ID is required below to resolve the channel.
    if (! is_object($payload) || ! isset($payload->appId)) {
        return;
    }

    // Skip the messages this server published itself.
    if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
        return;
    }

    // The channel name is whatever follows "<appId>:" in the topic.
    $payload->channel = Str::after($redisChannel, "{$payload->appId}:");

    if (! $channel = $this->find($payload->appId, $payload->channel)) {
        return;
    }

    $appId = $payload->appId;
    $socketId = $payload->socketId ?? null;
    $serverId = $payload->serverId ?? null;

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
        'fromServerId' => $serverId,
        'fromSocketId' => $socketId,
        'receiverServerId' => $this->getServerId(),
        'channel' => $channel,
        'payload' => $payload,
    ]);

    // Strip the replication metadata before handing the payload to clients.
    unset($payload->socketId);
    unset($payload->serverId);
    unset($payload->appId);

    $channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Build the Redis connection URL from Laravel database config.
 *
 * Missing configuration entries no longer raise undefined-index errors:
 * the host falls back to 127.0.0.1, the port to 6379, and the password
 * and database are only added to the query string when they are set.
 *
 * @return string
 */
protected function getConnectionUri()
{
    $name = config('websockets.replication.modes.redis.connection', 'default');
    $config = (array) config("database.redis.{$name}");

    $host = $config['host'] ?? '127.0.0.1';
    $port = ($config['port'] ?? null) ?: 6379;

    // Empty values (null, '', 0, '0') are left out, as before.
    $query = http_build_query(array_filter([
        'password' => $config['password'] ?? null,
        'db' => $config['database'] ?? null,
    ]));

    return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the client instance used for Pub/Sub subscriptions.
 *
 * @return Client
 */
public function getSubscribeClient()
{
    $client = $this->subscribeClient;

    return $client;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the client instance used for publishing and regular commands.
 *
 * @return Client
 */
public function getPublishClient()
{
    $client = $this->publishClient;

    return $client;
}
|
|
|
|
|
|
2020-09-25 19:16:06 +00:00
|
|
|
/**
 * Get the Redis client used by other classes.
 * This is the publish client.
 *
 * @return Client
 */
public function getRedisClient()
{
    $client = $this->getPublishClient();

    return $client;
}
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
 * Get the unique identifier assigned to this server in the constructor.
 *
 * @return string
 */
public function getServerId(): string
{
    $identifier = $this->serverId;

    return $identifier;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Increment the "connections" field of the stats hash of the
 * app (or of the channel, when one is given).
 *
 * @param  string|int  $appId
 * @param  string|null  $channel
 * @param  int  $increment
 * @return PromiseInterface[int]
 */
public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{
    $statsKey = $this->getRedisKey($appId, $channel, ['stats']);

    return $this->publishClient->hincrby($statsKey, 'connections', $increment);
}
|
|
|
|
|
|
2020-09-11 20:58:16 +00:00
|
|
|
/**
 * Decrement the subscribed count number by incrementing
 * it with the negated amount.
 *
 * @param  string|int  $appId
 * @param  string|null  $channel
 * @param  int  $increment  The amount to subtract.
 * @return PromiseInterface[int]
 */
public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{
    $negated = -1 * $increment;

    return $this->incrementSubscriptionsCount($appId, $channel, $negated);
}
|
|
|
|
|
|
2020-09-15 09:30:17 +00:00
|
|
|
/**
 * Add the connection to the sorted set of sockets. The member is
 * "<appId>:<socketId>" and the score is the Unix timestamp of the
 * given moment, or of the current time when none is given.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @param  \DateTime|string|null  $moment
 * @return PromiseInterface
 */
public function addConnectionToSet(ConnectionInterface $connection, $moment = null): PromiseInterface
{
    if ($moment) {
        $moment = Carbon::parse($moment);
    } else {
        $moment = Carbon::now();
    }

    $socketsKey = $this->getRedisKey(null, null, ['sockets']);
    $score = $moment->format('U');
    $member = "{$connection->app->id}:{$connection->socketId}";

    return $this->publishClient->zadd($socketsKey, $score, $member);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Remove the connection's "<appId>:<socketId>" member
 * from the sorted set of sockets.
 *
 * @param  \Ratchet\ConnectionInterface  $connection
 * @return PromiseInterface
 */
public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
{
    $socketsKey = $this->getRedisKey(null, null, ['sockets']);
    $member = "{$connection->app->id}:{$connection->socketId}";

    return $this->publishClient->zrem($socketsKey, $member);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Get the connections from the sorted set whose score lies between two
 * timestamps. The promise carries an array mapping socket ID to app ID.
 *
 * @param  int  $start
 * @param  int  $stop
 * @param  bool  $strict  When true, both bounds are exclusive.
 * @return PromiseInterface[array]
 */
public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true): PromiseInterface
{
    // A "(" prefix makes a ZRANGEBYSCORE bound exclusive.
    $min = $strict ? "({$start}" : $start;
    $max = $strict ? "({$stop}" : $stop;

    $socketsKey = $this->getRedisKey(null, null, ['sockets']);

    return $this->publishClient
        ->zrangebyscore($socketsKey, $min, $max)
        ->then(function ($members) {
            $bySocket = collect($members)->mapWithKeys(function ($member) {
                // Members are stored as "<appId>:<socketId>".
                [$appId, $socketId] = explode(':', $member);

                return [$socketId => $appId];
            });

            return $bySocket->toArray();
        });
}
|
|
|
|
|
|
2020-09-11 20:58:16 +00:00
|
|
|
/**
 * Add a channel to the app's "channels" set.
 *
 * @param  string|int  $appId
 * @param  string  $channel
 * @return PromiseInterface
 */
public function addChannelToSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getRedisKey($appId, null, ['channels']);

    return $this->publishClient->sadd($channelsKey, $channel);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Remove a channel from the app's "channels" set.
 *
 * @param  string|int  $appId
 * @param  string  $channel
 * @return PromiseInterface
 */
public function removeChannelFromSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getRedisKey($appId, null, ['channels']);

    return $this->publishClient->srem($channelsKey, $channel);
}
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
 * Set data for a topic. Might be used for the presence channels.
 * The data is written to the given field of the "users" hash.
 *
 * The channel is declared as an explicit nullable type: an optional
 * parameter placed before required ones is deprecated in modern PHP
 * and was already treated as required, so callers are unaffected.
 *
 * @param  string|int  $appId
 * @param  string|null  $channel
 * @param  string  $key
 * @param  string  $data
 * @return PromiseInterface
 */
public function storeUserData($appId, ?string $channel, string $key, $data): PromiseInterface
{
    return $this->publishClient->hset(
        $this->getRedisKey($appId, $channel, ['users']), $key, $data
    );
}
|
|
|
|
|
|
|
|
|
|
/**
 * Remove data for a topic. Might be used for the presence channels.
 * The given field is deleted from the "users" hash.
 *
 * The channel is declared as an explicit nullable type: an optional
 * parameter placed before required ones is deprecated in modern PHP
 * and was already treated as required, so callers are unaffected.
 *
 * @param  string|int  $appId
 * @param  string|null  $channel
 * @param  string  $key
 * @return PromiseInterface
 */
public function removeUserData($appId, ?string $channel, string $key): PromiseInterface
{
    return $this->publishClient->hdel(
        $this->getRedisKey($appId, $channel, ['users']), $key
    );
}
|
|
|
|
|
|
|
|
|
|
/**
 * Subscribe to the topic for the app, or app and channel.
 * The subscription is logged to the dashboard first.
 *
 * @param  string|int  $appId
 * @param  string|null  $channel
 * @return PromiseInterface
 */
public function subscribeToTopic($appId, string $channel = null): PromiseInterface
{
    $pubsubTopic = $this->getRedisKey($appId, $channel);

    $details = [
        'serverId' => $this->getServerId(),
        'pubsubTopic' => $pubsubTopic,
    ];

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, $details);

    return $this->subscribeClient->subscribe($pubsubTopic);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Unsubscribe from the topic for the app, or app and channel.
 * The unsubscription is logged to the dashboard first.
 *
 * @param  string|int  $appId
 * @param  string|null  $channel
 * @return PromiseInterface
 */
public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface
{
    $pubsubTopic = $this->getRedisKey($appId, $channel);

    $details = [
        'serverId' => $this->getServerId(),
        'pubsubTopic' => $pubsubTopic,
    ];

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, $details);

    return $this->subscribeClient->unsubscribe($pubsubTopic);
}
|
|
|
|
|
|
2020-09-16 08:02:58 +00:00
|
|
|
/**
 * Add a socket ID to the "userSockets" set kept for
 * the given user of a presence channel.
 *
 * @param  string|int  $appId
 * @param  string  $channel
 * @param  stdClass  $user  Its user_id property is part of the key.
 * @param  string  $socketId
 * @return PromiseInterface
 */
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{
    $socketsKey = $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']);

    return $this->publishClient->sadd($socketsKey, $socketId);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Remove a socket ID from the "userSockets" set kept for
 * the given user of a presence channel.
 *
 * @param  string|int  $appId
 * @param  string  $channel
 * @param  stdClass  $user  Its user_id property is part of the key.
 * @param  string  $socketId
 * @return PromiseInterface
 */
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{
    $socketsKey = $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']);

    return $this->publishClient->srem($socketsKey, $socketId);
}
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
 * Get the Redis Keyspace name to handle subscriptions
 * and other key-value sets.
 *
 * The key has the form "<prefix><appId>[:<channel>][:<suffix>...]", the
 * prefix being the one configured for the Redis connections. The channel
 * and the suffix are compared against null/empty string rather than by
 * truthiness, so that a channel named "0" still gets its own key.
 *
 * @param  string|int|null  $appId
 * @param  string|null  $channel
 * @param  array  $suffixes  Extra key segments, appended after the channel.
 * @return string
 */
public function getRedisKey($appId = null, string $channel = null, array $suffixes = []): string
{
    $prefix = config('database.redis.options.prefix', null);

    $hash = "{$prefix}{$appId}";

    if ($channel !== null && $channel !== '') {
        $suffixes = array_merge([$channel], $suffixes);
    }

    $suffixes = implode(':', $suffixes);

    if ($suffixes !== '') {
        $hash .= ":{$suffixes}";
    }

    return $hash;
}
|
2020-09-15 09:30:17 +00:00
|
|
|
|
|
|
|
|
/**
 * Get a new RedisLock instance to avoid race conditions. It is built on
 * the manager's Redis connection with the shared lock name; 0 is passed
 * as the third constructor argument.
 *
 * @return \Illuminate\Cache\RedisLock
 */
protected function lock()
{
    $lock = new RedisLock($this->redis, static::$lockName, 0);

    return $lock;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Create a stand-in connection from an app ID and a socket ID, so
 * it can be passed to the methods that expect a connection object.
 *
 * @param  string|int  $appId
 * @param  string  $socketId
 * @return ConnectionInterface
 */
public function fakeConnectionForApp($appId, string $socketId)
{
    $connection = new MockableConnection($appId, $socketId);

    return $connection;
}
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|