Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public synchronized void destroy() {
_log.info("Shutdown called for split");
try {
long splitCount = _splitCache.getAll().stream().count();
long segmentCount = _segmentCache.getAll().stream().count();
long segmentCount = _segmentCache.getSegmentCount();
long segmentKeyCount = _segmentCache.getKeyCount();
_impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
Expand Down
10 changes: 5 additions & 5 deletions client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.split.client.jmx;

import io.split.storages.SegmentCache;
import io.split.client.SplitClient;
import io.split.engine.common.FetchOptions;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheConsumer;
import io.split.storages.SplitCacheConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,14 +23,14 @@ public class SplitJmxMonitor implements SplitJmxMonitorMBean {
private final SplitFetcher _featureFetcher;
private final SplitCacheConsumer _splitCacheConsumer;
private final SegmentSynchronizationTask _segmentSynchronizationTask;
private SegmentCache _segmentCache;
private SegmentCacheConsumer segmentCacheConsumer;

public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCacheConsumer splitCacheConsumer, SegmentSynchronizationTask segmentSynchronizationTask, SegmentCache segmentCache) {
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCacheConsumer splitCacheConsumer, SegmentSynchronizationTask segmentSynchronizationTask, SegmentCacheConsumer segmentCacheConsumer) {
_client = checkNotNull(splitClient);
_featureFetcher = checkNotNull(featureFetcher);
_splitCacheConsumer = checkNotNull(splitCacheConsumer);
_segmentSynchronizationTask = checkNotNull(segmentSynchronizationTask);
_segmentCache = checkNotNull(segmentCache);
this.segmentCacheConsumer = checkNotNull(segmentCacheConsumer);
}

@Override
Expand Down Expand Up @@ -67,6 +67,6 @@ public String fetchDefinition(String featureName) {

@Override
public boolean isKeyInSegment(String key, String segmentName) {
return _segmentCache.isInSegment(segmentName, key);
return segmentCacheConsumer.isInSegment(segmentName, key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.split.storages.SegmentCache;
import io.split.client.ApiKeyCounter;
import io.split.client.SplitClientConfig;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
Expand Down Expand Up @@ -83,7 +83,7 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
String streamingServiceUrl,
int authRetryBackOffBase,
CloseableHttpClient sseHttpClient,
SegmentCache segmentCache,
SegmentCacheProducer segmentCacheProducer,
int streamingRetryDelay,
int maxOnDemandFetchRetries,
int failedAttemptsBeforeLogging,
Expand All @@ -97,7 +97,7 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
splitFetcher,
segmentSynchronizationTaskImp,
splitCacheProducer,
segmentCache,
segmentCacheProducer,
streamingRetryDelay,
maxOnDemandFetchRetries,
failedAttemptsBeforeLogging,
Expand Down
12 changes: 6 additions & 6 deletions client/src/main/java/io/split/engine/common/SynchronizerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.split.storages.SegmentCache;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,7 +31,7 @@ public class SynchronizerImp implements Synchronizer {
private final SplitFetcher _splitFetcher;
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
private final SplitCacheProducer _splitCacheProducer;
private final SegmentCache _segmentCache;
private final SegmentCacheProducer segmentCacheProducer;
private final int _onDemandFetchRetryDelayMs;
private final int _onDemandFetchMaxRetries;
private final int _failedAttemptsBeforeLogging;
Expand All @@ -43,7 +43,7 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
SplitFetcher splitFetcher,
SegmentSynchronizationTask segmentSynchronizationTaskImp,
SplitCacheProducer splitCacheProducer,
SegmentCache segmentCache,
SegmentCacheProducer segmentCacheProducer,
int onDemandFetchRetryDelayMs,
int onDemandFetchMaxRetries,
int failedAttemptsBeforeLogging,
Expand All @@ -53,7 +53,7 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
_splitFetcher = checkNotNull(splitFetcher);
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
_splitCacheProducer = checkNotNull(splitCacheProducer);
_segmentCache = checkNotNull(segmentCache);
this.segmentCacheProducer = checkNotNull(segmentCacheProducer);
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
_cdnResponseHeadersLogging = cdnResponseHeadersLogging;
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
Expand Down Expand Up @@ -189,7 +189,7 @@ public SyncResult attemptSegmentSync(String segmentName,
while(true) {
remainingAttempts--;
fetcher.fetch(opts);
if (targetChangeNumber <= _segmentCache.getChangeNumber(segmentName)) {
if (targetChangeNumber <= segmentCacheProducer.getChangeNumber(segmentName)) {
return new SyncResult(true, remainingAttempts);
} else if (remainingAttempts <= 0) {
return new SyncResult(false, remainingAttempts);
Expand All @@ -207,7 +207,7 @@ public SyncResult attemptSegmentSync(String segmentName,
@Override
public void refreshSegment(String segmentName, long targetChangeNumber) {

if (targetChangeNumber <= _segmentCache.getChangeNumber(segmentName)) {
if (targetChangeNumber <= segmentCacheProducer.getChangeNumber(segmentName)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.split.engine.experiments;

import com.google.common.collect.Lists;
import io.split.storages.SegmentCache;
import io.split.client.dtos.Condition;
import io.split.client.dtos.Matcher;
import io.split.client.dtos.MatcherGroup;
Expand All @@ -28,6 +27,7 @@
import io.split.engine.matchers.strings.StartsWithAnyOfMatcher;
import io.split.engine.matchers.strings.WhitelistMatcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,12 +46,12 @@ public final class SplitParser {
private static final Logger _log = LoggerFactory.getLogger(SplitParser.class);

private final SegmentSynchronizationTask _segmentSynchronizationTask;
private final SegmentCache _segmentCache;
private final SegmentCacheConsumer _segmentCacheConsumer;

public SplitParser(SegmentSynchronizationTask segmentSynchronizationTaskImp,
SegmentCache segmentCache) {
SegmentCacheConsumer segmentCacheConsumer) {
_segmentSynchronizationTask = checkNotNull(segmentSynchronizationTaskImp);
_segmentCache = checkNotNull(segmentCache);
_segmentCacheConsumer = checkNotNull(segmentCacheConsumer);
}

public ParsedSplit parse(Split split) {
Expand Down Expand Up @@ -103,7 +103,7 @@ private AttributeMatcher toMatcher(Matcher matcher) {
checkNotNull(matcher.userDefinedSegmentMatcherData);
String segmentName = matcher.userDefinedSegmentMatcherData.segmentName;
_segmentSynchronizationTask.initializeSegment(segmentName);
delegate = new UserDefinedSegmentMatcher(_segmentCache, segmentName);
delegate = new UserDefinedSegmentMatcher(_segmentCacheConsumer, segmentName);
break;
case WHITELIST:
checkNotNull(matcher.whitelistMatcherData);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.split.engine.matchers;

import io.split.storages.SegmentCache;
import io.split.engine.evaluator.Evaluator;
import io.split.storages.SegmentCacheConsumer;

import java.util.Map;

Expand All @@ -16,10 +16,10 @@
*/
public class UserDefinedSegmentMatcher implements Matcher {
private final String _segmentName;
private final SegmentCache _segmentCache;
private final SegmentCacheConsumer _segmentCacheConsumer;

public UserDefinedSegmentMatcher(SegmentCache segmentCache, String segmentName) {
_segmentCache = checkNotNull(segmentCache);
public UserDefinedSegmentMatcher(SegmentCacheConsumer segmentCacheConsumer, String segmentName) {
_segmentCacheConsumer = checkNotNull(segmentCacheConsumer);
_segmentName = checkNotNull(segmentName);
}

Expand All @@ -30,7 +30,7 @@ public boolean match(Object matchValue, String bucketingKey, Map<String, Object>
return false;
}

return _segmentCache.isInSegment(_segmentName, (String) matchValue);
return _segmentCacheConsumer.isInSegment(_segmentName, (String) matchValue);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.split.engine.segments;

import com.google.common.annotations.VisibleForTesting;
import io.split.storages.SegmentCache;
import io.split.client.dtos.SegmentChange;
import io.split.engine.SDKReadinessGates;
import io.split.storages.SegmentCacheProducer;
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import io.split.engine.common.FetchOptions;
Expand All @@ -20,20 +20,20 @@ public class SegmentFetcherImp implements SegmentFetcher {

private final String _segmentName;
private final SegmentChangeFetcher _segmentChangeFetcher;
private final SegmentCache _segmentCache;
private final SegmentCacheProducer _segmentCacheProducer;
private final SDKReadinessGates _gates;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

private final Object _lock = new Object();

public SegmentFetcherImp(String segmentName, SegmentChangeFetcher segmentChangeFetcher, SDKReadinessGates gates, SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer) {
public SegmentFetcherImp(String segmentName, SegmentChangeFetcher segmentChangeFetcher, SDKReadinessGates gates, SegmentCacheProducer segmentCacheProducer, TelemetryRuntimeProducer telemetryRuntimeProducer) {
_segmentName = checkNotNull(segmentName);
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);
_segmentCache = checkNotNull(segmentCache);
_segmentCacheProducer = checkNotNull(segmentCacheProducer);
_gates = checkNotNull(gates);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);

_segmentCache.updateSegment(segmentName, new ArrayList<>(), new ArrayList<>());
_segmentCacheProducer.updateSegment(segmentName, new ArrayList<>(), new ArrayList<>());
}

@Override
Expand All @@ -50,39 +50,39 @@ public void fetch(FetchOptions opts){

private void runWithoutExceptionHandling(FetchOptions options) {
long initTime = System.currentTimeMillis();
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName), options);
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCacheProducer.getChangeNumber(_segmentName), options);

if (change == null) {
throw new IllegalStateException("SegmentChange was null");
}

if (change.till == _segmentCache.getChangeNumber(_segmentName)) {
if (change.till == _segmentCacheProducer.getChangeNumber(_segmentName)) {
// no change.
return;
}

if (change.since != _segmentCache.getChangeNumber(_segmentName)
|| change.since < _segmentCache.getChangeNumber(_segmentName)) {
if (change.since != _segmentCacheProducer.getChangeNumber(_segmentName)
|| change.since < _segmentCacheProducer.getChangeNumber(_segmentName)) {
// some other thread may have updated the shared state. exit
return;
}


if (change.added.isEmpty() && change.removed.isEmpty()) {
// there are no changes. weird!
_segmentCache.setChangeNumber(_segmentName,change.till);
_segmentCacheProducer.setChangeNumber(_segmentName,change.till);
return;
}

synchronized (_lock) {
// check state one more time.
if (change.since != _segmentCache.getChangeNumber(_segmentName)
|| change.till < _segmentCache.getChangeNumber(_segmentName)) {
if (change.since != _segmentCacheProducer.getChangeNumber(_segmentName)
|| change.till < _segmentCacheProducer.getChangeNumber(_segmentName)) {
// some other thread may have updated the shared state. exit
return;
}
//updateSegment(sn, toadd, tormv, chngN)
_segmentCache.updateSegment(_segmentName,change.added, change.removed);
_segmentCacheProducer.updateSegment(_segmentName,change.added, change.removed);

if (!change.added.isEmpty()) {
_log.info(_segmentName + " added keys: " + summarize(change.added));
Expand All @@ -92,7 +92,7 @@ private void runWithoutExceptionHandling(FetchOptions options) {
_log.info(_segmentName + " removed keys: " + summarize(change.removed));
}

_segmentCache.setChangeNumber(_segmentName,change.till);
_segmentCacheProducer.setChangeNumber(_segmentName,change.till);
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, System.currentTimeMillis());
}
}
Expand All @@ -119,14 +119,14 @@ private String summarize(List<String> changes) {

@VisibleForTesting
void callLoopRun(FetchOptions opts){
final long INITIAL_CN = _segmentCache.getChangeNumber(_segmentName);
final long INITIAL_CN = _segmentCacheProducer.getChangeNumber(_segmentName);
while (true) {
long start = _segmentCache.getChangeNumber(_segmentName);
long start = _segmentCacheProducer.getChangeNumber(_segmentName);
runWithoutExceptionHandling(opts);
if (INITIAL_CN == start) {
opts = new FetchOptions.Builder(opts).targetChangeNumber(FetchOptions.DEFAULT_TARGET_CHANGENUMBER).build();
}
long end = _segmentCache.getChangeNumber(_segmentName);
long end = _segmentCacheProducer.getChangeNumber(_segmentName);
if (start >= end) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.split.storages.SegmentCache;
import io.split.engine.SDKReadinessGates;
import io.split.storages.SegmentCacheProducer;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -32,14 +32,14 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask
private final AtomicBoolean _running;
private final Object _lock = new Object();
private final ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
private final SegmentCache _segmentCache;
private final SegmentCacheProducer _segmentCacheProducer;
private final SDKReadinessGates _gates;
private final ScheduledExecutorService _scheduledExecutorService;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

private ScheduledFuture<?> _scheduledFuture;

public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads, SDKReadinessGates gates, SegmentCache segmentCache,
public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads, SDKReadinessGates gates, SegmentCacheProducer segmentCacheProducer,
TelemetryRuntimeProducer telemetryRuntimeProducer) {
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);

Expand All @@ -57,7 +57,7 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,

_running = new AtomicBoolean(false);

_segmentCache = checkNotNull(segmentCache);
_segmentCacheProducer = checkNotNull(segmentCacheProducer);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
}

Expand All @@ -82,7 +82,7 @@ public void initializeSegment(String segmentName) {
return;
}

segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _gates, _segmentCache, _telemetryRuntimeProducer);
segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _gates, _segmentCacheProducer, _telemetryRuntimeProducer);

if (_running.get()) {
_scheduledExecutorService.submit(segment::fetchAll);
Expand Down
Loading