Browse Source

feat(opents): improve the OpentsDB bridge to v2 style

firest 2 năm trước cách đây
mục cha
commit
e337e1dc40

+ 2 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -98,7 +98,8 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_timescale_action_info,
         emqx_bridge_redis_action_info,
         emqx_bridge_iotdb_action_info,
-        emqx_bridge_es_action_info
+        emqx_bridge_es_action_info,
+        emqx_bridge_opents_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->

+ 121 - 7
apps/emqx_bridge_opents/src/emqx_bridge_opents.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 -module(emqx_bridge_opents).
 
@@ -7,10 +7,12 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1, ref/2, array/1]).
 
 -export([
-    conn_bridge_examples/1
+    conn_bridge_examples/1,
+    bridge_v2_examples/1,
+    default_data_template/0
 ]).
 
 -export([
@@ -20,8 +22,11 @@
     desc/1
 ]).
 
+-define(CONNECTOR_TYPE, opents).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
+
 %% -------------------------------------------------------------------------------------------------
-%% api
+%% v1 examples
 conn_bridge_examples(Method) ->
     [
         #{
@@ -34,7 +39,7 @@ conn_bridge_examples(Method) ->
 
 values(_Method) ->
     #{
-        enable => true,
+        enabledb => true,
         type => opents,
         name => <<"foo">>,
         server => <<"http://127.0.0.1:4242">>,
@@ -50,7 +55,37 @@ values(_Method) ->
     }.
 
 %% -------------------------------------------------------------------------------------------------
-%% Hocon Schema Definitions
+%% v2 examples
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"opents">> => #{
+                summary => <<"OpenTSDB Action">>,
+                value => emqx_bridge_v2_schema:action_values(
+                    Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
+                )
+            }
+        }
+    ].
+
+action_values() ->
+    #{
+        parameters => #{
+            data => default_data_template()
+        }
+    }.
+
+default_data_template() ->
+    [
+        #{
+            metric => <<"${metric}">>,
+            tags => <<"${tags}">>,
+            value => <<"${value}">>
+        }
+    ].
+
+%% -------------------------------------------------------------------------------------------------
+%% V1 Schema Definitions
 namespace() -> "bridge_opents".
 
 roots() -> [].
@@ -65,10 +100,89 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post");
+%% -------------------------------------------------------------------------------------------------
+%% V2 Schema Definitions
+
+fields(action) ->
+    {opents,
+        mk(
+            hoconsc:map(name, ref(?MODULE, action_config)),
+            #{
+                desc => <<"OpenTSDB Action Config">>,
+                required => false
+            }
+        )};
+fields(action_config) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        mk(
+            ref(?MODULE, action_parameters),
+            #{
+                required => true, desc => ?DESC("action_parameters")
+            }
+        )
+    );
+fields(action_parameters) ->
+    [
+        {data,
+            mk(
+                array(ref(?MODULE, action_parameters_data)),
+                #{
+                    desc => ?DESC("action_parameters_data"),
+                    default => <<"[]">>
+                }
+            )}
+    ];
+fields(action_parameters_data) ->
+    [
+        {timestamp,
+            mk(
+                binary(),
+                #{
+                    desc => ?DESC("config_parameters_timestamp"),
+                    required => false
+                }
+            )},
+        {metric,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_metric")
+                }
+            )},
+        {tags,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_tags")
+                }
+            )},
+        {value,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_value")
+                }
+            )}
+    ];
+fields("post_bridge_v2") ->
+    emqx_bridge_schema:type_and_name_fields(enum([opents])) ++ fields(action_config);
+fields("put_bridge_v2") ->
+    fields(action_config);
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
 
 desc("config") ->
     ?DESC("desc_config");
+desc(action_config) ->
+    ?DESC("desc_config");
+desc(action_parameters) ->
+    ?DESC("action_parameters");
+desc(action_parameters_data) ->
+    ?DESC("action_parameters_data");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."];
 desc(_) ->

+ 71 - 0
apps/emqx_bridge_opents/src/emqx_bridge_opents_action_info.erl

