Просмотр исходного кода

Merge remote-tracking branch 'origin/dev/ee5.0' into 1116-sync-v5.0.10-to-ee50

Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
83fe35b148

+ 36 - 0
.ci/docker-compose-file/docker-compose-influxdb-tcp.yaml

@@ -0,0 +1,36 @@
+version: '3.9'
+
+services:
+  influxdb_server_tcp:
+    container_name: influxdb_tcp
+    image: influxdb:${INFLUXDB_TAG}
+    expose:
+      - "8086"
+      - "8089/udp"
+      - "8083"
+    # ports:
+    #   - "8086:8086"
+    environment:
+      DOCKER_INFLUXDB_INIT_MODE: setup
+      DOCKER_INFLUXDB_INIT_USERNAME: root
+      DOCKER_INFLUXDB_INIT_PASSWORD: emqx@123
+      DOCKER_INFLUXDB_INIT_ORG: emqx
+      DOCKER_INFLUXDB_INIT_BUCKET: mqtt
+      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: abcdefg
+    volumes:
+      - "./influxdb/setup-v1.sh:/docker-entrypoint-initdb.d/setup-v1.sh"
+    restart: always
+    networks:
+      - emqx_bridge
+
+# networks:
+#   emqx_bridge:
+#     driver: bridge
+#     name: emqx_bridge
+#     ipam:
+#       driver: default
+#       config:
+#         - subnet: 172.100.239.0/24
+#           gateway: 172.100.239.1
+#         - subnet: 2001:3200:3200::/64
+#           gateway: 2001:3200:3200::1

+ 42 - 0
.ci/docker-compose-file/docker-compose-influxdb-tls.yaml

@@ -0,0 +1,42 @@
+version: '3.9'
+
+services:
+  influxdb_server_tls:
+    container_name: influxdb_tls
+    image: influxdb:${INFLUXDB_TAG}
+    expose:
+      - "8086"
+      - "8089/udp"
+      - "8083"
+    # ports:
+    #   - "8087:8086"
+    environment:
+      DOCKER_INFLUXDB_INIT_MODE: setup
+      DOCKER_INFLUXDB_INIT_USERNAME: root
+      DOCKER_INFLUXDB_INIT_PASSWORD: emqx@123
+      DOCKER_INFLUXDB_INIT_ORG: emqx
+      DOCKER_INFLUXDB_INIT_BUCKET: mqtt
+      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: abcdefg
+    volumes:
+      - ./certs/server.crt:/etc/influxdb/cert.pem
+      - ./certs/server.key:/etc/influxdb/key.pem
+      - "./influxdb/setup-v1.sh:/docker-entrypoint-initdb.d/setup-v1.sh"
+    command:
+      - influxd
+      - --tls-cert=/etc/influxdb/cert.pem
+      - --tls-key=/etc/influxdb/key.pem
+    restart: always
+    networks:
+      - emqx_bridge
+
+# networks:
+#   emqx_bridge:
+#     driver: bridge
+#     name: emqx_bridge
+#     ipam:
+#       driver: default
+#       config:
+#         - subnet: 172.100.239.0/24
+#           gateway: 172.100.239.1
+#         - subnet: 2001:3200:3200::/64
+#           gateway: 2001:3200:3200::1

+ 18 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -0,0 +1,18 @@
+version: '3.9'
+
+services:
+  toxiproxy:
+    container_name: toxiproxy
+    image: ghcr.io/shopify/toxiproxy:2.5.0
+    restart: always
+    networks:
+      - emqx_bridge
+    volumes:
+      - "./toxiproxy.json:/config/toxiproxy.json"
+    ports:
+      - 8474:8474
+      - 8086:8086
+      - 8087:8087
+    command:
+      - "-host=0.0.0.0"
+      - "-config=/config/toxiproxy.json"

+ 16 - 0
.ci/docker-compose-file/influxdb/setup-v1.sh

@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+
+set -e
+
+# influx v1 dbrp create \
+#   --bucket-id ${DOCKER_INFLUXDB_INIT_BUCKET_ID} \
+#   --db ${V1_DB_NAME} \
+#   --rp ${V1_RP_NAME} \
+#   --default \
+#   --org ${DOCKER_INFLUXDB_INIT_ORG}
+
+influx v1 auth create \
+  --username "${DOCKER_INFLUXDB_INIT_USERNAME}" \
+  --password "${DOCKER_INFLUXDB_INIT_PASSWORD}" \
+  --write-bucket "${DOCKER_INFLUXDB_INIT_BUCKET_ID}" \
+  --org "${DOCKER_INFLUXDB_INIT_ORG}"

+ 14 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -0,0 +1,14 @@
+[
+  {
+    "name": "influxdb_tcp",
+    "listen": "0.0.0.0:8086",
+    "upstream": "influxdb_tcp:8086",
+    "enabled": true
+  },
+  {
+    "name": "influxdb_tls",
+    "listen": "0.0.0.0:8087",
+    "upstream": "influxdb_tls:8086",
+    "enabled": true
+  }
+]

+ 1 - 0
.github/workflows/run_test_cases.yaml

@@ -151,6 +151,7 @@ jobs:
             MYSQL_TAG: 8
             PGSQL_TAG: 13
             REDIS_TAG: 6
+            INFLUXDB_TAG: 2.5.0
             WHICH_APP: ${{ matrix.app[0] }}
             PROFILE: ${{ matrix.app[1] }}
           run: |

+ 4 - 0
Makefile

@@ -97,7 +97,11 @@ $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app))))
 .PHONY: ct-suite
 ct-suite: $(REBAR)
 ifneq ($(TESTCASE),)
