Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.split.client.events;

public interface EventsStorage extends EventsStorageConsumer, EventsStorageProducer{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.split.client.events;

public interface EventsStorageConsumer {
WrappedEvent pop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.split.client.events;

import io.split.client.dtos.Event;

public interface EventsStorageProducer {
boolean track(Event event, int eventSize);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.split.client.events;


import com.google.common.annotations.VisibleForTesting;
import io.split.client.dtos.Event;
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static com.google.gson.internal.$Gson$Preconditions.checkNotNull;

public class InMemoryEventsStorage implements EventsStorage{

private static final Logger _log = LoggerFactory.getLogger(InMemoryEventsStorage.class);
private final BlockingQueue<WrappedEvent> _eventQueue;
private final int _maxQueueSize;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

public InMemoryEventsStorage(int maxQueueSize, TelemetryRuntimeProducer telemetryRuntimeProducer) {
_eventQueue = new LinkedBlockingQueue<>(maxQueueSize);
_maxQueueSize = maxQueueSize;
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
}

@Override
public WrappedEvent pop() {
try {
return _eventQueue.take();
} catch (InterruptedException e) {
_log.warn("Got interrupted while waiting for an event in the queue.");
}
return null;
}

@Override
public boolean track(Event event, int eventSize) {
try {
if (event == null) {
return false;
}
if(_eventQueue.offer(new WrappedEvent(event, eventSize))) {
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
}
else {
_log.warn("Event queue is full, dropping event.");
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
return false;
}

} catch (ClassCastException | NullPointerException | IllegalArgumentException e) {
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
_log.warn("Interruption when adding event withed while adding message %s.", event);
return false;
}
return true;
}

@VisibleForTesting
int queueSize() {
return _maxQueueSize - _eventQueue.remainingCapacity();
}
}
21 changes: 21 additions & 0 deletions client/src/main/java/io/split/client/events/WrappedEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.split.client.events;

import io.split.client.dtos.Event;

public class WrappedEvent {
private final Event _event;
private final long _size;

public WrappedEvent(Event event, long size) {
_event = event;
_size = size;
}

public Event event() {
return _event;
}

public long size() {
return _size;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.split.client.events;

import io.split.client.dtos.Event;
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.concurrent.BlockingQueue;

public class InMemoryEventsStorageTest{

@Test
public void testDropEvent() {
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class);
EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer);

for (int i = 0; i < 3; ++i) {
Event event = new Event();
eventsStorage.track(event, 1);
}

Mockito.verify(telemetryRuntimeProducer, Mockito.times(2)).recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
Mockito.verify(telemetryRuntimeProducer, Mockito.times(1)).recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
}

@Test
public void testTrackAndPop() {
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class);
InMemoryEventsStorage eventsStorage = new InMemoryEventsStorage(10, telemetryRuntimeProducer);

for (int i = 0; i < 5; ++i) {
Event event = new Event();
eventsStorage.track(event, 1);
}

Assert.assertEquals(5, eventsStorage.queueSize());
Assert.assertNotNull(eventsStorage.pop());
}

@Test
public void testPopFailed() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class);
BlockingQueue blockingQueue = Mockito.mock(BlockingQueue.class);
EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer);
Field eventsQueue = InMemoryEventsStorage.class.getDeclaredField("_eventQueue");
eventsQueue.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(eventsQueue, eventsQueue.getModifiers() & ~Modifier.FINAL);
eventsQueue.set(eventsStorage, blockingQueue);
Mockito.when(blockingQueue.take()).thenThrow(new InterruptedException());
Assert.assertNull(eventsStorage.pop());
}

@Test
public void testTrackException() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class);
BlockingQueue blockingQueue = Mockito.mock(BlockingQueue.class);
EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer);

Field eventsQueue = InMemoryEventsStorage.class.getDeclaredField("_eventQueue");
eventsQueue.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(eventsQueue, eventsQueue.getModifiers() & ~Modifier.FINAL);
eventsQueue.set(eventsStorage, blockingQueue);
Mockito.when(blockingQueue.offer(Mockito.anyObject())).thenThrow(new ClassCastException());


Assert.assertEquals(false, eventsStorage.track(new Event(), 1));
Mockito.verify(telemetryRuntimeProducer, Mockito.times(1)).recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
}

@Test
public void testEventNullThenFalse() {
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class);
BlockingQueue blockingQueue = Mockito.mock(BlockingQueue.class);
EventsStorage eventsStorage = new InMemoryEventsStorage(2, telemetryRuntimeProducer);
Assert.assertEquals(false, eventsStorage.track(null, 1));
}
}