diff --git a/client/src/main/java/io/split/client/events/EventsStorage.java b/client/src/main/java/io/split/client/events/EventsStorage.java new file mode 100644 index 000000000..ede0d0aae --- /dev/null +++ b/client/src/main/java/io/split/client/events/EventsStorage.java @@ -0,0 +1,4 @@ +package io.split.client.events; + +public interface EventsStorage extends EventsStorageConsumer, EventsStorageProducer{ +} diff --git a/client/src/main/java/io/split/client/events/EventsStorageConsumer.java b/client/src/main/java/io/split/client/events/EventsStorageConsumer.java new file mode 100644 index 000000000..3e6b613ef --- /dev/null +++ b/client/src/main/java/io/split/client/events/EventsStorageConsumer.java @@ -0,0 +1,5 @@ +package io.split.client.events; + +public interface EventsStorageConsumer { + WrappedEvent pop(); +} diff --git a/client/src/main/java/io/split/client/events/EventsStorageProducer.java b/client/src/main/java/io/split/client/events/EventsStorageProducer.java new file mode 100644 index 000000000..ab9863104 --- /dev/null +++ b/client/src/main/java/io/split/client/events/EventsStorageProducer.java @@ -0,0 +1,7 @@ +package io.split.client.events; + +import io.split.client.dtos.Event; + +public interface EventsStorageProducer { + boolean track(Event event, int eventSize); +} diff --git a/client/src/main/java/io/split/client/events/InMemoryEventsStorage.java b/client/src/main/java/io/split/client/events/InMemoryEventsStorage.java new file mode 100644 index 000000000..2d2aa9b1e --- /dev/null +++ b/client/src/main/java/io/split/client/events/InMemoryEventsStorage.java @@ -0,0 +1,66 @@ +package io.split.client.events; + + +import com.google.common.annotations.VisibleForTesting; +import io.split.client.dtos.Event; +import io.split.telemetry.domain.enums.EventsDataRecordsEnum; +import io.split.telemetry.storage.TelemetryRuntimeProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static com.google.gson.internal.$Gson$Preconditions.checkNotNull; + +public class InMemoryEventsStorage implements EventsStorage{ + + private static final Logger _log = LoggerFactory.getLogger(InMemoryEventsStorage.class); + private final BlockingQueue _eventQueue; + private final int _maxQueueSize; + private final TelemetryRuntimeProducer _telemetryRuntimeProducer; + + public InMemoryEventsStorage(int maxQueueSize, TelemetryRuntimeProducer telemetryRuntimeProducer) { + _eventQueue = new LinkedBlockingQueue<>(maxQueueSize); + _maxQueueSize = maxQueueSize; + _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer); + } + + @Override + public WrappedEvent pop() { + try { + return _eventQueue.take(); + } catch (InterruptedException e) { + _log.warn("Got interrupted while waiting for an event in the queue."); + } + return null; + } + + @Override + public boolean track(Event event, int eventSize) { + try { + if (event == null) { + return false; + } + if(_eventQueue.offer(new WrappedEvent(event, eventSize))) { + _telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1); + } + else { + _log.warn("Event queue is full, dropping event."); + _telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1); + return false; + } + + } catch (ClassCastException | NullPointerException | IllegalArgumentException e) { + _telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1); + _log.warn("Interruption when adding event withed while adding message %s.", event); + return false; + } + return true; + } + + @VisibleForTesting + int queueSize() { + return _maxQueueSize - _eventQueue.remainingCapacity(); + } +} diff --git a/client/src/main/java/io/split/client/events/WrappedEvent.java b/client/src/main/java/io/split/client/events/WrappedEvent.java new file mode 100644 index 000000000..82c0196e4 --- /dev/null +++ b/client/src/main/java/io/split/client/events/WrappedEvent.java @@ -0,0 +1,21 @@ +package io.split.client.events; + +import io.split.client.dtos.Event; + +public class WrappedEvent { + private final Event _event; + private final long _size; + + public WrappedEvent(Event event, long size) { + _event = event; + _size = size; + } + + public Event event() { + return _event; + } + + public long size() { + return _size; + } +} diff --git a/client/src/test/java/io/split/client/events/InMemoryEventsStorageTest.java b/client/src/test/java/io/split/client/events/InMemoryEventsStorageTest.java new file mode 100644 index 000000000..d5f19fd57 --- /dev/null +++ b/client/src/test/java/io/split/client/events/InMemoryEventsStorageTest.java @@ -0,0 +1,85 @@ +package io.split.client.events; + +import io.split.client.dtos.Event; +import io.split.telemetry.domain.enums.EventsDataRecordsEnum; +import io.split.telemetry.storage.TelemetryRuntimeProducer; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.concurrent.BlockingQueue; + +public class InMemoryEventsStorageTest{ + + @Test + public void testDropEvent() { + TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class); + EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer); + + for (int i = 0; i < 3; ++i) { + Event event = new Event(); + eventsStorage.track(event, 1); + } + + Mockito.verify(telemetryRuntimeProducer, Mockito.times(2)).recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1); + Mockito.verify(telemetryRuntimeProducer, Mockito.times(1)).recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1); + } + + @Test + public void testTrackAndPop() { + TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class); + InMemoryEventsStorage eventsStorage = new InMemoryEventsStorage(10, telemetryRuntimeProducer); + + for (int i = 0; i < 5; ++i) { + Event event = new Event(); + eventsStorage.track(event, 1); + } + + Assert.assertEquals(5, eventsStorage.queueSize()); + Assert.assertNotNull(eventsStorage.pop()); + } + + @Test + public void testPopFailed() throws NoSuchFieldException, IllegalAccessException, InterruptedException { + TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class); + BlockingQueue blockingQueue = Mockito.mock(BlockingQueue.class); + EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer); + Field eventsQueue = InMemoryEventsStorage.class.getDeclaredField("_eventQueue"); + eventsQueue.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(eventsQueue, eventsQueue.getModifiers() & ~Modifier.FINAL); + eventsQueue.set(eventsStorage, blockingQueue); + Mockito.when(blockingQueue.take()).thenThrow(new InterruptedException()); + Assert.assertNull(eventsStorage.pop()); + } + + @Test + public void testTrackException() throws NoSuchFieldException, IllegalAccessException, InterruptedException { + TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class); + BlockingQueue blockingQueue = Mockito.mock(BlockingQueue.class); + EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer); + + Field eventsQueue = InMemoryEventsStorage.class.getDeclaredField("_eventQueue"); + eventsQueue.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(eventsQueue, eventsQueue.getModifiers() & ~Modifier.FINAL); + eventsQueue.set(eventsStorage, blockingQueue); + Mockito.when(blockingQueue.offer(Mockito.anyObject())).thenThrow(new ClassCastException()); + + + Assert.assertEquals(false, eventsStorage.track(new Event(), 1)); + Mockito.verify(telemetryRuntimeProducer, Mockito.times(1)).recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1); + } + + @Test + public void testEventNullThenFalse() { + TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class); + BlockingQueue blockingQueue = Mockito.mock(BlockingQueue.class); + EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer); + Assert.assertEquals(false, eventsStorage.track(null, 1)); + } +} \ No newline at end of file