|
|
@@ -16,10 +16,7 @@
|
|
|
-define(BRIDGE_TYPE, gcp_pubsub).
|
|
|
-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub">>).
|
|
|
|
|
|
--import(emqx_common_test_helpers, [on_exit/2, run_on_exit_callbacks/1]).
|
|
|
-
|
|
|
--define(on_exit_key(TESTCASE), {?MODULE, TESTCASE}).
|
|
|
--define(on_exit(FUN), on_exit({?MODULE, ?FUNCTION_NAME}, FUN)).
|
|
|
+-import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% CT boilerplate
|
|
|
@@ -138,9 +135,8 @@ init_per_testcase(TestCase, Config0) ->
|
|
|
Config = generate_config(Config0),
|
|
|
[{telemetry_table, Tid} | Config].
|
|
|
|
|
|
-end_per_testcase(TestCase, _Config) ->
|
|
|
+end_per_testcase(_TestCase, _Config) ->
|
|
|
ok = snabbkaffe:stop(),
|
|
|
- run_on_exit_callbacks(?on_exit_key(TestCase)),
|
|
|
delete_all_bridges(),
|
|
|
ok = emqx_connector_web_hook_server:stop(),
|
|
|
ok.
|
|
|
@@ -515,7 +511,7 @@ install_telemetry_handler(TestCase) ->
|
|
|
end,
|
|
|
unused_config
|
|
|
),
|
|
|
- on_exit(?on_exit_key(TestCase), fun() ->
|
|
|
+ on_exit(fun() ->
|
|
|
telemetry:detach(HandlerId),
|
|
|
ets:delete(Tid)
|
|
|
end),
|
|
|
@@ -567,7 +563,7 @@ t_publish_success(Config) ->
|
|
|
end
|
|
|
),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
assert_empty_metrics(ResourceId),
|
|
|
Payload = <<"payload">>,
|
|
|
Message = emqx_message:make(Topic, Payload),
|
|
|
@@ -669,7 +665,7 @@ t_publish_templated(Config) ->
|
|
|
end
|
|
|
),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
assert_empty_metrics(ResourceId),
|
|
|
Payload = <<"payload">>,
|
|
|
Message =
|
|
|
@@ -734,7 +730,7 @@ t_publish_success_batch(Config) ->
|
|
|
)
|
|
|
),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
assert_empty_metrics(ResourceId),
|
|
|
NumMessages = BatchSize * 2,
|
|
|
Messages = [emqx_message:make(Topic, integer_to_binary(N)) || N <- lists:seq(1, NumMessages)],
|
|
|
@@ -916,7 +912,7 @@ t_publish_econnrefused(Config) ->
|
|
|
%% in ehttpc.
|
|
|
{ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
assert_empty_metrics(ResourceId),
|
|
|
ok = emqx_connector_web_hook_server:stop(),
|
|
|
do_econnrefused_or_timeout_test(Config, econnrefused).
|
|
|
@@ -931,7 +927,7 @@ t_publish_timeout(Config) ->
|
|
|
<<"resource_opts">> => #{<<"batch_size">> => 1}
|
|
|
}),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
assert_empty_metrics(ResourceId),
|
|
|
TestPid = self(),
|
|
|
TimeoutHandler =
|
|
|
@@ -1132,7 +1128,7 @@ t_success_no_body(Config) ->
|
|
|
Topic = <<"t/topic">>,
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
Payload = <<"payload">>,
|
|
|
Message = emqx_message:make(Topic, Payload),
|
|
|
?check_trace(
|
|
|
@@ -1170,7 +1166,7 @@ t_failure_with_body(Config) ->
|
|
|
Topic = <<"t/topic">>,
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
Payload = <<"payload">>,
|
|
|
Message = emqx_message:make(Topic, Payload),
|
|
|
?check_trace(
|
|
|
@@ -1208,7 +1204,7 @@ t_failure_no_body(Config) ->
|
|
|
Topic = <<"t/topic">>,
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
Payload = <<"payload">>,
|
|
|
Message = emqx_message:make(Topic, Payload),
|
|
|
?check_trace(
|
|
|
@@ -1257,7 +1253,7 @@ t_unrecoverable_error(Config) ->
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
assert_empty_metrics(ResourceId),
|
|
|
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
|
|
- ?on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
Payload = <<"payload">>,
|
|
|
Message = emqx_message:make(Topic, Payload),
|
|
|
?check_trace(
|