| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_web_hook_SUITE).
- -compile(export_all).
- -compile(nowarn_export_all).
- -include_lib("emqx/include/emqx.hrl").
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))).
- -define(ACTION(Name), #{<<"action">> := Name}).
- %%--------------------------------------------------------------------
- %% Setups
- %%--------------------------------------------------------------------
- all() ->
- [ {group, http}
- , {group, https}
- , {group, ipv6http}
- , {group, ipv6https}
- , {group, all}
- ].
- groups() ->
- Cases = [test_full_flow],
- [ {http, [sequence], Cases}
- , {https, [sequence], Cases}
- , {ipv6http, [sequence], Cases}
- , {ipv6https, [sequence], Cases}
- , {all, [sequence], emqx_ct:all(?MODULE)}
- ].
- start_apps(F) -> emqx_ct_helpers:start_apps(apps(), F).
- init_per_group(Name, Config) ->
- application:ensure_all_started(emqx_management),
- set_special_cfgs(),
- BasePort =
- case Name of
- all -> 8801;
- http -> 8811;
- https -> 8821;
- ipv6http -> 8831;
- ipv6https -> 8841
- end,
- CF = case Name of
- all -> fun set_special_configs_http/1;
- http -> fun set_special_configs_http/1;
- https -> fun set_special_configs_https/1;
- ipv6http -> fun set_special_configs_ipv6_http/1;
- ipv6https -> fun set_special_configs_ipv6_https/1
- end,
- start_apps(fun(_) -> CF(BasePort) end),
- Opts = case atom_to_list(Name) of
- "ipv6" ++ _ -> [{ip, {0,0,0,0,0,0,0,1}}, inet6];
- _ -> [inet]
- end,
- [{base_port, BasePort}, {transport_opts, Opts} | Config].
- end_per_group(_Name, Config) ->
- emqx_ct_helpers:stop_apps(apps()),
- Config.
- set_special_configs_http(Port) ->
- application:set_env(emqx_web_hook, url, "http://127.0.0.1:" ++ integer_to_list(Port)).
- set_special_configs_https(Port) ->
- set_ssl_configs(),
- application:set_env(emqx_web_hook, url, "https://127.0.0.1:" ++ integer_to_list(Port+1)).
- set_special_configs_ipv6_http(Port) ->
- application:set_env(emqx_web_hook, url, "http://[::1]:" ++ integer_to_list(Port)).
- set_special_configs_ipv6_https(Port) ->
- set_ssl_configs(),
- application:set_env(emqx_web_hook, url, "https://[::1]:" ++ integer_to_list(Port+1)).
- set_ssl_configs() ->
- Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
- SslOpts = [{keyfile, Path ++ "/client-key.pem"},
- {certfile, Path ++ "/client-cert.pem"},
- {cacertfile, Path ++ "/ca.pem"}],
- application:set_env(emqx_web_hook, ssl, true),
- application:set_env(emqx_web_hook, ssloptions, SslOpts).
- set_special_cfgs() ->
- AllRules = [{"message.acked", "{\"action\": \"on_message_acked\"}"},
- {"message.delivered", "{\"action\": \"on_message_delivered\"}"},
- {"message.publish", "{\"action\": \"on_message_publish\"}"},
- {"session.terminated", "{\"action\": \"on_session_terminated\"}"},
- {"session.unsubscribed", "{\"action\": \"on_session_unsubscribed\"}"},
- {"session.subscribed", "{\"action\": \"on_session_subscribed\"}"},
- {"client.unsubscribe", "{\"action\": \"on_client_unsubscribe\"}"},
- {"client.subscribe", "{\"action\": \"on_client_subscribe\"}"},
- {"client.disconnected", "{\"action\": \"on_client_disconnected\"}"},
- {"client.connected", "{\"action\": \"on_client_connected\"}"},
- {"client.connack", "{\"action\": \"on_client_connack\"}"},
- {"client.connect", "{\"action\": \"on_client_connect\"}"}],
- application:set_env(emqx_web_hook, rules, AllRules).
- %%--------------------------------------------------------------------
- %% Test cases
- %%--------------------------------------------------------------------
- test_full_flow(Config) ->
- [_|_] = Opts = proplists:get_value(transport_opts, Config),
- BasePort = proplists:get_value(base_port, Config),
- Tester = self(),
- {ok, ServerPid} = http_server:start_link(Tester, BasePort, Opts),
- receive {ServerPid, ready} -> ok
- after 1000 -> error(timeout)
- end,
- application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
- ClientId = iolist_to_binary(["client-", integer_to_list(erlang:system_time())]),
- {ok, C} = emqtt:start_link([ {clientid, ClientId}
- , {proto_ver, v5}
- , {keepalive, 60}
- ]),
- try
- do_test_full_flow(C, ClientId)
- after
- Ref = erlang:monitor(process, ServerPid),
- http_server:stop(ServerPid),
- receive {'DOWN', Ref, _, _, _} -> ok
- after 5000 -> error(timeout)
- end
- end.
- do_test_full_flow(C, ClientId) ->
- {ok, _} = emqtt:connect(C),
- {ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos2),
- {ok, _} = emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
- {ok, _, _} = emqtt:unsubscribe(C, <<"TopicA">>),
- emqtt:disconnect(C),
- validate_params_and_headers(undefined, ClientId).
- validate_params_and_headers(ClientState, ClientId) ->
- receive
- {http_server, {Params0, _Bool}, Headers} ->
- Params = emqx_json:decode(Params0, [return_maps]),
- try
- validate_hook_resp(ClientId, Params),
- validate_hook_headers(Headers),
- case maps:get(<<"action">>, Params) of
- <<"session_terminated">> ->
- ok;
- <<"client_connect">> ->
- validate_params_and_headers(connected, ClientId);
- _ ->
- validate_params_and_headers(ClientState, ClientId) %% continue looping
- end
- catch
- throw : {unknown_client, Other} ->
- ct:pal("ignored_event_from_other_client ~p~nexpecting:~p~n~p~n~p",
- [Other, ClientId, Params, Headers]),
- validate_params_and_headers(ClientState, ClientId) %% continue looping
- end
- after
- 5000 ->
- case ClientState =:= undefined of
- true -> error("client_was_never_connected");
- false -> error("terminate_action_is_not_received_in_time")
- end
- end.
- t_check_hooked(_) ->
- {ok, Rules} = application:get_env(emqx_web_hook, rules),
- lists:foreach(fun({HookName, _Action}) ->
- Hooks = ?HOOK_LOOKUP(HookName),
- ?assertEqual(true, length(Hooks) > 0)
- end, Rules).
- t_change_config(_) ->
- {ok, Rules} = application:get_env(emqx_web_hook, rules),
- emqx_web_hook:unload(),
- HookRules = lists:keydelete("message.delivered", 1, Rules),
- application:set_env(emqx_web_hook, rules, HookRules),
- emqx_web_hook:load(),
- ?assertEqual([], ?HOOK_LOOKUP("message.delivered")),
- emqx_web_hook:unload(),
- application:set_env(emqx_web_hook, rules, Rules),
- emqx_web_hook:load().
- %%--------------------------------------------------------------------
- %% Utils
- %%--------------------------------------------------------------------
- validate_hook_headers(Headers) ->
- ?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)),
- ?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)).
- validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connect">>)) ->
- assert_username_clientid(ClientId, Body),
- ?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
- ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
- ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
- ok;
- validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connack">>)) ->
- assert_username_clientid(ClientId, Body),
- ?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
- ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
- ?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)),
- ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
- ok;
- validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connected">>)) ->
- assert_username_clientid(ClientId, Body),
- _ = maps:get(<<"connected_at">>, Body),
- ?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
- ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
- ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
- validate_hook_resp(ClientId, Body = ?ACTION(<<"client_disconnected">>)) ->
- assert_username_clientid(ClientId, Body),
- ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
- validate_hook_resp(ClientId, Body = ?ACTION(<<"client_subscribe">>)) ->
- assert_username_clientid(ClientId, Body),
- _ = maps:get(<<"opts">>, Body),
- ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
- validate_hook_resp(ClientId, Body = ?ACTION(<<"client_unsubscribe">>)) ->
- assert_username_clientid(ClientId, Body),
- _ = maps:get(<<"opts">>, Body),
- ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
- validate_hook_resp(ClientId, Body = ?ACTION(<<"session_subscribed">>)) ->
- assert_username_clientid(ClientId, Body),
- _ = maps:get(<<"opts">>, Body),
- ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
- validate_hook_resp(ClientId, Body = ?ACTION(<<"session_unsubscribed">>)) ->
- assert_username_clientid(ClientId, Body),
- ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
- validate_hook_resp(ClientId, Body = ?ACTION(<<"session_terminated">>)) ->
- assert_username_clientid(ClientId, Body),
- ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
- validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_publish">>)) ->
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
- assert_messages_attrs(Body);
- validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_delivered">>)) ->
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
- assert_messages_attrs(Body);
- validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_acked">>)) ->
- ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
- assert_messages_attrs(Body).
- assert_username_clientid(ClientId, #{<<"clientid">> := ClientId, <<"username">> := Username}) ->
- ?assertEqual(null, Username);
- assert_username_clientid(_ClientId, #{<<"clientid">> := Other}) ->
- throw({unknown_client, Other}).
- assert_messages_attrs(#{ <<"ts">> := _
- , <<"qos">> := _
- , <<"topic">> := _
- , <<"retain">> := _
- , <<"payload">> := _
- , <<"from_username">> := _
- , <<"from_client_id">> := _
- }) ->
- ok.
- apps() ->
- [emqx_web_hook, emqx_modules, emqx_management, emqx_rule_engine].
|