prop_webhook_hooks.erl 17 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(prop_webhook_hooks).
  17. -include_lib("proper/include/proper.hrl").
  18. -import(emqx_ct_proper_types,
  19. [ conninfo/0
  20. , clientinfo/0
  21. , sessioninfo/0
  22. , message/0
  23. , connack_return_code/0
  24. , topictab/0
  25. , topic/0
  26. , subopts/0
  27. ]).
  28. -define(ALL(Vars, Types, Exprs),
  29. ?SETUP(fun() ->
  30. State = do_setup(),
  31. fun() -> do_teardown(State) end
  32. end, ?FORALL(Vars, Types, Exprs))).
  33. %%--------------------------------------------------------------------
  34. %% Properties
  35. %%--------------------------------------------------------------------
  36. prop_client_connect() ->
  37. ?ALL({ConnInfo, ConnProps, Env},
  38. {conninfo(), conn_properties(), empty_env()},
  39. begin
  40. ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env),
  41. Body = receive_http_request_body(),
  42. Body = emqx_json:encode(
  43. #{action => client_connect,
  44. node => stringfy(node()),
  45. clientid => maps:get(clientid, ConnInfo),
  46. username => maybe(maps:get(username, ConnInfo)),
  47. ipaddress => peer2addr(maps:get(peername, ConnInfo)),
  48. keepalive => maps:get(keepalive, ConnInfo),
  49. proto_ver => maps:get(proto_ver, ConnInfo)
  50. }),
  51. true
  52. end).
  53. prop_client_connack() ->
  54. ?ALL({ConnInfo, Rc, AckProps, Env},
  55. {conninfo(), connack_return_code(), ack_properties(), empty_env()},
  56. begin
  57. ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env),
  58. Body = receive_http_request_body(),
  59. Body = emqx_json:encode(
  60. #{action => client_connack,
  61. node => stringfy(node()),
  62. clientid => maps:get(clientid, ConnInfo),
  63. username => maybe(maps:get(username, ConnInfo)),
  64. ipaddress => peer2addr(maps:get(peername, ConnInfo)),
  65. keepalive => maps:get(keepalive, ConnInfo),
  66. proto_ver => maps:get(proto_ver, ConnInfo),
  67. conn_ack => Rc
  68. }),
  69. true
  70. end).
  71. prop_client_connected() ->
  72. ?ALL({ClientInfo, ConnInfo, Env},
  73. {clientinfo(), conninfo(), empty_env()},
  74. begin
  75. ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env),
  76. Body = receive_http_request_body(),
  77. Body = emqx_json:encode(
  78. #{action => client_connected,
  79. node => stringfy(node()),
  80. clientid => maps:get(clientid, ClientInfo),
  81. username => maybe(maps:get(username, ClientInfo)),
  82. ipaddress => peer2addr(maps:get(peerhost, ClientInfo)),
  83. keepalive => maps:get(keepalive, ConnInfo),
  84. proto_ver => maps:get(proto_ver, ConnInfo),
  85. connected_at => maps:get(connected_at, ConnInfo)
  86. }),
  87. true
  88. end).
  89. prop_client_disconnected() ->
  90. ?ALL({ClientInfo, Reason, ConnInfo, Env},
  91. {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()},
  92. begin
  93. ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env),
  94. Body = receive_http_request_body(),
  95. Body = emqx_json:encode(
  96. #{action => client_disconnected,
  97. node => stringfy(node()),
  98. clientid => maps:get(clientid, ClientInfo),
  99. username => maybe(maps:get(username, ClientInfo)),
  100. disconnected_at => maps:get(disconnected_at, ConnInfo),
  101. reason => stringfy(Reason)
  102. }),
  103. true
  104. end).
  105. prop_client_subscribe() ->
  106. ?ALL({ClientInfo, SubProps, TopicTab, Env},
  107. {clientinfo(), sub_properties(), topictab(), topic_filter_env()},
  108. begin
  109. ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env),
  110. Matched = filter_topictab(TopicTab, Env),
  111. lists:foreach(fun({Topic, Opts}) ->
  112. Body = receive_http_request_body(),
  113. Body = emqx_json:encode(
  114. #{action => client_subscribe,
  115. node => stringfy(node()),
  116. clientid => maps:get(clientid, ClientInfo),
  117. username => maybe(maps:get(username, ClientInfo)),
  118. topic => Topic,
  119. opts => Opts})
  120. end, Matched),
  121. true
  122. end).
  123. prop_client_unsubscribe() ->
  124. ?ALL({ClientInfo, SubProps, TopicTab, Env},
  125. {clientinfo(), unsub_properties(), topictab(), topic_filter_env()},
  126. begin
  127. ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env),
  128. Matched = filter_topictab(TopicTab, Env),
  129. lists:foreach(fun({Topic, Opts}) ->
  130. Body = receive_http_request_body(),
  131. Body = emqx_json:encode(
  132. #{action => client_unsubscribe,
  133. node => stringfy(node()),
  134. clientid => maps:get(clientid, ClientInfo),
  135. username => maybe(maps:get(username, ClientInfo)),
  136. topic => Topic,
  137. opts => Opts})
  138. end, Matched),
  139. true
  140. end).
  141. prop_session_subscribed() ->
  142. ?ALL({ClientInfo, Topic, SubOpts, Env},
  143. {clientinfo(), topic(), subopts(), topic_filter_env()},
  144. begin
  145. ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env),
  146. filter_topic_match(Topic, Env) andalso begin
  147. Body = receive_http_request_body(),
  148. Body1 = emqx_json:encode(
  149. #{action => session_subscribed,
  150. node => stringfy(node()),
  151. clientid => maps:get(clientid, ClientInfo),
  152. username => maybe(maps:get(username, ClientInfo)),
  153. topic => Topic,
  154. opts => SubOpts
  155. }),
  156. Body = Body1
  157. end,
  158. true
  159. end).
  160. prop_session_unsubscribed() ->
  161. ?ALL({ClientInfo, Topic, SubOpts, Env},
  162. {clientinfo(), topic(), subopts(), empty_env()},
  163. begin
  164. ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env),
  165. filter_topic_match(Topic, Env) andalso begin
  166. Body = receive_http_request_body(),
  167. Body = emqx_json:encode(
  168. #{action => session_unsubscribed,
  169. node => stringfy(node()),
  170. clientid => maps:get(clientid, ClientInfo),
  171. username => maybe(maps:get(username, ClientInfo)),
  172. topic => Topic
  173. })
  174. end,
  175. true
  176. end).
  177. prop_session_terminated() ->
  178. ?ALL({ClientInfo, Reason, SessInfo, Env},
  179. {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()},
  180. begin
  181. ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env),
  182. Body = receive_http_request_body(),
  183. Body = emqx_json:encode(
  184. #{action => session_terminated,
  185. node => stringfy(node()),
  186. clientid => maps:get(clientid, ClientInfo),
  187. username => maybe(maps:get(username, ClientInfo)),
  188. reason => stringfy(Reason)
  189. }),
  190. true
  191. end).
  192. prop_message_publish() ->
  193. ?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()},
  194. begin
  195. application:set_env(emqx_web_hook, encode_payload, Encode),
  196. {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env),
  197. application:unset_env(emqx_web_hook, encode_payload),
  198. (not emqx_message:is_sys(Msg))
  199. andalso filter_topic_match(emqx_message:topic(Msg), Env)
  200. andalso begin
  201. Body = receive_http_request_body(),
  202. Body = emqx_json:encode(
  203. #{action => message_publish,
  204. node => stringfy(node()),
  205. from_client_id => emqx_message:from(Msg),
  206. from_username => maybe(emqx_message:get_header(username, Msg)),
  207. topic => emqx_message:topic(Msg),
  208. qos => emqx_message:qos(Msg),
  209. retain => emqx_message:get_flag(retain, Msg),
  210. payload => encode(emqx_message:payload(Msg), Encode),
  211. ts => emqx_message:timestamp(Msg)
  212. })
  213. end,
  214. true
  215. end).
  216. prop_message_delivered() ->
  217. ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()},
  218. begin
  219. application:set_env(emqx_web_hook, encode_payload, Encode),
  220. ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env),
  221. application:unset_env(emqx_web_hook, encode_payload),
  222. (not emqx_message:is_sys(Msg))
  223. andalso filter_topic_match(emqx_message:topic(Msg), Env)
  224. andalso begin
  225. Body = receive_http_request_body(),
  226. Body = emqx_json:encode(
  227. #{action => message_delivered,
  228. node => stringfy(node()),
  229. clientid => maps:get(clientid, ClientInfo),
  230. username => maybe(maps:get(username, ClientInfo)),
  231. from_client_id => emqx_message:from(Msg),
  232. from_username => maybe(emqx_message:get_header(username, Msg)),
  233. topic => emqx_message:topic(Msg),
  234. qos => emqx_message:qos(Msg),
  235. retain => emqx_message:get_flag(retain, Msg),
  236. payload => encode(emqx_message:payload(Msg), Encode),
  237. ts => emqx_message:timestamp(Msg)
  238. })
  239. end,
  240. true
  241. end).
  242. prop_message_acked() ->
  243. ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()},
  244. begin
  245. application:set_env(emqx_web_hook, encode_payload, Encode),
  246. ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env),
  247. application:unset_env(emqx_web_hook, encode_payload),
  248. (not emqx_message:is_sys(Msg))
  249. andalso filter_topic_match(emqx_message:topic(Msg), Env)
  250. andalso begin
  251. Body = receive_http_request_body(),
  252. Body = emqx_json:encode(
  253. #{action => message_acked,
  254. node => stringfy(node()),
  255. clientid => maps:get(clientid, ClientInfo),
  256. username => maybe(maps:get(username, ClientInfo)),
  257. from_client_id => emqx_message:from(Msg),
  258. from_username => maybe(emqx_message:get_header(username, Msg)),
  259. topic => emqx_message:topic(Msg),
  260. qos => emqx_message:qos(Msg),
  261. retain => emqx_message:get_flag(retain, Msg),
  262. payload => encode(emqx_message:payload(Msg), Encode),
  263. ts => emqx_message:timestamp(Msg)
  264. })
  265. end,
  266. true
  267. end).
  268. prop_try_again() ->
  269. Setup = fun() ->
  270. logger:set_module_level(emqx_web_hook, emergency),
  271. meck:new(httpc, [passthrough, no_history]),
  272. meck:expect(httpc, request,
  273. fun(Method, {Url, [], ContentType, Body}, _HttpOpts, _Opt) ->
  274. self() ! {Method, Url, ContentType, Body}, {error, get(code)}
  275. end),
  276. meck:new(emqx_metrics, [passthrough, no_history]),
  277. meck:expect(emqx_metrics, inc, fun(_) -> ok end)
  278. end,
  279. Teardown = fun() ->
  280. meck:unload(httpc),
  281. meck:unload(emqx_metrics),
  282. logger:set_module_level(emqx_web_hook, debug)
  283. end,
  284. ?SETUP(fun() -> Setup(), Teardown end,
  285. ?FORALL({ConnInfo, ConnProps, Env, Code},
  286. {conninfo(), conn_properties(), empty_env(), http_code()},
  287. begin
  288. %% pre-set error code
  289. put(code, Code),
  290. %% run hook
  291. ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env),
  292. Bodys = receive_http_request_bodys(),
  293. Body = emqx_json:encode(
  294. #{action => client_connect,
  295. node => stringfy(node()),
  296. clientid => maps:get(clientid, ConnInfo),
  297. username => maybe(maps:get(username, ConnInfo)),
  298. ipaddress => peer2addr(maps:get(peername, ConnInfo)),
  299. keepalive => maps:get(keepalive, ConnInfo),
  300. proto_ver => maps:get(proto_ver, ConnInfo)
  301. }),
  302. [ B = Body || B <- Bodys],
  303. if Code == socket_closed_remotely ->
  304. 4 = length(Bodys);
  305. true -> ok
  306. end,
  307. true
  308. end)).
  309. %%--------------------------------------------------------------------
  310. %% Helper
  311. %%--------------------------------------------------------------------
  312. do_setup() ->
  313. Self = self(),
  314. meck:new(httpc, [passthrough, no_history]),
  315. meck:expect(httpc, request,
  316. fun(Method, {Url, [], ContentType, Body}, _HttpOpts, _Opt) ->
  317. Self ! {Method, Url, ContentType, Body}, {ok, ok}
  318. end),
  319. meck:new(emqx_metrics, [passthrough, no_history]),
  320. meck:expect(emqx_metrics, inc, fun(_) -> ok end),
  321. ok.
  322. do_teardown(_) ->
  323. meck:unload(httpc),
  324. meck:unload(emqx_metrics).
  325. maybe(undefined) -> null;
  326. maybe(T) -> T.
  327. peer2addr({Host, _}) ->
  328. list_to_binary(inet:ntoa(Host));
  329. peer2addr(Host) ->
  330. list_to_binary(inet:ntoa(Host)).
  331. ensure_to_binary(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
  332. ensure_to_binary(Bin) when is_binary(Bin) -> Bin.
  333. stringfy({shutdown, Reason}) ->
  334. stringfy(Reason);
  335. stringfy(Term) when is_atom(Term); is_binary(Term) ->
  336. Term;
  337. stringfy(Term) ->
  338. unicode:characters_to_binary(io_lib:format("~0p", [Term])).
  339. receive_http_request_body() ->
  340. receive
  341. {post, "http://127.0.0.1", "application/json", Body} ->
  342. Body
  343. after 100 ->
  344. exit(waiting_message_timeout)
  345. end.
  346. receive_http_request_bodys() ->
  347. receive_http_request_bodys_([]).
  348. receive_http_request_bodys_(Acc) ->
  349. receive
  350. {post, "http://127.0.0.1", "application/json", Body} ->
  351. receive_http_request_bodys_([Body|Acc])
  352. after 1000 ->
  353. lists:reverse(Acc)
  354. end.
  355. filter_topictab(TopicTab, {undefined}) ->
  356. TopicTab;
  357. filter_topictab(TopicTab, {TopicFilter}) ->
  358. lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab).
  359. filter_topic_match(_Topic, {undefined}) ->
  360. true;
  361. filter_topic_match(Topic, {TopicFilter}) ->
  362. emqx_topic:match(Topic, TopicFilter).
  363. encode(Bin, base64) ->
  364. base64:encode(Bin);
  365. encode(Bin, base62) ->
  366. emqx_base62:encode(Bin);
  367. encode(Bin, _) ->
  368. Bin.
  369. %%--------------------------------------------------------------------
  370. %% Generators
  371. %%--------------------------------------------------------------------
  372. conn_properties() ->
  373. #{}.
  374. ack_properties() ->
  375. #{}.
  376. sub_properties() ->
  377. #{}.
  378. unsub_properties() ->
  379. #{}.
  380. shutdown_reason() ->
  381. oneof([any(), {shutdown, atom()}]).
  382. empty_env() ->
  383. {undefined}.
  384. topic_filter_env() ->
  385. oneof([{<<"#">>}, {undefined}, {topic()}]).
  386. payload_encode() ->
  387. oneof([base62, base64, undefined]).
  388. http_code() ->
  389. oneof([socket_closed_remotely, others]).
  390. disconnected_conninfo() ->
  391. ?LET(Info, conninfo(),
  392. begin
  393. Info#{disconnected_at => erlang:system_time(millisecond)}
  394. end).