+ifneq ($(GROUP),)
+	$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE)  --case $(TESTCASE) --group $(GROUP)
+else
 	$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE)  --case $(TESTCASE)
+endif
 else ifneq ($(GROUP),)
 	$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE)  --group $(GROUP)
 else

+ 145 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -65,6 +65,15 @@
     stop_slave/1
 ]).
 
+-export([clear_screen/0]).
+-export([with_mock/4]).
+
+%% Toxiproxy API
+-export([
+    with_failure/5,
+    reset_proxy/2
+]).
+
 -define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])).
 
 -define(MQTT_SSL_TWOWAY, [
@@ -769,3 +778,139 @@ expand_node_specs(Specs, CommonOpts) ->
         end,
         Specs
     ).
+
+%% is useful when iterating on the tests in a loop, to get rid of all
+%% the garbaged printed before the test itself beings.
+clear_screen() ->
+    io:format(standard_io, "\033[H\033[2J", []),
+    io:format(standard_error, "\033[H\033[2J", []),
+    io:format(standard_io, "\033[H\033[3J", []),
+    io:format(standard_error, "\033[H\033[3J", []),
+    ok.
+
+with_mock(Mod, FnName, MockedFn, Fun) ->
+    ok = meck:new(Mod, [non_strict, no_link, no_history, passthrough]),
+    ok = meck:expect(Mod, FnName, MockedFn),
+    try
+        Fun()
+    after
+        ok = meck:unload(Mod)
+    end.
+
+%%-------------------------------------------------------------------------------
+%% Toxiproxy utils
+%%-------------------------------------------------------------------------------
+
+reset_proxy(ProxyHost, ProxyPort) ->
+    Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset",
+    Body = <<>>,
+    {ok, {{_, 204, _}, _, _}} = httpc:request(
+        post,
+        {Url, [], "application/json", Body},
+        [],
+        [{body_format, binary}]
+    ).
+
+with_failure(FailureType, Name, ProxyHost, ProxyPort, Fun) ->
+    enable_failure(FailureType, Name, ProxyHost, ProxyPort),
+    try
+        Fun()
+    after
+        heal_failure(FailureType, Name, ProxyHost, ProxyPort)
+    end.
+
+enable_failure(FailureType, Name, ProxyHost, ProxyPort) ->
+    case FailureType of
+        down -> switch_proxy(off, Name, ProxyHost, ProxyPort);
+        timeout -> timeout_proxy(on, Name, ProxyHost, ProxyPort);
+        latency_up -> latency_up_proxy(on, Name, ProxyHost, ProxyPort)
+    end.
+
+heal_failure(FailureType, Name, ProxyHost, ProxyPort) ->
+    case FailureType of
+        down -> switch_proxy(on, Name, ProxyHost, ProxyPort);
+        timeout -> timeout_proxy(off, Name, ProxyHost, ProxyPort);
+        latency_up -> latency_up_proxy(off, Name, ProxyHost, ProxyPort)
+    end.
+
+switch_proxy(Switch, Name, ProxyHost, ProxyPort) ->
+    Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name,
+    Body =
+        case Switch of
+            off -> #{<<"enabled">> => false};
+            on -> #{<<"enabled">> => true}
+        end,
+    BodyBin = emqx_json:encode(Body),
+    {ok, {{_, 200, _}, _, _}} = httpc:request(
+        post,
+        {Url, [], "application/json", BodyBin},
+        [],
+        [{body_format, binary}]
+    ).
+
+timeout_proxy(on, Name, ProxyHost, ProxyPort) ->
+    Url =
+        "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
+            "/toxics",
+    NameBin = list_to_binary(Name),
+    Body = #{
+        <<"name">> => <<NameBin/binary, "_timeout">>,
+        <<"type">> => <<"timeout">>,
+        <<"stream">> => <<"upstream">>,
+        <<"toxicity">> => 1.0,
+        <<"attributes">> => #{<<"timeout">> => 0}
+    },
+    BodyBin = emqx_json:encode(Body),
+    {ok, {{_, 200, _}, _, _}} = httpc:request(
+        post,
+        {Url, [], "application/json", BodyBin},
+        [],
+        [{body_format, binary}]
+    );
+timeout_proxy(off, Name, ProxyHost, ProxyPort) ->
+    ToxicName = Name ++ "_timeout",
+    Url =
+        "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
+            "/toxics/" ++ ToxicName,
+    Body = <<>>,
+    {ok, {{_, 204, _}, _, _}} = httpc:request(
+        delete,
+        {Url, [], "application/json", Body},
+        [],
+        [{body_format, binary}]
+    ).
+
+latency_up_proxy(on, Name, ProxyHost, ProxyPort) ->
+    Url =
+        "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
+            "/toxics",
+    NameBin = list_to_binary(Name),
+    Body = #{
+        <<"name">> => <<NameBin/binary, "_latency_up">>,
+        <<"type">> => <<"latency">>,
+        <<"stream">> => <<"upstream">>,
+        <<"toxicity">> => 1.0,
+        <<"attributes">> => #{
+            <<"latency">> => 20_000,
+            <<"jitter">> => 3_000
+        }
+    },
+    BodyBin = emqx_json:encode(Body),
+    {ok, {{_, 200, _}, _, _}} = httpc:request(
+        post,
+        {Url, [], "application/json", BodyBin},
+        [],
+        [{body_format, binary}]
+    );
+latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
+    ToxicName = Name ++ "_latency_up",
+    Url =
+        "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
+            "/toxics/" ++ ToxicName,
+    Body = <<>>,
+    {ok, {{_, 204, _}, _, _}} = httpc:request(
+        delete,
+        {Url, [], "application/json", Body},
+        [],
+        [{body_format, binary}]
+    ).

