2020-09-10 19:59:26 +00:00
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
namespace BeyondCode\LaravelWebSockets\ChannelManagers;
|
|
|
|
|
|
|
|
|
|
use BeyondCode\LaravelWebSockets\Channels\Channel;
|
|
|
|
|
use BeyondCode\LaravelWebSockets\Channels\PresenceChannel;
|
|
|
|
|
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
|
2020-09-10 19:59:49 +00:00
|
|
|
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
|
2020-09-19 11:16:26 +00:00
|
|
|
use BeyondCode\LaravelWebSockets\Helpers;
|
2020-09-19 15:38:08 +00:00
|
|
|
use Carbon\Carbon;
|
|
|
|
|
use Illuminate\Cache\ArrayLock;
|
|
|
|
|
use Illuminate\Cache\ArrayStore;
|
2020-09-10 19:59:49 +00:00
|
|
|
use Illuminate\Support\Str;
|
|
|
|
|
use Ratchet\ConnectionInterface;
|
|
|
|
|
use React\EventLoop\LoopInterface;
|
2020-09-10 19:59:26 +00:00
|
|
|
use React\Promise\PromiseInterface;
|
|
|
|
|
use stdClass;
|
|
|
|
|
|
|
|
|
|
class LocalChannelManager implements ChannelManager
|
|
|
|
|
{
|
|
|
|
|
/**
|
|
|
|
|
* The list of stored channels.
|
|
|
|
|
*
|
|
|
|
|
* @var array
|
|
|
|
|
*/
|
|
|
|
|
protected $channels = [];
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The list of users that joined the presence channel.
|
|
|
|
|
*
|
|
|
|
|
* @var array
|
|
|
|
|
*/
|
|
|
|
|
protected $users = [];
|
|
|
|
|
|
2020-09-16 08:02:58 +00:00
|
|
|
/**
|
|
|
|
|
* The list of users by socket and their attached id.
|
|
|
|
|
*
|
|
|
|
|
* @var array
|
|
|
|
|
*/
|
|
|
|
|
protected $userSockets = [];
|
|
|
|
|
|
2020-09-12 14:45:07 +00:00
|
|
|
/**
|
|
|
|
|
* Wether the current instance accepts new connections.
|
|
|
|
|
*
|
|
|
|
|
* @var bool
|
|
|
|
|
*/
|
|
|
|
|
protected $acceptsNewConnections = true;
|
|
|
|
|
|
2020-09-19 15:46:13 +00:00
|
|
|
/**
|
|
|
|
|
* The ArrayStore instance of locks.
|
|
|
|
|
*
|
|
|
|
|
* @var \Illuminate\Cache\ArrayStore
|
|
|
|
|
*/
|
|
|
|
|
protected $store;
|
|
|
|
|
|
2020-09-19 15:38:08 +00:00
|
|
|
/**
|
|
|
|
|
* The lock name to use on Array to avoid multiple
|
|
|
|
|
* actions that might lead to multiple processings.
|
|
|
|
|
*
|
|
|
|
|
* @var string
|
|
|
|
|
*/
|
|
|
|
|
protected static $lockName = 'laravel-websockets:channel-manager:lock';
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
|
|
|
|
|
* Create a new channel manager instance.
|
|
|
|
|
*
|
|
|
|
|
* @param LoopInterface $loop
|
|
|
|
|
* @param string|null $factoryClass
|
|
|
|
|
* @return void
|
|
|
|
|
*/
|
|
|
|
|
public function __construct(LoopInterface $loop, $factoryClass = null)
|
|
|
|
|
{
|
2020-09-19 15:46:13 +00:00
|
|
|
$this->store = new ArrayStore;
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Find the channel by app & name.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @param string $channel
|
|
|
|
|
* @return null|BeyondCode\LaravelWebSockets\Channels\Channel
|
|
|
|
|
*/
|
|
|
|
|
public function find($appId, string $channel)
|
|
|
|
|
{
|
|
|
|
|
return $this->channels[$appId][$channel] ?? null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Find a channel by app & name or create one.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @param string $channel
|
|
|
|
|
* @return BeyondCode\LaravelWebSockets\Channels\Channel
|
|
|
|
|
*/
|
|
|
|
|
public function findOrCreate($appId, string $channel)
|
|
|
|
|
{
|
|
|
|
|
if (! $channelInstance = $this->find($appId, $channel)) {
|
|
|
|
|
$class = $this->getChannelClassName($channel);
|
|
|
|
|
|
|
|
|
|
$this->channels[$appId][$channel] = new $class($channel);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return $this->channels[$appId][$channel];
|
|
|
|
|
}
|
|
|
|
|
|
2020-09-12 14:45:07 +00:00
|
|
|
/**
|
|
|
|
|
* Get the local connections, regardless of the channel
|
|
|
|
|
* they are connected to.
|
|
|
|
|
*
|
|
|
|
|
* @return \React\Promise\PromiseInterface
|
|
|
|
|
*/
|
|
|
|
|
public function getLocalConnections(): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
$connections = collect($this->channels)
|
|
|
|
|
->map(function ($channelsWithConnections, $appId) {
|
|
|
|
|
return collect($channelsWithConnections)->values();
|
|
|
|
|
})
|
|
|
|
|
->values()->collapse()
|
|
|
|
|
->map(function ($channel) {
|
|
|
|
|
return collect($channel->getConnections());
|
|
|
|
|
})
|
|
|
|
|
->values()->collapse()
|
|
|
|
|
->toArray();
|
|
|
|
|
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise($connections);
|
2020-09-12 14:45:07 +00:00
|
|
|
}
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
|
|
|
|
|
* Get all channels for a specific app
|
|
|
|
|
* for the current instance.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @return \React\Promise\PromiseInterface[array]
|
|
|
|
|
*/
|
|
|
|
|
public function getLocalChannels($appId): PromiseInterface
|
|
|
|
|
{
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise(
|
2020-09-10 19:59:26 +00:00
|
|
|
$this->channels[$appId] ?? []
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get all channels for a specific app
|
|
|
|
|
* across multiple servers.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @return \React\Promise\PromiseInterface[array]
|
|
|
|
|
*/
|
|
|
|
|
public function getGlobalChannels($appId): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
return $this->getLocalChannels($appId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Remove connection from all channels.
|
|
|
|
|
*
|
|
|
|
|
* @param \Ratchet\ConnectionInterface $connection
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
|
|
|
|
if (! isset($connection->app)) {
|
2020-09-19 11:16:26 +00:00
|
|
|
return new FuilfilledPromise(false);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$this->getLocalChannels($connection->app->id)
|
|
|
|
|
->then(function ($channels) use ($connection) {
|
|
|
|
|
collect($channels)->each->unsubscribe($connection);
|
|
|
|
|
|
|
|
|
|
collect($channels)
|
|
|
|
|
->reject->hasConnections()
|
|
|
|
|
->each(function (Channel $channel, string $channelName) use ($connection) {
|
|
|
|
|
unset($this->channels[$connection->app->id][$channelName]);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
$this->getLocalChannels($connection->app->id)
|
|
|
|
|
->then(function ($channels) use ($connection) {
|
|
|
|
|
if (count($channels) === 0) {
|
|
|
|
|
unset($this->channels[$connection->app->id]);
|
|
|
|
|
}
|
|
|
|
|
});
|
2020-09-19 11:16:26 +00:00
|
|
|
|
|
|
|
|
return Helpers::createFulfilledPromise(true);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Subscribe the connection to a specific channel.
|
|
|
|
|
*
|
|
|
|
|
* @param \Ratchet\ConnectionInterface $connection
|
|
|
|
|
* @param string $channelName
|
|
|
|
|
* @param stdClass $payload
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
|
|
|
|
$channel = $this->findOrCreate($connection->app->id, $channelName);
|
|
|
|
|
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise(
|
|
|
|
|
$channel->subscribe($connection, $payload)
|
|
|
|
|
);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Unsubscribe the connection from the channel.
|
|
|
|
|
*
|
|
|
|
|
* @param \Ratchet\ConnectionInterface $connection
|
|
|
|
|
* @param string $channelName
|
|
|
|
|
* @param stdClass $payload
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
|
|
|
|
$channel = $this->findOrCreate($connection->app->id, $channelName);
|
|
|
|
|
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise(
|
|
|
|
|
$channel->unsubscribe($connection, $payload)
|
|
|
|
|
);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2020-09-19 11:16:26 +00:00
|
|
|
* Subscribe the connection to a specific channel, returning
|
|
|
|
|
* a promise containing the amount of connections.
|
2020-09-10 19:59:26 +00:00
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[int]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function subscribeToApp($appId): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise(0);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2020-09-19 11:16:26 +00:00
|
|
|
* Unsubscribe the connection from the channel, returning
|
|
|
|
|
* a promise containing the amount of connections after decrement.
|
2020-09-10 19:59:26 +00:00
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[int]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function unsubscribeFromApp($appId): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise(0);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the connections count on the app
|
|
|
|
|
* for the current server instance.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @param string|null $channelName
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[int]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
|
|
|
|
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
return $this->getLocalChannels($appId)
|
|
|
|
|
->then(function ($channels) use ($channelName) {
|
2020-09-19 11:16:26 +00:00
|
|
|
return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) {
|
|
|
|
|
return $collection->filter(function (Channel $channel) use ($channelName) {
|
|
|
|
|
return $channel->getName() === $channelName;
|
|
|
|
|
});
|
|
|
|
|
})
|
|
|
|
|
->flatMap(function (Channel $channel) {
|
|
|
|
|
return collect($channel->getConnections())->pluck('socketId');
|
|
|
|
|
})
|
|
|
|
|
->unique()->count();
|
2020-09-10 19:59:26 +00:00
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the connections count
|
|
|
|
|
* across multiple servers.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @param string|null $channelName
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[int]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
|
|
|
|
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
return $this->getLocalConnectionsCount($appId, $channelName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Broadcast the message across multiple servers.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
2020-09-18 08:57:10 +00:00
|
|
|
* @param string|null $socketId
|
2020-09-10 19:59:26 +00:00
|
|
|
* @param string $channel
|
|
|
|
|
* @param stdClass $payload
|
2020-09-18 08:57:10 +00:00
|
|
|
* @param string|null $serverId
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise(true);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle the user when it joined a presence channel.
|
|
|
|
|
*
|
|
|
|
|
* @param \Ratchet\ConnectionInterface $connection
|
|
|
|
|
* @param stdClass $user
|
|
|
|
|
* @param string $channel
|
|
|
|
|
* @param stdClass $payload
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
|
|
|
|
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
|
2020-09-16 08:02:58 +00:00
|
|
|
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId;
|
2020-09-19 11:16:26 +00:00
|
|
|
|
|
|
|
|
return Helpers::createFulfilledPromise(true);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle the user when it left a presence channel.
|
|
|
|
|
*
|
|
|
|
|
* @param \Ratchet\ConnectionInterface $connection
|
|
|
|
|
* @param stdClass $user
|
|
|
|
|
* @param string $channel
|
|
|
|
|
* @param stdClass $payload
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-10 19:59:26 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
|
2020-09-10 19:59:26 +00:00
|
|
|
{
|
|
|
|
|
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);
|
2020-09-16 08:02:58 +00:00
|
|
|
|
|
|
|
|
$deletableSocketKey = array_search(
|
|
|
|
|
$connection->socketId,
|
|
|
|
|
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if ($deletableSocketKey !== false) {
|
|
|
|
|
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][$deletableSocketKey]);
|
|
|
|
|
|
|
|
|
|
if (count($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]) === 0) {
|
|
|
|
|
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]);
|
|
|
|
|
}
|
|
|
|
|
}
|
2020-09-19 11:16:26 +00:00
|
|
|
|
|
|
|
|
return Helpers::createFulfilledPromise(true);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the presence channel members.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @param string $channel
|
|
|
|
|
* @return \React\Promise\PromiseInterface
|
|
|
|
|
*/
|
|
|
|
|
public function getChannelMembers($appId, string $channel): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
$members = $this->users["{$appId}:{$channel}"] ?? [];
|
|
|
|
|
|
|
|
|
|
$members = collect($members)->map(function ($user) {
|
|
|
|
|
return json_decode($user);
|
2020-09-17 08:30:36 +00:00
|
|
|
})->unique('user_id')->toArray();
|
2020-09-10 19:59:26 +00:00
|
|
|
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise($members);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get a member from a presence channel based on connection.
|
|
|
|
|
*
|
|
|
|
|
* @param \Ratchet\ConnectionInterface $connection
|
|
|
|
|
* @param string $channel
|
|
|
|
|
* @return \React\Promise\PromiseInterface
|
|
|
|
|
*/
|
|
|
|
|
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
$member = $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] ?? null;
|
|
|
|
|
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise($member);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the presence channels total members count.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @param array $channelNames
|
|
|
|
|
* @return \React\Promise\PromiseInterface
|
|
|
|
|
*/
|
|
|
|
|
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
$results = collect($channelNames)
|
|
|
|
|
->reduce(function ($results, $channel) use ($appId) {
|
|
|
|
|
$results[$channel] = isset($this->users["{$appId}:{$channel}"])
|
|
|
|
|
? count($this->users["{$appId}:{$channel}"])
|
|
|
|
|
: 0;
|
|
|
|
|
|
|
|
|
|
return $results;
|
|
|
|
|
}, []);
|
|
|
|
|
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise($results);
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|
|
|
|
|
|
2020-09-16 08:02:58 +00:00
|
|
|
/**
|
|
|
|
|
* Get the socket IDs for a presence channel member.
|
|
|
|
|
*
|
|
|
|
|
* @param string|int $userId
|
|
|
|
|
* @param string|int $appId
|
|
|
|
|
* @param string $channelName
|
|
|
|
|
* @return \React\Promise\PromiseInterface
|
|
|
|
|
*/
|
|
|
|
|
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
|
|
|
|
|
{
|
2020-09-19 11:16:26 +00:00
|
|
|
return Helpers::createFulfilledPromise(
|
2020-09-16 08:02:58 +00:00
|
|
|
$this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? []
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
2020-09-15 09:30:17 +00:00
|
|
|
/**
|
|
|
|
|
* Keep tracking the connections availability when they pong.
|
|
|
|
|
*
|
|
|
|
|
* @param \Ratchet\ConnectionInterface $connection
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-15 09:30:17 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
|
2020-09-15 09:30:17 +00:00
|
|
|
{
|
2020-09-19 15:38:08 +00:00
|
|
|
$connection->lastPongedAt = Carbon::now();
|
|
|
|
|
|
|
|
|
|
return $this->updateConnectionInChannels($connection);
|
2020-09-15 09:30:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Remove the obsolete connections that didn't ponged in a while.
|
|
|
|
|
*
|
2020-09-19 11:16:26 +00:00
|
|
|
* @return PromiseInterface[bool]
|
2020-09-15 09:30:17 +00:00
|
|
|
*/
|
2020-09-19 11:16:26 +00:00
|
|
|
public function removeObsoleteConnections(): PromiseInterface
|
2020-09-15 09:30:17 +00:00
|
|
|
{
|
2020-09-19 15:38:08 +00:00
|
|
|
if (! $this->lock()->acquire()) {
|
|
|
|
|
return Helpers::createFulfilledPromise(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$this->getLocalConnections()->then(function ($connections) {
|
|
|
|
|
foreach ($connections as $connection) {
|
|
|
|
|
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
|
|
|
|
|
|
|
|
|
|
if ($differenceInSeconds > 120) {
|
|
|
|
|
$this->unsubscribeFromAllChannels($connection);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return Helpers::createFulfilledPromise(
|
|
|
|
|
$this->lock()->release()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Update the connection in all channels.
|
|
|
|
|
*
|
|
|
|
|
* @param ConnectionInterface $connection
|
|
|
|
|
* @return PromiseInterface[bool]
|
|
|
|
|
*/
|
|
|
|
|
public function updateConnectionInChannels($connection): PromiseInterface
|
|
|
|
|
{
|
|
|
|
|
return $this->getLocalChannels($connection->app->id)
|
|
|
|
|
->then(function ($channels) use ($connection) {
|
|
|
|
|
foreach ($channels as $channel) {
|
|
|
|
|
if ($channel->hasConnection($connection)) {
|
|
|
|
|
$channel->saveConnection($connection);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
});
|
2020-09-15 09:30:17 +00:00
|
|
|
}
|
|
|
|
|
|
2020-09-12 14:45:07 +00:00
|
|
|
/**
|
|
|
|
|
* Mark the current instance as unable to accept new connections.
|
|
|
|
|
*
|
|
|
|
|
* @return $this
|
|
|
|
|
*/
|
|
|
|
|
public function declineNewConnections()
|
|
|
|
|
{
|
|
|
|
|
$this->acceptsNewConnections = false;
|
|
|
|
|
|
|
|
|
|
return $this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Check if the current server instance
|
|
|
|
|
* accepts new connections.
|
|
|
|
|
*
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
public function acceptsNewConnections(): bool
|
|
|
|
|
{
|
|
|
|
|
return $this->acceptsNewConnections;
|
|
|
|
|
}
|
|
|
|
|
|
2020-09-10 19:59:26 +00:00
|
|
|
/**
|
|
|
|
|
* Get the channel class by the channel name.
|
|
|
|
|
*
|
|
|
|
|
* @param string $channelName
|
|
|
|
|
* @return string
|
|
|
|
|
*/
|
|
|
|
|
protected function getChannelClassName(string $channelName): string
|
|
|
|
|
{
|
|
|
|
|
if (Str::startsWith($channelName, 'private-')) {
|
|
|
|
|
return PrivateChannel::class;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (Str::startsWith($channelName, 'presence-')) {
|
|
|
|
|
return PresenceChannel::class;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return Channel::class;
|
|
|
|
|
}
|
2020-09-19 15:38:08 +00:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get a new ArrayLock instance to avoid race conditions.
|
|
|
|
|
*
|
|
|
|
|
* @return \Illuminate\Cache\CacheLock
|
|
|
|
|
*/
|
|
|
|
|
protected function lock()
|
|
|
|
|
{
|
2020-09-19 15:46:13 +00:00
|
|
|
return new ArrayLock($this->store, static::$lockName, 0);
|
2020-09-19 15:38:08 +00:00
|
|
|
}
|
2020-09-10 19:59:26 +00:00
|
|
|
}
|