diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 9b71d9a..f17e96e 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -160,7 +160,7 @@ class RedisChannelManager extends LocalChannelManager { $this->subscribeToTopic($connection->app->id, $channelName); - $this->addConnectionToSet($connection); + $this->addConnectionToSet($connection, Carbon::now()); $this->addChannelToSet( $connection->app->id, $channelName @@ -416,7 +416,7 @@ class RedisChannelManager extends LocalChannelManager public function connectionPonged(ConnectionInterface $connection): bool { // This will update the score with the current timestamp. - $this->addConnectionToSet($connection); + $this->addConnectionToSet($connection, Carbon::now()); return parent::connectionPonged($connection); } @@ -431,9 +431,7 @@ class RedisChannelManager extends LocalChannelManager $this->lock()->get(function () { $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) ->then(function ($connections) { - foreach ($connections as $connection => $score) { - [$appId, $socketId] = explode(':', $connection); - + foreach ($connections as $appId => $socketId) { $this->unsubscribeFromAllChannels( $this->fakeConnectionForApp($appId, $socketId) ); @@ -571,9 +569,11 @@ class RedisChannelManager extends LocalChannelManager */ public function addConnectionToSet(ConnectionInterface $connection, $moment = null) { + $moment = $moment ? Carbon::parse($moment) : Carbon::now(); + $this->publishClient->zadd( $this->getRedisKey(null, null, ['sockets']), - Carbon::parse($moment)->format('U'), "{$connection->app->id}:{$connection->socketId}" + $moment->format('U'), "{$connection->app->id}:{$connection->socketId}" ); } @@ -597,16 +597,26 @@ class RedisChannelManager extends LocalChannelManager * * @param int $start * @param int $stop + * @param bool $strict * @return PromiseInterface */ - public function getConnectionsFromSet(int $start = 0, int $stop = 0) + public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true) { - return $this->publishClient->zrange( + if ($strict) { + $start = "({$start}"; + $stop = "({$stop}"; + } + + return $this->publishClient->zrangebyscore( $this->getRedisKey(null, null, ['sockets']), - $start, $stop, 'withscores' + $start, $stop ) ->then(function ($list) { - return Helpers::redisListToArray($list); + return collect($list)->mapWithKeys(function ($appWithSocket) { + [$appId, $socketId] = explode(':', $appWithSocket); + + return [$appId => $socketId]; + })->toArray(); }); } diff --git a/tests/Mocks/PromiseResolver.php b/tests/Mocks/PromiseResolver.php index bbc0df7..dfec306 100644 --- a/tests/Mocks/PromiseResolver.php +++ b/tests/Mocks/PromiseResolver.php @@ -52,8 +52,8 @@ class PromiseResolver implements PromiseInterface $result = call_user_func($onFulfilled, $result); return $result instanceof PromiseInterface - ? $result - : new FulfilledPromise($result); + ? new self($result, $this->loop) + : new self(new FulfilledPromise($result), $this->loop); } /**