diff --git a/src/API/FetchChannels.php b/src/API/FetchChannels.php index 9e3ef3f..ddd39cc 100644 --- a/src/API/FetchChannels.php +++ b/src/API/FetchChannels.php @@ -28,48 +28,50 @@ class FetchChannels extends Controller } } - return $this->channelManager->getGlobalChannels($request->appId)->then(function ($channels) use ($request, $attributes) { - $channels = collect($channels)->keyBy(function ($channel) { - return $channel instanceof Channel - ? $channel->getName() - : $channel; + return $this->channelManager + ->getGlobalChannels($request->appId) + ->then(function ($channels) use ($request, $attributes) { + $channels = collect($channels)->keyBy(function ($channel) { + return $channel instanceof Channel + ? $channel->getName() + : $channel; + }); + + if ($request->has('filter_by_prefix')) { + $channels = $channels->filter(function ($channel, $channelName) use ($request) { + return Str::startsWith($channelName, $request->filter_by_prefix); + }); + } + + $channelNames = $channels->map(function ($channel) { + return $channel instanceof Channel + ? $channel->getName() + : $channel; + })->toArray(); + + return $this->channelManager + ->getChannelsMembersCount($request->appId, $channelNames) + ->then(function ($counts) use ($channels, $attributes) { + $channels = $channels->map(function ($channel) use ($counts, $attributes) { + $info = new stdClass; + + $channelName = $channel instanceof Channel + ? $channel->getName() + : $channel; + + if (in_array('user_count', $attributes)) { + $info->user_count = $counts[$channelName]; + } + + return $info; + })->sortBy(function ($content, $name) { + return $name; + })->all(); + + return [ + 'channels' => $channels ?: new stdClass, + ]; + }); }); - - if ($request->has('filter_by_prefix')) { - $channels = $channels->filter(function ($channel, $channelName) use ($request) { - return Str::startsWith($channelName, $request->filter_by_prefix); - }); - } - - $channelNames = $channels->map(function ($channel) { - return $channel instanceof Channel - ? $channel->getName() - : $channel; - })->toArray(); - - return $this->channelManager - ->getChannelsMembersCount($request->appId, $channelNames) - ->then(function ($counts) use ($channels, $attributes) { - $channels = $channels->map(function ($channel) use ($counts, $attributes) { - $info = new stdClass; - - $channelName = $channel instanceof Channel - ? $channel->getName() - : $channel; - - if (in_array('user_count', $attributes)) { - $info->user_count = $counts[$channelName]; - } - - return $info; - })->sortBy(function ($content, $name) { - return $name; - })->all(); - - return [ - 'channels' => $channels ?: new stdClass, - ]; - }); - }); } } diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 03dbd21..919a239 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -163,21 +163,23 @@ class LocalChannelManager implements ChannelManager return Helpers::createFulfilledPromise(false); } - $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { - collect($channels)->each->unsubscribe($connection); + $this->getLocalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + collect($channels)->each->unsubscribe($connection); - collect($channels) - ->reject->hasConnections() - ->each(function (Channel $channel, string $channelName) use ($connection) { - unset($this->channels[$connection->app->id][$channelName]); - }); - }); + collect($channels) + ->reject->hasConnections() + ->each(function (Channel $channel, string $channelName) use ($connection) { + unset($this->channels[$connection->app->id][$channelName]); + }); + }); - $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { - if (count($channels) === 0) { - unset($this->channels[$connection->app->id]); - } - }); + $this->getLocalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + if (count($channels) === 0) { + unset($this->channels[$connection->app->id]); + } + }); return Helpers::createFulfilledPromise(true); } @@ -250,17 +252,18 @@ class LocalChannelManager implements ChannelManager */ public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface { - return $this->getLocalChannels($appId)->then(function ($channels) use ($channelName) { - return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) { - return $collection->filter(function (Channel $channel) use ($channelName) { - return $channel->getName() === $channelName; - }); - }) - ->flatMap(function (Channel $channel) { - return collect($channel->getConnections())->pluck('socketId'); - }) - ->unique()->count(); - }); + return $this->getLocalChannels($appId) + ->then(function ($channels) use ($channelName) { + return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) { + return $collection->filter(function (Channel $channel) use ($channelName) { + return $channel->getName() === $channelName; + }); + }) + ->flatMap(function (Channel $channel) { + return collect($channel->getConnections())->pluck('socketId'); + }) + ->unique()->count(); + }); } /** @@ -452,15 +455,16 @@ class LocalChannelManager implements ChannelManager */ public function updateConnectionInChannels($connection): PromiseInterface { - return $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { - foreach ($channels as $channel) { - if ($channel->hasConnection($connection)) { - $channel->saveConnection($connection); + return $this->getLocalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + foreach ($channels as $channel) { + if ($channel->hasConnection($connection)) { + $channel->saveConnection($connection); + } } - } - return true; - }); + return true; + }); } /** diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index a927e68..01e3419 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -137,13 +137,15 @@ class RedisChannelManager extends LocalChannelManager */ public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface { - return $this->getGlobalChannels($connection->app->id)->then(function ($channels) use ($connection) { - foreach ($channels as $channel) { - $this->unsubscribeFromChannel($connection, $channel, new stdClass); - } - })->then(function () use ($connection) { - return parent::unsubscribeFromAllChannels($connection); - }); + return $this->getGlobalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + foreach ($channels as $channel) { + $this->unsubscribeFromChannel($connection, $channel, new stdClass); + } + }) + ->then(function () use ($connection) { + return parent::unsubscribeFromAllChannels($connection); + }); } /** @@ -156,15 +158,19 @@ class RedisChannelManager extends LocalChannelManager */ public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface { - return $this->subscribeToTopic($connection->app->id, $channelName)->then(function () use ($connection) { - return $this->addConnectionToSet($connection, Carbon::now()); - })->then(function () use ($connection, $channelName) { - return $this->addChannelToSet($connection->app->id, $channelName); - })->then(function () use ($connection, $channelName) { - return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1); - })->then(function () use ($connection, $channelName, $payload) { - return parent::subscribeToChannel($connection, $channelName, $payload); - }); + return $this->subscribeToTopic($connection->app->id, $channelName) + ->then(function () use ($connection) { + return $this->addConnectionToSet($connection, Carbon::now()); + }) + ->then(function () use ($connection, $channelName) { + return $this->addChannelToSet($connection->app->id, $channelName); + }) + ->then(function () use ($connection, $channelName) { + return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1); + }) + ->then(function () use ($connection, $channelName, $payload) { + return parent::subscribeToChannel($connection, $channelName, $payload); + }); } /** @@ -193,11 +199,14 @@ class RedisChannelManager extends LocalChannelManager $this->unsubscribeFromTopic($connection->app->id, $channelName); } }); - })->then(function () use ($connection, $channelName) { + }) + ->then(function () use ($connection, $channelName) { return $this->removeChannelFromSet($connection->app->id, $channelName); - })->then(function () use ($connection) { + }) + ->then(function () use ($connection) { return $this->removeConnectionFromSet($connection); - })->then(function () use ($connection, $channelName, $payload) { + }) + ->then(function () use ($connection, $channelName, $payload) { return parent::unsubscribeFromChannel($connection, $channelName, $payload); }); } @@ -211,9 +220,10 @@ class RedisChannelManager extends LocalChannelManager */ public function subscribeToApp($appId): PromiseInterface { - return $this->subscribeToTopic($appId)->then(function () use ($appId) { - return $this->incrementSubscriptionsCount($appId); - }); + return $this->subscribeToTopic($appId) + ->then(function () use ($appId) { + return $this->incrementSubscriptionsCount($appId); + }); } /** @@ -225,9 +235,10 @@ class RedisChannelManager extends LocalChannelManager */ public function unsubscribeFromApp($appId): PromiseInterface { - return $this->unsubscribeFromTopic($appId)->then(function () use ($appId) { - return $this->decrementSubscriptionsCount($appId); - }); + return $this->unsubscribeFromTopic($appId) + ->then(function () use ($appId) { + return $this->decrementSubscriptionsCount($appId); + }); } /** @@ -297,7 +308,8 @@ class RedisChannelManager extends LocalChannelManager return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user)) ->then(function () use ($connection, $channel, $user) { return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId); - })->then(function () use ($connection, $user, $channel, $payload) { + }) + ->then(function () use ($connection, $user, $channel, $payload) { return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload); }); } @@ -316,7 +328,8 @@ class RedisChannelManager extends LocalChannelManager return $this->removeUserData($connection->app->id, $channel, $connection->socketId) ->then(function () use ($connection, $channel, $user) { return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId); - })->then(function () use ($connection, $user, $channel) { + }) + ->then(function () use ($connection, $user, $channel) { return parent::userLeftPresenceChannel($connection, $user, $channel); }); } @@ -370,9 +383,10 @@ class RedisChannelManager extends LocalChannelManager ); } - return $this->publishClient->exec()->then(function ($data) use ($channelNames) { - return array_combine($channelNames, $data); - }); + return $this->publishClient->exec() + ->then(function ($data) use ($channelNames) { + return array_combine($channelNames, $data); + }); } /** @@ -399,9 +413,10 @@ class RedisChannelManager extends LocalChannelManager public function connectionPonged(ConnectionInterface $connection): PromiseInterface { // This will update the score with the current timestamp. - return $this->addConnectionToSet($connection, Carbon::now())->then(function () use ($connection) { - return parent::connectionPonged($connection); - }); + return $this->addConnectionToSet($connection, Carbon::now()) + ->then(function () use ($connection) { + return parent::connectionPonged($connection); + }); } /** diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index 11fe900..614fe8d 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -54,7 +54,8 @@ class PresenceChannel extends PrivateChannel ]), ])); }); - })->then(function () use ($connection, $user, $payload) { + }) + ->then(function () use ($connection, $user, $payload) { // The `pusher_internal:member_added` event is triggered when a user joins a channel. // It's quite possible that a user can have multiple connections to the same channel // (for example by having multiple browser tabs open) @@ -103,47 +104,50 @@ class PresenceChannel extends PrivateChannel { $truth = parent::unsubscribe($connection); - $this->channelManager->getChannelMember($connection, $this->getName())->then(function ($user) { - return @json_decode($user); - })->then(function ($user) use ($connection) { - if (! $user) { - return; - } + $this->channelManager + ->getChannelMember($connection, $this->getName()) + ->then(function ($user) { + return @json_decode($user); + }) + ->then(function ($user) use ($connection) { + if (! $user) { + return; + } - $this->channelManager - ->userLeftPresenceChannel($connection, $user, $this->getName()) - ->then(function () use ($connection, $user) { - // The `pusher_internal:member_removed` is triggered when a user leaves a channel. - // It's quite possible that a user can have multiple connections to the same channel - // (for example by having multiple browser tabs open) - // and in this case the events will only be triggered when the last one is closed. - $this->channelManager - ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) - ->then(function ($sockets) use ($connection, $user) { - if (count($sockets) === 0) { - $memberRemovedPayload = [ - 'event' => 'pusher_internal:member_removed', - 'channel' => $this->getName(), - 'data' => json_encode([ - 'user_id' => $user->user_id, - ]), - ]; + $this->channelManager + ->userLeftPresenceChannel($connection, $user, $this->getName()) + ->then(function () use ($connection, $user) { + // The `pusher_internal:member_removed` is triggered when a user leaves a channel. + // It's quite possible that a user can have multiple connections to the same channel + // (for example by having multiple browser tabs open) + // and in this case the events will only be triggered when the last one is closed. + $this->channelManager + ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) + ->then(function ($sockets) use ($connection, $user) { + if (count($sockets) === 0) { + $memberRemovedPayload = [ + 'event' => 'pusher_internal:member_removed', + 'channel' => $this->getName(), + 'data' => json_encode([ + 'user_id' => $user->user_id, + ]), + ]; - $this->broadcastToEveryoneExcept( - (object) $memberRemovedPayload, $connection->socketId, - $connection->app->id - ); + $this->broadcastToEveryoneExcept( + (object) $memberRemovedPayload, $connection->socketId, + $connection->app->id + ); - UnsubscribedFromChannel::dispatch( - $connection->app->id, - $connection->socketId, - $this->getName(), - $user - ); - } - }); - }); - }); + UnsubscribedFromChannel::dispatch( + $connection->app->id, + $connection->socketId, + $this->getName(), + $user + ); + } + }); + }); + }); return $truth; } diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index b586748..890a4f1 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -304,12 +304,14 @@ class StartServer extends Command // Get all local connections and close them. They will // be automatically be unsubscribed from all channels. - $channelManager->getLocalConnections()->then(function ($connections) { - foreach ($connections as $connection) { - $connection->close(); - } - })->then(function () { - $this->loop->stop(); - }); + $channelManager->getLocalConnections() + ->then(function ($connections) { + foreach ($connections as $connection) { + $connection->close(); + } + }) + ->then(function () { + $this->loop->stop(); + }); } } diff --git a/src/Server/Messages/PusherChannelProtocolMessage.php b/src/Server/Messages/PusherChannelProtocolMessage.php index 4857bd8..c6f4f13 100644 --- a/src/Server/Messages/PusherChannelProtocolMessage.php +++ b/src/Server/Messages/PusherChannelProtocolMessage.php @@ -32,11 +32,13 @@ class PusherChannelProtocolMessage extends PusherClientMessage */ protected function ping(ConnectionInterface $connection) { - $this->channelManager->connectionPonged($connection)->then(function () use ($connection) { - $connection->send(json_encode(['event' => 'pusher:pong'])); + $this->channelManager + ->connectionPonged($connection) + ->then(function () use ($connection) { + $connection->send(json_encode(['event' => 'pusher:pong'])); - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); - }); + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); + }); } /** diff --git a/src/Statistics/Collectors/MemoryCollector.php b/src/Statistics/Collectors/MemoryCollector.php index 34644de..2bb2630 100644 --- a/src/Statistics/Collectors/MemoryCollector.php +++ b/src/Statistics/Collectors/MemoryCollector.php @@ -98,11 +98,13 @@ class MemoryCollector implements StatisticsCollector $this->createRecord($statistic, $appId); - $this->channelManager->getGlobalConnectionsCount($appId)->then(function ($connections) use ($statistic) { - $statistic->reset( - is_null($connections) ? 0 : $connections - ); - }); + $this->channelManager + ->getGlobalConnectionsCount($appId) + ->then(function ($connections) use ($statistic) { + $statistic->reset( + is_null($connections) ? 0 : $connections + ); + }); } }); } diff --git a/src/Statistics/Collectors/RedisCollector.php b/src/Statistics/Collectors/RedisCollector.php index 4840a10..c37b940 100644 --- a/src/Statistics/Collectors/RedisCollector.php +++ b/src/Statistics/Collectors/RedisCollector.php @@ -84,24 +84,30 @@ class RedisCollector extends MemoryCollector ->hincrby( $this->channelManager->getRedisKey($appId, null, ['stats']), 'current_connections_count', 1 - )->then(function ($currentConnectionsCount) use ($appId) { + ) + ->then(function ($currentConnectionsCount) use ($appId) { // Get the peak connections count from Redis. - $this->channelManager->getPublishClient()->hget( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'peak_connections_count' - )->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) { - // Extract the greatest number between the current peak connection count - // and the current connection number. - $peakConnectionsCount = is_null($currentPeakConnectionCount) - ? $currentConnectionsCount - : max($currentPeakConnectionCount, $currentConnectionsCount); - - // Then set it to the database. - $this->channelManager->getPublishClient()->hset( + $this->channelManager + ->getPublishClient() + ->hget( $this->channelManager->getRedisKey($appId, null, ['stats']), - 'peak_connections_count', $peakConnectionsCount - ); - }); + 'peak_connections_count' + ) + ->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) { + // Extract the greatest number between the current peak connection count + // and the current connection number. + $peakConnectionsCount = is_null($currentPeakConnectionCount) + ? $currentConnectionsCount + : max($currentPeakConnectionCount, $currentConnectionsCount); + + // Then set it to the database. + $this->channelManager + ->getPublishClient() + ->hset( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'peak_connections_count', $peakConnectionsCount + ); + }); }); } @@ -129,10 +135,12 @@ class RedisCollector extends MemoryCollector : max($currentPeakConnectionCount, $currentConnectionsCount); // Then set it to the database. - $this->channelManager->getPublishClient()->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'peak_connections_count', $peakConnectionsCount - ); + $this->channelManager + ->getPublishClient() + ->hset( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'peak_connections_count', $peakConnectionsCount + ); }); }); } @@ -145,32 +153,35 @@ class RedisCollector extends MemoryCollector public function save() { $this->lock()->get(function () { - $this->channelManager->getPublishClient()->smembers(static::$redisSetName)->then(function ($members) { - foreach ($members as $appId) { - $this->channelManager - ->getPublishClient() - ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) - ->then(function ($list) use ($appId) { - if (! $list) { - return; - } + $this->channelManager + ->getPublishClient() + ->smembers(static::$redisSetName) + ->then(function ($members) { + foreach ($members as $appId) { + $this->channelManager + ->getPublishClient() + ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) + ->then(function ($list) use ($appId) { + if (! $list) { + return; + } - $statistic = $this->arrayToStatisticInstance( - $appId, Helpers::redisListToArray($list) - ); + $statistic = $this->arrayToStatisticInstance( + $appId, Helpers::redisListToArray($list) + ); - $this->createRecord($statistic, $appId); + $this->createRecord($statistic, $appId); - $this->channelManager - ->getGlobalConnectionsCount($appId) - ->then(function ($currentConnectionsCount) use ($appId) { - $currentConnectionsCount === 0 || is_null($currentConnectionsCount) - ? $this->resetAppTraces($appId) - : $this->resetStatistics($appId, $currentConnectionsCount); - }); - }); - } - }); + $this->channelManager + ->getGlobalConnectionsCount($appId) + ->then(function ($currentConnectionsCount) use ($appId) { + $currentConnectionsCount === 0 || is_null($currentConnectionsCount) + ? $this->resetAppTraces($appId) + : $this->resetStatistics($appId, $currentConnectionsCount); + }); + }); + } + }); }); } @@ -195,22 +206,25 @@ class RedisCollector extends MemoryCollector */ public function getStatistics(): PromiseInterface { - return $this->channelManager->getPublishClient()->smembers(static::$redisSetName)->then(function ($members) { - $appsWithStatistics = []; + return $this->channelManager + ->getPublishClient() + ->smembers(static::$redisSetName) + ->then(function ($members) { + $appsWithStatistics = []; - foreach ($members as $appId) { - $this->channelManager - ->getPublishClient() - ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) - ->then(function ($list) use ($appId, &$appsWithStatistics) { - $appsWithStatistics[$appId] = $this->arrayToStatisticInstance( - $appId, Helpers::redisListToArray($list) - ); - }); - } + foreach ($members as $appId) { + $this->channelManager + ->getPublishClient() + ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) + ->then(function ($list) use ($appId, &$appsWithStatistics) { + $appsWithStatistics[$appId] = $this->arrayToStatisticInstance( + $appId, Helpers::redisListToArray($list) + ); + }); + } - return $appsWithStatistics; - }); + return $appsWithStatistics; + }); } /** @@ -240,25 +254,33 @@ class RedisCollector extends MemoryCollector */ public function resetStatistics($appId, int $currentConnectionCount) { - $this->channelManager->getPublishClient()->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'current_connections_count', $currentConnectionCount - ); + $this->channelManager + ->getPublishClient() + ->hset( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'current_connections_count', $currentConnectionCount + ); - $this->channelManager->getPublishClient()->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'peak_connections_count', $currentConnectionCount - ); + $this->channelManager + ->getPublishClient() + ->hset( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'peak_connections_count', $currentConnectionCount + ); - $this->channelManager->getPublishClient()->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'websocket_messages_count', 0 - ); + $this->channelManager + ->getPublishClient() + ->hset( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'websocket_messages_count', 0 + ); - $this->channelManager->getPublishClient()->hset( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'api_messages_count', 0 - ); + $this->channelManager + ->getPublishClient() + ->hset( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'api_messages_count', 0 + ); } /** @@ -270,27 +292,37 @@ class RedisCollector extends MemoryCollector */ public function resetAppTraces($appId) { - $this->channelManager->getPublishClient()->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'current_connections_count' - ); + $this->channelManager + ->getPublishClient() + ->hdel( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'current_connections_count' + ); - $this->channelManager->getPublishClient()->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'peak_connections_count' - ); + $this->channelManager + ->getPublishClient() + ->hdel( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'peak_connections_count' + ); - $this->channelManager->getPublishClient()->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'websocket_messages_count' - ); + $this->channelManager + ->getPublishClient() + ->hdel( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'websocket_messages_count' + ); - $this->channelManager->getPublishClient()->hdel( - $this->channelManager->getRedisKey($appId, null, ['stats']), - 'api_messages_count' - ); + $this->channelManager + ->getPublishClient() + ->hdel( + $this->channelManager->getRedisKey($appId, null, ['stats']), + 'api_messages_count' + ); - $this->channelManager->getPublishClient()->srem(static::$redisSetName, $appId); + $this->channelManager + ->getPublishClient() + ->srem(static::$redisSetName, $appId); } /** @@ -301,7 +333,9 @@ class RedisCollector extends MemoryCollector */ protected function ensureAppIsInSet($appId) { - $this->channelManager->getPublishClient()->sadd(static::$redisSetName, $appId); + $this->channelManager + ->getPublishClient() + ->sadd(static::$redisSetName, $appId); return $this->channelManager->getPublishClient(); } diff --git a/tests/AsyncRedisQueueTest.php b/tests/AsyncRedisQueueTest.php index 11e0862..89db9cd 100644 --- a/tests/AsyncRedisQueueTest.php +++ b/tests/AsyncRedisQueueTest.php @@ -62,9 +62,11 @@ class AsyncRedisQueueTest extends TestCase $this->queue->later(-300, $jobs[2]); $this->queue->later(-100, $jobs[3]); - $this->getPublishClient()->zcard('queues:default:delayed')->then(function ($count) { - $this->assertEquals(4, $count); - }); + $this->getPublishClient() + ->zcard('queues:default:delayed') + ->then(function ($count) { + $this->assertEquals(4, $count); + }); $this->unregisterManagers(); @@ -85,7 +87,8 @@ class AsyncRedisQueueTest extends TestCase $this->unregisterManagers(); - $this->getPublishClient()->assertCalledCount(1, 'eval'); + $this->getPublishClient() + ->assertCalledCount(1, 'eval'); $redisJob = $this->queue->pop(); @@ -123,7 +126,8 @@ class AsyncRedisQueueTest extends TestCase $this->unregisterManagers(); - $this->getPublishClient()->assertCalledCount(1, 'eval'); + $this->getPublishClient() + ->assertCalledCount(1, 'eval'); $redisJob = $this->queue->pop(); @@ -148,7 +152,8 @@ class AsyncRedisQueueTest extends TestCase $this->queue->push($job1); $this->queue->push($job2); - $this->getPublishClient()->assertCalledCount(2, 'eval'); + $this->getPublishClient() + ->assertCalledCount(2, 'eval'); $this->unregisterManagers(); diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index df163d3..2e4f2ed 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -54,23 +54,31 @@ class ConnectionTest extends TestCase { $connection = $this->newActiveConnection(['public-channel']); - $this->channelManager->getGlobalChannels('1234')->then(function ($channels) { - $this->assertCount(1, $channels); - }); + $this->channelManager + ->getGlobalChannels('1234') + ->then(function ($channels) { + $this->assertCount(1, $channels); + }); - $this->channelManager->getGlobalConnectionsCount('1234')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); $this->pusherServer->onClose($connection); - $this->channelManager->getGlobalConnectionsCount('1234')->then(function ($total) { - $this->assertEquals(0, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234') + ->then(function ($total) { + $this->assertEquals(0, $total); + }); - $this->channelManager->getGlobalChannels('1234')->then(function ($channels) { - $this->assertCount(0, $channels); - }); + $this->channelManager + ->getGlobalChannels('1234') + ->then(function ($channels) { + $this->assertCount(0, $channels); + }); } public function test_websocket_exceptions_are_sent() diff --git a/tests/LocalPongRemovalTest.php b/tests/LocalPongRemovalTest.php index a407464..fa643e4 100644 --- a/tests/LocalPongRemovalTest.php +++ b/tests/LocalPongRemovalTest.php @@ -20,21 +20,27 @@ class LocalPongRemovalTest extends TestCase $this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection); - $this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { - $this->assertEquals(2, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); $this->channelManager->removeObsoleteConnections(); - $this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { - $this->assertEquals(1, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); - $this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { - $connection = $connections[$activeConnection->socketId]; + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) use ($activeConnection) { + $connection = $connections[$activeConnection->socketId]; - $this->assertEquals($activeConnection->socketId, $connection->socketId); - }); + $this->assertEquals($activeConnection->socketId, $connection->socketId); + }); } public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels() @@ -51,21 +57,27 @@ class LocalPongRemovalTest extends TestCase $this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection); - $this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { - $this->assertEquals(2, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); $this->channelManager->removeObsoleteConnections(); - $this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { - $this->assertEquals(1, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); - $this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { - $connection = $connections[$activeConnection->socketId]; + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) use ($activeConnection) { + $connection = $connections[$activeConnection->socketId]; - $this->assertEquals($activeConnection->socketId, $connection->socketId); - }); + $this->assertEquals($activeConnection->socketId, $connection->socketId); + }); } public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels() @@ -82,28 +94,38 @@ class LocalPongRemovalTest extends TestCase $this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { - $this->assertEquals(2, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); - $this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { - $this->assertCount(2, $members); - }); + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(2, $members); + }); $this->channelManager->removeObsoleteConnections(); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { - $this->assertEquals(1, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); - $this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { - $connection = $connections[$activeConnection->socketId]; + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) use ($activeConnection) { + $connection = $connections[$activeConnection->socketId]; - $this->assertEquals($activeConnection->socketId, $connection->socketId); - }); + $this->assertEquals($activeConnection->socketId, $connection->socketId); + }); - $this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { - $this->assertCount(1, $members); - }); + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(1, $members); + }); } } diff --git a/tests/PresenceChannelTest.php b/tests/PresenceChannelTest.php index d2298ac..d983c78 100644 --- a/tests/PresenceChannelTest.php +++ b/tests/PresenceChannelTest.php @@ -58,9 +58,11 @@ class PresenceChannelTest extends TestCase 'channel' => 'presence-channel', ]); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); } public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined() @@ -110,13 +112,17 @@ class PresenceChannelTest extends TestCase ]), ]); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { - $this->assertEquals(3, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($total) { + $this->assertEquals(3, $total); + }); - $this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { - $this->assertCount(2, $members); - }); + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(2, $members); + }); } public function test_presence_channel_broadcast_member_events() @@ -129,9 +135,11 @@ class PresenceChannelTest extends TestCase 'data' => json_encode(['user_id' => 2]), ]); - $this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { - $this->assertCount(2, $members); - }); + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(2, $members); + }); $this->pusherServer->onClose($morty); @@ -140,23 +148,29 @@ class PresenceChannelTest extends TestCase 'data' => json_encode(['user_id' => 2]), ]); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); - $this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) use ($rick) { - $this->assertCount(1, $members); - $this->assertEquals(1, $members[$rick->socketId]->user_id); - }); + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) use ($rick) { + $this->assertCount(1, $members); + $this->assertEquals(1, $members[$rick->socketId]->user_id); + }); } public function test_unsubscribe_from_presence_channel() { $connection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); $message = new Mocks\Message([ 'event' => 'pusher:unsubscribe', @@ -167,10 +181,12 @@ class PresenceChannelTest extends TestCase $this->pusherServer->onMessage($connection, $message); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { - $this->assertEquals(0, $total); - }); -} + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($total) { + $this->assertEquals(0, $total); + }); + } public function test_can_whisper_to_private_channel() { @@ -213,18 +229,22 @@ class PresenceChannelTest extends TestCase $rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); $morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]); - $this->statisticsCollector->getStatistics()->then(function ($statistics) { - $this->assertCount(1, $statistics); - }); + $this->statisticsCollector + ->getStatistics() + ->then(function ($statistics) { + $this->assertCount(1, $statistics); + }); - $this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { - $this->assertEquals([ - 'peak_connections_count' => 2, - 'websocket_messages_count' => 2, - 'api_messages_count' => 0, - 'app_id' => '1234', - ], $statistic->toArray()); - }); + $this->statisticsCollector + ->getAppStatistics('1234') + ->then(function ($statistic) { + $this->assertEquals([ + 'peak_connections_count' => 2, + 'websocket_messages_count' => 2, + 'api_messages_count' => 0, + 'app_id' => '1234', + ], $statistic->toArray()); + }); } public function test_local_connections_for_presence_channels() @@ -232,15 +252,17 @@ class PresenceChannelTest extends TestCase $this->newPresenceConnection('presence-channel', ['user_id' => 1]); $this->newPresenceConnection('presence-channel-2', ['user_id' => 2]); - $this->channelManager->getLocalConnections()->then(function ($connections) { - $this->assertCount(2, $connections); + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); - foreach ($connections as $connection) { - $this->assertInstanceOf( - ConnectionInterface::class, $connection - ); - } - }); + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); } public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection() @@ -282,13 +304,17 @@ class PresenceChannelTest extends TestCase $this->assertCount(0, $sockets); }); - $this->channelManager->getMemberSockets('2', '1234', 'presence-channel')->then(function ($sockets) { - $this->assertCount(0, $sockets); - }); + $this->channelManager + ->getMemberSockets('2', '1234', 'presence-channel') + ->then(function ($sockets) { + $this->assertCount(0, $sockets); + }); - $this->channelManager->getMemberSockets('observer', '1234', 'presence-channel')->then(function ($sockets) { - $this->assertCount(1, $sockets); - }); + $this->channelManager + ->getMemberSockets('observer', '1234', 'presence-channel') + ->then(function ($sockets) { + $this->assertCount(1, $sockets); + }); } public function test_events_are_processed_by_on_message_on_presence_channels() @@ -374,10 +400,11 @@ class PresenceChannelTest extends TestCase $this->getSubscribeClient() ->assertNothingDispatched(); - $this->getPublishClient()->assertCalledWithArgs('publish', [ - $this->channelManager->getRedisKey('1234', 'presence-channel'), - $message->getPayload(), - ]); + $this->getPublishClient() + ->assertCalledWithArgs('publish', [ + $this->channelManager->getRedisKey('1234', 'presence-channel'), + $message->getPayload(), + ]); } public function test_it_fires_the_event_to_presence_channel() @@ -411,14 +438,16 @@ class PresenceChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); - $this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { - $this->assertEquals([ - 'peak_connections_count' => 1, - 'websocket_messages_count' => 1, - 'api_messages_count' => 1, - 'app_id' => '1234', - ], $statistic->toArray()); - }); + $this->statisticsCollector + ->getAppStatistics('1234') + ->then(function ($statistic) { + $this->assertEquals([ + 'peak_connections_count' => 1, + 'websocket_messages_count' => 1, + 'api_messages_count' => 1, + 'app_id' => '1234', + ], $statistic->toArray()); + }); } public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_presence_channel() @@ -451,17 +480,19 @@ class PresenceChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); if (method_exists($this->channelManager, 'getPublishClient')) { - $this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ - $this->channelManager->getRedisKey('1234', 'presence-channel'), - json_encode([ - 'event' => 'some-event', - 'channel' => 'presence-channel', - 'data' => json_encode(['some-data' => 'yes']), - 'appId' => '1234', - 'socketId' => null, - 'serverId' => $this->channelManager->getServerId(), - ]), - ]); + $this->channelManager + ->getPublishClient() + ->assertCalledWithArgsCount(1, 'publish', [ + $this->channelManager->getRedisKey('1234', 'presence-channel'), + json_encode([ + 'event' => 'some-event', + 'channel' => 'presence-channel', + 'data' => json_encode(['some-data' => 'yes']), + 'appId' => '1234', + 'socketId' => null, + 'serverId' => $this->channelManager->getServerId(), + ]), + ]); } } @@ -497,17 +528,19 @@ class PresenceChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); if (method_exists($this->channelManager, 'getPublishClient')) { - $this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ - $this->channelManager->getRedisKey('1234', 'presence-channel'), - json_encode([ - 'event' => 'some-event', - 'channel' => 'presence-channel', - 'data' => json_encode(['some-data' => 'yes']), - 'appId' => '1234', - 'socketId' => null, - 'serverId' => $this->channelManager->getServerId(), - ]), - ]); + $this->channelManager + ->getPublishClient() + ->assertCalledWithArgsCount(1, 'publish', [ + $this->channelManager->getRedisKey('1234', 'presence-channel'), + json_encode([ + 'event' => 'some-event', + 'channel' => 'presence-channel', + 'data' => json_encode(['some-data' => 'yes']), + 'appId' => '1234', + 'socketId' => null, + 'serverId' => $this->channelManager->getServerId(), + ]), + ]); } $wsConnection->assertSentEvent('some-event', [ diff --git a/tests/PrivateChannelTest.php b/tests/PrivateChannelTest.php index 14be78b..90efa6d 100644 --- a/tests/PrivateChannelTest.php +++ b/tests/PrivateChannelTest.php @@ -48,18 +48,22 @@ class PrivateChannelTest extends TestCase 'channel' => 'private-channel', ]); - $this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); } public function test_unsubscribe_from_private_channel() { $connection = $this->newPrivateConnection('private-channel'); - $this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); $message = new Mocks\Message([ 'event' => 'pusher:unsubscribe', @@ -70,9 +74,11 @@ class PrivateChannelTest extends TestCase $this->pusherServer->onMessage($connection, $message); - $this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { - $this->assertEquals(0, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($total) { + $this->assertEquals(0, $total); + }); } public function test_can_whisper_to_private_channel() @@ -116,18 +122,22 @@ class PrivateChannelTest extends TestCase $rick = $this->newPrivateConnection('private-channel'); $morty = $this->newPrivateConnection('private-channel'); - $this->statisticsCollector->getStatistics()->then(function ($statistics) { - $this->assertCount(1, $statistics); - }); + $this->statisticsCollector + ->getStatistics() + ->then(function ($statistics) { + $this->assertCount(1, $statistics); + }); - $this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { - $this->assertEquals([ - 'peak_connections_count' => 2, - 'websocket_messages_count' => 2, - 'api_messages_count' => 0, - 'app_id' => '1234', - ], $statistic->toArray()); - }); + $this->statisticsCollector + ->getAppStatistics('1234') + ->then(function ($statistic) { + $this->assertEquals([ + 'peak_connections_count' => 2, + 'websocket_messages_count' => 2, + 'api_messages_count' => 0, + 'app_id' => '1234', + ], $statistic->toArray()); + }); } public function test_local_connections_for_private_channels() @@ -135,15 +145,17 @@ class PrivateChannelTest extends TestCase $this->newPrivateConnection('private-channel'); $this->newPrivateConnection('private-channel-2'); - $this->channelManager->getLocalConnections()->then(function ($connections) { - $this->assertCount(2, $connections); + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); - foreach ($connections as $connection) { - $this->assertInstanceOf( - ConnectionInterface::class, $connection - ); - } - }); + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); } public function test_events_are_processed_by_on_message_on_private_channels() @@ -208,10 +220,11 @@ class PrivateChannelTest extends TestCase $this->getSubscribeClient() ->assertNothingDispatched(); - $this->getPublishClient()->assertCalledWithArgs('publish', [ - $this->channelManager->getRedisKey('1234', 'private-channel'), - $message->getPayload(), - ]); + $this->getPublishClient() + ->assertCalledWithArgs('publish', [ + $this->channelManager->getRedisKey('1234', 'private-channel'), + $message->getPayload(), + ]); } public function test_it_fires_the_event_to_private_channel() @@ -245,14 +258,16 @@ class PrivateChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); - $this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { - $this->assertEquals([ - 'peak_connections_count' => 1, - 'websocket_messages_count' => 1, - 'api_messages_count' => 1, - 'app_id' => '1234', - ], $statistic->toArray()); - }); + $this->statisticsCollector + ->getAppStatistics('1234') + ->then(function ($statistic) { + $this->assertEquals([ + 'peak_connections_count' => 1, + 'websocket_messages_count' => 1, + 'api_messages_count' => 1, + 'app_id' => '1234', + ], $statistic->toArray()); + }); } public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_private_channel() @@ -285,17 +300,19 @@ class PrivateChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); if (method_exists($this->channelManager, 'getPublishClient')) { - $this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ - $this->channelManager->getRedisKey('1234', 'private-channel'), - json_encode([ - 'event' => 'some-event', - 'channel' => 'private-channel', - 'data' => json_encode(['some-data' => 'yes']), - 'appId' => '1234', - 'socketId' => null, - 'serverId' => $this->channelManager->getServerId(), - ]), - ]); + $this->channelManager + ->getPublishClient() + ->assertCalledWithArgsCount(1, 'publish', [ + $this->channelManager->getRedisKey('1234', 'private-channel'), + json_encode([ + 'event' => 'some-event', + 'channel' => 'private-channel', + 'data' => json_encode(['some-data' => 'yes']), + 'appId' => '1234', + 'socketId' => null, + 'serverId' => $this->channelManager->getServerId(), + ]), + ]); } } @@ -331,17 +348,19 @@ class PrivateChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); if (method_exists($this->channelManager, 'getPublishClient')) { - $this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ - $this->channelManager->getRedisKey('1234', 'private-channel'), - json_encode([ - 'event' => 'some-event', - 'channel' => 'private-channel', - 'data' => json_encode(['some-data' => 'yes']), - 'appId' => '1234', - 'socketId' => null, - 'serverId' => $this->channelManager->getServerId(), - ]), - ]); + $this->channelManager + ->getPublishClient() + ->assertCalledWithArgsCount(1, 'publish', [ + $this->channelManager->getRedisKey('1234', 'private-channel'), + json_encode([ + 'event' => 'some-event', + 'channel' => 'private-channel', + 'data' => json_encode(['some-data' => 'yes']), + 'appId' => '1234', + 'socketId' => null, + 'serverId' => $this->channelManager->getServerId(), + ]), + ]); } $wsConnection->assertSentEvent('some-event', [ diff --git a/tests/PublicChannelTest.php b/tests/PublicChannelTest.php index d3bd5a0..b16498d 100644 --- a/tests/PublicChannelTest.php +++ b/tests/PublicChannelTest.php @@ -14,9 +14,11 @@ class PublicChannelTest extends TestCase { $connection = $this->newActiveConnection(['public-channel']); - $this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); $connection->assertSentEvent( 'pusher:connection_established', @@ -38,9 +40,11 @@ class PublicChannelTest extends TestCase { $connection = $this->newActiveConnection(['public-channel']); - $this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { - $this->assertEquals(1, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($total) { + $this->assertEquals(1, $total); + }); $message = new Mocks\Message([ 'event' => 'pusher:unsubscribe', @@ -51,9 +55,11 @@ class PublicChannelTest extends TestCase $this->pusherServer->onMessage($connection, $message); - $this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { - $this->assertEquals(0, $total); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($total) { + $this->assertEquals(0, $total); + }); } public function test_can_whisper_to_public_channel() @@ -97,18 +103,22 @@ class PublicChannelTest extends TestCase $rick = $this->newActiveConnection(['public-channel']); $morty = $this->newActiveConnection(['public-channel']); - $this->statisticsCollector->getStatistics()->then(function ($statistics) { - $this->assertCount(1, $statistics); - }); + $this->statisticsCollector + ->getStatistics() + ->then(function ($statistics) { + $this->assertCount(1, $statistics); + }); - $this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { - $this->assertEquals([ - 'peak_connections_count' => 2, - 'websocket_messages_count' => 2, - 'api_messages_count' => 0, - 'app_id' => '1234', - ], $statistic->toArray()); - }); + $this->statisticsCollector + ->getAppStatistics('1234') + ->then(function ($statistic) { + $this->assertEquals([ + 'peak_connections_count' => 2, + 'websocket_messages_count' => 2, + 'api_messages_count' => 0, + 'app_id' => '1234', + ], $statistic->toArray()); + }); } public function test_local_connections_for_public_channels() @@ -116,15 +126,17 @@ class PublicChannelTest extends TestCase $this->newActiveConnection(['public-channel']); $this->newActiveConnection(['public-channel-2']); - $this->channelManager->getLocalConnections()->then(function ($connections) { - $this->assertCount(2, $connections); + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); - foreach ($connections as $connection) { - $this->assertInstanceOf( - ConnectionInterface::class, $connection - ); - } - }); + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); } public function test_events_are_processed_by_on_message_on_public_channels() @@ -189,10 +201,11 @@ class PublicChannelTest extends TestCase $this->getSubscribeClient() ->assertNothingDispatched(); - $this->getPublishClient()->assertCalledWithArgs('publish', [ - $this->channelManager->getRedisKey('1234', 'public-channel'), - $message->getPayload(), - ]); + $this->getPublishClient() + ->assertCalledWithArgs('publish', [ + $this->channelManager->getRedisKey('1234', 'public-channel'), + $message->getPayload(), + ]); } public function test_it_fires_the_event_to_public_channel() @@ -226,14 +239,16 @@ class PublicChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); - $this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { - $this->assertEquals([ - 'peak_connections_count' => 1, - 'websocket_messages_count' => 1, - 'api_messages_count' => 1, - 'app_id' => '1234', - ], $statistic->toArray()); - }); + $this->statisticsCollector + ->getAppStatistics('1234') + ->then(function ($statistic) { + $this->assertEquals([ + 'peak_connections_count' => 1, + 'websocket_messages_count' => 1, + 'api_messages_count' => 1, + 'app_id' => '1234', + ], $statistic->toArray()); + }); } public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_public_channel() @@ -266,17 +281,19 @@ class PublicChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); if (method_exists($this->channelManager, 'getPublishClient')) { - $this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ - $this->channelManager->getRedisKey('1234', 'public-channel'), - json_encode([ - 'event' => 'some-event', - 'channel' => 'public-channel', - 'data' => json_encode(['some-data' => 'yes']), - 'appId' => '1234', - 'socketId' => null, - 'serverId' => $this->channelManager->getServerId(), - ]), - ]); + $this->channelManager + ->getPublishClient() + ->assertCalledWithArgsCount(1, 'publish', [ + $this->channelManager->getRedisKey('1234', 'public-channel'), + json_encode([ + 'event' => 'some-event', + 'channel' => 'public-channel', + 'data' => json_encode(['some-data' => 'yes']), + 'appId' => '1234', + 'socketId' => null, + 'serverId' => $this->channelManager->getServerId(), + ]), + ]); } } @@ -312,17 +329,19 @@ class PublicChannelTest extends TestCase $this->assertSame([], json_decode($response->getContent(), true)); if (method_exists($this->channelManager, 'getPublishClient')) { - $this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ - $this->channelManager->getRedisKey('1234', 'public-channel'), - json_encode([ - 'event' => 'some-event', - 'channel' => 'public-channel', - 'data' => json_encode(['some-data' => 'yes']), - 'appId' => '1234', - 'socketId' => null, - 'serverId' => $this->channelManager->getServerId(), - ]), - ]); + $this->channelManager + ->getPublishClient() + ->assertCalledWithArgsCount(1, 'publish', [ + $this->channelManager->getRedisKey('1234', 'public-channel'), + json_encode([ + 'event' => 'some-event', + 'channel' => 'public-channel', + 'data' => json_encode(['some-data' => 'yes']), + 'appId' => '1234', + 'socketId' => null, + 'serverId' => $this->channelManager->getServerId(), + ]), + ]); } $wsConnection->assertSentEvent('some-event', [ diff --git a/tests/RedisPongRemovalTest.php b/tests/RedisPongRemovalTest.php index 146f904..14410fb 100644 --- a/tests/RedisPongRemovalTest.php +++ b/tests/RedisPongRemovalTest.php @@ -19,23 +19,31 @@ class RedisPongRemovalTest extends TestCase // Make the connection look like it was lost 1 day ago. $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); - $this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { - $this->assertEquals(2, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); - $this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { - $this->assertCount(1, $expiredConnections); - }); + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); $this->channelManager->removeObsoleteConnections(); - $this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { - $this->assertEquals(1, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); - $this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { - $this->assertCount(0, $expiredConnections); - }); + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); } public function test_not_ponged_connections_do_get_removed_on_redis_for_private_channels() @@ -51,23 +59,31 @@ class RedisPongRemovalTest extends TestCase // Make the connection look like it was lost 1 day ago. $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); - $this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { - $this->assertEquals(2, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); - $this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { - $this->assertCount(1, $expiredConnections); - }); + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); $this->channelManager->removeObsoleteConnections(); - $this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { - $this->assertEquals(1, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); - $this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { - $this->assertCount(0, $expiredConnections); - }); + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); } public function test_not_ponged_connections_do_get_removed_on_redis_for_presence_channels() @@ -83,30 +99,42 @@ class RedisPongRemovalTest extends TestCase // Make the connection look like it was lost 1 day ago. $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { - $this->assertEquals(2, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(2, $count); + }); - $this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { - $this->assertCount(1, $expiredConnections); - }); + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); - $this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { - $this->assertCount(2, $members); - }); + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(2, $members); + }); $this->channelManager->removeObsoleteConnections(); - $this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { - $this->assertEquals(1, $count); - }); + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($count) { + $this->assertEquals(1, $count); + }); - $this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { - $this->assertCount(0, $expiredConnections); - }); + $this->channelManager + ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); - $this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { - $this->assertCount(1, $members); - }); + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(1, $members); + }); } }