Revert "formatting"

This reverts commit 8308a7d16d.
This commit is contained in:
Alex Renoki 2020-12-07 23:30:36 +02:00
parent b6837a05e4
commit cbe4378086
15 changed files with 756 additions and 557 deletions

View File

@ -28,48 +28,50 @@ class FetchChannels extends Controller
} }
} }
return $this->channelManager->getGlobalChannels($request->appId)->then(function ($channels) use ($request, $attributes) { return $this->channelManager
$channels = collect($channels)->keyBy(function ($channel) { ->getGlobalChannels($request->appId)
return $channel instanceof Channel ->then(function ($channels) use ($request, $attributes) {
? $channel->getName() $channels = collect($channels)->keyBy(function ($channel) {
: $channel; return $channel instanceof Channel
? $channel->getName()
: $channel;
});
if ($request->has('filter_by_prefix')) {
$channels = $channels->filter(function ($channel, $channelName) use ($request) {
return Str::startsWith($channelName, $request->filter_by_prefix);
});
}
$channelNames = $channels->map(function ($channel) {
return $channel instanceof Channel
? $channel->getName()
: $channel;
})->toArray();
return $this->channelManager
->getChannelsMembersCount($request->appId, $channelNames)
->then(function ($counts) use ($channels, $attributes) {
$channels = $channels->map(function ($channel) use ($counts, $attributes) {
$info = new stdClass;
$channelName = $channel instanceof Channel
? $channel->getName()
: $channel;
if (in_array('user_count', $attributes)) {
$info->user_count = $counts[$channelName];
}
return $info;
})->sortBy(function ($content, $name) {
return $name;
})->all();
return [
'channels' => $channels ?: new stdClass,
];
});
}); });
if ($request->has('filter_by_prefix')) {
$channels = $channels->filter(function ($channel, $channelName) use ($request) {
return Str::startsWith($channelName, $request->filter_by_prefix);
});
}
$channelNames = $channels->map(function ($channel) {
return $channel instanceof Channel
? $channel->getName()
: $channel;
})->toArray();
return $this->channelManager
->getChannelsMembersCount($request->appId, $channelNames)
->then(function ($counts) use ($channels, $attributes) {
$channels = $channels->map(function ($channel) use ($counts, $attributes) {
$info = new stdClass;
$channelName = $channel instanceof Channel
? $channel->getName()
: $channel;
if (in_array('user_count', $attributes)) {
$info->user_count = $counts[$channelName];
}
return $info;
})->sortBy(function ($content, $name) {
return $name;
})->all();
return [
'channels' => $channels ?: new stdClass,
];
});
});
} }
} }

View File

