Run promises one-after-another

This commit is contained in:
Alex Renoki 2020-09-19 14:16:26 +03:00
parent bf47275c9b
commit 5cb2ee9fce
14 changed files with 393 additions and 302 deletions

View File

@ -279,4 +279,19 @@ return [
], ],
/*
|--------------------------------------------------------------------------
| Promise Resolver
|--------------------------------------------------------------------------
|
| The promise resolver is a class that takes a input value and is
| able to make sure the PHP code runs async by using ->then(). You can
| use your own Promise Resolver. This is usually changed when you want to
| intercept values by the promises throughout the app, like in testing
| to switch from async to sync.
|
*/
'promise_resolver' => \React\Promise\FulfilledPromise::class,
]; ];

View File

@ -64,11 +64,9 @@ class FetchChannels extends Controller
} }
return $info; return $info;
}) })->sortBy(function ($content, $name) {
->sortBy(function ($content, $name) {
return $name; return $name;
}) })->all();
->all();
return [ return [
'channels' => $channels ?: new stdClass, 'channels' => $channels ?: new stdClass,

View File

@ -6,6 +6,7 @@ use BeyondCode\LaravelWebSockets\Channels\Channel;
use BeyondCode\LaravelWebSockets\Channels\PresenceChannel; use BeyondCode\LaravelWebSockets\Channels\PresenceChannel;
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel; use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\Helpers;
use Illuminate\Support\Str; use Illuminate\Support\Str;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
@ -104,7 +105,7 @@ class LocalChannelManager implements ChannelManager
->values()->collapse() ->values()->collapse()
->toArray(); ->toArray();
return new FulfilledPromise($connections); return Helpers::createFulfilledPromise($connections);
} }
/** /**
@ -116,7 +117,7 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getLocalChannels($appId): PromiseInterface public function getLocalChannels($appId): PromiseInterface
{ {
return new FulfilledPromise( return Helpers::createFulfilledPromise(
$this->channels[$appId] ?? [] $this->channels[$appId] ?? []
); );
} }
@ -137,12 +138,12 @@ class LocalChannelManager implements ChannelManager
* Remove connection from all channels. * Remove connection from all channels.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return void * @return PromiseInterface[bool]
*/ */
public function unsubscribeFromAllChannels(ConnectionInterface $connection) public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{ {
if (! isset($connection->app)) { if (! isset($connection->app)) {
return; return new FuilfilledPromise(false);
} }
$this->getLocalChannels($connection->app->id) $this->getLocalChannels($connection->app->id)
@ -162,6 +163,8 @@ class LocalChannelManager implements ChannelManager
unset($this->channels[$connection->app->id]); unset($this->channels[$connection->app->id]);
} }
}); });
return Helpers::createFulfilledPromise(true);
} }
/** /**
@ -170,13 +173,15 @@ class LocalChannelManager implements ChannelManager
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param string $channelName * @param string $channelName
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{ {
$channel = $this->findOrCreate($connection->app->id, $channelName); $channel = $this->findOrCreate($connection->app->id, $channelName);
$channel->subscribe($connection, $payload); return Helpers::createFulfilledPromise(
$channel->subscribe($connection, $payload)
);
} }
/** /**
@ -185,35 +190,39 @@ class LocalChannelManager implements ChannelManager
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param string $channelName * @param string $channelName
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{ {
$channel = $this->findOrCreate($connection->app->id, $channelName); $channel = $this->findOrCreate($connection->app->id, $channelName);
$channel->unsubscribe($connection, $payload); return Helpers::createFulfilledPromise(
$channel->unsubscribe($connection, $payload)
);
} }
/** /**
* Subscribe the connection to a specific channel. * Subscribe the connection to a specific channel, returning
* a promise containing the amount of connections.
* *
* @param string|int $appId * @param string|int $appId
* @return void * @return PromiseInterface[int]
*/ */
public function subscribeToApp($appId) public function subscribeToApp($appId): PromiseInterface
{ {
// return Helpers::createFulfilledPromise(0);
} }
/** /**
* Unsubscribe the connection from the channel. * Unsubscribe the connection from the channel, returning
* a promise containing the amount of connections after decrement.
* *
* @param string|int $appId * @param string|int $appId
* @return void * @return PromiseInterface[int]
*/ */
public function unsubscribeFromApp($appId) public function unsubscribeFromApp($appId): PromiseInterface
{ {
// return Helpers::createFulfilledPromise(0);
} }
/** /**
@ -222,14 +231,13 @@ class LocalChannelManager implements ChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channelName * @param string|null $channelName
* @return \React\Promise\PromiseInterface * @return PromiseInterface[int]
*/ */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
{ {
return $this->getLocalChannels($appId) return $this->getLocalChannels($appId)
->then(function ($channels) use ($channelName) { ->then(function ($channels) use ($channelName) {
return collect($channels) return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) {
->when(! is_null($channelName), function ($collection) use ($channelName) {
return $collection->filter(function (Channel $channel) use ($channelName) { return $collection->filter(function (Channel $channel) use ($channelName) {
return $channel->getName() === $channelName; return $channel->getName() === $channelName;
}); });
@ -237,8 +245,7 @@ class LocalChannelManager implements ChannelManager
->flatMap(function (Channel $channel) { ->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId'); return collect($channel->getConnections())->pluck('socketId');
}) })
->unique() ->unique()->count();
->count();
}); });
} }
@ -248,7 +255,7 @@ class LocalChannelManager implements ChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channelName * @param string|null $channelName
* @return \React\Promise\PromiseInterface * @return PromiseInterface[int]
*/ */
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
{ {
@ -263,11 +270,11 @@ class LocalChannelManager implements ChannelManager
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @param string|null $serverId * @param string|null $serverId
* @return bool * @return PromiseInterface[bool]
*/ */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null) public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface
{ {
return true; return Helpers::createFulfilledPromise(true);
} }
/** /**
@ -277,12 +284,14 @@ class LocalChannelManager implements ChannelManager
* @param stdClass $user * @param stdClass $user
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload) public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
{ {
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user); $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId; $this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId;
return Helpers::createFulfilledPromise(true);
} }
/** /**
@ -292,9 +301,9 @@ class LocalChannelManager implements ChannelManager
* @param stdClass $user * @param stdClass $user
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel) public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
{ {
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]); unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);
@ -310,6 +319,8 @@ class LocalChannelManager implements ChannelManager
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]); unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]);
} }
} }
return Helpers::createFulfilledPromise(true);
} }
/** /**
@ -327,7 +338,7 @@ class LocalChannelManager implements ChannelManager
return json_decode($user); return json_decode($user);
})->unique('user_id')->toArray(); })->unique('user_id')->toArray();
return new FulfilledPromise($members); return Helpers::createFulfilledPromise($members);
} }
/** /**
@ -341,7 +352,7 @@ class LocalChannelManager implements ChannelManager
{ {
$member = $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] ?? null; $member = $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] ?? null;
return new FulfilledPromise($member); return Helpers::createFulfilledPromise($member);
} }
/** /**
@ -362,7 +373,7 @@ class LocalChannelManager implements ChannelManager
return $results; return $results;
}, []); }, []);
return new FulfilledPromise($results); return Helpers::createFulfilledPromise($results);
} }
/** /**
@ -375,7 +386,7 @@ class LocalChannelManager implements ChannelManager
*/ */
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{ {
return new FulfilledPromise( return Helpers::createFulfilledPromise(
$this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? [] $this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? []
); );
} }
@ -384,21 +395,21 @@ class LocalChannelManager implements ChannelManager
* Keep tracking the connections availability when they pong. * Keep tracking the connections availability when they pong.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return bool * @return PromiseInterface[bool]
*/ */
public function connectionPonged(ConnectionInterface $connection): bool public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{ {
return true; return Helpers::createFulfilledPromise(true);
} }
/** /**
* Remove the obsolete connections that didn't ponged in a while. * Remove the obsolete connections that didn't ponged in a while.
* *
* @return bool * @return PromiseInterface[bool]
*/ */
public function removeObsoleteConnections(): bool public function removeObsoleteConnections(): PromiseInterface
{ {
return true; return Helpers::createFulfilledPromise(true);
} }
/** /**

View File

@ -132,20 +132,19 @@ class RedisChannelManager extends LocalChannelManager
* Remove connection from all channels. * Remove connection from all channels.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return void * @return PromiseInterface[bool]
*/ */
public function unsubscribeFromAllChannels(ConnectionInterface $connection) public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{ {
$this->getGlobalChannels($connection->app->id) return $this->getGlobalChannels($connection->app->id)
->then(function ($channels) use ($connection) { ->then(function ($channels) use ($connection) {
foreach ($channels as $channel) { foreach ($channels as $channel) {
$this->unsubscribeFromChannel( $this->unsubscribeFromChannel($connection, $channel, new stdClass);
$connection, $channel, new stdClass
);
} }
})
->then(function () use ($connection) {
return parent::unsubscribeFromAllChannels($connection);
}); });
parent::unsubscribeFromAllChannels($connection);
} }
/** /**
@ -154,19 +153,23 @@ class RedisChannelManager extends LocalChannelManager
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param string $channelName * @param string $channelName
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{ {
$this->subscribeToTopic($connection->app->id, $channelName); return $this->subscribeToTopic($connection->app->id, $channelName)
->then(function () use ($connection) {
$this->addConnectionToSet($connection, Carbon::now()); return $this->addConnectionToSet($connection, Carbon::now());
})
$this->addChannelToSet($connection->app->id, $channelName); ->then(function () use ($connection, $channelName) {
return $this->addChannelToSet($connection->app->id, $channelName);
$this->incrementSubscriptionsCount($connection->app->id, $channelName, 1); })
->then(function () use ($connection, $channelName) {
parent::subscribeToChannel($connection, $channelName, $payload); return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
})
->then(function () use ($connection, $channelName, $payload) {
return parent::subscribeToChannel($connection, $channelName, $payload);
});
} }
/** /**
@ -175,11 +178,11 @@ class RedisChannelManager extends LocalChannelManager
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param string $channelName * @param string $channelName
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{ {
$this->getGlobalConnectionsCount($connection->app->id, $channelName) return $this->getGlobalConnectionsCount($connection->app->id, $channelName)
->then(function ($count) use ($connection, $channelName) { ->then(function ($count) use ($connection, $channelName) {
if ($count === 0) { if ($count === 0) {
// Make sure to not stay subscribed to the PubSub topic // Make sure to not stay subscribed to the PubSub topic
@ -195,39 +198,46 @@ class RedisChannelManager extends LocalChannelManager
$this->unsubscribeFromTopic($connection->app->id, $channelName); $this->unsubscribeFromTopic($connection->app->id, $channelName);
} }
}); });
})
$this->removeChannelFromSet($connection->app->id, $channelName); ->then(function () use ($connection, $channelName) {
return $this->removeChannelFromSet($connection->app->id, $channelName);
$this->removeConnectionFromSet($connection); })
->then(function () use($connection) {
return $this->removeConnectionFromSet($connection);
})
->then(function () use ($connection, $channelName, $payload) {
return parent::unsubscribeFromChannel($connection, $channelName, $payload);
}); });
parent::unsubscribeFromChannel($connection, $channelName, $payload);
} }
/** /**
* Subscribe the connection to a specific channel. * Subscribe the connection to a specific channel, returning
* a promise containing the amount of connections.
* *
* @param string|int $appId * @param string|int $appId
* @return void * @return PromiseInterface[int]
*/ */
public function subscribeToApp($appId) public function subscribeToApp($appId): PromiseInterface
{ {
$this->subscribeToTopic($appId); return $this->subscribeToTopic($appId)
->then(function () use ($appId) {
$this->incrementSubscriptionsCount($appId); return $this->incrementSubscriptionsCount($appId);
});
} }
/** /**
* Unsubscribe the connection from the channel. * Unsubscribe the connection from the channel, returning
* a promise containing the amount of connections after decrement.
* *
* @param string|int $appId * @param string|int $appId
* @return void * @return PromiseInterface[int]
*/ */
public function unsubscribeFromApp($appId) public function unsubscribeFromApp($appId): PromiseInterface
{ {
$this->unsubscribeFromTopic($appId); return $this->unsubscribeFromTopic($appId)
->then(function () use ($appId) {
$this->incrementSubscriptionsCount($appId, null, -1); return $this->decrementSubscriptionsCount($appId);
});
} }
/** /**
@ -236,7 +246,7 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channelName * @param string|null $channelName
* @return \React\Promise\PromiseInterface * @return PromiseInterface[int]
*/ */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface
{ {
@ -249,7 +259,7 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channelName * @param string|null $channelName
* @return \React\Promise\PromiseInterface * @return PromiseInterface[int]
*/ */
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
{ {
@ -268,17 +278,19 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @param string|null $serverId * @param string|null $serverId
* @return bool * @return PromiseInterface[bool]
*/ */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null) public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface
{ {
$payload->appId = $appId; $payload->appId = $appId;
$payload->socketId = $socketId; $payload->socketId = $socketId;
$payload->serverId = $serverId ?: $this->getServerId(); $payload->serverId = $serverId ?: $this->getServerId();
$this->publishClient->publish($this->getRedisKey($appId, $channel), json_encode($payload)); return $this->publishClient
->publish($this->getRedisKey($appId, $channel), json_encode($payload))
return true; ->then(function () use ($appId, $socketId, $channel, $payload, $serverId) {
return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
});
} }
/** /**
@ -288,17 +300,17 @@ class RedisChannelManager extends LocalChannelManager
* @param stdClass $user * @param stdClass $user
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface
*/ */
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload) public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
{ {
$this->storeUserData( return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user))
$connection->app->id, $channel, $connection->socketId, json_encode($user) ->then(function () use ($connection, $channel, $user) {
); return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})
$this->addUserSocket( ->then(function () use ($connection, $user, $channel, $payload) {
$connection->app->id, $channel, $user, $connection->socketId return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
); });
} }
/** /**
@ -308,17 +320,17 @@ class RedisChannelManager extends LocalChannelManager
* @param stdClass $user * @param stdClass $user
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel) public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
{ {
$this->removeUserData( return $this->removeUserData($connection->app->id, $channel, $connection->socketId)
$connection->app->id, $channel, $connection->socketId ->then(function () use ($connection, $channel, $user) {
); return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
})
$this->removeUserSocket( ->then(function () use ($connection, $user, $channel) {
$connection->app->id, $channel, $user, $connection->socketId return parent::userLeftPresenceChannel($connection, $user, $channel);
); });
} }
/** /**
@ -326,19 +338,16 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string $channel * @param string $channel
* @return \React\Promise\PromiseInterface * @return \React\Promise\PromiseInterface[array]
*/ */
public function getChannelMembers($appId, string $channel): PromiseInterface public function getChannelMembers($appId, string $channel): PromiseInterface
{ {
return $this->publishClient return $this->publishClient
->hgetall($this->getRedisKey($appId, $channel, ['users'])) ->hgetall($this->getRedisKey($appId, $channel, ['users']))
->then(function ($list) { ->then(function ($list) {
return collect(Helpers::redisListToArray($list)) return collect(Helpers::redisListToArray($list))->map(function ($user) {
->map(function ($user) {
return json_decode($user); return json_decode($user);
}) })->unique('user_id')->toArray();
->unique('user_id')
->toArray();
}); });
} }
@ -347,7 +356,7 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param string $channel * @param string $channel
* @return \React\Promise\PromiseInterface * @return \React\Promise\PromiseInterface[null|array]
*/ */
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
{ {
@ -361,7 +370,7 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param array $channelNames * @param array $channelNames
* @return \React\Promise\PromiseInterface * @return \React\Promise\PromiseInterface[array]
*/ */
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{ {
@ -385,7 +394,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string|int $userId * @param string|int $userId
* @param string|int $appId * @param string|int $appId
* @param string $channelName * @param string $channelName
* @return \React\Promise\PromiseInterface * @return \React\Promise\PromiseInterface[array]
*/ */
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{ {
@ -398,30 +407,31 @@ class RedisChannelManager extends LocalChannelManager
* Keep tracking the connections availability when they pong. * Keep tracking the connections availability when they pong.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return bool * @return PromiseInterface[bool]
*/ */
public function connectionPonged(ConnectionInterface $connection): bool public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{ {
// This will update the score with the current timestamp. // This will update the score with the current timestamp.
$this->addConnectionToSet($connection, Carbon::now()); return $this->addConnectionToSet($connection, Carbon::now())
->then(function () use ($connection) {
return parent::connectionPonged($connection); return parent::connectionPonged($connection);
});
} }
/** /**
* Remove the obsolete connections that didn't ponged in a while. * Remove the obsolete connections that didn't ponged in a while.
* *
* @return bool * @return PromiseInterface[bool]
*/ */
public function removeObsoleteConnections(): bool public function removeObsoleteConnections(): PromiseInterface
{ {
$this->lock()->get(function () { $this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) { ->then(function ($connections) {
foreach ($connections as $socketId => $appId) { foreach ($connections as $socketId => $appId) {
$this->unsubscribeFromAllChannels( $connection = $this->fakeConnectionForApp($appId, $socketId);
$this->fakeConnectionForApp($appId, $socketId)
); $this->unsubscribeFromAllChannels($connection);
} }
}); });
}); });
@ -514,7 +524,7 @@ class RedisChannelManager extends LocalChannelManager
* *
* @return string * @return string
*/ */
public function getServerId() public function getServerId(): string
{ {
return $this->serverId; return $this->serverId;
} }
@ -525,9 +535,9 @@ class RedisChannelManager extends LocalChannelManager
* @param string|int $appId * @param string|int $appId
* @param string|null $channel * @param string|null $channel
* @param int $increment * @param int $increment
* @return PromiseInterface * @return PromiseInterface[int]
*/ */
public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1) public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{ {
return $this->publishClient->hincrby( return $this->publishClient->hincrby(
$this->getRedisKey($appId, $channel, ['stats']), 'connections', $increment $this->getRedisKey($appId, $channel, ['stats']), 'connections', $increment
@ -540,9 +550,9 @@ class RedisChannelManager extends LocalChannelManager
* @param string|int $appId * @param string|int $appId
* @param string|null $channel * @param string|null $channel
* @param int $decrement * @param int $decrement
* @return PromiseInterface * @return PromiseInterface[int]
*/ */
public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1) public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{ {
return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1); return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1);
} }
@ -552,13 +562,13 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param \DateTime|string|null $moment * @param \DateTime|string|null $moment
* @return void * @return PromiseInterface
*/ */
public function addConnectionToSet(ConnectionInterface $connection, $moment = null) public function addConnectionToSet(ConnectionInterface $connection, $moment = null): PromiseInterface
{ {
$moment = $moment ? Carbon::parse($moment) : Carbon::now(); $moment = $moment ? Carbon::parse($moment) : Carbon::now();
$this->publishClient->zadd( return $this->publishClient->zadd(
$this->getRedisKey(null, null, ['sockets']), $this->getRedisKey(null, null, ['sockets']),
$moment->format('U'), "{$connection->app->id}:{$connection->socketId}" $moment->format('U'), "{$connection->app->id}:{$connection->socketId}"
); );
@ -568,11 +578,11 @@ class RedisChannelManager extends LocalChannelManager
* Remove the connection from the sorted list. * Remove the connection from the sorted list.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return void * @return PromiseInterface
*/ */
public function removeConnectionFromSet(ConnectionInterface $connection) public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
{ {
$this->publishClient->zrem( return $this->publishClient->zrem(
$this->getRedisKey(null, null, ['sockets']), $this->getRedisKey(null, null, ['sockets']),
"{$connection->app->id}:{$connection->socketId}" "{$connection->app->id}:{$connection->socketId}"
); );
@ -585,9 +595,9 @@ class RedisChannelManager extends LocalChannelManager
* @param int $start * @param int $start
* @param int $stop * @param int $stop
* @param bool $strict * @param bool $strict
* @return PromiseInterface * @return PromiseInterface[array]
*/ */
public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true) public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true): PromiseInterface
{ {
if ($strict) { if ($strict) {
$start = "({$start}"; $start = "({$start}";
@ -612,7 +622,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel * @param string $channel
* @return PromiseInterface * @return PromiseInterface
*/ */
public function addChannelToSet($appId, string $channel) public function addChannelToSet($appId, string $channel): PromiseInterface
{ {
return $this->publishClient->sadd( return $this->publishClient->sadd(
$this->getRedisKey($appId, null, ['channels']), $channel $this->getRedisKey($appId, null, ['channels']), $channel
@ -626,7 +636,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel * @param string $channel
* @return PromiseInterface * @return PromiseInterface
*/ */
public function removeChannelFromSet($appId, string $channel) public function removeChannelFromSet($appId, string $channel): PromiseInterface
{ {
return $this->publishClient->srem( return $this->publishClient->srem(
$this->getRedisKey($appId, null, ['channels']), $channel $this->getRedisKey($appId, null, ['channels']), $channel
@ -642,9 +652,9 @@ class RedisChannelManager extends LocalChannelManager
* @param string $data * @param string $data
* @return PromiseInterface * @return PromiseInterface
*/ */
public function storeUserData($appId, string $channel = null, string $key, $data) public function storeUserData($appId, string $channel = null, string $key, $data): PromiseInterface
{ {
$this->publishClient->hset( return $this->publishClient->hset(
$this->getRedisKey($appId, $channel, ['users']), $key, $data $this->getRedisKey($appId, $channel, ['users']), $key, $data
); );
} }
@ -657,7 +667,7 @@ class RedisChannelManager extends LocalChannelManager
* @param string $key * @param string $key
* @return PromiseInterface * @return PromiseInterface
*/ */
public function removeUserData($appId, string $channel = null, string $key) public function removeUserData($appId, string $channel = null, string $key): PromiseInterface
{ {
return $this->publishClient->hdel( return $this->publishClient->hdel(
$this->getRedisKey($appId, $channel, ['users']), $key $this->getRedisKey($appId, $channel, ['users']), $key
@ -669,11 +679,11 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channel * @param string|null $channel
* @return void * @return PromiseInterface
*/ */
public function subscribeToTopic($appId, string $channel = null) public function subscribeToTopic($appId, string $channel = null): PromiseInterface
{ {
$this->subscribeClient->subscribe( return $this->subscribeClient->subscribe(
$this->getRedisKey($appId, $channel) $this->getRedisKey($appId, $channel)
); );
} }
@ -683,11 +693,11 @@ class RedisChannelManager extends LocalChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channel * @param string|null $channel
* @return void * @return PromiseInterface
*/ */
public function unsubscribeFromTopic($appId, string $channel = null) public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface
{ {
$this->subscribeClient->unsubscribe( return $this->subscribeClient->unsubscribe(
$this->getRedisKey($appId, $channel) $this->getRedisKey($appId, $channel)
); );
} }
@ -699,11 +709,11 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel * @param string $channel
* @param stdClass $user * @param stdClass $user
* @param string $socketId * @param string $socketId
* @return void * @return PromiseInterface
*/ */
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId) protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{ {
$this->publishClient->sadd( return $this->publishClient->sadd(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId
); );
} }
@ -715,11 +725,11 @@ class RedisChannelManager extends LocalChannelManager
* @param string $channel * @param string $channel
* @param stdClass $user * @param stdClass $user
* @param string $socketId * @param string $socketId
* @return void * @return PromiseInterface
*/ */
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId) protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{ {
$this->publishClient->srem( return $this->publishClient->srem(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), $socketId
); );
} }

View File

@ -73,9 +73,9 @@ class Channel
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events * @see https://pusher.com/docs/pusher_protocol#presence-channel-events
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload * @param \stdClass $payload
* @return void * @return bool
*/ */
public function subscribe(ConnectionInterface $connection, stdClass $payload) public function subscribe(ConnectionInterface $connection, stdClass $payload): bool
{ {
$this->saveConnection($connection); $this->saveConnection($connection);
@ -88,21 +88,25 @@ class Channel
'socketId' => $connection->socketId, 'socketId' => $connection->socketId,
'channel' => $this->getName(), 'channel' => $this->getName(),
]); ]);
return true;
} }
/** /**
* Unsubscribe connection from the channel. * Unsubscribe connection from the channel.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return void * @return bool
*/ */
public function unsubscribe(ConnectionInterface $connection) public function unsubscribe(ConnectionInterface $connection): bool
{ {
if (! isset($this->connections[$connection->socketId])) { if (! isset($this->connections[$connection->socketId])) {
return; return false;
} }
unset($this->connections[$connection->socketId]); unset($this->connections[$connection->socketId]);
return true;
} }
/** /**

View File

@ -15,22 +15,20 @@ class PresenceChannel extends PrivateChannel
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events * @see https://pusher.com/docs/pusher_protocol#presence-channel-events
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload * @param \stdClass $payload
* @return void * @return bool
* @throws InvalidSignature * @throws InvalidSignature
*/ */
public function subscribe(ConnectionInterface $connection, stdClass $payload) public function subscribe(ConnectionInterface $connection, stdClass $payload): bool
{ {
$this->verifySignature($connection, $payload); $this->verifySignature($connection, $payload);
$this->saveConnection($connection); $this->saveConnection($connection);
$this->channelManager->userJoinedPresenceChannel( $user = json_decode($payload->channel_data);
$connection,
$user = json_decode($payload->channel_data),
$this->getName(),
$payload
);
$this->channelManager
->userJoinedPresenceChannel($connection, $user, $this->getName(), $payload)
->then(function () use ($connection, $user) {
$this->channelManager $this->channelManager
->getChannelMembers($connection->app->id, $this->getName()) ->getChannelMembers($connection->app->id, $this->getName())
->then(function ($users) use ($connection) { ->then(function ($users) use ($connection) {
@ -54,7 +52,8 @@ class PresenceChannel extends PrivateChannel
]), ]),
])); ]));
}); });
})
->then(function () use ($connection, $user, $payload) {
// The `pusher_internal:member_added` event is triggered when a user joins a channel. // The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel // It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open) // (for example by having multiple browser tabs open)
@ -81,31 +80,34 @@ class PresenceChannel extends PrivateChannel
'duplicate-connection' => count($sockets) > 1, 'duplicate-connection' => count($sockets) > 1,
]); ]);
}); });
});
return true;
} }
/** /**
* Unsubscribe connection from the channel. * Unsubscribe connection from the channel.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return void * @return bool
*/ */
public function unsubscribe(ConnectionInterface $connection) public function unsubscribe(ConnectionInterface $connection): bool
{ {
parent::unsubscribe($connection); $truth = parent::unsubscribe($connection);
$this->channelManager $this->channelManager
->getChannelMember($connection, $this->getName()) ->getChannelMember($connection, $this->getName())
->then(function ($user) {
return @json_decode($user);
})
->then(function ($user) use ($connection) { ->then(function ($user) use ($connection) {
$user = @json_decode($user);
if (! $user) { if (! $user) {
return; return;
} }
$this->channelManager->userLeftPresenceChannel( $this->channelManager
$connection, $user, $this->getName() ->userLeftPresenceChannel($connection, $user, $this->getName())
); ->then(function () use ($connection, $user) {
// The `pusher_internal:member_removed` is triggered when a user leaves a channel. // The `pusher_internal:member_removed` is triggered when a user leaves a channel.
// It's quite possible that a user can have multiple connections to the same channel // It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open) // (for example by having multiple browser tabs open)
@ -129,5 +131,8 @@ class PresenceChannel extends PrivateChannel
} }
}); });
}); });
});
return $truth;
} }
} }

