Revert "wip formatting"

This reverts commit 19ca49a4a8.
This commit is contained in:
Alex Renoki 2020-12-07 23:30:12 +02:00
parent 19ca49a4a8
commit b74144cdd5
11 changed files with 74 additions and 50 deletions

View File

@ -176,7 +176,8 @@ abstract class Controller implements HttpServerInterface
$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest)); $laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
$this->ensureValidAppId($laravelRequest->appId) $this
->ensureValidAppId($laravelRequest->appId)
->ensureValidSignature($laravelRequest); ->ensureValidSignature($laravelRequest);
// Invoke the controller action // Invoke the controller action

View File

@ -30,9 +30,11 @@ class ConfigAppManager implements AppManager
*/ */
public function all(): array public function all(): array
{ {
return $this->apps->map(function (array $appAttributes) { return $this->apps
->map(function (array $appAttributes) {
return $this->convertIntoApp($appAttributes); return $this->convertIntoApp($appAttributes);
})->toArray(); })
->toArray();
} }
/** /**
@ -104,7 +106,8 @@ class ConfigAppManager implements AppManager
$app->setPath($appAttributes['path']); $app->setPath($appAttributes['path']);
} }
$app->enableClientMessages($appAttributes['enable_client_messages']) $app
->enableClientMessages($appAttributes['enable_client_messages'])
->enableStatistics($appAttributes['enable_statistics']) ->enableStatistics($appAttributes['enable_statistics'])
->setCapacity($appAttributes['capacity'] ?? null) ->setCapacity($appAttributes['capacity'] ?? null)
->setAllowedOrigins($appAttributes['allowed_origins'] ?? []); ->setAllowedOrigins($appAttributes['allowed_origins'] ?? []);

View File

@ -111,12 +111,16 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getLocalConnections(): PromiseInterface public function getLocalConnections(): PromiseInterface
{ {
$connections = collect($this->channels)->map(function ($channelsWithConnections, $appId) { $connections = collect($this->channels)
->map(function ($channelsWithConnections, $appId) {
return collect($channelsWithConnections)->values(); return collect($channelsWithConnections)->values();
})->values()->collapse() })
->values()->collapse()
->map(function ($channel) { ->map(function ($channel) {
return collect($channel->getConnections()); return collect($channel->getConnections());
})->values()->collapse()->toArray(); })
->values()->collapse()
->toArray();
return Helpers::createFulfilledPromise($connections); return Helpers::createFulfilledPromise($connections);
} }
@ -162,7 +166,9 @@ class LocalChannelManager implements ChannelManager
$this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) { $this->getLocalChannels($connection->app->id)->then(function ($channels) use ($connection) {
collect($channels)->each->unsubscribe($connection); collect($channels)->each->unsubscribe($connection);
collect($channels)->reject->hasConnections()->each(function (Channel $channel, string $channelName) use ($connection) { collect($channels)
->reject->hasConnections()
->each(function (Channel $channel, string $channelName) use ($connection) {
unset($this->channels[$connection->app->id][$channelName]); unset($this->channels[$connection->app->id][$channelName]);
}); });
}); });
@ -249,9 +255,11 @@ class LocalChannelManager implements ChannelManager
return $collection->filter(function (Channel $channel) use ($channelName) { return $collection->filter(function (Channel $channel) use ($channelName) {
return $channel->getName() === $channelName; return $channel->getName() === $channelName;
}); });
})->flatMap(function (Channel $channel) { })
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId'); return collect($channel->getConnections())->pluck('socketId');
})->unique()->count(); })
->unique()->count();
}); });
} }
@ -370,7 +378,8 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{ {
$results = collect($channelNames)->reduce(function ($results, $channel) use ($appId) { $results = collect($channelNames)
->reduce(function ($results, $channel) use ($appId) {
$results[$channel] = isset($this->users["{$appId}:{$channel}"]) $results[$channel] = isset($this->users["{$appId}:{$channel}"])
? count($this->users["{$appId}:{$channel}"]) ? count($this->users["{$appId}:{$channel}"])
: 0; : 0;

View File

@ -412,7 +412,8 @@ class RedisChannelManager extends LocalChannelManager
public function removeObsoleteConnections(): PromiseInterface public function removeObsoleteConnections(): PromiseInterface
{ {
$this->lock()->get(function () { $this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))->then(function ($connections) { $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $socketId => $appId) { foreach ($connections as $socketId => $appId) {
$connection = $this->fakeConnectionForApp($appId, $socketId); $connection = $this->fakeConnectionForApp($appId, $socketId);

View File

@ -155,7 +155,8 @@ class Channel
*/ */
public function broadcast($appId, stdClass $payload, bool $replicate = true): bool public function broadcast($appId, stdClass $payload, bool $replicate = true): bool
{ {
collect($this->getConnections())->each->send(json_encode($payload)); collect($this->getConnections())
->each->send(json_encode($payload));
if ($replicate) { if ($replicate) {
$this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload); $this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload);

View File

@ -43,7 +43,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function webSocketMessage($appId) public function webSocketMessage($appId)
{ {
$this->findOrMake($appId)->webSocketMessage(); $this->findOrMake($appId)
->webSocketMessage();
} }
/** /**
@ -54,7 +55,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function apiMessage($appId) public function apiMessage($appId)
{ {
$this->findOrMake($appId)->apiMessage(); $this->findOrMake($appId)
->apiMessage();
} }
/** /**
@ -65,7 +67,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function connection($appId) public function connection($appId)
{ {
$this->findOrMake($appId)->connection(); $this->findOrMake($appId)
->connection();
} }
/** /**
@ -76,7 +79,8 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function disconnection($appId) public function disconnection($appId)
{ {
$this->findOrMake($appId)->disconnection(); $this->findOrMake($appId)
->disconnection();
} }
/** /**

View File

@ -55,9 +55,8 @@ class RedisCollector extends MemoryCollector
*/ */
public function webSocketMessage($appId) public function webSocketMessage($appId)
{ {
$this->ensureAppIsInSet($appId)->hincrby( $this->ensureAppIsInSet($appId)
$this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1 ->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'websocket_messages_count', 1);
);
} }
/** /**
@ -68,9 +67,8 @@ class RedisCollector extends MemoryCollector
*/ */
public function apiMessage($appId) public function apiMessage($appId)
{ {
$this->ensureAppIsInSet($appId)->hincrby( $this->ensureAppIsInSet($appId)
$this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1 ->hincrby($this->channelManager->getRedisKey($appId, null, ['stats']), 'api_messages_count', 1);
);
} }
/** /**

View File

@ -42,7 +42,8 @@ class DatabaseStore implements StatisticsStore
return static::$model::where('created_at', '<', $moment->toDateTimeString()) return static::$model::where('created_at', '<', $moment->toDateTimeString())
->when(! is_null($appId), function ($query) use ($appId) { ->when(! is_null($appId), function ($query) use ($appId) {
return $query->whereAppId($appId); return $query->whereAppId($appId);
})->delete(); })
->delete();
} }
/** /**
@ -53,7 +54,8 @@ class DatabaseStore implements StatisticsStore
*/ */
public function getRawRecords(callable $processQuery = null) public function getRawRecords(callable $processQuery = null)
{ {
return static::$model::query()->when(! is_null($processQuery), function ($query) use ($processQuery) { return static::$model::query()
->when(! is_null($processQuery), function ($query) use ($processQuery) {
return call_user_func($processQuery, $query); return call_user_func($processQuery, $query);
}, function ($query) { }, function ($query) {
return $query->latest()->limit(120); return $query->latest()->limit(120);
@ -72,9 +74,11 @@ class DatabaseStore implements StatisticsStore
return $this->getRawRecords($processQuery) return $this->getRawRecords($processQuery)
->when(! is_null($processCollection), function ($collection) use ($processCollection) { ->when(! is_null($processCollection), function ($collection) use ($processCollection) {
return call_user_func($processCollection, $collection); return call_user_func($processCollection, $collection);
})->map(function (Model $statistic) { })
->map(function (Model $statistic) {
return $this->statisticToArray($statistic); return $this->statisticToArray($statistic);
})->toArray(); })
->toArray();
} }
/** /**

View File

@ -371,7 +371,8 @@ class PresenceChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient()->assertNothingDispatched(); $this->getSubscribeClient()
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'presence-channel'), $this->channelManager->getRedisKey('1234', 'presence-channel'),

View File

@ -205,7 +205,8 @@ class PrivateChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient()->assertNothingDispatched(); $this->getSubscribeClient()
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'private-channel'), $this->channelManager->getRedisKey('1234', 'private-channel'),

View File

@ -186,7 +186,8 @@ class PublicChannelTest extends TestCase
$receiver->assertSentEvent('some-event', $message->getPayloadAsArray()); $receiver->assertSentEvent('some-event', $message->getPayloadAsArray());
$this->getSubscribeClient()->assertNothingDispatched(); $this->getSubscribeClient()
->assertNothingDispatched();
$this->getPublishClient()->assertCalledWithArgs('publish', [ $this->getPublishClient()->assertCalledWithArgs('publish', [
$this->channelManager->getRedisKey('1234', 'public-channel'), $this->channelManager->getRedisKey('1234', 'public-channel'),