@ -163,21 +163,23 @@ class LocalChannelManager implements ChannelManager
return Helpers::createFulfilledPromise(false); return Helpers::createFulfilledPromise(false);
} }
$this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { $this->getLocalChannels($connection->app->id)
collect($channels)->each->unsubscribe($connection); ->then(function ($channels) use ($connection) {
collect($channels)->each->unsubscribe($connection);
collect($channels) collect($channels)
->reject->hasConnections() ->reject->hasConnections()
->each(function (Channel $channel, string $channelName) use ($connection) { ->each(function (Channel $channel, string $channelName) use ($connection) {
unset($this->channels[$connection->app->id][$channelName]); unset($this->channels[$connection->app->id][$channelName]);
}); });
}); });
$this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { $this->getLocalChannels($connection->app->id)
if (count($channels) === 0) { ->then(function ($channels) use ($connection) {
unset($this->channels[$connection->app->id]); if (count($channels) === 0) {
} unset($this->channels[$connection->app->id]);
}); }
});
return Helpers::createFulfilledPromise(true); return Helpers::createFulfilledPromise(true);
} }
@ -250,17 +252,18 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
{ {
return $this->getLocalChannels($appId)->then(function ($channels) use ($channelName) { return $this->getLocalChannels($appId)
return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) { ->then(function ($channels) use ($channelName) {
return $collection->filter(function (Channel $channel) use ($channelName) { return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) {
return $channel->getName() === $channelName; return $collection->filter(function (Channel $channel) use ($channelName) {
}); return $channel->getName() === $channelName;
}) });
->flatMap(function (Channel $channel) { })
return collect($channel->getConnections())->pluck('socketId'); ->flatMap(function (Channel $channel) {
}) return collect($channel->getConnections())->pluck('socketId');
->unique()->count(); })
}); ->unique()->count();
});
} }
/** /**
@ -452,15 +455,16 @@ class LocalChannelManager implements ChannelManager
*/ */
public function updateConnectionInChannels($connection): PromiseInterface public function updateConnectionInChannels($connection): PromiseInterface
{ {
return $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { return $this->getLocalChannels($connection->app->id)
foreach ($channels as $channel) { ->then(function ($channels) use ($connection) {
if ($channel->hasConnection($connection)) { foreach ($channels as $channel) {
$channel->saveConnection($connection); if ($channel->hasConnection($connection)) {
$channel->saveConnection($connection);
}
} }
}
return true; return true;
}); });
} }
/** /**

View File

@ -137,13 +137,15 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{ {
return $this->getGlobalChannels($connection->app->id)->then(function ($channels) use ($connection) { return $this->getGlobalChannels($connection->app->id)
foreach ($channels as $channel) { ->then(function ($channels) use ($connection) {
$this->unsubscribeFromChannel($connection, $channel, new stdClass); foreach ($channels as $channel) {
} $this->unsubscribeFromChannel($connection, $channel, new stdClass);
})->then(function () use ($connection) { }
return parent::unsubscribeFromAllChannels($connection); })
}); ->then(function () use ($connection) {
return parent::unsubscribeFromAllChannels($connection);
});
} }
/** /**
@ -156,15 +158,19 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{ {
return $this->subscribeToTopic($connection->app->id, $channelName)->then(function () use ($connection) { return $this->subscribeToTopic($connection->app->id, $channelName)
return $this->addConnectionToSet($connection, Carbon::now()); ->then(function () use ($connection) {
})->then(function () use ($connection, $channelName) { return $this->addConnectionToSet($connection, Carbon::now());
return $this->addChannelToSet($connection->app->id, $channelName); })
})->then(function () use ($connection, $channelName) { ->then(function () use ($connection, $channelName) {
return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1); return $this->addChannelToSet($connection->app->id, $channelName);
})->then(function () use ($connection, $channelName, $payload) { })
return parent::subscribeToChannel($connection, $channelName, $payload); ->then(function () use ($connection, $channelName) {
}); return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
})
->then(function () use ($connection, $channelName, $payload) {
return parent::subscribeToChannel($connection, $channelName, $payload);
});
} }
/** /**
@ -193,11 +199,14 @@ class RedisChannelManager extends LocalChannelManager
$this->unsubscribeFromTopic($connection->app->id, $channelName); $this->unsubscribeFromTopic($connection->app->id, $channelName);
} }
}); });
})->then(function () use ($connection, $channelName) { })
->then(function () use ($connection, $channelName) {
return $this->removeChannelFromSet($connection->app->id, $channelName); return $this->removeChannelFromSet($connection->app->id, $channelName);
})->then(function () use ($connection) { })
->then(function () use ($connection) {
return $this->removeConnectionFromSet($connection); return $this->removeConnectionFromSet($connection);
})->then(function () use ($connection, $channelName, $payload) { })
->then(function () use ($connection, $channelName, $payload) {
return parent::unsubscribeFromChannel($connection, $channelName, $payload); return parent::unsubscribeFromChannel($connection, $channelName, $payload);
}); });
} }
@ -211,9 +220,10 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function subscribeToApp($appId): PromiseInterface public function subscribeToApp($appId): PromiseInterface
{ {
return $this->subscribeToTopic($appId)->then(function () use ($appId) { return $this->subscribeToTopic($appId)
return $this->incrementSubscriptionsCount($appId); ->then(function () use ($appId) {
}); return $this->incrementSubscriptionsCount($appId);
});
} }
/** /**
@ -225,9 +235,10 @@ class RedisChannelManager extends LocalChannelManager
*/ */
public function unsubscribeFromApp($appId): PromiseInterface public function unsubscribeFromApp($appId): PromiseInterface
{ {
return $this->unsubscribeFromTopic($appId)->then(function () use ($appId) { return $this->unsubscribeFromTopic($appId)
return $this->decrementSubscriptionsCount($appId); ->then(function () use ($appId) {
}); return $this->decrementSubscriptionsCount($appId);
});
} }
/** /**
@ -297,7 +308,8 @@ class RedisChannelManager extends LocalChannelManager
return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user)) return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user))
->then(function () use ($connection, $channel, $user) { ->then(function () use ($connection, $channel, $user) {
return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId); return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})->then(function () use ($connection, $user, $channel, $payload) { })
->then(function () use ($connection, $user, $channel, $payload) {
return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload); return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
}); });
} }
@ -316,7 +328,8 @@ class RedisChannelManager extends LocalChannelManager
return $this->removeUserData($connection->app->id, $channel, $connection->socketId) return $this->removeUserData($connection->app->id, $channel, $connection->socketId)
->then(function () use ($connection, $channel, $user) { ->then(function () use ($connection, $channel, $user) {
return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId); return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})->then(function () use ($connection, $user, $channel) { })
->then(function () use ($connection, $user, $channel) {
return parent::userLeftPresenceChannel($connection, $user, $channel); return parent::userLeftPresenceChannel($connection, $user, $channel);
}); });
} }
@ -370,9 +383,10 @@ class RedisChannelManager extends LocalChannelManager
); );
} }
return $this->publishClient->exec()->then(function ($data) use ($channelNames) { return $this->publishClient->exec()
return array_combine($channelNames, $data); ->then(function ($data) use ($channelNames) {
}); return array_combine($channelNames, $data);
});
} }
/** /**
@ -399,9 +413,10 @@ class RedisChannelManager extends LocalChannelManager
public function connectionPonged(ConnectionInterface $connection): PromiseInterface public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{ {
// This will update the score with the current timestamp. // This will update the score with the current timestamp.
return $this->addConnectionToSet($connection, Carbon::now())->then(function () use ($connection) { return $this->addConnectionToSet($connection, Carbon::now())
return parent::connectionPonged($connection); ->then(function () use ($connection) {
}); return parent::connectionPonged($connection);
});
} }
/** /**

View File

@ -54,7 +54,8 @@ class PresenceChannel extends PrivateChannel
]), ]),
])); ]));
}); });
})->then(function () use ($connection, $user, $payload) { })
->then(function () use ($connection, $user, $payload) {
// The `pusher_internal:member_added` event is triggered when a user joins a channel. // The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel // It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open) // (for example by having multiple browser tabs open)
@ -103,47 +104,50 @@ class PresenceChannel extends PrivateChannel
{ {
$truth = parent::unsubscribe($connection); $truth = parent::unsubscribe($connection);
$this->channelManager->getChannelMember($connection, $this->getName())->then(function ($user) { $this->channelManager
return @json_decode($user); ->getChannelMember($connection, $this->getName())
})->then(function ($user) use ($connection) { ->then(function ($user) {
if (! $user) { return @json_decode($user);
return; })
} ->then(function ($user) use ($connection) {
if (! $user) {
return;
}
$this->channelManager $this->channelManager
->userLeftPresenceChannel($connection, $user, $this->getName()) ->userLeftPresenceChannel($connection, $user, $this->getName())
->then(function () use ($connection, $user) { ->then(function () use ($connection, $user) {
// The `pusher_internal:member_removed` is triggered when a user leaves a channel. // The `pusher_internal:member_removed` is triggered when a user leaves a channel.
// It's quite possible that a user can have multiple connections to the same channel // It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open) // (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the last one is closed. // and in this case the events will only be triggered when the last one is closed.
$this->channelManager $this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) ->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($connection, $user) { ->then(function ($sockets) use ($connection, $user) {
if (count($sockets) === 0) { if (count($sockets) === 0) {
$memberRemovedPayload = [ $memberRemovedPayload = [
'event' => 'pusher_internal:member_removed', 'event' => 'pusher_internal:member_removed',
'channel' => $this->getName(), 'channel' => $this->getName(),
'data' => json_encode([ 'data' => json_encode([
'user_id' => $user->user_id, 'user_id' => $user->user_id,
]), ]),
]; ];
$this->broadcastToEveryoneExcept( $this->broadcastToEveryoneExcept(
(object) $memberRemovedPayload, $connection->socketId, (object) $memberRemovedPayload, $connection->socketId,
$connection->app->id $connection->app->id
); );
UnsubscribedFromChannel::dispatch( UnsubscribedFromChannel::dispatch(
$connection->app->id, $connection->app->id,
$connection->socketId, $connection->socketId,
$this->getName(), $this->getName(),
$user $user
); );
} }
}); });
}); });
}); });
return $truth; return $truth;
} }

View File

@ -304,12 +304,14 @@ class StartServer extends Command
// Get all local connections and close them. They will // Get all local connections and close them. They will
// be automatically be unsubscribed from all channels. // be automatically be unsubscribed from all channels.
$channelManager->getLocalConnections()->then(function ($connections) { $channelManager->getLocalConnections()
foreach ($connections as $connection) { ->then(function ($connections) {
$connection->close(); foreach ($connections as $connection) {
} $connection->close();
})->then(function () { }
$this->loop->stop(); })
}); ->then(function () {
$this->loop->stop();
});
} }
} }

View File

@ -32,11 +32,13 @@ class PusherChannelProtocolMessage extends PusherClientMessage
*/ */
protected function ping(ConnectionInterface $connection) protected function ping(ConnectionInterface $connection)
{ {
$this->channelManager->connectionPonged($connection)->then(function () use ($connection) { $this->channelManager
$connection->send(json_encode(['event' => 'pusher:pong'])); ->connectionPonged($connection)
->then(function () use ($connection) {
$connection->send(json_encode(['event' => 'pusher:pong']));
ConnectionPonged::dispatch($connection->app->id, $connection->socketId); ConnectionPonged::dispatch($connection->app->id, $connection->socketId);
}); });
} }
/** /**

View File

@ -98,11 +98,13 @@ class MemoryCollector implements StatisticsCollector
$this->createRecord($statistic, $appId); $this->createRecord($statistic, $appId);
$this->channelManager->getGlobalConnectionsCount($appId)->then(function ($connections) use ($statistic) { $this->channelManager
$statistic->reset( ->getGlobalConnectionsCount($appId)
is_null($connections) ? 0 : $connections ->then(function ($connections) use ($statistic) {
); $statistic->reset(
}); is_null($connections) ? 0 : $connections
);
});
} }
}); });
} }

View File

@ -84,24 +84,30 @@ class RedisCollector extends MemoryCollector
->hincrby( ->hincrby(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'current_connections_count', 1 'current_connections_count', 1
)->then(function ($currentConnectionsCount) use ($appId) { )
->then(function ($currentConnectionsCount) use ($appId) {
// Get the peak connections count from Redis. // Get the peak connections count from Redis.
$this->channelManager->getPublishClient()->hget( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'peak_connections_count' ->hget(
)->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) {
// Extract the greatest number between the current peak connection count
// and the current connection number.
$peakConnectionsCount = is_null($currentPeakConnectionCount)
? $currentConnectionsCount
: max($currentPeakConnectionCount, $currentConnectionsCount);
// Then set it to the database.
$this->channelManager->getPublishClient()->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']), $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count', $peakConnectionsCount 'peak_connections_count'
); )
}); ->then(function ($currentPeakConnectionCount) use ($currentConnectionsCount, $appId) {
// Extract the greatest number between the current peak connection count
// and the current connection number.
$peakConnectionsCount = is_null($currentPeakConnectionCount)
? $currentConnectionsCount
: max($currentPeakConnectionCount, $currentConnectionsCount);
// Then set it to the database.
$this->channelManager
->getPublishClient()
->hset(
$this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count', $peakConnectionsCount
);
});
}); });
} }
@ -129,10 +135,12 @@ class RedisCollector extends MemoryCollector
: max($currentPeakConnectionCount, $currentConnectionsCount); : max($currentPeakConnectionCount, $currentConnectionsCount);
// Then set it to the database. // Then set it to the database.
$this->channelManager->getPublishClient()->hset( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'peak_connections_count', $peakConnectionsCount ->hset(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count', $peakConnectionsCount
);
}); });
}); });
} }
@ -145,32 +153,35 @@ class RedisCollector extends MemoryCollector
public function save() public function save()
{ {
$this->lock()->get(function () { $this->lock()->get(function () {
$this->channelManager->getPublishClient()->smembers(static::$redisSetName)->then(function ($members) { $this->channelManager
foreach ($members as $appId) { ->getPublishClient()
$this->channelManager ->smembers(static::$redisSetName)
->getPublishClient() ->then(function ($members) {
->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) foreach ($members as $appId) {
->then(function ($list) use ($appId) { $this->channelManager
if (! $list) { ->getPublishClient()
return; ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats']))
} ->then(function ($list) use ($appId) {
if (! $list) {
return;
}
$statistic = $this->arrayToStatisticInstance( $statistic = $this->arrayToStatisticInstance(
$appId, Helpers::redisListToArray($list) $appId, Helpers::redisListToArray($list)
); );
$this->createRecord($statistic, $appId); $this->createRecord($statistic, $appId);
$this->channelManager $this->channelManager
->getGlobalConnectionsCount($appId) ->getGlobalConnectionsCount($appId)
->then(function ($currentConnectionsCount) use ($appId) { ->then(function ($currentConnectionsCount) use ($appId) {
$currentConnectionsCount === 0 || is_null($currentConnectionsCount) $currentConnectionsCount === 0 || is_null($currentConnectionsCount)
? $this->resetAppTraces($appId) ? $this->resetAppTraces($appId)
: $this->resetStatistics($appId, $currentConnectionsCount); : $this->resetStatistics($appId, $currentConnectionsCount);
}); });
}); });
} }
}); });
}); });
} }
@ -195,22 +206,25 @@ class RedisCollector extends MemoryCollector
*/ */
public function getStatistics(): PromiseInterface public function getStatistics(): PromiseInterface
{ {
return $this->channelManager->getPublishClient()->smembers(static::$redisSetName)->then(function ($members) { return $this->channelManager
$appsWithStatistics = []; ->getPublishClient()
->smembers(static::$redisSetName)
->then(function ($members) {
$appsWithStatistics = [];
foreach ($members as $appId) { foreach ($members as $appId) {
$this->channelManager $this->channelManager
->getPublishClient() ->getPublishClient()
->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats']))
->then(function ($list) use ($appId, &$appsWithStatistics) { ->then(function ($list) use ($appId, &$appsWithStatistics) {
$appsWithStatistics[$appId] = $this->arrayToStatisticInstance( $appsWithStatistics[$appId] = $this->arrayToStatisticInstance(
$appId, Helpers::redisListToArray($list) $appId, Helpers::redisListToArray($list)
); );
}); });
} }
return $appsWithStatistics; return $appsWithStatistics;
}); });
} }
/** /**
@ -240,25 +254,33 @@ class RedisCollector extends MemoryCollector
*/ */
public function resetStatistics($appId, int $currentConnectionCount) public function resetStatistics($appId, int $currentConnectionCount)
{ {
$this->channelManager->getPublishClient()->hset( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'current_connections_count', $currentConnectionCount ->hset(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'current_connections_count', $currentConnectionCount
);
$this->channelManager->getPublishClient()->hset( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'peak_connections_count', $currentConnectionCount ->hset(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count', $currentConnectionCount
);
$this->channelManager->getPublishClient()->hset( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'websocket_messages_count', 0 ->hset(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'websocket_messages_count', 0
);
$this->channelManager->getPublishClient()->hset( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'api_messages_count', 0 ->hset(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'api_messages_count', 0
);
} }
/** /**
@ -270,27 +292,37 @@ class RedisCollector extends MemoryCollector
*/ */
public function resetAppTraces($appId) public function resetAppTraces($appId)
{ {
$this->channelManager->getPublishClient()->hdel( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'current_connections_count' ->hdel(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'current_connections_count'
);
$this->channelManager->getPublishClient()->hdel( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'peak_connections_count' ->hdel(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'peak_connections_count'
);
$this->channelManager->getPublishClient()->hdel( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'websocket_messages_count' ->hdel(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'websocket_messages_count'
);
$this->channelManager->getPublishClient()->hdel( $this->channelManager
$this->channelManager->getRedisKey($appId, null, ['stats']), ->getPublishClient()
'api_messages_count' ->hdel(
); $this->channelManager->getRedisKey($appId, null, ['stats']),
'api_messages_count'
);
$this->channelManager->getPublishClient()->srem(static::$redisSetName, $appId); $this->channelManager
->getPublishClient()
->srem(static::$redisSetName, $appId);
} }
/** /**
@ -301,7 +333,9 @@ class RedisCollector extends MemoryCollector
*/ */
protected function ensureAppIsInSet($appId) protected function ensureAppIsInSet($appId)
{ {
$this->channelManager->getPublishClient()->sadd(static::$redisSetName, $appId); $this->channelManager
->getPublishClient()
->sadd(static::$redisSetName, $appId);
return $this->channelManager->getPublishClient(); return $this->channelManager->getPublishClient();
} }

View File

@ -62,9 +62,11 @@ class AsyncRedisQueueTest extends TestCase
$this->queue->later(-300, $jobs[2]); $this->queue->later(-300, $jobs[2]);
$this->queue->later(-100, $jobs[3]); $this->queue->later(-100, $jobs[3]);
$this->getPublishClient()->zcard('queues:default:delayed')->then(function ($count) { $this->getPublishClient()
$this->assertEquals(4, $count); ->zcard('queues:default:delayed')
}); ->then(function ($count) {
$this->assertEquals(4, $count);
});
$this->unregisterManagers(); $this->unregisterManagers();
@ -85,7 +87,8 @@ class AsyncRedisQueueTest extends TestCase
$this->unregisterManagers(); $this->unregisterManagers();
$this->getPublishClient()->assertCalledCount(1, 'eval'); $this->getPublishClient()
->assertCalledCount(1, 'eval');
$redisJob = $this->queue->pop(); $redisJob = $this->queue->pop();
@ -123,7 +126,8 @@ class AsyncRedisQueueTest extends TestCase
$this->unregisterManagers(); $this->unregisterManagers();
$this->getPublishClient()->assertCalledCount(1, 'eval'); $this->getPublishClient()
->assertCalledCount(1, 'eval');
$redisJob = $this->queue->pop(); $redisJob = $this->queue->pop();
@ -148,7 +152,8 @@ class AsyncRedisQueueTest extends TestCase
$this->queue->push($job1); $this->queue->push($job1);
$this->queue->push($job2); $this->queue->push($job2);
$this->getPublishClient()->assertCalledCount(2, 'eval'); $this->getPublishClient()
->assertCalledCount(2, 'eval');
$this->unregisterManagers(); $this->unregisterManagers();

View File

@ -54,23 +54,31 @@ class ConnectionTest extends TestCase
{ {
$connection = $this->newActiveConnection(['public-channel']); $connection = $this->newActiveConnection(['public-channel']);
$this->channelManager->getGlobalChannels('1234')->then(function ($channels) { $this->channelManager
$this->assertCount(1, $channels); ->getGlobalChannels('1234')
}); ->then(function ($channels) {
$this->assertCount(1, $channels);
});
$this->channelManager->getGlobalConnectionsCount('1234')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
$this->pusherServer->onClose($connection); $this->pusherServer->onClose($connection);
$this->channelManager->getGlobalConnectionsCount('1234')->then(function ($total) { $this->channelManager
$this->assertEquals(0, $total); ->getGlobalConnectionsCount('1234')
}); ->then(function ($total) {
$this->assertEquals(0, $total);
});
$this->channelManager->getGlobalChannels('1234')->then(function ($channels) { $this->channelManager
$this->assertCount(0, $channels); ->getGlobalChannels('1234')
}); ->then(function ($channels) {
$this->assertCount(0, $channels);
});
} }
public function test_websocket_exceptions_are_sent() public function test_websocket_exceptions_are_sent()

View File

@ -20,21 +20,27 @@ class LocalPongRemovalTest extends TestCase
$this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(2, $count); ->getGlobalConnectionsCount('1234', 'public-channel')
}); ->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(1, $count); ->getGlobalConnectionsCount('1234', 'public-channel')
}); ->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { $this->channelManager
$connection = $connections[$activeConnection->socketId]; ->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId); $this->assertEquals($activeConnection->socketId, $connection->socketId);
}); });
} }
public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels() public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels()
@ -51,21 +57,27 @@ class LocalPongRemovalTest extends TestCase
$this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(2, $count); ->getGlobalConnectionsCount('1234', 'private-channel')
}); ->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(1, $count); ->getGlobalConnectionsCount('1234', 'private-channel')
}); ->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { $this->channelManager
$connection = $connections[$activeConnection->socketId]; ->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId); $this->assertEquals($activeConnection->socketId, $connection->socketId);
}); });
} }
public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels() public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels()
@ -82,28 +94,38 @@ class LocalPongRemovalTest extends TestCase
$this->channelManager->updateConnectionInChannels($activeConnection); $this->channelManager->updateConnectionInChannels($activeConnection);
$this->channelManager->updateConnectionInChannels($obsoleteConnection); $this->channelManager->updateConnectionInChannels($obsoleteConnection);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(2, $count); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
$this->assertCount(2, $members); ->getChannelMembers('1234', 'presence-channel')
}); ->then(function ($members) {
$this->assertCount(2, $members);
});
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(1, $count); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager->getLocalConnections()->then(function ($connections) use ($activeConnection) { $this->channelManager
$connection = $connections[$activeConnection->socketId]; ->getLocalConnections()
->then(function ($connections) use ($activeConnection) {
$connection = $connections[$activeConnection->socketId];
$this->assertEquals($activeConnection->socketId, $connection->socketId); $this->assertEquals($activeConnection->socketId, $connection->socketId);
}); });
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
$this->assertCount(1, $members); ->getChannelMembers('1234', 'presence-channel')
}); ->then(function ($members) {
$this->assertCount(1, $members);
});
} }
} }