@@ -0,0 +1,71 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_opents_action_info).
+
+-behaviour(emqx_action_info).
+
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
+%% behaviour callbacks
+-export([
+    action_type_name/0,
+    bridge_v1_config_to_action_config/2,
+    bridge_v1_config_to_connector_config/1,
+    bridge_v1_type_name/0,
+    connector_action_config_to_bridge_v1_config/2,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-import(emqx_utils_conv, [bin/1]).
+
+-define(ACTION_TYPE, opents).
+-define(SCHEMA_MODULE, emqx_bridge_opents).
+
+action_type_name() -> ?ACTION_TYPE.
+bridge_v1_type_name() -> ?ACTION_TYPE.
+connector_type_name() -> ?ACTION_TYPE.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    MergedConfig =
+        emqx_utils_maps:deep_merge(
+            maps:without(
+                [<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>],
+                emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
+            ),
+            ConnectorConfig
+        ),
+    BridgeV1Keys = schema_keys("config"),
+    maps:with(BridgeV1Keys, MergedConfig).
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(action_config),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        ActionConfig#{<<"connector">> => ConnectorName}
+    ).
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ConnectorKeys = schema_keys(emqx_bridge_opents_connector, "config_connector"),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_connector_schema:project_to_connector_resource_opts/1,
+        maps:with(ConnectorKeys, BridgeV1Config)
+    ).
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config#{<<"data">> => []}),
+    emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
+
+schema_keys(Name) ->
+    schema_keys(?SCHEMA_MODULE, Name).
+
+schema_keys(Mod, Name) ->
+    [bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))].

+ 170 - 24
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
 -module(emqx_bridge_opents_connector).
@@ -12,7 +12,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
--export([roots/0, fields/1]).
+-export([namespace/0, roots/0, fields/1, desc/1]).
 
 %% `emqx_resource' API
 -export([
@@ -21,15 +21,25 @@
     on_stop/2,
     on_query/3,
     on_batch_query/3,
-    on_get_status/2
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
+-export([connector_examples/1]).
+
 -export([connect/1]).
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
+-define(CONNECTOR_TYPE, opents).
+
+namespace() -> "opents_connector".
+
 %%=====================================================================
-%% Hocon schema
+%% V1 Hocon schema
 roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
@@ -40,8 +50,56 @@ fields(config) ->
         {summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})},
         {details, mk(boolean(), #{default => false, desc => ?DESC("details")})},
         {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
+    ];
+%%=====================================================================
+%% V2 Hocon schema
+
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        proplists_without([auto_reconnect], fields(config));
+fields("post") ->
+    emqx_connector_schema:type_and_name_fields(enum([opents])) ++ fields("config_connector");
+fields("put") ->
+    fields("config_connector");
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+desc(config) ->
+    ?DESC("desc_config");
+desc("config_connector") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+proplists_without(Keys, List) ->
+    [El || El = {K, _} <- List, not lists:member(K, Keys)].
+
+%%=====================================================================
+%% V2 examples
+connector_examples(Method) ->
+    [
+        #{
+            <<"opents">> =>
+                #{
+                    summary => <<"OpenTSDB Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?CONNECTOR_TYPE, connector_example_values()
+                    )
+                }
+        }
     ].
 
+connector_example_values() ->
+    #{
+        name => <<"opents_connector">>,
+        type => opents,
+        enable => true,
+        server => <<"http://localhost:4242/">>,
+        pool_size => 8
+    }.
+
 %%========================================================================================
 %% `emqx_resource' API
 %%========================================================================================
@@ -56,8 +114,7 @@ on_start(
         server := Server,
         pool_size := PoolSize,
         summary := Summary,
-        details := Details,
-        resource_opts := #{batch_size := BatchSize}
+        details := Details
     } = Config
 ) ->
     ?SLOG(info, #{
@@ -70,11 +127,10 @@ on_start(
         {server, to_str(Server)},
         {summary, Summary},
         {details, Details},
-        {max_batch_size, BatchSize},
         {pool_size, PoolSize}
     ],
 
-    State = #{pool_name => InstanceId, server => Server},
+    State = #{pool_name => InstanceId, server => Server, channels => #{}},
     case opentsdb_connectivity(Server) of
         ok ->
             case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
@@ -93,6 +149,7 @@ on_stop(InstanceId, _State) ->
         msg => "stopping_opents_connector",
         connector => InstanceId
     }),
