Added easy extendable methods to change hash names for Redis

This commit is contained in:
Alex Renoki 2020-12-09 13:45:21 +02:00
parent 139608f9aa
commit bf049a346d
2 changed files with 112 additions and 39 deletions

View File

@ -56,7 +56,7 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function __construct(LoopInterface $loop, $factoryClass = null) public function __construct(LoopInterface $loop, $factoryClass = null)
{ {
parent::construct($loop, $factoryClass); parent::__construct($loop, $factoryClass);
$this->loop = $loop; $this->loop = $loop;
@ -87,7 +87,7 @@ class RedisChannelManager extends LocalChannelManager
public function getGlobalChannels($appId): PromiseInterface public function getGlobalChannels($appId): PromiseInterface
{ {
return $this->publishClient->smembers( return $this->publishClient->smembers(
$this->getRedisKey($appId, null, ['channels']) $this->getChannelsRedisHash($appId)
); );
} }
@ -214,7 +214,7 @@ class RedisChannelManager extends LocalChannelManager
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
{ {
return $this->publishClient return $this->publishClient
->hget($this->getRedisKey($appId, $channelName, ['stats']), 'connections') ->hget($this->getStatsRedisHash($appId, $channelName), 'connections')
->then(function ($count) { ->then(function ($count) {
return is_null($count) ? 0 : (int) $count; return is_null($count) ? 0 : (int) $count;
}); });
@ -237,7 +237,7 @@ class RedisChannelManager extends LocalChannelManager
$payload->serverId = $serverId ?: $this->getServerId(); $payload->serverId = $serverId ?: $this->getServerId();
return $this->publishClient return $this->publishClient
->publish($this->getRedisKey($appId, $channel), json_encode($payload)) ->publish($this->getRedisTopicName($appId, $channel), json_encode($payload))
->then(function () use ($appId, $socketId, $channel, $payload, $serverId) { ->then(function () use ($appId, $socketId, $channel, $payload, $serverId) {
return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId); return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
}); });
@ -293,7 +293,7 @@ class RedisChannelManager extends LocalChannelManager
public function getChannelMembers($appId, string $channel): PromiseInterface public function getChannelMembers($appId, string $channel): PromiseInterface
{ {
return $this->publishClient return $this->publishClient
->hgetall($this->getRedisKey($appId, $channel, ['users'])) ->hgetall($this->getUsersRedisHash($appId, $channel))
->then(function ($list) { ->then(function ($list) {
return collect(Helpers::redisListToArray($list))->map(function ($user) { return collect(Helpers::redisListToArray($list))->map(function ($user) {
return json_decode($user); return json_decode($user);
@ -311,7 +311,7 @@ class RedisChannelManager extends LocalChannelManager
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
{ {
return $this->publishClient->hget( return $this->publishClient->hget(
$this->getRedisKey($connection->app->id, $channel, ['users']), $connection->socketId $this->getUsersRedisHash($connection->app->id, $channel), $connection->socketId
); );
} }
@ -328,7 +328,7 @@ class RedisChannelManager extends LocalChannelManager
foreach ($channelNames as $channel) { foreach ($channelNames as $channel) {
$this->publishClient->hlen( $this->publishClient->hlen(
$this->getRedisKey($appId, $channel, ['users']) $this->getUsersRedisHash($appId, $channel)
); );
} }
@ -349,7 +349,7 @@ class RedisChannelManager extends LocalChannelManager
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{ {
return $this->publishClient->smembers( return $this->publishClient->smembers(
$this->getRedisKey($appId, $channelName, [$userId, 'userSockets']) $this->getUserSocketsRedisHash($appId, $channelName, $userId)
); );
} }
@ -498,7 +498,7 @@ class RedisChannelManager extends LocalChannelManager
public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{ {
return $this->publishClient->hincrby( return $this->publishClient->hincrby(
$this->getRedisKey($appId, $channel, ['stats']), 'connections', $increment $this->getStatsRedisHash($appId, $channel), 'connections', $increment
); );
} }
@ -527,7 +527,7 @@ class RedisChannelManager extends LocalChannelManager
$moment = $moment ? Carbon::parse($moment) : Carbon::now(); $moment = $moment ? Carbon::parse($moment) : Carbon::now();
return $this->publishClient->zadd( return $this->publishClient->zadd(
$this->getRedisKey(null, null, ['sockets']), $this->getSocketsRedisHash(),
$moment->format('U'), "{$connection->app->id}:{$connection->socketId}" $moment->format('U'), "{$connection->app->id}:{$connection->socketId}"
); );
} }
@ -541,7 +541,7 @@ class RedisChannelManager extends LocalChannelManager
public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
{ {
return $this->publishClient->zrem( return $this->publishClient->zrem(
$this->getRedisKey(null, null, ['sockets']), $this->getSocketsRedisHash(),
"{$connection->app->id}:{$connection->socketId}" "{$connection->app->id}:{$connection->socketId}"
); );
} }
@ -563,7 +563,7 @@ class RedisChannelManager extends LocalChannelManager
} }
return $this->publishClient return $this->publishClient
->zrangebyscore($this->getRedisKey(null, null, ['sockets']), $start, $stop) ->zrangebyscore($this->getSocketsRedisHash(), $start, $stop)
->then(function ($list) { ->then(function ($list) {
return collect($list)->mapWithKeys(function ($appWithSocket) { return collect($list)->mapWithKeys(function ($appWithSocket) {
[$appId, $socketId] = explode(':', $appWithSocket); [$appId, $socketId] = explode(':', $appWithSocket);
@ -583,7 +583,7 @@ class RedisChannelManager extends LocalChannelManager
public function addChannelToSet($appId, string $channel): PromiseInterface public function addChannelToSet($appId, string $channel): PromiseInterface
{ {
return $this->publishClient->sadd( return $this->publishClient->sadd(
$this->getRedisKey($appId, null, ['channels']), $channel $this->getChannelsRedisHash($appId), $channel
); );
} }
@ -597,7 +597,7 @@ class RedisChannelManager extends LocalChannelManager
public function removeChannelFromSet($appId, string $channel): PromiseInterface public function removeChannelFromSet($appId, string $channel): PromiseInterface
{ {
return $this->publishClient->srem( return $this->publishClient->srem(
$this->getRedisKey($appId, null, ['channels']), $channel $this->getChannelsRedisHash($appId), $channel
); );
} }
@ -613,7 +613,7 @@ class RedisChannelManager extends LocalChannelManager
public function storeUserData($appId, string $channel = null, string $key, $data): PromiseInterface public function storeUserData($appId, string $channel = null, string $key, $data): PromiseInterface
{ {
return $this->publishClient->hset( return $this->publishClient->hset(
$this->getRedisKey($appId, $channel, ['users']), $key, $data $this->getUsersRedisHash($appId, $channel), $key, $data
); );
} }
@ -628,7 +628,7 @@ class RedisChannelManager extends LocalChannelManager
public function removeUserData($appId, string $channel = null, string $key): PromiseInterface public function removeUserData($appId, string $channel = null, string $key): PromiseInterface
{ {
return $this->publishClient->hdel( return $this->publishClient->hdel(
$this->getRedisKey($appId, $channel, ['users']), $key $this->getUsersRedisHash($appId, $channel), $key
); );
} }
@ -641,7 +641,7 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function subscribeToTopic($appId, string $channel = null): PromiseInterface public function subscribeToTopic($appId, string $channel = null): PromiseInterface
{ {
$topic = $this->getRedisKey($appId, $channel); $topic = $this->getRedisTopicName($appId, $channel);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
@ -660,7 +660,7 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface
{ {
$topic = $this->getRedisKey($appId, $channel); $topic = $this->getRedisTopicName($appId, $channel);
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [ DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
'serverId' => $this->getServerId(), 'serverId' => $this->getServerId(),
@ -682,7 +682,7 @@ class RedisChannelManager extends LocalChannelManager
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{ {
return $this->publishClient->sadd( return $this->publishClient->sadd(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId
); );
} }
@ -698,7 +698,7 @@ class RedisChannelManager extends LocalChannelManager
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{ {
return $this->publishClient->srem( return $this->publishClient->srem(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId
); );
} }
@ -729,6 +729,79 @@ class RedisChannelManager extends LocalChannelManager
return $hash; return $hash;
} }
/**
* Get the statistics Redis hash.
*
* @param string|int $appId
* @param string|null $channel
* @return string
*/
public function getStatsRedisHash($appId, string $channel = null): string
{
return $this->getRedisKey($appId, $channel, ['stats']);
}
/**
* Get the sockets Redis hash used to store all sockets ids.
*
* @return string
*/
public function getSocketsRedisHash(): string
{
return $this->getRedisKey(null, null, ['sockets']);
}
/**
* Get the channels Redis hash for a specific app id, used
* to store existing channels.
*
* @param string|int $appId
* @return string
*/
public function getChannelsRedisHash($appId): string
{
return $this->getRedisKey($appId, null, ['channels']);
}
/**
* Get the Redis hash for storing presence channels users.
*
* @param string|int $appId
* @param string|null $channel
* @return string
*/
public function getUsersRedisHash($appId, string $channel = null): string
{
return $this->getRedisKey($appId, $channel, ['users']);
}
/**
* Get the Redis hash for storing socket ids
* for a specific presence channels user.
*
* @param string|int $appId
* @param string|null $channel
* @param string|int|null $userId
* @return string
*/
public function getUserSocketsRedisHash($appId, string $channel = null, $userId = null): string
{
return $this->getRedisKey($appId, $channel, [$userId, 'userSockets']);
}
/**
* Get the Redis topic name for PubSub
* used to transfer info between servers.
*
* @param string|int $appId
* @param string|null $channel
* @return string
*/
public function getRedisTopicName($appId, string $channel = null): string
{
return $this->getRedisKey($appId, $channel);
}
/** /**
* Get a new RedisLock instance to avoid race conditions. * Get a new RedisLock instance to avoid race conditions.
* *

View File

@ -56,7 +56,7 @@ class RedisCollector extends MemoryCollector
public function webSocketMessage($appId) public function webSocketMessage($appId)
{ {
$this->ensureAppIsInSet($appId) $this->ensureAppIsInSet($appId)
->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1); ->hincrby($this->channelManager->getStatsRedisHash($appId, null), 'websocket_messages_count', 1);
} }
/** /**
@ -68,7 +68,7 @@ class RedisCollector extends MemoryCollector
public function apiMessage($appId) public function apiMessage($appId)
{ {
$this->ensureAppIsInSet($appId) $this->ensureAppIsInSet($appId)
->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1); ->hincrby($this->channelManager->getStatsRedisHash($appId, null), 'api_messages_count', 1);
} }
/** /**
@ -82,7 +82,7 @@ class RedisCollector extends MemoryCollector
// Increment the current connections count by 1. // Increment the current connections count by 1.
$this->ensureAppIsInSet($appId) $this->ensureAppIsInSet($appId)
->hincrby( ->hincrby(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'current_connections_count', 1 'current_connections_count', 1
) )
->then(function ($currentConnectionsCount) use ($appId) { ->then(function ($currentConnectionsCount) use ($appId) {
@ -90,7 +90,7 @@ class RedisCollector extends MemoryCollector
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hget( ->hget(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'peak_connections_count' 'peak_connections_count'
) )
->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) { ->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) {
@ -104,7 +104,7 @@ class RedisCollector extends MemoryCollector
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hset( ->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'peak_connections_count', $peakConnectionsCount 'peak_connections_count', $peakConnectionsCount
); );
}); });
@ -121,12 +121,12 @@ class RedisCollector extends MemoryCollector
{ {
// Decrement the current connections count by 1. // Decrement the current connections count by 1.
$this->ensureAppIsInSet($appId) $this->ensureAppIsInSet($appId)
->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'current_connections_count', -1) ->hincrby($this->channelManager->getStatsRedisHash($appId, null), 'current_connections_count', -1)
->then(function ($currentConnectionsCount) use ($appId) { ->then(function ($currentConnectionsCount) use ($appId) {
// Get the peak connections count from Redis. // Get the peak connections count from Redis.
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hget($this->channelManager->getRedisKey($appId, null, ['stats']), 'peak_connections_count') ->hget($this->channelManager->getStatsRedisHash($appId, null), 'peak_connections_count')
->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) { ->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) {
// Extract the greatest number between the current peak connection count // Extract the greatest number between the current peak connection count
// and the current connection number. // and the current connection number.
@ -138,7 +138,7 @@ class RedisCollector extends MemoryCollector
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hset( ->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'peak_connections_count', $peakConnectionsCount 'peak_connections_count', $peakConnectionsCount
); );
}); });
@ -160,7 +160,7 @@ class RedisCollector extends MemoryCollector
foreach ($members as $appId) { foreach ($members as $appId) {
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->hgetall($this->channelManager->getStatsRedisHash($appId, null))
->then(function ($list) use ($appId) { ->then(function ($list) use ($appId) {
if (! $list) { if (! $list) {
return; return;
@ -219,7 +219,7 @@ class RedisCollector extends MemoryCollector
foreach ($members as $appId) { foreach ($members as $appId) {
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->hgetall($this->channelManager->getStatsRedisHash($appId, null))
->then(function ($list) use ($appId, &$appsWithStatistics) { ->then(function ($list) use ($appId, &$appsWithStatistics) {
$appsWithStatistics[$appId] = $this->arrayToStatisticInstance( $appsWithStatistics[$appId] = $this->arrayToStatisticInstance(
$appId, Helpers::redisListToArray($list) $appId, Helpers::redisListToArray($list)
@ -241,7 +241,7 @@ class RedisCollector extends MemoryCollector
{ {
return $this->channelManager return $this->channelManager
->getPublishClient() ->getPublishClient()
->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->hgetall($this->channelManager->getStatsRedisHash($appId, null))
->then(function ($list) use ($appId) { ->then(function ($list) use ($appId) {
return $this->arrayToStatisticInstance( return $this->arrayToStatisticInstance(
$appId, Helpers::redisListToArray($list) $appId, Helpers::redisListToArray($list)
@ -261,28 +261,28 @@ class RedisCollector extends MemoryCollector
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hset( ->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'current_connections_count', $currentConnectionCount 'current_connections_count', $currentConnectionCount
); );
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hset( ->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'peak_connections_count', max(0, $currentConnectionCount) 'peak_connections_count', max(0, $currentConnectionCount)
); );
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hset( ->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'websocket_messages_count', 0 'websocket_messages_count', 0
); );
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hset( ->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'api_messages_count', 0 'api_messages_count', 0
); );
} }
@ -301,28 +301,28 @@ class RedisCollector extends MemoryCollector
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hdel( ->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'current_connections_count' 'current_connections_count'
); );
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hdel( ->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'peak_connections_count' 'peak_connections_count'
); );
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hdel( ->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'websocket_messages_count' 'websocket_messages_count'
); );
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hdel( ->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getStatsRedisHash($appId, null),
'api_messages_count' 'api_messages_count'
); );