Fixed issues with connections being closed within 10 seconds

This commit is contained in:
Alex Renoki 2020-09-17 13:56:09 +03:00
parent 16ff2aa2b6
commit 23e8b3db44
2 changed files with 22 additions and 12 deletions

View File

@ -160,7 +160,7 @@ class RedisChannelManager extends LocalChannelManager
{ {
$this->subscribeToTopic($connection->app->id, $channelName); $this->subscribeToTopic($connection->app->id, $channelName);
$this->addConnectionToSet($connection); $this->addConnectionToSet($connection, Carbon::now());
$this->addChannelToSet( $this->addChannelToSet(
$connection->app->id, $channelName $connection->app->id, $channelName
@ -416,7 +416,7 @@ class RedisChannelManager extends LocalChannelManager
public function connectionPonged(ConnectionInterface $connection): bool public function connectionPonged(ConnectionInterface $connection): bool
{ {
// This will update the score with the current timestamp. // This will update the score with the current timestamp.
$this->addConnectionToSet($connection); $this->addConnectionToSet($connection, Carbon::now());
return parent::connectionPonged($connection); return parent::connectionPonged($connection);
} }
@ -431,9 +431,7 @@ class RedisChannelManager extends LocalChannelManager
$this->lock()->get(function () { $this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) { ->then(function ($connections) {
foreach ($connections as $connection => $score) { foreach ($connections as $appId => $socketId) {
[$appId, $socketId] = explode(':', $connection);
$this->unsubscribeFromAllChannels( $this->unsubscribeFromAllChannels(
$this->fakeConnectionForApp($appId, $socketId) $this->fakeConnectionForApp($appId, $socketId)
); );
@ -571,9 +569,11 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function addConnectionToSet(ConnectionInterface $connection, $moment = null) public function addConnectionToSet(ConnectionInterface $connection, $moment = null)
{ {
$moment = $moment ? Carbon::parse($moment) : Carbon::now();
$this->publishClient->zadd( $this->publishClient->zadd(
$this->getRedisKey(null, null, ['sockets']), $this->getRedisKey(null, null, ['sockets']),
Carbon::parse($moment)->format('U'), "{$connection->app->id}:{$connection->socketId}" $moment->format('U'), "{$connection->app->id}:{$connection->socketId}"
); );
} }
@ -597,16 +597,26 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param int $start * @param int $start
* @param int $stop * @param int $stop
* @param bool $strict
* @return PromiseInterface * @return PromiseInterface
*/ */
public function getConnectionsFromSet(int $start = 0, int $stop = 0) public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true)
{ {
return $this->publishClient->zrange( if ($strict) {
$start = "({$start}";
$stop = "({$stop}";
}
return $this->publishClient->zrangebyscore(
$this->getRedisKey(null, null, ['sockets']), $this->getRedisKey(null, null, ['sockets']),
$start, $stop, 'withscores' $start, $stop
) )
->then(function ($list) { ->then(function ($list) {
return Helpers::redisListToArray($list); return collect($list)->mapWithKeys(function ($appWithSocket) {
[$appId, $socketId] = explode(':', $appWithSocket);
return [$appId => $socketId];
})->toArray();
}); });
} }

View File

@ -52,8 +52,8 @@ class PromiseResolver implements PromiseInterface
$result = call_user_func($onFulfilled, $result); $result = call_user_func($onFulfilled, $result);
return $result instanceof PromiseInterface return $result instanceof PromiseInterface
? $result ? new self($result, $this->loop)
: new FulfilledPromise($result); : new self(new FulfilledPromise($result), $this->loop);
} }
/** /**