|
|
@@ -46,7 +46,8 @@ matrix_testcases() ->
|
|
|
t_start_stop,
|
|
|
t_create_via_http,
|
|
|
t_on_get_status,
|
|
|
- t_sync_query
|
|
|
+ t_sync_query,
|
|
|
+ t_map_to_redis_hset_args
|
|
|
].
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
@@ -133,7 +134,7 @@ common_init_per_testcase(TestCase, Config) ->
|
|
|
Path = group_path(Config),
|
|
|
ct:comment(Path),
|
|
|
ConnectorConfig = connector_config(Name, Path, NConfig),
|
|
|
- BridgeConfig = action_config(Name, Path, Name),
|
|
|
+ BridgeConfig = action_config(Name, Path, Name, TestCase),
|
|
|
ok = snabbkaffe:start_trace(),
|
|
|
[
|
|
|
{connector_type, ?CONNECTOR_TYPE},
|
|
|
@@ -222,7 +223,14 @@ parse_and_check_connector_config(InnerConfigMap, Name) ->
|
|
|
ct:pal("parsed config: ~p", [Config]),
|
|
|
InnerConfigMap.
|
|
|
|
|
|
-action_config(Name, Path, ConnectorId) ->
|
|
|
+action_config(Name, Path, ConnectorId, TestCase) ->
|
|
|
+ Template =
|
|
|
+ try
|
|
|
+ ?MODULE:TestCase(command_template)
|
|
|
+ catch
|
|
|
+ _:_ ->
|
|
|
+ [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>]
|
|
|
+ end,
|
|
|
[RedisType, _Transport | _] = Path,
|
|
|
CommonCfg =
|
|
|
#{
|
|
|
@@ -230,7 +238,7 @@ action_config(Name, Path, ConnectorId) ->
|
|
|
<<"connector">> => ConnectorId,
|
|
|
<<"parameters">> =>
|
|
|
#{
|
|
|
- <<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>],
|
|
|
+ <<"command_template">> => Template,
|
|
|
<<"redis_type">> => atom_to_binary(RedisType)
|
|
|
},
|
|
|
<<"local_topic">> => <<"t/redis">>,
|
|
|
@@ -262,8 +270,11 @@ parse_and_check_bridge_config(InnerConfigMap, Name) ->
|
|
|
emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
|
|
|
|
|
|
make_message() ->
|
|
|
- ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
+ make_message_with_payload(Payload).
|
|
|
+
|
|
|
+make_message_with_payload(Payload) ->
|
|
|
+ ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
#{
|
|
|
clientid => ClientId,
|
|
|
payload => Payload,
|
|
|
@@ -290,7 +301,7 @@ t_start_stop(matrix) ->
|
|
|
[sentinel, tcp],
|
|
|
[cluster, tcp]
|
|
|
]};
|
|
|
-t_start_stop(Config) ->
|
|
|
+t_start_stop(Config) when is_list(Config) ->
|
|
|
emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped),
|
|
|
ok.
|
|
|
|
|
|
@@ -300,7 +311,7 @@ t_create_via_http(matrix) ->
|
|
|
[sentinel, tcp],
|
|
|
[cluster, tcp]
|
|
|
]};
|
|
|
-t_create_via_http(Config) ->
|
|
|
+t_create_via_http(Config) when is_list(Config) ->
|
|
|
emqx_bridge_v2_testlib:t_create_via_http(Config),
|
|
|
ok.
|
|
|
|
|
|
@@ -310,7 +321,7 @@ t_on_get_status(matrix) ->
|
|
|
[sentinel, tcp],
|
|
|
[cluster, tcp]
|
|
|
]};
|
|
|
-t_on_get_status(Config) ->
|
|
|
+t_on_get_status(Config) when is_list(Config) ->
|
|
|
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
|
|
|
ok.
|
|
|
|
|
|
@@ -320,7 +331,7 @@ t_sync_query(matrix) ->
|
|
|
[sentinel, tcp],
|
|
|
[cluster, tcp]
|
|
|
]};
|
|
|
-t_sync_query(Config) ->
|
|
|
+t_sync_query(Config) when is_list(Config) ->
|
|
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
|
|
Config,
|
|
|
fun make_message/0,
|
|
|
@@ -328,3 +339,22 @@ t_sync_query(Config) ->
|
|
|
redis_bridge_connector_send_done
|
|
|
),
|
|
|
ok.
|
|
|
+
|
|
|
+t_map_to_redis_hset_args(matrix) ->
|
|
|
+ {map_to_redis_hset_args, [
|
|
|
+ [single, tcp],
|
|
|
+ [sentinel, tcp],
|
|
|
+ [cluster, tcp]
|
|
|
+ ]};
|
|
|
+t_map_to_redis_hset_args(command_template) ->
|
|
|
+ [<<"HMSET">>, <<"t_map_to_redis_hset_args">>, <<"${payload}">>];
|
|
|
+t_map_to_redis_hset_args(Config) when is_list(Config) ->
|
|
|
+ Payload = emqx_rule_funcs:map_to_redis_hset_args(#{<<"a">> => 1, <<"b">> => <<"2">>}),
|
|
|
+ MsgFn = fun() -> make_message_with_payload(Payload) end,
|
|
|
+ ok = emqx_bridge_v2_testlib:t_sync_query(
|
|
|
+ Config,
|
|
|
+ MsgFn,
|
|
|
+ fun(Res) -> ?assertMatch({ok, _}, Res) end,
|
|
|
+ redis_bridge_connector_send_done
|
|
|
+ ),
|
|
|
+ ok.
|