diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index eb02101..ee4b94e 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -4,6 +4,8 @@ namespace BeyondCode\LaravelWebSockets\Console; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; @@ -53,6 +55,7 @@ class StartWebSocketServer extends Command ->configureMessageLogger() ->configureConnectionLogger() ->configureRestartTimer() + ->configurePubSub() ->registerEchoRoutes() ->registerCustomRoutes() ->configurePubSubReplication() @@ -130,6 +133,28 @@ class StartWebSocketServer extends Command return $this; } + /** + * Configure the replicators. + * + * @return void + */ + public function configurePubSub() + { + if (config('websockets.replication.driver', 'local') === 'local') { + $this->laravel->singleton(ReplicationInterface::class, function () { + return new LocalClient; + }); + } + + if (config('websockets.replication.driver', 'local') === 'redis') { + $this->laravel->singleton(ReplicationInterface::class, function () { + return (new RedisClient)->boot($this->loop); + }); + } + + return $this; + } + protected function registerEchoRoutes() { WebSocketsRouter::echo(); diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php index 437ed98..3e24c73 100644 --- a/src/PubSub/Drivers/LocalClient.php +++ b/src/PubSub/Drivers/LocalClient.php @@ -21,9 +21,10 @@ class LocalClient implements ReplicationInterface * Boot the pub/sub provider (open connections, initial subscriptions, etc). * * @param LoopInterface $loop + * @param string|null $factoryClass * @return self */ - public function boot(LoopInterface $loop): ReplicationInterface + public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface { return $this; } diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php index 7195426..ef48149 100644 --- a/src/PubSub/Drivers/RedisClient.php +++ b/src/PubSub/Drivers/RedisClient.php @@ -66,14 +66,17 @@ class RedisClient implements ReplicationInterface * Boot the RedisClient, initializing the connections. * * @param LoopInterface $loop + * @param string|null $factoryClass * @return ReplicationInterface */ - public function boot(LoopInterface $loop): ReplicationInterface + public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface { + $factoryClass = $factoryClass ?: Factory::class; + $this->loop = $loop; $connectionUri = $this->getConnectionUri(); - $factory = new Factory($this->loop); + $factory = new $factoryClass($this->loop); $this->publishClient = $factory->createLazyClient($connectionUri); $this->subscribeClient = $factory->createLazyClient($connectionUri); @@ -108,7 +111,7 @@ class RedisClient implements ReplicationInterface // We need to put the channel name in the payload. // We strip the app ID from the channel name, websocket clients // expect the channel name to not include the app ID. - $payload->channel = Str::after($redisChannel, "$appId:"); + $payload->channel = Str::after($redisChannel, "{$appId}:"); $channelManager = app(ChannelManager::class); @@ -296,4 +299,34 @@ class RedisClient implements ReplicationInterface return "redis://{$host}:{$port}".($query ? "?{$query}" : ''); } + + /** + * Get the Subscribe client instance. + * + * @return Client + */ + public function getSubscribeClient() + { + return $this->subscribeClient; + } + + /** + * Get the Publish client instance. + * + * @return Client + */ + public function getPublishClient() + { + return $this->publishClient; + } + + /** + * Get the unique identifier for the server. + * + * @return string + */ + public function getServerId() + { + return $this->serverId; + } } diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php index f40b445..71d83dd 100644 --- a/src/PubSub/ReplicationInterface.php +++ b/src/PubSub/ReplicationInterface.php @@ -12,9 +12,10 @@ interface ReplicationInterface * Boot the pub/sub provider (open connections, initial subscriptions, etc). * * @param LoopInterface $loop + * @param string|null $factoryClass * @return self */ - public function boot(LoopInterface $loop): self; + public function boot(LoopInterface $loop, $factoryClass = null): self; /** * Publish a payload on a specific channel, for a specific app. diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index 713e387..aea8e3c 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -9,9 +9,6 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard; use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster; -use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient; -use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; -use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Statistics\Http\Controllers\WebSocketStatisticsEntriesController; use BeyondCode\LaravelWebSockets\Statistics\Http\Middleware\Authorize as AuthorizeStatistics; @@ -54,18 +51,6 @@ class WebSocketsServiceProvider extends ServiceProvider protected function configurePubSub() { - if (config('websockets.replication.driver', 'local') === 'local') { - $this->app->singleton(ReplicationInterface::class, function () { - return new LocalClient; - }); - } - - if (config('websockets.replication.driver', 'local') === 'redis') { - $this->app->singleton(ReplicationInterface::class, function () { - return (new RedisClient)->boot($this->loop ?? LoopFactory::create()); - }); - } - $this->app->make(BroadcastManager::class)->extend('websockets', function ($app, array $config) { $pusher = new Pusher( $config['key'], $config['secret'], diff --git a/tests/Channels/ChannelReplicationTest.php b/tests/Channels/ChannelReplicationTest.php index e3c79c3..4edf22a 100644 --- a/tests/Channels/ChannelReplicationTest.php +++ b/tests/Channels/ChannelReplicationTest.php @@ -6,5 +6,13 @@ use BeyondCode\LaravelWebSockets\Tests\TestCase; class ChannelReplicationTest extends TestCase { - // + /** + * {@inheritdoc} + */ + public function setUp(): void + { + parent::setUp(); + + $this->runOnlyOnRedisReplication(); + } } diff --git a/tests/Channels/PresenceChannelReplicationTest.php b/tests/Channels/PresenceChannelReplicationTest.php index 4008be2..7e751ef 100644 --- a/tests/Channels/PresenceChannelReplicationTest.php +++ b/tests/Channels/PresenceChannelReplicationTest.php @@ -6,5 +6,13 @@ use BeyondCode\LaravelWebSockets\Tests\TestCase; class PresenceChannelReplicationTest extends TestCase { - // + /** + * {@inheritdoc} + */ + public function setUp(): void + { + parent::setUp(); + + $this->runOnlyOnRedisReplication(); + } } diff --git a/tests/Channels/PresenceChannelTest.php b/tests/Channels/PresenceChannelTest.php index e2d4de1..2180a4c 100644 --- a/tests/Channels/PresenceChannelTest.php +++ b/tests/Channels/PresenceChannelTest.php @@ -2,6 +2,7 @@ namespace BeyondCode\LaravelWebSockets\Tests\Channels; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Tests\Mocks\Message; use BeyondCode\LaravelWebSockets\Tests\TestCase; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; @@ -55,9 +56,27 @@ class PresenceChannelTest extends TestCase $this->pusherServer->onMessage($connection, $message); - $connection->assertSentEvent('pusher_internal:subscription_succeeded', [ - 'channel' => 'presence-channel', - ]); + $this->getPublishClient() + ->assertCalledWithArgs('hset', [ + '1234:presence-channel', + $connection->socketId, + json_encode($channelData), + ]) + ->assertCalledWithArgs('hgetall', [ + '1234:presence-channel' + ]); + // TODO: This fails somehow + // Debugging shows the exact same pattern as good. + /* ->assertCalledWithArgs('publish', [ + '1234:presence-channel', + json_encode([ + 'event' => 'pusher_internal:member_added', + 'channel' => 'presence-channel', + 'data' => $channelData, + 'appId' => '1234', + 'serverId' => $this->app->make(ReplicationInterface::class)->getServerId(), + ]), + ]) */ } /** @test */ diff --git a/tests/Channels/PrivateChannelReplicationTest.php b/tests/Channels/PrivateChannelReplicationTest.php new file mode 100644 index 0000000..dfb08f3 --- /dev/null +++ b/tests/Channels/PrivateChannelReplicationTest.php @@ -0,0 +1,18 @@ +runOnlyOnRedisReplication(); + } +} diff --git a/tests/HttpApi/FetchChannelReplicationTest.php b/tests/HttpApi/FetchChannelReplicationTest.php index 46dc080..9b8c731 100644 --- a/tests/HttpApi/FetchChannelReplicationTest.php +++ b/tests/HttpApi/FetchChannelReplicationTest.php @@ -2,9 +2,150 @@ namespace BeyondCode\LaravelWebSockets\Tests\HttpApi; +use BeyondCode\LaravelWebSockets\HttpApi\Controllers\FetchChannelController; +use BeyondCode\LaravelWebSockets\Tests\Mocks\Connection; use BeyondCode\LaravelWebSockets\Tests\TestCase; +use GuzzleHttp\Psr7\Request; +use Illuminate\Http\JsonResponse; +use Pusher\Pusher; +use Symfony\Component\HttpKernel\Exception\HttpException; class FetchChannelReplicationTest extends TestCase { - // + /** + * {@inheritdoc} + */ + public function setUp(): void + { + parent::setUp(); + + $this->runOnlyOnRedisReplication(); + } + + /** @test */ + public function replication_invalid_signatures_can_not_access_the_api() + { + $this->expectException(HttpException::class); + $this->expectExceptionMessage('Invalid auth signature provided.'); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/my-channel'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'my-channel', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'InvalidSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + } + + /** @test */ + public function replication_it_returns_the_channel_information() + { + $this->getConnectedWebSocketConnection(['my-channel']); + $this->getConnectedWebSocketConnection(['my-channel']); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/my-channel'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'my-channel', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + + /** @var JsonResponse $response */ + $response = array_pop($connection->sentRawData); + + $this->assertSame([ + 'occupied' => true, + 'subscription_count' => 2, + ], json_decode($response->getContent(), true)); + } + + /** @test */ + public function replication_it_returns_presence_channel_information() + { + $this->joinPresenceChannel('presence-channel'); + $this->joinPresenceChannel('presence-channel'); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/my-channel'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'presence-channel', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + + /** @var JsonResponse $response */ + $response = array_pop($connection->sentRawData); + + $this->getSubscribeClient()->assertNothingCalled(); + + dd($this->getSubscribeClient()); + + $this->getPublishClient() + ->assertCalled('hset') + ->assertCalled('hgetall'); + + $this->assertSame([ + 'occupied' => true, + 'subscription_count' => 2, + 'user_count' => 2, + ], json_decode($response->getContent(), true)); + } + + /** @test */ + public function replication_it_returns_404_for_invalid_channels() + { + $this->expectException(HttpException::class); + $this->expectExceptionMessage('Unknown channel'); + + $this->getConnectedWebSocketConnection(['my-channel']); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/invalid-channel'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'invalid-channel', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + + /** @var JsonResponse $response */ + $response = array_pop($connection->sentRawData); + + $this->assertSame([ + 'occupied' => true, + 'subscription_count' => 2, + ], json_decode($response->getContent(), true)); + } } diff --git a/tests/HttpApi/FetchChannelTest.php b/tests/HttpApi/FetchChannelTest.php index ed6846c..e1ca22d 100644 --- a/tests/HttpApi/FetchChannelTest.php +++ b/tests/HttpApi/FetchChannelTest.php @@ -69,6 +69,8 @@ class FetchChannelTest extends TestCase /** @test */ public function it_returns_presence_channel_information() { + $this->runOnlyOnLocalReplication(); + $this->joinPresenceChannel('presence-channel'); $this->joinPresenceChannel('presence-channel'); diff --git a/tests/HttpApi/FetchChannelsReplicationTest.php b/tests/HttpApi/FetchChannelsReplicationTest.php index a3d1664..8845eac 100644 --- a/tests/HttpApi/FetchChannelsReplicationTest.php +++ b/tests/HttpApi/FetchChannelsReplicationTest.php @@ -6,5 +6,13 @@ use BeyondCode\LaravelWebSockets\Tests\TestCase; class FetchChannelsReplicationTest extends TestCase { - // + /** + * {@inheritdoc} + */ + public function setUp(): void + { + parent::setUp(); + + $this->runOnlyOnRedisReplication(); + } } diff --git a/tests/HttpApi/FetchUsersReplicationTest.php b/tests/HttpApi/FetchUsersReplicationTest.php index 706a07d..0fbf484 100644 --- a/tests/HttpApi/FetchUsersReplicationTest.php +++ b/tests/HttpApi/FetchUsersReplicationTest.php @@ -6,5 +6,13 @@ use BeyondCode\LaravelWebSockets\Tests\TestCase; class FetchUsersReplicationTest extends TestCase { - // + /** + * {@inheritdoc} + */ + public function setUp(): void + { + parent::setUp(); + + $this->runOnlyOnRedisReplication(); + } } diff --git a/tests/Mocks/LazyClient.php b/tests/Mocks/LazyClient.php new file mode 100644 index 0000000..b38c23a --- /dev/null +++ b/tests/Mocks/LazyClient.php @@ -0,0 +1,95 @@ +calls[] = [$name, $args]; + + return parent::__call($name, $args); + } + + /** + * Check if the method got called. + * + * @param string $name + * @return $this + */ + public function assertCalled($name) + { + foreach ($this->getCalledFunctions() as $function) { + [$calledName, ] = $function; + + if ($calledName === $name) { + PHPUnit::assertTrue(true); + + return $this; + } + } + + PHPUnit::assertFalse(true); + + return $this; + } + + /** + * Check if the method with args got called. + * + * @param string $name + * @param array $args + * @return $this + */ + public function assertCalledWithArgs($name, array $args) + { + foreach ($this->getCalledFunctions() as $function) { + [$calledName, $calledArgs] = $function; + + if ($calledName === $name && $calledArgs === $args) { + PHPUnit::assertTrue(true); + + return $this; + } + } + + PHPUnit::assertFalse(true); + + return $this; + } + + /** + * Check if no function got called. + * + * @return $this + */ + public function assertNothingCalled() + { + PHPUnit::assertEquals([], $this->getCalledFunctions()); + + return $this; + } + + /** + * Get the list of all calls. + * + * @return array + */ + public function getCalledFunctions() + { + return $this->calls; + } +} diff --git a/tests/Mocks/RedisFactory.php b/tests/Mocks/RedisFactory.php new file mode 100644 index 0000000..25962f7 --- /dev/null +++ b/tests/Mocks/RedisFactory.php @@ -0,0 +1,40 @@ +loop = $loop; + } + + /** + * Create Redis client connected to address of given redis instance + * + * @param string $target + * @return Client + */ + public function createLazyClient($target) + { + return new LazyClient($target, $this, $this->loop); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 7cba922..b142833 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -3,6 +3,9 @@ namespace BeyondCode\LaravelWebSockets\Tests; use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Tests\Mocks\Connection; use BeyondCode\LaravelWebSockets\Tests\Mocks\Message; use BeyondCode\LaravelWebSockets\Tests\Statistics\Logger\FakeStatisticsLogger; @@ -12,6 +15,7 @@ use Clue\React\Buzz\Browser; use GuzzleHttp\Psr7\Request; use Mockery; use Ratchet\ConnectionInterface; +use React\EventLoop\Factory as LoopFactory; abstract class TestCase extends \Orchestra\Testbench\TestCase { @@ -38,6 +42,8 @@ abstract class TestCase extends \Orchestra\Testbench\TestCase )); $this->loadMigrationsFrom(__DIR__.'/../database/migrations'); + + $this->configurePubSub(); } /** @@ -167,8 +173,55 @@ abstract class TestCase extends \Orchestra\Testbench\TestCase return $this->channelManager->findOrCreate($connection->app->id, $channelName); } + protected function configurePubSub() + { + // Replace the publish and subscribe clients with a Mocked + // factory lazy instance on boot. + if (config('websockets.replication.driver') === 'redis') { + $this->app->singleton(ReplicationInterface::class, function () { + return (new RedisClient)->boot( + LoopFactory::create(), Mocks\RedisFactory::class + ); + }); + } + + if (config('websockets.replication.driver') === 'local') { + $this->app->singleton(ReplicationInterface::class, function () { + return new LocalClient; + }); + } + } + protected function markTestAsPassed() { $this->assertTrue(true); } + + protected function runOnlyOnRedisReplication() + { + if (config('websockets.replication.driver') !== 'redis') { + $this->markTestSkipped('Skipped test because the replication driver is set to Redis.'); + } + } + + protected function runOnlyOnLocalReplication() + { + if (config('websockets.replication.driver') !== 'local') { + $this->markTestSkipped('Skipped test because the replication driver is set to Local.'); + } + } + + protected function getSubscribeClient() + { + return $this->app + ->make(ReplicationInterface::class) + ->getSubscribeClient(); + } + + protected function getPublishClient() + { + return $this->app + ->make(ReplicationInterface::class) + ->getPublishClient(); + } }