diff --git a/README.md b/README.md index 2b8e230..056e048 100644 --- a/README.md +++ b/README.md @@ -7,21 +7,24 @@ Bring the power of WebSockets to your Laravel application. Drop-in Pusher replac ## Documentation +### Features +* Laravel native event broadcasting +* Async with pcntl_fork +* SSL support +* Laravel Echo support +* Docker & Traefik capable + ### Changelog Please see [CHANGELOG](CHANGELOG.md) for more information on what has changed recently. -## Contributing - -~Please see [CONTRIBUTING](CONTRIBUTING.md) for details.~ - ### Security If you discover any security related issues, please email office@blax.at or the the issue tracker. ## Credits -- [Marcel Pociot](https://github.com/mpociot) +- [Marcel Pociot (beyondco.de)](https://github.com/mpociot) - [Freek Van der Herten](https://github.com/freekmurze) - [All Contributors](../../contributors) diff --git a/composer.json b/composer.json index a08340e..3a310b9 100644 --- a/composer.json +++ b/composer.json @@ -70,12 +70,12 @@ }, "autoload": { "psr-4": { - "BeyondCode\\LaravelWebSockets\\": "src/" + "BlaxSoftware\\LaravelWebSockets\\": "src/" } }, "autoload-dev": { "psr-4": { - "BeyondCode\\LaravelWebSockets\\Test\\": "tests" + "BlaxSoftware\\LaravelWebSockets\\Test\\": "tests" } }, "scripts": { @@ -89,11 +89,11 @@ "extra": { "laravel": { "providers": [ - "BeyondCode\\LaravelWebSockets\\WebSocketsServiceProvider" + "BlaxSoftware\\LaravelWebSockets\\WebSocketsServiceProvider" ], "aliases": { - "WebSocketRouter": "BeyondCode\\LaravelWebSockets\\Facades\\WebSocketRouter" + "WebSocketRouter": "BlaxSoftware\\LaravelWebSockets\\Facades\\WebSocketRouter" } } } -} +} \ No newline at end of file diff --git a/config/websockets.php b/config/websockets.php index a321ace..1e1b591 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -10,24 +10,19 @@ return [ | You can configure the dashboard settings from here. | */ + 'port' => env('LARAVEL_WEBSOCKETS_PORT', env('PUSHER_PORT', 6001)), 'dashboard' => [ - - 'port' => env('LARAVEL_WEBSOCKETS_PORT', 6001), - + 'port' => env('LARAVEL_WEBSOCKETS_PORT', env('PUSHER_PORT', 6001)), 'domain' => env('LARAVEL_WEBSOCKETS_DOMAIN'), - 'path' => env('LARAVEL_WEBSOCKETS_PATH', 'laravel-websockets'), - 'middleware' => [ 'web', - \BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize::class, + \BlaxSoftware\LaravelWebSockets\Dashboard\Http\Middleware\Authorize::class, ], - ], 'managers' => [ - /* |-------------------------------------------------------------------------- | Application Manager @@ -41,8 +36,7 @@ return [ | custom application manager. | */ - - 'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class, + 'app' => \BlaxSoftware\LaravelWebSockets\Apps\ConfigAppManager::class, /* |-------------------------------------------------------------------------- @@ -52,7 +46,6 @@ return [ | The SQLite database to use when using the SQLite application manager. | */ - 'sqlite' => [ 'database' => storage_path('laravel-websockets.sqlite'), ], @@ -65,10 +58,8 @@ return [ | The MySQL database connection to use. | */ - 'mysql' => [ 'connection' => env('DB_CONNECTION', 'mysql'), - 'table' => 'websockets_apps', ], ], @@ -87,7 +78,6 @@ return [ | client-to-client messages or statistics. | */ - 'apps' => [ [ 'id' => env('PUSHER_APP_ID'), @@ -97,8 +87,8 @@ return [ 'secret' => env('PUSHER_APP_SECRET'), 'path' => env('PUSHER_APP_PATH'), 'capacity' => null, - 'enable_client_messages' => false, - 'enable_statistics' => true, + 'enable_client_messages' => true, + 'enable_statistics' => false, 'allowed_origins' => [ // env('LARAVEL_WEBSOCKETS_DOMAIN'), ], @@ -124,79 +114,23 @@ return [ 'mode' => env('WEBSOCKETS_REPLICATION_MODE', 'local'), 'modes' => [ - - /* - |-------------------------------------------------------------------------- - | Local Replication - |-------------------------------------------------------------------------- - | - | Local replication is actually a null replicator, meaning that it - | is the default behaviour of storing the connections into an array. - | - */ - 'local' => [ - - /* - |-------------------------------------------------------------------------- - | Channel Manager - |-------------------------------------------------------------------------- - | - | The channel manager is responsible for storing, tracking and retrieving - | the channels as long as their members and connections. - | - */ - - 'channel_manager' => \BeyondCode\LaravelWebSockets\ChannelManagers\LocalChannelManager::class, - - /* - |-------------------------------------------------------------------------- - | Statistics Collector - |-------------------------------------------------------------------------- - | - | The Statistics Collector will, by default, handle the incoming statistics, - | storing them until they will become dumped into another database, usually - | a MySQL database or a time-series database. - | - */ - - 'collector' => \BeyondCode\LaravelWebSockets\Statistics\Collectors\MemoryCollector::class, - + 'channel_manager' => \BlaxSoftware\LaravelWebSockets\ChannelManagers\LocalChannelManager::class, + 'collector' => \BlaxSoftware\LaravelWebSockets\Statistics\Collectors\MemoryCollector::class, ], 'redis' => [ - 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), - - /* - |-------------------------------------------------------------------------- - | Channel Manager - |-------------------------------------------------------------------------- - | - | The channel manager is responsible for storing, tracking and retrieving - | the channels as long as their members and connections. - | - */ - - 'channel_manager' => \BeyondCode\LaravelWebSockets\ChannelManagers\RedisChannelManager::class, - - /* - |-------------------------------------------------------------------------- - | Statistics Collector - |-------------------------------------------------------------------------- - | - | The Statistics Collector will, by default, handle the incoming statistics, - | storing them until they will become dumped into another database, usually - | a MySQL database or a time-series database. - | - */ - - 'collector' => \BeyondCode\LaravelWebSockets\Statistics\Collectors\RedisCollector::class, - + 'channel_manager' => \BlaxSoftware\LaravelWebSockets\ChannelManagers\RedisChannelManager::class, + 'collector' => \BlaxSoftware\LaravelWebSockets\Statistics\Collectors\RedisCollector::class, ], + 'custom' => [ + 'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'), + 'channel_manager' => \BlaxSoftware\LaravelWebSockets\Websocket\ChannelManager::class, + 'collector' => BlaxSoftware\LaravelWebSockets\Statistics\Collectors\MemoryCollector::class, + ], ], - ], 'statistics' => [ @@ -212,7 +146,7 @@ return [ | */ - 'store' => \BeyondCode\LaravelWebSockets\Statistics\Stores\DatabaseStore::class, + 'store' => \BlaxSoftware\LaravelWebSockets\Statistics\Stores\DatabaseStore::class, /* |-------------------------------------------------------------------------- @@ -250,7 +184,7 @@ return [ | */ - 'max_request_size_in_kb' => 250, + 'max_request_size_in_kb' => 2048, /* |-------------------------------------------------------------------------- @@ -265,19 +199,12 @@ return [ */ 'ssl' => [ - 'local_cert' => env('LARAVEL_WEBSOCKETS_SSL_LOCAL_CERT', null), - 'capath' => env('LARAVEL_WEBSOCKETS_SSL_CA', null), - 'local_pk' => env('LARAVEL_WEBSOCKETS_SSL_LOCAL_PK', null), - 'passphrase' => env('LARAVEL_WEBSOCKETS_SSL_PASSPHRASE', null), - 'verify_peer' => env('APP_ENV') === 'production', - - 'allow_self_signed' => env('APP_ENV') !== 'production', - + 'allow_self_signed' => true, ], /* @@ -294,17 +221,17 @@ return [ 'handlers' => [ - 'websocket' => \BeyondCode\LaravelWebSockets\Server\WebSocketHandler::class, + 'websocket' => \BlaxSoftware\LaravelWebSockets\Server\WebSocketHandler::class, - 'health' => \BeyondCode\LaravelWebSockets\Server\HealthHandler::class, + 'health' => \BlaxSoftware\LaravelWebSockets\Server\HealthHandler::class, - 'trigger_event' => \BeyondCode\LaravelWebSockets\API\TriggerEvent::class, + 'trigger_event' => \BlaxSoftware\LaravelWebSockets\API\TriggerEvent::class, - 'fetch_channels' => \BeyondCode\LaravelWebSockets\API\FetchChannels::class, + 'fetch_channels' => \BlaxSoftware\LaravelWebSockets\API\FetchChannels::class, - 'fetch_channel' => \BeyondCode\LaravelWebSockets\API\FetchChannel::class, + 'fetch_channel' => \BlaxSoftware\LaravelWebSockets\API\FetchChannel::class, - 'fetch_users' => \BeyondCode\LaravelWebSockets\API\FetchUsers::class, + 'fetch_users' => \BlaxSoftware\LaravelWebSockets\API\FetchUsers::class, ], diff --git a/database/migrations/0000_00_00_000000_create_websockets_apps_table.php b/database/migrations/0000_00_00_000000_create_websockets_apps_table.php index 7c22401..844696f 100644 --- a/database/migrations/0000_00_00_000000_create_websockets_apps_table.php +++ b/database/migrations/0000_00_00_000000_create_websockets_apps_table.php @@ -13,6 +13,7 @@ class CreateWebSocketsAppsTable extends Migration */ public function up() { + Schema::dropIfExists('websockets_apps'); Schema::create('websockets_apps', function (Blueprint $table) { $table->string('id')->index(); $table->string('key'); diff --git a/database/migrations/0000_00_00_000000_create_websockets_statistics_entries_table.php b/database/migrations/0000_00_00_000000_create_websockets_statistics_entries_table.php index 0989f28..8894066 100644 --- a/database/migrations/0000_00_00_000000_create_websockets_statistics_entries_table.php +++ b/database/migrations/0000_00_00_000000_create_websockets_statistics_entries_table.php @@ -13,6 +13,7 @@ class CreateWebSocketsStatisticsEntriesTable extends Migration */ public function up() { + Schema::dropIfExists('websockets_statistics_entries'); Schema::create('websockets_statistics_entries', function (Blueprint $table) { $table->increments('id'); $table->string('app_id'); diff --git a/docs/_index.md b/docs/_index.md index 7c504e5..f1c90b7 100644 --- a/docs/_index.md +++ b/docs/_index.md @@ -1,4 +1,4 @@ --- packageName: Laravel Websockets -githubUrl: https://github.com/beyondcode/laravel-websockets +githubUrl: https://github.com/blax-software/laravel-websockets --- diff --git a/docs/advanced-usage/app-providers.md b/docs/advanced-usage/app-providers.md index 77f4502..fcd3564 100644 --- a/docs/advanced-usage/app-providers.md +++ b/docs/advanced-usage/app-providers.md @@ -11,23 +11,23 @@ Depending on your setup, you might have your app configuration stored elsewhere > Make sure that you do **not** perform any IO blocking tasks in your `AppManager`, as they will interfere with the asynchronous WebSocket execution. -In order to create your custom `AppManager`, create a class that implements the `BeyondCode\LaravelWebSockets\Contracts\AppManager` interface. +In order to create your custom `AppManager`, create a class that implements the `BlaxSoftware\LaravelWebSockets\Contracts\AppManager` interface. This is what it looks like: ```php interface AppManager { - /** @return array[BeyondCode\LaravelWebSockets\Apps\App] */ + /** @return array[BlaxSoftware\LaravelWebSockets\Apps\App] */ public function all(): array; - /** @return BeyondCode\LaravelWebSockets\Apps\App */ + /** @return BlaxSoftware\LaravelWebSockets\Apps\App */ public function findById($appId): ?App; - /** @return BeyondCode\LaravelWebSockets\Apps\App */ + /** @return BlaxSoftware\LaravelWebSockets\Apps\App */ public function findByKey($appKey): ?App; - /** @return BeyondCode\LaravelWebSockets\Apps\App */ + /** @return BlaxSoftware\LaravelWebSockets\Apps\App */ public function findBySecret($appSecret): ?App; } ``` @@ -37,8 +37,8 @@ The following is an example AppManager that utilizes an Eloquent model: namespace App\Managers; use App\Application; -use BeyondCode\LaravelWebSockets\Apps\App; -use BeyondCode\LaravelWebSockets\Contracts\AppManager; +use BlaxSoftware\LaravelWebSockets\Apps\App; +use BlaxSoftware\LaravelWebSockets\Contracts\AppManager; class MyCustomAppManager implements AppManager { diff --git a/docs/advanced-usage/dispatched-events.md b/docs/advanced-usage/dispatched-events.md index be5e095..aaa39ae 100644 --- a/docs/advanced-usage/dispatched-events.md +++ b/docs/advanced-usage/dispatched-events.md @@ -13,12 +13,12 @@ For example, you can listen for events like when a new connection establishes or Below you will find a list of dispatched events: -- `BeyondCode\LaravelWebSockets\Events\NewConnection` - when a connection successfully establishes on the server -- `BeyondCode\LaravelWebSockets\Events\ConnectionClosed` - when a connection leaves the server -- `BeyondCode\LaravelWebSockets\Events\SubscribedToChannel` - when a connection subscribes to a specific channel -- `BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel` - when a connection unsubscribes from a specific channel -- `BeyondCode\LaravelWebSockets\Events\WebSocketMessageReceived` - when the server receives a message -- `BeyondCode\LaravelWebSockets\EventsConnectionPonged` - when a connection pings to the server that it is still alive +- `BlaxSoftware\LaravelWebSockets\Events\NewConnection` - when a connection successfully establishes on the server +- `BlaxSoftware\LaravelWebSockets\Events\ConnectionClosed` - when a connection leaves the server +- `BlaxSoftware\LaravelWebSockets\Events\SubscribedToChannel` - when a connection subscribes to a specific channel +- `BlaxSoftware\LaravelWebSockets\Events\UnsubscribedFromChannel` - when a connection unsubscribes from a specific channel +- `BlaxSoftware\LaravelWebSockets\Events\WebSocketMessageReceived` - when the server receives a message +- `BlaxSoftware\LaravelWebSockets\EventsConnectionPonged` - when a connection pings to the server that it is still alive ## Queued Listeners @@ -31,7 +31,7 @@ If set up, you can use the `async-redis` queue driver in your listeners: namespace App\Listeners; -use BeyondCode\LaravelWebSockets\Events\NewConnection; +use BlaxSoftware\LaravelWebSockets\Events\NewConnection; use Illuminate\Contracts\Queue\ShouldQueue; class HandleNewConnections implements ShouldQueue @@ -75,7 +75,7 @@ The `EventServiceProvider` might look like this, registering the listeners that * @var array */ protected $listen = [ - \BeyondCode\LaravelWebSockets\Events\NewConnection::class => [ + \BlaxSoftware\LaravelWebSockets\Events\NewConnection::class => [ App\Listeners\HandleNewConnections::class, ], ]; diff --git a/docs/advanced-usage/webhooks.md b/docs/advanced-usage/webhooks.md index 2df8e92..8f80093 100644 --- a/docs/advanced-usage/webhooks.md +++ b/docs/advanced-usage/webhooks.md @@ -15,7 +15,7 @@ For example, running your own business logic on connection open and close: ```php namespace App\Controllers\WebSockets; -use BeyondCode\LaravelWebSockets\WebSockets\WebSocketHandler as BaseWebSocketHandler; +use BlaxSoftware\LaravelWebSockets\WebSockets\WebSocketHandler as BaseWebSocketHandler; use Ratchet\ConnectionInterface; class WebSocketHandler extends BaseWebSocketHandler diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index 5d24d7d..2ea0c8a 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -16,7 +16,7 @@ The package will automatically register a service provider. You need to publish the WebSocket configuration file: ```bash -php artisan vendor:publish --provider="BeyondCode\LaravelWebSockets\WebSocketsServiceProvider" --tag="config" +php artisan vendor:publish --provider="BlaxSoftware\LaravelWebSockets\WebSocketsServiceProvider" --tag="config" ``` # Statistics @@ -26,7 +26,7 @@ This package comes with migrations to store statistic information while running You can publish the migration file using: ```bash -php artisan vendor:publish --provider="BeyondCode\LaravelWebSockets\WebSocketsServiceProvider" --tag="migrations" +php artisan vendor:publish --provider="BlaxSoftware\LaravelWebSockets\WebSocketsServiceProvider" --tag="migrations" ``` Run the migrations with: diff --git a/docs/getting-started/questions-issues.md b/docs/getting-started/questions-issues.md index 7912216..0f539a5 100644 --- a/docs/getting-started/questions-issues.md +++ b/docs/getting-started/questions-issues.md @@ -5,6 +5,6 @@ order: 3 # Questions and issues -Find yourself stuck using the package? Found a bug? Do you have general questions or suggestions for improving laravel-websockets? Feel free to create an issue on [GitHub](https://github.com/beyondcode/laravel-websockets/issues), we'll try to address it as soon as possible. +Find yourself stuck using the package? Found a bug? Do you have general questions or suggestions for improving laravel-websockets? Feel free to create an issue on [GitHub](https://github.com/blax-software/laravel-websockets/issues), we'll try to address it as soon as possible. -If you've found a bug regarding security please mail [marcel@beyondco.de](mailto:marcel@beyondco.de) instead of using the issue tracker. \ No newline at end of file +If you've found a bug regarding security please mail [office@blax.at](mailto:office@blax.at) instead of using the issue tracker. \ No newline at end of file diff --git a/src/API/Controller.php b/src/API/Controller.php index 8e4513b..339c3ec 100644 --- a/src/API/Controller.php +++ b/src/API/Controller.php @@ -1,10 +1,10 @@ currentTime() ); diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 41a236b..f97e363 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -1,14 +1,14 @@ loop = $loop; + + $this->redis = Redis::connection( + config('websockets.replication.modes.redis.connection', 'default') + ); + + $connectionUri = $this->getConnectionUri(); + + $factoryClass = $factoryClass ?: Factory::class; + $factory = new $factoryClass($this->loop); + + $this->publishClient = $factory->createLazyClient($connectionUri); + $this->subscribeClient = $factory->createLazyClient($connectionUri); + + $this->subscribeClient->on('message', function ($channel, $payload) { + $this->onMessage($channel, $payload); + }); + } + + /** + * Get all channels for a specific app + * across multiple servers. + * + * @param string|int $appId + * @return \React\Promise\PromiseInterface[array] + */ + public function getGlobalChannels($appId) : PromiseInterface + { + return $this->publishClient->smembers( + $this->getChannelsRedisHash($appId) + ); + } + + /** + * Remove connection from all channels. + * + * @return PromiseInterface[bool] + */ + public function unsubscribeFromAllChannels(ConnectionInterface $connection) : PromiseInterface + { + return $this->getGlobalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + foreach ($channels as $channel) { + $this->unsubscribeFromChannel($connection, $channel, new stdClass); + } + }) + ->then(fn () => parent::unsubscribeFromAllChannels($connection)); + } + + /** + * Subscribe the connection to a specific channel. + * + * @return PromiseInterface[bool] + */ + public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) : PromiseInterface + { + return $this->subscribeToTopic($connection->app->id, $channelName) + ->then(fn () => $this->addConnectionToSet($connection, Carbon::now())) + ->then(fn () => $this->addChannelToSet($connection->app->id, $channelName)) + ->then(fn () => $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1)) + ->then(fn () => parent::subscribeToChannel($connection, $channelName, $payload)); + } + + /** + * Unsubscribe the connection from the channel. + * + * @return PromiseInterface[bool] + */ + public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload) : PromiseInterface + { + return parent::unsubscribeFromChannel($connection, $channelName, $payload) + ->then(fn () => $this->decrementSubscriptionsCount($connection->app->id, $channelName)) + ->then(function ($count) use ($connection, $channelName) { + $this->removeConnectionFromSet($connection); + // If the total connections count gets to 0 after unsubscribe, + // try again to check & unsubscribe from the PubSub topic if needed. + if ($count < 1) { + $this->removeChannelFromSet($connection->app->id, $channelName); + $this->unsubscribeFromTopic($connection->app->id, $channelName); + } + }); + } + + /** + * Subscribe the connection to a specific channel, returning + * a promise containing the amount of connections. + * + * @param string|int $appId + * @return PromiseInterface[int] + */ + public function subscribeToApp($appId) : PromiseInterface + { + return $this->subscribeToTopic($appId) + ->then(fn () => $this->incrementSubscriptionsCount($appId)); + } + + /** + * Unsubscribe the connection from the channel, returning + * a promise containing the amount of connections after decrement. + * + * @param string|int $appId + * @return PromiseInterface[int] + */ + public function unsubscribeFromApp($appId) : PromiseInterface + { + return $this->unsubscribeFromTopic($appId) + ->then(fn () => $this->decrementSubscriptionsCount($appId)); + } + + /** + * Get the connections count + * across multiple servers. + * + * @param string|int $appId + * @return PromiseInterface[int] + */ + public function getGlobalConnectionsCount($appId, ?string $channelName = null) : PromiseInterface + { + return $this->publishClient + ->hget($this->getStatsRedisHash($appId, $channelName), 'connections') + ->then(fn ($count) => is_null($count) ? 0 : (int) $count); + } + + /** + * Broadcast the message across multiple servers. + * + * @param string|int $appId + * @return PromiseInterface[bool] + */ + public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null) : PromiseInterface + { + $payload->appId = $appId; + $payload->socketId = $socketId; + $payload->serverId = $serverId ?: $this->getServerId(); + + return $this->publishClient + ->publish($this->getRedisTopicName($appId, $channel), json_encode($payload)) + ->then(fn () => parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId)); + } + + /** + * Handle the user when it joined a presence channel. + */ + public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload) : PromiseInterface + { + return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user)) + ->then(fn () => $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId)) + ->then(fn () => parent::userJoinedPresenceChannel($connection, $user, $channel, $payload)); + } + + /** + * Handle the user when it left a presence channel. + * + * @param stdClass $payload + * @return PromiseInterface[bool] + */ + public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel) : PromiseInterface + { + return $this->removeUserData($connection->app->id, $channel, $connection->socketId) + ->then(fn () => $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId)) + ->then(fn () => parent::userLeftPresenceChannel($connection, $user, $channel)); + } + + /** + * Get the presence channel members. + * + * @param string|int $appId + * @return \React\Promise\PromiseInterface[array] + */ + public function getChannelMembers($appId, string $channel) : PromiseInterface + { + return $this->publishClient + ->hgetall($this->getUsersRedisHash($appId, $channel)) + ->then(fn ($list) => collect(Helpers::redisListToArray($list))->map(fn ($user) => json_decode($user))->unique('user_id')->toArray()); + } + + /** + * Get a member from a presence channel based on connection. + * + * @return \React\Promise\PromiseInterface[null|array] + */ + public function getChannelMember(ConnectionInterface $connection, string $channel) : PromiseInterface + { + return $this->publishClient->hget( + $this->getUsersRedisHash($connection->app->id, $channel), + $connection->socketId + ); + } + + /** + * Get the presence channels total members count. + * + * @param string|int $appId + * @return \React\Promise\PromiseInterface[array] + */ + public function getChannelsMembersCount($appId, array $channelNames) : PromiseInterface + { + $this->publishClient->multi(); + + foreach ($channelNames as $channel) { + $this->publishClient->hlen( + $this->getUsersRedisHash($appId, $channel) + ); + } + + return $this->publishClient->exec() + ->then(fn ($data) => array_combine($channelNames, $data)); + } + + /** + * Get the socket IDs for a presence channel member. + * + * @param string|int $userId + * @param string|int $appId + * @param string $channelName + * @return \React\Promise\PromiseInterface[array] + */ + public function getMemberSockets($userId, $appId, $channelName) : PromiseInterface + { + return $this->publishClient->smembers( + $this->getUserSocketsRedisHash($appId, $channelName, $userId) + ); + } + + /** + * Keep tracking the connections availability when they pong. + * + * @return PromiseInterface[bool] + */ + public function connectionPonged(ConnectionInterface $connection) : PromiseInterface + { + // This will update the score with the current timestamp. + return $this->addConnectionToSet($connection, Carbon::now()) + ->then(function () use ($connection) { + $payload = [ + 'socketId' => $connection->socketId, + 'appId' => $connection->app->id, + 'serverId' => $this->getServerId(), + ]; + + return $this->publishClient + ->publish($this->getPongRedisHash($connection->app->id), json_encode($payload)); + }) + ->then(fn () => parent::connectionPonged($connection)); + } + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return PromiseInterface[bool] + */ + public function removeObsoleteConnections() : PromiseInterface + { + $lock = $this->lock(); + try { + $lock->get(function () { + $this->getConnectionsFromSet(0, (int) now()->subMinutes(2)->format('U')) + ->then(function ($connections) { + foreach ($connections as $socketId => $appId) { + $connection = $this->fakeConnectionForApp($appId, $socketId); + + $this->unsubscribeFromAllChannels($connection); + } + }); + }); + + return parent::removeObsoleteConnections(); + } finally { + optional($lock)->forceRelease(); + } + } + + /** + * Handle a message received from Redis on a specific channel. + * + * @return void + */ + public function onMessage(string $redisChannel, string $payload) + { + $payload = json_decode($payload); + + if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) { + return; + } + + if ($redisChannel == $this->getPongRedisHash($payload->appId)) { + $connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId); + + return parent::connectionPonged($connection); + } + + $payload->channel = Str::after($redisChannel, "{$payload->appId}:"); + + if (! $channel = $this->find($payload->appId, $payload->channel)) { + return; + } + + $appId = $payload->appId ?? null; + $socketId = $payload->socketId ?? null; + $serverId = $payload->serverId ?? null; + + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [ + 'fromServerId' => $serverId, + 'fromSocketId' => $socketId, + 'receiverServerId' => $this->getServerId(), + 'channel' => $channel, + 'payload' => $payload, + ]); + + unset($payload->socketId, $payload->serverId, $payload->appId); + + $channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId); + } + + public function find($appId, string $channel) + { + if (! $channelInstance = parent::find($appId, $channel)) { + $class = $this->getChannelClassName($channel); + $this->channels[$appId][$channel] = new $class($channel); + } + + return parent::find($appId, $channel); + } + + /** + * Get the Subscribe client instance. + * + * @return Client + */ + public function getSubscribeClient() + { + return $this->subscribeClient; + } + + /** + * Get the Publish client instance. + * + * @return Client + */ + public function getPublishClient() + { + return $this->publishClient; + } + + /** + * Get the Redis client used by other classes. + * + * @return Client + */ + public function getRedisClient() + { + return $this->getPublishClient(); + } + + /** + * Increment the subscribed count number. + * + * @param string|int $appId + * @return PromiseInterface[int] + */ + public function incrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1) : PromiseInterface + { + return $this->publishClient->hincrby( + $this->getStatsRedisHash($appId, $channel), + 'connections', + $increment + ); + } + + /** + * Decrement the subscribed count number. + * + * @param string|int $appId + * @param int $decrement + * @return PromiseInterface[int] + */ + public function decrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1) : PromiseInterface + { + return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1); + } + + /** + * Add the connection to the sorted list. + * + * @param \DateTime|string|null $moment + */ + public function addConnectionToSet(ConnectionInterface $connection, $moment = null) : PromiseInterface + { + $moment = $moment ? Carbon::parse($moment) : Carbon::now(); + + return $this->publishClient->zadd( + $this->getSocketsRedisHash(), + $moment->format('U'), + "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Remove the connection from the sorted list. + */ + public function removeConnectionFromSet(ConnectionInterface $connection) : PromiseInterface + { + return $this->publishClient->zrem( + $this->getSocketsRedisHash(), + "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Get the connections from the sorted list, with last + * connection between certain timestamps. + * + * @return PromiseInterface[array] + */ + public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true) : PromiseInterface + { + if ($strict) { + $start = "({$start}"; + $stop = "({$stop}"; + } + + return $this->publishClient + ->zrangebyscore($this->getSocketsRedisHash(), $start, $stop) + ->then(function ($list) { + return collect($list)->mapWithKeys(function ($appWithSocket) { + [$appId, $socketId] = explode(':', $appWithSocket); + + return [$socketId => $appId]; + })->toArray(); + }); + } + + /** + * Add a channel to the set list. + * + * @param string|int $appId + */ + public function addChannelToSet($appId, string $channel) : PromiseInterface + { + return $this->publishClient->sadd( + $this->getChannelsRedisHash($appId), + $channel + ); + } + + /** + * Remove a channel from the set list. + * + * @param string|int $appId + */ + public function removeChannelFromSet($appId, string $channel) : PromiseInterface + { + return $this->publishClient->srem( + $this->getChannelsRedisHash($appId), + $channel + ); + } + + /** + * Check if channel is on the list. + * + * @param string|int $appId + */ + public function isChannelInSet($appId, string $channel) : PromiseInterface + { + return $this->publishClient->sismember( + $this->getChannelsRedisHash($appId), + $channel + ); + } + + /** + * Set data for a topic. Might be used for the presence channels. + * + * @param string|int $appId + * @param string $data + */ + public function storeUserData($appId, ?string $channel, string $key, $data) : PromiseInterface + { + return $this->publishClient->hset( + $this->getUsersRedisHash($appId, $channel), + $key, + $data + ); + } + + /** + * Remove data for a topic. Might be used for the presence channels. + * + * @param string|int $appId + */ + public function removeUserData($appId, ?string $channel, string $key) : PromiseInterface + { + return $this->publishClient->hdel( + $this->getUsersRedisHash($appId, $channel), + $key + ); + } + + /** + * Subscribe to the topic for the app, or app and channel. + * + * @param string|int $appId + */ + public function subscribeToTopic($appId, ?string $channel = null) : PromiseInterface + { + $topic = $this->getRedisTopicName($appId, $channel); + + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [ + 'serverId' => $this->getServerId(), + 'pubsubTopic' => $topic, + ]); + + return $this->subscribeClient->subscribe($topic); + } + + /** + * Unsubscribe from the topic for the app, or app and channel. + * + * @param string|int $appId + */ + public function unsubscribeFromTopic($appId, ?string $channel = null) : PromiseInterface + { + $topic = $this->getRedisTopicName($appId, $channel); + + DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [ + 'serverId' => $this->getServerId(), + 'pubsubTopic' => $topic, + ]); + + return $this->subscribeClient->unsubscribe($topic); + } + + /** + * Get the Redis Keyspace name to handle subscriptions + * and other key-value sets. + * + * @param string|int|null $appId + */ + public function getRedisKey($appId = null, ?string $channel = null, array $suffixes = []) : string + { + $prefix = config('database.redis.options.prefix', null); + + $hash = "{$prefix}{$appId}"; + + if ($channel) { + $suffixes = array_merge([$channel], $suffixes); + } + + $suffixes = implode(':', $suffixes); + + if ($suffixes) { + $hash .= ":{$suffixes}"; + } + + return $hash; + } + + /** + * Get the pong Redis hash. + * + * @param string|int $appId + */ + public function getPongRedisHash($appId) : string + { + return $this->getRedisKey($appId, null, ['pong']); + } + + /** + * Get the statistics Redis hash. + * + * @param string|int $appId + */ + public function getStatsRedisHash($appId, ?string $channel = null) : string + { + return $this->getRedisKey($appId, $channel, ['stats']); + } + + /** + * Get the sockets Redis hash used to store all sockets ids. + */ + public function getSocketsRedisHash() : string + { + return $this->getRedisKey(null, null, ['sockets']); + } + + /** + * Get the channels Redis hash for a specific app id, used + * to store existing channels. + * + * @param string|int $appId + */ + public function getChannelsRedisHash($appId) : string + { + return $this->getRedisKey($appId, null, ['channels']); + } + + /** + * Get the Redis hash for storing presence channels users. + * + * @param string|int $appId + */ + public function getUsersRedisHash($appId, ?string $channel = null) : string + { + return $this->getRedisKey($appId, $channel, ['users']); + } + + /** + * Get the Redis hash for storing socket ids + * for a specific presence channels user. + * + * @param string|int $appId + * @param string|int|null $userId + */ + public function getUserSocketsRedisHash($appId, ?string $channel = null, $userId = null) : string + { + return $this->getRedisKey($appId, $channel, [$userId, 'userSockets']); + } + + /** + * Get the Redis topic name for PubSub + * used to transfer info between servers. + * + * @param string|int $appId + */ + public function getRedisTopicName($appId, ?string $channel = null) : string + { + return $this->getRedisKey($appId, $channel); + } + + /** + * Create a fake connection for app that will mimick a connection + * by app ID and Socket ID to be able to be passed to the methods + * that accepts a connection class. + * + * @param string|int $appId + * @return ConnectionInterface + */ + public function fakeConnectionForApp($appId, string $socketId) + { + return new MockableConnection($appId, $socketId); + } + + /** + * Build the Redis connection URL from Laravel database config. + * + * @return string + */ + protected function getConnectionUri() + { + $name = config('websockets.replication.modes.redis.connection', 'default'); + $config = config("database.redis.{$name}"); + + $host = $config['host']; + $port = $config['port'] ?: 6379; + + $query = []; + + if ($config['password']) { + $query['password'] = $config['password']; + } + + if ($config['database']) { + $query['db'] = $config['database']; + } + + $query = http_build_query($query); + + return "redis://{$host}:{$port}" . ($query ? "?{$query}" : ''); + } + + /** + * Add the Presence Channel's User's Socket ID to a list. + * + * @param string|int $appId + */ + protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId) : PromiseInterface + { + return $this->publishClient->sadd( + $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), + $socketId + ); + } + + /** + * Remove the Presence Channel's User's Socket ID from the list. + * + * @param string|int $appId + */ + protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId) : PromiseInterface + { + return $this->publishClient->srem( + $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), + $socketId + ); + } + + /** + * Get a new RedisLock instance to avoid race conditions. + * + * @return \Illuminate\Cache\CacheLock + */ + protected function lock() + { + return new RedisLock($this->redis, static::$lockName, 0); + } +} diff --git a/src/Websocket/Controller.php b/src/Websocket/Controller.php new file mode 100644 index 0000000..2b9025a --- /dev/null +++ b/src/Websocket/Controller.php @@ -0,0 +1,262 @@ +send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Event unknown', + ], + 'channel' => $message['channel'], + ])); + } + + try { + $controller = (strpos($event[0], '-') >= 0) + ? implode('', array_map(fn ($item) => ucfirst($item), explode('-', $event[0]))) + : ucfirst($event[0]); + + $controller = '\BlaxSoftware\LaravelWebSockets\Websocket\Controllers\\' . $controller . 'Controller'; + $method = static::without_uniquifyer($event[1]); + + if (! class_exists($controller)) { + return $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Event could not be associated', + ], + 'channel' => $message['channel'], + ])); + } + + if (! method_exists($controller, $method)) { + return $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Event could not be handled', + ], + 'channel' => $message['channel'], + ])); + } + + // Instantiate the controller + $controller = (new $controller( + $connection, + $channel, + $message['event'], + $channelManager + )); + + // Return unauthorized if auth is required + if (($controller->need_auth ?? true) && ! $connection->user) { + return $controller->error('Unauthorized'); + } + + $payload = $controller->$method( + $connection, + $message['data'], + $message['channel'] + ); + + if ( + $payload === false + || $payload === true + ) { + return; + } + + $connection->send(json_encode([ + 'event' => $message['event'] . ':response', + 'data' => $payload, + 'channel' => $message['channel'], + ])); + + return $payload; + } catch (\Exception $e) { + $reload = [ + 'event' => @$message['event'], + 'data' => @$message['data'], + 'channel' => @$message['channel'], + 'line' => $e->getFile() . ':' . $e->getLine(), + ]; + \Log::error($e->getMessage(), $reload); + + return $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Error: ' . $e->getMessage(), + 'meta' => [ + 'reported' => true, + ], + ], + 'channel' => $message['channel'], + ])); + } + + return $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'An unknown error occured', + ], + 'channel' => $message['channel'], + ])); + } + + final public function broadcast( + mixed $payload, + ?string $event = null, + ?string $channel = null, + bool $including_self = false + ) : void { + if (! $this->channel) { + $this->error('Channel not found'); + + return; + } + + foreach ($this->channel->getConnections() as $channel_conection) { + if ($channel_conection !== $this->connection) { + $channel_conection->send(json_encode([ + 'event' => ($event ?? $this->event), + 'data' => $payload, + 'channel' => $channel ?? $this->channel->getName(), + ])); + } + + if ($including_self) { + $this->connection->send(json_encode([ + 'event' => ($event ?? $this->event), + 'data' => $payload, + 'channel' => $channel ?? $this->channel->getName(), + ])); + } + } + } + + final public function success( + mixed $payload = null, + ?string $event = null, + ?string $channel = null + ) : bool { + $p = [ + 'event' => ($event ?? $this->event) . ':response', + 'data' => $payload, + 'channel' => $channel ?? $this->channel->getName(), + ]; + + // if payload only contains key "data" + if ( + count($p) === 1 + && isset($payload['data']) + ) { + $p['data'] = $payload['data']; + } + + if (get_class($this->connection) === MockConnection::class) { + $connection = clone $this->connection; + $connection->send(json_encode($p)); + } else { + $this->connection->send(json_encode($p)); + } + + return true; + } + + final public function error( + array|string|null $payload = null, + ?string $event = null, + ?string $channel = null + ) : bool { + if (is_string($payload)) { + $payload = [ + 'message' => $payload, + ]; + } + + $p = [ + 'event' => ($event ?? $this->event) . ':error', + 'data' => $payload, + 'channel' => $channel ?? $this->channel->getName(), + ]; + + // if payload only contains key "data" + if ( + count($p) === 1 + && isset($payload['data']) + ) { + $p['data'] = $payload['data']; + } + + // get line from where this is called from + $trace = debug_backtrace(); + $p['data']['trace'] = $trace + ? $trace[0]['line'] + : null; + + // log + \Log::channel('websocket')->error('Send error: ' . $p['data']['message'], $p); + + if (get_class($this->connection) === MockConnection::class) { + $connection = clone $this->connection; + $connection->send(json_encode($p)); + } else { + $this->connection->send(json_encode($p)); + } + + return true; + } + + protected static function get_uniquifyer($event) + { + preg_match('/[\[].*[\]]/', $event, $matches); + if (count($matches) === 1) { + $uniqiueifier = $matches[0]; + } + + return $uniqiueifier ?? null; + } + + protected static function without_uniquifyer($event) + { + return preg_replace('/[\[].*[\]]/', '', $event); + } + + private static function get_event($message) + { + $event = explode('.', $message['event']); + + if (strpos($event[0], 'pusher:') > -1) { + $event = explode(':', $event[0]); + } + + return $event; + } +} diff --git a/src/Websocket/Controllers/ExampleController.php b/src/Websocket/Controllers/ExampleController.php new file mode 100644 index 0000000..597f819 --- /dev/null +++ b/src/Websocket/Controllers/ExampleController.php @@ -0,0 +1,24 @@ +user(); + + return [ + 'data' => [ + 'Hi mom!' + ], + 'meta' => [ + 'user' => $user + ], + ]; + } +} diff --git a/src/Websocket/Handler.php b/src/Websocket/Handler.php new file mode 100644 index 0000000..323c615 --- /dev/null +++ b/src/Websocket/Handler.php @@ -0,0 +1,550 @@ +connectionCanBeMade($connection)) { + return $connection->close(); + } + + $this->verifyAppKey($connection) + ->verifyOrigin($connection) + ->limitConcurrentConnections($connection) + ->generateSocketId($connection) + ->establishConnection($connection); + + if (isset($connection->app)) { + $this->channelManager->subscribeToApp($connection->app->id); + $this->channelManager->connectionPonged($connection); + + NewConnection::dispatch( + $connection->app->id, + $connection->socketId + ); + } + } + + public function onMessage(ConnectionInterface $connection, MessageInterface $message) + { + if (! isset($connection->app)) { + return; + } + + PusherMessageFactory::createForMessage( + $message, + $connection, + $this->channelManager + )->respond(); + + // Payload json to array + $message = json_decode($message->getPayload(), true); + + // Cut short for ping pong + if (strpos($message['event'], ':ping') !== false) { + return gc_collect_cycles(); + } + + $this->handleChannelSubscriptions($message, $connection); + + if (! $channel = $this->get_connection_channel($connection, $message)) { + return $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Channel not found', + 'meta' => $message, + ], + ])); + } + + $this->authenticateConnection($connection, $channel, $message); + + Log::channel('websocket')->info('Executing event: ' . $message['event']); + + if (strpos($message['event'], 'pusher') !== false) { + try { + return Controller::controll_message( + $connection, + $channel, + $message, + $this->channelManager + ); + } catch (Exception $e) { + return $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => $e->getMessage(), + ], + ])); + } + } + + $pid = pcntl_fork(); + + if ($pid == -1) { + Log::error('Fork error'); + } elseif ($pid == 0) { + try { + \DB::reconnect(); + + $this->setRequest($message, $connection); + $mock = new MockConnection($connection); + + Controller::controll_message( + $mock, + $channel, + $message, + $this->channelManager + ); + } catch (Exception $e) { + $mock->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => $e->getMessage(), + ], + ])); + } + + exit(0); + } else { + $this->addDataCheckLoop($connection, $message, $pid); + } + } + + /** + * Handle the websocket close. + */ + public function onClose(ConnectionInterface $connection) : void + { + if (optional($connection)->tenant) { + if (optional($connection->tenant)->tenantable) { + $connection->tenant->tenantable->logActivity('Disconnected from websocket', $connection->tenant->tenantable, 'info', 'websocket'); + } else { + $connection->tenant->logActivity('Disconnected from websocket', $connection->tenant, 'info', 'websocket'); + } + } + + // remove connection from $channel_connections + foreach ($this->channel_connections as $channel => $connections) { + if (in_array($connection->socketId, $connections)) { + $this->channel_connections[$channel] = array_diff($connections, [$connection->socketId]); + } + + if (empty(@$this->channel_connections[$channel])) { + unset($this->channel_connections[$channel]); + } + + cache()->forget( + 'ws_socket_tenantable_' . $connection->socketId, + ); + + if (@$this->channel_connections[$channel]) { + cache()->forever( + 'ws_channel_connections_' . $channel, + @$this->channel_connections[$channel] + ); + } else { + cache()->forget('ws_channel_connections_' . $channel); + } + + cache()->forever( + 'ws_active_channels', + array_keys($this->channel_connections) + ); + } + + $this->channelManager + ->unsubscribeFromAllChannels($connection) + ->then(function (bool $unsubscribed) use ($connection) : void { + if (isset($connection->app)) { + $this->channelManager->unsubscribeFromApp($connection->app->id); + + ConnectionClosed::dispatch($connection->app->id, $connection->socketId); + + cache()->forget('ws_connection_' . $connection->socketId); + } + }); + } + + /** + * Handle the websocket errors. + * + * @param WebSocketException $exception + */ + public function onError(ConnectionInterface $connection, Exception $exception) : void + { + if ($exception instanceof ExceptionsWebSocketException) { + $connection->send(json_encode( + $exception->getPayload() + )); + } + } + + /** + * Check if the connection can be made for the + * current server instance. + */ + protected function connectionCanBeMade(ConnectionInterface $connection) : bool + { + return $this->channelManager->acceptsNewConnections(); + } + + /** + * Verify the app key validity. + * + * @return $this + */ + protected function verifyAppKey(ConnectionInterface $connection) + { + $query = QueryParameters::create($connection->httpRequest); + + $appKey = $query->get('appKey'); + + if (! $app = App::findByKey($appKey)) { + throw new UnknownAppKey($appKey); + } + + $connection->app = $app; + + return $this; + } + + /** + * Verify the origin. + * + * @return $this + */ + protected function verifyOrigin(ConnectionInterface $connection) + { + if (! $connection->app->allowedOrigins) { + return $this; + } + + $header = (string) ($connection->httpRequest->getHeader('Origin')[0] ?? null); + + $origin = parse_url($header, PHP_URL_HOST) ?: $header; + + if (! $header || ! in_array($origin, $connection->app->allowedOrigins)) { + throw new OriginNotAllowed($connection->app->key); + } + + return $this; + } + + /** + * Limit the connections count by the app. + * + * @return $this + */ + protected function limitConcurrentConnections(ConnectionInterface $connection) + { + if (! is_null($capacity = $connection->app->capacity)) { + $this->channelManager + ->getGlobalConnectionsCount($connection->app->id) + ->then(function ($connectionsCount) use ($capacity, $connection) : void { + if ($connectionsCount >= $capacity) { + $exception = new ConnectionsOverCapacity; + + $payload = json_encode($exception->getPayload()); + + tap($connection)->send($payload)->close(); + } + }); + } + + return $this; + } + + /** + * Create a socket id. + * + * @return $this + */ + protected function generateSocketId(ConnectionInterface $connection) + { + $socketId = sprintf('%d.%d', random_int(1, 1000000000), random_int(1, 1000000000)); + + $connection->socketId = $socketId; + + return $this; + } + + /** + * Establish connection with the client. + * + * @return $this + */ + protected function establishConnection(ConnectionInterface $connection) + { + $connection->send(json_encode([ + 'event' => 'pusher:connection_established', + 'data' => json_encode([ + 'socket_id' => $connection->socketId, + 'activity_timeout' => 30, + ]), + ])); + + return $this; + } + + protected function get_connection_channel(&$connection, &$message) : ?PrivateChannel + { + // Put channel on its place + if (! @$message['channel'] && $message['data'] && $message['data']['channel']) { + $message['channel'] = $message['data']['channel']; + unset($message['data']['channel']); + } + + $this->channelManager->findOrCreate( + $connection->app->id, + $message['channel'] + ); + + return $this->channelManager->find( + $connection->app->id, + $message['channel'] + ); + } + + protected function handleChannelSubscriptions($message, $connection) + { + $channel_name = optional($this->get_connection_channel($connection, $message))->getName() ?? 'no-channel'; + $socket_id = $connection->socketId; + + // if not in $channel_connections add it + if (strpos($message['event'], ':subscribe') !== false) { + if (! isset($this->channel_connections[$channel_name])) { + $this->channel_connections[$channel_name] = []; + } + + if (! in_array($connection->socketId, $this->channel_connections[$this->get_connection_channel($connection, $message)->getName()])) { + $this->channel_connections[$channel_name][] = $connection->socketId; + } + + cache()->forever( + 'ws_channel_connections_' . $channel_name, + $this->channel_connections[$channel_name] + ); + + cache()->forever( + 'ws_active_channels', + array_keys($this->channel_connections) + ); + } + + if (strpos($message['event'], ':unsubscribe') !== false) { + if (isset($this->channel_connections[$channel_name])) { + $this->channel_connections[$channel_name] = array_diff($this->channel_connections[$channel_name], [$socket_id]); + } + + if (empty($this->channel_connections[$channel_name])) { + unset($this->channel_connections[$channel_name]); + } + + if (@$this->channel_connections[$channel_name]) { + cache()->forever( + 'ws_channel_connections_' . $channel_name, + $this->channel_connections[$channel_name] + ); + } else { + cache()->forget('ws_channel_connections_' . $channel_name); + } + + cache()->forever( + 'ws_active_channels', + array_keys($this->channel_connections) + ); + + Log::channel('websocket')->info('Tenant left', ['socketId' => $socket_id, 'channel' => $channel_name]); + } + + return $this; + } + + protected function setRequest($message, $connection) + { + foreach (request()->keys() as $key) { + request()->offsetUnset($key); + } + + if (optional($connection)->tenant) { + request()->merge([ + 'tenant' => $connection->tenant ?? null, + 'tenantable' => $connection->tenant->tenantable ?? null, + 'user' => optional($connection->tenant)->tenantable instanceof \App\Models\User ? $connection->tenant->tenantable : null, + 'organization' => optional($connection->tenant)->organization, + 'organization_id' => optional($connection->tenant)->organization_id, + ]); + } else { + request()->offsetUnset('tenant'); + request()->offsetUnset('tenantable'); + request()->offsetUnset('user'); + request()->offsetUnset('organization'); + request()->offsetUnset('organization_id'); + } + + request()->merge(@$message['data'] ?? []); + } + + protected function authenticateConnection( + ConnectionInterface $connection, + PrivateChannel|Channel|PresenceChannel|null $channel, + $message + ) { + if (! optional($connection)->tenant && isset($message['data']['tenant'])) { + $tenant = Tenant::where('slug', $message['data']['tenant'])->first(); + + if (! $tenant) { + return $connection->send(json_encode([ + 'channel' => $message['channel'], + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'No tenant has been supplied', + ], + ])); + } + + if (optional($tenant)->is_user) { + $connection->user = $tenant->tenantable; + + cache()->forever( + 'ws_socket_tenantable_' . $connection->socketId, + $tenant->tenantable + ); + } + + $connection->tenant = $tenant; + + if (optional($connection)->tenant) { + if (optional($connection->tenant)->tenantable) { + $connection->tenant->tenantable->logActivity('Connected to websocket', $connection->tenant->tenantable, 'info', 'websocket'); + } else { + $connection->tenant->logActivity('Connected to websocket', $connection->tenant, 'info', 'websocket'); + } + } + + $channel->saveConnection($connection); + } + + // Update last online of user if user + if (! optional($connection)->user) { + $connection->user = false; + $channel->saveConnection($connection); + } + + // Set auth or logout + ($connection->user) + ? Auth::login($connection->user) + : Auth::logout(); + + if (Auth::user()) { + Auth::user()->update_last_online(); + } + } + + private function addDataCheckLoop( + $connection, + $message, + $pid, + $optional = false, + $iteration = false + ) { + $pid = explode('_', $pid . '')[0]; + + if ($iteration >= 0 && $iteration !== false) { + $pid .= '_' . $iteration; + } + + // Set timeout start + $pidcache_start = 'dedicated_start_' . $pid; + cache()->put($pidcache_start, microtime(true), 100); + + // Periodic check for data + $this->channelManager->loop->addPeriodicTimer(0.01, function ($timer) use ( + $pidcache_start, + $message, + $pid, + $connection, + $optional, + $iteration + ) { + $pidcache_data = 'dedicated_data_' . $pid; + $pidcache_done = 'dedicated_data_' . $pid . '_done'; + + if ( + cache()->has($pidcache_start) + && ($diff = microtime(true) - ((int) cache()->get($pidcache_start))) > 20 + ) { + if (! $optional) { + $connection->send(json_encode([ + 'event' => $message['event'] . ':error', + 'data' => [ + 'message' => 'Timeout', + 'diff' => $diff, + ], + ])); + } + + $this->channelManager->loop->cancelTimer($timer); + } + + if (cache()->has($pidcache_done)) { + // call self with pid + '_0' and optional + if ($iteration === false) { + $this->addDataCheckLoop($connection, $message, $pid, true, 0); + } else { + $this->addDataCheckLoop($connection, $message, $pid, true, $iteration + 1); + } + + // Retrieve cached data + $sending = @cache()->get($pidcache_data); + + // Send the data to client + $connection->send($sending); + + // Stop periodic check + $this->channelManager->loop->cancelTimer($timer); + } + + // Prevent zombie processes + pcntl_waitpid(-1, $status, WNOHANG); + }); + } +} diff --git a/src/Websocket/MockConnection.php b/src/Websocket/MockConnection.php new file mode 100644 index 0000000..7b5ec4c --- /dev/null +++ b/src/Websocket/MockConnection.php @@ -0,0 +1,100 @@ + $value) { + $this->{$key} = $value; + } + + // get all defined properties (including private and protected) + $reflection = new \ReflectionClass($original_connection); + $properties = $reflection->getProperties(\ReflectionProperty::IS_PUBLIC | \ReflectionProperty::IS_PROTECTED | \ReflectionProperty::IS_PRIVATE); + + foreach ($properties as $property) { + // if name includes send + if (strpos($property->getName(), 'send') !== false) { + continue; + } + + try { + $property->setAccessible(true); + $this->{$property->getName()} = $property->getValue($original_connection); + } catch (\Exception $e) { + } + } + + $this->socketId = optional($original_connection)->socketId; + $this->user = optional($original_connection)->user; + $this->tenant = optional($original_connection)->tenant; + $this->tenantable = optional($original_connection)->tenantable; + + return $this; + } + + public function send($data) + { + \Log::channel('websocket')->info('[MockConnection] Send for pid: ' . getmypid(), [ + 'data' => $data, + ]); + + $key = static::getDataKey(); + + cache()->put($key, $data, 60); + cache()->put($key . '_done', true, 60); + + // ==== Alternative way without using cache + // if (is_string($data)) { + // $d = json_decode($data, true); + + // \App\Events\TenantEvent::dispatch( + // optional(optional(tenant())->tenantable)->public_id, + // $d['event'], + // (is_array($d['data'])) + // ? $d['data'] + // : ['data' => $d['data']] + // ); + // } + + return $this; + } + + private static function getDataKey() + { + $key = 'dedicated_data_' . getmypid(); + $i = ''; + + while (cache()->has($key . ($i !== '' ? '_' . $i : ''))) { + if ($i === '') { + $i = 0; + } else { + $i = (int) $i; + $i++; + } + } + + if ($i !== '') { + $i = '_' . $i; + } + + $key .= $i; + + return $key; + } +} diff --git a/tests/Apps/ConfigAppManagerTest.php b/tests/Apps/ConfigAppManagerTest.php index ce0454a..dc6b1d6 100644 --- a/tests/Apps/ConfigAppManagerTest.php +++ b/tests/Apps/ConfigAppManagerTest.php @@ -1,11 +1,11 @@ artisan('websockets:restart'); $this->assertGreaterThanOrEqual( - $start, Cache::get('beyondcode:websockets:restart', 0) + $start, Cache::get('blax:websockets:restart', 0) ); } } diff --git a/tests/Commands/StartServerTest.php b/tests/Commands/StartServerTest.php index 08f71a3..4e9aef8 100644 --- a/tests/Commands/StartServerTest.php +++ b/tests/Commands/StartServerTest.php @@ -1,8 +1,8 @@ set('websockets.replication.modes', [ 'local' => [ - 'channel_manager' => \BeyondCode\LaravelWebSockets\ChannelManagers\LocalChannelManager::class, - 'collector' => \BeyondCode\LaravelWebSockets\Statistics\Collectors\MemoryCollector::class, + 'channel_manager' => \BlaxSoftware\LaravelWebSockets\ChannelManagers\LocalChannelManager::class, + 'collector' => \BlaxSoftware\LaravelWebSockets\Statistics\Collectors\MemoryCollector::class, ], 'redis' => [ - 'channel_manager' => \BeyondCode\LaravelWebSockets\ChannelManagers\RedisChannelManager::class, + 'channel_manager' => \BlaxSoftware\LaravelWebSockets\ChannelManagers\RedisChannelManager::class, 'connection' => 'default', - 'collector' => \BeyondCode\LaravelWebSockets\Statistics\Collectors\RedisCollector::class, + 'collector' => \BlaxSoftware\LaravelWebSockets\Statistics\Collectors\RedisCollector::class, ], ]); } @@ -302,7 +302,7 @@ abstract class TestCase extends Orchestra $this->app['config']->set( 'websockets.promise_resolver', - \BeyondCode\LaravelWebSockets\Test\Mocks\PromiseResolver::class + \BlaxSoftware\LaravelWebSockets\Test\Mocks\PromiseResolver::class ); } diff --git a/tests/TestServiceProvider.php b/tests/TestServiceProvider.php index c43ce45..14f4965 100644 --- a/tests/TestServiceProvider.php +++ b/tests/TestServiceProvider.php @@ -1,6 +1,6 @@ define(\BeyondCode\LaravelWebSockets\Test\Models\User::class, function () { +$factory->define(\BlaxSoftware\LaravelWebSockets\Test\Models\User::class, function () { return [ 'name' => 'Name'.Str::random(5), 'email' => Str::random(5).'@gmail.com',