simple_example2.erl 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. -module(simple_example2).
  2. -include("emqx_sn.hrl").
  3. -define(HOST, "localhost").
  4. -define(PORT, 1884).
  5. -export([start/0]).
  %% @doc Run one demo session against the MQTT-SN gateway at ?HOST:?PORT.
  %%
  %% Opens a UDP socket, then sends CONNECT, SUBSCRIBE, PUBLISH and DISCONNECT
  %% packets in that order, printing every packet sent and calling
  %% wait_response/0 after each request to print the gateway's reply.
  %%
  %% Returns `ok'. A failed `{ok, _} = ...' or `ok = ...' match raises badmatch;
  %% the socket is closed in either case.
  start() ->
      io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),

      %% create udp socket; incoming datagrams are read from this process's
      %% mailbox by wait_response/0
      {ok, Socket} = gen_udp:open(0, [binary]),
      try
          %% connect to emqx_sn broker
          Packet = gen_connect_packet(<<"client1">>),
          ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
          io:format("send connect packet=~p~n", [Packet]),
          %% receive message
          wait_response(),

          %% subscribe, SHORT TOPIC NAME
          SubscribePacket = gen_subscribe_packet(<<"T1">>),
          ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
          io:format("send subscribe packet=~p~n", [SubscribePacket]),
          wait_response(),

          %% publish, SHORT TOPIC NAME
          PublishPacket = gen_publish_packet(<<"T1">>, <<"Payload...">>),
          ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
          io:format("send publish packet=~p~n", [PublishPacket]),
          wait_response(),

          %% wait for subscribed message from broker
          wait_response(),

          %% disconnect from emqx_sn broker
          DisConnectPacket = gen_disconnect_packet(),
          ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
          io:format("send disconnect packet=~p~n", [DisConnectPacket])
      after
          %% Release the socket whether the session finished or crashed;
          %% previously it was left open.
          gen_udp:close(Socket)
      end.
  %% Build a CONNECT packet for ClientId (a binary).
  %%
  %% Layout: Length:8, MsgType:8, Flags:8, ProtocolId:8, Duration:16, ClientId.
  %% Length counts the whole packet, including the Length octet itself.
  gen_connect_packet(ClientId) ->
      Type = ?SN_CONNECT,
      %% Flags octet is Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
      %% TopicIdType:2 -- only CleanSession is set here.
      Flags = <<0:1, 0:2, 0:1, 0:1, 1:1, 0:2>>,
      ProtocolId = 1,
      Duration = 10,
      Rest = <<Type:8, Flags/binary, ProtocolId:8, Duration:16, ClientId/binary>>,
      %% +1 for the Length octet that precedes Rest
      TotalLen = 1 + byte_size(Rest),
      <<TotalLen:8, Rest/binary>>.
  %% Build a SUBSCRIBE packet (QoS 1, MsgId 1) for a short topic name.
  %%
  %% Layout: Length:8, MsgType:8, Flags:8, MsgId:16, ShortTopic.
  %% ShortTopic is a binary; the original hard-coded Length of 7 assumed it is
  %% exactly two bytes. Length is now computed from the actual topic size so
  %% the Length octet always matches the bytes emitted (identical output for
  %% two-byte topics).
  %% NOTE: Length is a single octet, so packets longer than 255 bytes cannot
  %% be represented by this encoder.
  gen_subscribe_packet(ShortTopic) ->
      MsgType = ?SN_SUBSCRIBE,
      Dup = 0,
      QoS = 1,
      Retain = 0,
      Will = 0,
      CleanSession = 0,
      TopicIdType = 2, % SHORT TOPIC NAME
      Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
      MsgId = 1,
      %% 5 = Length(1) + MsgType(1) + Flags(1) + MsgId(2)
      Length = 5 + byte_size(ShortTopic),
      <<Length:8, MsgType:8, Flag/binary, MsgId:16, ShortTopic/binary>>.
  %% Build a REGISTER packet (MsgId 1) announcing Topic under TopicId.
  %%
  %% Layout: Length:8, MsgType:8, TopicId:16, MsgId:16, Topic.
  %% Topic is a binary, TopicId an integer encoded in 16 bits.
  gen_register_packet(Topic, TopicId) ->
      Type = ?SN_REGISTER,
      MsgId = 1,
      Rest = <<Type:8, TopicId:16, MsgId:16, Topic/binary>>,
      %% +1 for the Length octet itself
      TotalLen = 1 + byte_size(Rest),
      <<TotalLen:8, Rest/binary>>.
  %% Build a PUBLISH packet (QoS 1, MsgId 1) carrying Payload on a short topic.
  %%
  %% Layout: Length:8, MsgType:8, Flags:8, ShortTopic, MsgId:16, Payload.
  %% ShortTopic and Payload are binaries. The original Length of
  %% 7 + byte_size(Payload) assumed a two-byte topic; Length is now computed
  %% from the real topic size so it always matches the bytes emitted
  %% (identical output for two-byte topics).
  %% NOTE: Length is a single octet, so packets longer than 255 bytes cannot
  %% be represented by this encoder.
  gen_publish_packet(ShortTopic, Payload) ->
      MsgType = ?SN_PUBLISH,
      Dup = 0,
      QoS = 1,
      Retain = 0,
      Will = 0,
      CleanSession = 0,
      MsgId = 1,
      TopicIdType = 2, % SHORT TOPIC NAME
      Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
      %% 5 = Length(1) + MsgType(1) + Flags(1) + MsgId(2)
      Length = 5 + byte_size(ShortTopic) + byte_size(Payload),
      <<Length:8, MsgType:8, Flag/binary, ShortTopic/binary, MsgId:16, Payload/binary>>.
  %% Build a DISCONNECT packet: just the Length octet (2) and the message type.
  gen_disconnect_packet() ->
      Type = ?SN_DISCONNECT,
      <<2:8, Type:8>>.
  %% Wait for the next message in this process's mailbox and print what it is.
  %%
  %% A `{udp, ...}' datagram is decoded and reported by report_datagram/1; any
  %% other message is printed as-is. If nothing arrives within 2000 ms an error
  %% line is printed and the wait starts over, so this only returns once some
  %% message has been received.
  %%
  %% Returns the TopicId for a REGACK, otherwise the result of io:format (`ok').
  wait_response() ->
      receive
          {udp, _Socket, _, _, Datagram} ->
              report_datagram(Datagram);
          Other ->
              io:format("recv something else from udp socket ~p~n", [Other])
      after 2000 ->
          io:format("Error: receive timeout!~n"),
          wait_response()
      end.

  %% Print a one-line description of a received packet, one clause per
  %% recognised message type; the ACK clauses only match a trailing 0 octet.
  %% Anything else is reported as ignored.
  report_datagram(<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>>) ->
      io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]);
  report_datagram(<<_Len:8, ?SN_CONNACK, 0:8>>) ->
      io:format("recv connect ack~n");
  report_datagram(<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>) ->
      io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
      TopicId;
  report_datagram(<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>>) ->
      io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
  report_datagram(<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>>) ->
      io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
  report_datagram(Unknown) ->
      io:format("ignore bin=~p~n", [Unknown]).