Просмотр исходного кода

Merge pull request #10425 from lafirest/feat/opentsdb

feat(opents): OpenTSDB integration
lafirest 2 лет назад
Родитель
Сommit
903b559e4f

+ 1 - 0
.ci/docker-compose-file/.env

@@ -7,6 +7,7 @@ INFLUXDB_TAG=2.5.0
 TDENGINE_TAG=3.0.2.4
 DYNAMO_TAG=1.21.0
 CASSANDRA_TAG=3.11.6
+OPENTS_TAG=9aa7f88
 
 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
 SQLSERVER_TAG=2019-CU19-ubuntu-20.04

+ 9 - 0
.ci/docker-compose-file/docker-compose-opents.yaml

@@ -0,0 +1,9 @@
+version: '3.9'
+
+services:
+  opents_server:
+    container_name: opents
+    image: petergrace/opentsdb-docker:${OPENTS_TAG}
+    restart: always
+    networks:
+      - emqx_bridge

+ 1 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -26,6 +26,7 @@ services:
       - 19876:9876
       - 19042:9042
       - 19142:9142
+      - 14242:4242
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -101,5 +101,11 @@
     "listen": "0.0.0.0:1433",
     "upstream": "sqlserver:1433",
     "enabled": true
+  },
+  {
+    "name": "opents",
+    "listen": "0.0.0.0:4242",
+    "upstream": "opents:4242",
+    "enabled": true
   }
 ]

+ 1 - 0
.github/workflows/run_test_cases.yaml

@@ -168,6 +168,7 @@ jobs:
           REDIS_TAG: "7.0"
           INFLUXDB_TAG: "2.5.0"
           TDENGINE_TAG: "3.0.2.4"
+          OPENTS_TAG: "9aa7f88"
           PROFILE: ${{ matrix.profile }}
           CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}
         run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }}

+ 94 - 0
apps/emqx_bridge_opents/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 36 - 0
apps/emqx_bridge_opents/README.md

