[2.x] Dispatch events on actions (#556)

* Dispatching events
This commit is contained in:
rennokki 2020-09-26 19:01:58 +00:00 committed by GitHub
parent 9cb83dccfd
commit a7c505e683
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 443 additions and 27 deletions

View File

@ -0,0 +1,82 @@
---
title: Dispatched Events
order: 5
---
# Dispatched Events
Laravel WebSockets takes advantage of Laravel's Event dispatching observer, in a way that you can handle in-server events outside of it.
For example, you can listen for events like when a new connection establishes or when an user joins a presence channel.
## Events
Below you will find a list of dispatched events:
- `BeyondCode\LaravelWebSockets\Events\NewConnection` - when a connection successfully establishes on the server
- `BeyondCode\LaravelWebSockets\Events\ConnectionClosed` - when a connection leaves the server
- `BeyondCode\LaravelWebSockets\Events\SubscribedToChannel` - when a connection subscribes to a specific channel
- `BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel` - when a connection unsubscribes from a specific channel
- `BeyondCode\LaravelWebSockets\Events\WebSocketMessageReceived` - when the server receives a message
- `BeyondCode\LaravelWebSockets\EventsConnectionPonged` - when a connection pings to the server that it is still alive
## Queued Listeners
Because the default Redis connection (either PhpRedis or Predis) is a blocking I/O method and can cause problems with the server speed and availability, you might want to check the [Non-Blocking Queue Driver](non-blocking-queue-driver.md) documentation that helps you create the Async Redis queue driver that is going to fix the Blocking I/O issue.
If set up, you can use the `async-redis` queue driver in your listeners:
```php
<?php
namespace App\Listeners;
use BeyondCode\LaravelWebSockets\Events\NewConnection;
use Illuminate\Contracts\Queue\ShouldQueue;
class HandleNewConnections implements ShouldQueue
{
/**
* The name of the connection the job should be sent to.
*
* @var string|null
*/
public $connection = 'async-redis';
/**
* Create the event listener.
*
* @return void
*/
public function __construct()
{
//
}
/**
* Handle the event.
*
* @param NewConnection $event
* @return void
*/
public function handle(NewConnection $event)
{
//
}
}
```
The `EventServiceProvider` might look like this, registering the listeners that are going to be used by the event dispatching:
```php
/**
* The event listener mappings for the application.
*
* @var array
*/
protected $listen = [
\BeyondCode\LaravelWebSockets\Events\NewConnection::class => [
App\Listeners\HandleNewConnections::class,
],
];
```

View File

@ -0,0 +1,30 @@
---
title: Non-Blocking Queue Driver
order: 4
---
# Non-Blocking Queue Driver
In Laravel, he default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server.
To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example.
Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver:
```php
'connections' => [
'async-redis' => [
'driver' => 'async-redis',
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
],
]
```
Also, make sure that the default queue driver is set to `async-redis`:
```
QUEUE_CONNECTION=async-redis
```

View File

@ -40,29 +40,3 @@ You can set the connection name to the Redis database under `redis`:
```
The connections can be found in your `config/database.php` file, under the `redis` key.
## Async Redis Queue
The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server.
To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example.
Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver:
```php
'connections' => [
'async-redis' => [
'driver' => 'async-redis',
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
],
]
```
Also, make sure that the default queue driver is set to `async-redis`:
```
QUEUE_CONNECTION=async-redis
```

View File

@ -4,6 +4,8 @@ namespace BeyondCode\LaravelWebSockets\Channels;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\DashboardLogger;
use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel;
use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel;
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface;
@ -89,6 +91,12 @@ class Channel
'channel' => $this->getName(),
]);
SubscribedToChannel::dispatch(
$connection->app->id,
$connection->socketId,
$this->getName(),
);
return true;
}
@ -106,6 +114,12 @@ class Channel
unset($this->connections[$connection->socketId]);
UnsubscribedFromChannel::dispatch(
$connection->app->id,
$connection->socketId,
$this->getName()
);
return true;
}

View File

@ -3,6 +3,8 @@
namespace BeyondCode\LaravelWebSockets\Channels;
use BeyondCode\LaravelWebSockets\DashboardLogger;
use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel;
use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel;
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
use Ratchet\ConnectionInterface;
use stdClass;
@ -60,7 +62,7 @@ class PresenceChannel extends PrivateChannel
// and in this case the events will only be triggered when the first tab is opened.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($payload, $connection) {
->then(function ($sockets) use ($payload, $connection, $user) {
if (count($sockets) === 1) {
$memberAddedPayload = [
'event' => 'pusher_internal:member_added',
@ -72,6 +74,13 @@ class PresenceChannel extends PrivateChannel
(object) $memberAddedPayload, $connection->socketId,
$connection->app->id
);
SubscribedToChannel::dispatch(
$connection->app->id,
$connection->socketId,
$this->getName(),
$user
);
}
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
@ -128,6 +137,13 @@ class PresenceChannel extends PrivateChannel
(object) $memberRemovedPayload, $connection->socketId,
$connection->app->id
);
UnsubscribedFromChannel::dispatch(
$connection->app->id,
$connection->socketId,
$this->getName(),
$user
);
}
});
});

View File

@ -0,0 +1,38 @@
<?php
namespace BeyondCode\LaravelWebSockets\Events;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class ConnectionClosed
{
use Dispatchable, SerializesModels;
/**
* The WebSockets app id that the user connected to.
*
* @var string
*/
public $appId;
/**
* The Socket ID associated with the connection.
*
* @var string
*/
public $socketId;
/**
* Create a new event instance.
*
* @param string $appId
* @param string $socketId
* @return void
*/
public function __construct(string $appId, string $socketId)
{
$this->appId = $appId;
$this->socketId = $socketId;
}
}

View File

@ -0,0 +1,38 @@
<?php
namespace BeyondCode\LaravelWebSockets\Events;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class ConnectionPonged
{
use Dispatchable, SerializesModels;
/**
* The WebSockets app id that the user connected to.
*
* @var string
*/
public $appId;
/**
* The Socket ID associated with the connection.
*
* @var string
*/
public $socketId;
/**
* Create a new event instance.
*
* @param string $appId
* @param string $socketId
* @return void
*/
public function __construct(string $appId, string $socketId)
{
$this->appId = $appId;
$this->socketId = $socketId;
}
}

View File

@ -0,0 +1,38 @@
<?php
namespace BeyondCode\LaravelWebSockets\Events;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class NewConnection
{
use Dispatchable, SerializesModels;
/**
* The WebSockets app id that the user connected to.
*
* @var string
*/
public $appId;
/**
* The Socket ID associated with the connection.
*
* @var string
*/
public $socketId;
/**
* Create a new event instance.
*
* @param string $appId
* @param string $socketId
* @return void
*/
public function __construct(string $appId, string $socketId)
{
$this->appId = $appId;
$this->socketId = $socketId;
}
}

View File

@ -0,0 +1,57 @@
<?php
namespace BeyondCode\LaravelWebSockets\Events;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use stdClass;
class SubscribedToChannel
{
use Dispatchable, SerializesModels;
/**
* The WebSockets app id that the user connected to.
*
* @var string
*/
public $appId;
/**
* The Socket ID associated with the connection.
*
* @var string
*/
public $socketId;
/**
* The channel name.
*
* @var string
*/
public $channelName;
/**
* The user received on presence channel.
*
* @var string
*/
public $user;
/**
* Create a new event instance.
*
* @param string $appId
* @param string $socketId
* @param string $channelName
* @param stdClass|null $user
* @return void
*/
public function __construct(string $appId, string $socketId, string $channelName, ?stdClass $user = null)
{
$this->appId = $appId;
$this->socketId = $socketId;
$this->channelName = $channelName;
$this->user = $user;
}
}

View File

@ -0,0 +1,57 @@
<?php
namespace BeyondCode\LaravelWebSockets\Events;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use stdClass;
class UnsubscribedFromChannel
{
use Dispatchable, SerializesModels;
/**
* The WebSockets app id that the user connected to.
*
* @var string
*/
public $appId;
/**
* The Socket ID associated with the connection.
*
* @var string
*/
public $socketId;
/**
* The channel name.
*
* @var string
*/
public $channelName;
/**
* The user received on presence channel.
*
* @var string
*/
public $user;
/**
* Create a new event instance.
*
* @param string $appId
* @param string $socketId
* @param string $channelName
* @param stdClass|null $user
* @return void
*/
public function __construct(string $appId, string $socketId, string $channelName, ?stdClass $user = null)
{
$this->appId = $appId;
$this->socketId = $socketId;
$this->channelName = $channelName;
$this->user = $user;
}
}

View File

@ -0,0 +1,56 @@
<?php
namespace BeyondCode\LaravelWebSockets\Events;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
use Ratchet\RFC6455\Messaging\MessageInterface;
class WebSocketMessageReceived
{
use Dispatchable, SerializesModels;
/**
* The WebSockets app id that the user connected to.
*
* @var string
*/
public $appId;
/**
* The Socket ID associated with the connection.
*
* @var string
*/
public $socketId;
/**
* The message received.
*
* @var MessageInterface
*/
public $message;
/**
* The decoded message as array.
*
* @var array
*/
public $decodedMessage;
/**
* Create a new event instance.
*
* @param string $appId
* @param string $socketId
* @param MessageInterface $message
* @return void
*/
public function __construct(string $appId, string $socketId, MessageInterface $message)
{
$this->appId = $appId;
$this->socketId = $socketId;
$this->message = $message;
$this->decodedMessage = json_decode($message->getPayload(), true);
}
}

View File

@ -2,6 +2,7 @@
namespace BeyondCode\LaravelWebSockets\Server\Messages;
use BeyondCode\LaravelWebSockets\Events\ConnectionPonged;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface;
use stdClass;
@ -35,6 +36,8 @@ class PusherChannelProtocolMessage extends PusherClientMessage
->connectionPonged($connection)
->then(function () use ($connection) {
$connection->send(json_encode(['event' => 'pusher:pong']));
ConnectionPonged::dispatch($connection->app->id, $connection->socketId);
});
}

View File

@ -5,6 +5,9 @@ namespace BeyondCode\LaravelWebSockets\Server;
use BeyondCode\LaravelWebSockets\Apps\App;
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
use BeyondCode\LaravelWebSockets\DashboardLogger;
use BeyondCode\LaravelWebSockets\Events\ConnectionClosed;
use BeyondCode\LaravelWebSockets\Events\NewConnection;
use BeyondCode\LaravelWebSockets\Events\WebSocketMessageReceived;
use BeyondCode\LaravelWebSockets\Facades\StatisticsCollector;
use Exception;
use Ratchet\ConnectionInterface;
@ -63,6 +66,8 @@ class WebSocketHandler implements MessageComponentInterface
'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}",
'socketId' => $connection->socketId,
]);
NewConnection::dispatch($connection->app->id, $connection->socketId);
}
}
@ -84,6 +89,12 @@ class WebSocketHandler implements MessageComponentInterface
)->respond();
StatisticsCollector::webSocketMessage($connection->app->id);
WebSocketMessageReceived::dispatch(
$connection->app->id,
$connection->socketId,
$message
);
}
/**
@ -105,6 +116,8 @@ class WebSocketHandler implements MessageComponentInterface
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_DISCONNECTED, [
'socketId' => $connection->socketId,
]);
ConnectionClosed::dispatch($connection->app->id, $connection->socketId);
}
});
}