wip docblocks

This commit is contained in:
Alex Renoki 2020-08-14 08:42:17 +03:00
parent 5838acad30
commit 5997dd4df8
5 changed files with 105 additions and 80 deletions

View File

@ -20,7 +20,7 @@ class LocalClient implements ReplicationInterface
/** /**
* Boot the pub/sub provider (open connections, initial subscriptions, etc). * Boot the pub/sub provider (open connections, initial subscriptions, etc).
* *
* @param LoopInterface $loop * @param LoopInterface $loop
* @return self * @return self
*/ */
public function boot(LoopInterface $loop): ReplicationInterface public function boot(LoopInterface $loop): ReplicationInterface
@ -31,9 +31,9 @@ class LocalClient implements ReplicationInterface
/** /**
* Publish a payload on a specific channel, for a specific app. * Publish a payload on a specific channel, for a specific app.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return bool * @return bool
*/ */
public function publish(string $appId, string $channel, stdClass $payload): bool public function publish(string $appId, string $channel, stdClass $payload): bool
@ -44,8 +44,8 @@ class LocalClient implements ReplicationInterface
/** /**
* Subscribe to receive messages for a channel. * Subscribe to receive messages for a channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return bool * @return bool
*/ */
public function subscribe(string $appId, string $channel): bool public function subscribe(string $appId, string $channel): bool
@ -56,8 +56,8 @@ class LocalClient implements ReplicationInterface
/** /**
* Unsubscribe from a channel. * Unsubscribe from a channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return bool * @return bool
*/ */
public function unsubscribe(string $appId, string $channel): bool public function unsubscribe(string $appId, string $channel): bool
@ -69,10 +69,11 @@ class LocalClient implements ReplicationInterface
* Add a member to a channel. To be called when they have * Add a member to a channel. To be called when they have
* subscribed to the channel. * subscribed to the channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param string $socketId * @param string $socketId
* @param string $data * @param string $data
* @return void
*/ */
public function joinChannel(string $appId, string $channel, string $socketId, string $data) public function joinChannel(string $appId, string $channel, string $socketId, string $data)
{ {
@ -83,13 +84,15 @@ class LocalClient implements ReplicationInterface
* Remove a member from the channel. To be called when they have * Remove a member from the channel. To be called when they have
* unsubscribed from the channel. * unsubscribed from the channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param string $socketId * @param string $socketId
* @return void
*/ */
public function leaveChannel(string $appId, string $channel, string $socketId) public function leaveChannel(string $appId, string $channel, string $socketId)
{ {
unset($this->channelData["$appId:$channel"][$socketId]); unset($this->channelData["$appId:$channel"][$socketId]);
if (empty($this->channelData["$appId:$channel"])) { if (empty($this->channelData["$appId:$channel"])) {
unset($this->channelData["$appId:$channel"]); unset($this->channelData["$appId:$channel"]);
} }
@ -98,15 +101,14 @@ class LocalClient implements ReplicationInterface
/** /**
* Retrieve the full information about the members in a presence channel. * Retrieve the full information about the members in a presence channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return PromiseInterface * @return PromiseInterface
*/ */
public function channelMembers(string $appId, string $channel): PromiseInterface public function channelMembers(string $appId, string $channel): PromiseInterface
{ {
$members = $this->channelData["$appId:$channel"] ?? []; $members = $this->channelData["$appId:$channel"] ?? [];
// The data is expected as objects, so we need to JSON decode
$members = array_map(function ($user) { $members = array_map(function ($user) {
return json_decode($user); return json_decode($user);
}, $members); }, $members);
@ -117,8 +119,8 @@ class LocalClient implements ReplicationInterface
/** /**
* Get the amount of users subscribed for each presence channel. * Get the amount of users subscribed for each presence channel.
* *
* @param string $appId * @param string $appId
* @param array $channelNames * @param array $channelNames
* @return PromiseInterface * @return PromiseInterface
*/ */
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface

View File

@ -54,7 +54,7 @@ class RedisClient implements ReplicationInterface
/** /**
* Boot the RedisClient, initializing the connections. * Boot the RedisClient, initializing the connections.
* *
* @param LoopInterface $loop * @param LoopInterface $loop
* @return ReplicationInterface * @return ReplicationInterface
*/ */
public function boot(LoopInterface $loop): ReplicationInterface public function boot(LoopInterface $loop): ReplicationInterface
@ -77,8 +77,9 @@ class RedisClient implements ReplicationInterface
/** /**
* Handle a message received from Redis on a specific channel. * Handle a message received from Redis on a specific channel.
* *
* @param string $redisChannel * @param string $redisChannel
* @param string $payload * @param string $payload
* @return void
*/ */
protected function onMessage(string $redisChannel, string $payload) protected function onMessage(string $redisChannel, string $payload)
{ {
@ -123,8 +124,8 @@ class RedisClient implements ReplicationInterface
/** /**
* Subscribe to a channel on behalf of websocket user. * Subscribe to a channel on behalf of websocket user.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return bool * @return bool
*/ */
public function subscribe(string $appId, string $channel): bool public function subscribe(string $appId, string $channel): bool
@ -144,8 +145,8 @@ class RedisClient implements ReplicationInterface
/** /**
* Unsubscribe from a channel on behalf of a websocket user. * Unsubscribe from a channel on behalf of a websocket user.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return bool * @return bool
*/ */
public function unsubscribe(string $appId, string $channel): bool public function unsubscribe(string $appId, string $channel): bool
@ -169,9 +170,9 @@ class RedisClient implements ReplicationInterface
/** /**
* Publish a message to a channel on behalf of a websocket user. * Publish a message to a channel on behalf of a websocket user.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return bool * @return bool
*/ */
public function publish(string $appId, string $channel, stdClass $payload): bool public function publish(string $appId, string $channel, stdClass $payload): bool
@ -188,10 +189,11 @@ class RedisClient implements ReplicationInterface
* Add a member to a channel. To be called when they have * Add a member to a channel. To be called when they have
* subscribed to the channel. * subscribed to the channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param string $socketId * @param string $socketId
* @param string $data * @param string $data
* @return void
*/ */
public function joinChannel(string $appId, string $channel, string $socketId, string $data) public function joinChannel(string $appId, string $channel, string $socketId, string $data)
{ {
@ -202,9 +204,10 @@ class RedisClient implements ReplicationInterface
* Remove a member from the channel. To be called when they have * Remove a member from the channel. To be called when they have
* unsubscribed from the channel. * unsubscribed from the channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param string $socketId * @param string $socketId
* @return void
*/ */
public function leaveChannel(string $appId, string $channel, string $socketId) public function leaveChannel(string $appId, string $channel, string $socketId)
{ {
@ -214,8 +217,8 @@ class RedisClient implements ReplicationInterface
/** /**
* Retrieve the full information about the members in a presence channel. * Retrieve the full information about the members in a presence channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return PromiseInterface * @return PromiseInterface
*/ */
public function channelMembers(string $appId, string $channel): PromiseInterface public function channelMembers(string $appId, string $channel): PromiseInterface
@ -232,8 +235,8 @@ class RedisClient implements ReplicationInterface
/** /**
* Get the amount of users subscribed for each presence channel. * Get the amount of users subscribed for each presence channel.
* *
* @param string $appId * @param string $appId
* @param array $channelNames * @param array $channelNames
* @return PromiseInterface * @return PromiseInterface
*/ */
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface

View File

@ -11,7 +11,7 @@ interface ReplicationInterface
/** /**
* Boot the pub/sub provider (open connections, initial subscriptions, etc). * Boot the pub/sub provider (open connections, initial subscriptions, etc).
* *
* @param LoopInterface $loop * @param LoopInterface $loop
* @return self * @return self
*/ */
public function boot(LoopInterface $loop): self; public function boot(LoopInterface $loop): self;
@ -19,9 +19,9 @@ interface ReplicationInterface
/** /**
* Publish a payload on a specific channel, for a specific app. * Publish a payload on a specific channel, for a specific app.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param stdClass $payload * @param stdClass $payload
* @return bool * @return bool
*/ */
public function publish(string $appId, string $channel, stdClass $payload): bool; public function publish(string $appId, string $channel, stdClass $payload): bool;
@ -29,8 +29,8 @@ interface ReplicationInterface
/** /**
* Subscribe to receive messages for a channel. * Subscribe to receive messages for a channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return bool * @return bool
*/ */
public function subscribe(string $appId, string $channel): bool; public function subscribe(string $appId, string $channel): bool;
@ -38,8 +38,8 @@ interface ReplicationInterface
/** /**
* Unsubscribe from a channel. * Unsubscribe from a channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return bool * @return bool
*/ */
public function unsubscribe(string $appId, string $channel): bool; public function unsubscribe(string $appId, string $channel): bool;
@ -48,10 +48,11 @@ interface ReplicationInterface
* Add a member to a channel. To be called when they have * Add a member to a channel. To be called when they have
* subscribed to the channel. * subscribed to the channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param string $socketId * @param string $socketId
* @param string $data * @param string $data
* @return void
*/ */
public function joinChannel(string $appId, string $channel, string $socketId, string $data); public function joinChannel(string $appId, string $channel, string $socketId, string $data);
@ -59,17 +60,18 @@ interface ReplicationInterface
* Remove a member from the channel. To be called when they have * Remove a member from the channel. To be called when they have
* unsubscribed from the channel. * unsubscribed from the channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @param string $socketId * @param string $socketId
* @return void
*/ */
public function leaveChannel(string $appId, string $channel, string $socketId); public function leaveChannel(string $appId, string $channel, string $socketId);
/** /**
* Retrieve the full information about the members in a presence channel. * Retrieve the full information about the members in a presence channel.
* *
* @param string $appId * @param string $appId
* @param string $channel * @param string $channel
* @return PromiseInterface * @return PromiseInterface
*/ */
public function channelMembers(string $appId, string $channel): PromiseInterface; public function channelMembers(string $appId, string $channel): PromiseInterface;
@ -77,8 +79,8 @@ interface ReplicationInterface
/** /**
* Get the amount of users subscribed for each presence channel. * Get the amount of users subscribed for each presence channel.
* *
* @param string $appId * @param string $appId
* @param array $channelNames * @param array $channelNames
* @return PromiseInterface * @return PromiseInterface
*/ */
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface; public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface;

View File

@ -67,20 +67,18 @@ class Channel
{ {
$this->saveConnection($connection); $this->saveConnection($connection);
// Subscribe to broadcasted messages from the pub/sub backend
$this->replicator->subscribe($connection->app->id, $this->channelName);
$connection->send(json_encode([ $connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded', 'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->channelName, 'channel' => $this->channelName,
])); ]));
$this->replicator->subscribe($connection->app->id, $this->channelName);
} }
public function unsubscribe(ConnectionInterface $connection) public function unsubscribe(ConnectionInterface $connection)
{ {
unset($this->subscribedConnections[$connection->socketId]); unset($this->subscribedConnections[$connection->socketId]);
// Unsubscribe from the pub/sub backend
$this->replicator->unsubscribe($connection->app->id, $this->channelName); $this->replicator->unsubscribe($connection->app->id, $this->channelName);
if (! $this->hasConnections()) { if (! $this->hasConnections()) {

View File

@ -22,22 +22,24 @@ class PresenceChannel extends Channel
protected $users = []; protected $users = [];
/** /**
* @param string $appId * Get the members in the presence channel.
*
* @param string $appId
* @return PromiseInterface * @return PromiseInterface
*/ */
public function getUsers(string $appId) public function getUsers(string $appId)
{ {
// Get the members list from the replication backend return $this->replicator->channelMembers($appId, $this->channelName);
return $this->replicator
->channelMembers($appId, $this->channelName);
} }
/** /**
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events * Subscribe the connection to the channel.
* *
* @param ConnectionInterface $connection * @param ConnectionInterface $connection
* @param stdClass $payload * @param stdClass $payload
* @return void
* @throws InvalidSignature * @throws InvalidSignature
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events
*/ */
public function subscribe(ConnectionInterface $connection, stdClass $payload) public function subscribe(ConnectionInterface $connection, stdClass $payload)
{ {
@ -49,20 +51,18 @@ class PresenceChannel extends Channel
$this->users[$connection->socketId] = $channelData; $this->users[$connection->socketId] = $channelData;
// Add the connection as a member of the channel // Add the connection as a member of the channel
$this->replicator $this->replicator->joinChannel(
->joinChannel( $connection->app->id,
$connection->app->id, $this->channelName,
$this->channelName, $connection->socketId,
$connection->socketId, json_encode($channelData)
json_encode($channelData) );
);
// We need to pull the channel data from the replication backend, // We need to pull the channel data from the replication backend,
// otherwise we won't be sending the full details of the channel // otherwise we won't be sending the full details of the channel
$this->replicator $this->replicator
->channelMembers($connection->app->id, $this->channelName) ->channelMembers($connection->app->id, $this->channelName)
->then(function ($users) use ($connection) { ->then(function ($users) use ($connection) {
// Send the success event
$connection->send(json_encode([ $connection->send(json_encode([
'event' => 'pusher_internal:subscription_succeeded', 'event' => 'pusher_internal:subscription_succeeded',
'channel' => $this->channelName, 'channel' => $this->channelName,
@ -77,6 +77,12 @@ class PresenceChannel extends Channel
]); ]);
} }
/**
* Unsubscribe the connection from the Presence channel.
*
* @param ConnectionInterface $connection
* @return void
*/
public function unsubscribe(ConnectionInterface $connection) public function unsubscribe(ConnectionInterface $connection)
{ {
parent::unsubscribe($connection); parent::unsubscribe($connection);
@ -105,7 +111,9 @@ class PresenceChannel extends Channel
} }
/** /**
* @param string|null $appId * Get the Presence Channel to array.
*
* @param string|null $appId
* @return PromiseInterface * @return PromiseInterface
*/ */
public function toArray(string $appId = null) public function toArray(string $appId = null)
@ -119,6 +127,12 @@ class PresenceChannel extends Channel
}); });
} }
/**
* Get the Presence channel data.
*
* @param array $users
* @return array
*/
protected function getChannelData(array $users): array protected function getChannelData(array $users): array
{ {
return [ return [
@ -130,6 +144,12 @@ class PresenceChannel extends Channel
]; ];
} }
/**
* Get the Presence Channel's users.
*
* @param array $users
* @return array
*/
protected function getUserIds(array $users): array protected function getUserIds(array $users): array
{ {
$userIds = array_map(function ($channelData) { $userIds = array_map(function ($channelData) {