View File

@ -14,13 +14,13 @@ class PrivateChannel extends Channel
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events * @see https://pusher.com/docs/pusher_protocol#presence-channel-events
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload * @param \stdClass $payload
* @return void * @return bool
* @throws InvalidSignature * @throws InvalidSignature
*/ */
public function subscribe(ConnectionInterface $connection, stdClass $payload) public function subscribe(ConnectionInterface $connection, stdClass $payload): bool
{ {
$this->verifySignature($connection, $payload); $this->verifySignature($connection, $payload);
parent::subscribe($connection, $payload); return parent::subscribe($connection, $payload);
} }
} }

View File

@ -66,9 +66,9 @@ interface ChannelManager
* Remove connection from all channels. * Remove connection from all channels.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return void * @return PromiseInterface[bool]
*/ */
public function unsubscribeFromAllChannels(ConnectionInterface $connection); public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface;
/** /**
* Subscribe the connection to a specific channel. * Subscribe the connection to a specific channel.
@ -76,9 +76,9 @@ interface ChannelManager
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param string $channelName * @param string $channelName
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload); public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface;
/** /**
* Unsubscribe the connection from the channel. * Unsubscribe the connection from the channel.
@ -86,26 +86,27 @@ interface ChannelManager
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @param string $channelName * @param string $channelName
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload); public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface;
/** /**
* Subscribe the connection to a specific channel. * Subscribe the connection to a specific channel, returning
* a promise containing the amount of connections.
* *
* @param string|int $appId * @param string|int $appId
* @return void * @return PromiseInterface[int]
*/ */
public function subscribeToApp($appId); public function subscribeToApp($appId): PromiseInterface;
/** /**
* Unsubscribe the connection from the channel. * Unsubscribe the connection from the channel, returning
* a promise containing the amount of connections after decrement.
* *
* @param \Ratchet\ConnectionInterface $connection
* @param string|int $appId * @param string|int $appId
* @return void * @return PromiseInterface[int]
*/ */
public function unsubscribeFromApp($appId); public function unsubscribeFromApp($appId): PromiseInterface;
/** /**
* Get the connections count on the app * Get the connections count on the app
@ -113,7 +114,7 @@ interface ChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channelName * @param string|null $channelName
* @return \React\Promise\PromiseInterface * @return PromiseInterface[int]
*/ */
public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface; public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface;
@ -123,7 +124,7 @@ interface ChannelManager
* *
* @param string|int $appId * @param string|int $appId
* @param string|null $channelName * @param string|null $channelName
* @return \React\Promise\PromiseInterface * @return PromiseInterface[int]
*/ */
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface; public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface;
@ -135,9 +136,9 @@ interface ChannelManager
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @param string|null $serverId * @param string|null $serverId
* @return bool * @return PromiseInterface[bool]
*/ */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null); public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface;
/** /**
* Handle the user when it joined a presence channel. * Handle the user when it joined a presence channel.
@ -146,9 +147,9 @@ interface ChannelManager
* @param stdClass $user * @param stdClass $user
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload); public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface;
/** /**
* Handle the user when it left a presence channel. * Handle the user when it left a presence channel.
@ -157,9 +158,9 @@ interface ChannelManager
* @param stdClass $user * @param stdClass $user
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return void * @return PromiseInterface[bool]
*/ */
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel); public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface;
/** /**
* Get the presence channel members. * Get the presence channel members.
@ -202,14 +203,14 @@ interface ChannelManager
* Keep tracking the connections availability when they pong. * Keep tracking the connections availability when they pong.
* *
* @param \Ratchet\ConnectionInterface $connection * @param \Ratchet\ConnectionInterface $connection
* @return bool * @return PromiseInterface[bool]
*/ */
public function connectionPonged(ConnectionInterface $connection): bool; public function connectionPonged(ConnectionInterface $connection): PromiseInterface;
/** /**
* Remove the obsolete connections that didn't ponged in a while. * Remove the obsolete connections that didn't ponged in a while.
* *
* @return bool * @return PromiseInterface[bool]
*/ */
public function removeObsoleteConnections(): bool; public function removeObsoleteConnections(): PromiseInterface;
} }

