|
|
@@ -84,6 +84,7 @@ init_per_group(DatalayersType, Config0) when
|
|
|
[
|
|
|
emqx_conf,
|
|
|
emqx_bridge_datalayers,
|
|
|
+ emqx_connector,
|
|
|
emqx_bridge,
|
|
|
emqx_rule_engine,
|
|
|
emqx_management,
|
|
|
@@ -92,8 +93,8 @@ init_per_group(DatalayersType, Config0) when
|
|
|
#{work_dir => emqx_cth_suite:work_dir(Config0)}
|
|
|
),
|
|
|
Config = [{apps, Apps}, {use_tls, UseTLS} | Config0],
|
|
|
- {Name, ConfigString, DatalayersConfig} = datalayers_config(
|
|
|
- apiv1, DatalayersHost, DatalayersPort, Config
|
|
|
+ {Name, ConnConfString, ConnConfMap, ActionConfString, ActionConfMap} = datalayers_config(
|
|
|
+ DatalayersHost, DatalayersPort, Config
|
|
|
),
|
|
|
EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv1">>,
|
|
|
EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin),
|
|
|
@@ -116,16 +117,20 @@ init_per_group(DatalayersType, Config0) when
|
|
|
{proxy_host, ProxyHost},
|
|
|
{proxy_port, ProxyPort},
|
|
|
{proxy_name, ProxyName},
|
|
|
+
|
|
|
{datalayers_host, DatalayersHost},
|
|
|
{datalayers_port, DatalayersPort},
|
|
|
- {datalayers_type, apiv1},
|
|
|
- {datalayers_config, DatalayersConfig},
|
|
|
- {datalayers_config_string, ConfigString},
|
|
|
{ehttpc_pool_name, EHttpcPoolName},
|
|
|
+
|
|
|
{bridge_type, datalayers},
|
|
|
{bridge_name, Name},
|
|
|
- {bridge_config, DatalayersConfig},
|
|
|
- {datalayers_name, Name}
|
|
|
+ {bridge_config, ActionConfMap},
|
|
|
+ {bridge_config_string, ActionConfString},
|
|
|
+
|
|
|
+ {connector_name, Name},
|
|
|
+ {connector_type, datalayers},
|
|
|
+ {connector_config, ConnConfMap},
|
|
|
+ {connector_config_string, ConnConfString}
|
|
|
| Config
|
|
|
],
|
|
|
ensure_database(NewConfig),
|
|
|
@@ -161,7 +166,7 @@ end_per_group(_Group, _Config) ->
|
|
|
|
|
|
init_per_testcase(_Testcase, Config) ->
|
|
|
delete_all_rules(),
|
|
|
- delete_all_bridges(),
|
|
|
+ emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
|
|
Config.
|
|
|
|
|
|
end_per_testcase(_Testcase, Config) ->
|
|
|
@@ -170,7 +175,7 @@ end_per_testcase(_Testcase, Config) ->
|
|
|
ok = snabbkaffe:stop(),
|
|
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
|
delete_all_rules(),
|
|
|
- delete_all_bridges(),
|
|
|
+ emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
|
|
ok.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -185,26 +190,25 @@ example_write_syntax() ->
|
|
|
"float_value=${payload.float_key},", "undef_value=${payload.undef},",
|
|
|
"${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>.
|
|
|
|
|
|
-datalayers_config(apiv1 = Type, DatalayersHost, DatalayersPort, Config) ->
|
|
|
- BatchSize = proplists:get_value(batch_size, Config, 100),
|
|
|
- QueryMode = proplists:get_value(query_mode, Config, sync),
|
|
|
- UseTLS = proplists:get_value(use_tls, Config, false),
|
|
|
+datalayers_config(DatalayersHost, DatalayersPort, Config) ->
|
|
|
Name = atom_to_binary(?MODULE),
|
|
|
- WriteSyntax = example_write_syntax(),
|
|
|
+ {ConnConfString, ConnConfMap} = datalayers_connector_config(
|
|
|
+ Name, DatalayersHost, DatalayersPort, Config
|
|
|
+ ),
|
|
|
+ {ActionConfString, ActionConfMap} = datalayers_action_config(Name, Config),
|
|
|
+ {Name, ConnConfString, ConnConfMap, ActionConfString, ActionConfMap}.
|
|
|
+
|
|
|
+datalayers_connector_config(Name, DatalayersHost, DatalayersPort, Config) ->
|
|
|
+ UseTLS = proplists:get_value(use_tls, Config, false),
|
|
|
ConfigString =
|
|
|
io_lib:format(
|
|
|
- "bridges.datalayers.~s {\n"
|
|
|
+ "connectors.datalayers.~s {\n"
|
|
|
" enable = true\n"
|
|
|
" server = \"~s:~b\"\n"
|
|
|
- " database = mqtt\n"
|
|
|
- " username = admin\n"
|
|
|
- " password = public\n"
|
|
|
- " precision = ns\n"
|
|
|
- " write_syntax = \"~s\"\n"
|
|
|
- " resource_opts = {\n"
|
|
|
- " request_ttl = 1s\n"
|
|
|
- " query_mode = ~s\n"
|
|
|
- " batch_size = ~b\n"
|
|
|
+ " parameters {\n"
|
|
|
+ " database = mqtt\n"
|
|
|
+ " username = admin\n"
|
|
|
+ " password = public\n"
|
|
|
" }\n"
|
|
|
" ssl {\n"
|
|
|
" enable = ~p\n"
|
|
|
@@ -215,46 +219,57 @@ datalayers_config(apiv1 = Type, DatalayersHost, DatalayersPort, Config) ->
|
|
|
Name,
|
|
|
DatalayersHost,
|
|
|
DatalayersPort,
|
|
|
+ UseTLS
|
|
|
+ ]
|
|
|
+ ),
|
|
|
+ {ConfigString, parse_and_check_connector(ConfigString, Name)}.
|
|
|
+
|
|
|
+datalayers_action_config(Name, Config) ->
|
|
|
+ BatchSize = proplists:get_value(batch_size, Config, 100),
|
|
|
+ QueryMode = proplists:get_value(query_mode, Config, sync),
|
|
|
+ WriteSyntax = example_write_syntax(),
|
|
|
+ ConfigString =
|
|
|
+ io_lib:format(
|
|
|
+ "actions.datalayers.~s {\n"
|
|
|
+ " enable = true\n"
|
|
|
+ " connector = ~s\n"
|
|
|
+ " parameters {\n"
|
|
|
+ " write_syntax = \"~s\"\n"
|
|
|
+ " precision = ns\n"
|
|
|
+ " }\n"
|
|
|
+ " resource_opts = {\n"
|
|
|
+ " request_ttl = 15s\n"
|
|
|
+ " query_mode = ~s\n"
|
|
|
+ " batch_size = ~b\n"
|
|
|
+ " }\n"
|
|
|
+ "}\n",
|
|
|
+ [
|
|
|
+ Name,
|
|
|
+ Name,
|
|
|
WriteSyntax,
|
|
|
QueryMode,
|
|
|
- BatchSize,
|
|
|
- UseTLS
|
|
|
+ BatchSize
|
|
|
]
|
|
|
),
|
|
|
- {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}.
|
|
|
+ {ConfigString, parse_and_check_action(ConfigString, Name)}.
|
|
|
|
|
|
-parse_and_check(ConfigString, Type, Name) ->
|
|
|
+parse_and_check_connector(ConfigString, Name) ->
|
|
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
|
|
- TypeBin = datalayers_type_bin(Type),
|
|
|
- hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
|
|
- #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
|
|
|
+ hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{required => false, atom_key => false}),
|
|
|
+ #{<<"connectors">> := #{<<"datalayers">> := #{Name := Config}}} = RawConf,
|
|
|
Config.
|
|
|
|
|
|
-datalayers_type_bin(apiv1) ->
|
|
|
- <<"datalayers">>.
|
|
|
+parse_and_check_action(ConfigString, Name) ->
|
|
|
+ {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
|
|
+ hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
|
|
|
+ #{<<"actions">> := #{<<"datalayers">> := #{Name := Config}}} = RawConf,
|
|
|
+ Config.
|
|
|
|
|
|
create_bridge(Config) ->
|
|
|
create_bridge(Config, _Overrides = #{}).
|
|
|
|
|
|
create_bridge(Config, Overrides) ->
|
|
|
- Type = datalayers_type_bin(?config(datalayers_type, Config)),
|
|
|
- Name = ?config(datalayers_name, Config),
|
|
|
- DatalayersConfig0 = ?config(datalayers_config, Config),
|
|
|
- DatalayersConfig = emqx_utils_maps:deep_merge(DatalayersConfig0, Overrides),
|
|
|
- emqx_bridge:create(Type, Name, DatalayersConfig).
|
|
|
-
|
|
|
-delete_bridge(Config) ->
|
|
|
- Type = datalayers_type_bin(?config(datalayers_type, Config)),
|
|
|
- Name = ?config(datalayers_name, Config),
|
|
|
- emqx_bridge:remove(Type, Name).
|
|
|
-
|
|
|
-delete_all_bridges() ->
|
|
|
- lists:foreach(
|
|
|
- fun(#{name := Name, type := Type}) ->
|
|
|
- emqx_bridge:remove(Type, Name)
|
|
|
- end,
|
|
|
- emqx_bridge:list()
|
|
|
- ).
|
|
|
+ emqx_bridge_v2_testlib:create_bridge(Config, Overrides).
|
|
|
|
|
|
delete_all_rules() ->
|
|
|
lists:foreach(
|
|
|
@@ -268,9 +283,9 @@ create_rule_and_action_http(Config) ->
|
|
|
create_rule_and_action_http(Config, _Overrides = #{}).
|
|
|
|
|
|
create_rule_and_action_http(Config, Overrides) ->
|
|
|
- DatalayersName = ?config(datalayers_name, Config),
|
|
|
- Type = datalayers_type_bin(?config(datalayers_type, Config)),
|
|
|
- BridgeId = emqx_bridge_resource:bridge_id(Type, DatalayersName),
|
|
|
+ Name = ?config(bridge_name, Config),
|
|
|
+ Type = ?config(bridge_type, Config),
|
|
|
+ BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
|
|
|
Params0 = #{
|
|
|
enable => true,
|
|
|
sql => <<"SELECT * FROM \"t/topic\"">>,
|
|
|
@@ -285,10 +300,9 @@ create_rule_and_action_http(Config, Overrides) ->
|
|
|
end.
|
|
|
|
|
|
send_message(Config, Payload) ->
|
|
|
- Name = ?config(datalayers_name, Config),
|
|
|
- Type = datalayers_type_bin(?config(datalayers_type, Config)),
|
|
|
- BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
|
|
|
- emqx_bridge:send_message(BridgeId, Payload).
|
|
|
+ Type = ?config(bridge_type, Config),
|
|
|
+ Name = ?config(bridge_name, Config),
|
|
|
+ emqx_bridge_v2:send_message(Type, Name, Payload, #{}).
|
|
|
|
|
|
query_by_clientid(Table, ClientId, Config) ->
|
|
|
SQL = <<"SELECT * FROM ", Table/binary, " WHERE clientid = '", ClientId/binary, "'">>,
|
|
|
@@ -383,10 +397,9 @@ assert_persisted_data(ClientId, Expected, PersistedData) ->
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
-resource_id(Config) ->
|
|
|
- Type = datalayers_type_bin(?config(datalayers_type, Config)),
|
|
|
- Name = ?config(datalayers_name, Config),
|
|
|
- emqx_bridge_resource:resource_id(Type, Name).
|
|
|
+connector_id(Config) ->
|
|
|
+ Name = ?config(connector_name, Config),
|
|
|
+ emqx_connector_resource:resource_id(<<"datalayers">>, Name).
|
|
|
|
|
|
ensure_database(Config) ->
|
|
|
SQL = <<"create database if not exists mqtt">>,
|
|
|
@@ -461,28 +474,24 @@ t_start_ok(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_start_stop(Config) ->
|
|
|
- ok = emqx_bridge_testlib:t_start_stop(Config, datalayers_client_stopped),
|
|
|
+ ok = emqx_bridge_v2_testlib:t_start_stop(Config, datalayers_client_stopped),
|
|
|
ok.
|
|
|
|
|
|
t_start_already_started(Config) ->
|
|
|
- Type = datalayers_type_bin(?config(datalayers_type, Config)),
|
|
|
- Name = ?config(datalayers_name, Config),
|
|
|
- DatalayersConfigString = ?config(datalayers_config_string, Config),
|
|
|
?assertMatch(
|
|
|
{ok, _},
|
|
|
create_bridge(Config)
|
|
|
),
|
|
|
- ResourceId = resource_id(Config),
|
|
|
- TypeAtom = binary_to_atom(Type),
|
|
|
- NameAtom = binary_to_atom(Name),
|
|
|
- {ok, #{bridges := #{TypeAtom := #{NameAtom := DatalayersConfigMap}}}} = emqx_hocon:check(
|
|
|
- emqx_bridge_schema, DatalayersConfigString
|
|
|
- ),
|
|
|
- ConnConfigMap = emqx_bridge_datalayers_connector:transform_bridge_v1_config_to_connector_config(
|
|
|
- DatalayersConfigMap
|
|
|
+
|
|
|
+ ConnectorId = connector_id(Config),
|
|
|
+ NameAtom = binary_to_atom(?config(connector_name, Config)),
|
|
|
+ {ok, ConnectorsConf} = emqx_hocon:check(
|
|
|
+ emqx_connector_schema, ?config(connector_config_string, Config)
|
|
|
),
|
|
|
+ ConnConfigMap = emqx_utils_maps:deep_get([connectors, datalayers, NameAtom], ConnectorsConf),
|
|
|
+
|
|
|
?check_trace(
|
|
|
- emqx_bridge_datalayers_connector:on_start(ResourceId, ConnConfigMap),
|
|
|
+ emqx_bridge_datalayers_connector:on_start(ConnectorId, ConnConfigMap),
|
|
|
fun(Result, Trace) ->
|
|
|
?assertMatch({ok, _}, Result),
|
|
|
?assertMatch([_], ?of_kind(datalayers_connector_start_already_started, Trace)),
|
|
|
@@ -492,13 +501,8 @@ t_start_already_started(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_start_ok_timestamp_write_syntax(Config) ->
|
|
|
- DatalayersType = ?config(datalayers_type, Config),
|
|
|
- DatalayersName = ?config(datalayers_name, Config),
|
|
|
- DatalayersConfigString0 = ?config(datalayers_config_string, Config),
|
|
|
- DatalayersTypeCfg =
|
|
|
- case DatalayersType of
|
|
|
- apiv1 -> "datalayers"
|
|
|
- end,
|
|
|
+ DatalayersName = ?config(bridge_name, Config),
|
|
|
+ DatalayersConfigString0 = ?config(bridge_config_string, Config),
|
|
|
WriteSyntax =
|
|
|
%% N.B.: this single space characters are relevant
|
|
|
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
|
|
|
@@ -508,17 +512,23 @@ t_start_ok_timestamp_write_syntax(Config) ->
|
|
|
%% append this to override the config
|
|
|
DatalayersConfigString1 =
|
|
|
io_lib:format(
|
|
|
- "bridges.~s.~s {\n"
|
|
|
- " write_syntax = \"~s\"\n"
|
|
|
+ "actions.datalayers.~s {\n"
|
|
|
+ " parameters {\n"
|
|
|
+ " write_syntax = \"~s\"\n"
|
|
|
+ " }\n"
|
|
|
"}\n",
|
|
|
- [DatalayersTypeCfg, DatalayersName, WriteSyntax]
|
|
|
+ [DatalayersName, WriteSyntax]
|
|
|
),
|
|
|
- DatalayersConfig1 = parse_and_check(
|
|
|
+ DatalayersConfig1 = parse_and_check_action(
|
|
|
DatalayersConfigString0 ++ DatalayersConfigString1,
|
|
|
- DatalayersType,
|
|
|
DatalayersName
|
|
|
),
|
|
|
- Config1 = [{datalayers_config, DatalayersConfig1} | Config],
|
|
|
+ Config1 = lists:keyreplace(
|
|
|
+ bridge_config,
|
|
|
+ 1,
|
|
|
+ Config,
|
|
|
+ {bridge_config, DatalayersConfig1}
|
|
|
+ ),
|
|
|
?assertMatch(
|
|
|
{ok, _},
|
|
|
create_bridge(Config1)
|
|
|
@@ -526,13 +536,8 @@ t_start_ok_timestamp_write_syntax(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_start_ok_no_subject_tags_write_syntax(Config) ->
|
|
|
- DatalayersType = ?config(datalayers_type, Config),
|
|
|
- DatalayersName = ?config(datalayers_name, Config),
|
|
|
- DatalayersConfigString0 = ?config(datalayers_config_string, Config),
|
|
|
- DatalayersTypeCfg =
|
|
|
- case DatalayersType of
|
|
|
- apiv1 -> "datalayers"
|
|
|
- end,
|
|
|
+ DatalayersName = ?config(bridge_name, Config),
|
|
|
+ DatalayersConfigString0 = ?config(bridge_config_string, Config),
|
|
|
WriteSyntax =
|
|
|
%% N.B.: this single space characters are relevant
|
|
|
<<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
|
|
|
@@ -541,17 +546,23 @@ t_start_ok_no_subject_tags_write_syntax(Config) ->
|
|
|
%% append this to override the config
|
|
|
DatalayersConfigString1 =
|
|
|
io_lib:format(
|
|
|
- "bridges.~s.~s {\n"
|
|
|
- " write_syntax = \"~s\"\n"
|
|
|
+ "actions.datalayers.~s {\n"
|
|
|
+ " parameters {\n"
|
|
|
+ " write_syntax = \"~s\"\n"
|
|
|
+ " }\n"
|
|
|
"}\n",
|
|
|
- [DatalayersTypeCfg, DatalayersName, WriteSyntax]
|
|
|
+ [DatalayersName, WriteSyntax]
|
|
|
),
|
|
|
- DatalayersConfig1 = parse_and_check(
|
|
|
+ DatalayersConfig1 = parse_and_check_action(
|
|
|
DatalayersConfigString0 ++ DatalayersConfigString1,
|
|
|
- DatalayersType,
|
|
|
DatalayersName
|
|
|
),
|
|
|
- Config1 = [{datalayers_config, DatalayersConfig1} | Config],
|
|
|
+ Config1 = lists:keyreplace(
|
|
|
+ bridge_config,
|
|
|
+ 1,
|
|
|
+ Config,
|
|
|
+ {bridge_config, DatalayersConfig1}
|
|
|
+ ),
|
|
|
?assertMatch(
|
|
|
{ok, _},
|
|
|
create_bridge(Config1)
|
|
|
@@ -570,16 +581,18 @@ t_const_timestamp(Config) ->
|
|
|
create_bridge(
|
|
|
Config,
|
|
|
#{
|
|
|
- <<"write_syntax">> =>
|
|
|
- <<
|
|
|
- "mqtt,clientid=${clientid} "
|
|
|
- "foo=${payload.foo}i,"
|
|
|
- "foo1=${payload.foo},"
|
|
|
- "foo2=\"${payload.foo}\","
|
|
|
- "foo3=\"${payload.foo}somestr\","
|
|
|
- "bar=5i,baz0=1.1,baz1=\"a\",baz2=\"ai\",baz3=\"au\",baz4=\"1u\" ",
|
|
|
- ConstBin/binary
|
|
|
- >>
|
|
|
+ <<"parameters">> => #{
|
|
|
+ <<"write_syntax">> =>
|
|
|
+ <<
|
|
|
+ "mqtt,clientid=${clientid} "
|
|
|
+ "foo=${payload.foo}i,"
|
|
|
+ "foo1=${payload.foo},"
|
|
|
+ "foo2=\"${payload.foo}\","
|
|
|
+ "foo3=\"${payload.foo}somestr\","
|
|
|
+ "bar=5i,baz0=1.1,baz1=\"a\",baz2=\"ai\",baz3=\"au\",baz4=\"1u\" ",
|
|
|
+ ConstBin/binary
|
|
|
+ >>
|
|
|
+ }
|
|
|
}
|
|
|
)
|
|
|
),
|
|
|
@@ -694,10 +707,16 @@ t_any_num_as_float(Config) ->
|
|
|
create_bridge(
|
|
|
Config,
|
|
|
#{
|
|
|
- <<"write_syntax">> =>
|
|
|
- <<"mqtt,clientid=${clientid}", " ",
|
|
|
- "float_no_dp=${payload.float_no_dp},float_dp=${payload.float_dp},bar=5i ",
|
|
|
- ConstBin/binary>>
|
|
|
+ <<"parameters">> => #{
|
|
|
+ <<"write_syntax">> =>
|
|
|
+ <<
|
|
|
+ "mqtt,clientid=${clientid}",
|
|
|
+ " ",
|
|
|
+ "float_no_dp=${payload.float_no_dp},"
|
|
|
+ "float_dp=${payload.float_dp},bar=5i ",
|
|
|
+ ConstBin/binary
|
|
|
+ >>
|
|
|
+ }
|
|
|
}
|
|
|
)
|
|
|
),
|
|
|
@@ -742,11 +761,13 @@ t_tag_set_use_literal_value(Config) ->
|
|
|
create_bridge(
|
|
|
Config,
|
|
|
#{
|
|
|
- <<"write_syntax">> =>
|
|
|
- <<"mqtt,clientid=${clientid},tag_key1=100,tag_key2=123.4,tag_key3=66i,tag_key4=${payload.float_dp}",
|
|
|
- " ",
|
|
|
- "field_key1=100.1,field_key2=100i,field_key3=${payload.float_dp},bar=5i",
|
|
|
- " ", ConstBin/binary>>
|
|
|
+ <<"parameters">> => #{
|
|
|
+ <<"write_syntax">> =>
|
|
|
+ <<"mqtt,clientid=${clientid},tag_key1=100,tag_key2=123.4,tag_key3=66i,tag_key4=${payload.float_dp}",
|
|
|
+ " ",
|
|
|
+ "field_key1=100.1,field_key2=100i,field_key3=${payload.float_dp},bar=5i",
|
|
|
+ " ", ConstBin/binary>>
|
|
|
+ }
|
|
|
}
|
|
|
)
|
|
|
),
|
|
|
@@ -778,38 +799,40 @@ t_tag_set_use_literal_value(Config) ->
|
|
|
?assertEqual(TsStr, TimeReturned).
|
|
|
|
|
|
t_bad_timestamp(Config) ->
|
|
|
- DatalayersType = ?config(datalayers_type, Config),
|
|
|
- DatalayersName = ?config(datalayers_name, Config),
|
|
|
+ ActionName = ?config(bridge_name, Config),
|
|
|
QueryMode = ?config(query_mode, Config),
|
|
|
BatchSize = ?config(batch_size, Config),
|
|
|
- DatalayersConfigString0 = ?config(datalayers_config_string, Config),
|
|
|
- DatalayersTypeCfg =
|
|
|
- case DatalayersType of
|
|
|
- apiv1 -> "datalayers"
|
|
|
- end,
|
|
|
+ ActionConfigString0 = ?config(bridge_config_string, Config),
|
|
|
WriteSyntax =
|
|
|
%% N.B.: this single space characters are relevant
|
|
|
<<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
|
|
|
"uint_value=${payload.uint_key}u,"
|
|
|
"bool=${payload.bool}", " ", "bad_timestamp">>,
|
|
|
%% append this to override the config
|
|
|
- DatalayersConfigString1 =
|
|
|
+ ActionConfigString1 =
|
|
|
io_lib:format(
|
|
|
- "bridges.~s.~s {\n"
|
|
|
- " write_syntax = \"~s\"\n"
|
|
|
+ "actions.datalayers.~s {\n"
|
|
|
+ " parameters {\n"
|
|
|
+ " write_syntax = \"~s\"\n"
|
|
|
+ " }\n"
|
|
|
"}\n",
|
|
|
- [DatalayersTypeCfg, DatalayersName, WriteSyntax]
|
|
|
+ [ActionName, WriteSyntax]
|
|
|
),
|
|
|
- DatalayersConfig1 = parse_and_check(
|
|
|
- DatalayersConfigString0 ++ DatalayersConfigString1,
|
|
|
- DatalayersType,
|
|
|
- DatalayersName
|
|
|
+ ActionConfig1 = parse_and_check_action(
|
|
|
+ ActionConfigString0 ++ ActionConfigString1,
|
|
|
+ ActionName
|
|
|
+ ),
|
|
|
+ Config1 = lists:keystore(
|
|
|
+ bridge_config,
|
|
|
+ 1,
|
|
|
+ Config,
|
|
|
+ {bridge_config, ActionConfig1}
|
|
|
),
|
|
|
- Config1 = [{datalayers_config, DatalayersConfig1} | Config],
|
|
|
?assertMatch(
|
|
|
{ok, _},
|
|
|
create_bridge(Config1)
|
|
|
),
|
|
|
+
|
|
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
Payload = #{
|
|
|
int_key => -123,
|
|
|
@@ -871,10 +894,10 @@ t_get_status(Config) ->
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
ProxyName = ?config(proxy_name, Config),
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
- ResourceId = resource_id(Config),
|
|
|
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
|
|
+ ConnectorId = connector_id(Config),
|
|
|
+ ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ConnectorId)),
|
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
- ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
+ ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
|
|
|
end),
|
|
|
ok.
|
|
|
|
|
|
@@ -1022,8 +1045,10 @@ t_missing_field(Config) ->
|
|
|
create_bridge(
|
|
|
Config,
|
|
|
#{
|
|
|
- <<"resource_opts">> => #{<<"worker_pool_size">> => 1},
|
|
|
- <<"write_syntax">> => <<"${clientid} foo=${foo}i">>
|
|
|
+ <<"parameters">> => #{
|
|
|
+ <<"write_syntax">> => <<"${clientid} foo=${foo}i">>
|
|
|
+ },
|
|
|
+ <<"resource_opts">> => #{<<"worker_pool_size">> => 1}
|
|
|
}
|
|
|
),
|
|
|
%% note: we don't select foo here, but we interpolate it in the
|
|
|
@@ -1073,13 +1098,12 @@ t_missing_field(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_authentication_error(Config0) ->
|
|
|
- DatalayersType = ?config(datalayers_type, Config0),
|
|
|
- DatalayersConfig0 = proplists:get_value(datalayers_config, Config0),
|
|
|
- DatalayersConfig =
|
|
|
- case DatalayersType of
|
|
|
- apiv1 -> DatalayersConfig0#{<<"password">> => <<"wrong_password">>}
|
|
|
- end,
|
|
|
- Config = lists:keyreplace(datalayers_config, 1, Config0, {datalayers_config, DatalayersConfig}),
|
|
|
+ DatalayersConfig0 = proplists:get_value(connector_config, Config0),
|
|
|
+ DatalayersConfig = emqx_utils_maps:deep_merge(
|
|
|
+ DatalayersConfig0,
|
|
|
+ #{<<"parameters">> => #{<<"password">> => <<"wrong_password">>}}
|
|
|
+ ),
|
|
|
+ Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, DatalayersConfig}),
|
|
|
?check_trace(
|
|
|
begin
|
|
|
?wait_async_action(
|
|
|
@@ -1107,17 +1131,16 @@ t_authentication_error_on_get_status(Config0) ->
|
|
|
ok
|
|
|
end,
|
|
|
fun() ->
|
|
|
- DatalayersType = ?config(datalayers_type, Config0),
|
|
|
- DatalayersConfig0 = proplists:get_value(datalayers_config, Config0),
|
|
|
- DatalayersConfig =
|
|
|
- case DatalayersType of
|
|
|
- apiv1 -> DatalayersConfig0#{<<"password">> => <<"wrong_password">>}
|
|
|
- end,
|
|
|
+ DatalayersConfig0 = proplists:get_value(connector_config, Config0),
|
|
|
+ DatalayersConfig = emqx_utils_maps:deep_merge(
|
|
|
+ DatalayersConfig0,
|
|
|
+ #{<<"parameters">> => #{<<"password">> => <<"wrong_password">>}}
|
|
|
+ ),
|
|
|
Config = lists:keyreplace(
|
|
|
- datalayers_config, 1, Config0, {datalayers_config, DatalayersConfig}
|
|
|
+ connector_config, 1, Config0, {connector_config, DatalayersConfig}
|
|
|
),
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
- ResourceId = resource_id(Config0),
|
|
|
+ ResourceId = connector_id(Config0),
|
|
|
?retry(
|
|
|
_Sleep = 1_000,
|
|
|
_Attempts = 10,
|
|
|
@@ -1132,13 +1155,12 @@ t_authentication_error_on_get_status(Config0) ->
|
|
|
|
|
|
t_authentication_error_on_send_message(Config0) ->
|
|
|
QueryMode = proplists:get_value(query_mode, Config0, sync),
|
|
|
- DatalayersType = ?config(datalayers_type, Config0),
|
|
|
- DatalayersConfig0 = proplists:get_value(datalayers_config, Config0),
|
|
|
- DatalayersConfig =
|
|
|
- case DatalayersType of
|
|
|
- apiv1 -> DatalayersConfig0#{<<"password">> => <<"wrong_password">>}
|
|
|
- end,
|
|
|
- Config = lists:keyreplace(datalayers_config, 1, Config0, {datalayers_config, DatalayersConfig}),
|
|
|
+ DatalayersConfig0 = proplists:get_value(connector_config, Config0),
|
|
|
+ DatalayersConfig = emqx_utils_maps:deep_merge(
|
|
|
+ DatalayersConfig0,
|
|
|
+ #{<<"parameters">> => #{<<"password">> => <<"wrong_password">>}}
|
|
|
+ ),
|
|
|
+ Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, DatalayersConfig}),
|
|
|
|
|
|
% Fake initialization to simulate credential update after bridge was created.
|
|
|
emqx_common_test_helpers:with_mock(
|
|
|
@@ -1149,7 +1171,7 @@ t_authentication_error_on_send_message(Config0) ->
|
|
|
end,
|
|
|
fun() ->
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
- ResourceId = resource_id(Config),
|
|
|
+ ResourceId = connector_id(Config),
|
|
|
?retry(
|
|
|
_Sleep = 1_000,
|
|
|
_Attempts = 10,
|