+    ?tp(opents_bridge_stopped, #{instance_id => InstanceId}),
     emqx_resource_pool:stop(InstanceId).
 
 on_query(InstanceId, Request, State) ->
@@ -101,10 +158,14 @@ on_query(InstanceId, Request, State) ->
 on_batch_query(
     InstanceId,
     BatchReq,
-    State
+    #{channels := Channels} = State
 ) ->
-    Datas = [format_opentsdb_msg(Msg) || {_Key, Msg} <- BatchReq],
-    do_query(InstanceId, Datas, State).
+    case try_render_messages(BatchReq, Channels) of
+        {ok, Datas} ->
+            do_query(InstanceId, Datas, State);
+        Error ->
+            Error
+    end.
 
 on_get_status(_InstanceId, #{server := Server}) ->
     Result =
@@ -117,6 +178,39 @@ on_get_status(_InstanceId, #{server := Server}) ->
         end,
     Result.
 
+on_add_channel(
+    _InstanceId,
+    #{channels := Channels} = OldState,
+    ChannelId,
+    #{
+        parameters := #{data := Data} = Parameter
+    }
+) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            {error, already_exists};
+        _ ->
+            Channel = Parameter#{
+                data := preproc_data_template(Data)
+            },
+            Channels2 = Channels#{ChannelId => Channel},
+            {ok, OldState#{channels := Channels2}}
+    end.
+
+on_remove_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId) ->
+    {ok, OldState#{channels => maps:remove(ChannelId, Channels)}}.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
+on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            on_get_status(InstanceId, State);
+        _ ->
+            {error, not_exists}
+    end.
+
 %%========================================================================================
 %% Helper fns
 %%========================================================================================
@@ -127,6 +221,9 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) ->
         "opents_connector_received",
         #{connector => InstanceId, query => Query, state => State}
     ),
+
+    ?tp(opents_bridge_on_query, #{instance_id => InstanceId}),
+
     Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover),
 
     case Result of
@@ -172,17 +269,66 @@ opentsdb_connectivity(Server) ->
         end,
     emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT).
 
-format_opentsdb_msg(Msg) ->
-    maps:with(
-        [
-            timestamp,
-            metric,
-            tags,
-            value,
-            <<"timestamp">>,
-            <<"metric">>,
-            <<"tags">>,
-            <<"value">>
-        ],
-        Msg
+try_render_messages([{ChannelId, _} | _] = BatchReq, Channels) ->
+    case maps:find(ChannelId, Channels) of
+        {ok, Channel} ->
+            {ok,
+                lists:foldl(
+                    fun({_, Message}, Acc) ->
+                        render_channel_message(Message, Channel, Acc)
+                    end,
+                    [],
+                    BatchReq
+                )};
+        _ ->
+            {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
+    end.
+
+render_channel_message(Msg, #{data := DataList}, Acc) ->
+    RawOpts = #{return => rawlist, var_trans => fun(X) -> X end},
+    lists:foldl(
+        fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) ->
+            MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg),
+            TagsVal =
+                case emqx_placeholder:proc_tmpl(TagsTk, Msg, RawOpts) of
+                    [undefined] ->
+                        #{};
+                    [Any] ->
+                        Any
+                end,
+            ValueVal =
+                case ValueTk of
+                    [_] ->
+                        erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
+                    _ ->
+                        emqx_placeholder:proc_tmpl(ValueTk, Msg)
+                end,
+            Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal},
+            [
+                case maps:get(timestamp, Data, undefined) of
+                    undefined ->
+                        Base;
+                    TimestampTk ->
+                        Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)}
+                end
+                | InAcc
+            ]
+        end,
+        Acc,
+        DataList
+    ).
+
+preproc_data_template([]) ->
+    preproc_data_template(emqx_bridge_opents:default_data_template());
+preproc_data_template(DataList) ->
+    lists:map(
+        fun(Data) ->
+            maps:map(
+                fun(_Key, Value) ->
+                    emqx_placeholder:preproc_tmpl(Value)
+                end,
+                Data
+            )
+        end,
+        DataList
     ).

