diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 7d8fde4..f96aff2 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -56,7 +56,7 @@ class RedisChannelManager extends LocalChannelManager */ public function __construct(LoopInterface $loop, $factoryClass = null) { - parent::construct($loop, $factoryClass); + parent::__construct($loop, $factoryClass); $this->loop = $loop; @@ -87,7 +87,7 @@ class RedisChannelManager extends LocalChannelManager public function getGlobalChannels($appId): PromiseInterface { return $this->publishClient->smembers( - $this->getRedisKey($appId, null, ['channels']) + $this->getChannelsRedisHash($appId) ); } @@ -214,7 +214,7 @@ class RedisChannelManager extends LocalChannelManager public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface { return $this->publishClient - ->hget($this->getRedisKey($appId, $channelName, ['stats']), 'connections') + ->hget($this->getStatsRedisHash($appId, $channelName), 'connections') ->then(function ($count) { return is_null($count) ? 0 : (int) $count; }); @@ -237,7 +237,7 @@ class RedisChannelManager extends LocalChannelManager $payload->serverId = $serverId ?: $this->getServerId(); return $this->publishClient - ->publish($this->getRedisKey($appId, $channel), json_encode($payload)) + ->publish($this->getRedisTopicName($appId, $channel), json_encode($payload)) ->then(function () use ($appId, $socketId, $channel, $payload, $serverId) { return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId); }); @@ -293,7 +293,7 @@ class RedisChannelManager extends LocalChannelManager public function getChannelMembers($appId, string $channel): PromiseInterface { return $this->publishClient - ->hgetall($this->getRedisKey($appId, $channel, ['users'])) + ->hgetall($this->getUsersRedisHash($appId, $channel)) ->then(function ($list) { return collect(Helpers::redisListToArray($list))->map(function ($user) { return json_decode($user); @@ -311,7 +311,7 @@ class RedisChannelManager extends LocalChannelManager public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface { return $this->publishClient->hget( - $this->getRedisKey($connection->app->id, $channel, ['users']), $connection->socketId + $this->getUsersRedisHash($connection->app->id, $channel), $connection->socketId ); } @@ -328,7 +328,7 @@ class RedisChannelManager extends LocalChannelManager foreach ($channelNames as $channel) { $this->publishClient->hlen( - $this->getRedisKey($appId, $channel, ['users']) + $this->getUsersRedisHash($appId, $channel) ); } @@ -349,7 +349,7 @@ class RedisChannelManager extends LocalChannelManager public function getMemberSockets($userId, $appId, $channelName): PromiseInterface { return $this->publishClient->smembers( - $this->getRedisKey($appId, $channelName, [$userId, 'userSockets']) + $this->getUserSocketsRedisHash($appId, $channelName, $userId) ); } @@ -498,7 +498,7 @@ class RedisChannelManager extends LocalChannelManager public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface { return $this->publishClient->hincrby( - $this->getRedisKey($appId, $channel, ['stats']), 'connections', $increment + $this->getStatsRedisHash($appId, $channel), 'connections', $increment ); } @@ -527,7 +527,7 @@ class RedisChannelManager extends LocalChannelManager $moment = $moment ? Carbon::parse($moment) : Carbon::now(); return $this->publishClient->zadd( - $this->getRedisKey(null, null, ['sockets']), + $this->getSocketsRedisHash(), $moment->format('U'), "{$connection->app->id}:{$connection->socketId}" ); } @@ -541,7 +541,7 @@ class RedisChannelManager extends LocalChannelManager public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface { return $this->publishClient->zrem( - $this->getRedisKey(null, null, ['sockets']), + $this->getSocketsRedisHash(), "{$connection->app->id}:{$connection->socketId}" ); } @@ -563,7 +563,7 @@ class RedisChannelManager extends LocalChannelManager } return $this->publishClient - ->zrangebyscore($this->getRedisKey(null, null, ['sockets']), $start, $stop) + ->zrangebyscore($this->getSocketsRedisHash(), $start, $stop) ->then(function ($list) { return collect($list)->mapWithKeys(function ($appWithSocket) { [$appId, $socketId] = explode(':', $appWithSocket); @@ -583,7 +583,7 @@ class RedisChannelManager extends LocalChannelManager public function addChannelToSet($appId, string $channel): PromiseInterface { return $this->publishClient->sadd( - $this->getRedisKey($appId, null, ['channels']), $channel + $this->getChannelsRedisHash($appId), $channel ); } @@ -597,7 +597,7 @@ class RedisChannelManager extends LocalChannelManager public function removeChannelFromSet($appId, string $channel): PromiseInterface { return $this->publishClient->srem( - $this->getRedisKey($appId, null, ['channels']), $channel + $this->getChannelsRedisHash($appId), $channel ); } @@ -613,7 +613,7 @@ class RedisChannelManager extends LocalChannelManager public function storeUserData($appId, string $channel = null, string $key, $data): PromiseInterface { return $this->publishClient->hset( - $this->getRedisKey($appId, $channel, ['users']), $key, $data + $this->getUsersRedisHash($appId, $channel), $key, $data ); } @@ -628,7 +628,7 @@ class RedisChannelManager extends LocalChannelManager public function removeUserData($appId, string $channel = null, string $key): PromiseInterface { return $this->publishClient->hdel( - $this->getRedisKey($appId, $channel, ['users']), $key + $this->getUsersRedisHash($appId, $channel), $key ); } @@ -641,7 +641,7 @@ class RedisChannelManager extends LocalChannelManager */ public function subscribeToTopic($appId, string $channel = null): PromiseInterface { - $topic = $this->getRedisKey($appId, $channel); + $topic = $this->getRedisTopicName($appId, $channel); DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [ 'serverId' => $this->getServerId(), @@ -660,7 +660,7 @@ class RedisChannelManager extends LocalChannelManager */ public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface { - $topic = $this->getRedisKey($appId, $channel); + $topic = $this->getRedisTopicName($appId, $channel); DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [ 'serverId' => $this->getServerId(), @@ -682,7 +682,7 @@ class RedisChannelManager extends LocalChannelManager protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface { return $this->publishClient->sadd( - $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId + $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId ); } @@ -698,7 +698,7 @@ class RedisChannelManager extends LocalChannelManager protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface { return $this->publishClient->srem( - $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId + $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId ); } @@ -729,6 +729,79 @@ class RedisChannelManager extends LocalChannelManager return $hash; } + /** + * Get the statistics Redis hash. + * + * @param string|int $appId + * @param string|null $channel + * @return string + */ + public function getStatsRedisHash($appId, string $channel = null): string + { + return $this->getRedisKey($appId, $channel, ['stats']); + } + + /** + * Get the sockets Redis hash used to store all sockets ids. + * + * @return string + */ + public function getSocketsRedisHash(): string + { + return $this->getRedisKey(null, null, ['sockets']); + } + + /** + * Get the channels Redis hash for a specific app id, used + * to store existing channels. + * + * @param string|int $appId + * @return string + */ + public function getChannelsRedisHash($appId): string + { + return $this->getRedisKey($appId, null, ['channels']); + } + + /** + * Get the Redis hash for storing presence channels users. + * + * @param string|int $appId + * @param string|null $channel + * @return string + */ + public function getUsersRedisHash($appId, string $channel = null): string + { + return $this->getRedisKey($appId, $channel, ['users']); + } + + /** + * Get the Redis hash for storing socket ids + * for a specific presence channels user. + * + * @param string|int $appId + * @param string|null $channel + * @param string|int|null $userId + * @return string + */ + public function getUserSocketsRedisHash($appId, string $channel = null, $userId = null): string + { + return $this->getRedisKey($appId, $channel, [$userId, 'userSockets']); + } + + /** + * Get the Redis topic name for PubSub + * used to transfer info between servers. + * + * @param string|int $appId + * @param string|null $channel + * @return string + */ + public function getRedisTopicName($appId, string $channel = null): string + { + return $this->getRedisKey($appId, $channel); + } + /** * Get a new RedisLock instance to avoid race conditions. * diff --git a/src/Statistics/Collectors/RedisCollector.php b/src/Statistics/Collectors/RedisCollector.php index bb5c688..921771a 100644 --- a/src/Statistics/Collectors/RedisCollector.php +++ b/src/Statistics/Collectors/RedisCollector.php @@ -56,7 +56,7 @@ class RedisCollector extends MemoryCollector public function webSocketMessage($appId) { $this->ensureAppIsInSet($appId) - ->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1); + ->hincrby($this->channelManager->getStatsRedisHash($appId, null), 'websocket_messages_count', 1); } /** @@ -68,7 +68,7 @@ class RedisCollector extends MemoryCollector public function apiMessage($appId) { $this->ensureAppIsInSet($appId) - ->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1); + ->hincrby($this->channelManager->getStatsRedisHash($appId, null), 'api_messages_count', 1); } /** @@ -82,7 +82,7 @@ class RedisCollector extends MemoryCollector // Increment the current connections count by 1. $this->ensureAppIsInSet($appId) ->hincrby( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'current_connections_count', 1 ) ->then(function ($currentConnectionsCount) use ($appId) { @@ -90,7 +90,7 @@ class RedisCollector extends MemoryCollector $this->channelManager ->getPublishClient() ->hget( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'peak_connections_count' ) ->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) { @@ -104,7 +104,7 @@ class RedisCollector extends MemoryCollector $this->channelManager ->getPublishClient() ->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'peak_connections_count', $peakConnectionsCount ); }); @@ -121,12 +121,12 @@ class RedisCollector extends MemoryCollector { // Decrement the current connections count by 1. $this->ensureAppIsInSet($appId) - ->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'current_connections_count', -1) + ->hincrby($this->channelManager->getStatsRedisHash($appId, null), 'current_connections_count', -1) ->then(function ($currentConnectionsCount) use ($appId) { // Get the peak connections count from Redis. $this->channelManager ->getPublishClient() - ->hget($this->channelManager->getRedisKey($appId, null, ['stats']), 'peak_connections_count') + ->hget($this->channelManager->getStatsRedisHash($appId, null), 'peak_connections_count') ->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) { // Extract the greatest number between the current peak connection count // and the current connection number. @@ -138,7 +138,7 @@ class RedisCollector extends MemoryCollector $this->channelManager ->getPublishClient() ->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'peak_connections_count', $peakConnectionsCount ); }); @@ -160,7 +160,7 @@ class RedisCollector extends MemoryCollector foreach ($members as $appId) { $this->channelManager ->getPublishClient() - ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) + ->hgetall($this->channelManager->getStatsRedisHash($appId, null)) ->then(function ($list) use ($appId) { if (! $list) { return; @@ -219,7 +219,7 @@ class RedisCollector extends MemoryCollector foreach ($members as $appId) { $this->channelManager ->getPublishClient() - ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) + ->hgetall($this->channelManager->getStatsRedisHash($appId, null)) ->then(function ($list) use ($appId, &$appsWithStatistics) { $appsWithStatistics[$appId] = $this->arrayToStatisticInstance( $appId, Helpers::redisListToArray($list) @@ -241,7 +241,7 @@ class RedisCollector extends MemoryCollector { return $this->channelManager ->getPublishClient() - ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) + ->hgetall($this->channelManager->getStatsRedisHash($appId, null)) ->then(function ($list) use ($appId) { return $this->arrayToStatisticInstance( $appId, Helpers::redisListToArray($list) @@ -261,28 +261,28 @@ class RedisCollector extends MemoryCollector $this->channelManager ->getPublishClient() ->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'current_connections_count', $currentConnectionCount ); $this->channelManager ->getPublishClient() ->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'peak_connections_count', max(0, $currentConnectionCount) ); $this->channelManager ->getPublishClient() ->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'websocket_messages_count', 0 ); $this->channelManager ->getPublishClient() ->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'api_messages_count', 0 ); } @@ -301,28 +301,28 @@ class RedisCollector extends MemoryCollector $this->channelManager ->getPublishClient() ->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'current_connections_count' ); $this->channelManager ->getPublishClient() ->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'peak_connections_count' ); $this->channelManager ->getPublishClient() ->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'websocket_messages_count' ); $this->channelManager ->getPublishClient() ->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), + $this->channelManager->getStatsRedisHash($appId, null), 'api_messages_count' );