simple_example4.erl 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. -module(simple_example4).
  2. -include("emqx_sn.hrl").
  3. -define(HOST, {127,0,0,1}).
  4. -define(PORT, 1884).
  5. -export([start/0]).
  %% @doc Run the demo with a single publish round.
  %% Exists so that the module's `-export([start/0])' refers to a real
  %% function; the round count of 1 is an arbitrary default.
  start() ->
      start(1).

  %% @doc Run one client session against ?HOST:?PORT over UDP:
  %% connect, register <<"TopicA">>, subscribe to the returned topic id,
  %% perform LoopTimes publish/ping rounds, then disconnect.
  %%
  %% LoopTimes is the number of publish rounds (0 performs none).
  %% Returns `ok'. Any failed `gen_udp' call crashes with a badmatch; the
  %% socket is closed on both the normal and the crashing path.
  start(LoopTimes) ->
      io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
      %% Port 0 lets the OS pick a free local port. Replies are delivered to
      %% this process as {udp, Socket, IP, Port, Bin} messages.
      {ok, Socket} = gen_udp:open(0, [binary]),
      try
          run_session(Socket, LoopTimes)
      after
          %% The original never released the socket.
          gen_udp:close(Socket)
      end.

  %% Drive the whole packet exchange on an already opened socket.
  run_session(Socket, LoopTimes) ->
      %% connect to emqx_sn broker
      Packet = gen_connect_packet(<<"client1">>),
      ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
      io:format("send connect packet=~p~n", [Packet]),
      %% receive message
      wait_response(),

      %% register topic_id
      RegisterPacket = gen_register_packet(<<"TopicA">>, 0),
      ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket),
      io:format("send register packet=~p~n", [RegisterPacket]),
      %% wait_response/0 yields the topic id when the reply is a REGACK
      TopicId = wait_response(),

      %% subscribe
      SubscribePacket = gen_subscribe_packet(TopicId),
      ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
      io:format("send subscribe packet=~p~n", [SubscribePacket]),
      wait_response(),

      %% loop publish; foreach because only the side effects matter
      lists:foreach(fun(N) -> publish_round(Socket, TopicId, N) end,
                    lists:seq(1, LoopTimes)),

      %% disconnect from emqx_sn broker
      DisConnectPacket = gen_disconnect_packet(),
      ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
      io:format("send disconnect packet=~p~n", [DisConnectPacket]).

  %% One publish round: publish, wait for two replies, then ping the broker.
  publish_round(Socket, TopicId, N) ->
      timer:sleep(1000),
      io:format("~n-------------------- publish ~p start --------------------~n", [N]),

      PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>),
      ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
      io:format("send publish packet=~p~n", [PublishPacket]),
      %% wait for publish ack
      wait_response(),
      %% wait for subscribed message from broker
      wait_response(),

      PingReqPacket = gen_pingreq_packet(),
      ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket),
      %% wait for pingresp
      wait_response(),

      io:format("--------------------- publish ~p end ---------------------~n", [N]).
  %% Build a CONNECT packet carrying ClientId (a binary).
  %% Layout: Length(1) MsgType(1) Flags(1) ProtocolId(1) Duration(2) ClientId.
  %% Length is written as a single octet, so it wraps for totals above 255.
  gen_connect_packet(ClientId) ->
      TotalLen = byte_size(ClientId) + 6,
      Type = ?SN_CONNECT,
      %% Flag bits, most significant first:
      %% Dup=0, QoS=0, Retain=0, Will=0, CleanSession=1, TopicIdType=0
      FlagByte = <<0:1, 0:2, 0:1, 0:1, 1:1, 0:2>>,
      %% ProtocolId = 1, Duration = 10
      Tail = <<1:8, 10:16, ClientId/binary>>,
      <<TotalLen:8, Type:8, FlagByte/binary, Tail/binary>>.
  %% Build a fixed-size (7 octet) SUBSCRIBE packet for a numeric TopicId.
  %% Layout: Length(1) MsgType(1) Flags(1) MsgId(2) TopicId(2).
  gen_subscribe_packet(TopicId) ->
      Type = ?SN_SUBSCRIBE,
      %% Flag bits, most significant first:
      %% Dup=0, QoS=1, Retain=0, Will=0, CleanSession=0, TopicIdType=1
      FlagByte = <<0:1, 1:2, 0:1, 0:1, 0:1, 1:2>>,
      %% MsgId is always 1 in this example.
      <<7:8, Type:8, FlagByte/binary, 1:16, TopicId:16>>.
  %% Build a REGISTER packet for the topic name Topic (a binary).
  %% Layout: Length(1) MsgType(1) TopicId(2) MsgId(2) Topic.
  %% Length is written as a single octet, so it wraps for totals above 255.
  gen_register_packet(Topic, TopicId) ->
      TotalLen = byte_size(Topic) + 6,
      Type = ?SN_REGISTER,
      %% MsgId is always 1 in this example.
      Ids = <<TopicId:16, 1:16>>,
      <<TotalLen:8, Type:8, Ids/binary, Topic/binary>>.
  %% Build a PUBLISH packet carrying Payload (a binary) on TopicId.
  %% Layout: Length(1) MsgType(1) Flags(1) TopicId(2) MsgId(2) Payload.
  %% Length is written as a single octet, so it wraps for totals above 255.
  gen_publish_packet(TopicId, Payload) ->
      TotalLen = byte_size(Payload) + 7,
      Type = ?SN_PUBLISH,
      %% Flag bits, most significant first:
      %% Dup=0, QoS=1, Retain=0, Will=0, CleanSession=0, TopicIdType=1
      FlagByte = <<0:1, 1:2, 0:1, 0:1, 0:1, 1:2>>,
      %% MsgId is always 1 in this example.
      <<TotalLen:8, Type:8, FlagByte/binary, TopicId:16, 1:16, Payload/binary>>.
  %% Build a fixed-size (7 octet) PUBACK packet for TopicId / MsgId.
  %% Layout: Length(1) MsgType(1) TopicId(2) MsgId(2) followed by a zero octet
  %% (presumably the return code -- confirm against the protocol definition).
  gen_puback_packet(TopicId, MsgId) ->
      Type = ?SN_PUBACK,
      Trailer = 0,
      <<7:8, Type:8, TopicId:16, MsgId:16, Trailer:8>>.
  %% Build the two-octet PINGREQ packet: Length(1) = 2, MsgType(1).
  gen_pingreq_packet() ->
      Type = ?SN_PINGREQ,
      <<2:8, Type:8>>.
  %% Build the two-octet DISCONNECT packet: Length(1) = 2, MsgType(1).
  gen_disconnect_packet() ->
      Type = ?SN_DISCONNECT,
      <<2:8, Type:8>>.
  %% Block until one message arrives in the calling process's mailbox and
  %% report it on stdout.
  %%
  %% A {udp, Socket, _, _, Bin} message is decoded by handle_response/2; any
  %% other message is printed and consumed. If nothing arrives within 2000 ms
  %% a timeout line is printed and the wait starts over, so this function
  %% only returns once some message has been received.
  %%
  %% Returns the topic id for a REGACK reply and `ok' for everything else.
  wait_response() ->
      receive
          {udp, Sock, _Addr, _Port, Packet} ->
              handle_response(Sock, Packet);
          Other ->
              io:format("recv something else from udp socket ~p~n", [Other])
      after 2000 ->
          io:format("Error: receive timeout!~n"),
          wait_response()
      end.

  %% Decode one received datagram by its message-type octet and print it.
  %% An incoming PUBLISH is answered with a PUBACK sent to ?HOST:?PORT on the
  %% socket the datagram arrived on.
  handle_response(Sock, <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>>) ->
      io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]),
      ok = gen_udp:send(Sock, ?HOST, ?PORT, gen_puback_packet(TopicId, MsgId));
  handle_response(_Sock, <<_Len:8, ?SN_CONNACK, 0:8>>) ->
      io:format("recv connect ack~n");
  handle_response(_Sock, <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>) ->
      io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
      %% The only reply whose value callers use: the assigned topic id.
      TopicId;
  handle_response(_Sock, <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>>) ->
      io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
  handle_response(_Sock, <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>>) ->
      io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
  handle_response(_Sock, <<_Len:8, ?SN_PINGRESP>>) ->
      io:format("recv pingresp~n");
  handle_response(_Sock, Unknown) ->
      %% Anything else, including replies whose last octet is non-zero.
      io:format("ignore bin=~p~n", [Unknown]).