@@ -0,0 +1,36 @@
+# EMQX OpenTSDB Bridge
+
+[OpenTSDB](http://opentsdb.net) is a distributed, scalable Time Series Database (TSDB) written on top of HBase.
+
+OpenTSDB was written to address a common need: store, index and serve metrics collected from computer systems (network gear, operating systems, applications) at a large scale, and make this data easily accessible and graphable.
+
+OpenTSDB allows you to collect thousands of metrics from tens of thousands of hosts and applications, at a high rate (every few seconds). 
+
+OpenTSDB will never delete or downsample data and can easily store hundreds of billions of data points.
+
+The application is used to connect EMQX and OpenTSDB. User can create a rule and easily ingest IoT data into OpenTSDB by leveraging the
+[EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html).
+
+
+# Documentation
+
+- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
+  for the EMQX rules engine introduction.
+
+
+# HTTP APIs
+
+- Several APIs are provided for bridge management, which includes create bridge,
+  update bridge, get bridge, stop or restart bridge and list bridges etc.
+
+  Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information.
+
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

+ 2 - 0
apps/emqx_bridge_opents/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+opents

+ 8 - 0
apps/emqx_bridge_opents/rebar.config

@@ -0,0 +1,8 @@
+{erl_opts, [debug_info]}.
+
+{deps, [
+    {opentsdb, {git, "https://github.com/emqx/opentsdb-client-erl", {tag, "v0.5.1"}}},
+    {emqx_connector, {path, "../../apps/emqx_connector"}},
+    {emqx_resource, {path, "../../apps/emqx_resource"}},
+    {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+]}.

+ 15 - 0
apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src

@@ -0,0 +1,15 @@
+{application, emqx_bridge_opents, [
+    {description, "EMQX Enterprise OpenTSDB Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        opentsdb
+    ]},
+    {env, []},
+    {modules, []},
+
+    {licenses, ["BSL"]},
+    {links, []}
+]}.

+ 85 - 0
apps/emqx_bridge_opents/src/emqx_bridge_opents.erl

@@ -0,0 +1,85 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_opents).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"opents">> => #{
+                summary => <<"OpenTSDB Bridge">>,
+                value => values(Method)
+            }
+        }
+    ].
+
+values(_Method) ->
+    #{
+        enable => true,
+        type => opents,
+        name => <<"foo">>,
+        server => <<"http://127.0.0.1:4242">>,
+        pool_size => 8,
+        resource_opts => #{
+            worker_pool_size => 1,
+            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+            batch_size => ?DEFAULT_BATCH_SIZE,
+            batch_time => ?DEFAULT_BATCH_TIME,
+            query_mode => async,
+            max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
+        }
+    }.
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge_opents".
+
+roots() -> [].
+
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}
+    ] ++ emqx_resource_schema:fields("resource_opts") ++
+        emqx_bridge_opents_connector:fields(config);
+fields("post") ->
+    [type_field(), name_field() | fields("config")];
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+%% -------------------------------------------------------------------------------------------------
+%% internal
+
+type_field() ->
+    {type, mk(enum([opents]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 184 - 0
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -0,0 +1,184 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_opents_connector).
+
+-behaviour(emqx_resource).
+
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-export([roots/0, fields/1]).
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    is_buffer_supported/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2
+]).
+
+-export([connect/1]).
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+%%=====================================================================
+%% Hocon schema
+roots() ->
+    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+
+fields(config) ->
+    [
+        {server, mk(binary(), #{required => true, desc => ?DESC("server")})},
+        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
+        {summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})},
+        {details, mk(boolean(), #{default => false, desc => ?DESC("details")})},
+        {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
+    ].
+
+%%========================================================================================
+%% `emqx_resource' API
+%%========================================================================================
+
+callback_mode() -> always_sync.
+
+is_buffer_supported() -> false.
+
+on_start(
+    InstanceId,
+    #{
+        server := Server,
+        pool_size := PoolSize,
+        summary := Summary,
+        details := Details,
+        resource_opts := #{batch_size := BatchSize}
+    } = Config
+) ->
+    ?SLOG(info, #{
+        msg => "starting_opents_connector",
+        connector => InstanceId,
+        config => emqx_utils:redact(Config)
+    }),
+
+    Options = [
+        {server, to_str(Server)},
+        {summary, Summary},
+        {details, Details},
+        {max_batch_size, BatchSize},
+        {pool_size, PoolSize}
+    ],
+
+    State = #{pool_name => InstanceId, server => Server},
+    case opentsdb_connectivity(Server) of
+        ok ->
+            case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
+                ok ->
+                    {ok, State};
+                Error ->
+                    Error
+            end;
+        {error, Reason} = Error ->
+            ?SLOG(error, #{msg => "Initiate resource failed", reason => Reason}),
+            Error
+    end.
+
+on_stop(InstanceId, #{pool_name := PoolName} = _State) ->
+    ?SLOG(info, #{
+        msg => "stopping_opents_connector",
+        connector => InstanceId
+    }),
+    emqx_resource_pool:stop(PoolName).
+
+on_query(InstanceId, Request, State) ->
+    on_batch_query(InstanceId, [Request], State).
+
+on_batch_query(
+    InstanceId,
+    BatchReq,
+    State
+) ->
+    Datas = [format_opentsdb_msg(Msg) || {_Key, Msg} <- BatchReq],
+    do_query(InstanceId, Datas, State).
+
+on_get_status(_InstanceId, #{server := Server}) ->
+    Result =
+        case opentsdb_connectivity(Server) of
+            ok ->
+                connected;
+            {error, Reason} ->
+                ?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}),
+                connecting
+        end,
+    Result.
+
+%%========================================================================================
+%% Helper fns
+%%========================================================================================
+
+do_query(InstanceId, Query, #{pool_name := PoolName} = State) ->
+    ?TRACE(
+        "QUERY",
+        "opents_connector_received",
+        #{connector => InstanceId, query => Query, state => State}
+    ),
+    Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover),
+
+    case Result of
+        {error, Reason} ->
+            ?tp(
+                opents_connector_query_return,
+                #{error => Reason}
+            ),
+            ?SLOG(error, #{
+                msg => "opents_connector_do_query_failed",
+                connector => InstanceId,
+                query => Query,
+                reason => Reason
+            }),
+            Result;
+        _ ->
+            ?tp(
+                opents_connector_query_return,
+                #{result => Result}
+            ),
+            Result
+    end.
+
+connect(Opts) ->
+    opentsdb:start_link(Opts).
+
+to_str(List) when is_list(List) ->
+    List;
+to_str(Bin) when is_binary(Bin) ->
+    erlang:binary_to_list(Bin).
+
+opentsdb_connectivity(Server) ->
+    SvrUrl =
+        case Server of
+            <<"http://", _/binary>> -> Server;
+            <<"https://", _/binary>> -> Server;
+            _ -> "http://" ++ Server
+        end,
+    emqx_plugin_libs_rule:http_connectivity(SvrUrl).
+
+format_opentsdb_msg(Msg) ->
+    maps:with(
+        [
+            timestamp,
+            metric,
+            tags,
+            value,
+            <<"timestamp">>,
+            <<"metric">>,
+            <<"tags">>,
+            <<"value">>
+        ],
+        Msg
+    ).

+ 363 - 0
apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl

@@ -0,0 +1,363 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_opents_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+% DB defaults
+-define(BATCH_SIZE, 10).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, with_batch},
+        {group, without_batch}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {with_batch, TCs},
+        {without_batch, TCs}
+    ].
+
+init_per_group(with_batch, Config0) ->
+    Config = [{batch_size, ?BATCH_SIZE} | Config0],
+    common_init(Config);
+init_per_group(without_batch, Config0) ->
+    Config = [{batch_size, 1} | Config0],
+    common_init(Config);
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    delete_bridge(Config),
+    snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok = snabbkaffe:stop(),
+    delete_bridge(Config),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+common_init(ConfigT) ->
+    Host = os:getenv("OPENTS_HOST", "toxiproxy"),
+    Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")),
+
+    Config0 = [
+        {opents_host, Host},
+        {opents_port, Port},
+        {proxy_name, "opents"}
+        | ConfigT
+    ],
+
+    BridgeType = proplists:get_value(bridge_type, Config0, <<"opents">>),
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            % Setup toxiproxy
+            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            % Ensure EE bridge module is loaded
+            _ = application:load(emqx_ee_bridge),
+            _ = emqx_ee_bridge:module_info(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            {Name, OpenTSConf} = opents_config(BridgeType, Config0),
+            Config =
+                [
+                    {opents_config, OpenTSConf},
+                    {opents_bridge_type, BridgeType},
+                    {opents_name, Name},
+                    {proxy_host, ProxyHost},
+                    {proxy_port, ProxyPort}
+                    | Config0
+                ],
+            Config;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_opents);
+                _ ->
+                    {skip, no_opents}
+            end
+    end.
+
+opents_config(BridgeType, Config) ->
+    Port = integer_to_list(?config(opents_port, Config)),
+    Server = "http://" ++ ?config(opents_host, Config) ++ ":" ++ Port,
+    Name = atom_to_binary(?MODULE),
+    BatchSize = ?config(batch_size, Config),
+    ConfigString =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  enable = true\n"
+            "  server = ~p\n"
+            "  resource_opts = {\n"
+            "    request_timeout = 500ms\n"
+            "    batch_size = ~b\n"
+            "    query_mode = sync\n"
+            "  }\n"
+            "}",
+            [
+                BridgeType,
+                Name,
+                Server,
+                BatchSize
+            ]
+        ),
+    {Name, parse_and_check(ConfigString, BridgeType, Name)}.
+
+parse_and_check(ConfigString, BridgeType, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
+    Config.
+
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+    BridgeType = ?config(opents_bridge_type, Config),
+    Name = ?config(opents_name, Config),
+    Config0 = ?config(opents_config, Config),
+    Config1 = emqx_utils_maps:deep_merge(Config0, Overrides),
+    emqx_bridge:create(BridgeType, Name, Config1).
+
+delete_bridge(Config) ->
+    BridgeType = ?config(opents_bridge_type, Config),
+    Name = ?config(opents_name, Config),
+    emqx_bridge:remove(BridgeType, Name).
+
+create_bridge_http(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+send_message(Config, Payload) ->
+    Name = ?config(opents_name, Config),
+    BridgeType = ?config(opents_bridge_type, Config),
+    BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
+    emqx_bridge:send_message(BridgeID, Payload).
+
+query_resource(Config, Request) ->
+    query_resource(Config, Request, 1_000).
+
+query_resource(Config, Request, Timeout) ->
+    Name = ?config(opents_name, Config),
+    BridgeType = ?config(opents_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource:query(ResourceID, Request, #{timeout => Timeout}).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_setup_via_config_and_publish(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    SentData = make_data(),
+    ?check_trace(
+        begin
+            {_, {ok, #{result := Result}}} =
+                ?wait_async_action(
+                    send_message(Config, SentData),
+                    #{?snk_kind := buffer_worker_flush_ack},
+                    2_000
+                ),
+            ?assertMatch(
+                {ok, 200, #{failed := 0, success := 1}}, Result
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(opents_connector_query_return, Trace0),
+            ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
+            ok
+        end
+    ),
+    ok.
+
+t_setup_via_http_api_and_publish(Config) ->
+    BridgeType = ?config(opents_bridge_type, Config),
+    Name = ?config(opents_name, Config),
+    OpentsConfig0 = ?config(opents_config, Config),
+    OpentsConfig = OpentsConfig0#{
+        <<"name">> => Name,
+        <<"type">> => BridgeType
+    },
+    ?assertMatch(
+        {ok, _},
+        create_bridge_http(OpentsConfig)
+    ),
+    SentData = make_data(),
+    ?check_trace(
+        begin
+            Request = {send_message, SentData},
+            Res0 = query_resource(Config, Request, 2_500),
+            ?assertMatch(
+                {ok, 200, #{failed := 0, success := 1}}, Res0
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(opents_connector_query_return, Trace0),
+            ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
+            ok
+        end
+    ),
+    ok.
+
+t_get_status(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+
+    Name = ?config(opents_name, Config),
+    BridgeType = ?config(opents_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
+    ok.
+
+t_create_disconnected(Config) ->
+    BridgeType = proplists:get_value(bridge_type, Config, <<"opents">>),
+    Config1 = lists:keyreplace(opents_port, 1, Config, {opents_port, 61234}),
+    {_Name, OpenTSConf} = opents_config(BridgeType, Config1),
+
+    Config2 = lists:keyreplace(opents_config, 1, Config1, {opents_config, OpenTSConf}),
+    ?assertMatch({ok, _}, create_bridge(Config2)),
+
+    Name = ?config(opents_name, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)),
+    ok.
+
+t_write_failure(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    {ok, _} = create_bridge(Config),
+    SentData = make_data(),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        {_, {ok, #{result := Result}}} =
+            ?wait_async_action(
+                send_message(Config, SentData),
+                #{?snk_kind := buffer_worker_flush_ack},
+                2_000
+            ),
+        ?assertMatch({error, _}, Result),
+        ok
+    end),
+    ok.
+
+t_write_timeout(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    {ok, _} = create_bridge(
+        Config,
+        #{
+            <<"resource_opts">> => #{
+                <<"request_timeout">> => 500,
+                <<"resume_interval">> => 100,
+                <<"health_check_interval">> => 100
+            }
+        }
+    ),
+    SentData = make_data(),
+    emqx_common_test_helpers:with_failure(
+        timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+            ?assertMatch(
+                {error, {resource_error, #{reason := timeout}}},
+                query_resource(Config, {send_message, SentData})
+            )
+        end
+    ),
+    ok.
+
+t_missing_data(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            send_message(Config, #{}),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+    ?assertMatch(
+        {error, {400, #{failed := 1, success := 0}}},
+        Result
+    ),
+    ok.
+
+t_bad_data(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Data = maps:without([metric], make_data()),
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            send_message(Config, Data),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+
+    ?assertMatch(
+        {error, {400, #{failed := 1, success := 0}}}, Result
+    ),
+    ok.
+
+make_data() ->
+    make_data(<<"cpu">>, 12).
+
+make_data(Metric, Value) ->
+    #{
+        metric => Metric,
+        tags => #{
+            <<"host">> => <<"serverA">>
+        },
+        value => Value
+    }.

+ 1 - 0
changes/ee/feat-10425.en.md

@@ -0,0 +1 @@
+Implement OpenTSDB data bridge.

+ 2 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -8,7 +8,8 @@
         emqx_ee_connector,
         telemetry,
         emqx_bridge_kafka,
-        emqx_bridge_gcp_pubsub
+        emqx_bridge_gcp_pubsub,
+        emqx_bridge_opents
     ]},
     {env, []},
     {modules, []},

+ 14 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -35,7 +35,8 @@ api_schemas(Method) ->
         ref(emqx_ee_bridge_clickhouse, Method),
         ref(emqx_ee_bridge_dynamo, Method),
         ref(emqx_ee_bridge_rocketmq, Method),
-        ref(emqx_ee_bridge_sqlserver, Method)
+        ref(emqx_ee_bridge_sqlserver, Method),
+        ref(emqx_bridge_opents, Method)
     ].
 
 schema_modules() ->
@@ -55,7 +56,8 @@ schema_modules() ->
         emqx_ee_bridge_clickhouse,
         emqx_ee_bridge_dynamo,
         emqx_ee_bridge_rocketmq,
-        emqx_ee_bridge_sqlserver
+        emqx_ee_bridge_sqlserver,
+        emqx_bridge_opents
     ].
 
 examples(Method) ->
@@ -94,7 +96,8 @@ resource_type(tdengine) -> emqx_ee_connector_tdengine;
 resource_type(clickhouse) -> emqx_ee_connector_clickhouse;
 resource_type(dynamo) -> emqx_ee_connector_dynamo;
 resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
-resource_type(sqlserver) -> emqx_ee_connector_sqlserver.
+resource_type(sqlserver) -> emqx_ee_connector_sqlserver;
+resource_type(opents) -> emqx_bridge_opents_connector.
 
 fields(bridges) ->
     [
@@ -153,6 +156,14 @@ fields(bridges) ->
                     desc => <<"Cassandra Bridge Config">>,
                     required => false
                 }
+            )},
+        {opents,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_opents, "config")),
+                #{
+                    desc => <<"OpenTSDB Bridge Config">>,
+                    required => false
+                }
             )}
     ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
         pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs().

+ 4 - 1
mix.exs

@@ -157,6 +157,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_kafka,
       :emqx_bridge_gcp_pubsub,
       :emqx_bridge_cassandra,
+      :emqx_bridge_opents,
       :emqx_bridge_clickhouse,
       :emqx_bridge_dynamo,
       :emqx_bridge_hstreamdb,
@@ -182,7 +183,8 @@ defmodule EMQXUmbrella.MixProject do
       {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
       {:snappyer, "1.2.8", override: true},
       {:crc32cer, "0.1.8", override: true},
-      {:supervisor3, "1.1.12", override: true}
+      {:supervisor3, "1.1.12", override: true},
+      {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}
     ]
   end
 
@@ -360,6 +362,7 @@ defmodule EMQXUmbrella.MixProject do
           emqx_bridge_kafka: :permanent,
           emqx_bridge_gcp_pubsub: :permanent,
           emqx_bridge_cassandra: :permanent,
+          emqx_bridge_opents: :permanent,
           emqx_bridge_clickhouse: :permanent,
           emqx_bridge_dynamo: :permanent,
           emqx_bridge_hstreamdb: :permanent,

+ 2 - 0
rebar.config.erl

@@ -81,6 +81,7 @@ is_enterprise(ee) -> true.
 is_community_umbrella_app("apps/emqx_bridge_kafka") -> false;
 is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false;
 is_community_umbrella_app("apps/emqx_bridge_cassandra") -> false;
+is_community_umbrella_app("apps/emqx_bridge_opents") -> false;
 is_community_umbrella_app("apps/emqx_bridge_clickhouse") -> false;
 is_community_umbrella_app("apps/emqx_bridge_dynamo") -> false;
 is_community_umbrella_app("apps/emqx_bridge_hstreamdb") -> false;
@@ -455,6 +456,7 @@ relx_apps_per_edition(ee) ->
         emqx_bridge_kafka,
         emqx_bridge_gcp_pubsub,
         emqx_bridge_cassandra,
+        emqx_bridge_opents,
         emqx_bridge_clickhouse,
         emqx_bridge_dynamo,
         emqx_bridge_hstreamdb,

+ 26 - 0
rel/i18n/emqx_bridge_opents.hocon

@@ -0,0 +1,26 @@
+emqx_bridge_opents {
+
+    config_enable.desc:
+        """Enable or disable this bridge"""
+
+    config_enable.label:
+        "Enable Or Disable Bridge"
+
+    desc_config.desc:
+        """Configuration for an OpenTSDB bridge."""
+
+    desc_config.label:
+        "OpenTSDB Bridge Configuration"
+
+    desc_type.desc:
+        """The Bridge Type"""
+
+    desc_type.label:
+        "Bridge Type"
+
+    desc_name.desc:
+        """Bridge name."""
+
+    desc_name.label:
+        "Bridge Name"
+}

+ 20 - 0
rel/i18n/emqx_bridge_opents_connector.hocon

@@ -0,0 +1,20 @@
+emqx_bridge_opents_connector {
+
+    server.desc:
+        """The URL of OpenTSDB endpoint."""
+
+    server.label:
+        "URL"
+
+    summary.desc:
+        """Whether to return summary information."""
+
+    summary.label:
+        "Summary"
+
+    details.desc:
+        """Whether to return detailed information."""
+
+    details.label:
+        "Details"
+}

+ 26 - 0
rel/i18n/zh/emqx_bridge_opents.hocon

@@ -0,0 +1,26 @@
+emqx_bridge_opents {
+
+    config_enable.desc:
+        """启用/禁用桥接"""
+
+    config_enable.label:
+        "启用/禁用桥接"
+
+    desc_config.desc:
+        """OpenTSDB 桥接配置"""
+
+    desc_config.label:
+        "OpenTSDB 桥接配置"
+
+    desc_type.desc:
+        """Bridge 类型"""
+
+    desc_type.label:
+        "桥接类型"
+
+    desc_name.desc:
+        """桥接名字"""
+
+    desc_name.label:
+        "桥接名字"
+}

+ 20 - 0
rel/i18n/zh/emqx_bridge_opents_connector.hocon

@@ -0,0 +1,20 @@
+emqx_bridge_opents_connector {
+
+    server.desc:
+        """服务器的地址。"""
+
+    server.label:
+        "服务器地址"
+
+    summary.desc:
+        """是否返回摘要信息。"""
+
+    summary.label:
+        "摘要信息"
+
+    details.desc:
+        """是否返回详细信息。"""
+
+    details.label:
+        "详细信息"
+}

+ 3 - 0
scripts/ct/run.sh

@@ -188,6 +188,9 @@ for dep in ${CT_DEPS}; do
             ODBC_REQUEST='yes'
             FILES+=( '.ci/docker-compose-file/docker-compose-sqlserver.yaml' )
             ;;
+        opents)
+            FILES+=( '.ci/docker-compose-file/docker-compose-opents.yaml' )
+            ;; 
         *)
             echo "unknown_ct_dependency $dep"
             exit 1

+ 1 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -274,3 +274,4 @@ clickhouse
 FormatType
 RocketMQ
 Keyspace
+OpenTSDB