|
|
@@ -31,55 +31,74 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
all() ->
|
|
|
- [{group, http},
|
|
|
- {group, https},
|
|
|
- {group, ipv6http},
|
|
|
- {group, ipv6https}].
|
|
|
+ [ {group, http}
|
|
|
+ , {group, https}
|
|
|
+ , {group, ipv6http}
|
|
|
+ , {group, ipv6https}
|
|
|
+ , {group, all}
|
|
|
+ ].
|
|
|
|
|
|
groups() ->
|
|
|
- Cases = emqx_ct:all(?MODULE),
|
|
|
- [{http, [sequence], Cases},
|
|
|
- {https, [sequence], Cases},
|
|
|
- {ipv6http, [sequence], Cases},
|
|
|
- {ipv6https, [sequence], Cases}].
|
|
|
+ Cases = [test_full_flow],
|
|
|
+ [ {http, [sequence], Cases}
|
|
|
+ , {https, [sequence], Cases}
|
|
|
+ , {ipv6http, [sequence], Cases}
|
|
|
+ , {ipv6https, [sequence], Cases}
|
|
|
+ , {all, [sequence], emqx_ct:all(?MODULE)}
|
|
|
+ ].
|
|
|
+
|
|
|
+start_apps(F) -> emqx_ct_helpers:start_apps(apps(), F).
|
|
|
|
|
|
init_per_group(Name, Config) ->
|
|
|
application:ensure_all_started(emqx_management),
|
|
|
set_special_cfgs(),
|
|
|
- case Name of
|
|
|
- http ->
|
|
|
- emqx_ct_helpers:start_apps(apps(), fun set_special_configs_http/1);
|
|
|
- https ->
|
|
|
- emqx_ct_helpers:start_apps(apps(), fun set_special_configs_https/1);
|
|
|
- ipv6http ->
|
|
|
- emqx_ct_helpers:start_apps(apps(), fun set_special_configs_ipv6_http/1);
|
|
|
- ipv6https ->
|
|
|
- emqx_ct_helpers:start_apps(apps(), fun set_special_configs_ipv6_https/1)
|
|
|
- end,
|
|
|
- Config.
|
|
|
+ BasePort =
|
|
|
+ case Name of
|
|
|
+ all -> 8801;
|
|
|
+ http -> 8811;
|
|
|
+ https -> 8821;
|
|
|
+ ipv6http -> 8831;
|
|
|
+ ipv6https -> 8841
|
|
|
+ end,
|
|
|
+ CF = case Name of
|
|
|
+ all -> fun set_special_configs_http/1;
|
|
|
+ http -> fun set_special_configs_http/1;
|
|
|
+ https -> fun set_special_configs_https/1;
|
|
|
+ ipv6http -> fun set_special_configs_ipv6_http/1;
|
|
|
+ ipv6https -> fun set_special_configs_ipv6_https/1
|
|
|
+ end,
|
|
|
+ start_apps(fun(_) -> CF(BasePort) end),
|
|
|
+ Opts = case atom_to_list(Name) of
|
|
|
+ "ipv6" ++ _ -> [{ip, {0,0,0,0,0,0,0,1}}, inet6];
|
|
|
+ _ -> [inet]
|
|
|
+ end,
|
|
|
+ [{base_port, BasePort}, {transport_opts, Opts} | Config].
|
|
|
|
|
|
end_per_group(_Name, Config) ->
|
|
|
emqx_ct_helpers:stop_apps(apps()),
|
|
|
Config.
|
|
|
|
|
|
-set_special_configs_http(_) ->
|
|
|
- application:set_env(emqx_web_hook, url, "http://127.0.0.1:9999").
|
|
|
+set_special_configs_http(Port) ->
|
|
|
+ application:set_env(emqx_web_hook, url, "http://127.0.0.1:" ++ integer_to_list(Port)).
|
|
|
+
|
|
|
+set_special_configs_https(Port) ->
|
|
|
+ set_ssl_configs(),
|
|
|
+ application:set_env(emqx_web_hook, url, "https://127.0.0.1:" ++ integer_to_list(Port+1)).
|
|
|
|
|
|
-set_special_configs_https(_) ->
|
|
|
+set_special_configs_ipv6_http(Port) ->
|
|
|
+ application:set_env(emqx_web_hook, url, "http://[::1]:" ++ integer_to_list(Port)).
|
|
|
+
|
|
|
+set_special_configs_ipv6_https(Port) ->
|
|
|
+ set_ssl_configs(),
|
|
|
+ application:set_env(emqx_web_hook, url, "https://[::1]:" ++ integer_to_list(Port+1)).
|
|
|
+
|
|
|
+set_ssl_configs() ->
|
|
|
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
|
|
|
SslOpts = [{keyfile, Path ++ "/client-key.pem"},
|
|
|
{certfile, Path ++ "/client-cert.pem"},
|
|
|
{cacertfile, Path ++ "/ca.pem"}],
|
|
|
application:set_env(emqx_web_hook, ssl, true),
|
|
|
- application:set_env(emqx_web_hook, ssloptions, SslOpts),
|
|
|
- application:set_env(emqx_web_hook, url, "https://127.0.0.1:8888").
|
|
|
-
|
|
|
-set_special_configs_ipv6_http(_) ->
|
|
|
- application:set_env(emqx_web_hook, url, "http://[::1]:9999").
|
|
|
-
|
|
|
-set_special_configs_ipv6_https(N) ->
|
|
|
- set_special_configs_https(N),
|
|
|
- application:set_env(emqx_web_hook, url, "https://[::1]:8888").
|
|
|
+ application:set_env(emqx_web_hook, ssloptions, SslOpts).
|
|
|
|
|
|
set_special_cfgs() ->
|
|
|
AllRules = [{"message.acked", "{\"action\": \"on_message_acked\"}"},
|
|
|
@@ -95,34 +114,63 @@ set_special_cfgs() ->
|
|
|
{"client.connack", "{\"action\": \"on_client_connack\"}"},
|
|
|
{"client.connect", "{\"action\": \"on_client_connect\"}"}],
|
|
|
application:set_env(emqx_web_hook, rules, AllRules).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test cases
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-t_valid(Config) ->
|
|
|
- {ok, ServerPid} = http_server:start_link(),
|
|
|
+test_full_flow(Config) ->
|
|
|
+ [_|_] = Opts = proplists:get_value(transport_opts, Config),
|
|
|
+ BasePort = proplists:get_value(base_port, Config),
|
|
|
+ Tester = self(),
|
|
|
+ {ok, ServerPid} = http_server:start_link(Tester, BasePort, Opts),
|
|
|
+ receive {ServerPid, ready} -> ok
|
|
|
+ after 1000 -> error(timeout)
|
|
|
+ end,
|
|
|
application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
|
|
|
{ok, C} = emqtt:start_link([ {clientid, <<"simpleClient">>}
|
|
|
, {proto_ver, v5}
|
|
|
, {keepalive, 60}
|
|
|
]),
|
|
|
- try
|
|
|
- {ok, _} = emqtt:connect(C),
|
|
|
- emqtt:subscribe(C, <<"TopicA">>, qos2),
|
|
|
- emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
|
|
|
- emqtt:unsubscribe(C, <<"TopicA">>),
|
|
|
- emqtt:disconnect(C),
|
|
|
- timer:sleep(100),
|
|
|
- [begin
|
|
|
- Maps = emqx_json:decode(P, [return_maps]),
|
|
|
- validate_hook_resp(Maps),
|
|
|
- validate_hook_headers(Headers)
|
|
|
+ try
|
|
|
+ do_test_full_flow(C)
|
|
|
+ after
|
|
|
+ Ref = erlang:monitor(process, ServerPid),
|
|
|
+ http_server:stop(ServerPid),
|
|
|
+ receive {'DOWN', Ref, _, _, _} -> ok
|
|
|
+ after 5000 -> error(timeout)
|
|
|
end
|
|
|
- || {{P, _Bool}, Headers} <- http_server:get_received_data()]
|
|
|
+ end.
|
|
|
+
|
|
|
+do_test_full_flow(C) ->
|
|
|
+ {ok, _} = emqtt:connect(C),
|
|
|
+ {ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos2),
|
|
|
+ {ok, _} = emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
|
|
|
+ {ok, _, _} = emqtt:unsubscribe(C, <<"TopicA">>),
|
|
|
+ emqtt:disconnect(C),
|
|
|
+ validate_params_and_headers(undefined).
|
|
|
+
|
|
|
+validate_params_and_headers(ClientState) ->
|
|
|
+ receive
|
|
|
+ {http_server, {Params0, _Bool}, Headers} ->
|
|
|
+ Params = emqx_json:decode(Params0, [return_maps]),
|
|
|
+ validate_hook_resp(Params),
|
|
|
+ validate_hook_headers(Headers),
|
|
|
+ case maps:get(<<"action">>, Params) of
|
|
|
+ <<"session_terminated">> ->
|
|
|
+ ok;
|
|
|
+ <<"client_connect">> ->
|
|
|
+ validate_params_and_headers(connected);
|
|
|
+ _ ->
|
|
|
+ validate_params_and_headers(ClientState) %% continue looping
|
|
|
+ end
|
|
|
after
|
|
|
- http_server:stop(ServerPid)
|
|
|
- end,
|
|
|
- Config.
|
|
|
+ 5000 ->
|
|
|
+ case ClientState =:= undefined of
|
|
|
+ true -> error("client_was_never_connected");
|
|
|
+ false -> error("terminate_action_is_not_received_in_time")
|
|
|
+ end
|
|
|
+ end.
|
|
|
|
|
|
t_check_hooked(_) ->
|
|
|
{ok, Rules} = application:get_env(emqx_web_hook, rules),
|