Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions src/Buffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;

/** @event full-drain */
class Buffer extends EventEmitter implements WritableStreamInterface
{
public $stream;
Expand Down Expand Up @@ -55,9 +54,9 @@ public function end($data = null)

$this->writable = false;

if ($this->listening) {
$this->on('full-drain', array($this, 'close'));
} else {
// close immediately if buffer is already empty
// otherwise wait for buffer to flush first
if ($this->data === '') {
$this->close();
}
}
Expand Down Expand Up @@ -120,12 +119,9 @@ public function handleWrite()
$this->emit('drain', array($this));
}

// buffer is now completely empty (and not closed already)
if ($this->data === '' && $this->listening) {
$this->loop->removeWriteStream($this->stream);
$this->listening = false;

$this->emit('full-drain', array($this));
// buffer is end()ing and now completely empty (and not closed already)
if (!$this->writable && $this->data === '') {
$this->close();
}
}
}
62 changes: 43 additions & 19 deletions tests/BufferTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public function testWriteInDrain()
* @covers React\Stream\Buffer::write
* @covers React\Stream\Buffer::handleWrite
*/
public function testDrainAndFullDrainAfterWrite()
public function testDrainAfterWrite()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();
Expand All @@ -213,45 +213,41 @@ public function testDrainAndFullDrainAfterWrite()
$buffer->softLimit = 2;

$buffer->on('drain', $this->expectCallableOnce());
$buffer->on('full-drain', $this->expectCallableOnce());

$buffer->write("foo");
$buffer->handleWrite();
}

/**
* @covers React\Stream\Buffer::write
* @covers React\Stream\Buffer::handleWrite
* @covers React\Stream\Buffer::end
*/
public function testCloseDuringDrainWillNotEmitFullDrain()
public function testEndWithoutDataClosesImmediatelyIfBufferIsEmpty()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$buffer = new Buffer($stream, $loop);
$buffer->softLimit = 2;

// close buffer on drain event => expect close event, but no full-drain after
$buffer->on('drain', $this->expectCallableOnce());
$buffer->on('drain', array($buffer, 'close'));
$buffer->on('error', $this->expectCallableNever());
$buffer->on('close', $this->expectCallableOnce());
$buffer->on('full-drain', $this->expectCallableNever());

$buffer->write("foo");
$buffer->handleWrite();
$this->assertTrue($buffer->isWritable());
$buffer->end();
$this->assertFalse($buffer->isWritable());
}

/**
* @covers React\Stream\Buffer::end
*/
public function testEnd()
public function testEndWithoutDataDoesNotCloseIfBufferIsFull()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$buffer = new Buffer($stream, $loop);
$buffer->on('error', $this->expectCallableNever());
$buffer->on('close', $this->expectCallableOnce());
$buffer->on('close', $this->expectCallableNever());

$buffer->write('foo');

$this->assertTrue($buffer->isWritable());
$buffer->end();
Expand All @@ -261,21 +257,46 @@ public function testEnd()
/**
* @covers React\Stream\Buffer::end
*/
public function testEndWithData()
public function testEndWithDataClosesImmediatelyIfBufferFlushes()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createWriteableLoopMock();
$loop = $this->createLoopMock();

$buffer = new Buffer($stream, $loop);
$buffer->on('error', $this->expectCallableNever());
$buffer->on('close', $this->expectCallableOnce());

$this->assertTrue($buffer->isWritable());
$buffer->end('final words');
$this->assertFalse($buffer->isWritable());

$buffer->handleWrite();
rewind($stream);
$this->assertSame('final words', stream_get_contents($stream));
}

/**
* @covers React\Stream\Buffer::end
*/
public function testEndWithDataDoesNotCloseImmediatelyIfBufferIsFull()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$buffer = new Buffer($stream, $loop);
$buffer->on('error', $this->expectCallableNever());
$buffer->on('close', $this->expectCallableNever());

$buffer->write('foo');

$this->assertTrue($buffer->isWritable());
$buffer->end('final words');
$this->assertFalse($buffer->isWritable());

rewind($stream);
$this->assertSame('', stream_get_contents($stream));
}

/**
* @covers React\Stream\Buffer::isWritable
* @covers React\Stream\Buffer::close
Expand All @@ -301,13 +322,14 @@ public function testClose()
public function testWritingToClosedBufferShouldNotWriteToStream()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createWriteableLoopMock();
$loop = $this->createLoopMock();

$buffer = new Buffer($stream, $loop);
$buffer->close();

$buffer->write('foo');

$buffer->handleWrite();
rewind($stream);
$this->assertSame('', stream_get_contents($stream));
}
Expand Down Expand Up @@ -346,7 +368,7 @@ public function testWritingToClosedStream()
}

list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$loop = $this->createWriteableLoopMock();
$loop = $this->createLoopMock();

$error = null;

Expand All @@ -356,9 +378,11 @@ public function testWritingToClosedStream()
});

$buffer->write('foo');
$buffer->handleWrite();
stream_socket_shutdown($b, STREAM_SHUT_RD);
stream_socket_shutdown($a, STREAM_SHUT_RD);
$buffer->write('bar');
$buffer->handleWrite();

$this->assertInstanceOf('Exception', $error);
$this->assertSame('Unable to write to stream: fwrite(): send of 3 bytes failed with errno=32 Broken pipe', $error->getMessage());
Expand Down
16 changes: 2 additions & 14 deletions tests/UtilTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ public function testPipeWithBuffer()
$readable = new Stub\ReadableStreamStub();

$stream = fopen('php://temp', 'r+');
$loop = $this->createWriteableLoopMock();
$loop = $this->createLoopMock();
$buffer = new Buffer($stream, $loop);

$readable->pipe($buffer);

$readable->write('hello, I am some ');
$readable->write('random data');

$buffer->handleWrite();
rewind($stream);
$this->assertSame('hello, I am some random data', stream_get_contents($stream));
}
Expand All @@ -132,19 +133,6 @@ public function forwardEventsShouldSetupForwards()
$source->emit('foo', array('bar'));
}

private function createWriteableLoopMock()
{
$loop = $this->createLoopMock();
$loop
->expects($this->any())
->method('addWriteStream')
->will($this->returnCallback(function ($stream, $listener) {
call_user_func($listener, $stream);
}));

return $loop;
}

private function createLoopMock()
{
return $this->getMock('React\EventLoop\LoopInterface');
Expand Down