Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 125 additions & 16 deletions src/core/components/subscription_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import categoryConstants from '../constants/categories';
type SubscribeArgs = {
channels: Array<string>,
channelGroups: Array<string>,
heartbeatChannels: Array<string>,
heartbeatChannelGroups: Array<string>,
withPresence: ?boolean,
timetoken: ?number
}

type UnsubscribeArgs = {
channels: Array<string>,
channelGroups: Array<string>
channelGroups: Array<string>,
heartbeatChannels: Array<string>,
heartbeatChannelGroups: Array<string>
}

type StateArgs = {
Expand Down Expand Up @@ -52,6 +56,9 @@ export default class {
_channels: Object;
_presenceChannels: Object;

_heartbeatChannels: Object;
_heartbeatChannelGroups: Object;

_channelGroups: Object;
_presenceChannelGroups: Object;

Expand All @@ -72,6 +79,9 @@ export default class {
// store pending connection elements
_pendingChannelSubscriptions: Array<string>;
_pendingChannelGroupSubscriptions: Array<string>;

_pendingHeartbeatChannels: Array<string>;
_pendingHeartbeatChannelGroups: Array<string>;
//

_dedupingManager: DedupingManager;
Expand All @@ -90,12 +100,18 @@ export default class {
this._channels = {};
this._presenceChannels = {};

this._heartbeatChannels = {};
this._heartbeatChannelGroups = {};

this._channelGroups = {};
this._presenceChannelGroups = {};

this._pendingChannelSubscriptions = [];
this._pendingChannelGroupSubscriptions = [];

this._pendingHeartbeatChannels = [];
this._pendingHeartbeatChannelGroups = [];

this._currentTimetoken = 0;
this._lastTimetoken = 0;
this._storedTimetoken = null;
Expand Down Expand Up @@ -123,7 +139,7 @@ export default class {
}

adaptSubscribeChange(args: SubscribeArgs) {
const { timetoken, channels = [], channelGroups = [], withPresence = false } = args;
const { timetoken, channels = [], channelGroups = [], heartbeatChannels = [], heartbeatChannelGroups = [], withPresence = false } = args;

if (!this._config.subscribeKey || this._config.subscribeKey === '') {
if (console && console.log) console.log('subscribe key missing; aborting subscribe') //eslint-disable-line
Expand All @@ -141,17 +157,35 @@ export default class {
this._currentTimetoken = 0;
}

heartbeatChannels.forEach((channel: string) => {
this._heartbeatChannels[channel] = { state: {} };
this._pendingHeartbeatChannels.push(channel);
});

heartbeatChannelGroups.forEach((channelGroup: string) => {
this._heartbeatChannelGroups[channelGroup] = { state: {} };
this._pendingHeartbeatChannelGroups.push(channelGroup);
});

channels.forEach((channel: string) => {
this._channels[channel] = { state: {} };
if (withPresence) this._presenceChannels[channel] = {};

if (!(channel in this._heartbeatChannels)) {
this._heartbeatChannels[channel] = this._channels[channel];
}

this._pendingChannelSubscriptions.push(channel);
});

channelGroups.forEach((channelGroup: string) => {
this._channelGroups[channelGroup] = { state: {} };
if (withPresence) this._presenceChannelGroups[channelGroup] = {};

if (!(channelGroup in this._heartbeatChannelGroups)) {
this._heartbeatChannelGroups[channelGroup] = this._channelGroups[channelGroup];
}

this._pendingChannelGroupSubscriptions.push(channelGroup);
});

Expand All @@ -160,14 +194,30 @@ export default class {
}

adaptUnsubscribeChange(args: UnsubscribeArgs, isOffline: boolean) {
const { channels = [], channelGroups = [] } = args;
const { channels = [], channelGroups = [], heartbeatChannels = [], heartbeatChannelGroups = [] } = args;

// keep track of which channels and channel groups
// we are going to unsubscribe from.
const actualChannels = [];
const actualChannelGroups = [];
const actualHeartbeatChannels = [];
const actualHeartbeatChannelGroups = [];
//

heartbeatChannels.forEach((channel) => {
if (channel in this._heartbeatChannels) {
delete this._heartbeatChannels[channel];
actualHeartbeatChannels.push(channel);
}
});

heartbeatChannelGroups.forEach((channelGroup) => {
if (channelGroup in this._heartbeatChannelGroups) {
delete this._heartbeatChannelGroups[channelGroup];
actualHeartbeatChannelGroups.push(channelGroup);
}
});

channels.forEach((channel) => {
if (channel in this._channels) {
delete this._channels[channel];
Expand All @@ -177,6 +227,9 @@ export default class {
delete this._presenceChannels[channel];
actualChannels.push(channel);
}
if (channel in this._heartbeatChannels) {
delete this._heartbeatChannels[channel];
}
});

channelGroups.forEach((channelGroup) => {
Expand All @@ -188,17 +241,42 @@ export default class {
delete this._channelGroups[channelGroup];
actualChannelGroups.push(channelGroup);
}

if (channelGroup in this._heartbeatChannelGroups) {
delete this._heartbeatChannelGroups[channelGroup];
}
});

// no-op if there are no channels and cg's to unsubscribe from.
if (actualChannels.length === 0 && actualChannelGroups.length === 0) {
if (actualChannels.length === 0 && actualChannelGroups.length === 0 && actualHeartbeatChannels.length === 0 && actualHeartbeatChannelGroups.length === 0) {
return;
}

if (this._config.suppressLeaveEvents === false && !isOffline) {
this._leaveEndpoint({ channels: actualChannels, channelGroups: actualChannelGroups }, (status) => {
let _actualChannels = actualChannels.map(channel => channel);
let _actualChannelGroups = actualChannelGroups.map(channelGroup => channelGroup);

actualHeartbeatChannels.forEach((channel) => {
if (!_actualChannels.includes(channel)) {
_actualChannels.push(channel);
}
});

actualHeartbeatChannelGroups.forEach((channelGroup) => {
if (!_actualChannelGroups.includes(channelGroup)) {
_actualChannelGroups.push(channelGroup);
}
});

this._leaveEndpoint({ channels: _actualChannels, channelGroups: _actualChannelGroups }, (status) => {
status.affectedChannels = actualChannels;
status.affectedChannelGroups = actualChannelGroups;

if (heartbeatChannels.length > 0 || heartbeatChannelGroups.length > 0) {
status.affectedHeartbeatChannels = actualHeartbeatChannels;
status.affectedHeartbeatChannelGroups = actualHeartbeatChannelGroups;
}

status.currentTimetoken = this._currentTimetoken;
status.lastTimetoken = this._lastTimetoken;
this._listenerManager.announceStatus(status);
Expand All @@ -221,7 +299,20 @@ export default class {
}

unsubscribeAll(isOffline: boolean) {
this.adaptUnsubscribeChange({ channels: this.getSubscribedChannels(), channelGroups: this.getSubscribedChannelGroups() }, isOffline);
this.adaptUnsubscribeChange({
channels: this.getSubscribedChannels(),
channelGroups: this.getSubscribedChannelGroups(),
heartbeatChannels: this.getHeartbeatChannels(),
heartbeatChannelGroups: this.getHeartbeatChannelGroups()
}, isOffline);
}

getHeartbeatChannels(): Array<string> {
return Object.keys(this._heartbeatChannels);
}

getHeartbeatChannelGroups(): Array<string> {
return Object.keys(this._heartbeatChannelGroups);
}

getSubscribedChannels(): Array<string> {
Expand Down Expand Up @@ -263,21 +354,21 @@ export default class {
}

_performHeartbeatLoop() {
let presenceChannels = Object.keys(this._channels);
let presenceChannelGroups = Object.keys(this._channelGroups);
let heartbeatChannels = Object.keys(this._heartbeatChannels);
let heartbeatChannelGroups = Object.keys(this._heartbeatChannelGroups);
let presenceState = {};

if (presenceChannels.length === 0 && presenceChannelGroups.length === 0) {
if (heartbeatChannels.length === 0 && heartbeatChannelGroups.length === 0) {
return;
}

presenceChannels.forEach((channel) => {
let channelState = this._channels[channel].state;
heartbeatChannels.forEach((channel) => {
let channelState = this._heartbeatChannels[channel].state;
if (Object.keys(channelState).length) presenceState[channel] = channelState;
});

presenceChannelGroups.forEach((channelGroup) => {
let channelGroupState = this._channelGroups[channelGroup].state;
heartbeatChannelGroups.forEach((channelGroup) => {
let channelGroupState = this._heartbeatChannelGroups[channelGroup].state;
if (Object.keys(channelGroupState).length) presenceState[channelGroup] = channelGroupState;
});

Expand All @@ -299,23 +390,28 @@ export default class {
};

this._heartbeatEndpoint({
channels: presenceChannels,
channelGroups: presenceChannelGroups,
channels: heartbeatChannels,
channelGroups: heartbeatChannelGroups,
state: presenceState }, onHeartbeat.bind(this));
}

_startSubscribeLoop() {
this._stopSubscribeLoop();
let channels = [];
let channelGroups = [];
let heartbeatChannels = [];
let heartbeatChannelGroups = [];

Object.keys(this._channels).forEach(channel => channels.push(channel));
Object.keys(this._presenceChannels).forEach(channel => channels.push(`${channel}-pnpres`));

Object.keys(this._channelGroups).forEach(channelGroup => channelGroups.push(channelGroup));
Object.keys(this._presenceChannelGroups).forEach(channelGroup => channelGroups.push(`${channelGroup}-pnpres`));

if (channels.length === 0 && channelGroups.length === 0) {
Object.keys(this._heartbeatChannels).forEach(channel => heartbeatChannels.push(channel));
Object.keys(this._heartbeatChannelGroups).forEach(channelGroup => heartbeatChannelGroups.push(channelGroup));

if (channels.length === 0 && channelGroups.length === 0 && heartbeatChannels.length === 0 && heartbeatChannelGroups.length === 0) {
return;
}

Expand Down Expand Up @@ -381,10 +477,21 @@ export default class {
}

if (!this._subscriptionStatusAnnounced) {
const conciliationChannels = this.getHeartbeatChannels().every(channel => this.getSubscribedChannels().includes(channel));
const conciliationChannelGroups = this.getHeartbeatChannelGroups().every(channelGroup => this.getSubscribedChannelGroups().includes(channelGroup));

let connectedAnnounce: StatusAnnouncement = {};
connectedAnnounce.category = categoryConstants.PNConnectedCategory;
connectedAnnounce.operation = status.operation;
connectedAnnounce.affectedChannels = this._pendingChannelSubscriptions;

if (!conciliationChannels || !conciliationChannelGroups) {
connectedAnnounce.affectedHeartbeatChannels = this._pendingHeartbeatChannels;
connectedAnnounce.affectedHeartbeatChannelGroups = this._pendingHeartbeatChannelGroups;
connectedAnnounce.heartbeatChannels = this.getHeartbeatChannels();
connectedAnnounce.heartbeatChannelGroups = this.getHeartbeatChannelGroups();
}

connectedAnnounce.subscribedChannels = this.getSubscribedChannels();
connectedAnnounce.affectedChannelGroups = this._pendingChannelGroupSubscriptions;
connectedAnnounce.lastTimetoken = this._lastTimetoken;
Expand All @@ -395,6 +502,8 @@ export default class {
// clear the pending connections list
this._pendingChannelSubscriptions = [];
this._pendingChannelGroupSubscriptions = [];
this._pendingHeartbeatChannels = [];
this._pendingHeartbeatChannelGroups = [];
}

let messages = payload.messages || [];
Expand Down
44 changes: 44 additions & 0 deletions test/integration/components/subscription_manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,50 @@ describe('#components/subscription_manager', () => {
pubnubWithPassingHeartbeats.subscribe({ channels: ['ch1', 'ch2'], withPresence: true });
});

it('reports when heartbeats pass with heartbeatChannels', (done) => {
const scope = utils.createNock().get('/v2/presence/sub-key/mySubKey/channel/ch1%2Cch2/heartbeat')
.query({ pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`, uuid: 'myUUID', heartbeat: 300, state: '{}' })
.reply(200, '{"status": 200, "message": "OK", "service": "Presence"}');

pubnubWithPassingHeartbeats.addListener({
status(statusPayload) {
if (statusPayload.operation !== PubNub.OPERATIONS.PNHeartbeatOperation) return;

assert.equal(scope.isDone(), true);
assert.deepEqual({
error: false,
operation: 'PNHeartbeatOperation',
statusCode: 200
}, statusPayload);
done();
}
});

pubnubWithPassingHeartbeats.subscribe({ heartbeatChannels: ['ch1', 'ch2'] });
});

it('reports when heartbeats pass with heartbeatChannelGroups', (done) => {
const scope = utils.createNock().get('/v2/presence/sub-key/mySubKey/channel/%2C/heartbeat')
.query({ pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`, uuid: 'myUUID', heartbeat: 300, state: '{}', 'channel-group': 'cg1' })
.reply(200, '{"status": 200, "message": "OK", "service": "Presence"}');

pubnubWithPassingHeartbeats.addListener({
status(statusPayload) {
if (statusPayload.operation !== PubNub.OPERATIONS.PNHeartbeatOperation) return;

assert.equal(scope.isDone(), true);
assert.deepEqual({
error: false,
operation: 'PNHeartbeatOperation',
statusCode: 200
}, statusPayload);
done();
}
});

pubnubWithPassingHeartbeats.subscribe({ heartbeatChannelGroups: ['cg1'] });
});

it('reports when the queue is beyond set threshold', (done) => {
const scope = utils.createNock().get('/v2/subscribe/mySubKey/ch1%2Cch2%2Cch1-pnpres%2Cch2-pnpres/0')
.query({ pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`, uuid: 'myUUID', heartbeat: 300 })
Expand Down