From a7c505e683c3b5a691dde3be23cea1894b71d1fb Mon Sep 17 00:00:00 2001 From: rennokki Date: Sat, 26 Sep 2020 19:01:58 +0000 Subject: [PATCH] [2.x] Dispatch events on actions (#556) * Dispatching events --- docs/advanced-usage/dispatched-events.md | 82 +++++++++++++++++++ .../non-blocking-queue-driver.md | 30 +++++++ docs/horizontal-scaling/redis.md | 26 ------ src/Channels/Channel.php | 14 ++++ src/Channels/PresenceChannel.php | 18 +++- src/Events/ConnectionClosed.php | 38 +++++++++ src/Events/ConnectionPonged.php | 38 +++++++++ src/Events/NewConnection.php | 38 +++++++++ src/Events/SubscribedToChannel.php | 57 +++++++++++++ src/Events/UnsubscribedFromChannel.php | 57 +++++++++++++ src/Events/WebSocketMessageReceived.php | 56 +++++++++++++ .../Messages/PusherChannelProtocolMessage.php | 3 + src/Server/WebSocketHandler.php | 13 +++ 13 files changed, 443 insertions(+), 27 deletions(-) create mode 100644 docs/advanced-usage/dispatched-events.md create mode 100644 docs/advanced-usage/non-blocking-queue-driver.md create mode 100644 src/Events/ConnectionClosed.php create mode 100644 src/Events/ConnectionPonged.php create mode 100644 src/Events/NewConnection.php create mode 100644 src/Events/SubscribedToChannel.php create mode 100644 src/Events/UnsubscribedFromChannel.php create mode 100644 src/Events/WebSocketMessageReceived.php diff --git a/docs/advanced-usage/dispatched-events.md b/docs/advanced-usage/dispatched-events.md new file mode 100644 index 0000000..be5e095 --- /dev/null +++ b/docs/advanced-usage/dispatched-events.md @@ -0,0 +1,82 @@ +--- +title: Dispatched Events +order: 5 +--- + +# Dispatched Events + +Laravel WebSockets takes advantage of Laravel's Event dispatching observer, in a way that you can handle in-server events outside of it. + +For example, you can listen for events like when a new connection establishes or when an user joins a presence channel. + +## Events + +Below you will find a list of dispatched events: + +- `BeyondCode\LaravelWebSockets\Events\NewConnection` - when a connection successfully establishes on the server +- `BeyondCode\LaravelWebSockets\Events\ConnectionClosed` - when a connection leaves the server +- `BeyondCode\LaravelWebSockets\Events\SubscribedToChannel` - when a connection subscribes to a specific channel +- `BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel` - when a connection unsubscribes from a specific channel +- `BeyondCode\LaravelWebSockets\Events\WebSocketMessageReceived` - when the server receives a message +- `BeyondCode\LaravelWebSockets\EventsConnectionPonged` - when a connection pings to the server that it is still alive + +## Queued Listeners + +Because the default Redis connection (either PhpRedis or Predis) is a blocking I/O method and can cause problems with the server speed and availability, you might want to check the [Non-Blocking Queue Driver](non-blocking-queue-driver.md) documentation that helps you create the Async Redis queue driver that is going to fix the Blocking I/O issue. + +If set up, you can use the `async-redis` queue driver in your listeners: + +```php + [ + App\Listeners\HandleNewConnections::class, + ], +]; +``` diff --git a/docs/advanced-usage/non-blocking-queue-driver.md b/docs/advanced-usage/non-blocking-queue-driver.md new file mode 100644 index 0000000..98ed10d --- /dev/null +++ b/docs/advanced-usage/non-blocking-queue-driver.md @@ -0,0 +1,30 @@ +--- +title: Non-Blocking Queue Driver +order: 4 +--- + +# Non-Blocking Queue Driver + +In Laravel, he default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server. + +To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example. + +Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver: + +```php +'connections' => [ + 'async-redis' => [ + 'driver' => 'async-redis', + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), + 'queue' => env('REDIS_QUEUE', 'default'), + 'retry_after' => 90, + 'block_for' => null, + ], +] +``` + +Also, make sure that the default queue driver is set to `async-redis`: + +``` +QUEUE_CONNECTION=async-redis +``` diff --git a/docs/horizontal-scaling/redis.md b/docs/horizontal-scaling/redis.md index 86759db..4f63835 100644 --- a/docs/horizontal-scaling/redis.md +++ b/docs/horizontal-scaling/redis.md @@ -40,29 +40,3 @@ You can set the connection name to the Redis database under `redis`: ``` The connections can be found in your `config/database.php` file, under the `redis` key. - -## Async Redis Queue - -The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server. - -To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example. - -Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver: - -```php -'connections' => [ - 'async-redis' => [ - 'driver' => 'async-redis', - 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), - 'queue' => env('REDIS_QUEUE', 'default'), - 'retry_after' => 90, - 'block_for' => null, - ], -] -``` - -Also, make sure that the default queue driver is set to `async-redis`: - -``` -QUEUE_CONNECTION=async-redis -``` diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index e64a4d1..fd857e2 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -4,6 +4,8 @@ namespace BeyondCode\LaravelWebSockets\Channels; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\DashboardLogger; +use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; +use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; @@ -89,6 +91,12 @@ class Channel 'channel' => $this->getName(), ]); + SubscribedToChannel::dispatch( + $connection->app->id, + $connection->socketId, + $this->getName(), + ); + return true; } @@ -106,6 +114,12 @@ class Channel unset($this->connections[$connection->socketId]); + UnsubscribedFromChannel::dispatch( + $connection->app->id, + $connection->socketId, + $this->getName() + ); + return true; } diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index 3191be4..614fe8d 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -3,6 +3,8 @@ namespace BeyondCode\LaravelWebSockets\Channels; use BeyondCode\LaravelWebSockets\DashboardLogger; +use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; +use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; use Ratchet\ConnectionInterface; use stdClass; @@ -60,7 +62,7 @@ class PresenceChannel extends PrivateChannel // and in this case the events will only be triggered when the first tab is opened. $this->channelManager ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) - ->then(function ($sockets) use ($payload, $connection) { + ->then(function ($sockets) use ($payload, $connection, $user) { if (count($sockets) === 1) { $memberAddedPayload = [ 'event' => 'pusher_internal:member_added', @@ -72,6 +74,13 @@ class PresenceChannel extends PrivateChannel (object) $memberAddedPayload, $connection->socketId, $connection->app->id ); + + SubscribedToChannel::dispatch( + $connection->app->id, + $connection->socketId, + $this->getName(), + $user + ); } DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ @@ -128,6 +137,13 @@ class PresenceChannel extends PrivateChannel (object) $memberRemovedPayload, $connection->socketId, $connection->app->id ); + + UnsubscribedFromChannel::dispatch( + $connection->app->id, + $connection->socketId, + $this->getName(), + $user + ); } }); }); diff --git a/src/Events/ConnectionClosed.php b/src/Events/ConnectionClosed.php new file mode 100644 index 0000000..60b810b --- /dev/null +++ b/src/Events/ConnectionClosed.php @@ -0,0 +1,38 @@ +appId = $appId; + $this->socketId = $socketId; + } +} diff --git a/src/Events/ConnectionPonged.php b/src/Events/ConnectionPonged.php new file mode 100644 index 0000000..43440eb --- /dev/null +++ b/src/Events/ConnectionPonged.php @@ -0,0 +1,38 @@ +appId = $appId; + $this->socketId = $socketId; + } +} diff --git a/src/Events/NewConnection.php b/src/Events/NewConnection.php new file mode 100644 index 0000000..5c8a30f --- /dev/null +++ b/src/Events/NewConnection.php @@ -0,0 +1,38 @@ +appId = $appId; + $this->socketId = $socketId; + } +} diff --git a/src/Events/SubscribedToChannel.php b/src/Events/SubscribedToChannel.php new file mode 100644 index 0000000..b3109f7 --- /dev/null +++ b/src/Events/SubscribedToChannel.php @@ -0,0 +1,57 @@ +appId = $appId; + $this->socketId = $socketId; + $this->channelName = $channelName; + $this->user = $user; + } +} diff --git a/src/Events/UnsubscribedFromChannel.php b/src/Events/UnsubscribedFromChannel.php new file mode 100644 index 0000000..6e132e7 --- /dev/null +++ b/src/Events/UnsubscribedFromChannel.php @@ -0,0 +1,57 @@ +appId = $appId; + $this->socketId = $socketId; + $this->channelName = $channelName; + $this->user = $user; + } +} diff --git a/src/Events/WebSocketMessageReceived.php b/src/Events/WebSocketMessageReceived.php new file mode 100644 index 0000000..442ecb7 --- /dev/null +++ b/src/Events/WebSocketMessageReceived.php @@ -0,0 +1,56 @@ +appId = $appId; + $this->socketId = $socketId; + $this->message = $message; + $this->decodedMessage = json_decode($message->getPayload(), true); + } +} diff --git a/src/Server/Messages/PusherChannelProtocolMessage.php b/src/Server/Messages/PusherChannelProtocolMessage.php index 6385d90..c6f4f13 100644 --- a/src/Server/Messages/PusherChannelProtocolMessage.php +++ b/src/Server/Messages/PusherChannelProtocolMessage.php @@ -2,6 +2,7 @@ namespace BeyondCode\LaravelWebSockets\Server\Messages; +use BeyondCode\LaravelWebSockets\Events\ConnectionPonged; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use stdClass; @@ -35,6 +36,8 @@ class PusherChannelProtocolMessage extends PusherClientMessage ->connectionPonged($connection) ->then(function () use ($connection) { $connection->send(json_encode(['event' => 'pusher:pong'])); + + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); }); } diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 9fd3fe2..8bec389 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -5,6 +5,9 @@ namespace BeyondCode\LaravelWebSockets\Server; use BeyondCode\LaravelWebSockets\Apps\App; use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\DashboardLogger; +use BeyondCode\LaravelWebSockets\Events\ConnectionClosed; +use BeyondCode\LaravelWebSockets\Events\NewConnection; +use BeyondCode\LaravelWebSockets\Events\WebSocketMessageReceived; use BeyondCode\LaravelWebSockets\Facades\StatisticsCollector; use Exception; use Ratchet\ConnectionInterface; @@ -63,6 +66,8 @@ class WebSocketHandler implements MessageComponentInterface 'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}", 'socketId' => $connection->socketId, ]); + + NewConnection::dispatch($connection->app->id, $connection->socketId); } } @@ -84,6 +89,12 @@ class WebSocketHandler implements MessageComponentInterface )->respond(); StatisticsCollector::webSocketMessage($connection->app->id); + + WebSocketMessageReceived::dispatch( + $connection->app->id, + $connection->socketId, + $message + ); } /** @@ -105,6 +116,8 @@ class WebSocketHandler implements MessageComponentInterface DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [ 'socketId' => $connection->socketId, ]); + + ConnectionClosed::dispatch($connection->app->id, $connection->socketId); } }); }