wip formatting

This commit is contained in:
Alex Renoki 2020-12-07 20:48:15 +02:00
parent 908f147cb3
commit 19ca49a4a8
11 changed files with 50 additions and 74 deletions

View File

@ -176,8 +176,7 @@ abstract class Controller implements HttpServerInterface
$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest)); $laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
$this $this->ensureValidAppId($laravelRequest->appId)
->ensureValidAppId($laravelRequest->appId)
->ensureValidSignature($laravelRequest); ->ensureValidSignature($laravelRequest);
// Invoke the controller action // Invoke the controller action

View File

@ -30,11 +30,9 @@ class ConfigAppManager implements AppManager
*/ */
public function all(): array public function all(): array
{ {
return $this->apps return $this->apps->map(function (array $appAttributes) {
->map(function (array $appAttributes) { return $this->convertIntoApp($appAttributes);
return $this->convertIntoApp($appAttributes); })->toArray();
})
->toArray();
} }
/** /**
@ -106,8 +104,7 @@ class ConfigAppManager implements AppManager
$app->setPath($appAttributes['path']); $app->setPath($appAttributes['path']);
} }
$app $app->enableClientMessages($appAttributes['enable_client_messages'])
->enableClientMessages($appAttributes['enable_client_messages'])
->enableStatistics($appAttributes['enable_statistics']) ->enableStatistics($appAttributes['enable_statistics'])
->setCapacity($appAttributes['capacity'] ?? null) ->setCapacity($appAttributes['capacity'] ?? null)
->setAllowedOrigins($appAttributes['allowed_origins'] ?? []); ->setAllowedOrigins($appAttributes['allowed_origins'] ?? []);

View File

@ -111,16 +111,12 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getLocalConnections(): PromiseInterface public function getLocalConnections(): PromiseInterface
{ {
$connections = collect($this->channels) $connections = collect($this->channels)->map(function ($channelsWithConnections, $appId) {
->map(function ($channelsWithConnections, $appId) { return collect($channelsWithConnections)->values();
return collect($channelsWithConnections)->values(); })->values()->collapse()
}) ->map(function ($channel) {
->values()->collapse() return collect($channel->getConnections());
->map(function ($channel) { })->values()->collapse()->toArray();
return collect($channel->getConnections());
})
->values()->collapse()
->toArray();
return Helpers::createFulfilledPromise($connections); return Helpers::createFulfilledPromise($connections);
} }
@ -166,11 +162,9 @@ class LocalChannelManager implements ChannelManager
$this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) {
collect($channels)->each->unsubscribe($connection); collect($channels)->each->unsubscribe($connection);
collect($channels) collect($channels)->reject->hasConnections()->each(function (Channel $channel, string $channelName) use ($connection) {
->reject->hasConnections() unset($this->channels[$connection->app->id][$channelName]);
->each(function (Channel $channel, string $channelName) use ($connection) { });
unset($this->channels[$connection->app->id][$channelName]);
});
}); });
$this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) {
@ -255,11 +249,9 @@ class LocalChannelManager implements ChannelManager
return $collection->filter(function (Channel $channel) use ($channelName) { return $collection->filter(function (Channel $channel) use ($channelName) {
return $channel->getName() === $channelName; return $channel->getName() === $channelName;
}); });
}) })->flatMap(function (Channel $channel) {
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId'); return collect($channel->getConnections())->pluck('socketId');
}) })->unique()->count();
->unique()->count();
}); });
} }
@ -378,14 +370,13 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{ {
$results = collect($channelNames) $results = collect($channelNames)->reduce(function ($results, $channel) use ($appId) {
->reduce(function ($results, $channel) use ($appId) { $results[$channel] = isset($this->users["{$appId}:{$channel}"])
$results[$channel] = isset($this->users["{$appId}:{$channel}"]) ? count($this->users["{$appId}:{$channel}"])
? count($this->users["{$appId}:{$channel}"]) : 0;
: 0;
return $results; return $results;
}, []); }, []);
return Helpers::createFulfilledPromise($results); return Helpers::createFulfilledPromise($results);
} }

View File

@ -412,14 +412,13 @@ class RedisChannelManager extends LocalChannelManager
public function removeObsoleteConnections(): PromiseInterface public function removeObsoleteConnections(): PromiseInterface
{ {
$this->lock()->get(function () { $this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))->then(function ($connections) {
->then(function ($connections) { foreach ($connections as $socketId => $appId) {
foreach ($connections as $socketId => $appId) { $connection = $this->fakeConnectionForApp($appId, $socketId);
$connection = $this->fakeConnectionForApp($appId, $socketId);
$this->unsubscribeFromAllChannels($connection); $this->unsubscribeFromAllChannels($connection);
} }
}); });
}); });
return parent::removeObsoleteConnections(); return parent::removeObsoleteConnections();

View File

@ -155,8 +155,7 @@ class Channel
*/ */
public function broadcast($appId, stdClass $payload, bool $replicate = true): bool public function broadcast($appId, stdClass $payload, bool $replicate = true): bool
{ {
collect($this->getConnections()) collect($this->getConnections())->each->send(json_encode($payload));
->each->send(json_encode($payload));
if ($replicate) { if ($replicate) {
$this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload); $this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload);

View File

@ -43,8 +43,7 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function webSocketMessage($appId) public function webSocketMessage($appId)
{ {
$this->findOrMake($appId) $this->findOrMake($appId)->webSocketMessage();
->webSocketMessage();
} }
/** /**
@ -55,8 +54,7 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function apiMessage($appId) public function apiMessage($appId)
{ {
$this->findOrMake($appId) $this->findOrMake($appId)->apiMessage();
->apiMessage();
} }
/** /**
@ -67,8 +65,7 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function connection($appId) public function connection($appId)
{ {
$this->findOrMake($appId) $this->findOrMake($appId)->connection();
->connection();
} }
/** /**
@ -79,8 +76,7 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function disconnection($appId) public function disconnection($appId)
{ {
$this->findOrMake($appId) $this->findOrMake($appId)->disconnection();
->disconnection();
} }
/** /**

View File

@ -55,8 +55,9 @@ class RedisCollector extends MemoryCollector
*/ */
public function webSocketMessage($appId) public function webSocketMessage($appId)
{ {
$this->ensureAppIsInSet($appId) $this->ensureAppIsInSet($appId)->hincrby(
->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1); $this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1
);
} }
/** /**
@ -67,8 +68,9 @@ class RedisCollector extends MemoryCollector
*/ */
public function apiMessage($appId) public function apiMessage($appId)
{ {
$this->ensureAppIsInSet($appId) $this->ensureAppIsInSet($appId)->hincrby(
->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1); $this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1
);
} }
/** /**

View File

@ -42,8 +42,7 @@ class DatabaseStore implements StatisticsStore
return static::$model::where('created_at', '<', $moment->toDateTimeString()) return static::$model::where('created_at', '<', $moment->toDateTimeString())
->when(! is_null($appId), function ($query) use ($appId) { ->when(! is_null($appId), function ($query) use ($appId) {
return $query->whereAppId($appId); return $query->whereAppId($appId);
}) })->delete();
->delete();
} }
/** /**
@ -54,12 +53,11 @@ class DatabaseStore implements StatisticsStore
*/ */
public function getRawRecords(callable $processQuery = null) public function getRawRecords(callable $processQuery = null)
{ {
return static::$model::query() return static::$model::query()->when(! is_null($processQuery), function ($query) use ($processQuery) {
->when(! is_null($processQuery), function ($query) use ($processQuery) { return call_user_func($processQuery, $query);
return call_user_func($processQuery, $query); }, function ($query) {
}, function ($query) { return $query->latest()->limit(120);
return $query->latest()->limit(120); })->get();
})->get();
} }
/** /**
@ -74,11 +72,9 @@ class DatabaseStore implements StatisticsStore
return $this->getRawRecords($processQuery) return $this->getRawRecords($processQuery)
->when(! is_null($processCollection), function ($collection) use ($processCollection) { ->when(! is_null($processCollection), function ($collection) use ($processCollection) {
return call_user_func($processCollection, $collection); return call_user_func($processCollection, $collection);
}) })->map(function (Model $statistic) {
->map(function (Model $statistic) {
return $this->statisticToArray($statistic); return $this->statisticToArray($statistic);
}) })->toArray();
->toArray();
} }
/** /**

View File

@ -371,8 +371,7 @@ class PresenceChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient() $this->getSubscribeClient()->assertNothingDispatched();
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'presence-channel'), $this->channelManager->getRedisKey('1234', 'presence-channel'),

View File

@ -205,8 +205,7 @@ class PrivateChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient() $this->getSubscribeClient()->assertNothingDispatched();
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'private-channel'), $this->channelManager->getRedisKey('1234', 'private-channel'),

View File

@ -186,8 +186,7 @@ class PublicChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient() $this->getSubscribeClient()->assertNothingDispatched();
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'public-channel'), $this->channelManager->getRedisKey('1234', 'public-channel'),