|
|
@@ -23,31 +23,25 @@
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
all() ->
|
|
|
- [
|
|
|
- {group, plain},
|
|
|
- {group, tls}
|
|
|
- ].
|
|
|
+ All0 = emqx_common_test_helpers:all(?MODULE),
|
|
|
+ All = All0 -- matrix_cases(),
|
|
|
+ Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
|
|
|
+ Groups ++ All.
|
|
|
|
|
|
groups() ->
|
|
|
- AllTCs = emqx_common_test_helpers:all(?MODULE),
|
|
|
- [
|
|
|
- {plain, AllTCs},
|
|
|
- {tls, AllTCs}
|
|
|
- ].
|
|
|
+ emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
|
|
|
+
|
|
|
+matrix_cases() ->
|
|
|
+ emqx_common_test_helpers:all(?MODULE).
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
- %% Ensure enterprise bridge module is loaded
|
|
|
- _ = emqx_bridge_enterprise:module_info(),
|
|
|
- {ok, Cwd} = file:get_cwd(),
|
|
|
- PrivDir = ?config(priv_dir, Config),
|
|
|
- WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd),
|
|
|
Apps = emqx_cth_suite:start(
|
|
|
lists:flatten([
|
|
|
?APPS,
|
|
|
emqx_management,
|
|
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
|
|
]),
|
|
|
- #{work_dir => WorkDir}
|
|
|
+ #{work_dir => emqx_cth_suite:work_dir(Config)}
|
|
|
),
|
|
|
[{suite_apps, Apps} | Config].
|
|
|
|
|
|
@@ -61,6 +55,7 @@ init_per_group(plain = Type, Config) ->
|
|
|
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
|
|
|
true ->
|
|
|
Config1 = common_init_per_group(),
|
|
|
+ ConnectorName = ?MODULE,
|
|
|
NewConfig =
|
|
|
[
|
|
|
{proxy_name, ProxyName},
|
|
|
@@ -70,7 +65,7 @@ init_per_group(plain = Type, Config) ->
|
|
|
{use_tls, false}
|
|
|
| Config1 ++ Config
|
|
|
],
|
|
|
- create_connector(?MODULE, NewConfig),
|
|
|
+ create_connector(ConnectorName, NewConfig),
|
|
|
NewConfig;
|
|
|
false ->
|
|
|
maybe_skip_without_ci()
|
|
|
@@ -82,6 +77,7 @@ init_per_group(tls = Type, Config) ->
|
|
|
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
|
|
|
true ->
|
|
|
Config1 = common_init_per_group(),
|
|
|
+ ConnectorName = ?MODULE,
|
|
|
NewConfig =
|
|
|
[
|
|
|
{proxy_name, ProxyName},
|
|
|
@@ -91,17 +87,21 @@ init_per_group(tls = Type, Config) ->
|
|
|
{use_tls, true}
|
|
|
| Config1 ++ Config
|
|
|
],
|
|
|
- create_connector(?MODULE, NewConfig),
|
|
|
+ create_connector(ConnectorName, NewConfig),
|
|
|
NewConfig;
|
|
|
false ->
|
|
|
maybe_skip_without_ci()
|
|
|
- end.
|
|
|
+ end;
|
|
|
+init_per_group(_Group, Config) ->
|
|
|
+ Config.
|
|
|
|
|
|
end_per_group(Group, Config) when
|
|
|
Group =:= plain;
|
|
|
Group =:= tls
|
|
|
->
|
|
|
common_end_per_group(Config),
|
|
|
+ ok;
|
|
|
+end_per_group(_Group, _Config) ->
|
|
|
ok.
|
|
|
|
|
|
common_init_per_group() ->
|
|
|
@@ -189,66 +189,49 @@ pulsar_connector(Config) ->
|
|
|
":",
|
|
|
integer_to_binary(PulsarPort)
|
|
|
]),
|
|
|
- Connector = #{
|
|
|
- <<"connectors">> => #{
|
|
|
- <<"pulsar">> => #{
|
|
|
- Name => #{
|
|
|
- <<"enable">> => true,
|
|
|
- <<"ssl">> => #{
|
|
|
- <<"enable">> => UseTLS,
|
|
|
- <<"verify">> => <<"verify_none">>,
|
|
|
- <<"server_name_indication">> => <<"auto">>
|
|
|
- },
|
|
|
- <<"authentication">> => <<"none">>,
|
|
|
- <<"servers">> => ServerURL
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ InnerConfigMap = #{
|
|
|
+ <<"enable">> => true,
|
|
|
+ <<"ssl">> => #{
|
|
|
+ <<"enable">> => UseTLS,
|
|
|
+ <<"verify">> => <<"verify_none">>,
|
|
|
+ <<"server_name_indication">> => <<"auto">>
|
|
|
+ },
|
|
|
+ <<"authentication">> => <<"none">>,
|
|
|
+ <<"servers">> => ServerURL
|
|
|
},
|
|
|
- parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name).
|
|
|
+ emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap).
|
|
|
|
|
|
pulsar_action(Config) ->
|
|
|
+ QueryMode = proplists:get_value(query_mode, Config, <<"sync">>),
|
|
|
Name = atom_to_binary(?MODULE),
|
|
|
- Action = #{
|
|
|
- <<"actions">> => #{
|
|
|
- <<"pulsar">> => #{
|
|
|
- Name => #{
|
|
|
- <<"connector">> => Name,
|
|
|
- <<"enable">> => true,
|
|
|
- <<"parameters">> => #{
|
|
|
- <<"retention_period">> => <<"infinity">>,
|
|
|
- <<"max_batch_bytes">> => <<"1MB">>,
|
|
|
- <<"batch_size">> => 100,
|
|
|
- <<"strategy">> => <<"random">>,
|
|
|
- <<"buffer">> => #{
|
|
|
- <<"mode">> => <<"memory">>,
|
|
|
- <<"per_partition_limit">> => <<"10MB">>,
|
|
|
- <<"segment_bytes">> => <<"5MB">>,
|
|
|
- <<"memory_overload_protection">> => true
|
|
|
- },
|
|
|
- <<"message">> => #{
|
|
|
- <<"key">> => <<"${.clientid}">>,
|
|
|
- <<"value">> => <<"${.}">>
|
|
|
- },
|
|
|
- <<"pulsar_topic">> => ?config(pulsar_topic, Config)
|
|
|
- },
|
|
|
- <<"resource_opts">> => #{
|
|
|
- <<"health_check_interval">> => <<"1s">>,
|
|
|
- <<"metrics_flush_interval">> => <<"300ms">>
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ InnerConfigMap = #{
|
|
|
+ <<"connector">> => Name,
|
|
|
+ <<"enable">> => true,
|
|
|
+ <<"parameters">> => #{
|
|
|
+ <<"retention_period">> => <<"infinity">>,
|
|
|
+ <<"max_batch_bytes">> => <<"1MB">>,
|
|
|
+ <<"batch_size">> => 100,
|
|
|
+ <<"strategy">> => <<"random">>,
|
|
|
+ <<"buffer">> => #{
|
|
|
+ <<"mode">> => <<"memory">>,
|
|
|
+ <<"per_partition_limit">> => <<"10MB">>,
|
|
|
+ <<"segment_bytes">> => <<"5MB">>,
|
|
|
+ <<"memory_overload_protection">> => true
|
|
|
+ },
|
|
|
+ <<"message">> => #{
|
|
|
+ <<"key">> => <<"${.clientid}">>,
|
|
|
+ <<"value">> => <<"${.}">>
|
|
|
+ },
|
|
|
+ <<"pulsar_topic">> => ?config(pulsar_topic, Config)
|
|
|
+ },
|
|
|
+ <<"resource_opts">> => #{
|
|
|
+ <<"query_mode">> => QueryMode,
|
|
|
+ <<"request_ttl">> => <<"1s">>,
|
|
|
+ <<"health_check_interval">> => <<"1s">>,
|
|
|
+ <<"metrics_flush_interval">> => <<"300ms">>
|
|
|
}
|
|
|
},
|
|
|
- parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name).
|
|
|
-
|
|
|
-parse_and_check(Key, Mod, Conf, Name) ->
|
|
|
- ConfStr = hocon_pp:do(Conf, #{}),
|
|
|
- ct:pal(ConfStr),
|
|
|
- {ok, RawConf} = hocon:binary(ConfStr, #{format => map}),
|
|
|
- hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
|
|
|
- #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf,
|
|
|
- RetConf.
|
|
|
+ emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap).
|
|
|
|
|
|
instance_id(Type, Name) ->
|
|
|
ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
|
|
|
@@ -404,20 +387,44 @@ assert_status_api(Line, Type, Name, Status) ->
|
|
|
).
|
|
|
-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
|
|
|
|
|
|
+proplists_with(Keys, PList) ->
|
|
|
+ lists:filter(fun({K, _}) -> lists:member(K, Keys) end, PList).
|
|
|
+
|
|
|
+group_path(Config) ->
|
|
|
+ case emqx_common_test_helpers:group_path(Config) of
|
|
|
+ [] ->
|
|
|
+ undefined;
|
|
|
+ Path ->
|
|
|
+ Path
|
|
|
+ end.
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Testcases
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-t_action_probe(Config) ->
|
|
|
+t_action_probe(matrix) ->
|
|
|
+ [[plain], [tls]];
|
|
|
+t_action_probe(Config) when is_list(Config) ->
|
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
|
Action = pulsar_action(Config),
|
|
|
{ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
|
|
|
?assertMatch({{_, 204, _}, _, _}, Res0),
|
|
|
ok.
|
|
|
|
|
|
-t_action(Config) ->
|
|
|
+t_action(matrix) ->
|
|
|
+ [
|
|
|
+ [plain, async],
|
|
|
+ [plain, sync],
|
|
|
+ [tls, async]
|
|
|
+ ];
|
|
|
+t_action(Config) when is_list(Config) ->
|
|
|
+ QueryMode =
|
|
|
+ case group_path(Config) of
|
|
|
+ [_, QM | _] -> atom_to_binary(QM);
|
|
|
+ _ -> <<"async">>
|
|
|
+ end,
|
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
|
- create_action(Name, Config),
|
|
|
+ create_action(Name, [{query_mode, QueryMode} | Config]),
|
|
|
Actions = emqx_bridge_v2:list(actions),
|
|
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
|
|
?assert(lists:any(Any, Actions), Actions),
|
|
|
@@ -465,7 +472,9 @@ t_action(Config) ->
|
|
|
|
|
|
%% Tests that deleting/disabling an action that share the same Pulsar topic with other
|
|
|
%% actions do not disturb the latter.
|
|
|
-t_multiple_actions_sharing_topic(Config) ->
|
|
|
+t_multiple_actions_sharing_topic(matrix) ->
|
|
|
+ [[plain], [tls]];
|
|
|
+t_multiple_actions_sharing_topic(Config) when is_list(Config) ->
|
|
|
Type = ?TYPE,
|
|
|
ConnectorName = <<"c">>,
|
|
|
ConnectorConfig = pulsar_connector(Config),
|
|
|
@@ -546,3 +555,31 @@ t_multiple_actions_sharing_topic(Config) ->
|
|
|
[]
|
|
|
),
|
|
|
ok.
|
|
|
+
|
|
|
+t_sync_query_down(matrix) ->
|
|
|
+ [[plain]];
|
|
|
+t_sync_query_down(Config0) when is_list(Config0) ->
|
|
|
+ ct:timetrap({seconds, 15}),
|
|
|
+ Payload = #{<<"x">> => <<"some data">>},
|
|
|
+ PayloadBin = emqx_utils_json:encode(Payload),
|
|
|
+ ClientId = <<"some_client">>,
|
|
|
+ Opts = #{
|
|
|
+ make_message_fn => fun(Topic) -> emqx_message:make(ClientId, Topic, PayloadBin) end,
|
|
|
+ enter_tp_filter =>
|
|
|
+ ?match_event(#{?snk_kind := "pulsar_producer_send"}),
|
|
|
+ error_tp_filter =>
|
|
|
+ ?match_event(#{?snk_kind := "resource_simple_sync_internal_buffer_query_timeout"}),
|
|
|
+ success_tp_filter =>
|
|
|
+ ?match_event(#{?snk_kind := pulsar_echo_consumer_message})
|
|
|
+ },
|
|
|
+ Config = [
|
|
|
+ {connector_type, ?TYPE},
|
|
|
+ {connector_name, ?FUNCTION_NAME},
|
|
|
+ {connector_config, pulsar_connector(Config0)},
|
|
|
+ {action_type, ?TYPE},
|
|
|
+ {action_name, ?FUNCTION_NAME},
|
|
|
+ {action_config, pulsar_action(Config0)}
|
|
|
+ | proplists_with([proxy_name, proxy_host, proxy_port], Config0)
|
|
|
+ ],
|
|
|
+ emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
|
|
|
+ ok.
|