diff --git a/src/core/components/subscription_manager.js b/src/core/components/subscription_manager.js index 544d55309..23fb3d458 100644 --- a/src/core/components/subscription_manager.js +++ b/src/core/components/subscription_manager.js @@ -11,13 +11,17 @@ import categoryConstants from '../constants/categories'; type SubscribeArgs = { channels: Array, channelGroups: Array, + heartbeatChannels: Array, + heartbeatChannelGroups: Array, withPresence: ?boolean, timetoken: ?number } type UnsubscribeArgs = { channels: Array, - channelGroups: Array + channelGroups: Array, + heartbeatChannels: Array, + heartbeatChannelGroups: Array } type StateArgs = { @@ -52,6 +56,9 @@ export default class { _channels: Object; _presenceChannels: Object; + _heartbeatChannels: Object; + _heartbeatChannelGroups: Object; + _channelGroups: Object; _presenceChannelGroups: Object; @@ -72,6 +79,9 @@ export default class { // store pending connection elements _pendingChannelSubscriptions: Array; _pendingChannelGroupSubscriptions: Array; + + _pendingHeartbeatChannels: Array; + _pendingHeartbeatChannelGroups: Array; // _dedupingManager: DedupingManager; @@ -90,12 +100,18 @@ export default class { this._channels = {}; this._presenceChannels = {}; + this._heartbeatChannels = {}; + this._heartbeatChannelGroups = {}; + this._channelGroups = {}; this._presenceChannelGroups = {}; this._pendingChannelSubscriptions = []; this._pendingChannelGroupSubscriptions = []; + this._pendingHeartbeatChannels = []; + this._pendingHeartbeatChannelGroups = []; + this._currentTimetoken = 0; this._lastTimetoken = 0; this._storedTimetoken = null; @@ -123,7 +139,7 @@ export default class { } adaptSubscribeChange(args: SubscribeArgs) { - const { timetoken, channels = [], channelGroups = [], withPresence = false } = args; + const { timetoken, channels = [], channelGroups = [], heartbeatChannels = [], heartbeatChannelGroups = [], withPresence = false } = args; if (!this._config.subscribeKey || this._config.subscribeKey === '') { if (console && console.log) console.log('subscribe key missing; aborting subscribe') //eslint-disable-line @@ -141,10 +157,24 @@ export default class { this._currentTimetoken = 0; } + heartbeatChannels.forEach((channel: string) => { + this._heartbeatChannels[channel] = { state: {} }; + this._pendingHeartbeatChannels.push(channel); + }); + + heartbeatChannelGroups.forEach((channelGroup: string) => { + this._heartbeatChannelGroups[channelGroup] = { state: {} }; + this._pendingHeartbeatChannelGroups.push(channelGroup); + }); + channels.forEach((channel: string) => { this._channels[channel] = { state: {} }; if (withPresence) this._presenceChannels[channel] = {}; + if (!(channel in this._heartbeatChannels)) { + this._heartbeatChannels[channel] = this._channels[channel]; + } + this._pendingChannelSubscriptions.push(channel); }); @@ -152,6 +182,10 @@ export default class { this._channelGroups[channelGroup] = { state: {} }; if (withPresence) this._presenceChannelGroups[channelGroup] = {}; + if (!(channelGroup in this._heartbeatChannelGroups)) { + this._heartbeatChannelGroups[channelGroup] = this._channelGroups[channelGroup]; + } + this._pendingChannelGroupSubscriptions.push(channelGroup); }); @@ -160,14 +194,30 @@ export default class { } adaptUnsubscribeChange(args: UnsubscribeArgs, isOffline: boolean) { - const { channels = [], channelGroups = [] } = args; + const { channels = [], channelGroups = [], heartbeatChannels = [], heartbeatChannelGroups = [] } = args; // keep track of which channels and channel groups // we are going to unsubscribe from. const actualChannels = []; const actualChannelGroups = []; + const actualHeartbeatChannels = []; + const actualHeartbeatChannelGroups = []; // + heartbeatChannels.forEach((channel) => { + if (channel in this._heartbeatChannels) { + delete this._heartbeatChannels[channel]; + actualHeartbeatChannels.push(channel); + } + }); + + heartbeatChannelGroups.forEach((channelGroup) => { + if (channelGroup in this._heartbeatChannelGroups) { + delete this._heartbeatChannelGroups[channelGroup]; + actualHeartbeatChannelGroups.push(channelGroup); + } + }); + channels.forEach((channel) => { if (channel in this._channels) { delete this._channels[channel]; @@ -177,6 +227,9 @@ export default class { delete this._presenceChannels[channel]; actualChannels.push(channel); } + if (channel in this._heartbeatChannels) { + delete this._heartbeatChannels[channel]; + } }); channelGroups.forEach((channelGroup) => { @@ -188,17 +241,42 @@ export default class { delete this._channelGroups[channelGroup]; actualChannelGroups.push(channelGroup); } + + if (channelGroup in this._heartbeatChannelGroups) { + delete this._heartbeatChannelGroups[channelGroup]; + } }); // no-op if there are no channels and cg's to unsubscribe from. - if (actualChannels.length === 0 && actualChannelGroups.length === 0) { + if (actualChannels.length === 0 && actualChannelGroups.length === 0 && actualHeartbeatChannels.length === 0 && actualHeartbeatChannelGroups.length === 0) { return; } if (this._config.suppressLeaveEvents === false && !isOffline) { - this._leaveEndpoint({ channels: actualChannels, channelGroups: actualChannelGroups }, (status) => { + let _actualChannels = actualChannels.map(channel => channel); + let _actualChannelGroups = actualChannelGroups.map(channelGroup => channelGroup); + + actualHeartbeatChannels.forEach((channel) => { + if (!_actualChannels.includes(channel)) { + _actualChannels.push(channel); + } + }); + + actualHeartbeatChannelGroups.forEach((channelGroup) => { + if (!_actualChannelGroups.includes(channelGroup)) { + _actualChannelGroups.push(channelGroup); + } + }); + + this._leaveEndpoint({ channels: _actualChannels, channelGroups: _actualChannelGroups }, (status) => { status.affectedChannels = actualChannels; status.affectedChannelGroups = actualChannelGroups; + + if (heartbeatChannels.length > 0 || heartbeatChannelGroups.length > 0) { + status.affectedHeartbeatChannels = actualHeartbeatChannels; + status.affectedHeartbeatChannelGroups = actualHeartbeatChannelGroups; + } + status.currentTimetoken = this._currentTimetoken; status.lastTimetoken = this._lastTimetoken; this._listenerManager.announceStatus(status); @@ -221,7 +299,20 @@ export default class { } unsubscribeAll(isOffline: boolean) { - this.adaptUnsubscribeChange({ channels: this.getSubscribedChannels(), channelGroups: this.getSubscribedChannelGroups() }, isOffline); + this.adaptUnsubscribeChange({ + channels: this.getSubscribedChannels(), + channelGroups: this.getSubscribedChannelGroups(), + heartbeatChannels: this.getHeartbeatChannels(), + heartbeatChannelGroups: this.getHeartbeatChannelGroups() + }, isOffline); + } + + getHeartbeatChannels(): Array { + return Object.keys(this._heartbeatChannels); + } + + getHeartbeatChannelGroups(): Array { + return Object.keys(this._heartbeatChannelGroups); } getSubscribedChannels(): Array { @@ -263,21 +354,21 @@ export default class { } _performHeartbeatLoop() { - let presenceChannels = Object.keys(this._channels); - let presenceChannelGroups = Object.keys(this._channelGroups); + let heartbeatChannels = Object.keys(this._heartbeatChannels); + let heartbeatChannelGroups = Object.keys(this._heartbeatChannelGroups); let presenceState = {}; - if (presenceChannels.length === 0 && presenceChannelGroups.length === 0) { + if (heartbeatChannels.length === 0 && heartbeatChannelGroups.length === 0) { return; } - presenceChannels.forEach((channel) => { - let channelState = this._channels[channel].state; + heartbeatChannels.forEach((channel) => { + let channelState = this._heartbeatChannels[channel].state; if (Object.keys(channelState).length) presenceState[channel] = channelState; }); - presenceChannelGroups.forEach((channelGroup) => { - let channelGroupState = this._channelGroups[channelGroup].state; + heartbeatChannelGroups.forEach((channelGroup) => { + let channelGroupState = this._heartbeatChannelGroups[channelGroup].state; if (Object.keys(channelGroupState).length) presenceState[channelGroup] = channelGroupState; }); @@ -299,8 +390,8 @@ export default class { }; this._heartbeatEndpoint({ - channels: presenceChannels, - channelGroups: presenceChannelGroups, + channels: heartbeatChannels, + channelGroups: heartbeatChannelGroups, state: presenceState }, onHeartbeat.bind(this)); } @@ -308,6 +399,8 @@ export default class { this._stopSubscribeLoop(); let channels = []; let channelGroups = []; + let heartbeatChannels = []; + let heartbeatChannelGroups = []; Object.keys(this._channels).forEach(channel => channels.push(channel)); Object.keys(this._presenceChannels).forEach(channel => channels.push(`${channel}-pnpres`)); @@ -315,7 +408,10 @@ export default class { Object.keys(this._channelGroups).forEach(channelGroup => channelGroups.push(channelGroup)); Object.keys(this._presenceChannelGroups).forEach(channelGroup => channelGroups.push(`${channelGroup}-pnpres`)); - if (channels.length === 0 && channelGroups.length === 0) { + Object.keys(this._heartbeatChannels).forEach(channel => heartbeatChannels.push(channel)); + Object.keys(this._heartbeatChannelGroups).forEach(channelGroup => heartbeatChannelGroups.push(channelGroup)); + + if (channels.length === 0 && channelGroups.length === 0 && heartbeatChannels.length === 0 && heartbeatChannelGroups.length === 0) { return; } @@ -381,10 +477,21 @@ export default class { } if (!this._subscriptionStatusAnnounced) { + const conciliationChannels = this.getHeartbeatChannels().every(channel => this.getSubscribedChannels().includes(channel)); + const conciliationChannelGroups = this.getHeartbeatChannelGroups().every(channelGroup => this.getSubscribedChannelGroups().includes(channelGroup)); + let connectedAnnounce: StatusAnnouncement = {}; connectedAnnounce.category = categoryConstants.PNConnectedCategory; connectedAnnounce.operation = status.operation; connectedAnnounce.affectedChannels = this._pendingChannelSubscriptions; + + if (!conciliationChannels || !conciliationChannelGroups) { + connectedAnnounce.affectedHeartbeatChannels = this._pendingHeartbeatChannels; + connectedAnnounce.affectedHeartbeatChannelGroups = this._pendingHeartbeatChannelGroups; + connectedAnnounce.heartbeatChannels = this.getHeartbeatChannels(); + connectedAnnounce.heartbeatChannelGroups = this.getHeartbeatChannelGroups(); + } + connectedAnnounce.subscribedChannels = this.getSubscribedChannels(); connectedAnnounce.affectedChannelGroups = this._pendingChannelGroupSubscriptions; connectedAnnounce.lastTimetoken = this._lastTimetoken; @@ -395,6 +502,8 @@ export default class { // clear the pending connections list this._pendingChannelSubscriptions = []; this._pendingChannelGroupSubscriptions = []; + this._pendingHeartbeatChannels = []; + this._pendingHeartbeatChannelGroups = []; } let messages = payload.messages || []; diff --git a/test/integration/components/subscription_manager.test.js b/test/integration/components/subscription_manager.test.js index f0a0b6791..65323602f 100644 --- a/test/integration/components/subscription_manager.test.js +++ b/test/integration/components/subscription_manager.test.js @@ -222,6 +222,50 @@ describe('#components/subscription_manager', () => { pubnubWithPassingHeartbeats.subscribe({ channels: ['ch1', 'ch2'], withPresence: true }); }); + it('reports when heartbeats pass with heartbeatChannels', (done) => { + const scope = utils.createNock().get('/v2/presence/sub-key/mySubKey/channel/ch1%2Cch2/heartbeat') + .query({ pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`, uuid: 'myUUID', heartbeat: 300, state: '{}' }) + .reply(200, '{"status": 200, "message": "OK", "service": "Presence"}'); + + pubnubWithPassingHeartbeats.addListener({ + status(statusPayload) { + if (statusPayload.operation !== PubNub.OPERATIONS.PNHeartbeatOperation) return; + + assert.equal(scope.isDone(), true); + assert.deepEqual({ + error: false, + operation: 'PNHeartbeatOperation', + statusCode: 200 + }, statusPayload); + done(); + } + }); + + pubnubWithPassingHeartbeats.subscribe({ heartbeatChannels: ['ch1', 'ch2'] }); + }); + + it('reports when heartbeats pass with heartbeatChannelGroups', (done) => { + const scope = utils.createNock().get('/v2/presence/sub-key/mySubKey/channel/%2C/heartbeat') + .query({ pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`, uuid: 'myUUID', heartbeat: 300, state: '{}', 'channel-group': 'cg1' }) + .reply(200, '{"status": 200, "message": "OK", "service": "Presence"}'); + + pubnubWithPassingHeartbeats.addListener({ + status(statusPayload) { + if (statusPayload.operation !== PubNub.OPERATIONS.PNHeartbeatOperation) return; + + assert.equal(scope.isDone(), true); + assert.deepEqual({ + error: false, + operation: 'PNHeartbeatOperation', + statusCode: 200 + }, statusPayload); + done(); + } + }); + + pubnubWithPassingHeartbeats.subscribe({ heartbeatChannelGroups: ['cg1'] }); + }); + it('reports when the queue is beyond set threshold', (done) => { const scope = utils.createNock().get('/v2/subscribe/mySubKey/ch1%2Cch2%2Cch1-pnpres%2Cch2-pnpres/0') .query({ pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`, uuid: 'myUUID', heartbeat: 300 })