prop_exhook_hooks.erl 22 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(prop_exhook_hooks).
  17. -include_lib("proper/include/proper.hrl").
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("emqx/include/emqx_access_control.hrl").
  20. -import(
  21. emqx_proper_types,
  22. [
  23. conninfo/0,
  24. clientinfo/0,
  25. sessioninfo/0,
  26. message/0,
  27. connack_return_code/0,
  28. topictab/0,
  29. topic/0,
  30. subopts/0,
  31. pubsub/0
  32. ]
  33. ).
%% Configuration text handed to emqx_config:init_load/2 by do_setup/0.
%% It declares a single exhook server named `default` whose URL is
%% http://127.0.0.1:9000.
-define(CONF_DEFAULT, <<
    "\n"
    "exhook {\n"
    " servers =\n"
    " [ { name = default,\n"
    " url = \"http://127.0.0.1:9000\"\n"
    " }\n"
    " ]\n"
    "}\n"
>>).
  44. -define(ALL(Vars, Types, Exprs),
  45. ?SETUP(
  46. fun() ->
  47. State = do_setup(),
  48. fun() -> do_teardown(State) end
  49. end,
  50. ?FORALL(Vars, Types, Exprs)
  51. )
  52. ).
%% Cluster name stored in the ekka application environment by do_setup/0.
-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
%% Binary spelling of the same cluster name, used in request_meta/0.
-define(DEFAULT_CLUSTER_NAME_BIN, <<"emqxcl">>).
  55. %%--------------------------------------------------------------------
  56. %% Properties
  57. %%--------------------------------------------------------------------
  58. prop_client_connect() ->
  59. ?ALL(
  60. {ConnInfo, ConnProps, Meta},
  61. {conninfo(), conn_properties(), request_meta()},
  62. begin
  63. ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
  64. {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
  65. Expected =
  66. #{
  67. props => properties(ConnProps),
  68. conninfo => from_conninfo(ConnInfo),
  69. meta => Meta
  70. },
  71. ?assertEqual(Expected, Resp),
  72. true
  73. end
  74. ).
  75. prop_client_connack() ->
  76. ?ALL(
  77. {ConnInfo, Rc, AckProps, Meta},
  78. {conninfo(), connack_return_code(), ack_properties(), request_meta()},
  79. begin
  80. ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
  81. {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
  82. Expected =
  83. #{
  84. props => properties(AckProps),
  85. result_code => atom_to_binary(Rc, utf8),
  86. conninfo => from_conninfo(ConnInfo),
  87. meta => Meta
  88. },
  89. ?assertEqual(Expected, Resp),
  90. true
  91. end
  92. ).
  93. prop_client_authenticate() ->
  94. ?ALL(
  95. {ClientInfo0, AuthResult, Meta},
  96. {clientinfo(), authresult(), request_meta()},
  97. begin
  98. ClientInfo = inject_magic_into(username, ClientInfo0),
  99. OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
  100. ExpectedAuthResult =
  101. case maps:get(username, ClientInfo) of
  102. <<"baduser">> ->
  103. {error, not_authorized};
  104. <<"gooduser">> ->
  105. ok;
  106. <<"normaluser">> ->
  107. ok;
  108. _ ->
  109. case AuthResult of
  110. ok -> ok;
  111. _ -> {error, not_authorized}
  112. end
  113. end,
  114. ?assertEqual(ExpectedAuthResult, OutAuthResult),
  115. {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
  116. Expected =
  117. #{
  118. result => authresult_to_bool(AuthResult),
  119. clientinfo => from_clientinfo(ClientInfo),
  120. meta => Meta
  121. },
  122. ?assertEqual(Expected, Resp),
  123. true
  124. end
  125. ).
  126. prop_client_authorize() ->
  127. MkResult = fun(Result) -> #{result => Result, from => exhook} end,
  128. ?ALL(
  129. {ClientInfo0, PubSub, Topic, Result, Meta},
  130. {
  131. clientinfo(),
  132. pubsub(),
  133. topic(),
  134. oneof([MkResult(allow), MkResult(deny)]),
  135. request_meta()
  136. },
  137. begin
  138. ClientInfo = inject_magic_into(username, ClientInfo0),
  139. OutResult = emqx_hooks:run_fold(
  140. 'client.authorize',
  141. [ClientInfo, PubSub, Topic],
  142. Result
  143. ),
  144. ExpectedOutResult =
  145. case maps:get(username, ClientInfo) of
  146. <<"baduser">> -> MkResult(deny);
  147. <<"gooduser">> -> MkResult(allow);
  148. <<"normaluser">> -> MkResult(allow);
  149. _ -> Result
  150. end,
  151. ?assertEqual(ExpectedOutResult, OutResult),
  152. {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(),
  153. Expected =
  154. #{
  155. result => aclresult_to_bool(Result),
  156. type => pubsub_to_enum(PubSub),
  157. topic => Topic,
  158. clientinfo => from_clientinfo(ClientInfo),
  159. meta => Meta
  160. },
  161. ?assertEqual(Expected, Resp),
  162. true
  163. end
  164. ).
  165. prop_client_connected() ->
  166. ?ALL(
  167. {ClientInfo, ConnInfo, Meta},
  168. {clientinfo(), conninfo(), request_meta()},
  169. begin
  170. ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
  171. {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
  172. Expected =
  173. #{
  174. clientinfo => from_clientinfo(ClientInfo),
  175. meta => Meta
  176. },
  177. ?assertEqual(Expected, Resp),
  178. true
  179. end
  180. ).
  181. prop_client_disconnected() ->
  182. ?ALL(
  183. {ClientInfo, Reason, ConnInfo, Meta},
  184. {clientinfo(), shutdown_reason(), conninfo(), request_meta()},
  185. begin
  186. ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
  187. {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
  188. Expected =
  189. #{
  190. reason => stringfy(Reason),
  191. clientinfo => from_clientinfo(ClientInfo),
  192. meta => Meta
  193. },
  194. ?assertEqual(Expected, Resp),
  195. true
  196. end
  197. ).
  198. prop_client_subscribe() ->
  199. ?ALL(
  200. {ClientInfo, SubProps, TopicTab, Meta},
  201. {clientinfo(), sub_properties(), topictab(), request_meta()},
  202. begin
  203. ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
  204. {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
  205. Expected =
  206. #{
  207. props => properties(SubProps),
  208. topic_filters => topicfilters(TopicTab),
  209. clientinfo => from_clientinfo(ClientInfo),
  210. meta => Meta
  211. },
  212. ?assertEqual(Expected, Resp),
  213. true
  214. end
  215. ).
  216. prop_client_unsubscribe() ->
  217. ?ALL(
  218. {ClientInfo, UnSubProps, TopicTab, Meta},
  219. {clientinfo(), unsub_properties(), topictab(), request_meta()},
  220. begin
  221. ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
  222. {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
  223. Expected =
  224. #{
  225. props => properties(UnSubProps),
  226. topic_filters => topicfilters(TopicTab),
  227. clientinfo => from_clientinfo(ClientInfo),
  228. meta => Meta
  229. },
  230. ?assertEqual(Expected, Resp),
  231. true
  232. end
  233. ).
  234. prop_session_created() ->
  235. ?ALL(
  236. {ClientInfo, SessInfo, Meta},
  237. {clientinfo(), sessioninfo(), request_meta()},
  238. begin
  239. ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
  240. {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
  241. Expected =
  242. #{
  243. clientinfo => from_clientinfo(ClientInfo),
  244. meta => Meta
  245. },
  246. ?assertEqual(Expected, Resp),
  247. true
  248. end
  249. ).
  250. prop_session_subscribed() ->
  251. ?ALL(
  252. {ClientInfo, Topic, SubOpts, Meta},
  253. {clientinfo(), topic(), subopts(), request_meta()},
  254. begin
  255. ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
  256. {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
  257. Expected =
  258. #{
  259. topic => Topic,
  260. subopts => subopts(SubOpts),
  261. clientinfo => from_clientinfo(ClientInfo),
  262. meta => Meta
  263. },
  264. ?assertEqual(Expected, Resp),
  265. true
  266. end
  267. ).
  268. prop_session_unsubscribed() ->
  269. ?ALL(
  270. {ClientInfo, Topic, SubOpts, Meta},
  271. {clientinfo(), topic(), subopts(), request_meta()},
  272. begin
  273. ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
  274. {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
  275. Expected =
  276. #{
  277. topic => Topic,
  278. clientinfo => from_clientinfo(ClientInfo),
  279. meta => Meta
  280. },
  281. ?assertEqual(Expected, Resp),
  282. true
  283. end
  284. ).
  285. prop_session_resumed() ->
  286. ?ALL(
  287. {ClientInfo, SessInfo, Meta},
  288. {clientinfo(), sessioninfo(), request_meta()},
  289. begin
  290. ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
  291. {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
  292. Expected =
  293. #{
  294. clientinfo => from_clientinfo(ClientInfo),
  295. meta => Meta
  296. },
  297. ?assertEqual(Expected, Resp),
  298. true
  299. end
  300. ).
  301. prop_session_discared() ->
  302. ?ALL(
  303. {ClientInfo, SessInfo, Meta},
  304. {clientinfo(), sessioninfo(), request_meta()},
  305. begin
  306. ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
  307. {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
  308. Expected =
  309. #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
  310. ?assertEqual(Expected, Resp),
  311. true
  312. end
  313. ).
  314. prop_session_takenover() ->
  315. ?ALL(
  316. {ClientInfo, SessInfo, Meta},
  317. {clientinfo(), sessioninfo(), request_meta()},
  318. begin
  319. ok = emqx_hooks:run('session.takenover', [ClientInfo, SessInfo]),
  320. {'on_session_takenover', Resp} = emqx_exhook_demo_svr:take(),
  321. Expected =
  322. #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
  323. ?assertEqual(Expected, Resp),
  324. true
  325. end
  326. ).
  327. prop_session_terminated() ->
  328. ?ALL(
  329. {ClientInfo, Reason, SessInfo, Meta},
  330. {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()},
  331. begin
  332. ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
  333. {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
  334. Expected =
  335. #{
  336. reason => stringfy(Reason),
  337. clientinfo => from_clientinfo(ClientInfo),
  338. meta => Meta
  339. },
  340. ?assertEqual(Expected, Resp),
  341. true
  342. end
  343. ).
  344. prop_message_publish() ->
  345. ?ALL(
  346. {Msg0, Meta},
  347. {message(), request_meta()},
  348. begin
  349. Msg = emqx_message:from_map(
  350. inject_magic_into(from, emqx_message:to_map(Msg0))
  351. ),
  352. OutMsg = emqx_hooks:run_fold('message.publish', [], Msg),
  353. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  354. true ->
  355. ?assertEqual(Msg, OutMsg),
  356. skip;
  357. _ ->
  358. ExpectedOutMsg =
  359. case emqx_message:from(Msg) of
  360. <<"baduser">> ->
  361. MsgMap =
  362. #{headers := Headers} =
  363. emqx_message:to_map(Msg),
  364. emqx_message:from_map(
  365. MsgMap#{
  366. qos => 0,
  367. topic => <<"">>,
  368. payload => <<"">>,
  369. headers => maps:put(allow_publish, false, Headers)
  370. }
  371. );
  372. <<"gooduser">> = From ->
  373. MsgMap =
  374. #{headers := Headers} =
  375. emqx_message:to_map(Msg),
  376. emqx_message:from_map(
  377. MsgMap#{
  378. topic => From,
  379. payload => From,
  380. headers => maps:put(allow_publish, true, Headers)
  381. }
  382. );
  383. _ ->
  384. Msg
  385. end,
  386. ?assertEqual(ExpectedOutMsg, OutMsg),
  387. {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
  388. Expected =
  389. #{
  390. message => from_message(Msg),
  391. meta => Meta
  392. },
  393. ?assertEqual(Expected, Resp)
  394. end,
  395. true
  396. end
  397. ).
  398. prop_message_dropped() ->
  399. ?ALL(
  400. {Msg, By, Reason, Meta},
  401. {message(), hardcoded, shutdown_reason(), request_meta()},
  402. begin
  403. ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
  404. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  405. true ->
  406. skip;
  407. _ ->
  408. {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
  409. Expected =
  410. #{
  411. reason => stringfy(Reason),
  412. message => from_message(Msg),
  413. meta => Meta
  414. },
  415. ?assertEqual(Expected, Resp)
  416. end,
  417. true
  418. end
  419. ).
  420. prop_message_delivered() ->
  421. ?ALL(
  422. {ClientInfo, Msg, Meta},
  423. {clientinfo(), message(), request_meta()},
  424. begin
  425. ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
  426. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  427. true ->
  428. skip;
  429. _ ->
  430. {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
  431. Expected =
  432. #{
  433. clientinfo => from_clientinfo(ClientInfo),
  434. message => from_message(Msg),
  435. meta => Meta
  436. },
  437. ?assertEqual(Expected, Resp)
  438. end,
  439. true
  440. end
  441. ).
  442. prop_message_acked() ->
  443. ?ALL(
  444. {ClientInfo, Msg, Meta},
  445. {clientinfo(), message(), request_meta()},
  446. begin
  447. ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
  448. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  449. true ->
  450. skip;
  451. _ ->
  452. {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
  453. Expected =
  454. #{
  455. clientinfo => from_clientinfo(ClientInfo),
  456. message => from_message(Msg),
  457. meta => Meta
  458. },
  459. ?assertEqual(Expected, Resp)
  460. end,
  461. true
  462. end
  463. ).
  464. nodestr() ->
  465. stringfy(node()).
  466. peerhost(#{peername := {Host, _}}) ->
  467. ntoa(Host).
  468. peerport(#{peername := {_, Port}}) ->
  469. Port.
  470. sockport(#{sockname := {_, Port}}) ->
  471. Port.
  472. %% copied from emqx_exhook
  473. ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
  474. list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
  475. ntoa(IP) ->
  476. list_to_binary(inet_parse:ntoa(IP)).
  477. option(undefined) -> <<>>;
  478. option(B) -> B.
  479. properties(undefined) ->
  480. [];
  481. properties(M) when is_map(M) ->
  482. maps:fold(
  483. fun(K, V, Acc) ->
  484. [
  485. #{
  486. name => stringfy(K),
  487. value => stringfy(V)
  488. }
  489. | Acc
  490. ]
  491. end,
  492. [],
  493. M
  494. ).
  495. topicfilters(Tfs) when is_list(Tfs) ->
  496. [
  497. #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
  498. || {Topic, SubOpts} <- Tfs
  499. ].
  500. %% @private
  501. stringfy(Term) when is_binary(Term) ->
  502. Term;
  503. stringfy(Term) when is_integer(Term) ->
  504. integer_to_binary(Term);
  505. stringfy(Term) when is_atom(Term) ->
  506. atom_to_binary(Term, utf8);
  507. stringfy(Term) when is_list(Term) ->
  508. list_to_binary(Term);
  509. stringfy(Term) ->
  510. unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
  511. subopts(SubOpts) ->
  512. #{
  513. qos => maps:get(qos, SubOpts, 0),
  514. rh => maps:get(rh, SubOpts, 0),
  515. rap => maps:get(rap, SubOpts, 0),
  516. nl => maps:get(nl, SubOpts, 0)
  517. }.
  518. authresult_to_bool(AuthResult) ->
  519. AuthResult == ok.
  520. aclresult_to_bool(#{result := Result}) ->
  521. Result == allow.
%% Maps an authorization action to the enum atom used in the expected request:
%% a publish action gives 'PUBLISH' and a subscribe action gives 'SUBSCRIBE'.
%% NOTE(review): the ?authz_action/1 patterns are presumably defined in the
%% included emqx_access_control.hrl; their expansion is not visible here.
pubsub_to_enum(?authz_action(publish)) -> 'PUBLISH';
pubsub_to_enum(?authz_action(subscribe)) -> 'SUBSCRIBE'.
  524. from_conninfo(ConnInfo) ->
  525. #{
  526. node => nodestr(),
  527. clientid => maps:get(clientid, ConnInfo),
  528. username => option(maps:get(username, ConnInfo, <<>>)),
  529. peerhost => peerhost(ConnInfo),
  530. peerport => peerport(ConnInfo),
  531. sockport => sockport(ConnInfo),
  532. proto_name => maps:get(proto_name, ConnInfo),
  533. proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
  534. keepalive => maps:get(keepalive, ConnInfo)
  535. }.
  536. from_clientinfo(ClientInfo) ->
  537. #{
  538. node => nodestr(),
  539. clientid => maps:get(clientid, ClientInfo),
  540. username => option(maps:get(username, ClientInfo, <<>>)),
  541. password => option(maps:get(password, ClientInfo, <<>>)),
  542. peerhost => ntoa(maps:get(peerhost, ClientInfo)),
  543. peerport => maps:get(peerport, ClientInfo),
  544. sockport => maps:get(sockport, ClientInfo),
  545. protocol => stringfy(maps:get(protocol, ClientInfo)),
  546. mountpoint => option(maps:get(mountpoint, ClientInfo, <<>>)),
  547. is_superuser => maps:get(is_superuser, ClientInfo, false),
  548. anonymous => maps:get(anonymous, ClientInfo, true),
  549. cn => option(maps:get(cn, ClientInfo, <<>>)),
  550. dn => option(maps:get(dn, ClientInfo, <<>>))
  551. }.
  552. from_message(Msg) ->
  553. #{
  554. node => nodestr(),
  555. id => emqx_guid:to_hexstr(emqx_message:id(Msg)),
  556. qos => emqx_message:qos(Msg),
  557. from => stringfy(emqx_message:from(Msg)),
  558. topic => emqx_message:topic(Msg),
  559. payload => emqx_message:payload(Msg),
  560. timestamp => emqx_message:timestamp(Msg),
  561. headers => emqx_exhook_handler:headers(
  562. emqx_message:get_headers(Msg)
  563. )
  564. }.
  565. %%--------------------------------------------------------------------
  566. %% Helper
  567. %%--------------------------------------------------------------------
  568. do_setup() ->
  569. logger:set_primary_config(#{level => warning}),
  570. ok = ekka:start(),
  571. application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
  572. _ = emqx_exhook_demo_svr:start(),
  573. ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT),
  574. emqx_common_test_helpers:start_apps([emqx_exhook]),
  575. %% waiting first loaded event
  576. {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(),
  577. ok.
  578. do_teardown(_) ->
  579. emqx_common_test_helpers:stop_apps([emqx_exhook]),
  580. %% waiting last unloaded event
  581. {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(),
  582. _ = emqx_exhook_demo_svr:stop(),
  583. logger:set_primary_config(#{level => notice}),
  584. timer:sleep(2000),
  585. ok.
  586. %%--------------------------------------------------------------------
  587. %% Generators
  588. %%--------------------------------------------------------------------
  589. conn_properties() ->
  590. #{}.
  591. ack_properties() ->
  592. #{}.
  593. sub_properties() ->
  594. #{}.
  595. unsub_properties() ->
  596. #{}.
  597. shutdown_reason() ->
  598. oneof([utf8(), {shutdown, emqx_proper_types:limited_latin_atom()}]).
  599. authresult() ->
  600. ?LET(
  601. RC,
  602. connack_return_code(),
  603. case RC of
  604. success -> ok;
  605. _ -> {error, RC}
  606. end
  607. ).
  608. inject_magic_into(Key, Object) ->
  609. case castspell() of
  610. muggles -> Object;
  611. Spell -> Object#{Key => Spell}
  612. end.
  613. castspell() ->
  614. L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles],
  615. lists:nth(rand:uniform(length(L)), L).
  616. request_meta() ->
  617. #{
  618. node => nodestr(),
  619. version => stringfy(emqx_sys:version()),
  620. sysdescr => stringfy(emqx_sys:sysdescr()),
  621. cluster_name => ?DEFAULT_CLUSTER_NAME_BIN
  622. }.