2424# [START iot_mqtt_includes]
2525import argparse
2626import datetime
27+ import logging
2728import os
2829import random
2930import ssl
3334import paho .mqtt .client as mqtt
3435# [END iot_mqtt_includes]
3536
37+ logging .getLogger ('googleapiclient.discovery_cache' ).setLevel (logging .CRITICAL )
38+
3639# The initial backoff time after a disconnection occurs, in seconds.
3740minimum_backoff_time = 1
3841
@@ -169,37 +172,193 @@ def get_client(
169172# [END iot_mqtt_config]
170173
171174
175+ def detach_device (client , device_id ):
176+ """Detach the device from the gateway."""
177+ # [START detach_device]
178+ detach_topic = '/devices/{}/detach' .format (device_id )
179+ print ('Detaching: {}' .format (detach_topic ))
180+ client .publish (detach_topic , '{}' , qos = 1 )
181+ # [END detach_device]
182+
183+
184+ def attach_device (client , device_id , auth ):
185+ """Attach the device to the gateway."""
186+ # [START attach_device]
187+ attach_topic = '/devices/{}/attach' .format (device_id )
188+ attach_payload = '{{"authorization" : "{}"}}' .format (auth )
189+ client .publish (attach_topic , attach_payload , qos = 1 )
190+ # [END attach_device]
191+
192+
193+ def listen_for_messages (
194+ service_account_json , project_id , cloud_region , registry_id , device_id ,
195+ gateway_id , num_messages , private_key_file , algorithm , ca_certs ,
196+ mqtt_bridge_hostname , mqtt_bridge_port , jwt_expires_minutes , duration ,
197+ cb = None ):
198+ """Listens for messages sent to the gateway and bound devices."""
199+ # [START listen_for_messages]
200+ global minimum_backoff_time
201+
202+ jwt_iat = datetime .datetime .utcnow ()
203+ jwt_exp_mins = jwt_expires_minutes
204+ # Use gateway to connect to server
205+ client = get_client (
206+ project_id , cloud_region , registry_id , gateway_id ,
207+ private_key_file , algorithm , ca_certs , mqtt_bridge_hostname ,
208+ mqtt_bridge_port )
209+
210+ attach_device (client , device_id , '' )
211+ print ('Waiting for device to attach.' )
212+ time .sleep (5 )
213+
214+ # The topic devices receive configuration updates on.
215+ device_config_topic = '/devices/{}/config' .format (device_id )
216+ client .subscribe (device_config_topic , qos = 1 )
217+
218+ # The topic gateways receive configuration updates on.
219+ gateway_config_topic = '/devices/{}/config' .format (gateway_id )
220+ client .subscribe (gateway_config_topic , qos = 1 )
221+
222+ # The topic gateways receive error updates on. QoS must be 0.
223+ error_topic = '/devices/{}/errors' .format (gateway_id )
224+ client .subscribe (error_topic , qos = 0 )
225+
226+ # Wait for about a minute for config messages.
227+ for i in range (1 , duration ):
228+ client .loop ()
229+ if cb is not None :
230+ cb (client )
231+
232+ if should_backoff :
233+ # If backoff time is too large, give up.
234+ if minimum_backoff_time > MAXIMUM_BACKOFF_TIME :
235+ print ('Exceeded maximum backoff time. Giving up.' )
236+ break
237+
238+ delay = minimum_backoff_time + random .randint (0 , 1000 ) / 1000.0
239+ time .sleep (delay )
240+ minimum_backoff_time *= 2
241+ client .connect (mqtt_bridge_hostname , mqtt_bridge_port )
242+
243+ seconds_since_issue = (datetime .datetime .utcnow () - jwt_iat ).seconds
244+ if seconds_since_issue > 60 * jwt_exp_mins :
245+ print ('Refreshing token after {}s' ).format (seconds_since_issue )
246+ jwt_iat = datetime .datetime .utcnow ()
247+ client = get_client (
248+ project_id , cloud_region , registry_id , gateway_id ,
249+ private_key_file , algorithm , ca_certs , mqtt_bridge_hostname ,
250+ mqtt_bridge_port )
251+
252+ time .sleep (1 )
253+
254+ detach_device (client , device_id )
255+
256+ print ('Finished.' )
257+ # [END listen_for_messages]
258+
259+
260+ def send_data_from_bound_device (
261+ service_account_json , project_id , cloud_region , registry_id , device_id ,
262+ gateway_id , num_messages , private_key_file , algorithm , ca_certs ,
263+ mqtt_bridge_hostname , mqtt_bridge_port , jwt_expires_minutes , payload ):
264+ """Sends data from a gateway on behalf of a device that is bound to it."""
265+ # [START send_data_from_bound_device]
266+ global minimum_backoff_time
267+
268+ # Publish device events and gateway state.
269+ device_topic = '/devices/{}/{}' .format (device_id , 'state' )
270+ gateway_topic = '/devices/{}/{}' .format (gateway_id , 'state' )
271+
272+ jwt_iat = datetime .datetime .utcnow ()
273+ jwt_exp_mins = jwt_expires_minutes
274+ # Use gateway to connect to server
275+ client = get_client (
276+ project_id , cloud_region , registry_id , gateway_id ,
277+ private_key_file , algorithm , ca_certs , mqtt_bridge_hostname ,
278+ mqtt_bridge_port )
279+
280+ attach_device (client , device_id , '' )
281+ print ('Waiting for device to attach.' )
282+ time .sleep (5 )
283+
284+ # Publish state to gateway topic
285+ gateway_state = 'Starting gateway at: {}' .format (time .time ())
286+ print (gateway_state )
287+ client .publish (gateway_topic , gateway_state , qos = 1 )
288+
289+ # Publish num_messages mesages to the MQTT bridge
290+ for i in range (1 , num_messages + 1 ):
291+ client .loop ()
292+
293+ if should_backoff :
294+ # If backoff time is too large, give up.
295+ if minimum_backoff_time > MAXIMUM_BACKOFF_TIME :
296+ print ('Exceeded maximum backoff time. Giving up.' )
297+ break
298+
299+ delay = minimum_backoff_time + random .randint (0 , 1000 ) / 1000.0
300+ time .sleep (delay )
301+ minimum_backoff_time *= 2
302+ client .connect (mqtt_bridge_hostname , mqtt_bridge_port )
303+
304+ payload = '{}/{}-{}-payload-{}' .format (
305+ registry_id , gateway_id , device_id , i )
306+
307+ print ('Publishing message {}/{}: \' {}\' to {}' .format (
308+ i , num_messages , payload , device_topic ))
309+ client .publish (
310+ device_topic , '{} : {}' .format (device_id , payload ), qos = 1 )
311+
312+ seconds_since_issue = (datetime .datetime .utcnow () - jwt_iat ).seconds
313+ if seconds_since_issue > 60 * jwt_exp_mins :
314+ print ('Refreshing token after {}s' ).format (seconds_since_issue )
315+ jwt_iat = datetime .datetime .utcnow ()
316+ client = get_client (
317+ project_id , cloud_region , registry_id , gateway_id ,
318+ private_key_file , algorithm , ca_certs , mqtt_bridge_hostname ,
319+ mqtt_bridge_port )
320+
321+ time .sleep (5 )
322+
323+ detach_device (client , device_id )
324+
325+ print ('Finished.' )
326+ # [END send_data_from_bound_device]
327+
328+
172329def parse_command_line_args ():
173330 """Parse command line arguments."""
174331 parser = argparse .ArgumentParser (description = (
175332 'Example Google Cloud IoT Core MQTT device connection code.' ))
176- parser .add_argument (
177- '--project_id' ,
178- default = os .environ .get ('GOOGLE_CLOUD_PROJECT' ),
179- help = 'GCP cloud project name' )
180- parser .add_argument (
181- '--registry_id' , required = True , help = 'Cloud IoT Core registry id' )
182- parser .add_argument (
183- '--device_id' , required = True , help = 'Cloud IoT Core device id' )
184- parser .add_argument (
185- '--private_key_file' ,
186- required = True , help = 'Path to private key file.' )
187333 parser .add_argument (
188334 '--algorithm' ,
189335 choices = ('RS256' , 'ES256' ),
190336 required = True ,
191337 help = 'Which encryption algorithm to use to generate the JWT.' )
192- parser .add_argument (
193- '--cloud_region' , default = 'us-central1' , help = 'GCP cloud region' )
194338 parser .add_argument (
195339 '--ca_certs' ,
196340 default = 'roots.pem' ,
197341 help = ('CA root from https://pki.google.com/roots.pem' ))
198342 parser .add_argument (
199- '--num_messages' ,
343+ '--cloud_region' , default = 'us-central1' , help = 'GCP cloud region' )
344+ parser .add_argument (
345+ '--data' ,
346+ default = 'Hello there' ,
347+ help = 'The telemetry data sent on behalf of a device' )
348+ parser .add_argument (
349+ '--device_id' , required = True , help = 'Cloud IoT Core device id' )
350+ parser .add_argument (
351+ '--gateway_id' , required = False , help = 'Gateway identifier.' )
352+ parser .add_argument (
353+ '--jwt_expires_minutes' ,
354+ default = 20 ,
200355 type = int ,
201- default = 100 ,
202- help = 'Number of messages to publish.' )
356+ help = ('Expiration time, in minutes, for JWT tokens.' ))
357+ parser .add_argument (
358+ '--listen_dur' ,
359+ default = 60 ,
360+ type = int ,
361+ help = 'Duration (seconds) to listen for configuration messages' )
203362 parser .add_argument (
204363 '--message_type' ,
205364 choices = ('event' , 'state' ),
@@ -217,19 +376,48 @@ def parse_command_line_args():
217376 type = int ,
218377 help = 'MQTT bridge port.' )
219378 parser .add_argument (
220- '--jwt_expires_minutes' ,
221- default = 20 ,
379+ '--num_messages' ,
222380 type = int ,
223- help = ('Expiration time, in minutes, for JWT tokens.' ))
381+ default = 100 ,
382+ help = 'Number of messages to publish.' )
383+ parser .add_argument (
384+ '--private_key_file' ,
385+ required = True ,
386+ help = 'Path to private key file.' )
387+ parser .add_argument (
388+ '--project_id' ,
389+ default = os .environ .get ('GOOGLE_CLOUD_PROJECT' ),
390+ help = 'GCP cloud project name' )
391+ parser .add_argument (
392+ '--registry_id' , required = True , help = 'Cloud IoT Core registry id' )
393+ parser .add_argument (
394+ '--service_account_json' ,
395+ default = os .environ .get ("GOOGLE_APPLICATION_CREDENTIALS" ),
396+ help = 'Path to service account json file.' )
397+
398+ # Command subparser
399+ command = parser .add_subparsers (dest = 'command' )
400+
401+ command .add_parser (
402+ 'device_demo' ,
403+ help = mqtt_device_demo .__doc__ )
404+
405+ command .add_parser (
406+ 'gateway_send' ,
407+ help = send_data_from_bound_device .__doc__ )
408+
409+ command .add_parser (
410+ 'gateway_listen' ,
411+ help = listen_for_messages .__doc__ )
224412
225413 return parser .parse_args ()
226414
227415
228- # [START iot_mqtt_run]
229- def main ():
416+ def mqtt_device_demo (args ):
417+ """Connects a device, sends data, and receives data."""
418+ # [START iot_mqtt_run]
230419 global minimum_backoff_time
231-
232- args = parse_command_line_args ()
420+ global MAXIMUM_BACKOFF_TIME
233421
234422 # Publish to the events or state topic based on the flag.
235423 sub_topic = 'events' if args .message_type == 'event' else 'state'
@@ -239,9 +427,9 @@ def main():
239427 jwt_iat = datetime .datetime .utcnow ()
240428 jwt_exp_mins = args .jwt_expires_minutes
241429 client = get_client (
242- args .project_id , args .cloud_region , args .registry_id , args . device_id ,
243- args .private_key_file , args .algorithm , args .ca_certs ,
244- args .mqtt_bridge_hostname , args .mqtt_bridge_port )
430+ args .project_id , args .cloud_region , args .registry_id ,
431+ args .device_id , args .private_key_file , args .algorithm ,
432+ args .ca_certs , args . mqtt_bridge_hostname , args .mqtt_bridge_port )
245433
246434 # Publish num_messages mesages to the MQTT bridge once per second.
247435 for i in range (1 , args .num_messages + 1 ):
@@ -284,9 +472,32 @@ def main():
284472
285473 # Send events every second. State should not be updated as often
286474 time .sleep (1 if args .message_type == 'event' else 5 )
475+ # [END iot_mqtt_run]
476+
287477
478+ def main ():
479+ args = parse_command_line_args ()
480+
481+ if args .command == 'gateway_listen' :
482+ listen_for_messages (
483+ args .service_account_json , args .project_id ,
484+ args .cloud_region , args .registry_id , args .device_id ,
485+ args .gateway_id , args .num_messages , args .private_key_file ,
486+ args .algorithm , args .ca_certs , args .mqtt_bridge_hostname ,
487+ args .mqtt_bridge_port , args .jwt_expires_minutes ,
488+ args .listen_dur )
489+ return
490+ elif args .command == 'gateway_send' :
491+ send_data_from_bound_device (
492+ args .service_account_json , args .project_id ,
493+ args .cloud_region , args .registry_id , args .device_id ,
494+ args .gateway_id , args .num_messages , args .private_key_file ,
495+ args .algorithm , args .ca_certs , args .mqtt_bridge_hostname ,
496+ args .mqtt_bridge_port , args .jwt_expires_minutes , args .data )
497+ return
498+ else :
499+ mqtt_device_demo (args )
288500 print ('Finished.' )
289- # [END iot_mqtt_run]
290501
291502
292503if __name__ == '__main__' :
0 commit comments