prop_webhook_hooks.erl 16 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(prop_webhook_hooks).
  17. -include_lib("proper/include/proper.hrl").
  18. -import(emqx_ct_proper_types,
  19. [ conninfo/0
  20. , clientinfo/0
  21. , sessioninfo/0
  22. , message/0
  23. , connack_return_code/0
  24. , topictab/0
  25. , topic/0
  26. , subopts/0
  27. ]).
  %% ?ALL(Vars, Types, Exprs) wraps a ?FORALL property in ?SETUP.
  %% The setup fun calls do_setup/0 and returns a finalizer fun that hands the
  %% setup result to do_teardown/1.
  -define(ALL(Vars, Types, Exprs),
          ?SETUP(
              fun() ->
                  SetupResult = do_setup(),
                  fun() -> do_teardown(SetupResult) end
              end,
              ?FORALL(Vars, Types, Exprs))).
  33. %%--------------------------------------------------------------------
  34. %% Properties
  35. %%--------------------------------------------------------------------
  %% Property: emqx_web_hook:on_client_connect/3 posts the expected JSON body.
  %%
  %% The hook is invoked with a generated conninfo and must return ok. The body
  %% obtained from receive_http_request_body/0 is then matched against the JSON
  %% encoding of the map below; a difference raises badmatch.
  %%
  %% The helper is called as 'maybe' (quoted atom): the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27), so an
  %% unquoted call would not parse there.
  prop_client_connect() ->
      ?ALL({ConnInfo, ConnProps, Env},
           {conninfo(), conn_properties(), empty_env()},
           begin
               ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env),
               Body = receive_http_request_body(),
               Body = emqx_json:encode(
                        #{action => client_connect,
                          node => stringfy(node()),
                          clientid => maps:get(clientid, ConnInfo),
                          username => 'maybe'(maps:get(username, ConnInfo)),
                          ipaddress => peer2addr(maps:get(peername, ConnInfo)),
                          keepalive => maps:get(keepalive, ConnInfo),
                          proto_ver => maps:get(proto_ver, ConnInfo)
                         }),
               true
           end).
  %% Property: emqx_web_hook:on_client_connack/4 posts the expected JSON body.
  %%
  %% The hook is invoked with a generated conninfo and CONNACK return code and
  %% must return ok. The captured request body is matched against the JSON
  %% encoding of the map below, which carries the return code as conn_ack;
  %% a difference raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_client_connack() ->
      ?ALL({ConnInfo, Rc, AckProps, Env},
           {conninfo(), connack_return_code(), ack_properties(), empty_env()},
           begin
               ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env),
               Body = receive_http_request_body(),
               Body = emqx_json:encode(
                        #{action => client_connack,
                          node => stringfy(node()),
                          clientid => maps:get(clientid, ConnInfo),
                          username => 'maybe'(maps:get(username, ConnInfo)),
                          ipaddress => peer2addr(maps:get(peername, ConnInfo)),
                          keepalive => maps:get(keepalive, ConnInfo),
                          proto_ver => maps:get(proto_ver, ConnInfo),
                          connected_at => maps:get(connected_at, ConnInfo),
                          conn_ack => Rc
                         }),
               true
           end).
  %% Property: emqx_web_hook:on_client_connected/3 posts the expected JSON body.
  %%
  %% Identity fields (clientid, username, peerhost) are read from the generated
  %% clientinfo; keepalive, proto_ver and connected_at come from the conninfo.
  %% The captured request body is matched against the JSON encoding of the map
  %% below; a difference raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_client_connected() ->
      ?ALL({ClientInfo, ConnInfo, Env},
           {clientinfo(), conninfo(), empty_env()},
           begin
               ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env),
               Body = receive_http_request_body(),
               Body = emqx_json:encode(
                        #{action => client_connected,
                          node => stringfy(node()),
                          clientid => maps:get(clientid, ClientInfo),
                          username => 'maybe'(maps:get(username, ClientInfo)),
                          ipaddress => peer2addr(maps:get(peerhost, ClientInfo)),
                          keepalive => maps:get(keepalive, ConnInfo),
                          proto_ver => maps:get(proto_ver, ConnInfo),
                          connected_at => maps:get(connected_at, ConnInfo)
                         }),
               true
           end).
  %% Property: emqx_web_hook:on_client_disconnected/4 posts the expected JSON
  %% body.
  %%
  %% The conninfo comes from disconnected_conninfo/0, so it carries a
  %% disconnected_at key. The generated shutdown reason is expected in the body
  %% in the form produced by stringfy/1. A body mismatch raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_client_disconnected() ->
      ?ALL({ClientInfo, Reason, ConnInfo, Env},
           {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()},
           begin
               ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env),
               Body = receive_http_request_body(),
               Body = emqx_json:encode(
                        #{action => client_disconnected,
                          node => stringfy(node()),
                          clientid => maps:get(clientid, ClientInfo),
                          username => 'maybe'(maps:get(username, ClientInfo)),
                          connected_at => maps:get(connected_at, ConnInfo),
                          disconnected_at => maps:get(disconnected_at, ConnInfo),
                          reason => stringfy(Reason)
                         }),
               true
           end).
  %% Property: emqx_web_hook:on_client_subscribe/4 posts one request per topic
  %% table entry that passes the Env topic filter.
  %%
  %% The expected entries are computed with filter_topictab/2. For each of
  %% them, in list order, one request body is received and matched against the
  %% JSON encoding of the map below; a difference raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_client_subscribe() ->
      ?ALL({ClientInfo, SubProps, TopicTab, Env},
           {clientinfo(), sub_properties(), topictab(), topic_filter_env()},
           begin
               ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env),
               Matched = filter_topictab(TopicTab, Env),
               lists:foreach(
                 fun({Topic, Opts}) ->
                         Body = receive_http_request_body(),
                         Body = emqx_json:encode(
                                  #{action => client_subscribe,
                                    node => stringfy(node()),
                                    clientid => maps:get(clientid, ClientInfo),
                                    username => 'maybe'(maps:get(username, ClientInfo)),
                                    topic => Topic,
                                    opts => Opts})
                 end, Matched),
               true
           end).
  %% Property: emqx_web_hook:on_client_unsubscribe/4 posts one request per
  %% topic table entry that passes the Env topic filter.
  %%
  %% The expected entries are computed with filter_topictab/2. For each of
  %% them, in list order, one request body is received and matched against the
  %% JSON encoding of the map below; a difference raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_client_unsubscribe() ->
      ?ALL({ClientInfo, SubProps, TopicTab, Env},
           {clientinfo(), unsub_properties(), topictab(), topic_filter_env()},
           begin
               ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env),
               Matched = filter_topictab(TopicTab, Env),
               lists:foreach(
                 fun({Topic, Opts}) ->
                         Body = receive_http_request_body(),
                         Body = emqx_json:encode(
                                  #{action => client_unsubscribe,
                                    node => stringfy(node()),
                                    clientid => maps:get(clientid, ClientInfo),
                                    username => 'maybe'(maps:get(username, ClientInfo)),
                                    topic => Topic,
                                    opts => Opts})
                 end, Matched),
               true
           end).
  %% Property: emqx_web_hook:on_session_subscribed/4 posts the expected JSON
  %% body when the topic passes the Env topic filter.
  %%
  %% A request body is only awaited when filter_topic_match/2 is true; in that
  %% case it is matched against the JSON encoding of the map below (a
  %% difference raises badmatch). The result of the andalso expression is
  %% discarded and the property body always ends in true.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_session_subscribed() ->
      ?ALL({ClientInfo, Topic, SubOpts, Env},
           {clientinfo(), topic(), subopts(), topic_filter_env()},
           begin
               ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env),
               filter_topic_match(Topic, Env) andalso begin
                   Body = receive_http_request_body(),
                   Body = emqx_json:encode(
                            #{action => session_subscribed,
                              node => stringfy(node()),
                              clientid => maps:get(clientid, ClientInfo),
                              username => 'maybe'(maps:get(username, ClientInfo)),
                              topic => Topic,
                              opts => SubOpts
                             })
               end,
               true
           end).
  %% Property: emqx_web_hook:on_session_unsubscribed/4 posts the expected JSON
  %% body.
  %%
  %% Env is drawn from empty_env/0, i.e. {undefined}, so filter_topic_match/2
  %% is always true here and a request body is always awaited. The expected
  %% body carries the topic but no opts field. A mismatch raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_session_unsubscribed() ->
      ?ALL({ClientInfo, Topic, SubOpts, Env},
           {clientinfo(), topic(), subopts(), empty_env()},
           begin
               ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env),
               filter_topic_match(Topic, Env) andalso begin
                   Body = receive_http_request_body(),
                   Body = emqx_json:encode(
                            #{action => session_unsubscribed,
                              node => stringfy(node()),
                              clientid => maps:get(clientid, ClientInfo),
                              username => 'maybe'(maps:get(username, ClientInfo)),
                              topic => Topic
                             })
               end,
               true
           end).
  %% Property: emqx_web_hook:on_session_terminated/4 posts the expected JSON
  %% body.
  %%
  %% The generated shutdown reason is expected in the body in the form
  %% produced by stringfy/1. The captured request body is matched against the
  %% JSON encoding of the map below; a difference raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_session_terminated() ->
      ?ALL({ClientInfo, Reason, SessInfo, Env},
           {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()},
           begin
               ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env),
               Body = receive_http_request_body(),
               Body = emqx_json:encode(
                        #{action => session_terminated,
                          node => stringfy(node()),
                          clientid => maps:get(clientid, ClientInfo),
                          username => 'maybe'(maps:get(username, ClientInfo)),
                          reason => stringfy(Reason)
                         }),
               true
           end).
  %% Property: emqx_web_hook:on_message_publish/2 returns {ok, Msg} unchanged
  %% and posts the expected JSON body for non-$SYS messages whose topic passes
  %% the Env topic filter.
  %%
  %% The encoding_of_payload_field application env is set to the generated
  %% encoding only around the hook call. A request body is awaited only when
  %% the message is not a system message and filter_topic_match/2 is true; the
  %% expected payload is computed with encode/2. A mismatch raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_message_publish() ->
      ?ALL({Msg, Env, Encode},
           {message(), topic_filter_env(), payload_encode()},
           begin
               application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
               {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env),
               application:unset_env(emqx_web_hook, encoding_of_payload_field),
               (not emqx_message:is_sys(Msg))
                   andalso filter_topic_match(emqx_message:topic(Msg), Env)
                   andalso begin
                       Body = receive_http_request_body(),
                       Body = emqx_json:encode(
                                #{action => message_publish,
                                  node => stringfy(node()),
                                  from_client_id => emqx_message:from(Msg),
                                  from_username => 'maybe'(emqx_message:get_header(username, Msg)),
                                  topic => emqx_message:topic(Msg),
                                  qos => emqx_message:qos(Msg),
                                  retain => emqx_message:get_flag(retain, Msg),
                                  payload => encode(emqx_message:payload(Msg), Encode),
                                  ts => emqx_message:timestamp(Msg)
                                 })
                   end,
               true
           end).
  %% Property: emqx_web_hook:on_message_delivered/3 posts the expected JSON
  %% body for non-$SYS messages whose topic passes the Env topic filter.
  %%
  %% The encoding_of_payload_field application env is set to the generated
  %% encoding only around the hook call. A request body is awaited only when
  %% the message is not a system message and filter_topic_match/2 is true; the
  %% expected payload is computed with encode/2. A mismatch raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_message_delivered() ->
      ?ALL({ClientInfo, Msg, Env, Encode},
           {clientinfo(), message(), topic_filter_env(), payload_encode()},
           begin
               application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
               ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env),
               application:unset_env(emqx_web_hook, encoding_of_payload_field),
               (not emqx_message:is_sys(Msg))
                   andalso filter_topic_match(emqx_message:topic(Msg), Env)
                   andalso begin
                       Body = receive_http_request_body(),
                       Body = emqx_json:encode(
                                #{action => message_delivered,
                                  node => stringfy(node()),
                                  clientid => maps:get(clientid, ClientInfo),
                                  username => 'maybe'(maps:get(username, ClientInfo)),
                                  from_client_id => emqx_message:from(Msg),
                                  from_username => 'maybe'(emqx_message:get_header(username, Msg)),
                                  topic => emqx_message:topic(Msg),
                                  qos => emqx_message:qos(Msg),
                                  retain => emqx_message:get_flag(retain, Msg),
                                  payload => encode(emqx_message:payload(Msg), Encode),
                                  ts => emqx_message:timestamp(Msg)
                                 })
                   end,
               true
           end).
  %% Property: emqx_web_hook:on_message_acked/3 posts the expected JSON body
  %% for non-$SYS messages.
  %%
  %% Env is drawn from empty_env/0, i.e. {undefined}, so filter_topic_match/2
  %% is always true here; only system messages skip the body check. The
  %% encoding_of_payload_field application env is set to the generated encoding
  %% only around the hook call, and the expected payload is computed with
  %% encode/2. A mismatch raises badmatch.
  %%
  %% 'maybe' is written as a quoted atom because the bare word is reserved
  %% when the maybe_expr feature is enabled (the default from OTP 27).
  prop_message_acked() ->
      ?ALL({ClientInfo, Msg, Env, Encode},
           {clientinfo(), message(), empty_env(), payload_encode()},
           begin
               application:set_env(emqx_web_hook, encoding_of_payload_field, Encode),
               ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env),
               application:unset_env(emqx_web_hook, encoding_of_payload_field),
               (not emqx_message:is_sys(Msg))
                   andalso filter_topic_match(emqx_message:topic(Msg), Env)
                   andalso begin
                       Body = receive_http_request_body(),
                       Body = emqx_json:encode(
                                #{action => message_acked,
                                  node => stringfy(node()),
                                  clientid => maps:get(clientid, ClientInfo),
                                  username => 'maybe'(maps:get(username, ClientInfo)),
                                  from_client_id => emqx_message:from(Msg),
                                  from_username => 'maybe'(emqx_message:get_header(username, Msg)),
                                  topic => emqx_message:topic(Msg),
                                  qos => emqx_message:qos(Msg),
                                  retain => emqx_message:get_flag(retain, Msg),
                                  payload => encode(emqx_message:payload(Msg), Encode),
                                  ts => emqx_message:timestamp(Msg)
                                 })
                   end,
               true
           end).
  270. %%--------------------------------------------------------------------
  271. %% Helper
  272. %%--------------------------------------------------------------------
  %% Prepares the environment shared by all properties.
  %%
  %% Sets the emqx_web_hook `path` and `headers` application envs, then mocks
  %% ehttpc_pool, ehttpc and emqx_metrics with meck (passthrough, no history):
  %%   - ehttpc_pool:pick_worker/2 returns ok;
  %%   - ehttpc:request/3 forwards {Method, Path, Headers, Body} to the process
  %%     that called do_setup/0 and returns {ok, 200, ok};
  %%   - emqx_metrics:inc/1 returns ok.
  %% Returns ok.
  do_setup() ->
      TestPid = self(),
      %% Pre-defined envs
      application:set_env(emqx_web_hook, path, "path"),
      application:set_env(emqx_web_hook, headers, []),
      lists:foreach(
        fun(Mod) -> meck:new(Mod, [passthrough, no_history]) end,
        [ehttpc_pool, ehttpc, emqx_metrics]),
      meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end),
      ForwardRequest =
          fun(_ClientId, Method, {Path, Headers, Body}) ->
                  TestPid ! {Method, Path, Headers, Body},
                  {ok, 200, ok}
          end,
      meck:expect(ehttpc, request, ForwardRequest),
      meck:expect(emqx_metrics, inc, fun(_) -> ok end),
      ok.
  %% Unloads the ehttpc_pool, ehttpc and emqx_metrics mocks, in that order.
  %% The argument (the value returned by do_setup/0) is ignored.
  do_teardown(_SetupResult) ->
      lists:foreach(fun(Mod) -> meck:unload(Mod) end,
                    [ehttpc_pool, ehttpc, emqx_metrics]).
  %% Maps the atom `undefined` to `null` and returns any other term unchanged.
  %% Presumably this makes absent values encode as JSON null -- confirm against
  %% emqx_json:encode/1.
  %%
  %% The function name is written as the quoted atom 'maybe': the bare word is
  %% a reserved word when the maybe_expr feature is enabled (the default from
  %% OTP 27), so an unquoted definition would not parse there. It is the same
  %% atom, so existing callers are unaffected.
  'maybe'(undefined) -> null;
  'maybe'(Value) -> Value.
  %% Renders a peer as a binary IP address string.
  %% Accepts either a 2-tuple {Host, Port}, whose second element is ignored,
  %% or a bare host address; the address is formatted with inet:ntoa/1.
  peer2addr(Peer) ->
      Addr = case Peer of
                 {Host, _Port} -> Host;
                 Host -> Host
             end,
      list_to_binary(inet:ntoa(Addr)).
  %% Converts a term to a binary.
  %%   {shutdown, Reason} -> the conversion of Reason
  %%   binary             -> returned as is
  %%   atom               -> its UTF-8 name
  %%   anything else      -> its single-line ~0p rendering
  stringfy({shutdown, Inner}) ->
      stringfy(Inner);
  stringfy(Term) ->
      if
          is_binary(Term) ->
              Term;
          is_atom(Term) ->
              atom_to_binary(Term, utf8);
          true ->
              Printed = io_lib:format("~0p", [Term]),
              unicode:characters_to_binary(Printed)
      end.
  %% Takes the next {post, _, _, Body} message from the calling process's
  %% mailbox and returns Body. If no such message arrives within 100 ms, the
  %% process exits with reason waiting_message_timeout.
  receive_http_request_body() ->
      receive
          {post, _Path, _Headers, Payload} -> Payload
      after 100 ->
          exit(waiting_message_timeout)
      end.
  %% Restricts a topic table to the entries selected by the hook env.
  %% With {undefined} the table is returned untouched; with {TopicFilter} only
  %% the {Topic, _} entries for which emqx_topic:match(Topic, TopicFilter)
  %% holds are kept.
  filter_topictab(TopicTab, {Filter}) ->
      case Filter of
          undefined ->
              TopicTab;
          _ ->
              Keep = fun({Topic, _Opts}) -> emqx_topic:match(Topic, Filter) end,
              lists:filter(Keep, TopicTab)
      end.
  %% Tells whether a topic is selected by the hook env.
  %% {undefined} selects every topic; {TopicFilter} defers to
  %% emqx_topic:match(Topic, TopicFilter).
  filter_topic_match(Topic, {Filter}) ->
      Filter =:= undefined orelse emqx_topic:match(Topic, Filter).
  %% Encodes a payload according to the given encoding name:
  %% base64 via base64:encode/1, base62 via emqx_base62:encode/1, and any
  %% other value leaves the payload unchanged.
  encode(Bin, Encoding) ->
      case Encoding of
          base64 -> base64:encode(Bin);
          base62 -> emqx_base62:encode(Bin);
          _ -> Bin
      end.
  327. %%--------------------------------------------------------------------
  328. %% Generators
  329. %%--------------------------------------------------------------------
  %% Value used as the ConnProps argument of on_client_connect/3 in
  %% prop_client_connect/0: always an empty map.
  conn_properties() ->
      #{}.
  %% Value used as the AckProps argument of on_client_connack/4 in
  %% prop_client_connack/0: always an empty map.
  ack_properties() ->
      #{}.
  %% Value used as the SubProps argument of on_client_subscribe/4 in
  %% prop_client_subscribe/0: always an empty map.
  sub_properties() ->
      #{}.
  %% Value used as the properties argument of on_client_unsubscribe/4 in
  %% prop_client_unsubscribe/0: always an empty map.
  unsub_properties() ->
      #{}.
  %% Generator of disconnect/termination reasons: two plain atoms, a string,
  %% a binary, an arbitrary tuple, and a {shutdown, Atom} wrapper whose atom
  %% comes from emqx_ct_proper_types:limited_atom/0.
  shutdown_reason() ->
      WrappedShutdown = {shutdown, emqx_ct_proper_types:limited_atom()},
      Candidates = [disconnected,
                    not_autherised,
                    "list_reason",
                    <<"binary_reason">>,
                    {tuple, reason},
                    WrappedShutdown],
      oneof(Candidates).
  %% Hook env without a topic filter: the 1-tuple {undefined}, which
  %% filter_topictab/2 and filter_topic_match/2 treat as "select everything".
  empty_env() ->
      {undefined}.
  %% Generator of hook envs: the catch-all filter <<"#">>, no filter
  %% ({undefined}), or a filter taken from the topic/0 generator.
  topic_filter_env() ->
      Wildcard = {<<"#">>},
      NoFilter = {undefined},
      Generated = {topic()},
      oneof([Wildcard, NoFilter, Generated]).
  %% Generator of payload encoding names, one of base62, base64 or plain.
  payload_encode() ->
      Encodings = [base62, base64, plain],
      oneof(Encodings).
  %% Generator of a conninfo map with a disconnected_at key added, set to
  %% erlang:system_time(millisecond) at the moment the value is generated.
  disconnected_conninfo() ->
      ?LET(ConnInfo, conninfo(),
           maps:put(disconnected_at, erlang:system_time(millisecond), ConnInfo)).
  353. end).