View File

@ -58,9 +58,11 @@ class PresenceChannelTest extends TestCase
'channel' => 'presence-channel', 'channel' => 'presence-channel',
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
} }
public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined() public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined()
@ -110,13 +112,17 @@ class PresenceChannelTest extends TestCase
]), ]),
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(3, $total); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($total) {
$this->assertEquals(3, $total);
});
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
$this->assertCount(2, $members); ->getChannelMembers('1234', 'presence-channel')
}); ->then(function ($members) {
$this->assertCount(2, $members);
});
} }
public function test_presence_channel_broadcast_member_events() public function test_presence_channel_broadcast_member_events()
@ -129,9 +135,11 @@ class PresenceChannelTest extends TestCase
'data' => json_encode(['user_id' => 2]), 'data' => json_encode(['user_id' => 2]),
]); ]);
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
$this->assertCount(2, $members); ->getChannelMembers('1234', 'presence-channel')
}); ->then(function ($members) {
$this->assertCount(2, $members);
});
$this->pusherServer->onClose($morty); $this->pusherServer->onClose($morty);
@ -140,23 +148,29 @@ class PresenceChannelTest extends TestCase
'data' => json_encode(['user_id' => 2]), 'data' => json_encode(['user_id' => 2]),
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) use ($rick) { $this->channelManager
$this->assertCount(1, $members); ->getChannelMembers('1234', 'presence-channel')
$this->assertEquals(1, $members[$rick->socketId]->user_id); ->then(function ($members) use ($rick) {
}); $this->assertCount(1, $members);
$this->assertEquals(1, $members[$rick->socketId]->user_id);
});
} }
public function test_unsubscribe_from_presence_channel() public function test_unsubscribe_from_presence_channel()
{ {
$connection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); $connection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
$message = new Mocks\Message([ $message = new Mocks\Message([
'event' => 'pusher:unsubscribe', 'event' => 'pusher:unsubscribe',
@ -167,10 +181,12 @@ class PresenceChannelTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(0, $total); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($total) {
} $this->assertEquals(0, $total);
});
}
public function test_can_whisper_to_private_channel() public function test_can_whisper_to_private_channel()
{ {
@ -213,18 +229,22 @@ class PresenceChannelTest extends TestCase
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); $rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]); $morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
$this->statisticsCollector->getStatistics()->then(function ($statistics) { $this->statisticsCollector
$this->assertCount(1, $statistics); ->getStatistics()
}); ->then(function ($statistics) {
$this->assertCount(1, $statistics);
});
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
$this->assertEquals([ ->getAppStatistics('1234')
'peak_connections_count' => 2, ->then(function ($statistic) {
'websocket_messages_count' => 2, $this->assertEquals([
'api_messages_count' => 0, 'peak_connections_count' => 2,
'app_id' => '1234', 'websocket_messages_count' => 2,
], $statistic->toArray()); 'api_messages_count' => 0,
}); 'app_id' => '1234',
], $statistic->toArray());
});
} }
public function test_local_connections_for_presence_channels() public function test_local_connections_for_presence_channels()
@ -232,15 +252,17 @@ class PresenceChannelTest extends TestCase
$this->newPresenceConnection('presence-channel', ['user_id' => 1]); $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$this->newPresenceConnection('presence-channel-2', ['user_id' => 2]); $this->newPresenceConnection('presence-channel-2', ['user_id' => 2]);
$this->channelManager->getLocalConnections()->then(function ($connections) { $this->channelManager
$this->assertCount(2, $connections); ->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections);
foreach ($connections as $connection) { foreach ($connections as $connection) {
$this->assertInstanceOf( $this->assertInstanceOf(
ConnectionInterface::class, $connection ConnectionInterface::class, $connection
); );
} }
}); });
} }
public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection() public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
@ -282,13 +304,17 @@ class PresenceChannelTest extends TestCase
$this->assertCount(0, $sockets); $this->assertCount(0, $sockets);
}); });
$this->channelManager->getMemberSockets('2', '1234', 'presence-channel')->then(function ($sockets) { $this->channelManager
$this->assertCount(0, $sockets); ->getMemberSockets('2', '1234', 'presence-channel')
}); ->then(function ($sockets) {
$this->assertCount(0, $sockets);
});
$this->channelManager->getMemberSockets('observer', '1234', 'presence-channel')->then(function ($sockets) { $this->channelManager
$this->assertCount(1, $sockets); ->getMemberSockets('observer', '1234', 'presence-channel')
}); ->then(function ($sockets) {
$this->assertCount(1, $sockets);
});
} }
public function test_events_are_processed_by_on_message_on_presence_channels() public function test_events_are_processed_by_on_message_on_presence_channels()
@ -374,10 +400,11 @@ class PresenceChannelTest extends TestCase
$this->getSubscribeClient() $this->getSubscribeClient()
->assertNothingDispatched(); ->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()
$this->channelManager->getRedisKey('1234', 'presence-channel'), ->assertCalledWithArgs('publish', [
$message->getPayload(), $this->channelManager->getRedisKey('1234', 'presence-channel'),
]); $message->getPayload(),
]);
} }
public function test_it_fires_the_event_to_presence_channel() public function test_it_fires_the_event_to_presence_channel()
@ -411,14 +438,16 @@ class PresenceChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
$this->assertEquals([ ->getAppStatistics('1234')
'peak_connections_count' => 1, ->then(function ($statistic) {
'websocket_messages_count' => 1, $this->assertEquals([
'api_messages_count' => 1, 'peak_connections_count' => 1,
'app_id' => '1234', 'websocket_messages_count' => 1,
], $statistic->toArray()); 'api_messages_count' => 1,
}); 'app_id' => '1234',
], $statistic->toArray());
});
} }
public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_presence_channel() public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_presence_channel()
@ -451,17 +480,19 @@ class PresenceChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
$this->channelManager->getRedisKey('1234', 'presence-channel'), ->getPublishClient()
json_encode([ ->assertCalledWithArgsCount(1, 'publish', [
'event' => 'some-event', $this->channelManager->getRedisKey('1234', 'presence-channel'),
'channel' => 'presence-channel', json_encode([
'data' => json_encode(['some-data' => 'yes']), 'event' => 'some-event',
'appId' => '1234', 'channel' => 'presence-channel',
'socketId' => null, 'data' => json_encode(['some-data' => 'yes']),
'serverId' => $this->channelManager->getServerId(), 'appId' => '1234',
]), 'socketId' => null,
]); 'serverId' => $this->channelManager->getServerId(),
]),
]);
} }
} }
@ -497,17 +528,19 @@ class PresenceChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
$this->channelManager->getRedisKey('1234', 'presence-channel'), ->getPublishClient()
json_encode([ ->assertCalledWithArgsCount(1, 'publish', [
'event' => 'some-event', $this->channelManager->getRedisKey('1234', 'presence-channel'),
'channel' => 'presence-channel', json_encode([
'data' => json_encode(['some-data' => 'yes']), 'event' => 'some-event',
'appId' => '1234', 'channel' => 'presence-channel',
'socketId' => null, 'data' => json_encode(['some-data' => 'yes']),
'serverId' => $this->channelManager->getServerId(), 'appId' => '1234',
]), 'socketId' => null,
]); 'serverId' => $this->channelManager->getServerId(),
]),
]);
} }
$wsConnection->assertSentEvent('some-event', [ $wsConnection->assertSentEvent('some-event', [

View File

@ -48,18 +48,22 @@ class PrivateChannelTest extends TestCase
'channel' => 'private-channel', 'channel' => 'private-channel',
]); ]);
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234', 'private-channel')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
} }
public function test_unsubscribe_from_private_channel() public function test_unsubscribe_from_private_channel()
{ {
$connection = $this->newPrivateConnection('private-channel'); $connection = $this->newPrivateConnection('private-channel');
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234', 'private-channel')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
$message = new Mocks\Message([ $message = new Mocks\Message([
'event' => 'pusher:unsubscribe', 'event' => 'pusher:unsubscribe',
@ -70,9 +74,11 @@ class PrivateChannelTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(0, $total); ->getGlobalConnectionsCount('1234', 'private-channel')
}); ->then(function ($total) {
$this->assertEquals(0, $total);
});
} }
public function test_can_whisper_to_private_channel() public function test_can_whisper_to_private_channel()
@ -116,18 +122,22 @@ class PrivateChannelTest extends TestCase
$rick = $this->newPrivateConnection('private-channel'); $rick = $this->newPrivateConnection('private-channel');
$morty = $this->newPrivateConnection('private-channel'); $morty = $this->newPrivateConnection('private-channel');
$this->statisticsCollector->getStatistics()->then(function ($statistics) { $this->statisticsCollector
$this->assertCount(1, $statistics); ->getStatistics()
}); ->then(function ($statistics) {
$this->assertCount(1, $statistics);
});
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
$this->assertEquals([ ->getAppStatistics('1234')
'peak_connections_count' => 2, ->then(function ($statistic) {
'websocket_messages_count' => 2, $this->assertEquals([
'api_messages_count' => 0, 'peak_connections_count' => 2,
'app_id' => '1234', 'websocket_messages_count' => 2,
], $statistic->toArray()); 'api_messages_count' => 0,
}); 'app_id' => '1234',
], $statistic->toArray());
});
} }
public function test_local_connections_for_private_channels() public function test_local_connections_for_private_channels()
@ -135,15 +145,17 @@ class PrivateChannelTest extends TestCase
$this->newPrivateConnection('private-channel'); $this->newPrivateConnection('private-channel');
$this->newPrivateConnection('private-channel-2'); $this->newPrivateConnection('private-channel-2');
$this->channelManager->getLocalConnections()->then(function ($connections) { $this->channelManager
$this->assertCount(2, $connections); ->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections);
foreach ($connections as $connection) { foreach ($connections as $connection) {
$this->assertInstanceOf( $this->assertInstanceOf(
ConnectionInterface::class, $connection ConnectionInterface::class, $connection
); );
} }
}); });
} }
public function test_events_are_processed_by_on_message_on_private_channels() public function test_events_are_processed_by_on_message_on_private_channels()
@ -208,10 +220,11 @@ class PrivateChannelTest extends TestCase
$this->getSubscribeClient() $this->getSubscribeClient()
->assertNothingDispatched(); ->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()
$this->channelManager->getRedisKey('1234', 'private-channel'), ->assertCalledWithArgs('publish', [
$message->getPayload(), $this->channelManager->getRedisKey('1234', 'private-channel'),
]); $message->getPayload(),
]);
} }
public function test_it_fires_the_event_to_private_channel() public function test_it_fires_the_event_to_private_channel()
@ -245,14 +258,16 @@ class PrivateChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
$this->assertEquals([ ->getAppStatistics('1234')
'peak_connections_count' => 1, ->then(function ($statistic) {
'websocket_messages_count' => 1, $this->assertEquals([
'api_messages_count' => 1, 'peak_connections_count' => 1,
'app_id' => '1234', 'websocket_messages_count' => 1,
], $statistic->toArray()); 'api_messages_count' => 1,
}); 'app_id' => '1234',
], $statistic->toArray());
});
} }
public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_private_channel() public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_private_channel()
@ -285,17 +300,19 @@ class PrivateChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
$this->channelManager->getRedisKey('1234', 'private-channel'), ->getPublishClient()
json_encode([ ->assertCalledWithArgsCount(1, 'publish', [
'event' => 'some-event', $this->channelManager->getRedisKey('1234', 'private-channel'),
'channel' => 'private-channel', json_encode([
'data' => json_encode(['some-data' => 'yes']), 'event' => 'some-event',
'appId' => '1234', 'channel' => 'private-channel',
'socketId' => null, 'data' => json_encode(['some-data' => 'yes']),
'serverId' => $this->channelManager->getServerId(), 'appId' => '1234',
]), 'socketId' => null,
]); 'serverId' => $this->channelManager->getServerId(),
]),
]);
} }
} }
@ -331,17 +348,19 @@ class PrivateChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
$this->channelManager->getRedisKey('1234', 'private-channel'), ->getPublishClient()
json_encode([ ->assertCalledWithArgsCount(1, 'publish', [
'event' => 'some-event', $this->channelManager->getRedisKey('1234', 'private-channel'),
'channel' => 'private-channel', json_encode([
'data' => json_encode(['some-data' => 'yes']), 'event' => 'some-event',
'appId' => '1234', 'channel' => 'private-channel',
'socketId' => null, 'data' => json_encode(['some-data' => 'yes']),
'serverId' => $this->channelManager->getServerId(), 'appId' => '1234',
]), 'socketId' => null,
]); 'serverId' => $this->channelManager->getServerId(),
]),
]);
} }
$wsConnection->assertSentEvent('some-event', [ $wsConnection->assertSentEvent('some-event', [

View File

@ -14,9 +14,11 @@ class PublicChannelTest extends TestCase
{ {
$connection = $this->newActiveConnection(['public-channel']); $connection = $this->newActiveConnection(['public-channel']);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234', 'public-channel')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
$connection->assertSentEvent( $connection->assertSentEvent(
'pusher:connection_established', 'pusher:connection_established',
@ -38,9 +40,11 @@ class PublicChannelTest extends TestCase
{ {
$connection = $this->newActiveConnection(['public-channel']); $connection = $this->newActiveConnection(['public-channel']);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(1, $total); ->getGlobalConnectionsCount('1234', 'public-channel')
}); ->then(function ($total) {
$this->assertEquals(1, $total);
});
$message = new Mocks\Message([ $message = new Mocks\Message([
'event' => 'pusher:unsubscribe', 'event' => 'pusher:unsubscribe',
@ -51,9 +55,11 @@ class PublicChannelTest extends TestCase
$this->pusherServer->onMessage($connection, $message); $this->pusherServer->onMessage($connection, $message);
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($total) { $this->channelManager
$this->assertEquals(0, $total); ->getGlobalConnectionsCount('1234', 'public-channel')
}); ->then(function ($total) {
$this->assertEquals(0, $total);
});
} }
public function test_can_whisper_to_public_channel() public function test_can_whisper_to_public_channel()
@ -97,18 +103,22 @@ class PublicChannelTest extends TestCase
$rick = $this->newActiveConnection(['public-channel']); $rick = $this->newActiveConnection(['public-channel']);
$morty = $this->newActiveConnection(['public-channel']); $morty = $this->newActiveConnection(['public-channel']);
$this->statisticsCollector->getStatistics()->then(function ($statistics) { $this->statisticsCollector
$this->assertCount(1, $statistics); ->getStatistics()
}); ->then(function ($statistics) {
$this->assertCount(1, $statistics);
});
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
$this->assertEquals([ ->getAppStatistics('1234')
'peak_connections_count' => 2, ->then(function ($statistic) {
'websocket_messages_count' => 2, $this->assertEquals([
'api_messages_count' => 0, 'peak_connections_count' => 2,
'app_id' => '1234', 'websocket_messages_count' => 2,
], $statistic->toArray()); 'api_messages_count' => 0,
}); 'app_id' => '1234',
], $statistic->toArray());
});
} }
public function test_local_connections_for_public_channels() public function test_local_connections_for_public_channels()
@ -116,15 +126,17 @@ class PublicChannelTest extends TestCase
$this->newActiveConnection(['public-channel']); $this->newActiveConnection(['public-channel']);
$this->newActiveConnection(['public-channel-2']); $this->newActiveConnection(['public-channel-2']);
$this->channelManager->getLocalConnections()->then(function ($connections) { $this->channelManager
$this->assertCount(2, $connections); ->getLocalConnections()
->then(function ($connections) {
$this->assertCount(2, $connections);
foreach ($connections as $connection) { foreach ($connections as $connection) {
$this->assertInstanceOf( $this->assertInstanceOf(
ConnectionInterface::class, $connection ConnectionInterface::class, $connection
); );
} }
}); });
} }
public function test_events_are_processed_by_on_message_on_public_channels() public function test_events_are_processed_by_on_message_on_public_channels()
@ -189,10 +201,11 @@ class PublicChannelTest extends TestCase
$this->getSubscribeClient() $this->getSubscribeClient()
->assertNothingDispatched(); ->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()
$this->channelManager->getRedisKey('1234', 'public-channel'), ->assertCalledWithArgs('publish', [
$message->getPayload(), $this->channelManager->getRedisKey('1234', 'public-channel'),
]); $message->getPayload(),
]);
} }
public function test_it_fires_the_event_to_public_channel() public function test_it_fires_the_event_to_public_channel()
@ -226,14 +239,16 @@ class PublicChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
$this->statisticsCollector->getAppStatistics('1234')->then(function ($statistic) { $this->statisticsCollector
$this->assertEquals([ ->getAppStatistics('1234')
'peak_connections_count' => 1, ->then(function ($statistic) {
'websocket_messages_count' => 1, $this->assertEquals([
'api_messages_count' => 1, 'peak_connections_count' => 1,
'app_id' => '1234', 'websocket_messages_count' => 1,
], $statistic->toArray()); 'api_messages_count' => 1,
}); 'app_id' => '1234',
], $statistic->toArray());
});
} }
public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_public_channel() public function test_it_fires_event_across_servers_when_there_are_not_users_locally_for_public_channel()
@ -266,17 +281,19 @@ class PublicChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
$this->channelManager->getRedisKey('1234', 'public-channel'), ->getPublishClient()
json_encode([ ->assertCalledWithArgsCount(1, 'publish', [
'event' => 'some-event', $this->channelManager->getRedisKey('1234', 'public-channel'),
'channel' => 'public-channel', json_encode([
'data' => json_encode(['some-data' => 'yes']), 'event' => 'some-event',
'appId' => '1234', 'channel' => 'public-channel',
'socketId' => null, 'data' => json_encode(['some-data' => 'yes']),
'serverId' => $this->channelManager->getServerId(), 'appId' => '1234',
]), 'socketId' => null,
]); 'serverId' => $this->channelManager->getServerId(),
]),
]);
} }
} }
@ -312,17 +329,19 @@ class PublicChannelTest extends TestCase
$this->assertSame([], json_decode($response->getContent(), true)); $this->assertSame([], json_decode($response->getContent(), true));
if (method_exists($this->channelManager, 'getPublishClient')) { if (method_exists($this->channelManager, 'getPublishClient')) {
$this->channelManager->getPublishClient()->assertCalledWithArgsCount(1, 'publish', [ $this->channelManager
$this->channelManager->getRedisKey('1234', 'public-channel'), ->getPublishClient()
json_encode([ ->assertCalledWithArgsCount(1, 'publish', [
'event' => 'some-event', $this->channelManager->getRedisKey('1234', 'public-channel'),
'channel' => 'public-channel', json_encode([
'data' => json_encode(['some-data' => 'yes']), 'event' => 'some-event',
'appId' => '1234', 'channel' => 'public-channel',
'socketId' => null, 'data' => json_encode(['some-data' => 'yes']),
'serverId' => $this->channelManager->getServerId(), 'appId' => '1234',
]), 'socketId' => null,
]); 'serverId' => $this->channelManager->getServerId(),
]),
]);
} }
$wsConnection->assertSentEvent('some-event', [ $wsConnection->assertSentEvent('some-event', [

View File

@ -19,23 +19,31 @@ class RedisPongRemovalTest extends TestCase
// Make the connection look like it was lost 1 day ago. // Make the connection look like it was lost 1 day ago.
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(2, $count); ->getGlobalConnectionsCount('1234', 'public-channel')
}); ->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
$this->assertCount(1, $expiredConnections); ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
}); ->then(function ($expiredConnections) {
$this->assertCount(1, $expiredConnections);
});
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'public-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(1, $count); ->getGlobalConnectionsCount('1234', 'public-channel')
}); ->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
$this->assertCount(0, $expiredConnections); ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
}); ->then(function ($expiredConnections) {
$this->assertCount(0, $expiredConnections);
});
} }
public function test_not_ponged_connections_do_get_removed_on_redis_for_private_channels() public function test_not_ponged_connections_do_get_removed_on_redis_for_private_channels()
@ -51,23 +59,31 @@ class RedisPongRemovalTest extends TestCase
// Make the connection look like it was lost 1 day ago. // Make the connection look like it was lost 1 day ago.
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(2, $count); ->getGlobalConnectionsCount('1234', 'private-channel')
}); ->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
$this->assertCount(1, $expiredConnections); ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
}); ->then(function ($expiredConnections) {
$this->assertCount(1, $expiredConnections);
});
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'private-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(1, $count); ->getGlobalConnectionsCount('1234', 'private-channel')
}); ->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
$this->assertCount(0, $expiredConnections); ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
}); ->then(function ($expiredConnections) {
$this->assertCount(0, $expiredConnections);
});
} }
public function test_not_ponged_connections_do_get_removed_on_redis_for_presence_channels() public function test_not_ponged_connections_do_get_removed_on_redis_for_presence_channels()
@ -83,30 +99,42 @@ class RedisPongRemovalTest extends TestCase
// Make the connection look like it was lost 1 day ago. // Make the connection look like it was lost 1 day ago.
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1)); $this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(2, $count); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($count) {
$this->assertEquals(2, $count);
});
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
$this->assertCount(1, $expiredConnections); ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
}); ->then(function ($expiredConnections) {
$this->assertCount(1, $expiredConnections);
});
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
$this->assertCount(2, $members); ->getChannelMembers('1234', 'presence-channel')
}); ->then(function ($members) {
$this->assertCount(2, $members);
});
$this->channelManager->removeObsoleteConnections(); $this->channelManager->removeObsoleteConnections();
$this->channelManager->getGlobalConnectionsCount('1234', 'presence-channel')->then(function ($count) { $this->channelManager
$this->assertEquals(1, $count); ->getGlobalConnectionsCount('1234', 'presence-channel')
}); ->then(function ($count) {
$this->assertEquals(1, $count);
});
$this->channelManager->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))->then(function ($expiredConnections) { $this->channelManager
$this->assertCount(0, $expiredConnections); ->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
}); ->then(function ($expiredConnections) {
$this->assertCount(0, $expiredConnections);
});
$this->channelManager->getChannelMembers('1234', 'presence-channel')->then(function ($members) { $this->channelManager
$this->assertCount(1, $members); ->getChannelMembers('1234', 'presence-channel')
}); ->then(function ($members) {
$this->assertCount(1, $members);
});
} }
} }