@@ -10,6 +10,7 @@ pub struct RoutingEntry {
1010
1111const DSDV_HEARTBEAT_PERIOD : u32 = 100 ;
1212const DSDV_RETRY_PERIOD : u32 = 1 * 1000 ; /* 1 second */
13+
1314type RoutingTable = HashMap < u32 , RoutingEntry > ;
1415
1516#[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
@@ -18,20 +19,32 @@ pub enum RoutableMessage {
1819 Ack ( u32 ) ,
1920}
2021
21-
2222#[ derive( Debug , Clone ) ]
2323pub enum DSDVMessage {
2424 HeartBeat ( ( RoutingTable , u32 /* from */ ) ) ,
25- RoutingRequest ( ( RoutableMessage , u32 /* rerouting_agent */ , u32 /* destination */ ) ) ,
25+ RoutingRequest (
26+ (
27+ RoutableMessage ,
28+ u32 , /* rerouting_agent */
29+ u32 , /* destination */
30+ ) ,
31+ ) ,
2632}
2733
28-
2934pub async fn dsdv_actor ( my_id : u32 , mut ctx : Context < DSDVMessage > ) {
3035 let mut last_transmission = ctx. current_step ( ) ;
31- let mut table = RoutingTable :: from ( [
32- ( my_id, RoutingEntry { metric : 0 , next_hop : my_id, sequence_number : last_transmission } ) ,
33- ] ) ;
34- ctx. send ( MessageType :: Comm ( DSDVMessage :: HeartBeat ( ( table. clone ( ) , my_id) ) ) ) ;
36+ let mut table = RoutingTable :: from ( [ (
37+ my_id,
38+ RoutingEntry {
39+ metric : 0 ,
40+ next_hop : my_id,
41+ sequence_number : last_transmission,
42+ } ,
43+ ) ] ) ;
44+ ctx. send ( MessageType :: Comm ( DSDVMessage :: HeartBeat ( (
45+ table. clone ( ) ,
46+ my_id,
47+ ) ) ) ) ;
3548
3649 log:: info!( "worker {} started" , my_id) ;
3750
@@ -50,11 +63,17 @@ pub async fn dsdv_actor(my_id: u32, mut ctx: Context<DSDVMessage>) {
5063 if rerouting_agent == my_id {
5164 if destination == my_id {
5265 // This message has achieved its addressee
53- log:: warn!( "Achieved its dest: {:?} {} {}" , rm, rerouting_agent, destination) ;
66+ log:: warn!(
67+ "Achieved its dest: {:?} {} {}" ,
68+ rm,
69+ rerouting_agent,
70+ destination
71+ ) ;
5472 match rm {
5573 RoutableMessage :: Request ( rm) => {
5674 ctx. send ( MessageType :: Request ( rm. clone ( ) ) ) ;
57- messages_to_send. push ( ( RoutableMessage :: Ack ( rm. id ) , rm. from ) ) ;
75+ messages_to_send
76+ . push ( ( RoutableMessage :: Ack ( rm. id ) , rm. from ) ) ;
5877 }
5978 RoutableMessage :: Ack ( message_id) => {
6079 retries. remove ( & message_id) ;
@@ -68,7 +87,11 @@ pub async fn dsdv_actor(my_id: u32, mut ctx: Context<DSDVMessage>) {
6887 }
6988 DSDVMessage :: HeartBeat ( ( other_table, from) ) => {
7089 for ( dst, entry) in other_table. iter ( ) {
71- if !table. contains_key ( dst) || table. contains_key ( dst) && table. get ( dst) . unwrap ( ) . sequence_number < entry. sequence_number {
90+ if !table. contains_key ( dst)
91+ || table. contains_key ( dst)
92+ && table. get ( dst) . unwrap ( ) . sequence_number
93+ < entry. sequence_number
94+ {
7295 let mut new_entry = entry. clone ( ) ;
7396 new_entry. next_hop = from;
7497 new_entry. metric = entry. metric + 1 ;
@@ -85,24 +108,34 @@ pub async fn dsdv_actor(my_id: u32, mut ctx: Context<DSDVMessage>) {
85108 messages_to_send. push ( ( RoutableMessage :: Request ( rm. clone ( ) ) , rm. to ) ) ;
86109 }
87110 }
88- log:: info!( "Size of mq: {} (agent {}) [{:?}]" , messages_to_send. len( ) , my_id, messages_to_send) ;
89-
111+ log:: info!(
112+ "Size of mq: {} (agent {}) [{:?}]" ,
113+ messages_to_send. len( ) ,
114+ my_id,
115+ messages_to_send
116+ ) ;
90117
91118 // Deduplicate messages
92- messages_to_send. sort_by_key ( |( rm, dst ) | match rm {
119+ messages_to_send. sort_by_key ( |( rm, _ ) | match rm {
93120 RoutableMessage :: Request ( rm) => rm. id ,
94- RoutableMessage :: Ack ( id) => * id
121+ RoutableMessage :: Ack ( id) => * id,
95122 } ) ;
96123 messages_to_send. dedup ( ) ;
97124 // Send all enqueued on this step messages
98125 let mut unsent_messages = Vec :: < ( RoutableMessage , u32 /* destination */ ) > :: default ( ) ;
99126 for ( msg, destination) in messages_to_send. iter ( ) {
100127 if table. contains_key ( & destination) {
101- ctx. send ( MessageType :: Comm ( DSDVMessage :: RoutingRequest ( ( msg. clone ( ) , table. get ( & destination) . unwrap ( ) . next_hop , * destination) ) ) ) ;
128+ ctx. send ( MessageType :: Comm ( DSDVMessage :: RoutingRequest ( (
129+ msg. clone ( ) ,
130+ table. get ( & destination) . unwrap ( ) . next_hop ,
131+ * destination,
132+ ) ) ) ) ;
102133 match msg {
103134 RoutableMessage :: Request ( rm) => {
104135 if retries. contains_key ( & rm. id ) {
105- retries. entry ( rm. id ) . and_modify ( |( _, last_transmission) | { * last_transmission = ctx. current_step ( ) as i32 ; } ) ;
136+ retries. entry ( rm. id ) . and_modify ( |( _, last_transmission) | {
137+ * last_transmission = ctx. current_step ( ) as i32 ;
138+ } ) ;
106139 }
107140 }
108141 RoutableMessage :: Ack ( _) => { }
@@ -117,10 +150,13 @@ pub async fn dsdv_actor(my_id: u32, mut ctx: Context<DSDVMessage>) {
117150 assert ! ( unsent_messages. is_empty( ) ) ;
118151 log:: info!( "Umq: {}" , messages_to_send. len( ) ) ;
119152 if ctx. current_step ( ) - last_transmission >= DSDV_HEARTBEAT_PERIOD {
120- for ( k , mut v) in table. iter_mut ( ) {
153+ for ( _ , mut v) in table. iter_mut ( ) {
121154 v. sequence_number = ctx. current_step ( ) ;
122155 }
123- ctx. send ( MessageType :: Comm ( DSDVMessage :: HeartBeat ( ( table. clone ( ) , my_id) ) ) ) ;
156+ ctx. send ( MessageType :: Comm ( DSDVMessage :: HeartBeat ( (
157+ table. clone ( ) ,
158+ my_id,
159+ ) ) ) ) ;
124160 last_transmission = ctx. current_step ( ) ;
125161 }
126162 }
0 commit comments