Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added streaming token refresh config
  • Loading branch information
chillaq committed Sep 22, 2025
commit 3c7f877cfb7a3fe45e04b596ae8ecceffcd54b4e
4 changes: 2 additions & 2 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
</parent>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
<name>Java Client</name>
Expand Down
22 changes: 20 additions & 2 deletions client/src/main/java/io/split/client/SplitClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ private HttpScheme() {
private final CustomHeaderDecorator _customHeaderDecorator;
private final CustomHttpModule _alternativeHTTPModule;

private final int _streamingTokenRefreshRate;

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -170,7 +172,8 @@ private SplitClientConfig(String endpoint,
int invalidSets,
CustomHeaderDecorator customHeaderDecorator,
CustomHttpModule alternativeHTTPModule,
FallbackTreatmentsConfiguration fallbackTreatments) {
FallbackTreatmentsConfiguration fallbackTreatments,
int streamingTokenRefreshRate) {
_endpoint = endpoint;
_eventsEndpoint = eventsEndpoint;
_featuresRefreshRate = pollForFeatureChangesEveryNSeconds;
Expand Down Expand Up @@ -226,6 +229,7 @@ private SplitClientConfig(String endpoint,
_customHeaderDecorator = customHeaderDecorator;
_alternativeHTTPModule = alternativeHTTPModule;
_fallbackTreatments = fallbackTreatments;
_streamingTokenRefreshRate = streamingTokenRefreshRate;

Properties props = new Properties();
try {
Expand Down Expand Up @@ -446,6 +450,8 @@ public boolean isSdkEndpointOverridden() {

public FallbackTreatmentsConfiguration fallbackTreatments() { return _fallbackTreatments; }

public int streamingTokenRefreshRate() { return _streamingTokenRefreshRate; }

public static final class Builder {
private String _endpoint = SDK_ENDPOINT;
private boolean _endpointSet = false;
Expand Down Expand Up @@ -505,6 +511,7 @@ public static final class Builder {
private CustomHeaderDecorator _customHeaderDecorator = null;
private CustomHttpModule _alternativeHTTPModule = null;
private FallbackTreatmentsConfiguration _fallbackTreatments;
private int _streamingTokenRefreshRate = 180;

public Builder() {
}
Expand Down Expand Up @@ -1055,6 +1062,11 @@ public Builder threadFactory(ThreadFactory threadFactory) {
return this;
}

public Builder streamingTokenRefreshRate(int streamingTokenRefreshRate) {
_streamingTokenRefreshRate = streamingTokenRefreshRate;
return this;
}

private void verifyRates() {
if (_featuresRefreshRate < 5 ) {
throw new IllegalArgumentException("featuresRefreshRate must be >= 5: " + _featuresRefreshRate);
Expand All @@ -1071,9 +1083,14 @@ private void verifyRates() {
if (_metricsRefreshRate < 30) {
throw new IllegalArgumentException("metricsRefreshRate must be >= 30: " + _metricsRefreshRate);
}

if(_telemetryRefreshRate < 60) {
throw new IllegalStateException("_telemetryRefreshRate must be >= 60");
}

if (_streamingTokenRefreshRate < 60) {
throw new IllegalStateException("_streamingTokenRefreshRate must be >= 60");
}
}

private void verifyEndPoints() {
Expand Down Expand Up @@ -1274,7 +1291,8 @@ public SplitClientConfig build() {
_invalidSetsCount,
_customHeaderDecorator,
_alternativeHTTPModule,
_fallbackTreatments);
_fallbackTreatments,
_streamingTokenRefreshRate);
}
}
}
33 changes: 25 additions & 8 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class PushManagerImp implements PushManager {
private final ScheduledExecutorService _scheduledExecutorService;
private AtomicLong _expirationTime;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
private final int _streamingTokenRefreshRate;

@VisibleForTesting
/* package private */ PushManagerImp(AuthApiClient authApiClient,
Expand All @@ -60,7 +61,8 @@ public class PushManagerImp implements PushManager {
Worker<SegmentQueueDto> segmentWorker,
PushStatusTracker pushStatusTracker,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {
ThreadFactory threadFactory,
int streamingTokenRefreshRate) {

_authApiClient = checkNotNull(authApiClient);
_eventSourceClient = checkNotNull(eventSourceClient);
Expand All @@ -70,6 +72,7 @@ public class PushManagerImp implements PushManager {
_expirationTime = new AtomicLong();
_scheduledExecutorService = buildSingleThreadScheduledExecutor(threadFactory, "Split-SSERefreshToken-%d");
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_streamingTokenRefreshRate = streamingTokenRefreshRate;
}

public static PushManagerImp build(Synchronizer synchronizer,
Expand All @@ -83,7 +86,8 @@ public static PushManagerImp build(Synchronizer synchronizer,
SplitCacheProducer splitCacheProducer,
FlagSetsFilter flagSetsFilter,
RuleBasedSegmentCache ruleBasedSegmentCache,
RuleBasedSegmentParser ruleBasedSegmentParser) {
RuleBasedSegmentParser ruleBasedSegmentParser,
int streamingTokenRefreshRate) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
Expand All @@ -96,7 +100,8 @@ public static PushManagerImp build(Synchronizer synchronizer,
segmentWorker,
pushStatusTracker,
telemetryRuntimeProducer,
threadFactory);
threadFactory,
streamingTokenRefreshRate);
}

@Override
Expand All @@ -106,18 +111,22 @@ public void start() {
AuthenticationResponse response = _authApiClient.Authenticate();
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
_expirationTime.set(response.getExpiration());
_expirationTime.set(_streamingTokenRefreshRate);
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
response.getExpiration(), System.currentTimeMillis()));
return;
}

cleanUpResources();
if (response.isRetry()) {
_log.debug(String.format("Handling retry error response"));
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
} else {
_log.debug(String.format("Auth service response is disabled: %s", response.getToken()));
_pushStatusTracker.forcePushDisable();
}
} catch (Exception e) {
_log.debug("Exception in PushManager start: " + e.getMessage());
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -156,14 +165,22 @@ private boolean startSse(String token, String channels) {

@Override
public void startWorkers() {
_featureFlagsWorker.start();
_segmentWorker.start();
try {
_featureFlagsWorker.start();
_segmentWorker.start();
} catch (Exception e) {
_log.debug("Exception in starting workers: " + e.getMessage());
}
}

@Override
public void stopWorkers() {
_featureFlagsWorker.stop();
_segmentWorker.stop();
try {
_featureFlagsWorker.stop();
_segmentWorker.stop();
} catch (Exception e) {
_log.debug("Exception in stopping workers: " + e.getMessage());
}
}

private void cleanUpResources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public static SyncManagerImp build(SplitTasks splitTasks,
splitCacheProducer,
flagSetsFilter,
ruleBasedSegmentCache,
ruleBasedSegmentParser);
ruleBasedSegmentParser,
config.streamingTokenRefreshRate());

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public boolean isOpen() {
}

public void close() {
_log.debug("closing SSE client");
try {
lock.lock();
_forcedStop.set(true);
Expand All @@ -128,6 +129,8 @@ public void close() {
}
}
}
} catch (Exception e) {
_log.debug("Exception in closing SSE client: " + e.getMessage());
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -184,16 +187,19 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
}
}
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether
_log.debug(String.format("SSE connection exception: %s", e.getMessage()));
_telemetryRuntimeProducer
.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(),
System.currentTimeMillis()));
_log.warn(e.getMessage(), e);
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
} finally {
_log.debug(String.format("Attempt to close SSE connection"));
try {
_ongoingResponse.get().close();
} catch (IOException e) {
_log.debug(String.format("SSE connection closing exception: %s", e.getMessage()));
_log.debug(e.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void setUp() {
_segmentsWorkerImp,
_pushStatusTracker,
_telemetryStorage,
null);
null,
180);
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions okhttp-modules/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
<parent>
<artifactId>java-client-parent</artifactId>
<groupId>io.split.client</groupId>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
<artifactId>okhttp-modules</artifactId>
<packaging>jar</packaging>
<name>http-modules</name>
Expand Down
2 changes: 1 addition & 1 deletion pluggable-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>java-client-parent</artifactId>
<groupId>io.split.client</groupId>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
</parent>

<version>2.1.0</version>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
<dependencyManagement>
<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion redis-wrapper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>java-client-parent</artifactId>
<groupId>io.split.client</groupId>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
</parent>
<artifactId>redis-wrapper</artifactId>
<version>3.1.1</version>
Expand Down
4 changes: 2 additions & 2 deletions testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
</parent>
<artifactId>java-client-testing</artifactId>
<packaging>jar</packaging>
<version>4.18.1-rc2</version>
<version>4.18.1-rc3</version>
<name>Java Client For Testing</name>
<description>Testing suite for Java SDK for Split</description>
<dependencies>
Expand Down