Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
version: 2
jobs:
test-php72:
test-php73:
docker:
- image: circleci/php:7.2-cli
- image: circleci/php:7.3-cli

working_directory: ~/project
steps:
Expand Down Expand Up @@ -48,5 +48,5 @@ workflows:
version: 2
test:
jobs:
- test-php72
- test-php73
- test-php74
19 changes: 14 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@
"keywords": ["blocking", "await", "sleep", "Event Loop", "Promise", "ReactPHP", "async"],
"homepage": "https://github.com/driftphp/reactphp-functions",
"license": "MIT",
"autoload": {
"files": [ "src/functions_include.php" ]
},

"authors": [
{
"name": "Marc Morera",
"email": "[email protected]"
}
],

"require": {
"php": ">=7.1",
"php": "^7.3",
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5",
"react/promise": "^2.7 || ^1.2.1",
"react/child-process": "^0.6",
"react/promise-timer": "^1.5"
"react/promise-timer": "^1.5",
"react/stream": "^1.0"
},
"require-dev": {
"clue/block-react": "^1.3"
},
"autoload": {
"files": [ "src/functions_include.php" ]
}
}
41 changes: 41 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@

namespace Drift\React;

use Evenement\EventEmitterInterface;
use React\ChildProcess\Process;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Promise\Timer;
use React\Stream\ReadableStreamInterface;
use function React\Promise\reject;

/**
* Sleep for n seconds
Expand Down Expand Up @@ -74,4 +78,41 @@ function mime_content_type(string $fileName, LoopInterface $loop) : PromiseInter
return $deferred->promise();
}

/**
* Wait until a number of listeners are aware of stream data.
*
* @param EventEmitterInterface $stream
* @param LoopInterface $loop
* @param int $minimumListeners
* @param int $timeout
*
* @return PromiseInterface<ReadableStreamInterface>
*/
function wait_for_stream_listeners(
EventEmitterInterface $stream,
LoopInterface $loop,
int $minimumListeners = 1,
float $timeout = -1
) : PromiseInterface
{
if ($minimumListeners < 0) {
return reject(new \LogicException('You cannot expect negative amount of listeners in a stream.'));
}

$deferred = new Deferred();
$timer = $loop->addPeriodicTimer(0.001, function(TimerInterface $timer) use ($deferred, $stream, $minimumListeners, $loop) {
if (count($stream->listeners('data')) >= $minimumListeners) {
$loop->cancelTimer($timer);
$deferred->resolve($stream);
}
});

if ($timeout>0) {
$loop->addTimer($timeout, function() use ($timer, $loop, $deferred, $timeout) {
$loop->cancelTimer($timer);
$deferred->reject(new \RuntimeException("No listeners attached after $timeout seconds"));
});
}

return $deferred->promise();
}
126 changes: 126 additions & 0 deletions tests/WaitForStreamListenersTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<?php


namespace Drift\React\Tests;

use PHPUnit\Framework\TestCase;
use React\EventLoop\Factory;
use React\EventLoop\TimerInterface;
use React\Stream\ThroughStream;
use function Clue\React\Block\await;
use function Drift\React\wait_for_stream_listeners;

/**
* Class WaitForStreamListenersTest
*/
class WaitForStreamListenersTest extends TestCase
{
/**
* Test without listeners
*/
public function testNoListeners()
{
$this->expectNotToPerformAssertions();
$loop = Factory::create();
$stream = new ThroughStream();
await(wait_for_stream_listeners($stream, $loop, 0), $loop);
}

/**
* Test without listeners
*/
public function testNegativeListeners()
{
$this->expectException(\LogicException::class);
$loop = Factory::create();
$stream = new ThroughStream();
await(wait_for_stream_listeners($stream, $loop, -1), $loop);
}

/**
* Test timeout
*/
public function testTimeout()
{
$loop = Factory::create();
$stream = new ThroughStream();
try {
await(wait_for_stream_listeners($stream, $loop, 1, 1), $loop);
$this->fail('Timeout should reject');
} catch (\Exception $exception) {
$this->assertTrue(true);
}
}

/**
* Test one listener
*/
public function testOneListener()
{
$this->expectNotToPerformAssertions();
$loop = Factory::create();
$stream = new ThroughStream();
$stream->on('data', function(){});
await(wait_for_stream_listeners($stream, $loop, 1, 1), $loop);
}

/**
* Test two listener
*/
public function testTwoListeners()
{
$loop = Factory::create();
$stream = new ThroughStream();
$stream->on('data', function(){});

$from = time();
try {
await(wait_for_stream_listeners($stream, $loop, 2, 1), $loop);
$this->fail('Timeout should reject');
} catch (\Exception $exception) {
$to = time();
$this->assertTrue(intval($to-$from) == 1);
}

$stream->on('data', function(){});
await(wait_for_stream_listeners($stream, $loop, 2, 1), $loop);
}

/**
* Test future listener
*/
public function testFutureListener()
{
$this->expectNotToPerformAssertions();
$loop = Factory::create();
$stream = new ThroughStream();

$loop->addPeriodicTimer(0.1, function(TimerInterface $timer) use ($stream) {
$stream->on('data', function() {});
});

await(wait_for_stream_listeners($stream, $loop, 10, 1.01), $loop);
}

/**
* Test future listener
*
* @group X
*/
public function testFutureListenerTimeout()
{
$loop = Factory::create();
$stream = new ThroughStream();

$loop->addPeriodicTimer(0.1, function(TimerInterface $timer) use ($stream) {
$stream->on('data', function() {});
});

try {
await(wait_for_stream_listeners($stream, $loop, 10, 9), $loop);
$this->fail('Timeout should reject');
} catch (\Exception $exception) {
// Great
}
}
}