+ 2 - 1
lib-ee/emqx_ee_bridge/docker-ct

@@ -1,3 +1,4 @@
+influxdb
+kafka
 mongo
 mongo_rs_sharded
-kafka

+ 857 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl

@@ -0,0 +1,857 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_influxdb_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, with_batch},
+        {group, without_batch}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {with_batch, [
+            {group, sync_query},
+            {group, async_query}
+        ]},
+        {without_batch, [
+            {group, sync_query},
+            {group, async_query}
+        ]},
+        {sync_query, [
+            {group, apiv1_tcp},
+            {group, apiv1_tls},
+            {group, apiv2_tcp},
+            {group, apiv2_tls}
+        ]},
+        {async_query, [
+            {group, apiv1_tcp},
+            {group, apiv1_tls},
+            {group, apiv2_tcp},
+            {group, apiv2_tls}
+        ]},
+        {apiv1_tcp, TCs},
+        {apiv1_tls, TCs},
+        {apiv2_tcp, TCs},
+        {apiv2_tls, TCs}
+    ].
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]),
+    _ = application:stop(emqx_connector),
+    ok.
+
+init_per_group(InfluxDBType, Config0) when
+    InfluxDBType =:= apiv1_tcp;
+    InfluxDBType =:= apiv1_tls
+->
+    #{
+        host := InfluxDBHost,
+        port := InfluxDBPort,
+        use_tls := UseTLS,
+        proxy_name := ProxyName
+    } =
+        case InfluxDBType of
+            apiv1_tcp ->
+                #{
+                    host => os:getenv("INFLUXDB_APIV1_TCP_HOST", "toxiproxy"),
+                    port => list_to_integer(os:getenv("INFLUXDB_APIV1_TCP_PORT", "8086")),
+                    use_tls => false,
+                    proxy_name => "influxdb_tcp"
+                };
+            apiv1_tls ->
+                #{
+                    host => os:getenv("INFLUXDB_APIV1_TLS_HOST", "toxiproxy"),
+                    port => list_to_integer(os:getenv("INFLUXDB_APIV1_TLS_PORT", "8087")),
+                    use_tls => true,
+                    proxy_name => "influxdb_tls"
+                }
+        end,
+    case emqx_common_test_helpers:is_tcp_server_available(InfluxDBHost, InfluxDBPort) of
+        true ->
+            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            Config = [{use_tls, UseTLS} | Config0],
+            {Name, ConfigString, InfluxDBConfig} = influxdb_config(
+                apiv1, InfluxDBHost, InfluxDBPort, Config
+            ),
+            EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv1">>,
+            EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin),
+            {EHttpcTransport, EHttpcTransportOpts} =
+                case UseTLS of
+                    true -> {tls, [{verify, verify_none}]};
+                    false -> {tcp, []}
+                end,
+            EHttpcPoolOpts = [
+                {host, InfluxDBHost},
+                {port, InfluxDBPort},
+                {pool_size, 1},
+                {transport, EHttpcTransport},
+                {transport_opts, EHttpcTransportOpts}
+            ],
+            {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts),
+            [
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
+                {proxy_name, ProxyName},
+                {influxdb_host, InfluxDBHost},
+                {influxdb_port, InfluxDBPort},
+                {influxdb_type, apiv1},
+                {influxdb_config, InfluxDBConfig},
+                {influxdb_config_string, ConfigString},
+                {ehttpc_pool_name, EHttpcPoolName},
+                {influxdb_name, Name}
+                | Config
+            ];
+        false ->
+            {skip, no_influxdb}
+    end;
+init_per_group(InfluxDBType, Config0) when
+    InfluxDBType =:= apiv2_tcp;
+    InfluxDBType =:= apiv2_tls
+->
+    #{
+        host := InfluxDBHost,
+        port := InfluxDBPort,
+        use_tls := UseTLS,
+        proxy_name := ProxyName
+    } =
+        case InfluxDBType of
+            apiv2_tcp ->
+                #{
+                    host => os:getenv("INFLUXDB_APIV2_TCP_HOST", "toxiproxy"),
+                    port => list_to_integer(os:getenv("INFLUXDB_APIV2_TCP_PORT", "8086")),
+                    use_tls => false,
+                    proxy_name => "influxdb_tcp"
+                };
+            apiv2_tls ->
+                #{
+                    host => os:getenv("INFLUXDB_APIV2_TLS_HOST", "toxiproxy"),
+                    port => list_to_integer(os:getenv("INFLUXDB_APIV2_TLS_PORT", "8087")),
+                    use_tls => true,
+                    proxy_name => "influxdb_tls"
+                }
+        end,
+    case emqx_common_test_helpers:is_tcp_server_available(InfluxDBHost, InfluxDBPort) of
+        true ->
+            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            Config = [{use_tls, UseTLS} | Config0],
+            {Name, ConfigString, InfluxDBConfig} = influxdb_config(
+                apiv2, InfluxDBHost, InfluxDBPort, Config
+            ),
+            EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv2">>,
+            EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin),
+            {EHttpcTransport, EHttpcTransportOpts} =
+                case UseTLS of
+                    true -> {tls, [{verify, verify_none}]};
+                    false -> {tcp, []}
+                end,
+            EHttpcPoolOpts = [
+                {host, InfluxDBHost},
+                {port, InfluxDBPort},
+                {pool_size, 1},
+                {transport, EHttpcTransport},
+                {transport_opts, EHttpcTransportOpts}
+            ],
+            {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts),
+            [
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
+                {proxy_name, ProxyName},
+                {influxdb_host, InfluxDBHost},
+                {influxdb_port, InfluxDBPort},
+                {influxdb_type, apiv2},
+                {influxdb_config, InfluxDBConfig},
+                {influxdb_config_string, ConfigString},
+                {ehttpc_pool_name, EHttpcPoolName},
+                {influxdb_name, Name}
+                | Config
+            ];
+        false ->
+            {skip, no_influxdb}
+    end;
+init_per_group(sync_query, Config) ->
+    [{query_mode, sync} | Config];
+init_per_group(async_query, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(with_batch, Config) ->
+    [{enable_batch, true} | Config];
+init_per_group(without_batch, Config) ->
+    [{enable_batch, false} | Config];
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when
+    Group =:= apiv1_tcp;
+    Group =:= apiv1_tls;
+    Group =:= apiv2_tcp;
+    Group =:= apiv2_tls
+->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    EHttpcPoolName = ?config(ehttpc_pool_name, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ehttpc_sup:stop_pool(EHttpcPoolName),
+    delete_bridge(Config),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    %% catch clear_db(Config),
+    %% delete_bridge(Config),
+    delete_all_bridges(),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ok = snabbkaffe:stop(),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    %% catch clear_db(Config),
+    %% delete_bridge(Config),
+    delete_all_bridges(),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+example_write_syntax() ->
+    %% N.B.: this single space character is relevant
+    <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
+        "${clientid}_int_value=${payload.int_key}i,",
+        "uint_value=${payload.uint_key}u,"
+        "float_value=${payload.float_key},", "undef_value=${payload.undef},",
+        "${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>.
+
+influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
+    EnableBatch = proplists:get_value(enable_batch, Config, true),
+    QueryMode = proplists:get_value(query_mode, Config, sync),
+    UseTLS = proplists:get_value(use_tls, Config, false),
+    Name = atom_to_binary(?MODULE),
+    WriteSyntax = example_write_syntax(),
+    ConfigString =
+        io_lib:format(
+            "bridges.influxdb_api_v1.~s {\n"
+            "  enable = true\n"
+            "  server = \"~p:~b\"\n"
+            "  database = mqtt\n"
+            "  username = root\n"
+            "  password = emqx@123\n"
+            "  precision = ns\n"
+            "  write_syntax = \"~s\"\n"
+            "  resource_opts = {\n"
+            "    enable_batch = ~p\n"
+            "    query_mode = ~s\n"
+            "  }\n"
+            "  ssl {\n"
+            "    enable = ~p\n"
+            "    verify = verify_none\n"
+            "  }\n"
+            "}\n",
+            [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS]
+        ),
+    {Name, ConfigString, parse_and_check(ConfigString, Type, Name)};
+influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
+    EnableBatch = proplists:get_value(enable_batch, Config, true),
+    QueryMode = proplists:get_value(query_mode, Config, sync),
+    UseTLS = proplists:get_value(use_tls, Config, false),
+    Name = atom_to_binary(?MODULE),
+    WriteSyntax = example_write_syntax(),
+    ConfigString =
+        io_lib:format(
+            "bridges.influxdb_api_v2.~s {\n"
+            "  enable = true\n"
+            "  server = \"~p:~b\"\n"
+            "  bucket = mqtt\n"
+            "  org = emqx\n"
+            "  token = abcdefg\n"
+            "  precision = ns\n"
+            "  write_syntax = \"~s\"\n"
+            "  resource_opts = {\n"
+            "    enable_batch = ~p\n"
+            "    query_mode = ~s\n"
+            "  }\n"
+            "  ssl {\n"
+            "    enable = ~p\n"
+            "    verify = verify_none\n"
+            "  }\n"
+            "}\n",
+            [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS]
+        ),
+    {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}.
+
+parse_and_check(ConfigString, Type, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    TypeBin = influxdb_type_bin(Type),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+    Config.
+
+influxdb_type_bin(apiv1) ->
+    <<"influxdb_api_v1">>;
+influxdb_type_bin(apiv2) ->
+    <<"influxdb_api_v2">>.
+
+create_bridge(Config) ->
+    Type = influxdb_type_bin(?config(influxdb_type, Config)),
+    Name = ?config(influxdb_name, Config),
+    InfluxDBConfig = ?config(influxdb_config, Config),
+    emqx_bridge:create(Type, Name, InfluxDBConfig).
+
+delete_bridge(Config) ->
+    Type = influxdb_type_bin(?config(influxdb_type, Config)),
+    Name = ?config(influxdb_name, Config),
+    emqx_bridge:remove(Type, Name).
+
+delete_all_bridges() ->
+    lists:foreach(
+        fun(#{name := Name, type := Type}) ->
+            emqx_bridge:remove(Type, Name)
+        end,
+        emqx_bridge:list()
+    ).
+
+send_message(Config, Payload) ->
+    Name = ?config(influxdb_name, Config),
+    Type = influxdb_type_bin(?config(influxdb_type, Config)),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    emqx_bridge:send_message(BridgeId, Payload).
+
+query_by_clientid(ClientId, Config) ->
+    InfluxDBHost = ?config(influxdb_host, Config),
+    InfluxDBPort = ?config(influxdb_port, Config),
+    EHttpcPoolName = ?config(ehttpc_pool_name, Config),
+    UseTLS = ?config(use_tls, Config),
+    Path = <<"/api/v2/query?org=emqx">>,
+    Scheme =
+        case UseTLS of
+            true -> <<"https://">>;
+            false -> <<"http://">>
+        end,
+    URI = iolist_to_binary([
+        Scheme,
+        list_to_binary(InfluxDBHost),
+        ":",
+        integer_to_binary(InfluxDBPort),
+        Path
+    ]),
+    Query =
+        <<
+            "from(bucket: \"mqtt\")\n"
+            "  |> range(start: -12h)\n"
+            "  |> filter(fn: (r) => r.clientid == \"",
+            ClientId/binary,
+            "\")"
+        >>,
+    Headers = [
+        {"Authorization", "Token abcdefg"},
+        {"Content-Type", "application/json"}
+    ],
+    Body =
+        emqx_json:encode(#{
+            query => Query,
+            dialect => #{
+                header => true,
+                delimiter => <<";">>
+            }
+        }),
+    {ok, 200, _Headers, RawBody0} =
+        ehttpc:request(
+            EHttpcPoolName,
+            post,
+            {URI, Headers, Body},
+            _Timeout = 10_000,
+            _Retry = 0
+        ),
+    RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)),
+    {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
+    DecodedCSV1 = [
+        [Field || Field <- Line, Field =/= <<>>]
+     || Line <- DecodedCSV0,
+        Line =/= [<<>>]
+    ],
+    DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []),
+    index_by_field(DecodedCSV2).
+
+decode_csv(RawBody) ->
+    Lines =
+        [
+            binary:split(Line, [<<";">>], [global, trim_all])
+         || Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all])
+        ],
+    csv_lines_to_maps(Lines, []).
+
+csv_lines_to_maps([Fields, Data | Rest], Acc) ->
+    Map = maps:from_list(lists:zip(Fields, Data)),
+    csv_lines_to_maps(Rest, [Map | Acc]);
+csv_lines_to_maps(_Data, Acc) ->
+    lists:reverse(Acc).
+
+index_by_field(DecodedCSV) ->
+    maps:from_list([{Field, Data} || Data = #{<<"_field">> := Field} <- DecodedCSV]).
+
+assert_persisted_data(ClientId, Expected, PersistedData) ->
+    ClientIdIntKey = <<ClientId/binary, "_int_value">>,
+    maps:foreach(
+        fun
+            (int_value, ExpectedValue) ->
+                ?assertMatch(
+                    #{<<"_value">> := ExpectedValue},
+                    maps:get(ClientIdIntKey, PersistedData)
+                );
+            (Key, ExpectedValue) ->
+                ?assertMatch(
+                    #{<<"_value">> := ExpectedValue},
+                    maps:get(atom_to_binary(Key), PersistedData),
+                    #{expected => ExpectedValue}
+                )
+        end,
+        Expected
+    ),
+    ok.
+
+resource_id(Config) ->
+    Type = influxdb_type_bin(?config(influxdb_type, Config)),
+    Name = ?config(influxdb_name, Config),
+    emqx_bridge_resource:resource_id(Type, Name).
+
+instance_id(Config) ->
+    ResourceId = resource_id(Config),
+    [{_, InstanceId}] = ets:lookup(emqx_resource_manager, {owner, ResourceId}),
+    InstanceId.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_ok(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => true,
+        float_key => 24.5,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"timestamp">> => erlang:system_time(nanosecond),
+        <<"payload">> => Payload
+    },
+    ?check_trace(
+        begin
+            ?assertEqual(ok, send_message(Config, SentData)),
+            case QueryMode of
+                async -> ct:sleep(500);
+                sync -> ok
+            end,
+            PersistedData = query_by_clientid(ClientId, Config),
+            Expected = #{
+                bool => <<"true">>,
+                int_value => <<"-123">>,
+                uint_value => <<"123">>,
+                float_value => <<"24.5">>,
+                payload => emqx_json:encode(Payload)
+            },
+            assert_persisted_data(ClientId, Expected, PersistedData),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(influxdb_connector_send_query, Trace0),
+            ?assertMatch([#{points := [_]}], Trace),
+            [#{points := [Point]}] = Trace,
+            ct:pal("sent point: ~p", [Point]),
+            ?assertMatch(
+                #{
+                    fields := #{},
+                    measurement := <<_/binary>>,
+                    tags := #{},
+                    timestamp := TS
+                } when is_integer(TS),
+                Point
+            ),
+            #{fields := Fields} = Point,
+            ?assert(lists:all(fun is_binary/1, maps:keys(Fields))),
+            ?assertNot(maps:is_key(<<"undefined">>, Fields)),
+            ?assertNot(maps:is_key(<<"undef_value">>, Fields)),
+            ok
+        end
+    ),
+    ok.
+
+t_start_already_started(Config) ->
+    Type = influxdb_type_bin(?config(influxdb_type, Config)),
+    Name = ?config(influxdb_name, Config),
+    InfluxDBConfigString = ?config(influxdb_config_string, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    InstanceId = instance_id(Config),
+    TypeAtom = binary_to_atom(Type),
+    NameAtom = binary_to_atom(Name),
+    {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check(
+        emqx_bridge_schema, InfluxDBConfigString
+    ),
+    ?check_trace(
+        emqx_ee_connector_influxdb:on_start(InstanceId, InfluxDBConfigMap),
+        fun(Result, Trace) ->
+            ?assertMatch({ok, _}, Result),
+            ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_start_ok_timestamp_write_syntax(Config) ->
+    InfluxDBType = ?config(influxdb_type, Config),
+    InfluxDBName = ?config(influxdb_name, Config),
+    InfluxDBConfigString0 = ?config(influxdb_config_string, Config),
+    InfluxDBTypeCfg =
+        case InfluxDBType of
+            apiv1 -> "influxdb_api_v1";
+            apiv2 -> "influxdb_api_v2"
+        end,
+    WriteSyntax =
+        %% N.B.: this single space characters are relevant
+        <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
+            "${clientid}_int_value=${payload.int_key}i,",
+            "uint_value=${payload.uint_key}u,"
+            "bool=${payload.bool}", " ", "${timestamp}">>,
+    %% append this to override the config
+    InfluxDBConfigString1 =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  write_syntax = \"~s\"\n"
+            "}\n",
+            [InfluxDBTypeCfg, InfluxDBName, WriteSyntax]
+        ),
+    InfluxDBConfig1 = parse_and_check(
+        InfluxDBConfigString0 ++ InfluxDBConfigString1,
+        InfluxDBType,
+        InfluxDBName
+    ),
+    Config1 = [{influxdb_config, InfluxDBConfig1} | Config],
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config1)
+    ),
+    ok.
+
+t_start_ok_no_subject_tags_write_syntax(Config) ->
+    InfluxDBType = ?config(influxdb_type, Config),
+    InfluxDBName = ?config(influxdb_name, Config),
+    InfluxDBConfigString0 = ?config(influxdb_config_string, Config),
+    InfluxDBTypeCfg =
+        case InfluxDBType of
+            apiv1 -> "influxdb_api_v1";
+            apiv2 -> "influxdb_api_v2"
+        end,
+    WriteSyntax =
+        %% N.B.: this single space characters are relevant
+        <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
+            "uint_value=${payload.uint_key}u,"
+            "bool=${payload.bool}", " ", "${timestamp}">>,
+    %% append this to override the config
+    InfluxDBConfigString1 =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  write_syntax = \"~s\"\n"
+            "}\n",
+            [InfluxDBTypeCfg, InfluxDBName, WriteSyntax]
+        ),
+    InfluxDBConfig1 = parse_and_check(
+        InfluxDBConfigString0 ++ InfluxDBConfigString1,
+        InfluxDBType,
+        InfluxDBName
+    ),
+    Config1 = [{influxdb_config, InfluxDBConfig1} | Config],
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config1)
+    ),
+    ok.
+
+t_boolean_variants(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    BoolVariants = #{
+        true => true,
+        false => false,
+        <<"t">> => true,
+        <<"f">> => false,
+        <<"T">> => true,
+        <<"F">> => false,
+        <<"TRUE">> => true,
+        <<"FALSE">> => false,
+        <<"True">> => true,
+        <<"False">> => false
+    },
+    maps:foreach(
+        fun(BoolVariant, Translation) ->
+            ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Payload = #{
+                int_key => -123,
+                bool => BoolVariant,
+                uint_key => 123
+            },
+            SentData = #{
+                <<"clientid">> => ClientId,
+                <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+                <<"timestamp">> => erlang:system_time(nanosecond),
+                <<"payload">> => Payload
+            },
+            ?assertEqual(ok, send_message(Config, SentData)),
+            case QueryMode of
+                async -> ct:sleep(500);
+                sync -> ok
+            end,
+            PersistedData = query_by_clientid(ClientId, Config),
+            Expected = #{
+                bool => atom_to_binary(Translation),
+                int_value => <<"-123">>,
+                uint_value => <<"123">>,
+                payload => emqx_json:encode(Payload)
+            },
+            assert_persisted_data(ClientId, Expected, PersistedData),
+            ok
+        end,
+        BoolVariants
+    ),
+    ok.
+
+t_bad_timestamp(Config) ->
+    InfluxDBType = ?config(influxdb_type, Config),
+    InfluxDBName = ?config(influxdb_name, Config),
+    QueryMode = ?config(query_mode, Config),
+    EnableBatch = ?config(enable_batch, Config),
+    InfluxDBConfigString0 = ?config(influxdb_config_string, Config),
+    InfluxDBTypeCfg =
+        case InfluxDBType of
+            apiv1 -> "influxdb_api_v1";
+            apiv2 -> "influxdb_api_v2"
+        end,
+    WriteSyntax =
+        %% N.B.: this single space characters are relevant
+        <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
+            "uint_value=${payload.uint_key}u,"
+            "bool=${payload.bool}", " ", "bad_timestamp">>,
+    %% append this to override the config
+    InfluxDBConfigString1 =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  write_syntax = \"~s\"\n"
+            "}\n",
+            [InfluxDBTypeCfg, InfluxDBName, WriteSyntax]
+        ),
+    InfluxDBConfig1 = parse_and_check(
+        InfluxDBConfigString0 ++ InfluxDBConfigString1,
+        InfluxDBType,
+        InfluxDBName
+    ),
+    Config1 = [{influxdb_config, InfluxDBConfig1} | Config],
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config1)
+    ),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => false,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"timestamp">> => erlang:system_time(nanosecond),
+        <<"payload">> => Payload
+    },
+    ?check_trace(
+        ?wait_async_action(
+            send_message(Config1, SentData),
+            #{?snk_kind := influxdb_connector_send_query_error},
+            10_000
+        ),
+        fun(Result, Trace) ->
+            ?assertMatch({_, {ok, _}}, Result),
+            {Return, {ok, _}} = Result,
+            case {QueryMode, EnableBatch} of
+                {async, true} ->
+                    ?assertEqual(ok, Return),
+                    ?assertMatch(
+                        [#{error := points_trans_failed}],
+                        ?of_kind(influxdb_connector_send_query_error, Trace)
+                    );
+                {async, false} ->
+                    ?assertEqual(ok, Return),
+                    ?assertMatch(
+                        [#{error := [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}],
+                        ?of_kind(influxdb_connector_send_query_error, Trace)
+                    );
+                {sync, false} ->
+                    ?assertEqual(
+                        {error, [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}, Return
+                    );
+                {sync, true} ->
+                    ?assertEqual({error, points_trans_failed}, Return)
+            end,
+            ok
+        end
+    ),
+    ok.
+
+t_get_status(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    {ok, _} = create_bridge(Config),
+    ResourceId = resource_id(Config),
+    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+    end),
+    ok.
+
+t_create_disconnected(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ?check_trace(
+        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            ?assertMatch({ok, _}, create_bridge(Config))
+        end),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := influxdb_client_not_alive}],
+                ?of_kind(influxdb_connector_start_failed, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_start_error(Config) ->
+    %% simulate client start error
+    ?check_trace(
+        emqx_common_test_helpers:with_mock(
+            influxdb,
+            start_client,
+            fun(_Config) -> {error, some_error} end,
+            fun() ->
+                ?wait_async_action(
+                    ?assertMatch({ok, _}, create_bridge(Config)),
+                    #{?snk_kind := influxdb_connector_start_failed},
+                    10_000
+                )
+            end
+        ),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := some_error}],
+                ?of_kind(influxdb_connector_start_failed, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_start_exception(Config) ->
+    %% simulate client start exception
+    ?check_trace(
+        emqx_common_test_helpers:with_mock(
+            influxdb,
+            start_client,
+            fun(_Config) -> error(boom) end,
+            fun() ->
+                ?wait_async_action(
+                    ?assertMatch({ok, _}, create_bridge(Config)),
+                    #{?snk_kind := influxdb_connector_start_exception},
+                    10_000
+                )
+            end
+        ),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := {error, boom}}],
+                ?of_kind(influxdb_connector_start_exception, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_write_failure(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    QueryMode = ?config(query_mode, Config),
+    {ok, _} = create_bridge(Config),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => true,
+        float_key => 24.5,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"timestamp">> => erlang:system_time(nanosecond),
+        <<"payload">> => Payload
+    },
+    ?check_trace(
+        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            send_message(Config, SentData)
+        end),
+        fun(Result, _Trace) ->
+            case QueryMode of
+                sync ->
+                    ?assert(
+                        {error, {error, {closed, "The connection was lost."}}} =:= Result orelse
+                            {error, {error, closed}} =:= Result orelse
+                            {error, {error, econnrefused}} =:= Result,
+                        #{got => Result}
+                    );
+                async ->
+                    ?assertEqual(ok, Result)
+            end,
+            ok
+        end
+    ),
+    ok.

+ 111 - 6
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -9,6 +9,7 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
@@ -51,8 +52,16 @@ on_stop(_InstId, #{client := Client}) ->
 on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
     case data_to_points(Data, SyntaxLines) of
         {ok, Points} ->
+            ?tp(
+                influxdb_connector_send_query,
+                #{points => Points, batch => false, mode => sync}
+            ),
             do_query(InstId, Client, Points);
         {error, ErrorPoints} = Err ->
+            ?tp(
+                influxdb_connector_send_query_error,
+                #{batch => false, mode => sync, error => ErrorPoints}
+            ),
             log_error_points(InstId, ErrorPoints),
             Err
     end.
@@ -62,8 +71,16 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
 on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
     case parse_batch_data(InstId, BatchData, SyntaxLines) of
         {ok, Points} ->
+            ?tp(
+                influxdb_connector_send_query,
+                #{points => Points, batch => true, mode => sync}
+            ),
             do_query(InstId, Client, Points);
         {error, Reason} ->
+            ?tp(
+                influxdb_connector_send_query_error,
+                #{batch => true, mode => sync, error => Reason}
+            ),
             {error, Reason}
     end.
 
@@ -75,8 +92,16 @@ on_query_async(
 ) ->
     case data_to_points(Data, SyntaxLines) of
         {ok, Points} ->
+            ?tp(
+                influxdb_connector_send_query,
+                #{points => Points, batch => false, mode => async}
+            ),
             do_async_query(InstId, Client, Points, {ReplayFun, Args});
         {error, ErrorPoints} = Err ->
+            ?tp(
+                influxdb_connector_send_query_error,
+                #{batch => false, mode => async, error => ErrorPoints}
+            ),
             log_error_points(InstId, ErrorPoints),
             Err
     end.
@@ -89,8 +114,16 @@ on_batch_query_async(
 ) ->
     case parse_batch_data(InstId, BatchData, SyntaxLines) of
         {ok, Points} ->
+            ?tp(
+                influxdb_connector_send_query,
+                #{points => Points, batch => true, mode => async}
+            ),
             do_async_query(InstId, Client, Points, {ReplayFun, Args});
         {error, Reason} ->
+            ?tp(
+                influxdb_connector_send_query_error,
+                #{batch => true, mode => async, error => Reason}
+            ),
             {error, Reason}
     end.
 
@@ -163,6 +196,7 @@ start_client(InstId, Config) ->
         do_start_client(InstId, ClientConfig, Config)
     catch
         E:R:S ->
+            ?tp(influxdb_connector_start_exception, #{error => {E, R}}),
             ?SLOG(error, #{
                 msg => "start influxdb connector error",
                 connector => InstId,
@@ -196,6 +230,7 @@ do_start_client(
                     }),
                     {ok, State};
                 false ->
+                    ?tp(influxdb_connector_start_failed, #{error => influxdb_client_not_alive}),
                     ?SLOG(error, #{
                         msg => "starting influxdb connector failed",
                         connector => InstId,
@@ -205,14 +240,16 @@ do_start_client(
                     {error, influxdb_client_not_alive}
             end;
         {error, {already_started, Client0}} ->
+            ?tp(influxdb_connector_start_already_started, #{}),
             ?SLOG(info, #{
-                msg => "starting influxdb connector,find already started client",
+                msg => "restarting influxdb connector, found already started client",
                 connector => InstId,
                 old_client => Client0
             }),
             _ = influxdb:stop_client(Client0),
             do_start_client(InstId, ClientConfig, Config);
         {error, Reason} ->
+            ?tp(influxdb_connector_start_failed, #{error => Reason}),
             ?SLOG(error, #{
                 msg => "starting influxdb connector failed",
                 connector => InstId,
@@ -235,7 +272,7 @@ client_config(
         {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
     ] ++ protocol_config(Config).
 
-%% api v2 config
+%% api v1 config
 protocol_config(#{
     username := Username,
     password := Password,
@@ -249,7 +286,7 @@ protocol_config(#{
         {password, str(Password)},
         {database, str(DB)}
     ] ++ ssl_config(SSL);
-%% api v1 config
+%% api v2 config
 protocol_config(#{
     bucket := Bucket,
     org := Org,
@@ -290,6 +327,7 @@ do_query(InstId, Client, Points) ->
                 points => Points
             });
         {error, Reason} = Err ->
+            ?tp(influxdb_connector_do_query_failure, #{error => Reason}),
             ?SLOG(error, #{
                 msg => "influxdb write point failed",
                 connector => InstId,
@@ -370,6 +408,14 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
             {error, points_trans_failed}
     end.
 
+-spec data_to_points(map(), [
+    #{
+        fields := [{binary(), binary()}],
+        measurement := binary(),
+        tags := [{binary(), binary()}],
+        timestamp := binary()
+    }
+]) -> {ok, [map()]} | {error, term()}.
 data_to_points(Data, SyntaxLines) ->
     lines_to_points(Data, SyntaxLines, [], []).
 
@@ -416,17 +462,18 @@ lines_to_points(
     end.
 
 maps_config_to_data(K, V, {Data, Res}) ->
-    KTransOptions = #{return => full_binary},
+    KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
     VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
-    NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
+    NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
     NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions),
-    case {NK, NV} of
+    case {NK0, NV} of
         {[undefined], _} ->
             {Data, Res};
         %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
         {_, [undefined | _]} ->
             {Data, Res};
         _ ->
+            NK = list_to_binary(NK0),
             {Data, Res#{NK => value_type(NV)}}
     end.
 
@@ -438,6 +485,8 @@ value_type([UInt, <<"u">>]) when
     is_integer(UInt)
 ->
     {uint, UInt};
+value_type([Float]) when is_float(Float) ->
+    Float;
 value_type([<<"t">>]) ->
     't';
 value_type([<<"T">>]) ->
@@ -461,6 +510,9 @@ value_type([<<"False">>]) ->
 value_type(Val) ->
     Val.
 
+key_filter(undefined) -> undefined;
+key_filter(Value) -> emqx_plugin_libs_rule:bin(Value).
+
 data_filter(undefined) -> undefined;
 data_filter(Int) when is_integer(Int) -> Int;
 data_filter(Number) when is_number(Number) -> Number;
@@ -500,3 +552,56 @@ str(B) when is_binary(B) ->
     binary_to_list(B);
 str(S) when is_list(S) ->
     S.
+
+%%===================================================================
+%% eunit tests
+%%===================================================================
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+to_server_raw_test_() ->
+    [
+        ?_assertEqual(
+            {"foobar", 1234},
+            to_server_raw(<<"http://foobar:1234">>)
+        ),
+        ?_assertEqual(
+            {"foobar", 1234},
+            to_server_raw(<<"https://foobar:1234">>)
+        ),
+        ?_assertEqual(
+            {"foobar", 1234},
+            to_server_raw(<<"foobar:1234">>)
+        )
+    ].
+
+%% for coverage
+desc_test_() ->
+    [
+        ?_assertMatch(
+            {desc, _, _},
+            desc(common)
+        ),
+        ?_assertMatch(
+            {desc, _, _},
+            desc(influxdb_udp)
+        ),
+        ?_assertMatch(
+            {desc, _, _},
+            desc(influxdb_api_v1)
+        ),
+        ?_assertMatch(
+            {desc, _, _},
+            desc(influxdb_api_v2)
+        ),
+        ?_assertMatch(
+            {desc, _, _},
+            server(desc)
+        ),
+        ?_assertMatch(
+            connector_influxdb,
+            namespace()
+        )
+    ].
+-endif.

+ 2 - 1
rebar.config.erl

@@ -150,7 +150,8 @@ test_deps() ->
         {bbmustache, "1.10.0"},
         {meck, "0.9.2"},
         {proper, "1.4.0"},
-        {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}}
+        {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}},
+        {erl_csv, "0.2.0"}
     ].
 
 common_compile_opts(Vsn) ->

+ 5 - 0
scripts/ct/run.sh

@@ -92,6 +92,11 @@ for dep in ${CT_DEPS}; do
         erlang24)
             FILES+=( '.ci/docker-compose-file/docker-compose.yaml' )
             ;;
+        influxdb)
+            FILES+=( '.ci/docker-compose-file/docker-compose-toxiproxy.yaml'
+                     '.ci/docker-compose-file/docker-compose-influxdb-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-influxdb-tls.yaml' )
+            ;;
         mongo)
             FILES+=( '.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml'
                      '.ci/docker-compose-file/docker-compose-mongo-single-tls.yaml' )