prop_webhook_hooks.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(prop_webhook_hooks).
  17. -include_lib("proper/include/proper.hrl").
  18. -import(emqx_ct_proper_types,
  19. [ conninfo/0
  20. , clientinfo/0
  21. , sessioninfo/0
  22. , message/0
  23. , connack_return_code/0
  24. , topictab/0
  25. , topic/0
  26. , subopts/0
  27. ]).
  28. -define(ALL(Vars, Types, Exprs),
  29. ?SETUP(fun() ->
  30. State = do_setup(),
  31. fun() -> do_teardown(State) end
  32. end, ?FORALL(Vars, Types, Exprs))).
  33. %%--------------------------------------------------------------------
  34. %% Properties
  35. %%--------------------------------------------------------------------
  36. prop_client_connect() ->
  37. ?ALL({ConnInfo, ConnProps, Env},
  38. {conninfo(), conn_properties(), empty_env()},
  39. begin
  40. ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env),
  41. Body = receive_http_request_body(),
  42. Body = emqx_json:encode(
  43. #{action => client_connect,
  44. node => stringfy(node()),
  45. clientid => maps:get(clientid, ConnInfo),
  46. username => maybe(maps:get(username, ConnInfo)),
  47. ipaddress => peer2addr(maps:get(peername, ConnInfo)),
  48. keepalive => maps:get(keepalive, ConnInfo),
  49. proto_ver => maps:get(proto_ver, ConnInfo),
  50. connected_at => maps:get(connected_at, ConnInfo)
  51. }),
  52. true
  53. end).
  54. prop_client_connack() ->
  55. ?ALL({ConnInfo, Rc, AckProps, Env},
  56. {conninfo(), connack_return_code(), ack_properties(), empty_env()},
  57. begin
  58. ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env),
  59. Body = receive_http_request_body(),
  60. Body = emqx_json:encode(
  61. #{action => client_connack,
  62. node => stringfy(node()),
  63. clientid => maps:get(clientid, ConnInfo),
  64. username => maybe(maps:get(username, ConnInfo)),
  65. ipaddress => peer2addr(maps:get(peername, ConnInfo)),
  66. keepalive => maps:get(keepalive, ConnInfo),
  67. proto_ver => maps:get(proto_ver, ConnInfo),
  68. connected_at => maps:get(connected_at, ConnInfo),
  69. conn_ack => Rc
  70. }),
  71. true
  72. end).
  73. prop_client_connected() ->
  74. ?ALL({ClientInfo, ConnInfo, Env},
  75. {clientinfo(), conninfo(), empty_env()},
  76. begin
  77. ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env),
  78. Body = receive_http_request_body(),
  79. Body = emqx_json:encode(
  80. #{action => client_connected,
  81. node => stringfy(node()),
  82. clientid => maps:get(clientid, ClientInfo),
  83. username => maybe(maps:get(username, ClientInfo)),
  84. ipaddress => peer2addr(maps:get(peerhost, ClientInfo)),
  85. keepalive => maps:get(keepalive, ConnInfo),
  86. proto_ver => maps:get(proto_ver, ConnInfo),
  87. connected_at => maps:get(connected_at, ConnInfo)
  88. }),
  89. true
  90. end).
  91. prop_client_disconnected() ->
  92. ?ALL({ClientInfo, Reason, ConnInfo, Env},
  93. {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()},
  94. begin
  95. ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env),
  96. Body = receive_http_request_body(),
  97. Body = emqx_json:encode(
  98. #{action => client_disconnected,
  99. node => stringfy(node()),
  100. clientid => maps:get(clientid, ClientInfo),
  101. username => maybe(maps:get(username, ClientInfo)),
  102. connected_at => maps:get(connected_at, ConnInfo),
  103. disconnected_at => maps:get(disconnected_at, ConnInfo),
  104. reason => stringfy(Reason)
  105. }),
  106. true
  107. end).
  108. prop_client_subscribe() ->
  109. ?ALL({ClientInfo, SubProps, TopicTab, Env},
  110. {clientinfo(), sub_properties(), topictab(), topic_filter_env()},
  111. begin
  112. ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env),
  113. Matched = filter_topictab(TopicTab, Env),
  114. lists:foreach(fun({Topic, Opts}) ->
  115. Body = receive_http_request_body(),
  116. Body = emqx_json:encode(
  117. #{action => client_subscribe,
  118. node => stringfy(node()),
  119. clientid => maps:get(clientid, ClientInfo),
  120. username => maybe(maps:get(username, ClientInfo)),
  121. topic => Topic,
  122. opts => Opts})
  123. end, Matched),
  124. true
  125. end).
  126. prop_client_unsubscribe() ->
  127. ?ALL({ClientInfo, SubProps, TopicTab, Env},
  128. {clientinfo(), unsub_properties(), topictab(), topic_filter_env()},
  129. begin
  130. ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env),
  131. Matched = filter_topictab(TopicTab, Env),
  132. lists:foreach(fun({Topic, Opts}) ->
  133. Body = receive_http_request_body(),
  134. Body = emqx_json:encode(
  135. #{action => client_unsubscribe,
  136. node => stringfy(node()),
  137. clientid => maps:get(clientid, ClientInfo),
  138. username => maybe(maps:get(username, ClientInfo)),
  139. topic => Topic,
  140. opts => Opts})
  141. end, Matched),
  142. true
  143. end).
  144. prop_session_subscribed() ->
  145. ?ALL({ClientInfo, Topic, SubOpts, Env},
  146. {clientinfo(), topic(), subopts(), topic_filter_env()},
  147. begin
  148. ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env),
  149. filter_topic_match(Topic, Env) andalso begin
  150. Body = receive_http_request_body(),
  151. Body1 = emqx_json:encode(
  152. #{action => session_subscribed,
  153. node => stringfy(node()),
  154. clientid => maps:get(clientid, ClientInfo),
  155. username => maybe(maps:get(username, ClientInfo)),
  156. topic => Topic,
  157. opts => SubOpts
  158. }),
  159. Body = Body1
  160. end,
  161. true
  162. end).
  163. prop_session_unsubscribed() ->
  164. ?ALL({ClientInfo, Topic, SubOpts, Env},
  165. {clientinfo(), topic(), subopts(), empty_env()},
  166. begin
  167. ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env),
  168. filter_topic_match(Topic, Env) andalso begin
  169. Body = receive_http_request_body(),
  170. Body = emqx_json:encode(
  171. #{action => session_unsubscribed,
  172. node => stringfy(node()),
  173. clientid => maps:get(clientid, ClientInfo),
  174. username => maybe(maps:get(username, ClientInfo)),
  175. topic => Topic
  176. })
  177. end,
  178. true
  179. end).
  180. prop_session_terminated() ->
  181. ?ALL({ClientInfo, Reason, SessInfo, Env},
  182. {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()},
  183. begin
  184. ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env),
  185. Body = receive_http_request_body(),
  186. Body = emqx_json:encode(
  187. #{action => session_terminated,
  188. node => stringfy(node()),
  189. clientid => maps:get(clientid, ClientInfo),
  190. username => maybe(maps:get(username, ClientInfo)),
  191. reason => stringfy(Reason)
  192. }),
  193. true
  194. end).
  195. prop_message_publish() ->
  196. ?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()},
  197. begin
  198. application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
  199. {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env),
  200. application:unset_env(emqx_web_hook, encoding_of_payload_field),
  201. (not emqx_message:is_sys(Msg))
  202. andalso filter_topic_match(emqx_message:topic(Msg), Env)
  203. andalso begin
  204. Body = receive_http_request_body(),
  205. Body = emqx_json:encode(
  206. #{action => message_publish,
  207. node => stringfy(node()),
  208. from_client_id => emqx_message:from(Msg),
  209. from_username => maybe(emqx_message:get_header(username, Msg)),
  210. topic => emqx_message:topic(Msg),
  211. qos => emqx_message:qos(Msg),
  212. retain => emqx_message:get_flag(retain, Msg),
  213. payload => encode(emqx_message:payload(Msg), Encode),
  214. ts => emqx_message:timestamp(Msg)
  215. })
  216. end,
  217. true
  218. end).
  219. prop_message_delivered() ->
  220. ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()},
  221. begin
  222. application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
  223. ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env),
  224. application:unset_env(emqx_web_hook, encoding_of_payload_field),
  225. (not emqx_message:is_sys(Msg))
  226. andalso filter_topic_match(emqx_message:topic(Msg), Env)
  227. andalso begin
  228. Body = receive_http_request_body(),
  229. Body = emqx_json:encode(
  230. #{action => message_delivered,
  231. node => stringfy(node()),
  232. clientid => maps:get(clientid, ClientInfo),
  233. username => maybe(maps:get(username, ClientInfo)),
  234. from_client_id => emqx_message:from(Msg),
  235. from_username => maybe(emqx_message:get_header(username, Msg)),
  236. topic => emqx_message:topic(Msg),
  237. qos => emqx_message:qos(Msg),
  238. retain => emqx_message:get_flag(retain, Msg),
  239. payload => encode(emqx_message:payload(Msg), Encode),
  240. ts => emqx_message:timestamp(Msg)
  241. })
  242. end,
  243. true
  244. end).
  245. prop_message_acked() ->
  246. ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()},
  247. begin
  248. application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
  249. ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env),
  250. application:unset_env(emqx_web_hook, encoding_of_payload_field),
  251. (not emqx_message:is_sys(Msg))
  252. andalso filter_topic_match(emqx_message:topic(Msg), Env)
  253. andalso begin
  254. Body = receive_http_request_body(),
  255. Body = emqx_json:encode(
  256. #{action => message_acked,
  257. node => stringfy(node()),
  258. clientid => maps:get(clientid, ClientInfo),
  259. username => maybe(maps:get(username, ClientInfo)),
  260. from_client_id => emqx_message:from(Msg),
  261. from_username => maybe(emqx_message:get_header(username, Msg)),
  262. topic => emqx_message:topic(Msg),
  263. qos => emqx_message:qos(Msg),
  264. retain => emqx_message:get_flag(retain, Msg),
  265. payload => encode(emqx_message:payload(Msg), Encode),
  266. ts => emqx_message:timestamp(Msg)
  267. })
  268. end,
  269. true
  270. end).
  271. %%--------------------------------------------------------------------
  272. %% Helper
  273. %%--------------------------------------------------------------------
  274. do_setup() ->
  275. %% Pre-defined envs
  276. application:set_env(emqx_web_hook, path, "path"),
  277. application:set_env(emqx_web_hook, headers, []),
  278. meck:new(ehttpc_pool, [passthrough, no_history]),
  279. meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end),
  280. Self = self(),
  281. meck:new(ehttpc, [passthrough, no_history]),
  282. meck:expect(ehttpc, request,
  283. fun(_ClientId, Method, {Path, Headers, Body}) ->
  284. Self ! {Method, Path, Headers, Body}, {ok, 200, ok}
  285. end),
  286. meck:new(emqx_metrics, [passthrough, no_history]),
  287. meck:expect(emqx_metrics, inc, fun(_) -> ok end),
  288. ok.
  289. do_teardown(_) ->
  290. meck:unload(ehttpc_pool),
  291. meck:unload(ehttpc),
  292. meck:unload(emqx_metrics).
  293. maybe(undefined) -> null;
  294. maybe(T) -> T.
  295. peer2addr({Host, _}) ->
  296. list_to_binary(inet:ntoa(Host));
  297. peer2addr(Host) ->
  298. list_to_binary(inet:ntoa(Host)).
  299. stringfy({shutdown, Reason}) ->
  300. stringfy(Reason);
  301. stringfy(Term) when is_binary(Term) ->
  302. Term;
  303. stringfy(Term) when is_atom(Term) ->
  304. atom_to_binary(Term, utf8);
  305. stringfy(Term) ->
  306. unicode:characters_to_binary(io_lib:format("~0p", [Term])).
  307. receive_http_request_body() ->
  308. receive
  309. {post, _, _, Body} ->
  310. Body
  311. after 100 ->
  312. exit(waiting_message_timeout)
  313. end.
  314. filter_topictab(TopicTab, {undefined}) ->
  315. TopicTab;
  316. filter_topictab(TopicTab, {TopicFilter}) ->
  317. lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab).
  318. filter_topic_match(_Topic, {undefined}) ->
  319. true;
  320. filter_topic_match(Topic, {TopicFilter}) ->
  321. emqx_topic:match(Topic, TopicFilter).
  322. encode(Bin, base64) ->
  323. base64:encode(Bin);
  324. encode(Bin, base62) ->
  325. emqx_base62:encode(Bin);
  326. encode(Bin, _) ->
  327. Bin.
  328. %%--------------------------------------------------------------------
  329. %% Generators
  330. %%--------------------------------------------------------------------
  331. conn_properties() ->
  332. #{}.
  333. ack_properties() ->
  334. #{}.
  335. sub_properties() ->
  336. #{}.
  337. unsub_properties() ->
  338. #{}.
  339. shutdown_reason() ->
  340. oneof([disconnected, not_autherised,
  341. "list_reason", <<"binary_reason">>,
  342. {tuple, reason},
  343. {shutdown, emqx_ct_proper_types:limited_atom()}]).
  344. empty_env() ->
  345. {undefined}.
  346. topic_filter_env() ->
  347. oneof([{<<"#">>}, {undefined}, {topic()}]).
  348. payload_encode() ->
  349. oneof([base62, base64, plain]).
  350. disconnected_conninfo() ->
  351. ?LET(Info, conninfo(),
  352. begin
  353. Info#{disconnected_at => erlang:system_time(millisecond)}
  354. end).