View File

@ -2,8 +2,17 @@
namespace BeyondCode\LaravelWebSockets; namespace BeyondCode\LaravelWebSockets;
use React\Promise\PromiseInterface;
class Helpers class Helpers
{ {
/**
* The loop used to create the Fulfilled Promise.
*
* @var null|\React\EventLoop\LoopInterface
*/
public static $loop = null;
/** /**
* Transform the Redis' list of key after value * Transform the Redis' list of key after value
* to key-value pairs. * to key-value pairs.
@ -23,4 +32,19 @@ class Helpers
return array_combine($keys->all(), $values->all()); return array_combine($keys->all(), $values->all());
} }
/**
* Create a new fulfilled promise with a value.
*
* @param mixed $value
* @return \React\Promise\PromiseInterface
*/
public static function createFulfilledPromise($value): PromiseInterface
{
$resolver = config(
'websockets.promise_resolver', \React\Promise\FulfilledPromise::class
);
return new $resolver($value, static::$loop);
}
} }

View File

@ -31,11 +31,11 @@ class PusherChannelProtocolMessage extends PusherClientMessage
*/ */
protected function ping(ConnectionInterface $connection) protected function ping(ConnectionInterface $connection)
{ {
$connection->send(json_encode([ $this->channelManager
'event' => 'pusher:pong', ->connectionPonged($connection)
])); ->then(function () use ($connection) {
$connection->send(json_encode(['event' => 'pusher:pong']));
$this->channelManager->connectionPonged($connection); });
} }
/** /**

View File

@ -92,8 +92,9 @@ class WebSocketHandler implements MessageComponentInterface
*/ */
public function onClose(ConnectionInterface $connection) public function onClose(ConnectionInterface $connection)
{ {
$this->channelManager->unsubscribeFromAllChannels($connection); $this->channelManager
->unsubscribeFromAllChannels($connection)
->then(function (bool $unsubscribed) use ($connection) {
if (isset($connection->app)) { if (isset($connection->app)) {
StatisticsCollector::disconnection($connection->app->id); StatisticsCollector::disconnection($connection->app->id);
@ -103,6 +104,7 @@ class WebSocketHandler implements MessageComponentInterface
'socketId' => $connection->socketId, 'socketId' => $connection->socketId,
]); ]);
} }
});
} }
/** /**

View File

@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\Statistics\Collectors;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector; use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector;
use BeyondCode\LaravelWebSockets\Facades\StatisticsStore; use BeyondCode\LaravelWebSockets\Facades\StatisticsStore;
use BeyondCode\LaravelWebSockets\Helpers;
use BeyondCode\LaravelWebSockets\Statistics\Statistic; use BeyondCode\LaravelWebSockets\Statistics\Statistic;
use React\Promise\FulfilledPromise; use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface; use React\Promise\PromiseInterface;
@ -126,7 +127,7 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function getStatistics(): PromiseInterface public function getStatistics(): PromiseInterface
{ {
return new FulfilledPromise($this->statistics); return Helpers::createFulfilledPromise($this->statistics);
} }
/** /**
@ -137,7 +138,7 @@ class MemoryCollector implements StatisticsCollector
*/ */
public function getAppStatistics($appId): PromiseInterface public function getAppStatistics($appId): PromiseInterface
{ {
return new FulfilledPromise( return Helpers::createFulfilledPromise(
$this->statistics[$appId] ?? null $this->statistics[$appId] ?? null
); );
} }

