diff --git a/config/websockets.php b/config/websockets.php index 36c8c14..9dcd4f6 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -279,4 +279,19 @@ return [ ], + /* + |-------------------------------------------------------------------------- + | Promise Resolver + |-------------------------------------------------------------------------- + | + | The promise resolver is a class that takes a input value and is + | able to make sure the PHP code runs async by using ->then(). You can + | use your own Promise Resolver. This is usually changed when you want to + | intercept values by the promises throughout the app, like in testing + | to switch from async to sync. + | + */ + + 'promise_resolver' => \React\Promise\FulfilledPromise::class, + ]; diff --git a/src/API/FetchChannels.php b/src/API/FetchChannels.php index dcfd74f..ddd39cc 100644 --- a/src/API/FetchChannels.php +++ b/src/API/FetchChannels.php @@ -64,11 +64,9 @@ class FetchChannels extends Controller } return $info; - }) - ->sortBy(function ($content, $name) { + })->sortBy(function ($content, $name) { return $name; - }) - ->all(); + })->all(); return [ 'channels' => $channels ?: new stdClass, diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 980ee61..d782fc7 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -6,6 +6,7 @@ use BeyondCode\LaravelWebSockets\Channels\Channel; use BeyondCode\LaravelWebSockets\Channels\PresenceChannel; use BeyondCode\LaravelWebSockets\Channels\PrivateChannel; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; +use BeyondCode\LaravelWebSockets\Helpers; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use React\EventLoop\LoopInterface; @@ -104,7 +105,7 @@ class LocalChannelManager implements ChannelManager ->values()->collapse() ->toArray(); - return new FulfilledPromise($connections); + return Helpers::createFulfilledPromise($connections); } /** @@ -116,7 +117,7 @@ class LocalChannelManager implements ChannelManager */ public function getLocalChannels($appId): PromiseInterface { - return new FulfilledPromise( + return Helpers::createFulfilledPromise( $this->channels[$appId] ?? [] ); } @@ -137,12 +138,12 @@ class LocalChannelManager implements ChannelManager * Remove connection from all channels. * * @param \Ratchet\ConnectionInterface $connection - * @return void + * @return PromiseInterface[bool] */ - public function unsubscribeFromAllChannels(ConnectionInterface $connection) + public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface { if (! isset($connection->app)) { - return; + return new FuilfilledPromise(false); } $this->getLocalChannels($connection->app->id) @@ -162,6 +163,8 @@ class LocalChannelManager implements ChannelManager unset($this->channels[$connection->app->id]); } }); + + return Helpers::createFulfilledPromise(true); } /** @@ -170,13 +173,15 @@ class LocalChannelManager implements ChannelManager * @param \Ratchet\ConnectionInterface $connection * @param string $channelName * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) + public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface { $channel = $this->findOrCreate($connection->app->id, $channelName); - $channel->subscribe($connection, $payload); + return Helpers::createFulfilledPromise( + $channel->subscribe($connection, $payload) + ); } /** @@ -185,35 +190,39 @@ class LocalChannelManager implements ChannelManager * @param \Ratchet\ConnectionInterface $connection * @param string $channelName * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) + public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface { $channel = $this->findOrCreate($connection->app->id, $channelName); - $channel->unsubscribe($connection, $payload); + return Helpers::createFulfilledPromise( + $channel->unsubscribe($connection, $payload) + ); } /** - * Subscribe the connection to a specific channel. + * Subscribe the connection to a specific channel, returning + * a promise containing the amount of connections. * * @param string|int $appId - * @return void + * @return PromiseInterface[int] */ - public function subscribeToApp($appId) + public function subscribeToApp($appId): PromiseInterface { - // + return Helpers::createFulfilledPromise(0); } /** - * Unsubscribe the connection from the channel. + * Unsubscribe the connection from the channel, returning + * a promise containing the amount of connections after decrement. * * @param string|int $appId - * @return void + * @return PromiseInterface[int] */ - public function unsubscribeFromApp($appId) + public function unsubscribeFromApp($appId): PromiseInterface { - // + return Helpers::createFulfilledPromise(0); } /** @@ -222,23 +231,21 @@ class LocalChannelManager implements ChannelManager * * @param string|int $appId * @param string|null $channelName - * @return \React\Promise\PromiseInterface + * @return PromiseInterface[int] */ public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface { return $this->getLocalChannels($appId) ->then(function ($channels) use ($channelName) { - return collect($channels) - ->when(! is_null($channelName), function ($collection) use ($channelName) { - return $collection->filter(function (Channel $channel) use ($channelName) { - return $channel->getName() === $channelName; - }); - }) - ->flatMap(function (Channel $channel) { - return collect($channel->getConnections())->pluck('socketId'); - }) - ->unique() - ->count(); + return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) { + return $collection->filter(function (Channel $channel) use ($channelName) { + return $channel->getName() === $channelName; + }); + }) + ->flatMap(function (Channel $channel) { + return collect($channel->getConnections())->pluck('socketId'); + }) + ->unique()->count(); }); } @@ -248,7 +255,7 @@ class LocalChannelManager implements ChannelManager * * @param string|int $appId * @param string|null $channelName - * @return \React\Promise\PromiseInterface + * @return PromiseInterface[int] */ public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface { @@ -263,11 +270,11 @@ class LocalChannelManager implements ChannelManager * @param string $channel * @param stdClass $payload * @param string|null $serverId - * @return bool + * @return PromiseInterface[bool] */ - public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null) + public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface { - return true; + return Helpers::createFulfilledPromise(true); } /** @@ -277,12 +284,14 @@ class LocalChannelManager implements ChannelManager * @param stdClass $user * @param string $channel * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload) + public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface { $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user); $this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId; + + return Helpers::createFulfilledPromise(true); } /** @@ -292,9 +301,9 @@ class LocalChannelManager implements ChannelManager * @param stdClass $user * @param string $channel * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel) + public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface { unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]); @@ -310,6 +319,8 @@ class LocalChannelManager implements ChannelManager unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]); } } + + return Helpers::createFulfilledPromise(true); } /** @@ -327,7 +338,7 @@ class LocalChannelManager implements ChannelManager return json_decode($user); })->unique('user_id')->toArray(); - return new FulfilledPromise($members); + return Helpers::createFulfilledPromise($members); } /** @@ -341,7 +352,7 @@ class LocalChannelManager implements ChannelManager { $member = $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] ?? null; - return new FulfilledPromise($member); + return Helpers::createFulfilledPromise($member); } /** @@ -362,7 +373,7 @@ class LocalChannelManager implements ChannelManager return $results; }, []); - return new FulfilledPromise($results); + return Helpers::createFulfilledPromise($results); } /** @@ -375,7 +386,7 @@ class LocalChannelManager implements ChannelManager */ public function getMemberSockets($userId, $appId, $channelName): PromiseInterface { - return new FulfilledPromise( + return Helpers::createFulfilledPromise( $this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? [] ); } @@ -384,21 +395,21 @@ class LocalChannelManager implements ChannelManager * Keep tracking the connections availability when they pong. * * @param \Ratchet\ConnectionInterface $connection - * @return bool + * @return PromiseInterface[bool] */ - public function connectionPonged(ConnectionInterface $connection): bool + public function connectionPonged(ConnectionInterface $connection): PromiseInterface { - return true; + return Helpers::createFulfilledPromise(true); } /** * Remove the obsolete connections that didn't ponged in a while. * - * @return bool + * @return PromiseInterface[bool] */ - public function removeObsoleteConnections(): bool + public function removeObsoleteConnections(): PromiseInterface { - return true; + return Helpers::createFulfilledPromise(true); } /** diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 22728ef..6b02436 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -132,20 +132,19 @@ class RedisChannelManager extends LocalChannelManager * Remove connection from all channels. * * @param \Ratchet\ConnectionInterface $connection - * @return void + * @return PromiseInterface[bool] */ - public function unsubscribeFromAllChannels(ConnectionInterface $connection) + public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface { - $this->getGlobalChannels($connection->app->id) + return $this->getGlobalChannels($connection->app->id) ->then(function ($channels) use ($connection) { foreach ($channels as $channel) { - $this->unsubscribeFromChannel( - $connection, $channel, new stdClass - ); + $this->unsubscribeFromChannel($connection, $channel, new stdClass); } + }) + ->then(function () use ($connection) { + return parent::unsubscribeFromAllChannels($connection); }); - - parent::unsubscribeFromAllChannels($connection); } /** @@ -154,19 +153,23 @@ class RedisChannelManager extends LocalChannelManager * @param \Ratchet\ConnectionInterface $connection * @param string $channelName * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) + public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface { - $this->subscribeToTopic($connection->app->id, $channelName); - - $this->addConnectionToSet($connection, Carbon::now()); - - $this->addChannelToSet($connection->app->id, $channelName); - - $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1); - - parent::subscribeToChannel($connection, $channelName, $payload); + return $this->subscribeToTopic($connection->app->id, $channelName) + ->then(function () use ($connection) { + return $this->addConnectionToSet($connection, Carbon::now()); + }) + ->then(function () use ($connection, $channelName) { + return $this->addChannelToSet($connection->app->id, $channelName); + }) + ->then(function () use ($connection, $channelName) { + return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1); + }) + ->then(function () use ($connection, $channelName, $payload) { + return parent::subscribeToChannel($connection, $channelName, $payload); + }); } /** @@ -175,11 +178,11 @@ class RedisChannelManager extends LocalChannelManager * @param \Ratchet\ConnectionInterface $connection * @param string $channelName * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) + public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface { - $this->getGlobalConnectionsCount($connection->app->id, $channelName) + return $this->getGlobalConnectionsCount($connection->app->id, $channelName) ->then(function ($count) use ($connection, $channelName) { if ($count === 0) { // Make sure to not stay subscribed to the PubSub topic @@ -195,39 +198,46 @@ class RedisChannelManager extends LocalChannelManager $this->unsubscribeFromTopic($connection->app->id, $channelName); } }); - - $this->removeChannelFromSet($connection->app->id, $channelName); - - $this->removeConnectionFromSet($connection); + }) + ->then(function () use ($connection, $channelName) { + return $this->removeChannelFromSet($connection->app->id, $channelName); + }) + ->then(function () use($connection) { + return $this->removeConnectionFromSet($connection); + }) + ->then(function () use ($connection, $channelName, $payload) { + return parent::unsubscribeFromChannel($connection, $channelName, $payload); }); - - parent::unsubscribeFromChannel($connection, $channelName, $payload); } /** - * Subscribe the connection to a specific channel. + * Subscribe the connection to a specific channel, returning + * a promise containing the amount of connections. * * @param string|int $appId - * @return void + * @return PromiseInterface[int] */ - public function subscribeToApp($appId) + public function subscribeToApp($appId): PromiseInterface { - $this->subscribeToTopic($appId); - - $this->incrementSubscriptionsCount($appId); + return $this->subscribeToTopic($appId) + ->then(function () use ($appId) { + return $this->incrementSubscriptionsCount($appId); + }); } /** - * Unsubscribe the connection from the channel. + * Unsubscribe the connection from the channel, returning + * a promise containing the amount of connections after decrement. * * @param string|int $appId - * @return void + * @return PromiseInterface[int] */ - public function unsubscribeFromApp($appId) + public function unsubscribeFromApp($appId): PromiseInterface { - $this->unsubscribeFromTopic($appId); - - $this->incrementSubscriptionsCount($appId, null, -1); + return $this->unsubscribeFromTopic($appId) + ->then(function () use ($appId) { + return $this->decrementSubscriptionsCount($appId); + }); } /** @@ -236,7 +246,7 @@ class RedisChannelManager extends LocalChannelManager * * @param string|int $appId * @param string|null $channelName - * @return \React\Promise\PromiseInterface + * @return PromiseInterface[int] */ public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface { @@ -249,7 +259,7 @@ class RedisChannelManager extends LocalChannelManager * * @param string|int $appId * @param string|null $channelName - * @return \React\Promise\PromiseInterface + * @return PromiseInterface[int] */ public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface { @@ -268,17 +278,19 @@ class RedisChannelManager extends LocalChannelManager * @param string $channel * @param stdClass $payload * @param string|null $serverId - * @return bool + * @return PromiseInterface[bool] */ - public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null) + public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface { $payload->appId = $appId; $payload->socketId = $socketId; $payload->serverId = $serverId ?: $this->getServerId(); - $this->publishClient->publish($this->getRedisKey($appId, $channel), json_encode($payload)); - - return true; + return $this->publishClient + ->publish($this->getRedisKey($appId, $channel), json_encode($payload)) + ->then(function () use ($appId, $socketId, $channel, $payload, $serverId) { + return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId); + }); } /** @@ -288,17 +300,17 @@ class RedisChannelManager extends LocalChannelManager * @param stdClass $user * @param string $channel * @param stdClass $payload - * @return void + * @return PromiseInterface */ - public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload) + public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface { - $this->storeUserData( - $connection->app->id, $channel, $connection->socketId, json_encode($user) - ); - - $this->addUserSocket( - $connection->app->id, $channel, $user, $connection->socketId - ); + return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user)) + ->then(function () use ($connection, $channel, $user) { + return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId); + }) + ->then(function () use ($connection, $user, $channel, $payload) { + return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload); + }); } /** @@ -308,17 +320,17 @@ class RedisChannelManager extends LocalChannelManager * @param stdClass $user * @param string $channel * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel) + public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface { - $this->removeUserData( - $connection->app->id, $channel, $connection->socketId - ); - - $this->removeUserSocket( - $connection->app->id, $channel, $user, $connection->socketId - ); + return $this->removeUserData($connection->app->id, $channel, $connection->socketId) + ->then(function () use ($connection, $channel, $user) { + return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId); + }) + ->then(function () use ($connection, $user, $channel) { + return parent::userLeftPresenceChannel($connection, $user, $channel); + }); } /** @@ -326,19 +338,16 @@ class RedisChannelManager extends LocalChannelManager * * @param string|int $appId * @param string $channel - * @return \React\Promise\PromiseInterface + * @return \React\Promise\PromiseInterface[array] */ public function getChannelMembers($appId, string $channel): PromiseInterface { return $this->publishClient ->hgetall($this->getRedisKey($appId, $channel, ['users'])) ->then(function ($list) { - return collect(Helpers::redisListToArray($list)) - ->map(function ($user) { - return json_decode($user); - }) - ->unique('user_id') - ->toArray(); + return collect(Helpers::redisListToArray($list))->map(function ($user) { + return json_decode($user); + })->unique('user_id')->toArray(); }); } @@ -347,7 +356,7 @@ class RedisChannelManager extends LocalChannelManager * * @param \Ratchet\ConnectionInterface $connection * @param string $channel - * @return \React\Promise\PromiseInterface + * @return \React\Promise\PromiseInterface[null|array] */ public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface { @@ -361,7 +370,7 @@ class RedisChannelManager extends LocalChannelManager * * @param string|int $appId * @param array $channelNames - * @return \React\Promise\PromiseInterface + * @return \React\Promise\PromiseInterface[array] */ public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface { @@ -385,7 +394,7 @@ class RedisChannelManager extends LocalChannelManager * @param string|int $userId * @param string|int $appId * @param string $channelName - * @return \React\Promise\PromiseInterface + * @return \React\Promise\PromiseInterface[array] */ public function getMemberSockets($userId, $appId, $channelName): PromiseInterface { @@ -398,30 +407,31 @@ class RedisChannelManager extends LocalChannelManager * Keep tracking the connections availability when they pong. * * @param \Ratchet\ConnectionInterface $connection - * @return bool + * @return PromiseInterface[bool] */ - public function connectionPonged(ConnectionInterface $connection): bool + public function connectionPonged(ConnectionInterface $connection): PromiseInterface { // This will update the score with the current timestamp. - $this->addConnectionToSet($connection, Carbon::now()); - - return parent::connectionPonged($connection); + return $this->addConnectionToSet($connection, Carbon::now()) + ->then(function () use ($connection) { + return parent::connectionPonged($connection); + }); } /** * Remove the obsolete connections that didn't ponged in a while. * - * @return bool + * @return PromiseInterface[bool] */ - public function removeObsoleteConnections(): bool + public function removeObsoleteConnections(): PromiseInterface { $this->lock()->get(function () { $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) ->then(function ($connections) { foreach ($connections as $socketId => $appId) { - $this->unsubscribeFromAllChannels( - $this->fakeConnectionForApp($appId, $socketId) - ); + $connection = $this->fakeConnectionForApp($appId, $socketId); + + $this->unsubscribeFromAllChannels($connection); } }); }); @@ -514,7 +524,7 @@ class RedisChannelManager extends LocalChannelManager * * @return string */ - public function getServerId() + public function getServerId(): string { return $this->serverId; } @@ -525,9 +535,9 @@ class RedisChannelManager extends LocalChannelManager * @param string|int $appId * @param string|null $channel * @param int $increment - * @return PromiseInterface + * @return PromiseInterface[int] */ - public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1) + public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface { return $this->publishClient->hincrby( $this->getRedisKey($appId, $channel, ['stats']), 'connections', $increment @@ -540,9 +550,9 @@ class RedisChannelManager extends LocalChannelManager * @param string|int $appId * @param string|null $channel * @param int $decrement - * @return PromiseInterface + * @return PromiseInterface[int] */ - public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1) + public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface { return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1); } @@ -552,13 +562,13 @@ class RedisChannelManager extends LocalChannelManager * * @param \Ratchet\ConnectionInterface $connection * @param \DateTime|string|null $moment - * @return void + * @return PromiseInterface */ - public function addConnectionToSet(ConnectionInterface $connection, $moment = null) + public function addConnectionToSet(ConnectionInterface $connection, $moment = null): PromiseInterface { $moment = $moment ? Carbon::parse($moment) : Carbon::now(); - $this->publishClient->zadd( + return $this->publishClient->zadd( $this->getRedisKey(null, null, ['sockets']), $moment->format('U'), "{$connection->app->id}:{$connection->socketId}" ); @@ -568,11 +578,11 @@ class RedisChannelManager extends LocalChannelManager * Remove the connection from the sorted list. * * @param \Ratchet\ConnectionInterface $connection - * @return void + * @return PromiseInterface */ - public function removeConnectionFromSet(ConnectionInterface $connection) + public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface { - $this->publishClient->zrem( + return $this->publishClient->zrem( $this->getRedisKey(null, null, ['sockets']), "{$connection->app->id}:{$connection->socketId}" ); @@ -585,9 +595,9 @@ class RedisChannelManager extends LocalChannelManager * @param int $start * @param int $stop * @param bool $strict - * @return PromiseInterface + * @return PromiseInterface[array] */ - public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true) + public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true): PromiseInterface { if ($strict) { $start = "({$start}"; @@ -612,7 +622,7 @@ class RedisChannelManager extends LocalChannelManager * @param string $channel * @return PromiseInterface */ - public function addChannelToSet($appId, string $channel) + public function addChannelToSet($appId, string $channel): PromiseInterface { return $this->publishClient->sadd( $this->getRedisKey($appId, null, ['channels']), $channel @@ -626,7 +636,7 @@ class RedisChannelManager extends LocalChannelManager * @param string $channel * @return PromiseInterface */ - public function removeChannelFromSet($appId, string $channel) + public function removeChannelFromSet($appId, string $channel): PromiseInterface { return $this->publishClient->srem( $this->getRedisKey($appId, null, ['channels']), $channel @@ -642,9 +652,9 @@ class RedisChannelManager extends LocalChannelManager * @param string $data * @return PromiseInterface */ - public function storeUserData($appId, string $channel = null, string $key, $data) + public function storeUserData($appId, string $channel = null, string $key, $data): PromiseInterface { - $this->publishClient->hset( + return $this->publishClient->hset( $this->getRedisKey($appId, $channel, ['users']), $key, $data ); } @@ -657,7 +667,7 @@ class RedisChannelManager extends LocalChannelManager * @param string $key * @return PromiseInterface */ - public function removeUserData($appId, string $channel = null, string $key) + public function removeUserData($appId, string $channel = null, string $key): PromiseInterface { return $this->publishClient->hdel( $this->getRedisKey($appId, $channel, ['users']), $key @@ -669,11 +679,11 @@ class RedisChannelManager extends LocalChannelManager * * @param string|int $appId * @param string|null $channel - * @return void + * @return PromiseInterface */ - public function subscribeToTopic($appId, string $channel = null) + public function subscribeToTopic($appId, string $channel = null): PromiseInterface { - $this->subscribeClient->subscribe( + return $this->subscribeClient->subscribe( $this->getRedisKey($appId, $channel) ); } @@ -683,11 +693,11 @@ class RedisChannelManager extends LocalChannelManager * * @param string|int $appId * @param string|null $channel - * @return void + * @return PromiseInterface */ - public function unsubscribeFromTopic($appId, string $channel = null) + public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface { - $this->subscribeClient->unsubscribe( + return $this->subscribeClient->unsubscribe( $this->getRedisKey($appId, $channel) ); } @@ -699,11 +709,11 @@ class RedisChannelManager extends LocalChannelManager * @param string $channel * @param stdClass $user * @param string $socketId - * @return void + * @return PromiseInterface */ - protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId) + protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface { - $this->publishClient->sadd( + return $this->publishClient->sadd( $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId ); } @@ -715,11 +725,11 @@ class RedisChannelManager extends LocalChannelManager * @param string $channel * @param stdClass $user * @param string $socketId - * @return void + * @return PromiseInterface */ - protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId) + protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface { - $this->publishClient->srem( + return $this->publishClient->srem( $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId ); } diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index e0450bd..2abf150 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -73,9 +73,9 @@ class Channel * @see https://pusher.com/docs/pusher_protocol#presence-channel-events * @param \Ratchet\ConnectionInterface $connection * @param \stdClass $payload - * @return void + * @return bool */ - public function subscribe(ConnectionInterface $connection, stdClass $payload) + public function subscribe(ConnectionInterface $connection, stdClass $payload): bool { $this->saveConnection($connection); @@ -88,21 +88,25 @@ class Channel 'socketId' => $connection->socketId, 'channel' => $this->getName(), ]); + + return true; } /** * Unsubscribe connection from the channel. * * @param \Ratchet\ConnectionInterface $connection - * @return void + * @return bool */ - public function unsubscribe(ConnectionInterface $connection) + public function unsubscribe(ConnectionInterface $connection): bool { if (! isset($this->connections[$connection->socketId])) { - return; + return false; } unset($this->connections[$connection->socketId]); + + return true; } /** diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index eb81f35..c265f81 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -15,119 +15,124 @@ class PresenceChannel extends PrivateChannel * @see https://pusher.com/docs/pusher_protocol#presence-channel-events * @param \Ratchet\ConnectionInterface $connection * @param \stdClass $payload - * @return void + * @return bool * @throws InvalidSignature */ - public function subscribe(ConnectionInterface $connection, stdClass $payload) + public function subscribe(ConnectionInterface $connection, stdClass $payload): bool { $this->verifySignature($connection, $payload); $this->saveConnection($connection); - $this->channelManager->userJoinedPresenceChannel( - $connection, - $user = json_decode($payload->channel_data), - $this->getName(), - $payload - ); + $user = json_decode($payload->channel_data); $this->channelManager - ->getChannelMembers($connection->app->id, $this->getName()) - ->then(function ($users) use ($connection) { - $hash = []; + ->userJoinedPresenceChannel($connection, $user, $this->getName(), $payload) + ->then(function () use ($connection, $user) { + $this->channelManager + ->getChannelMembers($connection->app->id, $this->getName()) + ->then(function ($users) use ($connection) { + $hash = []; - foreach ($users as $socketId => $user) { - $hash[$user->user_id] = $user->user_info ?? []; - } + foreach ($users as $socketId => $user) { + $hash[$user->user_id] = $user->user_info ?? []; + } - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->getName(), - 'data' => json_encode([ - 'presence' => [ - 'ids' => collect($users)->map(function ($user) { - return (string) $user->user_id; - })->values(), - 'hash' => $hash, - 'count' => count($users), - ], - ]), - ])); + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->getName(), + 'data' => json_encode([ + 'presence' => [ + 'ids' => collect($users)->map(function ($user) { + return (string) $user->user_id; + })->values(), + 'hash' => $hash, + 'count' => count($users), + ], + ]), + ])); + }); + }) + ->then(function () use ($connection, $user, $payload) { + // The `pusher_internal:member_added` event is triggered when a user joins a channel. + // It's quite possible that a user can have multiple connections to the same channel + // (for example by having multiple browser tabs open) + // and in this case the events will only be triggered when the first tab is opened. + $this->channelManager + ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) + ->then(function ($sockets) use ($payload, $connection) { + if (count($sockets) === 1) { + $memberAddedPayload = [ + 'event' => 'pusher_internal:member_added', + 'channel' => $this->getName(), + 'data' => $payload->channel_data, + ]; + + $this->broadcastToEveryoneExcept( + (object) $memberAddedPayload, $connection->socketId, + $connection->app->id + ); + } + + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ + 'socketId' => $connection->socketId, + 'channel' => $this->getName(), + 'duplicate-connection' => count($sockets) > 1, + ]); + }); }); - // The `pusher_internal:member_added` event is triggered when a user joins a channel. - // It's quite possible that a user can have multiple connections to the same channel - // (for example by having multiple browser tabs open) - // and in this case the events will only be triggered when the first tab is opened. - $this->channelManager - ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) - ->then(function ($sockets) use ($payload, $connection) { - if (count($sockets) === 1) { - $memberAddedPayload = [ - 'event' => 'pusher_internal:member_added', - 'channel' => $this->getName(), - 'data' => $payload->channel_data, - ]; - - $this->broadcastToEveryoneExcept( - (object) $memberAddedPayload, $connection->socketId, - $connection->app->id - ); - } - - DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ - 'socketId' => $connection->socketId, - 'channel' => $this->getName(), - 'duplicate-connection' => count($sockets) > 1, - ]); - }); + return true; } /** * Unsubscribe connection from the channel. * * @param \Ratchet\ConnectionInterface $connection - * @return void + * @return bool */ - public function unsubscribe(ConnectionInterface $connection) + public function unsubscribe(ConnectionInterface $connection): bool { - parent::unsubscribe($connection); + $truth = parent::unsubscribe($connection); $this->channelManager ->getChannelMember($connection, $this->getName()) + ->then(function ($user) { + return @json_decode($user); + }) ->then(function ($user) use ($connection) { - $user = @json_decode($user); - if (! $user) { return; } - $this->channelManager->userLeftPresenceChannel( - $connection, $user, $this->getName() - ); - - // The `pusher_internal:member_removed` is triggered when a user leaves a channel. - // It's quite possible that a user can have multiple connections to the same channel - // (for example by having multiple browser tabs open) - // and in this case the events will only be triggered when the last one is closed. $this->channelManager - ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) - ->then(function ($sockets) use ($connection, $user) { - if (count($sockets) === 0) { - $memberRemovedPayload = [ - 'event' => 'pusher_internal:member_removed', - 'channel' => $this->getName(), - 'data' => json_encode([ - 'user_id' => $user->user_id, - ]), - ]; + ->userLeftPresenceChannel($connection, $user, $this->getName()) + ->then(function () use ($connection, $user) { + // The `pusher_internal:member_removed` is triggered when a user leaves a channel. + // It's quite possible that a user can have multiple connections to the same channel + // (for example by having multiple browser tabs open) + // and in this case the events will only be triggered when the last one is closed. + $this->channelManager + ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) + ->then(function ($sockets) use ($connection, $user) { + if (count($sockets) === 0) { + $memberRemovedPayload = [ + 'event' => 'pusher_internal:member_removed', + 'channel' => $this->getName(), + 'data' => json_encode([ + 'user_id' => $user->user_id, + ]), + ]; - $this->broadcastToEveryoneExcept( - (object) $memberRemovedPayload, $connection->socketId, - $connection->app->id - ); - } + $this->broadcastToEveryoneExcept( + (object) $memberRemovedPayload, $connection->socketId, + $connection->app->id + ); + } + }); }); }); + + return $truth; } } diff --git a/src/Channels/PrivateChannel.php b/src/Channels/PrivateChannel.php index e5d987c..93914e5 100644 --- a/src/Channels/PrivateChannel.php +++ b/src/Channels/PrivateChannel.php @@ -14,13 +14,13 @@ class PrivateChannel extends Channel * @see https://pusher.com/docs/pusher_protocol#presence-channel-events * @param \Ratchet\ConnectionInterface $connection * @param \stdClass $payload - * @return void + * @return bool * @throws InvalidSignature */ - public function subscribe(ConnectionInterface $connection, stdClass $payload) + public function subscribe(ConnectionInterface $connection, stdClass $payload): bool { $this->verifySignature($connection, $payload); - parent::subscribe($connection, $payload); + return parent::subscribe($connection, $payload); } } diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index 01d4a2c..50efe16 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -66,9 +66,9 @@ interface ChannelManager * Remove connection from all channels. * * @param \Ratchet\ConnectionInterface $connection - * @return void + * @return PromiseInterface[bool] */ - public function unsubscribeFromAllChannels(ConnectionInterface $connection); + public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface; /** * Subscribe the connection to a specific channel. @@ -76,9 +76,9 @@ interface ChannelManager * @param \Ratchet\ConnectionInterface $connection * @param string $channelName * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload); + public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface; /** * Unsubscribe the connection from the channel. @@ -86,26 +86,27 @@ interface ChannelManager * @param \Ratchet\ConnectionInterface $connection * @param string $channelName * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload); + public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface; /** - * Subscribe the connection to a specific channel. + * Subscribe the connection to a specific channel, returning + * a promise containing the amount of connections. * * @param string|int $appId - * @return void + * @return PromiseInterface[int] */ - public function subscribeToApp($appId); + public function subscribeToApp($appId): PromiseInterface; /** - * Unsubscribe the connection from the channel. + * Unsubscribe the connection from the channel, returning + * a promise containing the amount of connections after decrement. * - * @param \Ratchet\ConnectionInterface $connection * @param string|int $appId - * @return void + * @return PromiseInterface[int] */ - public function unsubscribeFromApp($appId); + public function unsubscribeFromApp($appId): PromiseInterface; /** * Get the connections count on the app @@ -113,7 +114,7 @@ interface ChannelManager * * @param string|int $appId * @param string|null $channelName - * @return \React\Promise\PromiseInterface + * @return PromiseInterface[int] */ public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface; @@ -123,7 +124,7 @@ interface ChannelManager * * @param string|int $appId * @param string|null $channelName - * @return \React\Promise\PromiseInterface + * @return PromiseInterface[int] */ public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface; @@ -135,9 +136,9 @@ interface ChannelManager * @param string $channel * @param stdClass $payload * @param string|null $serverId - * @return bool + * @return PromiseInterface[bool] */ - public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null); + public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface; /** * Handle the user when it joined a presence channel. @@ -146,9 +147,9 @@ interface ChannelManager * @param stdClass $user * @param string $channel * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload); + public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface; /** * Handle the user when it left a presence channel. @@ -157,9 +158,9 @@ interface ChannelManager * @param stdClass $user * @param string $channel * @param stdClass $payload - * @return void + * @return PromiseInterface[bool] */ - public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel); + public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface; /** * Get the presence channel members. @@ -202,14 +203,14 @@ interface ChannelManager * Keep tracking the connections availability when they pong. * * @param \Ratchet\ConnectionInterface $connection - * @return bool + * @return PromiseInterface[bool] */ - public function connectionPonged(ConnectionInterface $connection): bool; + public function connectionPonged(ConnectionInterface $connection): PromiseInterface; /** * Remove the obsolete connections that didn't ponged in a while. * - * @return bool + * @return PromiseInterface[bool] */ - public function removeObsoleteConnections(): bool; + public function removeObsoleteConnections(): PromiseInterface; } diff --git a/src/Helpers.php b/src/Helpers.php index 7354545..0afe7d8 100644 --- a/src/Helpers.php +++ b/src/Helpers.php @@ -2,8 +2,17 @@ namespace BeyondCode\LaravelWebSockets; +use React\Promise\PromiseInterface; + class Helpers { + /** + * The loop used to create the Fulfilled Promise. + * + * @var null|\React\EventLoop\LoopInterface + */ + public static $loop = null; + /** * Transform the Redis' list of key after value * to key-value pairs. @@ -23,4 +32,19 @@ class Helpers return array_combine($keys->all(), $values->all()); } + + /** + * Create a new fulfilled promise with a value. + * + * @param mixed $value + * @return \React\Promise\PromiseInterface + */ + public static function createFulfilledPromise($value): PromiseInterface + { + $resolver = config( + 'websockets.promise_resolver', \React\Promise\FulfilledPromise::class + ); + + return new $resolver($value, static::$loop); + } } diff --git a/src/Server/Messages/PusherChannelProtocolMessage.php b/src/Server/Messages/PusherChannelProtocolMessage.php index d70934b..6385d90 100644 --- a/src/Server/Messages/PusherChannelProtocolMessage.php +++ b/src/Server/Messages/PusherChannelProtocolMessage.php @@ -31,11 +31,11 @@ class PusherChannelProtocolMessage extends PusherClientMessage */ protected function ping(ConnectionInterface $connection) { - $connection->send(json_encode([ - 'event' => 'pusher:pong', - ])); - - $this->channelManager->connectionPonged($connection); + $this->channelManager + ->connectionPonged($connection) + ->then(function () use ($connection) { + $connection->send(json_encode(['event' => 'pusher:pong'])); + }); } /** diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 0dbe8be..4b7f7bc 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -92,17 +92,19 @@ class WebSocketHandler implements MessageComponentInterface */ public function onClose(ConnectionInterface $connection) { - $this->channelManager->unsubscribeFromAllChannels($connection); + $this->channelManager + ->unsubscribeFromAllChannels($connection) + ->then(function (bool $unsubscribed) use ($connection) { + if (isset($connection->app)) { + StatisticsCollector::disconnection($connection->app->id); - if (isset($connection->app)) { - StatisticsCollector::disconnection($connection->app->id); + $this->channelManager->unsubscribeFromApp($connection->app->id); - $this->channelManager->unsubscribeFromApp($connection->app->id); - - DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [ - 'socketId' => $connection->socketId, - ]); - } + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [ + 'socketId' => $connection->socketId, + ]); + } + }); } /** diff --git a/src/Statistics/Collectors/MemoryCollector.php b/src/Statistics/Collectors/MemoryCollector.php index 049c001..23f52cd 100644 --- a/src/Statistics/Collectors/MemoryCollector.php +++ b/src/Statistics/Collectors/MemoryCollector.php @@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\Statistics\Collectors; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector; use BeyondCode\LaravelWebSockets\Facades\StatisticsStore; +use BeyondCode\LaravelWebSockets\Helpers; use BeyondCode\LaravelWebSockets\Statistics\Statistic; use React\Promise\FulfilledPromise; use React\Promise\PromiseInterface; @@ -126,7 +127,7 @@ class MemoryCollector implements StatisticsCollector */ public function getStatistics(): PromiseInterface { - return new FulfilledPromise($this->statistics); + return Helpers::createFulfilledPromise($this->statistics); } /** @@ -137,7 +138,7 @@ class MemoryCollector implements StatisticsCollector */ public function getAppStatistics($appId): PromiseInterface { - return new FulfilledPromise( + return Helpers::createFulfilledPromise( $this->statistics[$appId] ?? null ); } diff --git a/tests/Mocks/PromiseResolver.php b/tests/Mocks/PromiseResolver.php index dfec306..66f8480 100644 --- a/tests/Mocks/PromiseResolver.php +++ b/tests/Mocks/PromiseResolver.php @@ -2,7 +2,9 @@ namespace BeyondCode\LaravelWebSockets\Test\Mocks; +use BeyondCode\LaravelWebSockets\Helpers; use Clue\React\Block; +use React\EventLoop\LoopInterface; use React\Promise\FulfilledPromise; use React\Promise\PromiseInterface; @@ -25,13 +27,13 @@ class PromiseResolver implements PromiseInterface /** * Initialize the promise resolver. * - * @param PromiseInterface $promise + * @param mixed $promise * @param LoopInterface $loop * @return void */ - public function __construct($promise, $loop) + public function __construct($promise, LoopInterface $loop) { - $this->promise = $promise; + $this->promise = $promise instanceof PromiseInterface ? $promise : new FulfilledPromise($promise); $this->loop = $loop; } @@ -53,7 +55,7 @@ class PromiseResolver implements PromiseInterface return $result instanceof PromiseInterface ? new self($result, $this->loop) - : new self(new FulfilledPromise($result), $this->loop); + : new self(Helpers::createFulfilledPromise($result), $this->loop); } /** diff --git a/tests/TestCase.php b/tests/TestCase.php index da8dbae..6544731 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector; use BeyondCode\LaravelWebSockets\Contracts\StatisticsStore; +use BeyondCode\LaravelWebSockets\Helpers; use GuzzleHttp\Psr7\Request; use Illuminate\Support\Facades\Redis; use Orchestra\Testbench\BrowserKit\TestCase as Orchestra; @@ -77,6 +78,8 @@ abstract class TestCase extends Orchestra $this->loadMigrationsFrom(__DIR__.'/database/migrations'); $this->withFactories(__DIR__.'/database/factories'); + $this->registerPromiseResolver(); + $this->registerManagers(); $this->registerStatisticsCollectors(); @@ -207,6 +210,21 @@ abstract class TestCase extends Orchestra ]); } + /** + * Register the test promise resolver. + * + * @return void + */ + protected function registerPromiseResolver() + { + Helpers::$loop = $this->loop; + + $this->app['config']->set( + 'websockets.promise_resolver', + \BeyondCode\LaravelWebSockets\Test\Mocks\PromiseResolver::class + ); + } + /** * Register the managers that are not resolved * by the package service provider.