Переглянути джерело

test(opents): add test cases for OpenTSDB

firest 2 роки тому
батько
коміт
0b46acda87

+ 1 - 0
.ci/docker-compose-file/.env

@@ -7,6 +7,7 @@ INFLUXDB_TAG=2.5.0
 TDENGINE_TAG=3.0.2.4
 DYNAMO_TAG=1.21.0
 CASSANDRA_TAG=3.11.6
+OPENTS_TAG=9aa7f88
 
 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
 SQLSERVER_TAG=2019-CU19-ubuntu-20.04

+ 9 - 0
.ci/docker-compose-file/docker-compose-opents.yaml

@@ -0,0 +1,9 @@
+version: '3.9'
+
+services:
+  opents_server:
+    container_name: opents
+    image: petergrace/opentsdb-docker:${OPENTS_TAG}
+    restart: always
+    networks:
+      - emqx_bridge

+ 1 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -26,6 +26,7 @@ services:
       - 19876:9876
       - 19042:9042
       - 19142:9142
+      - 14242:4242
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -101,5 +101,11 @@
     "listen": "0.0.0.0:1433",
     "upstream": "sqlserver:1433",
     "enabled": true
+  },
+  {
+    "name": "opents",
+    "listen": "0.0.0.0:4242",
+    "upstream": "opents:4242",
+    "enabled": true
   }
 ]

+ 1 - 0
.github/workflows/run_test_cases.yaml

@@ -168,6 +168,7 @@ jobs:
           REDIS_TAG: "7.0"
           INFLUXDB_TAG: "2.5.0"
           TDENGINE_TAG: "3.0.2.4"
+          OPENTS_TAG: "9aa7f88"
           PROFILE: ${{ matrix.profile }}
           CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}
         run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }}

+ 0 - 19
apps/emqx_bridge_opents/.gitignore

@@ -1,19 +0,0 @@
-.rebar3
-_*
-.eunit
-*.o
-*.beam
-*.plt
-*.swp
-*.swo
-.erlang.cookie
-ebin
-log
-erl_crash.dump
-.rebar
-logs
-_build
-.idea
-*.iml
-rebar3.crashdump
-*~

+ 2 - 0
apps/emqx_bridge_opents/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+opents

+ 0 - 0
apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf


+ 363 - 0
apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl

