Merge branch '2.x' of github.com:beyondcode/laravel-websockets into fix/fix-stale-data

This commit is contained in:
Alex Renoki 2020-12-07 23:31:53 +02:00
commit 94722a7155
19 changed files with 822 additions and 599 deletions

View File

@ -176,7 +176,8 @@ abstract class Controller implements HttpServerInterface
$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest)); $laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
$this->ensureValidAppId($laravelRequest->appId) $this
->ensureValidAppId($laravelRequest->appId)
->ensureValidSignature($laravelRequest); ->ensureValidSignature($laravelRequest);
// Invoke the controller action // Invoke the controller action

View File

@ -28,7 +28,9 @@ class FetchChannels extends Controller
} }
} }
return $this->channelManager->getGlobalChannels($request->appId)->then(function ($channels) use ($request, $attributes) { return $this->channelManager
->getGlobalChannels($request->appId)
->then(function ($channels) use ($request, $attributes) {
$channels = collect($channels)->keyBy(function ($channel) { $channels = collect($channels)->keyBy(function ($channel) {
return $channel instanceof Channel return $channel instanceof Channel
? $channel->getName() ? $channel->getName()

View File

@ -30,9 +30,11 @@ class ConfigAppManager implements AppManager
*/ */
public function all(): array public function all(): array
{ {
return $this->apps->map(function (array $appAttributes) { return $this->apps
->map(function (array $appAttributes) {
return $this->convertIntoApp($appAttributes); return $this->convertIntoApp($appAttributes);
})->toArray(); })
->toArray();
} }
/** /**
@ -104,7 +106,8 @@ class ConfigAppManager implements AppManager
$app->setPath($appAttributes['path']); $app->setPath($appAttributes['path']);
} }
$app->enableClientMessages($appAttributes['enable_client_messages']) $app
->enableClientMessages($appAttributes['enable_client_messages'])
->enableStatistics($appAttributes['enable_statistics']) ->enableStatistics($appAttributes['enable_statistics'])
->setCapacity($appAttributes['capacity'] ?? null) ->setCapacity($appAttributes['capacity'] ?? null)
->setAllowedOrigins($appAttributes['allowed_origins'] ?? []); ->setAllowedOrigins($appAttributes['allowed_origins'] ?? []);

View File

@ -111,12 +111,16 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getLocalConnections(): PromiseInterface public function getLocalConnections(): PromiseInterface
{ {
$connections = collect($this->channels)->map(function ($channelsWithConnections, $appId) { $connections = collect($this->channels)
->map(function ($channelsWithConnections, $appId) {
return collect($channelsWithConnections)->values(); return collect($channelsWithConnections)->values();
})->values()->collapse() })
->values()->collapse()
->map(function ($channel) { ->map(function ($channel) {
return collect($channel->getConnections()); return collect($channel->getConnections());
})->values()->collapse()->toArray(); })
->values()->collapse()
->toArray();
return Helpers::createFulfilledPromise($connections); return Helpers::createFulfilledPromise($connections);
} }
@ -159,15 +163,19 @@ class LocalChannelManager implements ChannelManager
return Helpers::createFulfilledPromise(false); return Helpers::createFulfilledPromise(false);
} }
$this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { $this->getLocalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
collect($channels)->each->unsubscribe($connection); collect($channels)->each->unsubscribe($connection);
collect($channels)->reject->hasConnections()->each(function (Channel $channel, string $channelName) use ($connection) { collect($channels)
->reject->hasConnections()
->each(function (Channel $channel, string $channelName) use ($connection) {
unset($this->channels[$connection->app->id][$channelName]); unset($this->channels[$connection->app->id][$channelName]);
}); });
}); });
$this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { $this->getLocalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
if (count($channels) === 0) { if (count($channels) === 0) {
unset($this->channels[$connection->app->id]); unset($this->channels[$connection->app->id]);
} }
@ -244,14 +252,17 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
{ {
return $this->getLocalChannels($appId)->then(function ($channels) use ($channelName) { return $this->getLocalChannels($appId)
->then(function ($channels) use ($channelName) {
return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) { return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) {
return $collection->filter(function (Channel $channel) use ($channelName) { return $collection->filter(function (Channel $channel) use ($channelName) {
return $channel->getName() === $channelName; return $channel->getName() === $channelName;
}); });
})->flatMap(function (Channel $channel) { })
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId'); return collect($channel->getConnections())->pluck('socketId');
})->unique()->count(); })
->unique()->count();
}); });
} }
@ -370,7 +381,8 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{ {
$results = collect($channelNames)->reduce(function ($results, $channel) use ($appId) { $results = collect($channelNames)
->reduce(function ($results, $channel) use ($appId) {
$results[$channel] = isset($this->users["{$appId}:{$channel}"]) $results[$channel] = isset($this->users["{$appId}:{$channel}"])
? count($this->users["{$appId}:{$channel}"]) ? count($this->users["{$appId}:{$channel}"])
: 0; : 0;
@ -443,7 +455,8 @@ class LocalChannelManager implements ChannelManager
*/ */
public function updateConnectionInChannels($connection): PromiseInterface public function updateConnectionInChannels($connection): PromiseInterface
{ {
return $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { return $this->getLocalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
foreach ($channels as $channel) { foreach ($channels as $channel) {
if ($channel->hasConnection($connection)) { if ($channel->hasConnection($connection)) {
$channel->saveConnection($connection); $channel->saveConnection($connection);

View File

@ -137,11 +137,13 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{ {
return $this->getGlobalChannels($connection->app->id)->then(function ($channels) use ($connection) { return $this->getGlobalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
foreach ($channels as $channel) { foreach ($channels as $channel) {
$this->unsubscribeFromChannel($connection, $channel, new stdClass); $this->unsubscribeFromChannel($connection, $channel, new stdClass);
} }
})->then(function () use ($connection) { })
->then(function () use ($connection) {
return parent::unsubscribeFromAllChannels($connection); return parent::unsubscribeFromAllChannels($connection);
}); });
} }
@ -156,13 +158,17 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{ {
return $this->subscribeToTopic($connection->app->id, $channelName)->then(function () use ($connection) { return $this->subscribeToTopic($connection->app->id, $channelName)
->then(function () use ($connection) {
return $this->addConnectionToSet($connection, Carbon::now()); return $this->addConnectionToSet($connection, Carbon::now());
})->then(function () use ($connection, $channelName) { })
->then(function () use ($connection, $channelName) {
return $this->addChannelToSet($connection->app->id, $channelName); return $this->addChannelToSet($connection->app->id, $channelName);
})->then(function () use ($connection, $channelName) { })
->then(function () use ($connection, $channelName) {
return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1); return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
})->then(function () use ($connection, $channelName, $payload) { })
->then(function () use ($connection, $channelName, $payload) {
return parent::subscribeToChannel($connection, $channelName, $payload); return parent::subscribeToChannel($connection, $channelName, $payload);
}); });
} }
@ -193,11 +199,14 @@ class RedisChannelManager extends LocalChannelManager
$this->unsubscribeFromTopic($connection->app->id, $channelName); $this->unsubscribeFromTopic($connection->app->id, $channelName);
} }
}); });
})->then(function () use ($connection, $channelName) { })
->then(function () use ($connection, $channelName) {
return $this->removeChannelFromSet($connection->app->id, $channelName); return $this->removeChannelFromSet($connection->app->id, $channelName);
})->then(function () use ($connection) { })
->then(function () use ($connection) {
return $this->removeConnectionFromSet($connection); return $this->removeConnectionFromSet($connection);
})->then(function () use ($connection, $channelName, $payload) { })
->then(function () use ($connection, $channelName, $payload) {
return parent::unsubscribeFromChannel($connection, $channelName, $payload); return parent::unsubscribeFromChannel($connection, $channelName, $payload);
}); });
} }
@ -211,7 +220,8 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function subscribeToApp($appId): PromiseInterface public function subscribeToApp($appId): PromiseInterface
{ {
return $this->subscribeToTopic($appId)->then(function () use ($appId) { return $this->subscribeToTopic($appId)
->then(function () use ($appId) {
return $this->incrementSubscriptionsCount($appId); return $this->incrementSubscriptionsCount($appId);
}); });
} }
@ -225,7 +235,8 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function unsubscribeFromApp($appId): PromiseInterface public function unsubscribeFromApp($appId): PromiseInterface
{ {
return $this->unsubscribeFromTopic($appId)->then(function () use ($appId) { return $this->unsubscribeFromTopic($appId)
->then(function () use ($appId) {
return $this->decrementSubscriptionsCount($appId); return $this->decrementSubscriptionsCount($appId);
}); });
} }
@ -297,7 +308,8 @@ class RedisChannelManager extends LocalChannelManager
return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user)) return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user))
->then(function () use ($connection, $channel, $user) { ->then(function () use ($connection, $channel, $user) {
return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId); return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})->then(function () use ($connection, $user, $channel, $payload) { })
->then(function () use ($connection, $user, $channel, $payload) {
return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload); return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
}); });
} }
@ -316,7 +328,8 @@ class RedisChannelManager extends LocalChannelManager
return $this->removeUserData($connection->app->id, $channel, $connection->socketId) return $this->removeUserData($connection->app->id, $channel, $connection->socketId)
->then(function () use ($connection, $channel, $user) { ->then(function () use ($connection, $channel, $user) {
return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId); return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})->then(function () use ($connection, $user, $channel) { })
->then(function () use ($connection, $user, $channel) {
return parent::userLeftPresenceChannel($connection, $user, $channel); return parent::userLeftPresenceChannel($connection, $user, $channel);
}); });
} }
@ -370,7 +383,8 @@ class RedisChannelManager extends LocalChannelManager
); );
} }
return $this->publishClient->exec()->then(function ($data) use ($channelNames) { return $this->publishClient->exec()
->then(function ($data) use ($channelNames) {
return array_combine($channelNames, $data); return array_combine($channelNames, $data);
}); });
} }
@ -399,7 +413,8 @@ class RedisChannelManager extends LocalChannelManager
public function connectionPonged(ConnectionInterface $connection): PromiseInterface public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{ {
// This will update the score with the current timestamp. // This will update the score with the current timestamp.
return $this->addConnectionToSet($connection, Carbon::now())->then(function () use ($connection) { return $this->addConnectionToSet($connection, Carbon::now())
->then(function () use ($connection) {
return parent::connectionPonged($connection); return parent::connectionPonged($connection);
}); });
} }
@ -412,7 +427,8 @@ class RedisChannelManager extends LocalChannelManager
public function removeObsoleteConnections(): PromiseInterface public function removeObsoleteConnections(): PromiseInterface
{ {
$this->lock()->get(function () { $this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))->then(function ($connections) { $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $socketId => $appId) { foreach ($connections as $socketId => $appId) {
$connection = $this->fakeConnectionForApp($appId, $socketId); $connection = $this->fakeConnectionForApp($appId, $socketId);

View File

@ -155,7 +155,8 @@ class Channel
*/ */
public function broadcast($appId, stdClass $payload, bool $replicate = true): bool public function broadcast($appId, stdClass $payload, bool $replicate = true): bool
{ {
collect($this->getConnections())->each->send(json_encode($payload)); collect($this->getConnections())
->each->send(json_encode($payload));
if ($replicate) { if ($replicate) {
$this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload); $this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload);

View File

@ -54,7 +54,8 @@ class PresenceChannel extends PrivateChannel
]), ]),
])); ]));
}); });
})->then(function () use ($connection, $user, $payload) { })
->then(function () use ($connection, $user, $payload) {
// The `pusher_internal:member_added` event is triggered when a user joins a channel. // The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel // It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open) // (for example by having multiple browser tabs open)
@ -103,9 +104,12 @@ class PresenceChannel extends PrivateChannel
{ {
$truth = parent::unsubscribe($connection); $truth = parent::unsubscribe($connection);
$this->channelManager->getChannelMember($connection, $this->getName())->then(function ($user) { $this->channelManager
->getChannelMember($connection, $this->getName())
->then(function ($user) {
return @json_decode($user); return @json_decode($user);
})->then(function ($user) use ($connection) { })
->then(function ($user) use ($connection) {
if (! $user) { if (! $user) {
return; return;
} }

View File

@ -304,11 +304,13 @@ class StartServer extends Command
// Get all local connections and close them. They will // Get all local connections and close them. They will
// be automatically be unsubscribed from all channels. // be automatically be unsubscribed from all channels.
$channelManager->getLocalConnections()->then(function ($connections) { $channelManager->getLocalConnections()
->then(function ($connections) {
foreach ($connections as $connection) { foreach ($connections as $connection) {
$connection->close(); $connection->close();
} }
})->then(function () { })
->then(function () {
$this->loop->stop(); $this->loop->stop();
}); });
} }

View File

@ -32,7 +32,9 @@ class PusherChannelProtocolMessage extends PusherClientMessage
*/ */
protected function ping(ConnectionInterface $connection) protected function ping(ConnectionInterface $connection)
{ {
$this->channelManager->connectionPonged($connection)->then(function () use ($connection) { $this->channelManager
->connectionPonged($connection)
->then(function () use ($connection) {
$connection->send(json_encode(['event' => 'pusher:pong'])); $connection->send(json_encode(['event' => 'pusher:pong']));
ConnectionPonged::dispatch($connection->app->id, $connection->socketId); ConnectionPonged::dispatch($connection->app->id, $connection->socketId);

View File

@ -43,7 +43,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function webSocketMessage($appId) public function webSocketMessage($appId)
{ {
$this->findOrMake($appId)->webSocketMessage(); $this->findOrMake($appId)
->webSocketMessage();
} }
/** /**
@ -54,7 +55,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function apiMessage($appId) public function apiMessage($appId)
{ {
$this->findOrMake($appId)->apiMessage(); $this->findOrMake($appId)
->apiMessage();
} }
/** /**
@ -65,7 +67,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function connection($appId) public function connection($appId)
{ {
$this->findOrMake($appId)->connection(); $this->findOrMake($appId)
->connection();
} }
/** /**
@ -76,7 +79,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function disconnection($appId) public function disconnection($appId)
{ {
$this->findOrMake($appId)->disconnection(); $this->findOrMake($appId)
->disconnection();
} }
/** /**
@ -100,7 +104,9 @@ class MemoryCollector implements StatisticsCollector
$this->createRecord($statistic, $appId); $this->createRecord($statistic, $appId);
$this->channelManager->getGlobalConnectionsCount($appId)->then(function ($connections) use ($statistic) { $this->channelManager
->getGlobalConnectionsCount($appId)
->then(function ($connections) use ($statistic) {
$statistic->reset( $statistic->reset(
is_null($connections) ? 0 : $connections is_null($connections) ? 0 : $connections
); );

View File

@ -55,9 +55,8 @@ class RedisCollector extends MemoryCollector
*/ */
public function webSocketMessage($appId) public function webSocketMessage($appId)
{ {
$this->ensureAppIsInSet($appId)->hincrby( $this->ensureAppIsInSet($appId)
$this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1 ->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1);
);
} }
/** /**
@ -68,9 +67,8 @@ class RedisCollector extends MemoryCollector
*/ */
public function apiMessage($appId) public function apiMessage($appId)
{ {
$this->ensureAppIsInSet($appId)->hincrby( $this->ensureAppIsInSet($appId)
$this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1 ->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1);
);
} }
/** /**
@ -86,12 +84,16 @@ class RedisCollector extends MemoryCollector
->hincrby( ->hincrby(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'current_connections_count', 1 'current_connections_count', 1
)->then(function ($currentConnectionsCount) use ($appId) { )
->then(function ($currentConnectionsCount) use ($appId) {
// Get the peak connections count from Redis. // Get the peak connections count from Redis.
$this->channelManager->getPublishClient()->hget( $this->channelManager
->getPublishClient()
->hget(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count' 'peak_connections_count'
)->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) { )
->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) {
// Extract the greatest number between the current peak connection count // Extract the greatest number between the current peak connection count
// and the current connection number. // and the current connection number.
$peakConnectionsCount = is_null($currentPeakConnectionCount) $peakConnectionsCount = is_null($currentPeakConnectionCount)
@ -99,7 +101,9 @@ class RedisCollector extends MemoryCollector
: max($currentPeakConnectionCount, $currentConnectionsCount); : max($currentPeakConnectionCount, $currentConnectionsCount);
// Then set it to the database. // Then set it to the database.
$this->channelManager->getPublishClient()->hset( $this->channelManager
->getPublishClient()
->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count', $peakConnectionsCount 'peak_connections_count', $peakConnectionsCount
); );
@ -131,7 +135,9 @@ class RedisCollector extends MemoryCollector
: max($currentPeakConnectionCount, $currentConnectionsCount); : max($currentPeakConnectionCount, $currentConnectionsCount);
// Then set it to the database. // Then set it to the database.
$this->channelManager->getPublishClient()->hset( $this->channelManager
->getPublishClient()
->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count', $peakConnectionsCount 'peak_connections_count', $peakConnectionsCount
); );
@ -147,7 +153,10 @@ class RedisCollector extends MemoryCollector
public function save() public function save()
{ {
$this->lock()->get(function () { $this->lock()->get(function () {
$this->channelManager->getPublishClient()->smembers(static::$redisSetName)->then(function ($members) { $this->channelManager
->getPublishClient()
->smembers(static::$redisSetName)
->then(function ($members) {
foreach ($members as $appId) { foreach ($members as $appId) {
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
@ -201,7 +210,10 @@ class RedisCollector extends MemoryCollector
*/ */
public function getStatistics(): PromiseInterface public function getStatistics(): PromiseInterface
{ {
return $this->channelManager->getPublishClient()->smembers(static::$redisSetName)->then(function ($members) { return $this->channelManager
->getPublishClient()
->smembers(static::$redisSetName)
->then(function ($members) {
$appsWithStatistics = []; $appsWithStatistics = [];
foreach ($members as $appId) { foreach ($members as $appId) {
@ -246,22 +258,30 @@ class RedisCollector extends MemoryCollector
*/ */
public function resetStatistics($appId, int $currentConnectionCount) public function resetStatistics($appId, int $currentConnectionCount)
{ {
$this->channelManager->getPublishClient()->hset( $this->channelManager
->getPublishClient()
->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'current_connections_count', $currentConnectionCount 'current_connections_count', $currentConnectionCount
); );
$this->channelManager->getPublishClient()->hset( $this->channelManager
->getPublishClient()
->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count', max(0, $currentConnectionCount) 'peak_connections_count', max(0, $currentConnectionCount)
); );
$this->channelManager->getPublishClient()->hset( $this->channelManager
->getPublishClient()
->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'websocket_messages_count', 0 'websocket_messages_count', 0
); );
$this->channelManager->getPublishClient()->hset( $this->channelManager
->getPublishClient()
->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'api_messages_count', 0 'api_messages_count', 0
); );
@ -278,27 +298,37 @@ class RedisCollector extends MemoryCollector
{ {
parent::resetAppTraces($appId); parent::resetAppTraces($appId);
$this->channelManager->getPublishClient()->hdel( $this->channelManager
->getPublishClient()
->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'current_connections_count' 'current_connections_count'
); );
$this->channelManager->getPublishClient()->hdel( $this->channelManager
->getPublishClient()
->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count' 'peak_connections_count'
); );
$this->channelManager->getPublishClient()->hdel( $this->channelManager
->getPublishClient()
->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'websocket_messages_count' 'websocket_messages_count'
); );
$this->channelManager->getPublishClient()->hdel( $this->channelManager
->getPublishClient()
->hdel(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'api_messages_count' 'api_messages_count'
); );
$this->channelManager->getPublishClient()->srem(static::$redisSetName, $appId); $this->channelManager
->getPublishClient()
->srem(static::$redisSetName, $appId);
} }
/** /**
@ -309,7 +339,9 @@ class RedisCollector extends MemoryCollector
*/ */
protected function ensureAppIsInSet($appId) protected function ensureAppIsInSet($appId)
{ {
$this->channelManager->getPublishClient()->sadd(static::$redisSetName, $appId); $this->channelManager
->getPublishClient()
->sadd(static::$redisSetName, $appId);
return $this->channelManager->getPublishClient(); return $this->channelManager->getPublishClient();
} }

View File

@ -42,7 +42,8 @@ class DatabaseStore implements StatisticsStore
return static::$model::where('created_at', '<', $moment->toDateTimeString()) return static::$model::where('created_at', '<', $moment->toDateTimeString())
->when(! is_null($appId), function ($query) use ($appId) { ->when(! is_null($appId), function ($query) use ($appId) {
return $query->whereAppId($appId); return $query->whereAppId($appId);
})->delete(); })
->delete();
} }
/** /**
@ -53,7 +54,8 @@ class DatabaseStore implements StatisticsStore
*/ */
public function getRawRecords(callable $processQuery = null) public function getRawRecords(callable $processQuery = null)
{ {
return static::$model::query()->when(! is_null($processQuery), function ($query) use ($processQuery) { return static::$model::query()
->when(! is_null($processQuery), function ($query) use ($processQuery) {
return call_user_func($processQuery, $query); return call_user_func($processQuery, $query);
}, function ($query) { }, function ($query) {
return $query->latest()->limit(120); return $query->latest()->limit(120);
@ -72,9 +74,11 @@ class DatabaseStore implements StatisticsStore
return $this->getRawRecords($processQuery) return $this->getRawRecords($processQuery)
->when(! is_null($processCollection), function ($collection) use ($processCollection) { ->when(! is_null($processCollection), function ($collection) use ($processCollection) {
return call_user_func($processCollection, $collection); return call_user_func($processCollection, $collection);
})->map(function (Model $statistic) { })
->map(function (Model $statistic) {
return $this->statisticToArray($statistic); return $this->statisticToArray($statistic);
})->toArray(); })
->toArray();
} }
/** /**

View File

@ -62,7 +62,9 @@ class AsyncRedisQueueTest extends TestCase
$this->queue->later(-300, $jobs[2]); $this->queue->later(-300, $jobs[2]);
$this->queue->later(-100, $jobs[3]); $this->queue->later(-100, $jobs[3]);
$this->getPublishClient()->zcard('queues:default:delayed')->then(function ($count) { $this->getPublishClient()
->zcard('queues:default:delayed')
->then(function ($count) {
$this->assertEquals(4, $count); $this->assertEquals(4, $count);
}); });
@ -85,7 +87,8 @@ class AsyncRedisQueueTest extends TestCase
$this->unregisterManagers(); $this->unregisterManagers();
$this->getPublishClient()->assertCalledCount(1, 'eval'); $this->getPublishClient()
->assertCalledCount(1, 'eval');
$redisJob = $this->queue->pop(); $redisJob = $this->queue->pop();
@ -123,7 +126,8 @@ class AsyncRedisQueueTest extends TestCase
$this->unregisterManagers(); $this->unregisterManagers();
$this->getPublishClient()->assertCalledCount(1, 'eval'); $this->getPublishClient()
->assertCalledCount(1, 'eval');
$redisJob = $this->queue->pop(); $redisJob = $this->queue->pop();
@ -148,7 +152,8 @@ class AsyncRedisQueueTest extends TestCase
$this->queue->push($job1); $this->queue->push($job1);
$this->queue->push($job2); $this->queue->push($job2);
$this->getPublishClient()->assertCalledCount(2, 'eval'); $this->getPublishClient()
->assertCalledCount(2, 'eval');
$this->unregisterManagers(); $this->unregisterManagers();

View File

@ -54,21 +54,29 @@ class ConnectionTest extends TestCase
{ {
$connection = $this->newActiveConnection(['public-channel']); $connection = $this->newActiveConnection(['public-channel']);
$this->channelManager->getGlobalChannels('1234')->then(function ($channels) { $this->channelManager
->getGlobalChannels('1234')
->then(function ($channels) {
$this->assertCount(1, $channels); $this->assertCount(1, $channels);
}); });
$this->channelManager->getGlobalConnectionsCount('1234')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
$this->pusherServer->onClose($connection); $this->pusherServer->onClose($connection);
$this->channelManager->getGlobalConnectionsCount('1234')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234')
->then(function ($total) {
$this->assertEquals(0, $total); $this->assertEquals(0, $total);
}); });
$this->channelManager->getGlobalChannels('1234')->then(function ($channels) { $this->channelManager
->getGlobalChannels('1234')
->then(function ($channels) {
$this->assertCount(0, $channels); $this->assertCount(0, $channels);
}); });
} }

View File

@ -20,17 +20,23 @@ class LocalPongRemovalTest extends TestCase
$this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($count) {
$this->assertEquals(2, $count); $this->assertEquals(2, $count);
}); });
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($count) {
$this->assertEquals(1, $count); $this->assertEquals(1, $count);
}); });
$this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { $this->channelManager
->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId]; $connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId); $this->assertEquals($activeConnection->socketId, $connection->socketId);
@ -51,17 +57,23 @@ class LocalPongRemovalTest extends TestCase
$this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($count) {
$this->assertEquals(2, $count); $this->assertEquals(2, $count);
}); });
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($count) {
$this->assertEquals(1, $count); $this->assertEquals(1, $count);
}); });
$this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { $this->channelManager
->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId]; $connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId); $this->assertEquals($activeConnection->socketId, $connection->socketId);
@ -82,27 +94,37 @@ class LocalPongRemovalTest extends TestCase
$this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($count) {
$this->assertEquals(2, $count); $this->assertEquals(2, $count);
}); });
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(2, $members); $this->assertCount(2, $members);
}); });
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($count) {
$this->assertEquals(1, $count); $this->assertEquals(1, $count);
}); });
$this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { $this->channelManager
->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId]; $connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId); $this->assertEquals($activeConnection->socketId, $connection->socketId);
}); });
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(1, $members); $this->assertCount(1, $members);
}); });
} }

View File

@ -58,7 +58,9 @@ class PresenceChannelTest extends TestCase
'channel' => 'presence-channel', 'channel' => 'presence-channel',
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
} }
@ -110,11 +112,15 @@ class PresenceChannelTest extends TestCase
]), ]),
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($total) {
$this->assertEquals(3, $total); $this->assertEquals(3, $total);
}); });
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(2, $members); $this->assertCount(2, $members);
}); });
} }
@ -129,7 +135,9 @@ class PresenceChannelTest extends TestCase
'data' => json_encode(['user_id' => 2]), 'data' => json_encode(['user_id' => 2]),
]); ]);
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(2, $members); $this->assertCount(2, $members);
}); });
@ -140,11 +148,15 @@ class PresenceChannelTest extends TestCase
'data' => json_encode(['user_id' => 2]), 'data' => json_encode(['user_id' => 2]),
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) use ($rick) { $this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) use ($rick) {
$this->assertCount(1, $members); $this->assertCount(1, $members);
$this->assertEquals(1, $members[$rick->socketId]->user_id); $this->assertEquals(1, $members[$rick->socketId]->user_id);
}); });
@ -154,7 +166,9 @@ class PresenceChannelTest extends TestCase
{ {
$connection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); $connection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
@ -167,7 +181,9 @@ class PresenceChannelTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($total) {
$this->assertEquals(0, $total); $this->assertEquals(0, $total);
}); });
} }
@ -213,11 +229,15 @@ class PresenceChannelTest extends TestCase
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); $rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]); $morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
$this->statisticsCollector->getStatistics()->then(function ($statistics) { $this->statisticsCollector
->getStatistics()
->then(function ($statistics) {
$this->assertCount(1, $statistics); $this->assertCount(1, $statistics);
}); });
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
->getAppStatistics('1234')
->then(function ($statistic) {
$this->assertEquals([ $this->assertEquals([
'peak_connections_count' => 2, 'peak_connections_count' => 2,
'websocket_messages_count' => 2, 'websocket_messages_count' => 2,
@ -232,7 +252,9 @@ class PresenceChannelTest extends TestCase
$this->newPresenceConnection('presence-channel', ['user_id' => 1]); $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$this->newPresenceConnection('presence-channel-2', ['user_id' => 2]); $this->newPresenceConnection('presence-channel-2', ['user_id' => 2]);
$this->channelManager->getLocalConnections()->then(function ($connections) { $this->channelManager
->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections); $this->assertCount(2, $connections);
foreach ($connections as $connection) { foreach ($connections as $connection) {
@ -282,11 +304,15 @@ class PresenceChannelTest extends TestCase
$this->assertCount(0, $sockets); $this->assertCount(0, $sockets);
}); });
$this->channelManager->getMemberSockets('2', '1234', 'presence-channel')->then(function ($sockets) { $this->channelManager
->getMemberSockets('2', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(0, $sockets); $this->assertCount(0, $sockets);
}); });
$this->channelManager->getMemberSockets('observer', '1234', 'presence-channel')->then(function ($sockets) { $this->channelManager
->getMemberSockets('observer', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(1, $sockets); $this->assertCount(1, $sockets);
}); });
} }
@ -371,9 +397,11 @@ class PresenceChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient()->assertNothingDispatched(); $this->getSubscribeClient()
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()
->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'presence-channel'), $this->channelManager->getRedisKey('1234', 'presence-channel'),
$message->getPayload(), $message->getPayload(),
]); ]);
@ -410,7 +438,9 @@ class PresenceChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
->getAppStatistics('1234')
->then(function ($statistic) {
$this->assertEquals([ $this->assertEquals([
'peak_connections_count' => 1, 'peak_connections_count' => 1,
'websocket_messages_count' => 1, 'websocket_messages_count' => 1,
@ -450,7 +480,9 @@ class PresenceChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
->getPublishClient()
->assertCalledWithArgsCount(1, 'publish', [
$this->channelManager->getRedisKey('1234', 'presence-channel'), $this->channelManager->getRedisKey('1234', 'presence-channel'),
json_encode([ json_encode([
'event' => 'some-event', 'event' => 'some-event',
@ -496,7 +528,9 @@ class PresenceChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
->getPublishClient()
->assertCalledWithArgsCount(1, 'publish', [
$this->channelManager->getRedisKey('1234', 'presence-channel'), $this->channelManager->getRedisKey('1234', 'presence-channel'),
json_encode([ json_encode([
'event' => 'some-event', 'event' => 'some-event',

View File

@ -48,7 +48,9 @@ class PrivateChannelTest extends TestCase
'channel' => 'private-channel', 'channel' => 'private-channel',
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
} }
@ -57,7 +59,9 @@ class PrivateChannelTest extends TestCase
{ {
$connection = $this->newPrivateConnection('private-channel'); $connection = $this->newPrivateConnection('private-channel');
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
@ -70,7 +74,9 @@ class PrivateChannelTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($total) {
$this->assertEquals(0, $total); $this->assertEquals(0, $total);
}); });
} }
@ -116,11 +122,15 @@ class PrivateChannelTest extends TestCase
$rick = $this->newPrivateConnection('private-channel'); $rick = $this->newPrivateConnection('private-channel');
$morty = $this->newPrivateConnection('private-channel'); $morty = $this->newPrivateConnection('private-channel');
$this->statisticsCollector->getStatistics()->then(function ($statistics) { $this->statisticsCollector
->getStatistics()
->then(function ($statistics) {
$this->assertCount(1, $statistics); $this->assertCount(1, $statistics);
}); });
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
->getAppStatistics('1234')
->then(function ($statistic) {
$this->assertEquals([ $this->assertEquals([
'peak_connections_count' => 2, 'peak_connections_count' => 2,
'websocket_messages_count' => 2, 'websocket_messages_count' => 2,
@ -135,7 +145,9 @@ class PrivateChannelTest extends TestCase
$this->newPrivateConnection('private-channel'); $this->newPrivateConnection('private-channel');
$this->newPrivateConnection('private-channel-2'); $this->newPrivateConnection('private-channel-2');
$this->channelManager->getLocalConnections()->then(function ($connections) { $this->channelManager
->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections); $this->assertCount(2, $connections);
foreach ($connections as $connection) { foreach ($connections as $connection) {
@ -205,9 +217,11 @@ class PrivateChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient()->assertNothingDispatched(); $this->getSubscribeClient()
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()
->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'private-channel'), $this->channelManager->getRedisKey('1234', 'private-channel'),
$message->getPayload(), $message->getPayload(),
]); ]);
@ -244,7 +258,9 @@ class PrivateChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
->getAppStatistics('1234')
->then(function ($statistic) {
$this->assertEquals([ $this->assertEquals([
'peak_connections_count' => 1, 'peak_connections_count' => 1,
'websocket_messages_count' => 1, 'websocket_messages_count' => 1,
@ -284,7 +300,9 @@ class PrivateChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
->getPublishClient()
->assertCalledWithArgsCount(1, 'publish', [
$this->channelManager->getRedisKey('1234', 'private-channel'), $this->channelManager->getRedisKey('1234', 'private-channel'),
json_encode([ json_encode([
'event' => 'some-event', 'event' => 'some-event',
@ -330,7 +348,9 @@ class PrivateChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
->getPublishClient()
->assertCalledWithArgsCount(1, 'publish', [
$this->channelManager->getRedisKey('1234', 'private-channel'), $this->channelManager->getRedisKey('1234', 'private-channel'),
json_encode([ json_encode([
'event' => 'some-event', 'event' => 'some-event',

View File

@ -14,7 +14,9 @@ class PublicChannelTest extends TestCase
{ {
$connection = $this->newActiveConnection(['public-channel']); $connection = $this->newActiveConnection(['public-channel']);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
@ -38,7 +40,9 @@ class PublicChannelTest extends TestCase
{ {
$connection = $this->newActiveConnection(['public-channel']); $connection = $this->newActiveConnection(['public-channel']);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($total) {
$this->assertEquals(1, $total); $this->assertEquals(1, $total);
}); });
@ -51,7 +55,9 @@ class PublicChannelTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { $this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($total) {
$this->assertEquals(0, $total); $this->assertEquals(0, $total);
}); });
} }
@ -97,11 +103,15 @@ class PublicChannelTest extends TestCase
$rick = $this->newActiveConnection(['public-channel']); $rick = $this->newActiveConnection(['public-channel']);
$morty = $this->newActiveConnection(['public-channel']); $morty = $this->newActiveConnection(['public-channel']);
$this->statisticsCollector->getStatistics()->then(function ($statistics) { $this->statisticsCollector
->getStatistics()
->then(function ($statistics) {
$this->assertCount(1, $statistics); $this->assertCount(1, $statistics);
}); });
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
->getAppStatistics('1234')
->then(function ($statistic) {
$this->assertEquals([ $this->assertEquals([
'peak_connections_count' => 2, 'peak_connections_count' => 2,
'websocket_messages_count' => 2, 'websocket_messages_count' => 2,
@ -116,7 +126,9 @@ class PublicChannelTest extends TestCase
$this->newActiveConnection(['public-channel']); $this->newActiveConnection(['public-channel']);
$this->newActiveConnection(['public-channel-2']); $this->newActiveConnection(['public-channel-2']);
$this->channelManager->getLocalConnections()->then(function ($connections) { $this->channelManager
->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections); $this->assertCount(2, $connections);
foreach ($connections as $connection) { foreach ($connections as $connection) {
@ -186,9 +198,11 @@ class PublicChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient()->assertNothingDispatched(); $this->getSubscribeClient()
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()
->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'public-channel'), $this->channelManager->getRedisKey('1234', 'public-channel'),
$message->getPayload(), $message->getPayload(),
]); ]);
@ -225,7 +239,9 @@ class PublicChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
->getAppStatistics('1234')
->then(function ($statistic) {
$this->assertEquals([ $this->assertEquals([
'peak_connections_count' => 1, 'peak_connections_count' => 1,
'websocket_messages_count' => 1, 'websocket_messages_count' => 1,
@ -265,7 +281,9 @@ class PublicChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
->getPublishClient()
->assertCalledWithArgsCount(1, 'publish', [
$this->channelManager->getRedisKey('1234', 'public-channel'), $this->channelManager->getRedisKey('1234', 'public-channel'),
json_encode([ json_encode([
'event' => 'some-event', 'event' => 'some-event',
@ -311,7 +329,9 @@ class PublicChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
->getPublishClient()
->assertCalledWithArgsCount(1, 'publish', [
$this->channelManager->getRedisKey('1234', 'public-channel'), $this->channelManager->getRedisKey('1234', 'public-channel'),
json_encode([ json_encode([
'event' => 'some-event', 'event' => 'some-event',

View File

@ -19,21 +19,29 @@ class RedisPongRemovalTest extends TestCase
// Make the connection look like it was lost 1 day ago. // Make the connection look like it was lost 1 day ago.
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($count) {
$this->assertEquals(2, $count); $this->assertEquals(2, $count);
}); });
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
->then(function ($expiredConnections) {
$this->assertCount(1, $expiredConnections); $this->assertCount(1, $expiredConnections);
}); });
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'public-channel')
->then(function ($count) {
$this->assertEquals(1, $count); $this->assertEquals(1, $count);
}); });
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
->then(function ($expiredConnections) {
$this->assertCount(0, $expiredConnections); $this->assertCount(0, $expiredConnections);
}); });
} }
@ -51,21 +59,29 @@ class RedisPongRemovalTest extends TestCase
// Make the connection look like it was lost 1 day ago. // Make the connection look like it was lost 1 day ago.
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($count) {
$this->assertEquals(2, $count); $this->assertEquals(2, $count);
}); });
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
->then(function ($expiredConnections) {
$this->assertCount(1, $expiredConnections); $this->assertCount(1, $expiredConnections);
}); });
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'private-channel')
->then(function ($count) {
$this->assertEquals(1, $count); $this->assertEquals(1, $count);
}); });
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
->then(function ($expiredConnections) {
$this->assertCount(0, $expiredConnections); $this->assertCount(0, $expiredConnections);
}); });
} }
@ -83,29 +99,41 @@ class RedisPongRemovalTest extends TestCase
// Make the connection look like it was lost 1 day ago. // Make the connection look like it was lost 1 day ago.
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($count) {
$this->assertEquals(2, $count); $this->assertEquals(2, $count);
}); });
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
->then(function ($expiredConnections) {
$this->assertCount(1, $expiredConnections); $this->assertCount(1, $expiredConnections);
}); });
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(2, $members); $this->assertCount(2, $members);
}); });
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($count) {
$this->assertEquals(1, $count); $this->assertEquals(1, $count);
}); });
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
->then(function ($expiredConnections) {
$this->assertCount(0, $expiredConnections); $this->assertCount(0, $expiredConnections);
}); });
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(1, $members); $this->assertCount(1, $members);
}); });
} }