From 589065910240f2dbc122362e95cd9175d0653a8a Mon Sep 17 00:00:00 2001 From: Alex Renoki Date: Mon, 17 Aug 2020 21:06:51 +0300 Subject: [PATCH] Refactored the dashboard logger --- src/Dashboard/DashboardLogger.php | 124 ++++---------- .../Controllers/TriggerEventController.php | 11 +- src/PubSub/Drivers/RedisClient.php | 151 +++++++++++------- src/WebSockets/Channels/Channel.php | 14 +- .../Messages/PusherClientMessage.php | 7 +- src/WebSockets/WebSocketHandler.php | 12 +- 6 files changed, 155 insertions(+), 164 deletions(-) diff --git a/src/Dashboard/DashboardLogger.php b/src/Dashboard/DashboardLogger.php index 2b00d3f..e787c47 100644 --- a/src/Dashboard/DashboardLogger.php +++ b/src/Dashboard/DashboardLogger.php @@ -10,9 +10,9 @@ class DashboardLogger { const LOG_CHANNEL_PREFIX = 'private-websockets-dashboard-'; - const TYPE_DISCONNECTION = 'disconnection'; + const TYPE_DISCONNECTED = 'disconnected'; - const TYPE_CONNECTION = 'connection'; + const TYPE_CONNECTED = 'connected'; const TYPE_VACATED = 'vacated'; @@ -20,7 +20,7 @@ class DashboardLogger const TYPE_SUBSCRIBED = 'subscribed'; - const TYPE_CLIENT_MESSAGE = 'client-message'; + const TYPE_WS_MESSAGE = 'ws-message'; const TYPE_API_MESSAGE = 'api-message'; @@ -28,101 +28,36 @@ class DashboardLogger const TYPE_REPLICATOR_UNSUBSCRIBED = 'replicator-unsubscribed'; - public static function connection(ConnectionInterface $connection) - { - /** @var \GuzzleHttp\Psr7\Request $request */ - $request = $connection->httpRequest; + const TYPE_REPLICATOR_JOINED_CHANNEL = 'replicator-joined'; - static::log($connection->app->id, static::TYPE_CONNECTION, [ - 'details' => [ - 'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}", - 'socketId' => $connection->socketId, - ], - ]); - } + const TYPE_REPLICATOR_LEFT_CHANNEL = 'replicator-left'; - public static function occupied(ConnectionInterface $connection, string $channelName) - { - static::log($connection->app->id, static::TYPE_OCCUPIED, [ - 'details' => [ - 'channel' => $channelName, - ], - ]); - } + const TYPE_REPLICATOR_MESSAGE_PUBLISHED = 'replicator-message-published'; - public static function subscribed(ConnectionInterface $connection, string $channelName) - { - static::log($connection->app->id, static::TYPE_SUBSCRIBED, [ - 'details' => [ - 'socketId' => $connection->socketId, - 'channel' => $channelName, - ], - ]); - } + const TYPE_REPLICATOR_MESSAGE_RECEIVED = 'replicator-message-received'; - public static function clientMessage(ConnectionInterface $connection, stdClass $payload) - { - static::log($connection->app->id, static::TYPE_CLIENT_MESSAGE, [ - 'details' => [ - 'socketId' => $connection->socketId, - 'channel' => $payload->channel, - 'event' => $payload->event, - 'data' => $payload, - ], - ]); - } + /** + * The list of all channels. + * + * @var array + */ + public static $channels = [ + self::TYPE_DISCONNECTED, + self::TYPE_CONNECTED, + self::TYPE_VACATED, + self::TYPE_OCCUPIED, + self::TYPE_SUBSCRIBED, + self::TYPE_WS_MESSAGE, + self::TYPE_API_MESSAGE, + self::TYPE_REPLICATOR_SUBSCRIBED, + self::TYPE_REPLICATOR_UNSUBSCRIBED, + self::TYPE_REPLICATOR_JOINED_CHANNEL, + self::TYPE_REPLICATOR_LEFT_CHANNEL, + self::TYPE_REPLICATOR_MESSAGE_PUBLISHED, + self::TYPE_REPLICATOR_MESSAGE_RECEIVED, + ]; - public static function disconnection(ConnectionInterface $connection) - { - static::log($connection->app->id, static::TYPE_DISCONNECTION, [ - 'details' => [ - 'socketId' => $connection->socketId, - ], - ]); - } - - public static function vacated(ConnectionInterface $connection, string $channelName) - { - static::log($connection->app->id, static::TYPE_VACATED, [ - 'details' => [ - 'socketId' => $connection->socketId, - 'channel' => $channelName, - ], - ]); - } - - public static function apiMessage($appId, string $channel, string $event, string $payload) - { - static::log($appId, static::TYPE_API_MESSAGE, [ - 'details' => [ - 'channel' => $connection, - 'event' => $event, - 'payload' => $payload, - ], - ]); - } - - public static function replicatorSubscribed(string $appId, string $channel, string $serverId) - { - static::log($appId, static::TYPE_REPLICATOR_SUBSCRIBED, [ - 'details' => [ - 'serverId' => $serverId, - 'channel' => $channel, - ], - ]); - } - - public static function replicatorUnsubscribed(string $appId, string $channel, string $serverId) - { - static::log($appId, static::TYPE_REPLICATOR_UNSUBSCRIBED, [ - 'details' => [ - 'serverId' => $serverId, - 'channel' => $channel, - ], - ]); - } - - public static function log($appId, string $type, array $attributes = []) + public static function log($appId, string $type, array $details = []) { $channelName = static::LOG_CHANNEL_PREFIX.$type; @@ -134,7 +69,8 @@ class DashboardLogger 'data' => [ 'type' => $type, 'time' => strftime('%H:%M:%S'), - ] + $attributes, + 'details' => $details, + ], ]); } } diff --git a/src/HttpApi/Controllers/TriggerEventController.php b/src/HttpApi/Controllers/TriggerEventController.php index bc921e4..819d417 100644 --- a/src/HttpApi/Controllers/TriggerEventController.php +++ b/src/HttpApi/Controllers/TriggerEventController.php @@ -21,12 +21,11 @@ class TriggerEventController extends Controller 'data' => $request->json()->get('data'), ], $request->json()->get('socket_id'), $request->appId); - DashboardLogger::apiMessage( - $request->appId, - $channelName, - $request->json()->get('name'), - $request->json()->get('data') - ); + DashboardLogger::log($request->appId, DashboardLogger::TYPE_API_MESSAGE, [ + 'channel' => $channelName, + 'event' => $request->json()->get('name'), + 'payload' => $request->json()->get('data'), + ]); StatisticsLogger::apiMessage($request->appId); } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index ef48149..7b730c3 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -12,7 +12,7 @@ use React\EventLoop\LoopInterface; use React\Promise\PromiseInterface; use stdClass; -class RedisClient implements ReplicationInterface +class RedisClient extends LocalClient { /** * The running loop. @@ -90,49 +90,29 @@ class RedisClient implements ReplicationInterface } /** - * Handle a message received from Redis on a specific channel. + * Publish a message to a channel on behalf of a websocket user. * - * @param string $redisChannel - * @param string $payload - * @return void + * @param string $appId + * @param string $channel + * @param stdClass $payload + * @return bool */ - protected function onMessage(string $redisChannel, string $payload) + public function publish(string $appId, string $channel, stdClass $payload): bool { - $payload = json_decode($payload); + $payload->appId = $appId; + $payload->serverId = $this->getServerId(); - // Ignore messages sent by ourselves. - if (isset($payload->serverId) && $this->serverId === $payload->serverId) { - return; - } + $payload = json_encode($payload); - // Pull out the app ID. See RedisPusherBroadcaster - $appId = $payload->appId; + $this->publishClient->__call('publish', ["$appId:$channel", $payload]); - // We need to put the channel name in the payload. - // We strip the app ID from the channel name, websocket clients - // expect the channel name to not include the app ID. - $payload->channel = Str::after($redisChannel, "{$appId}:"); + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [ + 'channel' => $channel, + 'serverId' => $this->getServerId(), + 'payload' => $payload, + ]); - $channelManager = app(ChannelManager::class); - - // Load the Channel instance to sync. - $channel = $channelManager->find($appId, $payload->channel); - - // If no channel is found, none of our connections want to - // receive this message, so we ignore it. - if (! $channel) { - return; - } - - $socket = $payload->socket ?? null; - - // Remove fields intended for internal use from the payload. - unset($payload->socket); - unset($payload->serverId); - unset($payload->appId); - - // Push the message out to connected websocket clients. - $channel->broadcastToEveryoneExcept($payload, $socket, $appId, false); + return true; } /** @@ -153,7 +133,10 @@ class RedisClient implements ReplicationInterface $this->subscribedChannels["$appId:$channel"]++; } - DashboardLogger::replicatorSubscribed($appId, $channel, $this->serverId); + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [ + 'channel' => $channel, + 'serverId' => $this->getServerId(), + ]); return true; } @@ -181,25 +164,10 @@ class RedisClient implements ReplicationInterface unset($this->subscribedChannels["$appId:$channel"]); } - DashboardLogger::replicatorUnsubscribed($appId, $channel, $this->serverId); - - return true; - } - - /** - * Publish a message to a channel on behalf of a websocket user. - * - * @param string $appId - * @param string $channel - * @param stdClass $payload - * @return bool - */ - public function publish(string $appId, string $channel, stdClass $payload): bool - { - $payload->appId = $appId; - $payload->serverId = $this->serverId; - - $this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]); + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [ + 'channel' => $channel, + 'serverId' => $this->getServerId(), + ]); return true; } @@ -217,6 +185,13 @@ class RedisClient implements ReplicationInterface public function joinChannel(string $appId, string $channel, string $socketId, string $data) { $this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]); + + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [ + 'channel' => $channel, + 'serverId' => $this->getServerId(), + 'socketId' => $socketId, + 'data' => $data, + ]); } /** @@ -231,6 +206,12 @@ class RedisClient implements ReplicationInterface public function leaveChannel(string $appId, string $channel, string $socketId) { $this->publishClient->__call('hdel', ["$appId:$channel", $socketId]); + + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [ + 'channel' => $channel, + 'serverId' => $this->getServerId(), + 'socketId' => $socketId, + ]); } /** @@ -272,6 +253,62 @@ class RedisClient implements ReplicationInterface }); } + /** + * Handle a message received from Redis on a specific channel. + * + * @param string $redisChannel + * @param string $payload + * @return void + */ + protected function onMessage(string $redisChannel, string $payload) + { + $payload = json_decode($payload); + + // Ignore messages sent by ourselves. + if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) { + return; + } + + // Pull out the app ID. See RedisPusherBroadcaster + $appId = $payload->appId; + + // We need to put the channel name in the payload. + // We strip the app ID from the channel name, websocket clients + // expect the channel name to not include the app ID. + $payload->channel = Str::after($redisChannel, "{$appId}:"); + + $channelManager = app(ChannelManager::class); + + // Load the Channel instance to sync. + $channel = $channelManager->find($appId, $payload->channel); + + // If no channel is found, none of our connections want to + // receive this message, so we ignore it. + if (! $channel) { + return; + } + + $socket = $payload->socket ?? null; + $serverId = $payload->serverId ?? null; + + // Remove fields intended for internal use from the payload. + unset($payload->socket); + unset($payload->serverId); + unset($payload->appId); + + // Push the message out to connected websocket clients. + $channel->broadcastToEveryoneExcept($payload, $socket, $appId, false); + + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [ + 'channel' => $channel->getChannelName(), + 'redisChannel' => $redisChannel, + 'serverId' => $this->getServer(), + 'incomingServerId' => $serverId, + 'incomingSocketId' => $socket, + 'payload' => $payload, + ]); + } + /** * Build the Redis connection URL from Laravel database config. * diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 8e301c1..cd7e473 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -82,7 +82,10 @@ class Channel $this->replicator->unsubscribe($connection->app->id, $this->channelName); if (! $this->hasConnections()) { - DashboardLogger::vacated($connection, $this->channelName); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_VACATED, [ + 'socketId' => $connection->socketId, + 'channel' => $this->channelName, + ]); } } @@ -93,10 +96,15 @@ class Channel $this->subscribedConnections[$connection->socketId] = $connection; if (! $hadConnectionsPreviously) { - DashboardLogger::occupied($connection, $this->channelName); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_OCCUPIED, [ + 'channel' => $this->channelName, + ]); } - DashboardLogger::subscribed($connection, $this->channelName); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ + 'socketId' => $connection->socketId, + 'channel' => $this->channelName, + ]); } public function broadcast($payload) diff --git a/src/WebSockets/Messages/PusherClientMessage.php b/src/WebSockets/Messages/PusherClientMessage.php index f7c4c45..1ef519c 100644 --- a/src/WebSockets/Messages/PusherClientMessage.php +++ b/src/WebSockets/Messages/PusherClientMessage.php @@ -38,7 +38,12 @@ class PusherClientMessage implements PusherMessage return; } - DashboardLogger::clientMessage($this->connection, $this->payload); + DashboardLogger::log($this->connection->app->id, DashboardLogger::TYPE_WS_MESSAGE, [ + 'socketId' => $this->connection->socketId, + 'channel' => $this->payload->channel, + 'event' => $this->payload->event, + 'data' => $this->payload, + ]); $channel = $this->channelManager->find($this->connection->app->id, $this->payload->channel); diff --git a/src/WebSockets/WebSocketHandler.php b/src/WebSockets/WebSocketHandler.php index d2fdb6c..96a2fe3 100644 --- a/src/WebSockets/WebSocketHandler.php +++ b/src/WebSockets/WebSocketHandler.php @@ -48,7 +48,9 @@ class WebSocketHandler implements MessageComponentInterface { $this->channelManager->removeFromAllChannels($connection); - DashboardLogger::disconnection($connection); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [ + 'socketId' => $connection->socketId, + ]); StatisticsLogger::disconnection($connection); } @@ -106,9 +108,13 @@ class WebSocketHandler implements MessageComponentInterface ]), ])); - DashboardLogger::connection($connection); + /** @var \GuzzleHttp\Psr7\Request $request */ + $request = $connection->httpRequest; - StatisticsLogger::connection($connection); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_CONNECTED, [ + 'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}", + 'socketId' => $connection->socketId, + ]); return $this; }