+ 190 - 300
apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
 -module(emqx_bridge_opents_SUITE).
@@ -12,7 +12,8 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 % DB defaults
--define(BATCH_SIZE, 10).
+-define(BRIDGE_TYPE_BIN, <<"opents">>).
+-define(APPS, [opentsdb, emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_opents_SUITE]).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -20,95 +21,34 @@
 
 all() ->
     [
-        {group, with_batch},
-        {group, without_batch}
+        {group, default}
     ].
 
 groups() ->
-    TCs = emqx_common_test_helpers:all(?MODULE),
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {with_batch, TCs},
-        {without_batch, TCs}
+        {default, AllTCs}
     ].
 
-init_per_group(with_batch, Config0) ->
-    Config = [{batch_size, ?BATCH_SIZE} | Config0],
-    common_init(Config);
-init_per_group(without_batch, Config0) ->
-    Config = [{batch_size, 1} | Config0],
-    common_init(Config);
-init_per_group(_Group, Config) ->
-    Config.
-
-end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
-    ProxyHost = ?config(proxy_host, Config),
-    ProxyPort = ?config(proxy_port, Config),
-    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    ok;
-end_per_group(_Group, _Config) ->
-    ok.
-
 init_per_suite(Config) ->
-    Config.
-
-end_per_suite(_Config) ->
-    emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([opentsdb, emqx_bridge, emqx_resource, emqx_conf]),
-    ok.
-
-init_per_testcase(_Testcase, Config) ->
-    delete_bridge(Config),
-    snabbkaffe:start_trace(),
-    Config.
-
-end_per_testcase(_Testcase, Config) ->
-    ProxyHost = ?config(proxy_host, Config),
-    ProxyPort = ?config(proxy_port, Config),
-    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    ok = snabbkaffe:stop(),
-    delete_bridge(Config),
-    ok.
+    emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS).
 
-%%------------------------------------------------------------------------------
-%% Helper fns
-%%------------------------------------------------------------------------------
+end_per_suite(Config) ->
+    emqx_bridge_v2_testlib:end_per_suite(Config).
 
-common_init(ConfigT) ->
-    Host = os:getenv("OPENTS_HOST", "toxiproxy"),
+init_per_group(default, Config0) ->
+    Host = os:getenv("OPENTS_HOST", "toxiproxy.emqx.net"),
     Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")),
-
-    Config0 = [
-        {opents_host, Host},
-        {opents_port, Port},
-        {proxy_name, "opents"}
-        | ConfigT
-    ],
-
-    BridgeType = proplists:get_value(bridge_type, Config0, <<"opents">>),
+    ProxyName = "opents",
     case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
-            % Setup toxiproxy
-            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
-            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
-            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-            % Ensure enterprise bridge module is loaded
-            ok = emqx_common_test_helpers:start_apps([
-                emqx_conf, emqx_resource, emqx_bridge
-            ]),
-            _ = application:ensure_all_started(opentsdb),
-            _ = emqx_bridge_enterprise:module_info(),
-            emqx_mgmt_api_test_util:init_suite(),
-            {Name, OpenTSConf} = opents_config(BridgeType, Config0),
-            Config =
-                [
-                    {opents_config, OpenTSConf},
-                    {opents_bridge_type, BridgeType},
-                    {opents_name, Name},
-                    {proxy_host, ProxyHost},
-                    {proxy_port, ProxyPort}
-                    | Config0
-                ],
-            Config;
+            Config = emqx_bridge_v2_testlib:init_per_group(default, ?BRIDGE_TYPE_BIN, Config0),
+            [
+                {bridge_host, Host},
+                {bridge_port, Port},
+                {proxy_name, ProxyName}
+                | Config
+            ];
         false ->
             case os:getenv("IS_CI") of
                 "yes" ->
