emqx_trace_handler_SUITE.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_trace_handler_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -define(CLIENT, [
  22. {host, "localhost"},
  23. {clientid, <<"client">>},
  24. {username, <<"testuser">>},
  25. {password, <<"pass">>}
  26. ]).
  27. all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8].
  28. init_per_suite(Config) ->
  29. Apps = emqx_cth_suite:start(
  30. [emqx],
  31. #{work_dir => emqx_cth_suite:work_dir(Config)}
  32. ),
  33. [{apps, Apps} | Config].
  34. end_per_suite(Config) ->
  35. Apps = ?config(apps, Config),
  36. ok = emqx_cth_suite:stop(Apps),
  37. ok.
  38. init_per_testcase(t_trace_clientid, Config) ->
  39. init(),
  40. Config;
  41. init_per_testcase(_Case, Config) ->
  42. _ = [logger:remove_handler(Id) || #{id := Id} <- emqx_trace_handler:running()],
  43. init(),
  44. Config.
  45. end_per_testcase(_Case, _Config) ->
  46. terminate(),
  47. ok.
  48. t_trace_clientid(_Config) ->
  49. %% Start tracing
  50. %% add list clientid
  51. ok = emqx_trace_handler:install("CLI-client1", clientid, "client", debug, "tmp/client.log"),
  52. ok = emqx_trace_handler:install("CLI-client2", clientid, <<"client2">>, all, "tmp/client2.log"),
  53. ok = emqx_trace_handler:install("CLI-client3", clientid, <<"client3">>, all, "tmp/client3.log"),
  54. {error, {handler_not_added, {file_error, ".", eisdir}}} =
  55. emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
  56. emqx_trace:check(),
  57. ok = filesync(<<"CLI-client1">>, clientid),
  58. ok = filesync(<<"CLI-client2">>, clientid),
  59. ok = filesync(<<"CLI-client3">>, clientid),
  60. %% Verify the tracing file exits
  61. ?assert(filelib:is_regular("tmp/client.log")),
  62. ?assert(filelib:is_regular("tmp/client2.log")),
  63. ?assert(filelib:is_regular("tmp/client3.log")),
  64. %% Get current traces
  65. ?assertMatch(
  66. [
  67. #{
  68. type := clientid,
  69. filter := <<"client">>,
  70. name := <<"CLI-client1">>,
  71. level := debug,
  72. dst := "tmp/client.log"
  73. },
  74. #{
  75. type := clientid,
  76. filter := <<"client2">>,
  77. name := <<"CLI-client2">>,
  78. level := debug,
  79. dst := "tmp/client2.log"
  80. },
  81. #{
  82. type := clientid,
  83. filter := <<"client3">>,
  84. name := <<"CLI-client3">>,
  85. level := debug,
  86. dst := "tmp/client3.log"
  87. }
  88. ],
  89. emqx_trace_handler:running()
  90. ),
  91. %% Client with clientid = "client" publishes a "hi" message to "a/b/c".
  92. {ok, T} = emqtt:start_link(?CLIENT),
  93. emqtt:connect(T),
  94. emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
  95. emqtt:ping(T),
  96. ok = filesync(<<"CLI-client1">>, clientid),
  97. ok = filesync(<<"CLI-client2">>, clientid),
  98. ok = filesync(<<"CLI-client3">>, clientid),
  99. %% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
  100. {ok, Bin} = file:read_file("tmp/client.log"),
  101. ?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNECT">>])),
  102. ?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNACK">>])),
  103. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
  104. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PINGREQ">>])),
  105. ?assert(filelib:file_size("tmp/client2.log") == 0),
  106. %% Stop tracing
  107. ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client1">>),
  108. ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client2">>),
  109. ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client3">>),
  110. emqtt:disconnect(T),
  111. ?assertEqual([], emqx_trace_handler:running()).
  112. t_trace_clientid_utf8(_) ->
  113. Utf8Id = <<"client 漢字編碼"/utf8>>,
  114. ok = emqx_trace_handler:install("CLI-UTF8", clientid, Utf8Id, debug, "tmp/client-utf8.log"),
  115. emqx_trace:check(),
  116. {ok, T} = emqtt:start_link([{clientid, Utf8Id}]),
  117. emqtt:connect(T),
  118. [
  119. begin
  120. emqtt:publish(T, <<"a/b/c">>, <<"hi">>)
  121. end
  122. || _ <- lists:seq(1, 10)
  123. ],
  124. emqtt:ping(T),
  125. ok = filesync("CLI-UTF8", clientid),
  126. ok = emqx_trace_handler:uninstall(clientid, "CLI-UTF8"),
  127. emqtt:disconnect(T),
  128. ?assertEqual([], emqx_trace_handler:running()),
  129. ok.
  130. t_trace_topic(_Config) ->
  131. {ok, T} = emqtt:start_link(?CLIENT),
  132. emqtt:connect(T),
  133. %% Start tracing
  134. ok = emqx_trace_handler:install("CLI-TOPIC-1", topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
  135. ok = emqx_trace_handler:install("CLI-TOPIC-2", topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
  136. emqx_trace:check(),
  137. ok = filesync("CLI-TOPIC-1", topic),
  138. ok = filesync("CLI-TOPIC-2", topic),
  139. %% Verify the tracing file exits
  140. ?assert(filelib:is_regular("tmp/topic_trace_x.log")),
  141. ?assert(filelib:is_regular("tmp/topic_trace_y.log")),
  142. %% Get current traces
  143. ?assertMatch(
  144. [
  145. #{
  146. type := topic,
  147. filter := <<"x/#">>,
  148. level := debug,
  149. dst := "tmp/topic_trace_x.log",
  150. name := <<"CLI-TOPIC-1">>
  151. },
  152. #{
  153. type := topic,
  154. filter := <<"y/#">>,
  155. name := <<"CLI-TOPIC-2">>,
  156. level := debug,
  157. dst := "tmp/topic_trace_y.log"
  158. }
  159. ],
  160. emqx_trace_handler:running()
  161. ),
  162. %% Client with clientid = "client" publishes a "hi" message to "x/y/z".
  163. emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
  164. emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
  165. emqtt:subscribe(T, <<"x/y/z">>),
  166. emqtt:unsubscribe(T, <<"x/y/z">>),
  167. ok = filesync("CLI-TOPIC-1", topic),
  168. ok = filesync("CLI-TOPIC-2", topic),
  169. {ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
  170. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
  171. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
  172. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
  173. ?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
  174. ?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
  175. ?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
  176. %% Stop tracing
  177. ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-1">>),
  178. ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-2">>),
  179. {error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>),
  180. ?assertEqual([], emqx_trace_handler:running()),
  181. emqtt:disconnect(T).
  182. t_trace_ip_address(_Config) ->
  183. {ok, T} = emqtt:start_link(?CLIENT),
  184. emqtt:connect(T),
  185. %% Start tracing
  186. ok = emqx_trace_handler:install("CLI-IP-1", ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
  187. ok = emqx_trace_handler:install(
  188. "CLI-IP-2",
  189. ip_address,
  190. "192.168.1.1",
  191. all,
  192. "tmp/ip_trace_y.log"
  193. ),
  194. emqx_trace:check(),
  195. ok = filesync(<<"CLI-IP-1">>, ip_address),
  196. ok = filesync(<<"CLI-IP-2">>, ip_address),
  197. %% Verify the tracing file exits
  198. ?assert(filelib:is_regular("tmp/ip_trace_x.log")),
  199. ?assert(filelib:is_regular("tmp/ip_trace_y.log")),
  200. %% Get current traces
  201. ?assertMatch(
  202. [
  203. #{
  204. type := ip_address,
  205. filter := "127.0.0.1",
  206. name := <<"CLI-IP-1">>,
  207. level := debug,
  208. dst := "tmp/ip_trace_x.log"
  209. },
  210. #{
  211. type := ip_address,
  212. filter := "192.168.1.1",
  213. name := <<"CLI-IP-2">>,
  214. level := debug,
  215. dst := "tmp/ip_trace_y.log"
  216. }
  217. ],
  218. emqx_trace_handler:running()
  219. ),
  220. %% Client with clientid = "client" publishes a "hi" message to "x/y/z".
  221. emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
  222. emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
  223. emqtt:subscribe(T, <<"x/y/z">>),
  224. emqtt:unsubscribe(T, <<"x/y/z">>),
  225. ok = filesync(<<"CLI-IP-1">>, ip_address),
  226. ok = filesync(<<"CLI-IP-2">>, ip_address),
  227. {ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
  228. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
  229. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
  230. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
  231. ?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
  232. ?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
  233. ?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0),
  234. %% Stop tracing
  235. ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-1">>),
  236. ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-2">>),
  237. {error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
  238. emqtt:disconnect(T),
  239. ?assertEqual([], emqx_trace_handler:running()).
  240. filesync(Name, Type) ->
  241. ct:sleep(50),
  242. filesync(Name, Type, 3).
  243. %% sometime the handler process is not started yet.
  244. filesync(_Name, _Type, 0) ->
  245. ok;
  246. filesync(Name0, Type, Retry) ->
  247. Name =
  248. case is_binary(Name0) of
  249. true -> Name0;
  250. false -> list_to_binary(Name0)
  251. end,
  252. try
  253. Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>),
  254. ok = logger_disk_log_h:filesync(Handler)
  255. catch
  256. E:R ->
  257. ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]),
  258. ct:sleep(100),
  259. filesync(Name, Type, Retry - 1)
  260. end.
  261. init() ->
  262. emqx_trace:start_link().
  263. terminate() ->
  264. catch ok = gen_server:stop(emqx_trace, normal, 5000).