View File

@ -2,7 +2,9 @@
namespace BeyondCode\LaravelWebSockets\Test\Mocks; namespace BeyondCode\LaravelWebSockets\Test\Mocks;
use BeyondCode\LaravelWebSockets\Helpers;
use Clue\React\Block; use Clue\React\Block;
use React\EventLoop\LoopInterface;
use React\Promise\FulfilledPromise; use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface; use React\Promise\PromiseInterface;
@ -25,13 +27,13 @@ class PromiseResolver implements PromiseInterface
/** /**
* Initialize the promise resolver. * Initialize the promise resolver.
* *
* @param PromiseInterface $promise * @param mixed $promise
* @param LoopInterface $loop * @param LoopInterface $loop
* @return void * @return void
*/ */
public function __construct($promise, $loop) public function __construct($promise, LoopInterface $loop)
{ {
$this->promise = $promise; $this->promise = $promise instanceof PromiseInterface ? $promise : new FulfilledPromise($promise);
$this->loop = $loop; $this->loop = $loop;
} }
@ -53,7 +55,7 @@ class PromiseResolver implements PromiseInterface
return $result instanceof PromiseInterface return $result instanceof PromiseInterface
? new self($result, $this->loop) ? new self($result, $this->loop)
: new self(new FulfilledPromise($result), $this->loop); : new self(Helpers::createFulfilledPromise($result), $this->loop);
} }
/** /**

View File

@ -5,6 +5,7 @@ namespace BeyondCode\LaravelWebSockets\Test;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector; use BeyondCode\LaravelWebSockets\Contracts\StatisticsCollector;
use BeyondCode\LaravelWebSockets\Contracts\StatisticsStore; use BeyondCode\LaravelWebSockets\Contracts\StatisticsStore;
use BeyondCode\LaravelWebSockets\Helpers;
use GuzzleHttp\Psr7\Request; use GuzzleHttp\Psr7\Request;
use Illuminate\Support\Facades\Redis; use Illuminate\Support\Facades\Redis;
use Orchestra\Testbench\BrowserKit\TestCase as Orchestra; use Orchestra\Testbench\BrowserKit\TestCase as Orchestra;
@ -77,6 +78,8 @@ abstract class TestCase extends Orchestra
$this->loadMigrationsFrom(__DIR__.'/database/migrations'); $this->loadMigrationsFrom(__DIR__.'/database/migrations');
$this->withFactories(__DIR__.'/database/factories'); $this->withFactories(__DIR__.'/database/factories');
$this->registerPromiseResolver();
$this->registerManagers(); $this->registerManagers();
$this->registerStatisticsCollectors(); $this->registerStatisticsCollectors();
@ -207,6 +210,21 @@ abstract class TestCase extends Orchestra
]); ]);
} }
/**
* Register the test promise resolver.
*
* @return void
*/
protected function registerPromiseResolver()
{
Helpers::$loop = $this->loop;
$this->app['config']->set(
'websockets.promise_resolver',
\BeyondCode\LaravelWebSockets\Test\Mocks\PromiseResolver::class
);
}
/** /**
* Register the managers that are not resolved * Register the managers that are not resolved
* by the package service provider. * by the package service provider.