diff --git a/src/Buffer.php b/src/Buffer.php index 1a6c32d..9b1221b 100644 --- a/src/Buffer.php +++ b/src/Buffer.php @@ -5,7 +5,6 @@ use Evenement\EventEmitter; use React\EventLoop\LoopInterface; -/** @event full-drain */ class Buffer extends EventEmitter implements WritableStreamInterface { public $stream; @@ -55,9 +54,9 @@ public function end($data = null) $this->writable = false; - if ($this->listening) { - $this->on('full-drain', array($this, 'close')); - } else { + // close immediately if buffer is already empty + // otherwise wait for buffer to flush first + if ($this->data === '') { $this->close(); } } @@ -120,12 +119,9 @@ public function handleWrite() $this->emit('drain', array($this)); } - // buffer is now completely empty (and not closed already) - if ($this->data === '' && $this->listening) { - $this->loop->removeWriteStream($this->stream); - $this->listening = false; - - $this->emit('full-drain', array($this)); + // buffer is end()ing and now completely empty (and not closed already) + if (!$this->writable && $this->data === '') { + $this->close(); } } } diff --git a/tests/BufferTest.php b/tests/BufferTest.php index 2759d13..dc87ced 100644 --- a/tests/BufferTest.php +++ b/tests/BufferTest.php @@ -204,7 +204,7 @@ public function testWriteInDrain() * @covers React\Stream\Buffer::write * @covers React\Stream\Buffer::handleWrite */ - public function testDrainAndFullDrainAfterWrite() + public function testDrainAfterWrite() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); @@ -213,45 +213,41 @@ public function testDrainAndFullDrainAfterWrite() $buffer->softLimit = 2; $buffer->on('drain', $this->expectCallableOnce()); - $buffer->on('full-drain', $this->expectCallableOnce()); $buffer->write("foo"); $buffer->handleWrite(); } /** - * @covers React\Stream\Buffer::write - * @covers React\Stream\Buffer::handleWrite + * @covers React\Stream\Buffer::end */ - public function testCloseDuringDrainWillNotEmitFullDrain() + public function testEndWithoutDataClosesImmediatelyIfBufferIsEmpty() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $buffer = new Buffer($stream, $loop); - $buffer->softLimit = 2; - - // close buffer on drain event => expect close event, but no full-drain after - $buffer->on('drain', $this->expectCallableOnce()); - $buffer->on('drain', array($buffer, 'close')); + $buffer->on('error', $this->expectCallableNever()); $buffer->on('close', $this->expectCallableOnce()); - $buffer->on('full-drain', $this->expectCallableNever()); - $buffer->write("foo"); - $buffer->handleWrite(); + $this->assertTrue($buffer->isWritable()); + $buffer->end(); + $this->assertFalse($buffer->isWritable()); } /** * @covers React\Stream\Buffer::end */ - public function testEnd() + public function testEndWithoutDataDoesNotCloseIfBufferIsFull() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $buffer = new Buffer($stream, $loop); $buffer->on('error', $this->expectCallableNever()); - $buffer->on('close', $this->expectCallableOnce()); + $buffer->on('close', $this->expectCallableNever()); + + $buffer->write('foo'); $this->assertTrue($buffer->isWritable()); $buffer->end(); @@ -261,21 +257,46 @@ public function testEnd() /** * @covers React\Stream\Buffer::end */ - public function testEndWithData() + public function testEndWithDataClosesImmediatelyIfBufferFlushes() { $stream = fopen('php://temp', 'r+'); - $loop = $this->createWriteableLoopMock(); + $loop = $this->createLoopMock(); $buffer = new Buffer($stream, $loop); $buffer->on('error', $this->expectCallableNever()); $buffer->on('close', $this->expectCallableOnce()); + $this->assertTrue($buffer->isWritable()); $buffer->end('final words'); + $this->assertFalse($buffer->isWritable()); + $buffer->handleWrite(); rewind($stream); $this->assertSame('final words', stream_get_contents($stream)); } + /** + * @covers React\Stream\Buffer::end + */ + public function testEndWithDataDoesNotCloseImmediatelyIfBufferIsFull() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $buffer = new Buffer($stream, $loop); + $buffer->on('error', $this->expectCallableNever()); + $buffer->on('close', $this->expectCallableNever()); + + $buffer->write('foo'); + + $this->assertTrue($buffer->isWritable()); + $buffer->end('final words'); + $this->assertFalse($buffer->isWritable()); + + rewind($stream); + $this->assertSame('', stream_get_contents($stream)); + } + /** * @covers React\Stream\Buffer::isWritable * @covers React\Stream\Buffer::close @@ -301,13 +322,14 @@ public function testClose() public function testWritingToClosedBufferShouldNotWriteToStream() { $stream = fopen('php://temp', 'r+'); - $loop = $this->createWriteableLoopMock(); + $loop = $this->createLoopMock(); $buffer = new Buffer($stream, $loop); $buffer->close(); $buffer->write('foo'); + $buffer->handleWrite(); rewind($stream); $this->assertSame('', stream_get_contents($stream)); } @@ -346,7 +368,7 @@ public function testWritingToClosedStream() } list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); - $loop = $this->createWriteableLoopMock(); + $loop = $this->createLoopMock(); $error = null; @@ -356,9 +378,11 @@ public function testWritingToClosedStream() }); $buffer->write('foo'); + $buffer->handleWrite(); stream_socket_shutdown($b, STREAM_SHUT_RD); stream_socket_shutdown($a, STREAM_SHUT_RD); $buffer->write('bar'); + $buffer->handleWrite(); $this->assertInstanceOf('Exception', $error); $this->assertSame('Unable to write to stream: fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage()); diff --git a/tests/UtilTest.php b/tests/UtilTest.php index eaaa8ec..598ad54 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -106,7 +106,7 @@ public function testPipeWithBuffer() $readable = new Stub\ReadableStreamStub(); $stream = fopen('php://temp', 'r+'); - $loop = $this->createWriteableLoopMock(); + $loop = $this->createLoopMock(); $buffer = new Buffer($stream, $loop); $readable->pipe($buffer); @@ -114,6 +114,7 @@ public function testPipeWithBuffer() $readable->write('hello, I am some '); $readable->write('random data'); + $buffer->handleWrite(); rewind($stream); $this->assertSame('hello, I am some random data', stream_get_contents($stream)); } @@ -132,19 +133,6 @@ public function forwardEventsShouldSetupForwards() $source->emit('foo', array('bar')); } - private function createWriteableLoopMock() - { - $loop = $this->createLoopMock(); - $loop - ->expects($this->any()) - ->method('addWriteStream') - ->will($this->returnCallback(function ($stream, $listener) { - call_user_func($listener, $stream); - })); - - return $loop; - } - private function createLoopMock() { return $this->getMock('React\EventLoop\LoopInterface');