diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 2b9a9be..5f80f58 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -2,17 +2,15 @@ namespace React\Stream; -class ThroughStream extends CompositeStream +use Evenement\EventEmitter; + +class ThroughStream extends EventEmitter implements DuplexStreamInterface { + private $readable = true; + private $writable = true; + private $closed = false; private $paused = false; - - public function __construct() - { - $readable = new ReadableStream(); - $writable = new WritableStream(); - - parent::__construct($readable, $writable); - } + private $drain = false; public function filter($data) { @@ -21,39 +19,80 @@ public function filter($data) public function pause() { - parent::pause(); $this->paused = true; } public function resume() { - parent::resume(); + if ($this->drain) { + $this->drain = false; + $this->emit('drain'); + } $this->paused = false; } + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + return Util::pipe($this, $dest, $options); + } + + public function isReadable() + { + return $this->readable; + } + + public function isWritable() + { + return $this->writable; + } + public function write($data) { - if (!$this->writable->isWritable()) { + if (!$this->writable) { return false; } - $this->readable->emit('data', array($this->filter($data))); + $this->emit('data', array($this->filter($data))); + + if ($this->paused) { + $this->drain = true; + return false; + } - return $this->writable->isWritable() && !$this->paused; + return true; } public function end($data = null) { - if (!$this->writable->isWritable()) { + if (!$this->writable) { return; } if (null !== $data) { - $this->readable->emit('data', array($this->filter($data))); + $this->write($data); } - $this->readable->emit('end'); + $this->readable = false; + $this->writable = false; + $this->paused = true; + $this->drain = false; + + $this->emit('end'); + $this->close(); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->readable = false; + $this->writable = false; + $this->closed = true; + $this->paused = true; + $this->drain = false; - $this->writable->end(); + $this->emit('close'); } } diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index b20100e..263eb09 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -10,6 +10,15 @@ */ class ThroughStreamTest extends TestCase { + /** @test */ + public function itShouldReturnTrueForAnyDataWrittenToIt() + { + $through = new ThroughStream(); + $ret = $through->write('foo'); + + $this->assertTrue($ret); + } + /** @test */ public function itShouldEmitAnyDataWrittenToIt() { @@ -18,6 +27,39 @@ public function itShouldEmitAnyDataWrittenToIt() $through->write('foo'); } + /** @test */ + public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused() + { + $through = new ThroughStream(); + $through->pause(); + $ret = $through->write('foo'); + + $this->assertFalse($ret); + } + + /** @test */ + public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused() + { + $through = new ThroughStream(); + $through->pause(); + $through->write('foo'); + + $through->on('drain', $this->expectCallableOnce()); + $through->resume(); + } + + /** @test */ + public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause() + { + $through = new ThroughStream(); + $through->on('drain', $this->expectCallableNever()); + $through->pause(); + $through->resume(); + $ret = $through->write('foo'); + + $this->assertTrue($ret); + } + /** @test */ public function pipingStuffIntoItShouldWork() { @@ -123,34 +165,6 @@ public function itShouldBeWritableByDefault() $this->assertTrue($through->isWritable()); } - /** @test */ - public function pauseShouldDelegateToPipeSource() - { - $input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause'))->getMock(); - $input - ->expects($this->once()) - ->method('pause'); - - $through = new ThroughStream(); - $input->pipe($through); - - $through->pause(); - } - - /** @test */ - public function resumeShouldDelegateToPipeSource() - { - $input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('resume'))->getMock(); - $input - ->expects($this->once()) - ->method('resume'); - - $through = new ThroughStream(); - $input->pipe($through); - - $through->resume(); - } - /** @test */ public function closeShouldCloseOnce() {