@@ -116,250 +56,200 @@ common_init(ConfigT) ->
                 _ ->
                     {skip, no_opents}
             end
-    end.
+    end;
+init_per_group(_Group, Config) ->
+    Config.
 
-opents_config(BridgeType, Config) ->
-    Port = integer_to_list(?config(opents_port, Config)),
-    Server = "http://" ++ ?config(opents_host, Config) ++ ":" ++ Port,
-    Name = atom_to_binary(?MODULE),
-    BatchSize = ?config(batch_size, Config),
+end_per_group(default, Config) ->
+    emqx_bridge_v2_testlib:end_per_group(Config),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(TestCase, Config0) ->
+    Type = ?config(bridge_type, Config0),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = <<
+        (atom_to_binary(TestCase))/binary, UniqueNum/binary
+    >>,
+    {_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
+    {_, ActionConfig} = action_config(Name, Config0),
+    Config = [
+        {connector_type, Type},
+        {connector_name, Name},
+        {connector_config, ConnectorConfig},
+        {bridge_type, Type},
+        {bridge_name, Name},
+        {bridge_config, ActionConfig}
+        | Config0
+    ],
+    %%    iotdb_reset(Config),
+    ok = snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(TestCase, Config) ->
+    emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config).
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+action_config(Name, Config) ->
+    Type = ?config(bridge_type, Config),
     ConfigString =
         io_lib:format(
-            "bridges.~s.~s {\n"
+            "actions.~s.~s {\n"
             "  enable = true\n"
-            "  server = ~p\n"
-            "  resource_opts = {\n"
-            "    request_ttl = 500ms\n"
-            "    batch_size = ~b\n"
-            "    query_mode = sync\n"
+            "  connector = \"~s\"\n"
+            "  parameters = {\n"
+            "     data = []\n"
             "  }\n"
-            "}",
+            "}\n",
             [
-                BridgeType,
+                Type,
                 Name,
-                Server,
-                BatchSize
+                Name
             ]
         ),
