prop_exhook_hooks.erl 21 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(prop_exhook_hooks).
  17. -include_lib("proper/include/proper.hrl").
  18. -include_lib("eunit/include/eunit.hrl").
  19. -import(
  20. emqx_proper_types,
  21. [
  22. conninfo/0,
  23. clientinfo/0,
  24. sessioninfo/0,
  25. message/0,
  26. connack_return_code/0,
  27. topictab/0,
  28. topic/0,
  29. subopts/0
  30. ]
  31. ).
  32. -define(CONF_DEFAULT, <<
  33. "\n"
  34. "exhook {\n"
  35. " servers =\n"
  36. " [ { name = default,\n"
  37. " url = \"http://127.0.0.1:9000\"\n"
  38. " }\n"
  39. " ]\n"
  40. "}\n"
  41. >>).
  42. -define(ALL(Vars, Types, Exprs),
  43. ?SETUP(
  44. fun() ->
  45. State = do_setup(),
  46. fun() -> do_teardown(State) end
  47. end,
  48. ?FORALL(Vars, Types, Exprs)
  49. )
  50. ).
  51. -define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
  52. -define(DEFAULT_CLUSTER_NAME_BIN, <<"emqxcl">>).
  53. %%--------------------------------------------------------------------
  54. %% Properties
  55. %%--------------------------------------------------------------------
  56. prop_client_connect() ->
  57. ?ALL(
  58. {ConnInfo, ConnProps, Meta},
  59. {conninfo(), conn_properties(), request_meta()},
  60. begin
  61. ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
  62. {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
  63. Expected =
  64. #{
  65. props => properties(ConnProps),
  66. conninfo => from_conninfo(ConnInfo),
  67. meta => Meta
  68. },
  69. ?assertEqual(Expected, Resp),
  70. true
  71. end
  72. ).
  73. prop_client_connack() ->
  74. ?ALL(
  75. {ConnInfo, Rc, AckProps, Meta},
  76. {conninfo(), connack_return_code(), ack_properties(), request_meta()},
  77. begin
  78. ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
  79. {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
  80. Expected =
  81. #{
  82. props => properties(AckProps),
  83. result_code => atom_to_binary(Rc, utf8),
  84. conninfo => from_conninfo(ConnInfo),
  85. meta => Meta
  86. },
  87. ?assertEqual(Expected, Resp),
  88. true
  89. end
  90. ).
  91. prop_client_authenticate() ->
  92. ?ALL(
  93. {ClientInfo0, AuthResult, Meta},
  94. {clientinfo(), authresult(), request_meta()},
  95. begin
  96. ClientInfo = inject_magic_into(username, ClientInfo0),
  97. OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
  98. ExpectedAuthResult =
  99. case maps:get(username, ClientInfo) of
  100. <<"baduser">> ->
  101. {error, not_authorized};
  102. <<"gooduser">> ->
  103. ok;
  104. <<"normaluser">> ->
  105. ok;
  106. _ ->
  107. case AuthResult of
  108. ok -> ok;
  109. _ -> {error, not_authorized}
  110. end
  111. end,
  112. ?assertEqual(ExpectedAuthResult, OutAuthResult),
  113. {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
  114. Expected =
  115. #{
  116. result => authresult_to_bool(AuthResult),
  117. clientinfo => from_clientinfo(ClientInfo),
  118. meta => Meta
  119. },
  120. ?assertEqual(Expected, Resp),
  121. true
  122. end
  123. ).
  124. prop_client_authorize() ->
  125. ?ALL(
  126. {ClientInfo0, PubSub, Topic, Result, Meta},
  127. {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()},
  128. begin
  129. ClientInfo = inject_magic_into(username, ClientInfo0),
  130. OutResult = emqx_hooks:run_fold(
  131. 'client.authorize',
  132. [ClientInfo, PubSub, Topic],
  133. Result
  134. ),
  135. ExpectedOutResult =
  136. case maps:get(username, ClientInfo) of
  137. <<"baduser">> -> deny;
  138. <<"gooduser">> -> allow;
  139. <<"normaluser">> -> allow;
  140. _ -> Result
  141. end,
  142. ?assertEqual(ExpectedOutResult, OutResult),
  143. {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(),
  144. Expected =
  145. #{
  146. result => aclresult_to_bool(Result),
  147. type => pubsub_to_enum(PubSub),
  148. topic => Topic,
  149. clientinfo => from_clientinfo(ClientInfo),
  150. meta => Meta
  151. },
  152. ?assertEqual(Expected, Resp),
  153. true
  154. end
  155. ).
  156. prop_client_connected() ->
  157. ?ALL(
  158. {ClientInfo, ConnInfo, Meta},
  159. {clientinfo(), conninfo(), request_meta()},
  160. begin
  161. ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
  162. {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
  163. Expected =
  164. #{
  165. clientinfo => from_clientinfo(ClientInfo),
  166. meta => Meta
  167. },
  168. ?assertEqual(Expected, Resp),
  169. true
  170. end
  171. ).
  172. prop_client_disconnected() ->
  173. ?ALL(
  174. {ClientInfo, Reason, ConnInfo, Meta},
  175. {clientinfo(), shutdown_reason(), conninfo(), request_meta()},
  176. begin
  177. ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
  178. {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
  179. Expected =
  180. #{
  181. reason => stringfy(Reason),
  182. clientinfo => from_clientinfo(ClientInfo),
  183. meta => Meta
  184. },
  185. ?assertEqual(Expected, Resp),
  186. true
  187. end
  188. ).
  189. prop_client_subscribe() ->
  190. ?ALL(
  191. {ClientInfo, SubProps, TopicTab, Meta},
  192. {clientinfo(), sub_properties(), topictab(), request_meta()},
  193. begin
  194. ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
  195. {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
  196. Expected =
  197. #{
  198. props => properties(SubProps),
  199. topic_filters => topicfilters(TopicTab),
  200. clientinfo => from_clientinfo(ClientInfo),
  201. meta => Meta
  202. },
  203. ?assertEqual(Expected, Resp),
  204. true
  205. end
  206. ).
  207. prop_client_unsubscribe() ->
  208. ?ALL(
  209. {ClientInfo, UnSubProps, TopicTab, Meta},
  210. {clientinfo(), unsub_properties(), topictab(), request_meta()},
  211. begin
  212. ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
  213. {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
  214. Expected =
  215. #{
  216. props => properties(UnSubProps),
  217. topic_filters => topicfilters(TopicTab),
  218. clientinfo => from_clientinfo(ClientInfo),
  219. meta => Meta
  220. },
  221. ?assertEqual(Expected, Resp),
  222. true
  223. end
  224. ).
  225. prop_session_created() ->
  226. ?ALL(
  227. {ClientInfo, SessInfo, Meta},
  228. {clientinfo(), sessioninfo(), request_meta()},
  229. begin
  230. ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
  231. {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
  232. Expected =
  233. #{
  234. clientinfo => from_clientinfo(ClientInfo),
  235. meta => Meta
  236. },
  237. ?assertEqual(Expected, Resp),
  238. true
  239. end
  240. ).
  241. prop_session_subscribed() ->
  242. ?ALL(
  243. {ClientInfo, Topic, SubOpts, Meta},
  244. {clientinfo(), topic(), subopts(), request_meta()},
  245. begin
  246. ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
  247. {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
  248. Expected =
  249. #{
  250. topic => Topic,
  251. subopts => subopts(SubOpts),
  252. clientinfo => from_clientinfo(ClientInfo),
  253. meta => Meta
  254. },
  255. ?assertEqual(Expected, Resp),
  256. true
  257. end
  258. ).
  259. prop_session_unsubscribed() ->
  260. ?ALL(
  261. {ClientInfo, Topic, SubOpts, Meta},
  262. {clientinfo(), topic(), subopts(), request_meta()},
  263. begin
  264. ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
  265. {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
  266. Expected =
  267. #{
  268. topic => Topic,
  269. clientinfo => from_clientinfo(ClientInfo),
  270. meta => Meta
  271. },
  272. ?assertEqual(Expected, Resp),
  273. true
  274. end
  275. ).
  276. prop_session_resumed() ->
  277. ?ALL(
  278. {ClientInfo, SessInfo, Meta},
  279. {clientinfo(), sessioninfo(), request_meta()},
  280. begin
  281. ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
  282. {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
  283. Expected =
  284. #{
  285. clientinfo => from_clientinfo(ClientInfo),
  286. meta => Meta
  287. },
  288. ?assertEqual(Expected, Resp),
  289. true
  290. end
  291. ).
  292. prop_session_discared() ->
  293. ?ALL(
  294. {ClientInfo, SessInfo, Meta},
  295. {clientinfo(), sessioninfo(), request_meta()},
  296. begin
  297. ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
  298. {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
  299. Expected =
  300. #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
  301. ?assertEqual(Expected, Resp),
  302. true
  303. end
  304. ).
  305. prop_session_takenover() ->
  306. ?ALL(
  307. {ClientInfo, SessInfo, Meta},
  308. {clientinfo(), sessioninfo(), request_meta()},
  309. begin
  310. ok = emqx_hooks:run('session.takenover', [ClientInfo, SessInfo]),
  311. {'on_session_takenover', Resp} = emqx_exhook_demo_svr:take(),
  312. Expected =
  313. #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
  314. ?assertEqual(Expected, Resp),
  315. true
  316. end
  317. ).
  318. prop_session_terminated() ->
  319. ?ALL(
  320. {ClientInfo, Reason, SessInfo, Meta},
  321. {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()},
  322. begin
  323. ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
  324. {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
  325. Expected =
  326. #{
  327. reason => stringfy(Reason),
  328. clientinfo => from_clientinfo(ClientInfo),
  329. meta => Meta
  330. },
  331. ?assertEqual(Expected, Resp),
  332. true
  333. end
  334. ).
  335. prop_message_publish() ->
  336. ?ALL(
  337. {Msg0, Meta},
  338. {message(), request_meta()},
  339. begin
  340. Msg = emqx_message:from_map(
  341. inject_magic_into(from, emqx_message:to_map(Msg0))
  342. ),
  343. OutMsg = emqx_hooks:run_fold('message.publish', [], Msg),
  344. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  345. true ->
  346. ?assertEqual(Msg, OutMsg),
  347. skip;
  348. _ ->
  349. ExpectedOutMsg =
  350. case emqx_message:from(Msg) of
  351. <<"baduser">> ->
  352. MsgMap =
  353. #{headers := Headers} =
  354. emqx_message:to_map(Msg),
  355. emqx_message:from_map(
  356. MsgMap#{
  357. qos => 0,
  358. topic => <<"">>,
  359. payload => <<"">>,
  360. headers => maps:put(allow_publish, false, Headers)
  361. }
  362. );
  363. <<"gooduser">> = From ->
  364. MsgMap =
  365. #{headers := Headers} =
  366. emqx_message:to_map(Msg),
  367. emqx_message:from_map(
  368. MsgMap#{
  369. topic => From,
  370. payload => From,
  371. headers => maps:put(allow_publish, true, Headers)
  372. }
  373. );
  374. _ ->
  375. Msg
  376. end,
  377. ?assertEqual(ExpectedOutMsg, OutMsg),
  378. {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
  379. Expected =
  380. #{
  381. message => from_message(Msg),
  382. meta => Meta
  383. },
  384. ?assertEqual(Expected, Resp)
  385. end,
  386. true
  387. end
  388. ).
  389. prop_message_dropped() ->
  390. ?ALL(
  391. {Msg, By, Reason, Meta},
  392. {message(), hardcoded, shutdown_reason(), request_meta()},
  393. begin
  394. ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
  395. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  396. true ->
  397. skip;
  398. _ ->
  399. {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
  400. Expected =
  401. #{
  402. reason => stringfy(Reason),
  403. message => from_message(Msg),
  404. meta => Meta
  405. },
  406. ?assertEqual(Expected, Resp)
  407. end,
  408. true
  409. end
  410. ).
  411. prop_message_delivered() ->
  412. ?ALL(
  413. {ClientInfo, Msg, Meta},
  414. {clientinfo(), message(), request_meta()},
  415. begin
  416. ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
  417. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  418. true ->
  419. skip;
  420. _ ->
  421. {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
  422. Expected =
  423. #{
  424. clientinfo => from_clientinfo(ClientInfo),
  425. message => from_message(Msg),
  426. meta => Meta
  427. },
  428. ?assertEqual(Expected, Resp)
  429. end,
  430. true
  431. end
  432. ).
  433. prop_message_acked() ->
  434. ?ALL(
  435. {ClientInfo, Msg, Meta},
  436. {clientinfo(), message(), request_meta()},
  437. begin
  438. ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
  439. case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
  440. true ->
  441. skip;
  442. _ ->
  443. {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
  444. Expected =
  445. #{
  446. clientinfo => from_clientinfo(ClientInfo),
  447. message => from_message(Msg),
  448. meta => Meta
  449. },
  450. ?assertEqual(Expected, Resp)
  451. end,
  452. true
  453. end
  454. ).
  455. nodestr() ->
  456. stringfy(node()).
  457. peerhost(#{peername := {Host, _}}) ->
  458. ntoa(Host).
  459. sockport(#{sockname := {_, Port}}) ->
  460. Port.
  461. %% copied from emqx_exhook
  462. ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
  463. list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
  464. ntoa(IP) ->
  465. list_to_binary(inet_parse:ntoa(IP)).
  466. maybe(undefined) -> <<>>;
  467. maybe(B) -> B.
  468. properties(undefined) ->
  469. [];
  470. properties(M) when is_map(M) ->
  471. maps:fold(
  472. fun(K, V, Acc) ->
  473. [
  474. #{
  475. name => stringfy(K),
  476. value => stringfy(V)
  477. }
  478. | Acc
  479. ]
  480. end,
  481. [],
  482. M
  483. ).
  484. topicfilters(Tfs) when is_list(Tfs) ->
  485. [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
  486. %% @private
  487. stringfy(Term) when is_binary(Term) ->
  488. Term;
  489. stringfy(Term) when is_integer(Term) ->
  490. integer_to_binary(Term);
  491. stringfy(Term) when is_atom(Term) ->
  492. atom_to_binary(Term, utf8);
  493. stringfy(Term) when is_list(Term) ->
  494. list_to_binary(Term);
  495. stringfy(Term) ->
  496. unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
  497. subopts(SubOpts) ->
  498. #{
  499. qos => maps:get(qos, SubOpts, 0),
  500. rh => maps:get(rh, SubOpts, 0),
  501. rap => maps:get(rap, SubOpts, 0),
  502. nl => maps:get(nl, SubOpts, 0),
  503. share => maps:get(share, SubOpts, <<>>)
  504. }.
  505. authresult_to_bool(AuthResult) ->
  506. AuthResult == ok.
  507. aclresult_to_bool(Result) ->
  508. Result == allow.
  509. pubsub_to_enum(publish) -> 'PUBLISH';
  510. pubsub_to_enum(subscribe) -> 'SUBSCRIBE'.
  511. from_conninfo(ConnInfo) ->
  512. #{
  513. node => nodestr(),
  514. clientid => maps:get(clientid, ConnInfo),
  515. username => maybe(maps:get(username, ConnInfo, <<>>)),
  516. peerhost => peerhost(ConnInfo),
  517. sockport => sockport(ConnInfo),
  518. proto_name => maps:get(proto_name, ConnInfo),
  519. proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
  520. keepalive => maps:get(keepalive, ConnInfo)
  521. }.
  522. from_clientinfo(ClientInfo) ->
  523. #{
  524. node => nodestr(),
  525. clientid => maps:get(clientid, ClientInfo),
  526. username => maybe(maps:get(username, ClientInfo, <<>>)),
  527. password => maybe(maps:get(password, ClientInfo, <<>>)),
  528. peerhost => ntoa(maps:get(peerhost, ClientInfo)),
  529. sockport => maps:get(sockport, ClientInfo),
  530. protocol => stringfy(maps:get(protocol, ClientInfo)),
  531. mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
  532. is_superuser => maps:get(is_superuser, ClientInfo, false),
  533. anonymous => maps:get(anonymous, ClientInfo, true),
  534. cn => maybe(maps:get(cn, ClientInfo, <<>>)),
  535. dn => maybe(maps:get(dn, ClientInfo, <<>>))
  536. }.
  537. from_message(Msg) ->
  538. #{
  539. node => nodestr(),
  540. id => emqx_guid:to_hexstr(emqx_message:id(Msg)),
  541. qos => emqx_message:qos(Msg),
  542. from => stringfy(emqx_message:from(Msg)),
  543. topic => emqx_message:topic(Msg),
  544. payload => emqx_message:payload(Msg),
  545. timestamp => emqx_message:timestamp(Msg),
  546. headers => emqx_exhook_handler:headers(
  547. emqx_message:get_headers(Msg)
  548. )
  549. }.
  550. %%--------------------------------------------------------------------
  551. %% Helper
  552. %%--------------------------------------------------------------------
  553. do_setup() ->
  554. logger:set_primary_config(#{level => warning}),
  555. ok = ekka:start(),
  556. application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
  557. _ = emqx_exhook_demo_svr:start(),
  558. ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT),
  559. emqx_common_test_helpers:start_apps([emqx_exhook]),
  560. %% waiting first loaded event
  561. {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(),
  562. ok.
  563. do_teardown(_) ->
  564. emqx_common_test_helpers:stop_apps([emqx_exhook]),
  565. %% waiting last unloaded event
  566. {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(),
  567. _ = emqx_exhook_demo_svr:stop(),
  568. logger:set_primary_config(#{level => notice}),
  569. timer:sleep(2000),
  570. ok.
  571. %%--------------------------------------------------------------------
  572. %% Generators
  573. %%--------------------------------------------------------------------
  574. conn_properties() ->
  575. #{}.
  576. ack_properties() ->
  577. #{}.
  578. sub_properties() ->
  579. #{}.
  580. unsub_properties() ->
  581. #{}.
  582. shutdown_reason() ->
  583. oneof([utf8(), {shutdown, emqx_proper_types:limited_atom()}]).
  584. authresult() ->
  585. ?LET(
  586. RC,
  587. connack_return_code(),
  588. case RC of
  589. success -> ok;
  590. _ -> {error, RC}
  591. end
  592. ).
  593. inject_magic_into(Key, Object) ->
  594. case castspell() of
  595. muggles -> Object;
  596. Spell -> Object#{Key => Spell}
  597. end.
  598. castspell() ->
  599. L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles],
  600. lists:nth(rand:uniform(length(L)), L).
  601. request_meta() ->
  602. #{
  603. node => nodestr(),
  604. version => stringfy(emqx_sys:version()),
  605. sysdescr => stringfy(emqx_sys:sysdescr()),
  606. cluster_name => ?DEFAULT_CLUSTER_NAME_BIN
  607. }.