emqx_exhook_SUITE.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_exhook_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx_access_control.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("common_test/include/ct.hrl").
  22. -include_lib("emqx/include/emqx_hooks.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Cluster name that emqx_conf/1 writes into the emqx_conf configuration for
%% the t_cluster_name test case; that test case expects the same string back
%% from emqx_sys:cluster_name/0.
-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster").
%% Default exhook configuration handed to the emqx_exhook application in
%% init_per_testcase/2. It declares four servers:
%%   default       - url http://127.0.0.1:9000
%%   enable        - same url, declared with enable = false
%%   error         - url http://127.0.0.1:9001
%%   not_reconnect - url http://127.0.0.1:9001, auto_reconnect = false
%% NOTE(review): presumably nothing listens on port 9001 so that the last two
%% servers fail to connect -- confirm against emqx_exhook_demo_svr.
-define(CONF_DEFAULT, <<
    "exhook {\n"
    " servers = [\n"
    " { name = default,\n"
    " url = \"http://127.0.0.1:9000\"\n"
    " },\n"
    " { name = enable,\n"
    " enable = false,\n"
    " url = \"http://127.0.0.1:9000\"\n"
    " },\n"
    " { name = error,\n"
    " url = \"http://127.0.0.1:9001\"\n"
    " },\n"
    " { name = not_reconnect,\n"
    " auto_reconnect = false,\n"
    " url = \"http://127.0.0.1:9001\"\n"
    " }\n"
    " ]\n"
    "}\n"
>>).
  45. %%--------------------------------------------------------------------
  46. %% Setups
  47. %%--------------------------------------------------------------------
  48. all() -> emqx_common_test_helpers:all(?MODULE).
  49. init_per_suite(Cfg) ->
  50. _ = emqx_exhook_demo_svr:start(),
  51. Cfg.
  52. end_per_suite(_Cfg) ->
  53. emqx_exhook_demo_svr:stop().
  54. init_per_testcase(TC, Config) ->
  55. Apps = emqx_cth_suite:start(
  56. [
  57. emqx,
  58. {emqx_conf, emqx_conf(TC)},
  59. {emqx_exhook, ?CONF_DEFAULT}
  60. ],
  61. #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
  62. ),
  63. [{tc_apps, Apps} | Config].
  64. end_per_testcase(_, Config) ->
  65. ok = emqx_cth_suite:stop(?config(tc_apps, Config)).
  66. emqx_conf(t_cluster_name) ->
  67. io_lib:format("cluster.name = ~p", [?OTHER_CLUSTER_NAME_STRING]);
  68. emqx_conf(_) ->
  69. #{}.
  70. %%--------------------------------------------------------------------
  71. %% Test cases
  72. %%--------------------------------------------------------------------
  73. t_access_failed_if_no_server_running(Config) ->
  74. meck:expect(emqx_metrics_worker, inc, fun(_, _, _) -> ok end),
  75. meck:expect(emqx_metrics, inc, fun(_) -> ok end),
  76. emqx_hooks:add('client.authorize', {emqx_authz, authorize, [[]]}, ?HP_AUTHZ),
  77. ClientInfo = #{
  78. clientid => <<"user-id-1">>,
  79. username => <<"usera">>,
  80. peerhost => {127, 0, 0, 1},
  81. peerport => 3456,
  82. sockport => 1883,
  83. protocol => mqtt,
  84. mountpoint => undefined
  85. },
  86. ?assertMatch(
  87. allow,
  88. emqx_access_control:authorize(
  89. ClientInfo#{username => <<"gooduser">>},
  90. ?AUTHZ_PUBLISH,
  91. <<"acl/1">>
  92. )
  93. ),
  94. ?assertMatch(
  95. deny,
  96. emqx_access_control:authorize(
  97. ClientInfo#{username => <<"baduser">>},
  98. ?AUTHZ_PUBLISH,
  99. <<"acl/2">>
  100. )
  101. ),
  102. emqx_exhook_mgr:disable(<<"default">>),
  103. ?assertMatch(
  104. {stop, {error, not_authorized}},
  105. emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})
  106. ),
  107. ?assertMatch(
  108. {stop, #{result := deny, from := exhook}},
  109. emqx_exhook_handler:on_client_authorize(ClientInfo, ?AUTHZ_PUBLISH, <<"t/1">>, #{
  110. result => allow, from => exhook
  111. })
  112. ),
  113. Message = emqx_message:make(<<"t/1">>, <<"abc">>),
  114. ?assertMatch(
  115. {stop, Message},
  116. emqx_exhook_handler:on_message_publish(Message)
  117. ),
  118. emqx_exhook_mgr:enable(<<"default">>),
  119. emqx_hooks:del('client.authorize', {emqx_authz, authorize}),
  120. assert_get_basic_usage_info(Config).
  121. t_lookup(_) ->
  122. Result = emqx_exhook_mgr:lookup(<<"default">>),
  123. ?assertMatch(#{name := <<"default">>, status := _}, Result),
  124. not_found = emqx_exhook_mgr:lookup(<<"not_found">>).
  125. t_list(_) ->
  126. [H | _] = emqx_exhook_mgr:list(),
  127. ?assertMatch(
  128. #{
  129. name := _,
  130. status := _,
  131. hooks := _
  132. },
  133. H
  134. ).
  135. t_unexpected(_) ->
  136. ok = gen_server:cast(emqx_exhook_mgr, unexpected),
  137. unexpected = erlang:send(erlang:whereis(emqx_exhook_mgr), unexpected),
  138. Result = gen_server:call(emqx_exhook_mgr, unexpected),
  139. ?assertEqual(Result, ok).
  140. t_timer(_) ->
  141. Pid = erlang:whereis(emqx_exhook_mgr),
  142. refresh_tick = erlang:send(Pid, refresh_tick),
  143. _ = erlang:send(Pid, {timeout, undefined, {reload, <<"default">>}}),
  144. _ = erlang:send(Pid, {timeout, undefined, {reload, <<"not_found">>}}),
  145. _ = erlang:send(Pid, {timeout, undefined, {reload, <<"error">>}}),
  146. ok.
  147. t_error_update_conf(_) ->
  148. Path = [exhook, servers],
  149. Name = <<"error_update">>,
  150. ErrorCfg = #{<<"name">> => Name},
  151. {error, not_found} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}),
  152. {error, not_found} = emqx_exhook_mgr:update_config(Path, {move, Name, top}),
  153. {error, not_found} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}),
  154. ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>},
  155. {ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}),
  156. DisableAnd = #{
  157. <<"name">> => Name,
  158. <<"url">> => <<"http://127.0.0.1:9001">>,
  159. <<"enable">> => false
  160. },
  161. {ok, _} = emqx_exhook_mgr:update_config(Path, {update, Name, DisableAnd}),
  162. {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, Name}),
  163. {error, not_found} = emqx_exhook_mgr:update_config(Path, {delete, Name}),
  164. ok.
  165. t_update_conf(_Config) ->
  166. Path = [exhook],
  167. Conf = #{<<"servers">> := Servers} = emqx_config:get_raw(Path),
  168. ?assert(length(Servers) > 1),
  169. Servers1 = shuffle(Servers),
  170. ReOrderedConf = Conf#{<<"servers">> => Servers1},
  171. validate_servers(Path, ReOrderedConf, Servers1),
  172. [_ | Servers2] = Servers,
  173. DeletedConf = Conf#{<<"servers">> => Servers2},
  174. validate_servers(Path, DeletedConf, Servers2),
  175. [L1, L2 | Servers3] = Servers,
  176. UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => <<"1s">>},
  177. UpdatedServers = [L1, UpdateL2 | Servers3],
  178. UpdatedConf = Conf#{<<"servers">> => UpdatedServers},
  179. validate_servers(Path, UpdatedConf, UpdatedServers),
  180. %% reset
  181. validate_servers(Path, Conf, Servers),
  182. ok.
  183. validate_servers(Path, ReOrderConf, Servers1) ->
  184. {ok, _} = emqx_exhook_mgr:update_config(Path, ReOrderConf),
  185. ?assertEqual(ReOrderConf, emqx_config:get_raw(Path)),
  186. List = emqx_exhook_mgr:list(),
  187. ExpectL = lists:map(fun(#{<<"name">> := Name}) -> Name end, Servers1),
  188. L1 = lists:map(fun(#{name := Name}) -> Name end, List),
  189. ?assertEqual(ExpectL, L1).
  190. t_error_server_info(_) ->
  191. not_found = emqx_exhook_mgr:server_info(<<"not_exists">>),
  192. ok.
  193. t_metrics(_) ->
  194. ok = emqx_exhook_metrics:succeed(<<"default">>, 'client.connect'),
  195. ok = emqx_exhook_metrics:failed(<<"default">>, 'client.connect'),
  196. true = emqx_exhook_metrics:update(1000),
  197. timer:sleep(100),
  198. SvrMetrics = emqx_exhook_metrics:server_metrics(<<"default">>),
  199. ?assertMatch(#{succeed := _, failed := _, rate := _, max_rate := _}, SvrMetrics),
  200. SvrsMetrics = emqx_exhook_metrics:servers_metrics(),
  201. ?assertMatch(#{<<"default">> := #{succeed := _}}, SvrsMetrics),
  202. HooksMetrics = emqx_exhook_metrics:hooks_metrics(<<"default">>),
  203. ?assertMatch(#{'client.connect' := #{succeed := _}}, HooksMetrics),
  204. ok.
  205. t_handler(_) ->
  206. %% connect
  207. {ok, C} = emqtt:start_link([
  208. {host, "localhost"},
  209. {port, 1883},
  210. {username, <<"gooduser">>},
  211. {clientid, <<"exhook_gooduser">>}
  212. ]),
  213. {ok, _} = emqtt:connect(C),
  214. %% pub/sub
  215. {ok, _, _} = emqtt:subscribe(C, <<"/exhook">>, qos0),
  216. timer:sleep(100),
  217. ok = emqtt:publish(C, <<"/exhook">>, <<>>, qos0),
  218. ok = emqtt:publish(C, <<"/ignore">>, <<>>, qos0),
  219. timer:sleep(100),
  220. {ok, _, _} = emqtt:unsubscribe(C, <<"/exhook">>),
  221. %% sys pub/sub
  222. ok = emqtt:publish(C, <<"$SYS">>, <<>>, qos0),
  223. {ok, _, _} = emqtt:subscribe(C, <<"$SYS/systest">>, qos1),
  224. timer:sleep(100),
  225. {ok, _} = emqtt:publish(C, <<"$SYS/systest">>, <<>>, qos1),
  226. ok = emqtt:publish(C, <<"$SYS/ignore">>, <<>>, qos0),
  227. timer:sleep(100),
  228. {ok, _, _} = emqtt:unsubscribe(C, <<"$SYS/systest">>),
  229. %% ack
  230. {ok, _, _} = emqtt:subscribe(C, <<"/exhook1">>, qos1),
  231. timer:sleep(100),
  232. {ok, _} = emqtt:publish(C, <<"/exhook1">>, <<>>, qos1),
  233. timer:sleep(100),
  234. emqtt:stop(C),
  235. timer:sleep(100),
  236. ok.
  237. t_simulated_handler(_) ->
  238. ClientInfo = #{
  239. clientid => <<"user-id-1">>,
  240. username => <<"usera">>,
  241. peerhost => {127, 0, 0, 1},
  242. peerport => 3456,
  243. sockport => 1883,
  244. protocol => mqtt,
  245. mountpoint => undefined
  246. },
  247. %% resume/takeover
  248. ok = emqx_exhook_handler:on_session_resumed(ClientInfo, undefined),
  249. ok = emqx_exhook_handler:on_session_discarded(ClientInfo, undefined),
  250. ok = emqx_exhook_handler:on_session_takenover(ClientInfo, undefined),
  251. ok.
  252. t_misc_test(_) ->
  253. "5.0.0" = emqx_exhook_proto_v1:introduced_in(),
  254. <<"test">> = emqx_exhook_server:name(#{name => <<"test">>}),
  255. _ = emqx_exhook_server:format(#{name => <<"test">>, hookspec => #{}}),
  256. ok.
  257. t_cluster_name(_) ->
  258. ?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()),
  259. emqx_exhook_mgr:disable(<<"default">>),
  260. emqx_exhook_mgr:enable(<<"default">>),
  261. %% See emqx_exhook_demo_svr:on_provider_loaded/2
  262. ?assertEqual([], emqx_hooks:lookup('session.created')),
  263. ?assertEqual([], emqx_hooks:lookup('message_publish')),
  264. ?assertEqual(
  265. true,
  266. erlang:length(emqx_hooks:lookup('client.connected')) > 1
  267. ),
  268. emqx_exhook_mgr:disable(<<"default">>).
  269. t_stop_timeout(_) ->
  270. snabbkaffe:start_trace(),
  271. meck:new(emqx_exhook_demo_svr, [passthrough, no_history]),
  272. meck:expect(
  273. emqx_exhook_demo_svr,
  274. on_provider_unloaded,
  275. fun(Req, Md) ->
  276. %% ensure sleep time greater than emqx_exhook_mgr shutdown timeout
  277. timer:sleep(20000),
  278. meck:passthrough([Req, Md])
  279. end
  280. ),
  281. %% stop application
  282. ok = application:stop(emqx_exhook),
  283. ?block_until(#{?snk_kind := exhook_mgr_terminated}, 20000),
  284. %% all exhook hooked point should be unloaded
  285. Mods = lists:flatten(
  286. lists:map(
  287. fun({hook, _, Cbs}) ->
  288. lists:map(fun({callback, {M, _, _}, _, _}) -> M end, Cbs)
  289. end,
  290. ets:tab2list(emqx_hooks)
  291. )
  292. ),
  293. ?assertEqual(false, lists:any(fun(M) -> M == emqx_exhook_handler end, Mods)),
  294. %% ensure started for other tests
  295. {ok, _} = application:ensure_all_started(emqx_exhook),
  296. snabbkaffe:stop(),
  297. meck:unload(emqx_exhook_demo_svr).
  298. t_ssl_clear(_) ->
  299. SvrName = <<"ssl_test">>,
  300. SSLConf = #{
  301. <<"enable">> => true,
  302. <<"cacertfile">> => cert_file("cafile"),
  303. <<"certfile">> => cert_file("certfile"),
  304. <<"keyfile">> => cert_file("keyfile"),
  305. <<"verify">> => <<"verify_peer">>
  306. },
  307. AddConf = #{
  308. <<"auto_reconnect">> => <<"60s">>,
  309. <<"enable">> => false,
  310. <<"failed_action">> => <<"deny">>,
  311. <<"name">> => <<"ssl_test">>,
  312. <<"pool_size">> => 16,
  313. <<"request_timeout">> => <<"5s">>,
  314. <<"ssl">> => SSLConf,
  315. <<"url">> => <<"http://127.0.0.1:9000">>
  316. },
  317. emqx_exhook_mgr:update_config([exhook, servers], {add, AddConf}),
  318. ListResult1 = list_pem_dir(SvrName),
  319. ?assertMatch({ok, [_, _, _]}, ListResult1),
  320. {ok, ResultList1} = ListResult1,
  321. UpdateConf = AddConf#{<<"ssl">> => SSLConf#{<<"keyfile">> => cert_file("keyfile2")}},
  322. emqx_exhook_mgr:update_config([exhook, servers], {update, SvrName, UpdateConf}),
  323. {ok, _} = emqx_tls_certfile_gc:force(),
  324. ListResult2 = list_pem_dir(SvrName),
  325. ?assertMatch({ok, [_, _, _]}, ListResult2),
  326. {ok, ResultList2} = ListResult2,
  327. FindKeyFile = fun(List) ->
  328. case lists:search(fun(E) -> lists:prefix("key", E) end, List) of
  329. {value, Value} ->
  330. Value;
  331. _ ->
  332. ?assert(false, "Can't find keyfile")
  333. end
  334. end,
  335. ?assertNotEqual(FindKeyFile(ResultList1), FindKeyFile(ResultList2)),
  336. emqx_exhook_mgr:update_config([exhook, servers], {delete, SvrName}),
  337. {ok, _} = emqx_tls_certfile_gc:force(),
  338. ?assertMatch({error, enoent}, list_pem_dir(SvrName)),
  339. ok.
  340. %%--------------------------------------------------------------------
  341. %% Cases Helpers
  342. %%--------------------------------------------------------------------
  343. assert_get_basic_usage_info(_Config) ->
  344. #{
  345. num_servers := NumServers,
  346. servers := Servers
  347. } = emqx_exhook:get_basic_usage_info(),
  348. ?assertEqual(1, NumServers),
  349. ?assertMatch([_], Servers),
  350. [#{driver := Driver, hooks := Hooks}] = Servers,
  351. ?assertEqual(grpc, Driver),
  352. ?assertEqual(
  353. [
  354. 'client.authenticate',
  355. 'client.authorize',
  356. 'client.connack',
  357. 'client.connect',
  358. 'client.connected',
  359. 'client.disconnected',
  360. 'client.subscribe',
  361. 'client.unsubscribe',
  362. 'message.acked',
  363. 'message.delivered',
  364. 'message.dropped',
  365. 'message.publish',
  366. 'session.created',
  367. 'session.discarded',
  368. 'session.resumed',
  369. 'session.subscribed',
  370. 'session.takenover',
  371. 'session.terminated',
  372. 'session.unsubscribed'
  373. ],
  374. lists:sort(Hooks)
  375. ).
  376. %%--------------------------------------------------------------------
  377. %% Utils
  378. %%--------------------------------------------------------------------
  379. meck_print() ->
  380. meck:new(emqx_ctl, [passthrough, no_history, no_link]),
  381. meck:expect(emqx_ctl, print, fun(_) -> ok end),
  382. meck:expect(emqx_ctl, print, fun(_, Args) -> Args end).
  383. unmeck_print() ->
  384. meck:unload(emqx_ctl).
  385. loaded_exhook_hookpoints() ->
  386. lists:filtermap(
  387. fun(E) ->
  388. Name = element(2, E),
  389. Callbacks = element(3, E),
  390. case lists:any(fun is_exhook_callback/1, Callbacks) of
  391. true -> {true, Name};
  392. _ -> false
  393. end
  394. end,
  395. ets:tab2list(emqx_hooks)
  396. ).
  397. is_exhook_callback(Cb) ->
  398. Action = element(2, Cb),
  399. emqx_exhook_handler == element(1, Action).
  400. list_pem_dir(Name) ->
  401. Dir = filename:join([emqx:mutable_certs_dir(), "exhook", Name]),
  402. file:list_dir(Dir).
  403. data_file(Name) ->
  404. Dir = code:lib_dir(emqx_exhook, test),
  405. {ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),
  406. Bin.
  407. cert_file(Name) ->
  408. data_file(filename:join(["certs", Name])).
  409. shuffle(List) ->
  410. Sorted = lists:sort(lists:map(fun(L) -> {rand:uniform(), L} end, List)),
  411. lists:map(fun({_, L}) -> L end, Sorted).