emqx_web_hook_SUITE.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_web_hook_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("common_test/include/ct.hrl").
  22. -define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))).
  23. -define(ACTION(Name), #{<<"action">> := Name}).
  24. %%--------------------------------------------------------------------
  25. %% Setups
  26. %%--------------------------------------------------------------------
  %% @doc Common Test entry point: returns one `{group, Name}' entry for each
  %% of the groups declared in groups/0, in execution order.
  all() ->
      [{group, Name} || Name <- [http, https, ipv6http, ipv6https, all]].
  %% @doc Common Test group definitions.
  %%
  %% The four transport groups (http, https, ipv6http, ipv6https) each run
  %% only `test_full_flow'. The `all' group runs whatever
  %% `emqx_ct:all(?MODULE)' returns. Every group uses the `sequence' property.
  groups() ->
      FlowCases = [test_full_flow],
      TransportGroups = [{Group, [sequence], FlowCases}
                         || Group <- [http, https, ipv6http, ipv6https]],
      TransportGroups ++ [{all, [sequence], emqx_ct:all(?MODULE)}].
  %% @doc Starts the applications listed by apps/0 through
  %% `emqx_ct_helpers:start_apps/2', passing `F' as the second argument.
  start_apps(F) ->
      Apps = apps(),
      emqx_ct_helpers:start_apps(Apps, F).
  %% @doc Per-group setup.
  %%
  %% Installs the hook rules, picks the base port and the URL-configuring
  %% function for the group `Name', starts the applications, and returns
  %% `Config' extended with:
  %%   {base_port, Port}       - the base port chosen for this group
  %%   {transport_opts, Opts}  - `[inet]', or the IPv6 loopback options for
  %%                             groups whose name starts with "ipv6"
  %%
  %% An unknown group name fails with `case_clause'.
  init_per_group(Name, Config) ->
      application:ensure_all_started(emqx_management),
      set_special_cfgs(),
      %% Each group gets its own base port.
      {BasePort, Configure} =
          case Name of
              all       -> {8801, fun set_special_configs_http/1};
              http      -> {8811, fun set_special_configs_http/1};
              https     -> {8821, fun set_special_configs_https/1};
              ipv6http  -> {8831, fun set_special_configs_ipv6_http/1};
              ipv6https -> {8841, fun set_special_configs_ipv6_https/1}
          end,
      start_apps(fun(_App) -> Configure(BasePort) end),
      TransportOpts =
          case lists:prefix("ipv6", atom_to_list(Name)) of
              true  -> [{ip, {0,0,0,0,0,0,0,1}}, inet6];
              false -> [inet]
          end,
      [{base_port, BasePort}, {transport_opts, TransportOpts} | Config].
  %% @doc Per-group teardown: stops the applications listed by apps/0 and
  %% returns `Config' unchanged.
  end_per_group(_Name, Config) ->
      Apps = apps(),
      emqx_ct_helpers:stop_apps(Apps),
      Config.
  %% @doc Sets the `url' environment value of `emqx_web_hook' to a plain-HTTP
  %% URL on 127.0.0.1 at `Port'.
  set_special_configs_http(Port) ->
      Url = "http://127.0.0.1:" ++ integer_to_list(Port),
      application:set_env(emqx_web_hook, url, Url).
  %% @doc Applies the SSL settings and sets the `url' environment value of
  %% `emqx_web_hook' to an HTTPS URL on 127.0.0.1. The URL uses `Port + 1',
  %% not `Port'.
  set_special_configs_https(Port) ->
      set_ssl_configs(),
      HttpsPort = Port + 1,
      Url = "https://127.0.0.1:" ++ integer_to_list(HttpsPort),
      application:set_env(emqx_web_hook, url, Url).
  %% @doc Sets the `url' environment value of `emqx_web_hook' to a plain-HTTP
  %% URL on the IPv6 loopback address `[::1]' at `Port'.
  set_special_configs_ipv6_http(Port) ->
      Url = "http://[::1]:" ++ integer_to_list(Port),
      application:set_env(emqx_web_hook, url, Url).
  %% @doc Applies the SSL settings and sets the `url' environment value of
  %% `emqx_web_hook' to an HTTPS URL on the IPv6 loopback address `[::1]'.
  %% The URL uses `Port + 1', not `Port'.
  set_special_configs_ipv6_https(Port) ->
      set_ssl_configs(),
      HttpsPort = Port + 1,
      Url = "https://[::1]:" ++ integer_to_list(HttpsPort),
      application:set_env(emqx_web_hook, url, Url).
  %% @doc Enables SSL for `emqx_web_hook' and points it at the client key,
  %% client certificate and CA certificate stored in the suite's data
  %% directory.
  %%
  %% File names are combined with `filename:join/2' so the resulting paths
  %% contain a single separator even though the directory string ends in "/".
  set_ssl_configs() ->
      Dir = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
      SslOpts = [{keyfile, filename:join(Dir, "client-key.pem")},
                 {certfile, filename:join(Dir, "client-cert.pem")},
                 {cacertfile, filename:join(Dir, "ca.pem")}],
      application:set_env(emqx_web_hook, ssl, true),
      application:set_env(emqx_web_hook, ssloptions, SslOpts).
  %% @doc Stores the hook rules under the `rules' environment key of
  %% `emqx_web_hook'.
  %%
  %% Each rule is a `{HookPoint, ActionJson}' pair of strings. The list keeps
  %% a fixed order: message hooks, then session hooks, then client hooks.
  set_special_cfgs() ->
      MessageRules =
          [ {"message.acked", "{\"action\": \"on_message_acked\"}"}
          , {"message.delivered", "{\"action\": \"on_message_delivered\"}"}
          , {"message.publish", "{\"action\": \"on_message_publish\"}"}
          ],
      SessionRules =
          [ {"session.terminated", "{\"action\": \"on_session_terminated\"}"}
          , {"session.unsubscribed", "{\"action\": \"on_session_unsubscribed\"}"}
          , {"session.subscribed", "{\"action\": \"on_session_subscribed\"}"}
          ],
      ClientRules =
          [ {"client.unsubscribe", "{\"action\": \"on_client_unsubscribe\"}"}
          , {"client.subscribe", "{\"action\": \"on_client_subscribe\"}"}
          , {"client.disconnected", "{\"action\": \"on_client_disconnected\"}"}
          , {"client.connected", "{\"action\": \"on_client_connected\"}"}
          , {"client.connack", "{\"action\": \"on_client_connack\"}"}
          , {"client.connect", "{\"action\": \"on_client_connect\"}"}
          ],
      application:set_env(emqx_web_hook, rules,
                          MessageRules ++ SessionRules ++ ClientRules).
  101. %%--------------------------------------------------------------------
  102. %% Test cases
  103. %%--------------------------------------------------------------------
  %% @doc Runs a complete client session and validates the hook requests it
  %% produces.
  %%
  %% Reads `transport_opts' (must be a non-empty list) and `base_port' from
  %% `Config', starts `http_server' with the test process as its first
  %% argument, and waits for its `{Pid, ready}' message. It then sets the
  %% `headers' environment value, starts an MQTT v5 client with a unique
  %% client id and runs do_test_full_flow/2. The HTTP server is always
  %% stopped afterwards, whether or not the flow succeeded.
  test_full_flow(Config) ->
      TransportOpts = [_ | _] = proplists:get_value(transport_opts, Config),
      Port = proplists:get_value(base_port, Config),
      {ok, Server} = http_server:start_link(self(), Port, TransportOpts),
      test_full_flow_await_ready(Server),
      application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
      Suffix = integer_to_list(erlang:system_time()),
      ClientId = iolist_to_binary(["client-", Suffix]),
      ClientOpts = [{clientid, ClientId}, {proto_ver, v5}, {keepalive, 60}],
      {ok, Client} = emqtt:start_link(ClientOpts),
      try
          do_test_full_flow(Client, ClientId)
      after
          test_full_flow_stop_server(Server)
      end.

  %% Blocks until `Server' sends `{Server, ready}'; fails with
  %% `error(timeout)' after one second.
  test_full_flow_await_ready(Server) ->
      receive
          {Server, ready} -> ok
      after 1000 ->
          error(timeout)
      end.

  %% Stops `Server' and waits for its 'DOWN' message. The monitor is set up
  %% before the stop request so the exit cannot be missed. Fails with
  %% `error(timeout)' after five seconds.
  test_full_flow_stop_server(Server) ->
      MRef = erlang:monitor(process, Server),
      http_server:stop(Server),
      receive
          {'DOWN', MRef, _, _, _} -> ok
      after 5000 ->
          error(timeout)
      end.
  %% @doc Drives the MQTT client `C' through connect, subscribe (QoS 2),
  %% publish (QoS 2), unsubscribe and disconnect on topic `TopicA', then
  %% validates the hook requests received for `ClientId'.
  %%
  %% Each step except the disconnect is matched against its success shape,
  %% so a failed step crashes with `badmatch'.
  do_test_full_flow(C, ClientId) ->
      Topic = <<"TopicA">>,
      {ok, _} = emqtt:connect(C),
      {ok, _, _} = emqtt:subscribe(C, Topic, qos2),
      {ok, _} = emqtt:publish(C, Topic, <<"Payload...">>, qos2),
      {ok, _, _} = emqtt:unsubscribe(C, Topic),
      emqtt:disconnect(C),
      %% `undefined' means no client_connect event has been seen yet.
      validate_params_and_headers(undefined, ClientId).
  %% @doc Consumes `{http_server, {Body, _}, Headers}' messages from the
  %% mailbox and validates each one until the `session_terminated' action
  %% for `ClientId' has been seen, then returns `ok'.
  %%
  %% `ClientState' is `undefined' until a `client_connect' action has been
  %% validated and `connected' afterwards. It only selects the error raised
  %% on timeout.
  %%
  %% Events that belong to another client (signalled by the
  %% `{unknown_client, Id}' throw) are logged and skipped.
  %%
  %% Only the validation itself runs inside the protected part of the `try'.
  %% The loop continues outside it, so the recursion is a tail call and
  %% exception handlers do not pile up with every received message.
  %%
  %% Raises `error/1' if no message arrives within five seconds. Assertion
  %% failures from the validators propagate unchanged.
  validate_params_and_headers(ClientState, ClientId) ->
      receive
          {http_server, {Params0, _Bool}, Headers} ->
              Params = emqx_json:decode(Params0, [return_maps]),
              Next =
                  try
                      validate_hook_resp(ClientId, Params),
                      validate_hook_headers(Headers),
                      maps:get(<<"action">>, Params)
                  of
                      <<"session_terminated">> -> done;
                      <<"client_connect">> -> {continue, connected};
                      _ -> {continue, ClientState}
                  catch
                      throw : {unknown_client, Other} ->
                          ct:pal("ignored_event_from_other_client ~p~nexpecting:~p~n~p~n~p",
                                 [Other, ClientId, Params, Headers]),
                          {continue, ClientState}
                  end,
              case Next of
                  done -> ok;
                  {continue, NextState} ->
                      validate_params_and_headers(NextState, ClientId)
              end
      after
          5000 ->
              case ClientState =:= undefined of
                  true -> error("client_was_never_connected");
                  false -> error("terminate_action_is_not_received_in_time")
              end
      end.
  %% @doc Checks that every hook point named in the configured rules has at
  %% least one callback registered.
  %%
  %% Uses `?assertMatch([_ | _], Hooks)' so a failure reports the actual
  %% lookup result instead of a bare `false', and no list length is computed.
  t_check_hooked(_) ->
      {ok, Rules} = application:get_env(emqx_web_hook, rules),
      lists:foreach(fun({HookName, _Action}) ->
                            Hooks = ?HOOK_LOOKUP(HookName),
                            ?assertMatch([_ | _], Hooks)
                    end, Rules).
  %% @doc Checks that removing the "message.delivered" rule and reloading the
  %% web hook leaves no callback registered for that hook point.
  %%
  %% The original rules are restored and the hook reloaded in an `after'
  %% section, so a failed assertion or a crash while loading the reduced rule
  %% set does not leak the modified configuration into later test cases.
  t_change_config(_) ->
      {ok, Rules} = application:get_env(emqx_web_hook, rules),
      HookRules = lists:keydelete("message.delivered", 1, Rules),
      emqx_web_hook:unload(),
      try
          application:set_env(emqx_web_hook, rules, HookRules),
          emqx_web_hook:load(),
          ?assertEqual([], ?HOOK_LOOKUP("message.delivered"))
      after
          emqx_web_hook:unload(),
          application:set_env(emqx_web_hook, rules, Rules),
          emqx_web_hook:load()
      end.
  178. %%--------------------------------------------------------------------
  179. %% Utils
  180. %%--------------------------------------------------------------------
  %% @doc Asserts that the header map `Headers' maps `<<"k1">>' to `<<"K1">>'
  %% and `<<"k2">>' to `<<"K2">>'. A missing key fails with `badkey'.
  validate_hook_headers(Headers) ->
      HeaderK1 = maps:get(<<"k1">>, Headers),
      ?assertEqual(<<"K1">>, HeaderK1),
      HeaderK2 = maps:get(<<"k2">>, Headers),
      ?assertEqual(<<"K2">>, HeaderK2).
  %% @doc Validates the decoded hook request `Body' according to its
  %% `<<"action">>' value.
  %%
  %% Client and session actions first check the client id through
  %% assert_username_clientid/2, which throws `{unknown_client, Id}' for
  %% events of other clients. The action-specific fields are asserted after
  %% that. Message actions check the node name and the presence of the
  %% message attributes only.
  %%
  %% Returns `ok'. An action not listed here fails with `function_clause'.
  validate_hook_resp(ClientId, ?ACTION(<<"client_connect">>) = Body) ->
      assert_username_clientid(ClientId, Body),
      hook_resp_check_proto(Body),
      hook_resp_check_origin(Body),
      ok;
  validate_hook_resp(ClientId, ?ACTION(<<"client_connack">>) = Body) ->
      assert_username_clientid(ClientId, Body),
      hook_resp_check_proto(Body),
      ?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)),
      hook_resp_check_origin(Body),
      ok;
  validate_hook_resp(ClientId, ?ACTION(<<"client_connected">>) = Body) ->
      assert_username_clientid(ClientId, Body),
      %% Presence check only: the value of connected_at is not inspected.
      _ = maps:get(<<"connected_at">>, Body),
      hook_resp_check_proto(Body),
      hook_resp_check_origin(Body);
  validate_hook_resp(ClientId, ?ACTION(Action) = Body)
    when Action =:= <<"client_disconnected">>;
         Action =:= <<"session_terminated">> ->
      assert_username_clientid(ClientId, Body),
      ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
      hook_resp_check_node(Body);
  validate_hook_resp(ClientId, ?ACTION(Action) = Body)
    when Action =:= <<"client_subscribe">>;
         Action =:= <<"client_unsubscribe">>;
         Action =:= <<"session_subscribed">> ->
      assert_username_clientid(ClientId, Body),
      %% Presence check only: the value of opts is not inspected.
      _ = maps:get(<<"opts">>, Body),
      hook_resp_check_topic(Body),
      hook_resp_check_node(Body);
  validate_hook_resp(ClientId, ?ACTION(<<"session_unsubscribed">>) = Body) ->
      assert_username_clientid(ClientId, Body),
      hook_resp_check_topic(Body),
      hook_resp_check_node(Body);
  validate_hook_resp(_ClientId, ?ACTION(Action) = Body)
    when Action =:= <<"message_publish">>;
         Action =:= <<"message_delivered">>;
         Action =:= <<"message_acked">> ->
      hook_resp_check_node(Body),
      assert_messages_attrs(Body).

  %% Asserts protocol version 5 and a keepalive of 60.
  hook_resp_check_proto(Body) ->
      ?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
      ?assertEqual(60, maps:get(<<"keepalive">>, Body)).

  %% Asserts the peer address 127.0.0.1, then the node name.
  hook_resp_check_origin(Body) ->
      ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
      hook_resp_check_node(Body).

  %% Asserts that the event topic is TopicA.
  hook_resp_check_topic(Body) ->
      ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)).

  %% Asserts that the event reports node test@127.0.0.1.
  hook_resp_check_node(Body) ->
      ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)).
  %% @doc Checks that a hook event belongs to the expected client.
  %%
  %% When the event's `<<"clientid">>' equals `ClientId' and a
  %% `<<"username">>' key is present, asserts that the username is `null'.
  %% Any other event carrying a `<<"clientid">>' makes the function throw
  %% `{unknown_client, Id}' with that event's client id. An event without a
  %% `<<"clientid">>' key fails with `function_clause'.
  assert_username_clientid(ClientId, #{<<"username">> := Name, <<"clientid">> := ClientId}) ->
      ?assertEqual(null, Name);
  assert_username_clientid(_Expected, #{<<"clientid">> := Actual}) ->
      throw({unknown_client, Actual}).
  %% @doc Returns `ok' when the message event map contains all of the keys
  %% ts, qos, topic, retain, payload, from_username and from_client_id.
  %% The values are not inspected. A map missing any of these keys fails
  %% with `function_clause'.
  assert_messages_attrs(#{<<"from_client_id">> := _,
                          <<"from_username">> := _,
                          <<"payload">> := _,
                          <<"retain">> := _,
                          <<"topic">> := _,
                          <<"qos">> := _,
                          <<"ts">> := _}) ->
      ok.
  %% @doc The applications this suite starts and stops, in the order they
  %% are handed to the helper functions.
  apps() ->
      [ emqx_web_hook
      , emqx_modules
      , emqx_management
      , emqx_rule_engine
      ].