Merge pull request #544 from beyondcode/feature/promises

[2.x] Ensure promises run one-after-another
This commit is contained in:
rennokki 2020-09-19 12:42:19 +00:00 committed by GitHub
commit 1f6e714873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 393 additions and 304 deletions

View File

@ -279,4 +279,19 @@ return [
],
/*
|--------------------------------------------------------------------------
| Promise Resolver
|--------------------------------------------------------------------------
|
| The promise resolver is a class that takes a input value and is
| able to make sure the PHP code runs async by using ->then(). You can
| use your own Promise Resolver. This is usually changed when you want to
| intercept values by the promises throughout the app, like in testing
| to switch from async to sync.
|
*/
'promise_resolver' => \React\Promise\FulfilledPromise::class,
];

View File

@ -64,11 +64,9 @@ class FetchChannels extends Controller
}
return $info;
})
->sortBy(function ($content, $name) {
})->sortBy(function ($content, $name) {
return $name;
})
->all();
})->all();
return [
'channels' => $channels ?: new stdClass,

View File

@ -6,10 +6,10 @@ use BeyondCode\LaravelWebSockets\Channels\Channel;
use BeyondCode\LaravelWebSockets\Channels\PresenceChannel;
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\Helpers;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface;
use React\EventLoop\LoopInterface;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
use stdClass;
@ -104,7 +104,7 @@ class LocalChannelManager implements ChannelManager
->values()->collapse()
->toArray();
return new FulfilledPromise($connections);
return Helpers::createFulfilledPromise($connections);
}
/**
@ -116,7 +116,7 @@ class LocalChannelManager implements ChannelManager
*/
public function getLocalChannels($appId): PromiseInterface
{
return new FulfilledPromise(
return Helpers::createFulfilledPromise(
$this->channels[$appId] ?? []
);
}
@ -137,12 +137,12 @@ class LocalChannelManager implements ChannelManager
* Remove connection from all channels.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
* @return PromiseInterface[bool]
*/
public function unsubscribeFromAllChannels(ConnectionInterface $connection)
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{
if (! isset($connection->app)) {
return;
return new FuilfilledPromise(false);
}
$this->getLocalChannels($connection->app->id)
@ -162,6 +162,8 @@ class LocalChannelManager implements ChannelManager
unset($this->channels[$connection->app->id]);
}
});
return Helpers::createFulfilledPromise(true);
}
/**
@ -170,13 +172,15 @@ class LocalChannelManager implements ChannelManager
* @param \Ratchet\ConnectionInterface $connection
* @param string $channelName
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload)
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
$channel = $this->findOrCreate($connection->app->id, $channelName);
$channel->subscribe($connection, $payload);
return Helpers::createFulfilledPromise(
$channel->subscribe($connection, $payload)
);
}
/**
@ -185,35 +189,39 @@ class LocalChannelManager implements ChannelManager
* @param \Ratchet\ConnectionInterface $connection
* @param string $channelName
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload)
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
$channel = $this->findOrCreate($connection->app->id, $channelName);
$channel->unsubscribe($connection, $payload);
return Helpers::createFulfilledPromise(
$channel->unsubscribe($connection, $payload)
);
}
/**
* Subscribe the connection to a specific channel.
* Subscribe the connection to a specific channel, returning
* a promise containing the amount of connections.
*
* @param string|int $appId
* @return void
* @return PromiseInterface[int]
*/
public function subscribeToApp($appId)
public function subscribeToApp($appId): PromiseInterface
{
//
return Helpers::createFulfilledPromise(0);
}
/**
* Unsubscribe the connection from the channel.
* Unsubscribe the connection from the channel, returning
* a promise containing the amount of connections after decrement.
*
* @param string|int $appId
* @return void
* @return PromiseInterface[int]
*/
public function unsubscribeFromApp($appId)
public function unsubscribeFromApp($appId): PromiseInterface
{
//
return Helpers::createFulfilledPromise(0);
}
/**
@ -222,23 +230,21 @@ class LocalChannelManager implements ChannelManager
*
* @param string|int $appId
* @param string|null $channelName
* @return \React\Promise\PromiseInterface
* @return PromiseInterface[int]
*/
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
{
return $this->getLocalChannels($appId)
->then(function ($channels) use ($channelName) {
return collect($channels)
->when(! is_null($channelName), function ($collection) use ($channelName) {
return $collection->filter(function (Channel $channel) use ($channelName) {
return $channel->getName() === $channelName;
});
})
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId');
})
->unique()
->count();
return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) {
return $collection->filter(function (Channel $channel) use ($channelName) {
return $channel->getName() === $channelName;
});
})
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId');
})
->unique()->count();
});
}
@ -248,7 +254,7 @@ class LocalChannelManager implements ChannelManager
*
* @param string|int $appId
* @param string|null $channelName
* @return \React\Promise\PromiseInterface
* @return PromiseInterface[int]
*/
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
{
@ -263,11 +269,11 @@ class LocalChannelManager implements ChannelManager
* @param string $channel
* @param stdClass $payload
* @param string|null $serverId
* @return bool
* @return PromiseInterface[bool]
*/
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null)
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface
{
return true;
return Helpers::createFulfilledPromise(true);
}
/**
@ -277,12 +283,14 @@ class LocalChannelManager implements ChannelManager
* @param stdClass $user
* @param string $channel
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload)
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
{
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId;
return Helpers::createFulfilledPromise(true);
}
/**
@ -292,9 +300,9 @@ class LocalChannelManager implements ChannelManager
* @param stdClass $user
* @param string $channel
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel)
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
{
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);
@ -310,6 +318,8 @@ class LocalChannelManager implements ChannelManager
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]);
}
}
return Helpers::createFulfilledPromise(true);
}
/**
@ -327,7 +337,7 @@ class LocalChannelManager implements ChannelManager
return json_decode($user);
})->unique('user_id')->toArray();
return new FulfilledPromise($members);
return Helpers::createFulfilledPromise($members);
}
/**
@ -341,7 +351,7 @@ class LocalChannelManager implements ChannelManager
{
$member = $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] ?? null;
return new FulfilledPromise($member);
return Helpers::createFulfilledPromise($member);
}
/**
@ -362,7 +372,7 @@ class LocalChannelManager implements ChannelManager
return $results;
}, []);
return new FulfilledPromise($results);
return Helpers::createFulfilledPromise($results);
}
/**
@ -375,7 +385,7 @@ class LocalChannelManager implements ChannelManager
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
return new FulfilledPromise(
return Helpers::createFulfilledPromise(
$this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? []
);
}
@ -384,21 +394,21 @@ class LocalChannelManager implements ChannelManager
* Keep tracking the connections availability when they pong.
*
* @param \Ratchet\ConnectionInterface $connection
* @return bool
* @return PromiseInterface[bool]
*/
public function connectionPonged(ConnectionInterface $connection): bool
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
return true;
return Helpers::createFulfilledPromise(true);
}
/**
* Remove the obsolete connections that didn't ponged in a while.
*
* @return bool
* @return PromiseInterface[bool]
*/
public function removeObsoleteConnections(): bool
public function removeObsoleteConnections(): PromiseInterface
{
return true;
return Helpers::createFulfilledPromise(true);
}
/**

View File

@ -132,20 +132,19 @@ class RedisChannelManager extends LocalChannelManager
* Remove connection from all channels.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
* @return PromiseInterface[bool]
*/
public function unsubscribeFromAllChannels(ConnectionInterface $connection)
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{
$this->getGlobalChannels($connection->app->id)
return $this->getGlobalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
foreach ($channels as $channel) {
$this->unsubscribeFromChannel(
$connection, $channel, new stdClass
);
$this->unsubscribeFromChannel($connection, $channel, new stdClass);
}
})
->then(function () use ($connection) {
return parent::unsubscribeFromAllChannels($connection);
});
parent::unsubscribeFromAllChannels($connection);
}
/**
@ -154,19 +153,23 @@ class RedisChannelManager extends LocalChannelManager
* @param \Ratchet\ConnectionInterface $connection
* @param string $channelName
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload)
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
$this->subscribeToTopic($connection->app->id, $channelName);
$this->addConnectionToSet($connection, Carbon::now());
$this->addChannelToSet($connection->app->id, $channelName);
$this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
parent::subscribeToChannel($connection, $channelName, $payload);
return $this->subscribeToTopic($connection->app->id, $channelName)
->then(function () use ($connection) {
return $this->addConnectionToSet($connection, Carbon::now());
})
->then(function () use ($connection, $channelName) {
return $this->addChannelToSet($connection->app->id, $channelName);
})
->then(function () use ($connection, $channelName) {
return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
})
->then(function () use ($connection, $channelName, $payload) {
return parent::subscribeToChannel($connection, $channelName, $payload);
});
}
/**
@ -175,11 +178,11 @@ class RedisChannelManager extends LocalChannelManager
* @param \Ratchet\ConnectionInterface $connection
* @param string $channelName
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload)
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
$this->getGlobalConnectionsCount($connection->app->id, $channelName)
return $this->getGlobalConnectionsCount($connection->app->id, $channelName)
->then(function ($count) use ($connection, $channelName) {
if ($count === 0) {
// Make sure to not stay subscribed to the PubSub topic
@ -195,39 +198,46 @@ class RedisChannelManager extends LocalChannelManager
$this->unsubscribeFromTopic($connection->app->id, $channelName);
}
});
$this->removeChannelFromSet($connection->app->id, $channelName);
$this->removeConnectionFromSet($connection);
})
->then(function () use ($connection, $channelName) {
return $this->removeChannelFromSet($connection->app->id, $channelName);
})
->then(function () use ($connection) {
return $this->removeConnectionFromSet($connection);
})
->then(function () use ($connection, $channelName, $payload) {
return parent::unsubscribeFromChannel($connection, $channelName, $payload);
});
parent::unsubscribeFromChannel($connection, $channelName, $payload);
}
/**
* Subscribe the connection to a specific channel.
* Subscribe the connection to a specific channel, returning
* a promise containing the amount of connections.
*
* @param string|int $appId
* @return void
* @return PromiseInterface[int]
*/
public function subscribeToApp($appId)
public function subscribeToApp($appId): PromiseInterface
{
$this->subscribeToTopic($appId);
$this->incrementSubscriptionsCount($appId);
return $this->subscribeToTopic($appId)
->then(function () use ($appId) {
return $this->incrementSubscriptionsCount($appId);
});
}
/**
* Unsubscribe the connection from the channel.
* Unsubscribe the connection from the channel, returning
* a promise containing the amount of connections after decrement.
*
* @param string|int $appId
* @return void
* @return PromiseInterface[int]
*/
public function unsubscribeFromApp($appId)
public function unsubscribeFromApp($appId): PromiseInterface
{
$this->unsubscribeFromTopic($appId);
$this->incrementSubscriptionsCount($appId, null, -1);
return $this->unsubscribeFromTopic($appId)
->then(function () use ($appId) {
return $this->decrementSubscriptionsCount($appId);
});
}
/**
@ -236,7 +246,7 @@ class RedisChannelManager extends LocalChannelManager
*
* @param string|int $appId
* @param string|null $channelName
* @return \React\Promise\PromiseInterface
* @return PromiseInterface[int]
*/
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
{
@ -249,7 +259,7 @@ class RedisChannelManager extends LocalChannelManager
*
* @param string|int $appId
* @param string|null $channelName
* @return \React\Promise\PromiseInterface
* @return PromiseInterface[int]
*/
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
{
@ -268,17 +278,19 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel
* @param stdClass $payload
* @param string|null $serverId
* @return bool
* @return PromiseInterface[bool]
*/
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null)
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface
{
$payload->appId = $appId;
$payload->socketId = $socketId;
$payload->serverId = $serverId ?: $this->getServerId();
$this->publishClient->publish($this->getRedisKey($appId, $channel), json_encode($payload));
return true;
return $this->publishClient
->publish($this->getRedisKey($appId, $channel), json_encode($payload))
->then(function () use ($appId, $socketId, $channel, $payload, $serverId) {
return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
});
}
/**
@ -288,17 +300,17 @@ class RedisChannelManager extends LocalChannelManager
* @param stdClass $user
* @param string $channel
* @param stdClass $payload
* @return void
* @return PromiseInterface
*/
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload)
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
{
$this->storeUserData(
$connection->app->id, $channel, $connection->socketId, json_encode($user)
);
$this->addUserSocket(
$connection->app->id, $channel, $user, $connection->socketId
);
return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user))
->then(function () use ($connection, $channel, $user) {
return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})
->then(function () use ($connection, $user, $channel, $payload) {
return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
});
}
/**
@ -308,17 +320,17 @@ class RedisChannelManager extends LocalChannelManager
* @param stdClass $user
* @param string $channel
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel)
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
{
$this->removeUserData(
$connection->app->id, $channel, $connection->socketId
);
$this->removeUserSocket(
$connection->app->id, $channel, $user, $connection->socketId
);
return $this->removeUserData($connection->app->id, $channel, $connection->socketId)
->then(function () use ($connection, $channel, $user) {
return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})
->then(function () use ($connection, $user, $channel) {
return parent::userLeftPresenceChannel($connection, $user, $channel);
});
}
/**
@ -326,19 +338,16 @@ class RedisChannelManager extends LocalChannelManager
*
* @param string|int $appId
* @param string $channel
* @return \React\Promise\PromiseInterface
* @return \React\Promise\PromiseInterface[array]
*/
public function getChannelMembers($appId, string $channel): PromiseInterface
{
return $this->publishClient
->hgetall($this->getRedisKey($appId, $channel, ['users']))
->then(function ($list) {
return collect(Helpers::redisListToArray($list))
->map(function ($user) {
return json_decode($user);
})
->unique('user_id')
->toArray();
return collect(Helpers::redisListToArray($list))->map(function ($user) {
return json_decode($user);
})->unique('user_id')->toArray();
});
}
@ -347,7 +356,7 @@ class RedisChannelManager extends LocalChannelManager
*
* @param \Ratchet\ConnectionInterface $connection
* @param string $channel
* @return \React\Promise\PromiseInterface
* @return \React\Promise\PromiseInterface[null|array]
*/
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
{
@ -361,7 +370,7 @@ class RedisChannelManager extends LocalChannelManager
*
* @param string|int $appId
* @param array $channelNames
* @return \React\Promise\PromiseInterface
* @return \React\Promise\PromiseInterface[array]
*/
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{
@ -385,7 +394,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string|int $userId
* @param string|int $appId
* @param string $channelName
* @return \React\Promise\PromiseInterface
* @return \React\Promise\PromiseInterface[array]
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
@ -398,30 +407,31 @@ class RedisChannelManager extends LocalChannelManager
* Keep tracking the connections availability when they pong.
*
* @param \Ratchet\ConnectionInterface $connection
* @return bool
* @return PromiseInterface[bool]
*/
public function connectionPonged(ConnectionInterface $connection): bool
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
// This will update the score with the current timestamp.
$this->addConnectionToSet($connection, Carbon::now());
return parent::connectionPonged($connection);
return $this->addConnectionToSet($connection, Carbon::now())
->then(function () use ($connection) {
return parent::connectionPonged($connection);
});
}
/**
* Remove the obsolete connections that didn't ponged in a while.
*
* @return bool
* @return PromiseInterface[bool]
*/
public function removeObsoleteConnections(): bool
public function removeObsoleteConnections(): PromiseInterface
{
$this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $socketId => $appId) {
$this->unsubscribeFromAllChannels(
$this->fakeConnectionForApp($appId, $socketId)
);
$connection = $this->fakeConnectionForApp($appId, $socketId);
$this->unsubscribeFromAllChannels($connection);
}
});
});
@ -514,7 +524,7 @@ class RedisChannelManager extends LocalChannelManager
*
* @return string
*/
public function getServerId()
public function getServerId(): string
{
return $this->serverId;
}
@ -525,9 +535,9 @@ class RedisChannelManager extends LocalChannelManager
* @param string|int $appId
* @param string|null $channel
* @param int $increment
* @return PromiseInterface
* @return PromiseInterface[int]
*/
public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1)
public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{
return $this->publishClient->hincrby(
$this->getRedisKey($appId, $channel, ['stats']), 'connections', $increment
@ -540,9 +550,9 @@ class RedisChannelManager extends LocalChannelManager
* @param string|int $appId
* @param string|null $channel
* @param int $decrement
* @return PromiseInterface
* @return PromiseInterface[int]
*/
public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1)
public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{
return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1);
}
@ -552,13 +562,13 @@ class RedisChannelManager extends LocalChannelManager
*
* @param \Ratchet\ConnectionInterface $connection
* @param \DateTime|string|null $moment
* @return void
* @return PromiseInterface
*/
public function addConnectionToSet(ConnectionInterface $connection, $moment = null)
public function addConnectionToSet(ConnectionInterface $connection, $moment = null): PromiseInterface
{
$moment = $moment ? Carbon::parse($moment) : Carbon::now();
$this->publishClient->zadd(
return $this->publishClient->zadd(
$this->getRedisKey(null, null, ['sockets']),
$moment->format('U'), "{$connection->app->id}:{$connection->socketId}"
);
@ -568,11 +578,11 @@ class RedisChannelManager extends LocalChannelManager
* Remove the connection from the sorted list.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
* @return PromiseInterface
*/
public function removeConnectionFromSet(ConnectionInterface $connection)
public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
{
$this->publishClient->zrem(
return $this->publishClient->zrem(
$this->getRedisKey(null, null, ['sockets']),
"{$connection->app->id}:{$connection->socketId}"
);
@ -585,9 +595,9 @@ class RedisChannelManager extends LocalChannelManager
* @param int $start
* @param int $stop
* @param bool $strict
* @return PromiseInterface
* @return PromiseInterface[array]
*/
public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true)
public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true): PromiseInterface
{
if ($strict) {
$start = "({$start}";
@ -612,7 +622,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel
* @return PromiseInterface
*/
public function addChannelToSet($appId, string $channel)
public function addChannelToSet($appId, string $channel): PromiseInterface
{
return $this->publishClient->sadd(
$this->getRedisKey($appId, null, ['channels']), $channel
@ -626,7 +636,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel
* @return PromiseInterface
*/
public function removeChannelFromSet($appId, string $channel)
public function removeChannelFromSet($appId, string $channel): PromiseInterface
{
return $this->publishClient->srem(
$this->getRedisKey($appId, null, ['channels']), $channel
@ -642,9 +652,9 @@ class RedisChannelManager extends LocalChannelManager
* @param string $data
* @return PromiseInterface
*/
public function storeUserData($appId, string $channel = null, string $key, $data)
public function storeUserData($appId, string $channel = null, string $key, $data): PromiseInterface
{
$this->publishClient->hset(
return $this->publishClient->hset(
$this->getRedisKey($appId, $channel, ['users']), $key, $data
);
}
@ -657,7 +667,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string $key
* @return PromiseInterface
*/
public function removeUserData($appId, string $channel = null, string $key)
public function removeUserData($appId, string $channel = null, string $key): PromiseInterface
{
return $this->publishClient->hdel(
$this->getRedisKey($appId, $channel, ['users']), $key
@ -669,11 +679,11 @@ class RedisChannelManager extends LocalChannelManager
*
* @param string|int $appId
* @param string|null $channel
* @return void
* @return PromiseInterface
*/
public function subscribeToTopic($appId, string $channel = null)
public function subscribeToTopic($appId, string $channel = null): PromiseInterface
{
$this->subscribeClient->subscribe(
return $this->subscribeClient->subscribe(
$this->getRedisKey($appId, $channel)
);
}
@ -683,11 +693,11 @@ class RedisChannelManager extends LocalChannelManager
*
* @param string|int $appId
* @param string|null $channel
* @return void
* @return PromiseInterface
*/
public function unsubscribeFromTopic($appId, string $channel = null)
public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface
{
$this->subscribeClient->unsubscribe(
return $this->subscribeClient->unsubscribe(
$this->getRedisKey($appId, $channel)
);
}
@ -699,11 +709,11 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel
* @param stdClass $user
* @param string $socketId
* @return void
* @return PromiseInterface
*/
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId)
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{
$this->publishClient->sadd(
return $this->publishClient->sadd(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId
);
}
@ -715,11 +725,11 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel
* @param stdClass $user
* @param string $socketId
* @return void
* @return PromiseInterface
*/
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId)
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{
$this->publishClient->srem(
return $this->publishClient->srem(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId
);
}

View File

@ -73,9 +73,9 @@ class Channel
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events
* @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload
* @return void
* @return bool
*/
public function subscribe(ConnectionInterface $connection, stdClass $payload)
public function subscribe(ConnectionInterface $connection, stdClass $payload): bool
{
$this->saveConnection($connection);
@ -88,21 +88,25 @@ class Channel
'socketId' => $connection->socketId,
'channel' => $this->getName(),
]);
return true;
}
/**
* Unsubscribe connection from the channel.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
* @return bool
*/
public function unsubscribe(ConnectionInterface $connection)
public function unsubscribe(ConnectionInterface $connection): bool
{
if (! isset($this->connections[$connection->socketId])) {
return;
return false;
}
unset($this->connections[$connection->socketId]);
return true;
}
/**

View File

@ -15,119 +15,124 @@ class PresenceChannel extends PrivateChannel
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events
* @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload
* @return void
* @return bool
* @throws InvalidSignature
*/
public function subscribe(ConnectionInterface $connection, stdClass $payload)
public function subscribe(ConnectionInterface $connection, stdClass $payload): bool
{
$this->verifySignature($connection, $payload);
$this->saveConnection($connection);
$this->channelManager->userJoinedPresenceChannel(
$connection,
$user = json_decode($payload->channel_data),
$this->getName(),
$payload
);
$user = json_decode($payload->channel_data);
$this->channelManager
->getChannelMembers($connection->app->id, $this->getName())
->then(function ($users) use ($connection) {
$hash = [];
->userJoinedPresenceChannel($connection, $user, $this->getName(), $payload)
->then(function () use ($connection) {
$this->channelManager
->getChannelMembers($connection->app->id, $this->getName())
->then(function ($users) use ($connection) {
$hash = [];
foreach ($users as $socketId => $user) {
$hash[$user->user_id] = $user->user_info ?? [];
}
foreach ($users as $socketId => $user) {
$hash[$user->user_id] = $user->user_info ?? [];
}
$connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->getName(),
'data' => json_encode([
'presence' => [
'ids' => collect($users)->map(function ($user) {
return (string) $user->user_id;
})->values(),
'hash' => $hash,
'count' => count($users),
],
]),
]));
$connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->getName(),
'data' => json_encode([
'presence' => [
'ids' => collect($users)->map(function ($user) {
return (string) $user->user_id;
})->values(),
'hash' => $hash,
'count' => count($users),
],
]),
]));
});
})
->then(function () use ($connection, $user, $payload) {
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the first tab is opened.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($payload, $connection) {
if (count($sockets) === 1) {
$memberAddedPayload = [
'event' => 'pusher_internal:member_added',
'channel' => $this->getName(),
'data' => $payload->channel_data,
];
$this->broadcastToEveryoneExcept(
(object) $memberAddedPayload, $connection->socketId,
$connection->app->id
);
}
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
'socketId' => $connection->socketId,
'channel' => $this->getName(),
'duplicate-connection' => count($sockets) > 1,
]);
});
});
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the first tab is opened.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($payload, $connection) {
if (count($sockets) === 1) {
$memberAddedPayload = [
'event' => 'pusher_internal:member_added',
'channel' => $this->getName(),
'data' => $payload->channel_data,
];
$this->broadcastToEveryoneExcept(
(object) $memberAddedPayload, $connection->socketId,
$connection->app->id
);
}
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
'socketId' => $connection->socketId,
'channel' => $this->getName(),
'duplicate-connection' => count($sockets) > 1,
]);
});
return true;
}
/**
* Unsubscribe connection from the channel.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
* @return bool
*/
public function unsubscribe(ConnectionInterface $connection)
public function unsubscribe(ConnectionInterface $connection): bool
{
parent::unsubscribe($connection);
$truth = parent::unsubscribe($connection);
$this->channelManager
->getChannelMember($connection, $this->getName())
->then(function ($user) {
return @json_decode($user);
})
->then(function ($user) use ($connection) {
$user = @json_decode($user);
if (! $user) {
return;
}
$this->channelManager->userLeftPresenceChannel(
$connection, $user, $this->getName()
);
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the last one is closed.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($connection, $user) {
if (count($sockets) === 0) {
$memberRemovedPayload = [
'event' => 'pusher_internal:member_removed',
'channel' => $this->getName(),
'data' => json_encode([
'user_id' => $user->user_id,
]),
];
->userLeftPresenceChannel($connection, $user, $this->getName())
->then(function () use ($connection, $user) {
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the last one is closed.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($connection, $user) {
if (count($sockets) === 0) {
$memberRemovedPayload = [
'event' => 'pusher_internal:member_removed',
'channel' => $this->getName(),
'data' => json_encode([
'user_id' => $user->user_id,
]),
];
$this->broadcastToEveryoneExcept(
(object) $memberRemovedPayload, $connection->socketId,
$connection->app->id
);
}
$this->broadcastToEveryoneExcept(
(object) $memberRemovedPayload, $connection->socketId,
$connection->app->id
);
}
});
});
});
return $truth;
}
}

