18
18
19
19
import com .google .common .base .MoreObjects ;
20
20
import java .io .IOException ;
21
- import java .nio .Buffer ;
22
21
import java .nio .ByteBuffer ;
23
22
import java .nio .channels .GatheringByteChannel ;
24
23
import java .nio .channels .WritableByteChannel ;
25
24
import java .time .Clock ;
26
25
import java .time .Duration ;
27
26
import java .time .Instant ;
28
- import java .util .Arrays ;
29
27
import java .util .Objects ;
30
28
import java .util .concurrent .locks .ReentrantLock ;
31
29
import java .util .logging .Logger ;
@@ -262,7 +260,7 @@ public int write(ByteBuffer src) throws IOException {
262
260
@ Override
263
261
public long write (ByteBuffer [] srcs , int offset , int length ) throws IOException {
264
262
boolean exception = false ;
265
- long available = Arrays . stream (srcs ). mapToLong ( Buffer :: remaining ). sum ( );
263
+ long available = Buffers . totalRemaining (srcs , offset , length );
266
264
Instant begin = clock .instant ();
267
265
try {
268
266
return delegate .write (srcs , offset , length );
@@ -271,7 +269,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
271
269
throw e ;
272
270
} finally {
273
271
Instant end = clock .instant ();
274
- long remaining = Arrays . stream (srcs ). mapToLong ( Buffer :: remaining ). sum ( );
272
+ long remaining = Buffers . totalRemaining (srcs , offset , length );
275
273
Record record = Record .of (available - remaining , begin , end , exception );
276
274
sink .recordThroughput (record );
277
275
}
@@ -280,7 +278,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
280
278
@ Override
281
279
public long write (ByteBuffer [] srcs ) throws IOException {
282
280
boolean exception = false ;
283
- long available = Arrays . stream (srcs ). mapToLong ( Buffer :: remaining ). sum ( );
281
+ long available = Buffers . totalRemaining (srcs , 0 , srcs . length );
284
282
Instant begin = clock .instant ();
285
283
try {
286
284
return delegate .write (srcs );
@@ -289,7 +287,7 @@ public long write(ByteBuffer[] srcs) throws IOException {
289
287
throw e ;
290
288
} finally {
291
289
Instant end = clock .instant ();
292
- long remaining = Arrays . stream (srcs ). mapToLong ( Buffer :: remaining ). sum ( );
290
+ long remaining = Buffers . totalRemaining (srcs , 0 , srcs . length );
293
291
Record record = Record .of (available - remaining , begin , end , exception );
294
292
sink .recordThroughput (record );
295
293
}
0 commit comments