-    {Name, parse_and_check(ConfigString, BridgeType, Name)}.
-
-parse_and_check(ConfigString, BridgeType, Name) ->
-    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
-    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
-    #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
-    Config.
+    ct:pal("ActionConfig:~ts~n", [ConfigString]),
+    {ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
+
+connector_config(Name, Config) ->
+    Host = ?config(bridge_host, Config),
+    Port = ?config(bridge_port, Config),
+    Type = ?config(bridge_type, Config),
+    ServerURL = opents_server_url(Host, Port),
+    ConfigString =
+        io_lib:format(
+            "connectors.~s.~s {\n"
+            "  enable = true\n"
+            "  server = \"~s\"\n"
+            "}\n",
+            [
+                Type,
+                Name,
+                ServerURL
+            ]
+        ),
+    ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
+    {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
 
-create_bridge(Config) ->
-    create_bridge(Config, _Overrides = #{}).
-
-create_bridge(Config, Overrides) ->
-    BridgeType = ?config(opents_bridge_type, Config),
-    Name = ?config(opents_name, Config),
-    Config0 = ?config(opents_config, Config),
-    Config1 = emqx_utils_maps:deep_merge(Config0, Overrides),
-    emqx_bridge:create(BridgeType, Name, Config1).
-
-delete_bridge(Config) ->
-    BridgeType = ?config(opents_bridge_type, Config),
-    Name = ?config(opents_name, Config),
-    emqx_bridge:remove(BridgeType, Name).
-
-create_bridge_http(Params) ->
-    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
-    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
-        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
-        Error -> Error
-    end.
-
-send_message(Config, Payload) ->
-    Name = ?config(opents_name, Config),
-    BridgeType = ?config(opents_bridge_type, Config),
-    BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
-    emqx_bridge:send_message(BridgeID, Payload).
-
-query_resource(Config, Request) ->
-    query_resource(Config, Request, 1_000).
-
-query_resource(Config, Request, Timeout) ->
-    Name = ?config(opents_name, Config),
-    BridgeType = ?config(opents_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    emqx_resource:query(ResourceID, Request, #{timeout => Timeout}).
+parse_action_and_check(ConfigString, BridgeType, Name) ->
+    parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
 
-%%------------------------------------------------------------------------------
-%% Testcases
-%%------------------------------------------------------------------------------
+parse_connector_and_check(ConfigString, ConnectorType, Name) ->
+    parse_and_check(
+        ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
+    ).
+%%    emqx_utils_maps:safe_atom_key_map(Config).
 
-t_setup_via_config_and_publish(Config) ->
-    ?assertMatch(
-        {ok, _},
-        create_bridge(Config)
-    ),
-    SentData = make_data(),
-    ?check_trace(
-        begin
-            {_, {ok, #{result := Result}}} =
-                ?wait_async_action(
-                    send_message(Config, SentData),
-                    #{?snk_kind := buffer_worker_flush_ack},
-                    2_000
-                ),
-            ?assertMatch(
-                {ok, 200, #{failed := 0, success := 1}}, Result
-            ),
-            ok
-        end,
-        fun(Trace0) ->
-            Trace = ?of_kind(opents_connector_query_return, Trace0),
-            ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
-            ok
-        end
-    ),
-    ok.
+parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
+    Type = to_bin(Type0),
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
+    #{RootKey := #{Type := #{Name := Config}}} = RawConf,
+    Config.
 
-t_setup_via_http_api_and_publish(Config) ->
-    BridgeType = ?config(opents_bridge_type, Config),
-    Name = ?config(opents_name, Config),
-    OpentsConfig0 = ?config(opents_config, Config),
-    OpentsConfig = OpentsConfig0#{
-        <<"name">> => Name,
-        <<"type">> => BridgeType
+to_bin(List) when is_list(List) ->
+    unicode:characters_to_binary(List, utf8);
+to_bin(Atom) when is_atom(Atom) ->
+    erlang:atom_to_binary(Atom);
+to_bin(Bin) when is_binary(Bin) ->
+    Bin.
+
+opents_server_url(Host, Port) ->
+    iolist_to_binary([
+        "http://",
+        Host,
+        ":",
+        integer_to_binary(Port)
+    ]).
+
+is_success_check({ok, 200, #{failed := Failed}}) ->
+    ?assertEqual(0, Failed);
+is_success_check(Ret) ->
+    ?assert(false, Ret).
+
+is_error_check(Result) ->
+    ?assertMatch({error, {400, #{failed := 1}}}, Result).
+
+opentds_query(Config, Metric) ->
+    Path = <<"/api/query">>,
+    Opts = #{return_all => true},
+    Body = #{
+        start => <<"1h-ago">>,
+        queries => [
+            #{
+                aggregator => <<"last">>,
+                metric => Metric,
+                tags => #{
+                    host => <<"*">>
+                }
+            }
+        ],
+        showTSUID => false,
+        showQuery => false,
+        delete => false
     },
-    ?assertMatch(
-        {ok, _},
-        create_bridge_http(OpentsConfig)
-    ),
-    SentData = make_data(),
-    ?check_trace(
-        begin
-            Request = {send_message, SentData},
-            Res0 = query_resource(Config, Request, 2_500),
-            ?assertMatch(
-                {ok, 200, #{failed := 0, success := 1}}, Res0
-            ),
-            ok
-        end,
-        fun(Trace0) ->
-            Trace = ?of_kind(opents_connector_query_return, Trace0),
-            ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
-            ok
-        end
-    ),
-    ok.
-
-t_get_status(Config) ->
-    ?assertMatch(
-        {ok, _},
-        create_bridge(Config)
-    ),
-
-    Name = ?config(opents_name, Config),
-    BridgeType = ?config(opents_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    opentsdb_request(Config, Path, Body, Opts).
 
-    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
-    ok.
+opentsdb_request(Config, Path, Body) ->
+    opentsdb_request(Config, Path, Body, #{}).
 
-t_create_disconnected(Config) ->
-    BridgeType = proplists:get_value(bridge_type, Config, <<"opents">>),
-    Config1 = lists:keyreplace(opents_port, 1, Config, {opents_port, 61234}),
-    {_Name, OpenTSConf} = opents_config(BridgeType, Config1),
+opentsdb_request(Config, Path, Body, Opts) ->
+    Host = ?config(bridge_host, Config),
+    Port = ?config(bridge_port, Config),
+    ServerURL = opents_server_url(Host, Port),
+    URL = <<ServerURL/binary, Path/binary>>,
+    emqx_mgmt_api_test_util:request_api(post, URL, [], [], Body, Opts).
 
-    Config2 = lists:keyreplace(opents_config, 1, Config1, {opents_config, OpenTSConf}),
-    ?assertMatch({ok, _}, create_bridge(Config2)),
-
-    Name = ?config(opents_name, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)),
-    ok.
-
-t_write_failure(Config) ->
-    ProxyName = ?config(proxy_name, Config),
-    ProxyPort = ?config(proxy_port, Config),
-    ProxyHost = ?config(proxy_host, Config),
-    {ok, _} = create_bridge(Config),
-    SentData = make_data(),
-    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-        {_, {ok, #{result := Result}}} =
-            ?wait_async_action(
-                send_message(Config, SentData),
-                #{?snk_kind := buffer_worker_flush_ack},
-                2_000
-            ),
-        ?assertMatch({error, _}, Result),
-        ok
-    end),
-    ok.
+make_data(Metric, Value) ->
+    #{
+        metric => Metric,
+        tags => #{
+            <<"host">> => <<"serverA">>
+        },
+        value => Value
+    }.
 
-t_write_timeout(Config) ->
-    ProxyName = ?config(proxy_name, Config),
-    ProxyPort = ?config(proxy_port, Config),
-    ProxyHost = ?config(proxy_host, Config),
-    {ok, _} = create_bridge(
-        Config,
-        #{
-            <<"resource_opts">> => #{
-                <<"request_ttl">> => <<"500ms">>,
-                <<"resume_interval">> => <<"100ms">>,
-                <<"health_check_interval">> => <<"100ms">>
-            }
-        }
-    ),
-    SentData = make_data(),
-    emqx_common_test_helpers:with_failure(
-        timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
-            ?assertMatch(
-                {error, {resource_error, #{reason := timeout}}},
-                query_resource(Config, {send_message, SentData})
-            )
-        end
-    ),
-    ok.
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
 
-t_missing_data(Config) ->
-    ?assertMatch(
-        {ok, _},
-        create_bridge(Config)
+t_query_simple(Config) ->
+    Metric = <<"t_query_simple">>,
+    Value = 12,
+    MakeMessageFun = fun() -> make_data(Metric, Value) end,
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, opents_bridge_on_query
     ),
-    {_, {ok, #{result := Result}}} =
-        ?wait_async_action(
-            send_message(Config, #{}),
-            #{?snk_kind := buffer_worker_flush_ack},
-            2_000
-        ),
+    {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
+    QResult = emqx_utils_json:decode(IoTDBResult),
     ?assertMatch(
-        {error, {400, #{failed := 1, success := 0}}},
-        Result
+        [
+            #{
+                <<"metric">> := Metric,
+                <<"dps">> := _
+            }
+        ],
+        QResult
     ),
-    ok.
+    [#{<<"dps">> := Dps}] = QResult,
+    ?assertMatch([Value | _], maps:values(Dps)).
 
-t_bad_data(Config) ->
-    ?assertMatch(
-        {ok, _},
-        create_bridge(Config)
-    ),
-    Data = maps:without([metric], make_data()),
-    {_, {ok, #{result := Result}}} =
-        ?wait_async_action(
-            send_message(Config, Data),
-            #{?snk_kind := buffer_worker_flush_ack},
-            2_000
-        ),
+t_create_via_http(Config) ->
+    emqx_bridge_v2_testlib:t_create_via_http(Config).
 
-    ?assertMatch(
-        {error, {400, #{failed := 1, success := 0}}}, Result
-    ),
-    ok.
+t_start_stop(Config) ->
+    emqx_bridge_v2_testlib:t_start_stop(Config, opents_bridge_stopped).
 
-make_data() ->
-    make_data(<<"cpu">>, 12).
+t_on_get_status(Config) ->
+    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}).
 
-make_data(Metric, Value) ->
-    #{
-        metric => Metric,
-        tags => #{
-            <<"host">> => <<"serverA">>
-        },
-        value => Value
-    }.
+t_query_invalid_data(Config) ->
+    Metric = <<"t_query_invalid_data">>,
+    Value = 12,
+    MakeMessageFun = fun() -> maps:remove(value, make_data(Metric, Value)) end,
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query
+    ).

+ 16 - 2
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -52,6 +52,8 @@ resource_type(iotdb) ->
     emqx_bridge_iotdb_connector;
 resource_type(elasticsearch) ->
     emqx_bridge_es_connector;
+resource_type(opents) ->
+    emqx_bridge_opents_connector;
 resource_type(Type) ->
     error({unknown_connector_type, Type}).
 
@@ -66,6 +68,8 @@ connector_impl_module(iotdb) ->
     emqx_bridge_iotdb_connector;
 connector_impl_module(elasticsearch) ->
     emqx_bridge_es_connector;
+connector_impl_module(opents) ->
+    emqx_bridge_opents_connector;
 connector_impl_module(_ConnectorType) ->
     undefined.
 
@@ -193,6 +197,14 @@ connector_structs() ->
                     desc => <<"ElasticSearch Connector Config">>,
                     required => false
                 }
+            )},
+        {opents,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_opents_connector, "config_connector")),
+                #{
+                    desc => <<"OpenTSDB Connector Config">>,
+                    required => false
+                }
             )}
     ].
 
@@ -212,7 +224,8 @@ schema_modules() ->
         emqx_postgresql_connector_schema,
         emqx_bridge_redis_schema,
         emqx_bridge_iotdb_connector,
-        emqx_bridge_es_connector
+        emqx_bridge_es_connector,
+        emqx_bridge_opents_connector
     ].
 
 api_schemas(Method) ->
@@ -241,7 +254,8 @@ api_schemas(Method) ->
         api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
         api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
         api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
-        api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method)
+        api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
+        api_ref(emqx_bridge_opents_connector, <<"opents">>, Method)
     ].
 
 api_ref(Module, Type, Method) ->

+ 3 - 1
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -154,7 +154,9 @@ connector_type_to_bridge_types(timescale) ->
 connector_type_to_bridge_types(iotdb) ->
     [iotdb];
 connector_type_to_bridge_types(elasticsearch) ->
-    [elasticsearch].
+    [elasticsearch];
+connector_type_to_bridge_types(opents) ->
+    [opents].
 
 actions_config_name(action) -> <<"actions">>;
 actions_config_name(source) -> <<"sources">>.

+ 31 - 0
rel/i18n/emqx_bridge_opents.hocon

@@ -23,4 +23,35 @@ emqx_bridge_opents {
 
     desc_name.label:
         "Bridge Name"
+
+action_parameters_data.desc:
+"""OpenTSDB action parameter data"""
+
+action_parameters_data.label:
+"""Parameter Data"""
+
+config_parameters_timestamp.desc:
+"""Timestamp. Placeholders in format of ${var} is supported"""
+
+config_parameters_timestamp.label:
+"""Timestamp"""
+
+config_parameters_metric.metric:
+"""Metric. Placeholders in format of ${var} is supported"""
+
+config_parameters_metric.metric:
+"""Metric"""
+
+config_parameters_tags.desc:
+"""Data Type, Placeholders in format of ${var} is supported"""
+
+config_parameters_tags.label:
+"""Tags"""
+
+config_parameters_value.desc:
+"""Value. Placeholders in format of ${var} is supported"""
+
+config_parameters_value.label:
+"""Value"""
+
 }

+ 6 - 0
rel/i18n/emqx_bridge_opents_connector.hocon

@@ -17,4 +17,10 @@ emqx_bridge_opents_connector {
 
     details.label:
         "Details"
+
+desc_config.desc:
+"""Configuration for OpenTSDB Connector."""
+
+desc_config.label:
+"""OpenTSDB Connector Configuration"""
 }