View File

@ -14,13 +14,13 @@ class PrivateChannel extends Channel
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events
* @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload
* @return void
* @return bool
* @throws InvalidSignature
*/
public function subscribe(ConnectionInterface $connection, stdClass $payload)
public function subscribe(ConnectionInterface $connection, stdClass $payload): bool
{
$this->verifySignature($connection, $payload);
parent::subscribe($connection, $payload);
return parent::subscribe($connection, $payload);
}
}

View File

@ -66,9 +66,9 @@ interface ChannelManager
* Remove connection from all channels.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
* @return PromiseInterface[bool]
*/
public function unsubscribeFromAllChannels(ConnectionInterface $connection);
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface;
/**
* Subscribe the connection to a specific channel.
@ -76,9 +76,9 @@ interface ChannelManager
* @param \Ratchet\ConnectionInterface $connection
* @param string $channelName
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload);
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface;
/**
* Unsubscribe the connection from the channel.
@ -86,26 +86,27 @@ interface ChannelManager
* @param \Ratchet\ConnectionInterface $connection
* @param string $channelName
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload);
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface;
/**
* Subscribe the connection to a specific channel.
* Subscribe the connection to a specific channel, returning
* a promise containing the amount of connections.
*
* @param string|int $appId
* @return void
* @return PromiseInterface[int]
*/
public function subscribeToApp($appId);
public function subscribeToApp($appId): PromiseInterface;
/**
* Unsubscribe the connection from the channel.
* Unsubscribe the connection from the channel, returning
* a promise containing the amount of connections after decrement.
*
* @param \Ratchet\ConnectionInterface $connection
* @param string|int $appId
* @return void
* @return PromiseInterface[int]
*/
public function unsubscribeFromApp($appId);
public function unsubscribeFromApp($appId): PromiseInterface;
/**
* Get the connections count on the app
@ -113,7 +114,7 @@ interface ChannelManager
*
* @param string|int $appId
* @param string|null $channelName
* @return \React\Promise\PromiseInterface
* @return PromiseInterface[int]
*/
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface;
@ -123,7 +124,7 @@ interface ChannelManager
*
* @param string|int $appId
* @param string|null $channelName
* @return \React\Promise\PromiseInterface
* @return PromiseInterface[int]
*/
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface;
@ -135,9 +136,9 @@ interface ChannelManager
* @param string $channel
* @param stdClass $payload
* @param string|null $serverId
* @return bool
* @return PromiseInterface[bool]
*/
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null);
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface;
/**
* Handle the user when it joined a presence channel.
@ -146,9 +147,9 @@ interface ChannelManager
* @param stdClass $user
* @param string $channel
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload);
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface;
/**
* Handle the user when it left a presence channel.
@ -157,9 +158,9 @@ interface ChannelManager
* @param stdClass $user
* @param string $channel
* @param stdClass $payload
* @return void
* @return PromiseInterface[bool]
*/
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel);
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface;
/**
* Get the presence channel members.
@ -202,14 +203,14 @@ interface ChannelManager
* Keep tracking the connections availability when they pong.
*
* @param \Ratchet\ConnectionInterface $connection
* @return bool
* @return PromiseInterface[bool]
*/
public function connectionPonged(ConnectionInterface $connection): bool;
public function connectionPonged(ConnectionInterface $connection): PromiseInterface;
/**
* Remove the obsolete connections that didn't ponged in a while.
*
* @return bool
* @return PromiseInterface[bool]
*/
public function removeObsoleteConnections(): bool;
public function removeObsoleteConnections(): PromiseInterface;
}