@@ -0,0 +1,363 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_opents_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+% DB defaults
+-define(BATCH_SIZE, 10).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, with_batch},
+        {group, without_batch}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {with_batch, TCs},
+        {without_batch, TCs}
+    ].
+
+init_per_group(with_batch, Config0) ->
+    Config = [{batch_size, ?BATCH_SIZE} | Config0],
+    common_init(Config);
+init_per_group(without_batch, Config0) ->
+    Config = [{batch_size, 1} | Config0],
+    common_init(Config);
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    delete_bridge(Config),
+    snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok = snabbkaffe:stop(),
+    delete_bridge(Config),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+common_init(ConfigT) ->
+    Host = os:getenv("OPENTS_HOST", "toxiproxy"),
+    Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")),
+
+    Config0 = [
+        {opents_host, Host},
+        {opents_port, Port},
+        {proxy_name, "opents"}
+        | ConfigT
+    ],
+
+    BridgeType = proplists:get_value(bridge_type, Config0, <<"opents">>),
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            % Setup toxiproxy
+            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            % Ensure EE bridge module is loaded
+            _ = application:load(emqx_ee_bridge),
+            _ = emqx_ee_bridge:module_info(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            {Name, OpenTSConf} = opents_config(BridgeType, Config0),
+            Config =
+                [
+                    {opents_config, OpenTSConf},
+                    {opents_bridge_type, BridgeType},
+                    {opents_name, Name},
+                    {proxy_host, ProxyHost},
+                    {proxy_port, ProxyPort}
+                    | Config0
+                ],
+            Config;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_opents);
+                _ ->
+                    {skip, no_opents}
+            end
+    end.
+
+opents_config(BridgeType, Config) ->
+    Port = integer_to_list(?config(opents_port, Config)),
+    Server = "http://" ++ ?config(opents_host, Config) ++ ":" ++ Port,
+    Name = atom_to_binary(?MODULE),
+    BatchSize = ?config(batch_size, Config),
+    ConfigString =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  enable = true\n"
+            "  server = ~p\n"
+            "  resource_opts = {\n"
+            "    request_timeout = 500ms\n"
+            "    batch_size = ~b\n"
+            "    query_mode = sync\n"
+            "  }\n"
+            "}",
+            [
+                BridgeType,
+                Name,
+                Server,
+                BatchSize
+            ]
+        ),
+    {Name, parse_and_check(ConfigString, BridgeType, Name)}.
+
+parse_and_check(ConfigString, BridgeType, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
+    Config.
+
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+    BridgeType = ?config(opents_bridge_type, Config),
+    Name = ?config(opents_name, Config),
+    Config0 = ?config(opents_config, Config),
+    Config1 = emqx_utils_maps:deep_merge(Config0, Overrides),
+    emqx_bridge:create(BridgeType, Name, Config1).
+
+delete_bridge(Config) ->
+    BridgeType = ?config(opents_bridge_type, Config),
+    Name = ?config(opents_name, Config),
+    emqx_bridge:remove(BridgeType, Name).
+
+create_bridge_http(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+send_message(Config, Payload) ->
+    Name = ?config(opents_name, Config),
+    BridgeType = ?config(opents_bridge_type, Config),
+    BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
+    emqx_bridge:send_message(BridgeID, Payload).
+
+query_resource(Config, Request) ->
+    query_resource(Config, Request, 1_000).
+
+query_resource(Config, Request, Timeout) ->
+    Name = ?config(opents_name, Config),
+    BridgeType = ?config(opents_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource:query(ResourceID, Request, #{timeout => Timeout}).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_setup_via_config_and_publish(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    SentData = make_data(),
+    ?check_trace(
+        begin
+            {_, {ok, #{result := Result}}} =
+                ?wait_async_action(
+                    send_message(Config, SentData),
+                    #{?snk_kind := buffer_worker_flush_ack},
+                    2_000
+                ),
+            ?assertMatch(
+                {ok, 200, #{failed := 0, success := 1}}, Result
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(opents_connector_query_return, Trace0),
+            ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
+            ok
+        end
+    ),
+    ok.
+
+t_setup_via_http_api_and_publish(Config) ->
+    BridgeType = ?config(opents_bridge_type, Config),
+    Name = ?config(opents_name, Config),
+    OpentsConfig0 = ?config(opents_config, Config),
+    OpentsConfig = OpentsConfig0#{
+        <<"name">> => Name,
+        <<"type">> => BridgeType
+    },
+    ?assertMatch(
+        {ok, _},
+        create_bridge_http(OpentsConfig)
+    ),
+    SentData = make_data(),
+    ?check_trace(
+        begin
+            Request = {send_message, SentData},
+            Res0 = query_resource(Config, Request, 2_500),
+            ?assertMatch(
+                {ok, 200, #{failed := 0, success := 1}}, Res0
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(opents_connector_query_return, Trace0),
+            ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
+            ok
+        end
+    ),
+    ok.
+
+t_get_status(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+
+    Name = ?config(opents_name, Config),
+    BridgeType = ?config(opents_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
+    ok.
+
+t_create_disconnected(Config) ->
+    BridgeType = proplists:get_value(bridge_type, Config, <<"opents">>),
+    Config1 = lists:keyreplace(opents_port, 1, Config, {opents_port, 61234}),
+    {_Name, OpenTSConf} = opents_config(BridgeType, Config1),
+
+    Config2 = lists:keyreplace(opents_config, 1, Config1, {opents_config, OpenTSConf}),
+    ?assertMatch({ok, _}, create_bridge(Config2)),
+
+    Name = ?config(opents_name, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)),
+    ok.
+
+t_write_failure(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    {ok, _} = create_bridge(Config),
+    SentData = make_data(),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        {_, {ok, #{result := Result}}} =
+            ?wait_async_action(
+                send_message(Config, SentData),
+                #{?snk_kind := buffer_worker_flush_ack},
+                2_000
+            ),
+        ?assertMatch({error, _}, Result),
+        ok
+    end),
+    ok.
+
+t_write_timeout(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    {ok, _} = create_bridge(
+        Config,
+        #{
+            <<"resource_opts">> => #{
+                <<"request_timeout">> => 500,
+                <<"resume_interval">> => 100,
+                <<"health_check_interval">> => 100
+            }
+        }
+    ),
+    SentData = make_data(),
+    emqx_common_test_helpers:with_failure(
+        timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+            ?assertMatch(
+                {error, {resource_error, #{reason := timeout}}},
+                query_resource(Config, {send_message, SentData})
+            )
+        end
+    ),
+    ok.
+
+t_missing_data(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            send_message(Config, #{}),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+    ?assertMatch(
+        {error, {400, #{failed := 1, success := 0}}},
+        Result
+    ),
+    ok.
+
+t_bad_data(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Data = maps:without([metric], make_data()),
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            send_message(Config, Data),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+
+    ?assertMatch(
+        {error, {400, #{failed := 1, success := 0}}}, Result
+    ),
+    ok.
+
+make_data() ->
+    make_data(<<"cpu">>, 12).
+
+make_data(Metric, Value) ->
+    #{
+        metric => Metric,
+        tags => #{
+            <<"host">> => <<"serverA">>
+        },
+        value => Value
+    }.

+ 9 - 7
lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl

@@ -108,13 +108,15 @@ on_batch_query(
     do_query(InstanceId, Datas, State).
 
 on_get_status(_InstanceId, #{server := Server}) ->
-    case opentsdb_connectivity(Server) of
-        ok ->
-            connected;
-        {error, Reason} ->
-            ?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}),
-            connecting
-    end.
+    Result =
+        case opentsdb_connectivity(Server) of
+            ok ->
+                connected;
+            {error, Reason} ->
+                ?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}),
+                connecting
+        end,
+    Result.
 
 %%========================================================================================
 %% Helper fns

+ 4 - 1
mix.exs

@@ -157,6 +157,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_kafka,
       :emqx_bridge_gcp_pubsub,
       :emqx_bridge_cassandra,
+      :emqx_bridge_opents,
       :emqx_bridge_clickhouse,
       :emqx_bridge_dynamo,
       :emqx_bridge_hstreamdb,
@@ -182,7 +183,8 @@ defmodule EMQXUmbrella.MixProject do
       {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
       {:snappyer, "1.2.8", override: true},
       {:crc32cer, "0.1.8", override: true},
-      {:supervisor3, "1.1.12", override: true}
+      {:supervisor3, "1.1.12", override: true},
+      {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}
     ]
   end
 
@@ -360,6 +362,7 @@ defmodule EMQXUmbrella.MixProject do
           emqx_bridge_kafka: :permanent,
           emqx_bridge_gcp_pubsub: :permanent,
           emqx_bridge_cassandra: :permanent,
+          emqx_bridge_opents: :permanent,
           emqx_bridge_clickhouse: :permanent,
           emqx_bridge_dynamo: :permanent,
           emqx_bridge_hstreamdb: :permanent,

+ 2 - 0
rebar.config.erl

@@ -81,6 +81,7 @@ is_enterprise(ee) -> true.
 is_community_umbrella_app("apps/emqx_bridge_kafka") -> false;
 is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false;
 is_community_umbrella_app("apps/emqx_bridge_cassandra") -> false;
+is_community_umbrella_app("apps/emqx_bridge_opents") -> false;
 is_community_umbrella_app("apps/emqx_bridge_clickhouse") -> false;
 is_community_umbrella_app("apps/emqx_bridge_dynamo") -> false;
 is_community_umbrella_app("apps/emqx_bridge_hstreamdb") -> false;
@@ -455,6 +456,7 @@ relx_apps_per_edition(ee) ->
         emqx_bridge_kafka,
         emqx_bridge_gcp_pubsub,
         emqx_bridge_cassandra,
+        emqx_bridge_opents,
         emqx_bridge_clickhouse,
         emqx_bridge_dynamo,
         emqx_bridge_hstreamdb,

+ 1 - 1
rel/i18n/emqx_ee_connector_opents.hocon

@@ -1,4 +1,4 @@
-emqx_ee_connector_opents {
+emqx_bridge_opents_connector {
 
     server.desc:
         """The URL of OpenTSDB endpoint."""

+ 8 - 1
scripts/ct/run.sh

@@ -115,7 +115,11 @@ case "${WHICH_APP}" in
           export PROFILE='emqx'
         fi
         ;;
-    *)
+    apps/emqx_bridge_opents)
+        ## ensure enterprise profile when testing ee applications
+        export PROFILE='emqx-enterprise'
+        ;;
+     *)
         export PROFILE="${PROFILE:-emqx}"
         ;;
 esac
@@ -188,6 +192,9 @@ for dep in ${CT_DEPS}; do
             ODBC_REQUEST='yes'
             FILES+=( '.ci/docker-compose-file/docker-compose-sqlserver.yaml' )
             ;;
+        opents)
+            FILES+=( '.ci/docker-compose-file/docker-compose-opents.yaml' )
+            ;; 
         *)
             echo "unknown_ct_dependency $dep"
             exit 1

+ 3 - 0
scripts/find-apps.sh

@@ -72,6 +72,9 @@ describe_app() {
         runner="docker"
     fi
     case "${app}" in
+        apps/emqx_bridge_opents)
+            profile='emqx-enterprise'
+            ;;
         apps/*)
             if [[ -f "${app}/BSL.txt" ]]; then
               profile='emqx-enterprise'