case2.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import sys, time
  2. import paho.mqtt.client as mqtt
  3. quit_now = False
  4. DeviceId = "jXtestlwm2m"
  5. test_step = 0
  6. def on_connect(mqttc, userdata, flags, rc):
  7. global DeviceId
  8. json = "{\"CmdID\":5,\"Command\":\"Execute\",\"BaseName\":\"/3/0/4\"}"
  9. mqttc.publish("lwm2m/"+DeviceId+"/command", json)
  10. def on_message(mqttc, userdata, msg):
  11. global quit_now, conclusion, test_step
  12. if msg:
  13. print(["incoming message topic ", msg.topic, " payload ", msg.payload])
  14. if msg.topic == 'lwm2m/jXtestlwm2m/response':
  15. if test_step == 0:
  16. if msg.payload == '{"CmdID":5,"Command":"Execute","Result":"Changed"}':
  17. test_step = 1
  18. quit_now = True
  19. def on_publish(mqttc, userdata, mid):
  20. pass
  21. def main():
  22. global DeviceId, test_step
  23. timeout = 7
  24. mqttc = mqtt.Client("test_coap_lwm2m_c02334")
  25. mqttc.on_message = on_message
  26. mqttc.on_publish = on_publish
  27. mqttc.on_connect = on_connect
  28. mqttc.connect("127.0.0.1", 1883, 120)
  29. mqttc.subscribe("lwm2m/"+DeviceId+"/response", qos=1)
  30. mqttc.loop_start()
  31. while quit_now == False and timeout > 0:
  32. time.sleep(1)
  33. timeout = timeout - 1
  34. mqttc.disconnect()
  35. mqttc.loop_stop()
  36. if test_step == 1:
  37. print("\n\n CASE2 PASS\n\n")
  38. else:
  39. print("\n\n CASE2 FAIL\n\n")
  40. if __name__ == "__main__":
  41. main()