case1.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import sys, time
  2. import paho.mqtt.client as mqtt
  3. quit_now = False
  4. DeviceId = "jXtestlwm2m"
  5. test_step = 0
  6. def on_connect(mqttc, userdata, flags, rc):
  7. global DeviceId
  8. json = "{\"CmdID\":5,\"Command\":\"Discover\",\"BaseName\":\"/3/0\"}"
  9. mqttc.publish("lwm2m/"+DeviceId+"/command", json)
  10. def on_message(mqttc, userdata, msg):
  11. global quit_now, conclusion, test_step
  12. if msg:
  13. print(["incoming message topic ", msg.topic, " payload ", msg.payload])
  14. if msg.topic == 'lwm2m/jXtestlwm2m/response':
  15. if test_step == 0:
  16. if msg.payload == '{"CmdID":5,"Command":"Discover","Result":"</3/0>,</3/0/0>,</3/0/1>,</3/0/16>,</3/0/4>"}':
  17. json = "{\"CmdID\":6,\"Command\":\"Read\",\"BaseName\":\"/3/0\"}"
  18. mqttc.publish("lwm2m/"+DeviceId+"/command", json)
  19. test_step = 1
  20. elif test_step == 1:
  21. if msg.payload == '{"CmdID":6,"Command":"Read","Result":{"bn":"/3/0","e":[{"n":"0","sv":"Open Mobile Alliance"},{"n":"1","sv":"Lightweight M2M Client"},{"n":"16","sv":"U"}]}}':
  22. test_step = 2
  23. quit_now = True
  24. def on_publish(mqttc, userdata, mid):
  25. pass
  26. def main():
  27. global DeviceId, test_step
  28. timeout = 7
  29. mqttc = mqtt.Client("test_coap_lwm2m_c02334")
  30. mqttc.on_message = on_message
  31. mqttc.on_publish = on_publish
  32. mqttc.on_connect = on_connect
  33. mqttc.connect("127.0.0.1", 1883, 120)
  34. mqttc.subscribe("lwm2m/"+DeviceId+"/response", qos=1)
  35. mqttc.loop_start()
  36. while quit_now == False and timeout > 0:
  37. time.sleep(1)
  38. timeout = timeout - 1
  39. mqttc.disconnect()
  40. mqttc.loop_stop()
  41. if test_step == 2:
  42. print("\n\n CASE1 PASS\n\n")
  43. else:
  44. print("\n\n CASE1 FAIL\n\n")
  45. if __name__ == "__main__":
  46. main()