Skip to content

Commit 3d2c3c4

Browse files
committed
[AIT-316] feat: introduce support for message annotations
- Added `RealtimeAnnotations` class to manage annotation creation, deletion, and subscription on realtime channels. - Introduced `Annotation` and `AnnotationAction` types to encapsulate annotation details and actions. - Extended flags to include `ANNOTATION_PUBLISH` and `ANNOTATION_SUBSCRIBE`. - Refactored data encoding logic into `ably.util.encoding`. - Integrated annotation handling into `RealtimeChannel` and `RestChannel`.
1 parent b0499a9 commit 3d2c3c4

File tree

15 files changed

+1434
-99
lines changed

15 files changed

+1434
-99
lines changed

ably/realtime/annotations.py

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import TYPE_CHECKING
5+
6+
from ably.rest.annotations import RestAnnotations, construct_validate_annotation
7+
from ably.transport.websockettransport import ProtocolMessageAction
8+
from ably.types.annotation import Annotation, AnnotationAction
9+
from ably.types.channelstate import ChannelState
10+
from ably.types.flags import Flag
11+
from ably.util.eventemitter import EventEmitter
12+
from ably.util.exceptions import AblyException
13+
from ably.util.helper import is_callable_or_coroutine
14+
15+
if TYPE_CHECKING:
16+
from ably.realtime.channel import RealtimeChannel
17+
from ably.realtime.connectionmanager import ConnectionManager
18+
19+
log = logging.getLogger(__name__)
20+
21+
22+
class RealtimeAnnotations:
23+
"""
24+
Provides realtime methods for managing annotations on messages,
25+
including publishing annotations and subscribing to annotation events.
26+
"""
27+
28+
__connection_manager: ConnectionManager
29+
__channel: RealtimeChannel
30+
31+
def __init__(self, channel: RealtimeChannel, connection_manager: ConnectionManager):
32+
"""
33+
Initialize RealtimeAnnotations.
34+
35+
Args:
36+
channel: The Realtime Channel this annotations instance belongs to
37+
"""
38+
self.__channel = channel
39+
self.__connection_manager = connection_manager
40+
self.__subscriptions = EventEmitter()
41+
self.__rest_annotations = RestAnnotations(channel)
42+
43+
async def publish(self, msg_or_serial, annotation: dict | Annotation, params: dict=None):
44+
"""
45+
Publish an annotation on a message via the realtime connection.
46+
47+
Args:
48+
msg_or_serial: Either a message serial (string) or a Message object
49+
annotation: Dict containing annotation properties (type, name, data, etc.) or Annotation object
50+
params: Optional dict of query parameters
51+
52+
Returns:
53+
None
54+
55+
Raises:
56+
AblyException: If the request fails, inputs are invalid, or channel is in unpublishable state
57+
"""
58+
annotation = construct_validate_annotation(msg_or_serial, annotation)
59+
60+
# Check if channel and connection are in publishable state
61+
self.__channel._throw_if_unpublishable_state()
62+
63+
log.info(
64+
f'RealtimeAnnotations.publish(), channelName = {self.__channel.name}, '
65+
f'sending annotation with messageSerial = {annotation.message_serial}, '
66+
f'type = {annotation.type}'
67+
)
68+
69+
# Convert to wire format (array of annotations)
70+
wire_annotation = annotation.as_dict(binary=self.__channel.ably.options.use_binary_protocol)
71+
72+
# Build protocol message
73+
protocol_message = {
74+
"action": ProtocolMessageAction.ANNOTATION,
75+
"channel": self.__channel.name,
76+
"annotations": [wire_annotation],
77+
}
78+
79+
if params:
80+
# Stringify boolean params
81+
stringified_params = {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
82+
protocol_message["params"] = stringified_params
83+
84+
# Send via WebSocket
85+
await self.__connection_manager.send_protocol_message(protocol_message)
86+
87+
async def delete(self, msg_or_serial, annotation: dict | Annotation, params=None, timeout=None):
88+
"""
89+
Delete an annotation on a message.
90+
91+
This is a convenience method that sets the action to 'annotation.delete'
92+
and calls publish().
93+
94+
Args:
95+
msg_or_serial: Either a message serial (string) or a Message object
96+
annotation: Dict containing annotation properties or Annotation object
97+
params: Optional dict of query parameters
98+
timeout: Optional timeout (not used for realtime, kept for compatibility)
99+
100+
Returns:
101+
None
102+
103+
Raises:
104+
AblyException: If the request fails or inputs are invalid
105+
"""
106+
if isinstance(annotation, Annotation):
107+
annotation_values = annotation.as_dict()
108+
else:
109+
annotation_values = annotation.copy()
110+
annotation_values['action'] = AnnotationAction.ANNOTATION_DELETE
111+
return await self.publish(msg_or_serial, annotation_values, params)
112+
113+
async def subscribe(self, *args):
114+
"""
115+
Subscribe to annotation events on this channel.
116+
117+
Parameters
118+
----------
119+
*args: type, listener
120+
Subscribe type and listener
121+
122+
arg1(type): str, optional
123+
Subscribe to annotations of the given type
124+
125+
arg2(listener): callable
126+
Subscribe to all annotations on the channel
127+
128+
When no type is provided, arg1 is used as the listener.
129+
130+
Raises
131+
------
132+
AblyException
133+
If unable to subscribe due to invalid channel state or missing ANNOTATION_SUBSCRIBE mode
134+
ValueError
135+
If no valid subscribe arguments are passed
136+
"""
137+
# Parse arguments similar to channel.subscribe
138+
if len(args) == 0:
139+
raise ValueError("annotations.subscribe called without arguments")
140+
141+
if len(args) >= 2 and isinstance(args[0], str):
142+
annotation_type = args[0]
143+
if not args[1]:
144+
raise ValueError("annotations.subscribe called without listener")
145+
if not is_callable_or_coroutine(args[1]):
146+
raise ValueError("subscribe listener must be function or coroutine function")
147+
listener = args[1]
148+
elif is_callable_or_coroutine(args[0]):
149+
listener = args[0]
150+
annotation_type = None
151+
else:
152+
raise ValueError('invalid subscribe arguments')
153+
154+
# Register subscription
155+
if annotation_type is not None:
156+
self.__subscriptions.on(annotation_type, listener)
157+
else:
158+
self.__subscriptions.on(listener)
159+
160+
await self.__channel.attach()
161+
162+
# Check if ANNOTATION_SUBSCRIBE mode is enabled
163+
if self.__channel.state == ChannelState.ATTACHED:
164+
if not Flag.ANNOTATION_SUBSCRIBE in self.__channel.modes:
165+
raise AblyException(
166+
"You are trying to add an annotation listener, but you haven't requested the "
167+
"annotation_subscribe channel mode in ChannelOptions, so this won't do anything "
168+
"(we only deliver annotations to clients who have explicitly requested them)",
169+
93001,
170+
400
171+
)
172+
173+
def unsubscribe(self, *args):
174+
"""
175+
Unsubscribe from annotation events on this channel.
176+
177+
Parameters
178+
----------
179+
*args: type, listener
180+
Unsubscribe type and listener
181+
182+
arg1(type): str, optional
183+
Unsubscribe from annotations of the given type
184+
185+
arg2(listener): callable
186+
Unsubscribe from all annotations on the channel
187+
188+
When no type is provided, arg1 is used as the listener.
189+
190+
Raises
191+
------
192+
ValueError
193+
If no valid unsubscribe arguments are passed
194+
"""
195+
if len(args) == 0:
196+
raise ValueError("annotations.unsubscribe called without arguments")
197+
198+
if len(args) >= 2 and isinstance(args[0], str):
199+
annotation_type = args[0]
200+
listener = args[1]
201+
self.__subscriptions.off(annotation_type, listener)
202+
elif is_callable_or_coroutine(args[0]):
203+
listener = args[0]
204+
self.__subscriptions.off(listener)
205+
else:
206+
raise ValueError('invalid unsubscribe arguments')
207+
208+
def _process_incoming(self, incoming_annotations):
209+
"""
210+
Process incoming annotations from the server.
211+
212+
This is called internally when ANNOTATION protocol messages are received.
213+
214+
Args:
215+
incoming_annotations: List of Annotation objects received from the server
216+
"""
217+
for annotation in incoming_annotations:
218+
# Emit to type-specific listeners and catch-all listeners
219+
annotation_type = annotation.type or ''
220+
self.__subscriptions._emit(annotation_type, annotation)
221+
222+
async def get(self, msg_or_serial, params=None):
223+
"""
224+
Retrieve annotations for a message with pagination support.
225+
226+
This delegates to the REST implementation.
227+
228+
Args:
229+
msg_or_serial: Either a message serial (string) or a Message object
230+
params: Optional dict of query parameters (limit, start, end, direction)
231+
232+
Returns:
233+
PaginatedResult: A paginated result containing Annotation objects
234+
235+
Raises:
236+
AblyException: If the request fails or serial is invalid
237+
"""
238+
# Delegate to REST implementation
239+
return await self.__rest_annotations.get(msg_or_serial, params)

ably/realtime/channel.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import logging
55
from typing import TYPE_CHECKING
66

7+
from ably.realtime.annotations import RealtimeAnnotations
78
from ably.realtime.connection import ConnectionState
9+
from ably.realtime.presence import RealtimePresence
810
from ably.rest.channel import Channel
911
from ably.rest.channel import Channels as RestChannels
1012
from ably.transport.websockettransport import ProtocolMessageAction
13+
from ably.types.annotation import Annotation
1114
from ably.types.channeloptions import ChannelOptions
1215
from ably.types.channelstate import ChannelState, ChannelStateChange
1316
from ably.types.flags import Flag, has_flag
@@ -18,6 +21,7 @@
1821
from ably.util.eventemitter import EventEmitter
1922
from ably.util.exceptions import AblyException, IncompatibleClientIdException
2023
from ably.util.helper import Timer, is_callable_or_coroutine, validate_message_size
24+
from ably.types.channelmode import ChannelMode, decode_channel_mode, encode_channel_mode
2125

2226
if TYPE_CHECKING:
2327
from ably.realtime.realtime import AblyRealtime
@@ -64,6 +68,7 @@ def __init__(self, realtime: AblyRealtime, name: str, channel_options: ChannelOp
6468
self.__error_reason: AblyException | None = None
6569
self.__channel_options = channel_options or ChannelOptions()
6670
self.__params: dict[str, str] | None = None
71+
self.__modes: list[ChannelMode] = list() # Channel mode flags from ATTACHED message
6772

6873
# Delta-specific fields for RTL19/RTL20 compliance
6974
vcdiff_decoder = self.__realtime.options.vcdiff_decoder if self.__realtime.options.vcdiff_decoder else None
@@ -74,12 +79,15 @@ def __init__(self, realtime: AblyRealtime, name: str, channel_options: ChannelOp
7479
# will be disrupted if the user called .off() to remove all listeners
7580
self.__internal_state_emitter = EventEmitter()
7681

82+
# Pass channel options as dictionary to parent Channel class
83+
Channel.__init__(self, realtime, name, self.__channel_options.to_dict())
84+
7785
# Initialize presence for this channel
78-
from ably.realtime.presence import RealtimePresence
86+
7987
self.__presence = RealtimePresence(self)
8088

81-
# Pass channel options as dictionary to parent Channel class
82-
Channel.__init__(self, realtime, name, self.__channel_options.to_dict())
89+
# Initialize realtime annotations for this channel (override REST annotations)
90+
self._Channel__annotations = RealtimeAnnotations(self, realtime.connection.connection_manager)
8391

8492
async def set_options(self, channel_options: ChannelOptions) -> None:
8593
"""Set channel options"""
@@ -149,8 +157,10 @@ def _attach_impl(self):
149157
"channel": self.name,
150158
}
151159

152-
if self.__attach_resume:
153-
attach_msg["flags"] = Flag.ATTACH_RESUME
160+
flags = self._encode_flags()
161+
162+
if flags:
163+
attach_msg["flags"] = flags
154164
if self.__channel_serial:
155165
attach_msg["channelSerial"] = self.__channel_serial
156166

@@ -491,8 +501,8 @@ async def _send_update(
491501
if not message.serial:
492502
raise AblyException(
493503
"Message serial is required for update/delete/append operations",
494-
400,
495-
40003
504+
status_code=400,
505+
code=40003,
496506
)
497507

498508
# Check connection and channel state
@@ -702,6 +712,8 @@ def _on_message(self, proto_msg: dict) -> None:
702712
resumed = has_flag(flags, Flag.RESUMED)
703713
# RTP1: Check for HAS_PRESENCE flag
704714
has_presence = has_flag(flags, Flag.HAS_PRESENCE)
715+
# Store channel attach flags
716+
self.__modes = decode_channel_mode(flags)
705717

706718
# RTL12
707719
if self.state == ChannelState.ATTACHED:
@@ -744,6 +756,15 @@ def _on_message(self, proto_msg: dict) -> None:
744756
decoded_presence = PresenceMessage.from_encoded_array(presence_messages, cipher=self.cipher)
745757
sync_channel_serial = proto_msg.get('channelSerial')
746758
self.__presence.set_presence(decoded_presence, is_sync=True, sync_channel_serial=sync_channel_serial)
759+
elif action == ProtocolMessageAction.ANNOTATION:
760+
# Handle ANNOTATION messages
761+
annotation_data = proto_msg.get('annotations', [])
762+
try:
763+
annotations = Annotation.from_encoded_array(annotation_data, cipher=self.cipher)
764+
# Process annotations through the annotations handler
765+
self.annotations._process_incoming(annotations)
766+
except Exception as e:
767+
log.error(f"Annotation processing error {e}. Skip annotations {annotation_data}")
747768
elif action == ProtocolMessageAction.ERROR:
748769
error = AblyException.from_dict(proto_msg.get('error'))
749770
self._notify_state(ChannelState.FAILED, reason=error)
@@ -890,6 +911,11 @@ def presence(self):
890911
"""Get the RealtimePresence object for this channel"""
891912
return self.__presence
892913

914+
@property
915+
def modes(self):
916+
"""Get the list of channel modes"""
917+
return self.__modes
918+
893919
def _start_decode_failure_recovery(self, error: AblyException) -> None:
894920
"""Start RTL18 decode failure recovery procedure"""
895921

@@ -908,6 +934,20 @@ def _start_decode_failure_recovery(self, error: AblyException) -> None:
908934
self._notify_state(ChannelState.ATTACHING, reason=error)
909935
self._check_pending_state()
910936

937+
def _encode_flags(self) -> int | None:
938+
if not self.__channel_options.modes and not self.__attach_resume:
939+
return None
940+
941+
flags = 0
942+
943+
if self.__attach_resume:
944+
flags |= Flag.ATTACH_RESUME
945+
946+
if self.__channel_options.modes:
947+
flags |= encode_channel_mode(self.__channel_options.modes)
948+
949+
return flags
950+
911951

912952
class Channels(RestChannels):
913953
"""Creates and destroys RealtimeChannel objects.

0 commit comments

Comments
 (0)