diff --git a/.circleci/config.yml b/.circleci/config.yml index 30792c8..c9f784e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,8 +1,8 @@ version: 2 jobs: - test-php72: + test-php73: docker: - - image: circleci/php:7.2-cli + - image: circleci/php:7.3-cli working_directory: ~/project steps: @@ -48,5 +48,5 @@ workflows: version: 2 test: jobs: - - test-php72 + - test-php73 - test-php74 \ No newline at end of file diff --git a/composer.json b/composer.json index eb0f0ba..9125e4c 100644 --- a/composer.json +++ b/composer.json @@ -4,17 +4,26 @@ "keywords": ["blocking", "await", "sleep", "Event Loop", "Promise", "ReactPHP", "async"], "homepage": "https://github.com/driftphp/reactphp-functions", "license": "MIT", - "autoload": { - "files": [ "src/functions_include.php" ] - }, + + "authors": [ + { + "name": "Marc Morera", + "email": "yuhu@mmoreram.com" + } + ], + "require": { - "php": ">=7.1", + "php": "^7.3", "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5", "react/promise": "^2.7 || ^1.2.1", "react/child-process": "^0.6", - "react/promise-timer": "^1.5" + "react/promise-timer": "^1.5", + "react/stream": "^1.0" }, "require-dev": { "clue/block-react": "^1.3" + }, + "autoload": { + "files": [ "src/functions_include.php" ] } } \ No newline at end of file diff --git a/src/functions.php b/src/functions.php index 072d45e..7ade598 100644 --- a/src/functions.php +++ b/src/functions.php @@ -3,11 +3,15 @@ namespace Drift\React; +use Evenement\EventEmitterInterface; use React\ChildProcess\Process; use React\EventLoop\LoopInterface; +use React\EventLoop\TimerInterface; use React\Promise\Deferred; use React\Promise\PromiseInterface; use React\Promise\Timer; +use React\Stream\ReadableStreamInterface; +use function React\Promise\reject; /** * Sleep for n seconds @@ -74,4 +78,41 @@ function mime_content_type(string $fileName, LoopInterface $loop) : PromiseInter return $deferred->promise(); } +/** + * Wait until a number of listeners are aware of stream data. + * + * @param EventEmitterInterface $stream + * @param LoopInterface $loop + * @param int $minimumListeners + * @param int $timeout + * + * @return PromiseInterface + */ +function wait_for_stream_listeners( + EventEmitterInterface $stream, + LoopInterface $loop, + int $minimumListeners = 1, + float $timeout = -1 +) : PromiseInterface +{ + if ($minimumListeners < 0) { + return reject(new \LogicException('You cannot expect negative amount of listeners in a stream.')); + } + $deferred = new Deferred(); + $timer = $loop->addPeriodicTimer(0.001, function(TimerInterface $timer) use ($deferred, $stream, $minimumListeners, $loop) { + if (count($stream->listeners('data')) >= $minimumListeners) { + $loop->cancelTimer($timer); + $deferred->resolve($stream); + } + }); + + if ($timeout>0) { + $loop->addTimer($timeout, function() use ($timer, $loop, $deferred, $timeout) { + $loop->cancelTimer($timer); + $deferred->reject(new \RuntimeException("No listeners attached after $timeout seconds")); + }); + } + + return $deferred->promise(); +} diff --git a/tests/WaitForStreamListenersTest.php b/tests/WaitForStreamListenersTest.php new file mode 100644 index 0000000..6ee6d47 --- /dev/null +++ b/tests/WaitForStreamListenersTest.php @@ -0,0 +1,126 @@ +expectNotToPerformAssertions(); + $loop = Factory::create(); + $stream = new ThroughStream(); + await(wait_for_stream_listeners($stream, $loop, 0), $loop); + } + + /** + * Test without listeners + */ + public function testNegativeListeners() + { + $this->expectException(\LogicException::class); + $loop = Factory::create(); + $stream = new ThroughStream(); + await(wait_for_stream_listeners($stream, $loop, -1), $loop); + } + + /** + * Test timeout + */ + public function testTimeout() + { + $loop = Factory::create(); + $stream = new ThroughStream(); + try { + await(wait_for_stream_listeners($stream, $loop, 1, 1), $loop); + $this->fail('Timeout should reject'); + } catch (\Exception $exception) { + $this->assertTrue(true); + } + } + + /** + * Test one listener + */ + public function testOneListener() + { + $this->expectNotToPerformAssertions(); + $loop = Factory::create(); + $stream = new ThroughStream(); + $stream->on('data', function(){}); + await(wait_for_stream_listeners($stream, $loop, 1, 1), $loop); + } + + /** + * Test two listener + */ + public function testTwoListeners() + { + $loop = Factory::create(); + $stream = new ThroughStream(); + $stream->on('data', function(){}); + + $from = time(); + try { + await(wait_for_stream_listeners($stream, $loop, 2, 1), $loop); + $this->fail('Timeout should reject'); + } catch (\Exception $exception) { + $to = time(); + $this->assertTrue(intval($to-$from) == 1); + } + + $stream->on('data', function(){}); + await(wait_for_stream_listeners($stream, $loop, 2, 1), $loop); + } + + /** + * Test future listener + */ + public function testFutureListener() + { + $this->expectNotToPerformAssertions(); + $loop = Factory::create(); + $stream = new ThroughStream(); + + $loop->addPeriodicTimer(0.1, function(TimerInterface $timer) use ($stream) { + $stream->on('data', function() {}); + }); + + await(wait_for_stream_listeners($stream, $loop, 10, 1.01), $loop); + } + + /** + * Test future listener + * + * @group X + */ + public function testFutureListenerTimeout() + { + $loop = Factory::create(); + $stream = new ThroughStream(); + + $loop->addPeriodicTimer(0.1, function(TimerInterface $timer) use ($stream) { + $stream->on('data', function() {}); + }); + + try { + await(wait_for_stream_listeners($stream, $loop, 10, 9), $loop); + $this->fail('Timeout should reject'); + } catch (\Exception $exception) { + // Great + } + } +} \ No newline at end of file