View File

@ -2,8 +2,17 @@
namespace BeyondCode\LaravelWebSockets;
use React\Promise\PromiseInterface;
class Helpers
{
/**
* The loop used to create the Fulfilled Promise.
*
* @var null|\React\EventLoop\LoopInterface
*/
public static $loop = null;
/**
* Transform the Redis' list of key after value
* to key-value pairs.
@ -23,4 +32,19 @@ class Helpers
return array_combine($keys->all(), $values->all());
}
/**
* Create a new fulfilled promise with a value.
*
* @param mixed $value
* @return \React\Promise\PromiseInterface
*/
public static function createFulfilledPromise($value): PromiseInterface
{
$resolver = config(
'websockets.promise_resolver', \React\Promise\FulfilledPromise::class
);
return new $resolver($value, static::$loop);
}
}

View File

@ -31,11 +31,11 @@ class PusherChannelProtocolMessage extends PusherClientMessage
*/
protected function ping(ConnectionInterface $connection)
{
$connection->send(json_encode([
'event' => 'pusher:pong',
]));
$this->channelManager->connectionPonged($connection);
$this->channelManager
->connectionPonged($connection)
->then(function () use ($connection) {
$connection->send(json_encode(['event' => 'pusher:pong']));
});
}
/**

View File

@ -92,17 +92,19 @@ class WebSocketHandler implements MessageComponentInterface
*/
public function onClose(ConnectionInterface $connection)
{
$this->channelManager->unsubscribeFromAllChannels($connection);
$this->channelManager
->unsubscribeFromAllChannels($connection)
->then(function (bool $unsubscribed) use ($connection) {
if (isset($connection->app)) {
StatisticsCollector::disconnection($connection->app->id);
if (isset($connection->app)) {
StatisticsCollector::disconnection($connection->app->id);
$this->channelManager->unsubscribeFromApp($connection->app->id);
$this->channelManager->unsubscribeFromApp($connection->app->id);
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [
'socketId' => $connection->socketId,
]);
}
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [
'socketId' => $connection->socketId,
]);
}
});
}
/**

View File

@ -5,8 +5,8 @@ namespace BeyondCode\LaravelWebSockets\Statistics\Collectors;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector;
use BeyondCode\LaravelWebSockets\Facades\StatisticsStore;
use BeyondCode\LaravelWebSockets\Helpers;
use BeyondCode\LaravelWebSockets\Statistics\Statistic;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
class MemoryCollector implements StatisticsCollector
@ -126,7 +126,7 @@ class MemoryCollector implements StatisticsCollector
*/
public function getStatistics(): PromiseInterface
{
return new FulfilledPromise($this->statistics);
return Helpers::createFulfilledPromise($this->statistics);
}
/**
@ -137,7 +137,7 @@ class MemoryCollector implements StatisticsCollector
*/
public function getAppStatistics($appId): PromiseInterface
{
return new FulfilledPromise(
return Helpers::createFulfilledPromise(
$this->statistics[$appId] ?? null
);
}

View File

@ -2,7 +2,9 @@
namespace BeyondCode\LaravelWebSockets\Test\Mocks;
use BeyondCode\LaravelWebSockets\Helpers;
use Clue\React\Block;
use React\EventLoop\LoopInterface;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
@ -25,13 +27,13 @@ class PromiseResolver implements PromiseInterface
/**
* Initialize the promise resolver.
*
* @param PromiseInterface $promise
* @param mixed $promise
* @param LoopInterface $loop
* @return void
*/
public function __construct($promise, $loop)
public function __construct($promise, LoopInterface $loop)
{
$this->promise = $promise;
$this->promise = $promise instanceof PromiseInterface ? $promise : new FulfilledPromise($promise);
$this->loop = $loop;
}
@ -53,7 +55,7 @@ class PromiseResolver implements PromiseInterface
return $result instanceof PromiseInterface
? new self($result, $this->loop)
: new self(new FulfilledPromise($result), $this->loop);
: new self(Helpers::createFulfilledPromise($result), $this->loop);
}
/**

View File

@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\Test;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector;
use BeyondCode\LaravelWebSockets\Contracts\StatisticsStore;
use BeyondCode\LaravelWebSockets\Helpers;
use GuzzleHttp\Psr7\Request;
use Illuminate\Support\Facades\Redis;
use Orchestra\Testbench\BrowserKit\TestCase as Orchestra;
@ -77,6 +78,8 @@ abstract class TestCase extends Orchestra
$this->loadMigrationsFrom(__DIR__.'/database/migrations');
$this->withFactories(__DIR__.'/database/factories');
$this->registerPromiseResolver();
$this->registerManagers();
$this->registerStatisticsCollectors();
@ -207,6 +210,21 @@ abstract class TestCase extends Orchestra
]);
}
/**
* Register the test promise resolver.
*
* @return void
*/
protected function registerPromiseResolver()
{
Helpers::$loop = $this->loop;
$this->app['config']->set(
'websockets.promise_resolver',
\BeyondCode\LaravelWebSockets\Test\Mocks\PromiseResolver::class
);
}
/**
* Register the managers that are not resolved
* by the package service provider.