emqx_message_SUITE.erl 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_message_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx.hrl").
  20. -include_lib("emqx/include/emqx_mqtt.hrl").
  21. -include_lib("eunit/include/eunit.hrl").
  22. all() -> emqx_common_test_helpers:all(?MODULE).
  23. suite() ->
  24. [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
  25. t_make(_) ->
  26. Msg = emqx_message:make(<<"topic">>, <<"payload">>),
  27. ?assertEqual(?QOS_0, emqx_message:qos(Msg)),
  28. ?assertEqual(undefined, emqx_message:from(Msg)),
  29. ?assertEqual(<<"payload">>, emqx_message:payload(Msg)),
  30. Msg1 = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
  31. ?assertEqual(?QOS_0, emqx_message:qos(Msg1)),
  32. ?assertEqual(<<"topic">>, emqx_message:topic(Msg1)),
  33. Msg2 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>),
  34. ?assert(is_binary(emqx_message:id(Msg2))),
  35. ?assertEqual(?QOS_2, emqx_message:qos(Msg2)),
  36. ?assertEqual(<<"clientid">>, emqx_message:from(Msg2)),
  37. ?assertEqual(<<"topic">>, emqx_message:topic(Msg2)),
  38. ?assertEqual(<<"payload">>, emqx_message:payload(Msg2)).
  39. t_id(_) ->
  40. Msg = emqx_message:make(<<"topic">>, <<"payload">>),
  41. ?assert(is_binary(emqx_message:id(Msg))).
  42. t_qos(_) ->
  43. Msg = emqx_message:make(<<"topic">>, <<"payload">>),
  44. ?assertEqual(?QOS_0, emqx_message:qos(Msg)),
  45. Msg1 = emqx_message:make(id, ?QOS_1, <<"t">>, <<"payload">>),
  46. ?assertEqual(?QOS_1, emqx_message:qos(Msg1)),
  47. Msg2 = emqx_message:make(id, ?QOS_2, <<"t">>, <<"payload">>),
  48. ?assertEqual(?QOS_2, emqx_message:qos(Msg2)).
  49. t_topic(_) ->
  50. Msg = emqx_message:make(<<"t">>, <<"payload">>),
  51. ?assertEqual(<<"t">>, emqx_message:topic(Msg)).
  52. t_payload(_) ->
  53. Msg = emqx_message:make(<<"t">>, <<"payload">>),
  54. ?assertEqual(<<"payload">>, emqx_message:payload(Msg)).
  55. t_timestamp(_) ->
  56. Msg = emqx_message:make(<<"t">>, <<"payload">>),
  57. timer:sleep(1),
  58. ?assert(erlang:system_time(millisecond) > emqx_message:timestamp(Msg)).
  59. t_is_sys(_) ->
  60. Msg0 = emqx_message:make(<<"t">>, <<"payload">>),
  61. ?assertNot(emqx_message:is_sys(Msg0)),
  62. Msg1 = emqx_message:set_flag(sys, Msg0),
  63. ?assert(emqx_message:is_sys(Msg1)),
  64. Msg2 = emqx_message:make(<<"$SYS/events">>, <<"payload">>),
  65. ?assert(emqx_message:is_sys(Msg2)).
  66. t_clean_dup(_) ->
  67. Msg = emqx_message:make(<<"topic">>, <<"payload">>),
  68. ?assertNot(emqx_message:get_flag(dup, Msg)),
  69. Msg = emqx_message:clean_dup(Msg),
  70. Msg1 = emqx_message:set_flag(dup, Msg),
  71. ?assert(emqx_message:get_flag(dup, Msg1)),
  72. Msg2 = emqx_message:clean_dup(Msg1),
  73. ?assertNot(emqx_message:get_flag(dup, Msg2)).
  74. t_get_set_flags(_) ->
  75. Msg = #message{id = <<"id">>, qos = ?QOS_1},
  76. Msg1 = emqx_message:set_flags(#{retain => true}, Msg),
  77. ?assertEqual(#{retain => true}, emqx_message:get_flags(Msg1)),
  78. Msg2 = emqx_message:set_flags(#{dup => true}, Msg1),
  79. ?assertEqual(#{retain => true, dup => true}, emqx_message:get_flags(Msg2)).
  80. t_get_set_flag(_) ->
  81. Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
  82. ?assertNot(emqx_message:get_flag(dup, Msg)),
  83. ?assertNot(emqx_message:get_flag(retain, Msg)),
  84. Msg1 = emqx_message:set_flag(dup, true, Msg),
  85. Msg2 = emqx_message:set_flag(retain, true, Msg1),
  86. Msg3 = emqx_message:set_flag(dup, Msg2),
  87. ?assert(emqx_message:get_flag(dup, Msg3)),
  88. ?assert(emqx_message:get_flag(retain, Msg3)),
  89. Msg4 = emqx_message:unset_flag(dup, Msg3),
  90. Msg5 = emqx_message:unset_flag(retain, Msg4),
  91. Msg5 = emqx_message:unset_flag(badflag, Msg5),
  92. ?assertEqual(undefined, emqx_message:get_flag(dup, Msg5, undefined)),
  93. ?assertEqual(undefined, emqx_message:get_flag(retain, Msg5, undefined)),
  94. Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5),
  95. ?assert(emqx_message:get_flag(dup, Msg6)),
  96. ?assert(emqx_message:get_flag(retain, Msg6)),
  97. Msg7 = #message{id = <<"id">>, qos = ?QOS_1},
  98. Msg8 = emqx_message:set_flag(retain, Msg7),
  99. Msg9 = emqx_message:set_flag(retain, true, Msg7),
  100. ?assertEqual(#{retain => true}, emqx_message:get_flags(Msg8)),
  101. ?assertEqual(#{retain => true}, emqx_message:get_flags(Msg9)).
  102. t_get_set_headers(_) ->
  103. Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
  104. Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
  105. Msg2 = emqx_message:set_headers(#{c => 3}, Msg1),
  106. ?assertEqual(#{a => 1, b => 2, c => 3}, emqx_message:get_headers(Msg2)).
  107. t_get_set_header(_) ->
  108. Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
  109. Msg = emqx_message:remove_header(x, Msg),
  110. ?assertEqual(undefined, emqx_message:get_header(a, Msg)),
  111. Msg1 = emqx_message:set_header(a, 1, Msg),
  112. Msg2 = emqx_message:set_header(b, 2, Msg1),
  113. Msg3 = emqx_message:set_header(c, 3, Msg2),
  114. ?assertEqual(1, emqx_message:get_header(a, Msg3)),
  115. ?assertEqual(4, emqx_message:get_header(d, Msg2, 4)),
  116. Msg4 = emqx_message:remove_header(a, Msg3),
  117. Msg4 = emqx_message:remove_header(a, Msg4),
  118. ?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg4)).
  119. t_undefined_headers(_) ->
  120. Msg = #message{id = <<"id">>, qos = ?QOS_0},
  121. Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
  122. ?assertEqual(1, emqx_message:get_header(a, Msg1)),
  123. Msg2 = emqx_message:set_header(c, 3, Msg),
  124. ?assertEqual(3, emqx_message:get_header(c, Msg2)).
  125. t_is_expired_1(_) ->
  126. test_msg_expired_property(?MODULE).
  127. make_zone_default_conf() ->
  128. maps:from_list([{Root, #{}} || Root <- emqx_zone_schema:roots()]).
  129. t_is_expired_2(_) ->
  130. %% if the 'Message-Expiry-Interval' property is set, the message_expiry_interval should be ignored
  131. try
  132. emqx_config:put(make_zone_default_conf()),
  133. emqx_config:put_zone_conf(?MODULE, [mqtt, message_expiry_interval], timer:seconds(10)),
  134. test_msg_expired_property(?MODULE)
  135. after
  136. emqx_config:erase_all()
  137. end.
  138. t_is_expired_3(_) ->
  139. try
  140. emqx_config:put(make_zone_default_conf()),
  141. emqx_config:put_zone_conf(?MODULE, [mqtt, message_expiry_interval], 100),
  142. Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
  143. ?assertNot(emqx_message:is_expired(Msg, ?MODULE)),
  144. timer:sleep(120),
  145. ?assert(emqx_message:is_expired(Msg, ?MODULE))
  146. after
  147. emqx_config:erase_all()
  148. end.
  149. test_msg_expired_property(Zone) ->
  150. Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
  151. ?assertNot(emqx_message:is_expired(Msg, Zone)),
  152. Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg),
  153. timer:sleep(500),
  154. ?assertNot(emqx_message:is_expired(Msg1, Zone)),
  155. timer:sleep(600),
  156. ?assert(emqx_message:is_expired(Msg1, Zone)).
  157. t_update_expired(_) ->
  158. Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
  159. timer:sleep(1000),
  160. ?assertEqual(Msg, emqx_message:update_expiry(Msg)),
  161. Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg),
  162. Props = emqx_message:get_header(properties, emqx_message:update_expiry(Msg1)),
  163. ?assertEqual(1, maps:get('Message-Expiry-Interval', Props)).
  164. % t_to_list(_) ->
  165. % error('TODO').
  166. t_to_packet(_) ->
  167. Pkt = #mqtt_packet{
  168. header = #mqtt_packet_header{
  169. type = ?PUBLISH,
  170. qos = ?QOS_0,
  171. retain = false,
  172. dup = false
  173. },
  174. variable = #mqtt_packet_publish{
  175. topic_name = <<"topic">>,
  176. packet_id = 10,
  177. properties = #{}
  178. },
  179. payload = <<"payload">>
  180. },
  181. Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
  182. ?assertEqual(Pkt, emqx_message:to_packet(10, Msg)).
  183. t_to_packet_with_props(_) ->
  184. Props = #{'Subscription-Identifier' => 1},
  185. Pkt = #mqtt_packet{
  186. header = #mqtt_packet_header{
  187. type = ?PUBLISH,
  188. qos = ?QOS_0,
  189. retain = false,
  190. dup = false
  191. },
  192. variable = #mqtt_packet_publish{
  193. topic_name = <<"topic">>,
  194. packet_id = 10,
  195. properties = Props
  196. },
  197. payload = <<"payload">>
  198. },
  199. Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
  200. Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1}, Msg),
  201. ?assertEqual(Pkt, emqx_message:to_packet(10, Msg1)).
  202. t_to_map(_) ->
  203. Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
  204. List = [
  205. {id, emqx_message:id(Msg)},
  206. {qos, ?QOS_1},
  207. {from, <<"clientid">>},
  208. {flags, #{}},
  209. {headers, #{}},
  210. {topic, <<"topic">>},
  211. {payload, <<"payload">>},
  212. {timestamp, emqx_message:timestamp(Msg)},
  213. {extra, #{}}
  214. ],
  215. ?assertEqual(List, emqx_message:to_list(Msg)),
  216. ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
  217. t_from_map(_) ->
  218. Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
  219. Map = #{
  220. id => emqx_message:id(Msg),
  221. qos => ?QOS_1,
  222. from => <<"clientid">>,
  223. flags => #{},
  224. headers => #{},
  225. topic => <<"topic">>,
  226. payload => <<"payload">>,
  227. timestamp => emqx_message:timestamp(Msg),
  228. extra => #{}
  229. },
  230. ?assertEqual(Map, emqx_message:to_map(Msg)),
  231. ?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).