emqx_trace_handler_SUITE.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_trace_handler_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -define(CLIENT, [
  22. {host, "localhost"},
  23. {clientid, <<"client">>},
  24. {username, <<"testuser">>},
  25. {password, <<"pass">>}
  26. ]).
  27. all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8].
  28. init_per_suite(Config) ->
  29. emqx_common_test_helpers:boot_modules(all),
  30. emqx_common_test_helpers:start_apps([]),
  31. Config.
  32. end_per_suite(_Config) ->
  33. emqx_common_test_helpers:stop_apps([]).
  34. init_per_testcase(t_trace_clientid, Config) ->
  35. init(),
  36. Config;
  37. init_per_testcase(_Case, Config) ->
  38. _ = [logger:remove_handler(Id) || #{id := Id} <- emqx_trace_handler:running()],
  39. init(),
  40. Config.
  41. end_per_testcase(_Case, _Config) ->
  42. terminate(),
  43. ok.
  44. t_trace_clientid(_Config) ->
  45. %% Start tracing
  46. %% add list clientid
  47. ok = emqx_trace_handler:install("CLI-client1", clientid, "client", debug, "tmp/client.log"),
  48. ok = emqx_trace_handler:install("CLI-client2", clientid, <<"client2">>, all, "tmp/client2.log"),
  49. ok = emqx_trace_handler:install("CLI-client3", clientid, <<"client3">>, all, "tmp/client3.log"),
  50. {error, {handler_not_added, {file_error, ".", eisdir}}} =
  51. emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
  52. emqx_trace:check(),
  53. ok = filesync(<<"CLI-client1">>, clientid),
  54. ok = filesync(<<"CLI-client2">>, clientid),
  55. ok = filesync(<<"CLI-client3">>, clientid),
  56. %% Verify the tracing file exits
  57. ?assert(filelib:is_regular("tmp/client.log")),
  58. ?assert(filelib:is_regular("tmp/client2.log")),
  59. ?assert(filelib:is_regular("tmp/client3.log")),
  60. %% Get current traces
  61. ?assertMatch(
  62. [
  63. #{
  64. type := clientid,
  65. filter := <<"client">>,
  66. name := <<"CLI-client1">>,
  67. level := debug,
  68. dst := "tmp/client.log"
  69. },
  70. #{
  71. type := clientid,
  72. filter := <<"client2">>,
  73. name := <<"CLI-client2">>,
  74. level := debug,
  75. dst := "tmp/client2.log"
  76. },
  77. #{
  78. type := clientid,
  79. filter := <<"client3">>,
  80. name := <<"CLI-client3">>,
  81. level := debug,
  82. dst := "tmp/client3.log"
  83. }
  84. ],
  85. emqx_trace_handler:running()
  86. ),
  87. %% Client with clientid = "client" publishes a "hi" message to "a/b/c".
  88. {ok, T} = emqtt:start_link(?CLIENT),
  89. emqtt:connect(T),
  90. emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
  91. emqtt:ping(T),
  92. ok = filesync(<<"CLI-client1">>, clientid),
  93. ok = filesync(<<"CLI-client2">>, clientid),
  94. ok = filesync(<<"CLI-client3">>, clientid),
  95. %% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
  96. {ok, Bin} = file:read_file("tmp/client.log"),
  97. ?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNECT">>])),
  98. ?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNACK">>])),
  99. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
  100. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PINGREQ">>])),
  101. ?assert(filelib:file_size("tmp/client2.log") == 0),
  102. %% Stop tracing
  103. ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client1">>),
  104. ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client2">>),
  105. ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client3">>),
  106. emqtt:disconnect(T),
  107. ?assertEqual([], emqx_trace_handler:running()).
  108. t_trace_clientid_utf8(_) ->
  109. Utf8Id = <<"client 漢字編碼"/utf8>>,
  110. ok = emqx_trace_handler:install("CLI-UTF8", clientid, Utf8Id, debug, "tmp/client-utf8.log"),
  111. emqx_trace:check(),
  112. {ok, T} = emqtt:start_link([{clientid, Utf8Id}]),
  113. emqtt:connect(T),
  114. [
  115. begin
  116. emqtt:publish(T, <<"a/b/c">>, <<"hi">>)
  117. end
  118. || _ <- lists:seq(1, 10)
  119. ],
  120. emqtt:ping(T),
  121. ok = filesync("CLI-UTF8", clientid),
  122. ok = emqx_trace_handler:uninstall(clientid, "CLI-UTF8"),
  123. emqtt:disconnect(T),
  124. ?assertEqual([], emqx_trace_handler:running()),
  125. ok.
  126. t_trace_topic(_Config) ->
  127. {ok, T} = emqtt:start_link(?CLIENT),
  128. emqtt:connect(T),
  129. %% Start tracing
  130. ok = emqx_trace_handler:install("CLI-TOPIC-1", topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
  131. ok = emqx_trace_handler:install("CLI-TOPIC-2", topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
  132. emqx_trace:check(),
  133. ok = filesync("CLI-TOPIC-1", topic),
  134. ok = filesync("CLI-TOPIC-2", topic),
  135. %% Verify the tracing file exits
  136. ?assert(filelib:is_regular("tmp/topic_trace_x.log")),
  137. ?assert(filelib:is_regular("tmp/topic_trace_y.log")),
  138. %% Get current traces
  139. ?assertMatch(
  140. [
  141. #{
  142. type := topic,
  143. filter := <<"x/#">>,
  144. level := debug,
  145. dst := "tmp/topic_trace_x.log",
  146. name := <<"CLI-TOPIC-1">>
  147. },
  148. #{
  149. type := topic,
  150. filter := <<"y/#">>,
  151. name := <<"CLI-TOPIC-2">>,
  152. level := debug,
  153. dst := "tmp/topic_trace_y.log"
  154. }
  155. ],
  156. emqx_trace_handler:running()
  157. ),
  158. %% Client with clientid = "client" publishes a "hi" message to "x/y/z".
  159. emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
  160. emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
  161. emqtt:subscribe(T, <<"x/y/z">>),
  162. emqtt:unsubscribe(T, <<"x/y/z">>),
  163. ok = filesync("CLI-TOPIC-1", topic),
  164. ok = filesync("CLI-TOPIC-2", topic),
  165. {ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
  166. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
  167. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
  168. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
  169. ?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
  170. ?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
  171. ?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
  172. %% Stop tracing
  173. ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-1">>),
  174. ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-2">>),
  175. {error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>),
  176. ?assertEqual([], emqx_trace_handler:running()),
  177. emqtt:disconnect(T).
  178. t_trace_ip_address(_Config) ->
  179. {ok, T} = emqtt:start_link(?CLIENT),
  180. emqtt:connect(T),
  181. %% Start tracing
  182. ok = emqx_trace_handler:install("CLI-IP-1", ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
  183. ok = emqx_trace_handler:install(
  184. "CLI-IP-2",
  185. ip_address,
  186. "192.168.1.1",
  187. all,
  188. "tmp/ip_trace_y.log"
  189. ),
  190. emqx_trace:check(),
  191. ok = filesync(<<"CLI-IP-1">>, ip_address),
  192. ok = filesync(<<"CLI-IP-2">>, ip_address),
  193. %% Verify the tracing file exits
  194. ?assert(filelib:is_regular("tmp/ip_trace_x.log")),
  195. ?assert(filelib:is_regular("tmp/ip_trace_y.log")),
  196. %% Get current traces
  197. ?assertMatch(
  198. [
  199. #{
  200. type := ip_address,
  201. filter := "127.0.0.1",
  202. name := <<"CLI-IP-1">>,
  203. level := debug,
  204. dst := "tmp/ip_trace_x.log"
  205. },
  206. #{
  207. type := ip_address,
  208. filter := "192.168.1.1",
  209. name := <<"CLI-IP-2">>,
  210. level := debug,
  211. dst := "tmp/ip_trace_y.log"
  212. }
  213. ],
  214. emqx_trace_handler:running()
  215. ),
  216. %% Client with clientid = "client" publishes a "hi" message to "x/y/z".
  217. emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
  218. emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
  219. emqtt:subscribe(T, <<"x/y/z">>),
  220. emqtt:unsubscribe(T, <<"x/y/z">>),
  221. ok = filesync(<<"CLI-IP-1">>, ip_address),
  222. ok = filesync(<<"CLI-IP-2">>, ip_address),
  223. {ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
  224. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
  225. ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
  226. ?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
  227. ?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
  228. ?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
  229. ?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0),
  230. %% Stop tracing
  231. ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-1">>),
  232. ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-2">>),
  233. {error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
  234. emqtt:disconnect(T),
  235. ?assertEqual([], emqx_trace_handler:running()).
  236. filesync(Name, Type) ->
  237. ct:sleep(50),
  238. filesync(Name, Type, 3).
  239. %% sometime the handler process is not started yet.
  240. filesync(_Name, _Type, 0) ->
  241. ok;
  242. filesync(Name0, Type, Retry) ->
  243. Name =
  244. case is_binary(Name0) of
  245. true -> Name0;
  246. false -> list_to_binary(Name0)
  247. end,
  248. try
  249. Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>),
  250. ok = logger_disk_log_h:filesync(Handler)
  251. catch
  252. E:R ->
  253. ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]),
  254. ct:sleep(100),
  255. filesync(Name, Type, Retry - 1)
  256. end.
  257. init() ->
  258. emqx_trace:start_link().
  259. terminate() ->
  260. catch ok = gen_server:stop(emqx_trace, normal, 5000).