emqx_retainer_api_SUITE.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_retainer_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include("emqx_retainer.hrl").
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  22. -include_lib("emqx/include/asserts.hrl").
  23. -import(emqx_mgmt_api_test_util, [
  24. request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0
  25. ]).
  %% Common Test callback: the list of test cases is whatever
  %% emqx_common_test_helpers:all/1 reports for this module.
  all() ->
      Suite = ?MODULE,
      emqx_common_test_helpers:all(Suite).
  %% Suite-level setup.
  %% Starts the listed applications through emqx_cth_suite, creates the default
  %% API app, applies raw_systopic_conf() to the `sys_topics` config and stores
  %% the value returned by emqx_cth_suite:start/2 in Config under `apps`.
  init_per_suite(Config) ->
      AppSpecs = [
          emqx_conf,
          emqx,
          emqx_retainer,
          emqx_management,
          {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
      ],
      SuiteOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
      StartedApps = emqx_cth_suite:start(AppSpecs, SuiteOpts),
      _ = emqx_common_test_http:create_default_app(),
      %% make sure no "$SYS/#" topics
      _ = emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
      [{apps, StartedApps} | Config].
  %% Suite-level teardown: removes the default API app, then stops the
  %% applications recorded under `apps` by init_per_suite/1.
  end_per_suite(Config) ->
      emqx_common_test_http:delete_default_app(),
      StartedApps = ?config(apps, Config),
      emqx_cth_suite:stop(StartedApps).
  %% Per-testcase setup: starts a snabbkaffe trace, calls emqx_retainer:clean/0
  %% and connects an MQTT v5 client that is handed to the test case under the
  %% `client` key of Config.
  init_per_testcase(_TestCase, Config) ->
      snabbkaffe:start_trace(),
      emqx_retainer:clean(),
      ClientOpts = [{clean_start, true}, {proto_ver, v5}],
      {ok, Client} = emqtt:start_link(ClientOpts),
      {ok, _ConnAck} = emqtt:connect(Client),
      [{client, Client} | Config].
  %% Per-testcase teardown: disconnects the client created by
  %% init_per_testcase/2 and stops snabbkaffe.
  end_per_testcase(_TestCase, Config) ->
      Client = ?config(client, Config),
      ok = emqtt:disconnect(Client),
      snabbkaffe:stop(),
      ok.
  58. %%------------------------------------------------------------------------------
  59. %% Test Cases
  60. %%------------------------------------------------------------------------------
  %% GET /mqtt/retainer must expose the expected top-level keys, and a PUT of
  %% the same document with `enable` toggled must echo the new value back.
  t_config(_Config) ->
      Path = api_path(["mqtt", "retainer"]),
      {ok, ConfJson} = request_api(get, Path),
      ?assertMatch(
          #{
              backend := _,
              enable := _,
              max_payload_size := _,
              msg_clear_interval := _,
              msg_expiry_interval := _
          },
          decode_json(ConfJson)
      ),
      %% PUT the fetched config back with only `enable` replaced and check the
      %% response reflects the requested flag.
      SetEnable = fun(Flag) ->
          Current = emqx_utils_json:decode(ConfJson, [return_maps]),
          Body = Current#{<<"enable">> := Flag},
          {ok, ReplyJson} = request_api(put, Path, [], auth_header_(), Body),
          Reply = emqx_utils_json:decode(ReplyJson, [return_maps]),
          ?assertEqual(Flag, maps:get(<<"enable">>, Reply))
      end,
      %% Disable first, then re-enable so the retainer ends up enabled.
      lists:foreach(SetEnable, [false, true]).
  %% Retains five messages and expects the message listing endpoint to return
  %% all five, each entry carrying the expected summary fields.
  t_messages1(Config) ->
      Client = ?config(client, Config),
      %% N + 60 for N in 1..5 gives the bytes 61..65, i.e. the topic suffixes
      %% "=", ">", "?", "@" and "A"; "retained/A" is therefore published last.
      PublishAll = fun() ->
          lists:foreach(
              fun(N) ->
                  emqtt:publish(
                      Client,
                      <<"retained/", (N + 60)>>,
                      <<"retained">>,
                      [{qos, 0}, {retain, true}]
                  )
              end,
              lists:seq(1, 5)
          )
      end,
      ?assertWaitEvent(
          PublishAll(),
          #{?snk_kind := message_retained, topic := <<"retained/A">>},
          500
      ),
      ListPath = api_path(["mqtt", "retainer", "messages"]),
      {ok, ListJson} = request_api(get, ListPath),
      #{data := Entries, meta := _} = decode_json(ListJson),
      Count = length(Entries),
      ?assert(
          Count =:= 5,
          io_lib:format("message length is:~p~n", [Count])
      ),
      [Head | _] = Entries,
      ?assertMatch(
          #{
              msgid := _,
              topic := _,
              qos := _,
              publish_at := _,
              from_clientid := _,
              from_username := _
          },
          Head
      ).
  %% Retains messages on "c", "c/1" and "c/1/1" and expects the listing
  %% filtered with `topic=c` to contain exactly one entry.
  t_messages2(Config) ->
      Client = ?config(client, Config),
      Topics = [<<"c">>, <<"c/1">>, <<"c/1/1">>],
      %% Publish one topic at a time, waiting for its message_retained event.
      PublishRetained = fun(Topic) ->
          ?assertWaitEvent(
              emqtt:publish(Client, Topic, <<"retained">>, [{qos, 0}, {retain, true}]),
              #{?snk_kind := message_retained, topic := Topic},
              500
          )
      end,
      ok = lists:foreach(PublishRetained, Topics),
      FilteredPath = api_path(["mqtt", "retainer", "messages?topic=c"]),
      {ok, ListJson} = request_api(get, FilteredPath),
      #{data := Entries, meta := _} = decode_json(ListJson),
      ?assertEqual(1, length(Entries)).
  %% With a message retained on "c/1", looking up "c" is expected to answer
  %% 404 while looking up "c%2F1" returns the message.
  t_message1(Config) ->
      Client = ?config(client, Config),
      ?assertWaitEvent(
          emqtt:publish(Client, <<"c/1">>, <<"retained">>, [{qos, 0}, {retain, true}]),
          #{?snk_kind := message_retained, topic := <<"c/1">>},
          500
      ),
      MissingPath = api_path(["mqtt", "retainer", "message", "c"]),
      ?assertMatch({error, {_, 404, _}}, request_api(get, MissingPath)),
      ExistingPath = api_path(["mqtt", "retainer", "message", "c%2F1"]),
      {ok, FoundJson} = request_api(get, ExistingPath),
      %% "cmV0YWluZWQ=" is the base64 form of the published payload "retained".
      ?assertMatch(
          #{topic := <<"c/1">>, payload := <<"cmV0YWluZWQ=">>},
          decode_json(FoundJson)
      ).
  %% With a message retained on "c", the test expects:
  %%   * "c%2F%2B" (c/+) -> 404
  %%   * "c"             -> the retained message
  %%   * "c%2F%23" (c/#) -> the same retained message
  t_message2(Config) ->
      Client = ?config(client, Config),
      ?assertWaitEvent(
          emqtt:publish(Client, <<"c">>, <<"retained">>, [{qos, 0}, {retain, true}]),
          #{?snk_kind := message_retained, topic := <<"c">>},
          500
      ),
      PlusPath = api_path(["mqtt", "retainer", "message", "c%2F%2B"]),
      ?assertMatch({error, {_, 404, _}}, request_api(get, PlusPath)),
      ExactPath = api_path(["mqtt", "retainer", "message", "c"]),
      {ok, ExactJson} = request_api(get, ExactPath),
      %% "cmV0YWluZWQ=" is the base64 form of the published payload "retained".
      ?assertMatch(
          #{topic := <<"c">>, payload := <<"cmV0YWluZWQ=">>},
          decode_json(ExactJson)
      ),
      HashPath = api_path(["mqtt", "retainer", "message", "c%2F%23"]),
      {ok, HashJson} = request_api(get, HashPath),
      ?assertMatch(
          #{topic := <<"c">>, payload := <<"cmV0YWluZWQ=">>},
          decode_json(HashJson)
      ).
  %% Retains five messages, then requests page 4 with a page size of 1 and
  %% expects a single entry whose topic is the fourth published one.
  t_messages_page(Config) ->
      Client = ?config(client, Config),
      %% N + 60 for N in 1..5 gives the bytes 61..65, so the topic suffixes are
      %% "=", ">", "?", "@" and "A"; "retained/A" is published last.
      PublishAll = fun() ->
          lists:foreach(
              fun(N) ->
                  emqtt:publish(
                      Client,
                      <<"retained/", (N + 60)>>,
                      <<"retained">>,
                      [{qos, 0}, {retain, true}]
                  )
              end,
              lists:seq(1, 5)
          )
      end,
      ?assertWaitEvent(
          PublishAll(),
          #{?snk_kind := message_retained, topic := <<"retained/A">>},
          500
      ),
      Page = 4,
      Query = "messages?page=" ++ integer_to_list(Page) ++ "&limit=1",
      {ok, PageJson} = request_api(get, api_path(["mqtt", "retainer", Query])),
      %% Page is already bound, so this match also checks the echoed page number.
      #{data := Entries, meta := #{page := Page, limit := 1}} = decode_json(PageJson),
      Count = length(Entries),
      ?assert(
          Count =:= 1,
          io_lib:format("message length is:~p~n", [Count])
      ),
      [Single] = Entries,
      ExpectedTopic = <<"retained/", (Page + 60)>>,
      ?assertMatch(
          #{
              msgid := _,
              topic := ExpectedTopic,
              qos := _,
              publish_at := _,
              from_clientid := _,
              from_username := _
          },
          Single
      ).
  %% Retains one message, looks it up through the single-message endpoint,
  %% deletes it, and then expects both a further lookup and a further delete
  %% to answer 404.
  %%
  %% Synchronisation is done on the `message_retained` trace event (the trace
  %% is started in init_per_testcase/2), as in the other test cases of this
  %% suite, rather than on fixed sleeps.
  t_lookup_and_delete(Config) ->
      C = ?config(client, Config),
      Topic = <<"retained/api">>,
      ?assertWaitEvent(
          emqtt:publish(C, Topic, <<"retained">>, [{qos, 0}, {retain, true}]),
          #{?snk_kind := message_retained, topic := Topic},
          500
      ),
      API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
      {ok, LookupJson} = request_api(get, API),
      LookupResult = decode_json(LookupJson),
      %% "cmV0YWluZWQ=" is the base64 form of the published payload "retained".
      ?assertMatch(
          #{
              msgid := _,
              topic := Topic,
              qos := _,
              payload := <<"cmV0YWluZWQ=">>,
              publish_at := _,
              from_clientid := _,
              from_username := _
          },
          LookupResult
      ),
      ?assertMatch({ok, []}, request_api(delete, API)),
      %% Only the status code matters here; protocol version and reason phrase
      %% are left unconstrained, as in t_message1/t_message2.
      ?assertMatch({error, {_, 404, _}}, request_api(get, API)),
      ?assertMatch({error, {_, 404, _}}, request_api(delete, API)).
  %% Switches the built-in database backend from `ram` to `disc` storage via
  %% the config API and checks that:
  %%   * the three retainer mnesia tables change from ram_copies to disc_copies;
  %%   * a message retained before the switch is still listed afterwards and is
  %%     delivered to a fresh subscriber with the retain flag set.
  t_change_storage_type(_Config) ->
      Path = api_path(["mqtt", "retainer"]),
      {ok, ConfJson} = request_api(get, Path),
      RawConf = emqx_utils_json:decode(ConfJson, [return_maps]),
      %% Asserts the storage type of every retainer table.
      AssertTables = fun(ExpectedType) ->
          lists:foreach(
              fun(Tab) ->
                  ?assertEqual(ExpectedType, mnesia:table_info(Tab, storage_type))
              end,
              [?TAB_INDEX_META, ?TAB_MESSAGE, ?TAB_INDEX]
          )
      end,
      %% Fetches the retained message list through the HTTP API.
      ListMessages = fun() ->
          {ok, ListJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
          #{data := Entries, meta := _} = decode_json(ListJson),
          Entries
      end,
      ClientOpts = [{clean_start, true}, {proto_ver, v5}],
      %% pre-conditions
      ?assertMatch(
          #{
              <<"backend">> := #{
                  <<"type">> := <<"built_in_database">>,
                  <<"storage_type">> := <<"ram">>
              },
              <<"enable">> := true
          },
          RawConf
      ),
      AssertTables(ram_copies),
      %% insert some retained messages
      {ok, Publisher} = emqtt:start_link(ClientOpts),
      {ok, _} = emqtt:connect(Publisher),
      ok = snabbkaffe:start_trace(),
      Topic = <<"retained">>,
      Payload = <<"retained">>,
      {ok, {ok, _}} =
          ?wait_async_action(
              emqtt:publish(Publisher, Topic, Payload, [{qos, 0}, {retain, true}]),
              #{?snk_kind := message_retained, topic := Topic},
              500
          ),
      emqtt:stop(Publisher),
      ok = snabbkaffe:stop(),
      ?assertEqual(1, length(ListMessages())),
      %% Same config as fetched, with only the backend storage type changed.
      DiscConf = emqx_utils_maps:deep_merge(
          RawConf,
          #{<<"backend">> => #{<<"storage_type">> => <<"disc">>}}
      ),
      {ok, UpdateResJson} = request_api(put, Path, [], auth_header_(), DiscConf),
      ?assertMatch(
          #{
              <<"backend">> := #{
                  <<"type">> := <<"built_in_database">>,
                  <<"storage_type">> := <<"disc">>
              },
              <<"enable">> := true
          },
          emqx_utils_json:decode(UpdateResJson, [return_maps])
      ),
      AssertTables(disc_copies),
      %% keep retained messages
      ?assertEqual(1, length(ListMessages())),
      {ok, Subscriber} = emqtt:start_link(ClientOpts),
      {ok, _} = emqtt:connect(Subscriber),
      {ok, _, _} = emqtt:subscribe(Subscriber, Topic),
      receive
          {publish, #{topic := GotTopic, payload := GotPayload, retain := GotRetain}} ->
              ?assertEqual(Payload, GotPayload),
              ?assertEqual(Topic, GotTopic),
              ?assert(GotRetain),
              ok
      after 500 ->
          emqtt:stop(Subscriber),
          ct:fail("should have preserved retained messages")
      end,
      emqtt:stop(Subscriber),
      ok.
  %% Retains messages on t/1..t/3 and f/1..f/3, expects the listing filtered
  %% with the wildcard `t/+` to return exactly the three `t/...` topics, then
  %% deletes all retained messages and expects an empty listing.
  %%
  %% Each publish is synchronised on its `message_retained` trace event (the
  %% trace is started in init_per_testcase/2) instead of fixed sleeps, in the
  %% same way as t_messages2/1.
  t_match_and_clean(Config) ->
      C = ?config(client, Config),
      PubTopics = [
          <<P/binary, "/", S/binary>>
       || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>]
      ],
      ok = lists:foreach(
          fun(PubTopic) ->
              ?assertWaitEvent(
                  emqtt:publish(C, PubTopic, <<"retained">>, [{qos, 0}, {retain, true}]),
                  #{?snk_kind := message_retained, topic := PubTopic},
                  500
              )
          end,
          PubTopics
      ),
      API = api_path(["mqtt", "retainer", "messages"]),
      %% "%2B" is the URL-encoded "+" single-level wildcard.
      {ok, LookupJson} = request_api(get, API, "topic=t/%2B", auth_header_()),
      #{data := Matched} = decode_json(LookupJson),
      %% Exact comparison: ?assertEqual reports both sides on failure.
      ?assertEqual(
          [<<"t/1">>, <<"t/2">>, <<"t/3">>],
          lists:usort([T || #{topic := T} <- Matched])
      ),
      %% DELETE on the collection endpoint is expected to clear every message.
      ?assertMatch({ok, []}, request_api(delete, API)),
      {ok, LookupJson2} = request_api(get, API),
      ?assertMatch(#{data := []}, decode_json(LookupJson2)).
  369. %%--------------------------------------------------------------------
  370. %% Internal funcs
  371. %%--------------------------------------------------------------------
  %% Decodes a JSON document into a map and passes it through
  %% emqx_utils_maps:unsafe_atom_key_map/1 so tests can match on atom keys.
  decode_json(Data) ->
      emqx_utils_maps:unsafe_atom_key_map(
          emqx_utils_json:decode(Data, [return_maps])
      ).
  %% Raw `sys_topics` configuration applied in init_per_suite/1: all four client
  %% event messages are switched off and both intervals are set to "1440m".
  raw_systopic_conf() ->
      EventNames = [
          <<"client_connected">>,
          <<"client_disconnected">>,
          <<"client_subscribed">>,
          <<"client_unsubscribed">>
      ],
      EventFlags = maps:from_list([{Name, false} || Name <- EventNames]),
      Interval = <<"1440m">>,
      #{
          <<"sys_event_messages">> => EventFlags,
          <<"sys_heartbeat_interval">> => Interval,
          <<"sys_msg_interval">> => Interval
      }.