Browse Source

test: make all tests pass

JianBo He 1 year ago
parent
commit
6959c4ef50

+ 1 - 0
.ci/docker-compose-file/.env

@@ -12,6 +12,7 @@ OPENTS_TAG=9aa7f88
 KINESIS_TAG=2.1
 HSTREAMDB_TAG=v0.19.3
 HSTREAMDB_ZK_TAG=3.8.1
+DATALAYERS_TAG=nightly
 
 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
 SQLSERVER_TAG=2019-CU19-ubuntu-20.04

+ 13 - 0
.ci/docker-compose-file/docker-compose-datalayers-tcp.yaml

@@ -0,0 +1,13 @@
+version: '3.9'
+
+services:
+  datalayers_server_tcp:
+    container_name: datalayers_tcp
+    image: datalayers/datalayers:${DATALAYERS_TAG}
+    expose:
+      - "8361"
+    environment:
+      DATALAYERS_SERVER__TIMEZONE: "Etc/GMT"
+    restart: always
+    networks:
+      - emqx_bridge

+ 19 - 0
.ci/docker-compose-file/docker-compose-datalayers-tls.yaml

@@ -0,0 +1,19 @@
+version: '3.9'
+
+services:
+  datalayers_server_tls:
+    container_name: datalayers_tls
+    image: datalayers/datalayers:${DATALAYERS_TAG}
+    expose:
+      - "8362"
+    environment:
+      DATALAYERS_SERVER__TIMEZONE: "Etc/GMT"
+      DATALAYERS_SERVER__HTTP: "0.0.0.0:8362"
+      DATALAYERS_SERVER__TLS__KEY: "/etc/datalayers/key.pem"
+      DATALAYERS_SERVER__TLS__CERT: "/etc/datalayers/cert.pem"
+    volumes:
+      - ./certs/server.crt:/etc/datalayers/cert.pem
+      - ./certs/server.key:/etc/datalayers/key.pem
+    restart: always
+    networks:
+      - emqx_bridge

+ 11 - 0
.ci/docker-compose-file/docker-compose-datalayers.yaml

@@ -0,0 +1,11 @@
+version: '3.9'
+
+services:
+  datalayers_server_tcp:
+    container_name: datalayers_tcp
+    image: datalayers/datalayers:${DATALAYERS_TAG}
+    expose:
+      - "8361"
+    restart: always
+    networks:
+      - emqx_bridge

+ 3 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -58,6 +58,9 @@ services:
       # GreptimeDB
       - 4000:4000
       - 4001:4001
+      # Datalayers
+      - 8361:8361
+      - 8362:8362
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 12 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -239,5 +239,17 @@
     "listen": "0.0.0.0:8093",
     "upstream": "couchbase:8093",
     "enabled": true
+  },
+  {
+    "name": "datalayers_tcp",
+    "listen": "0.0.0.0:8361",
+    "upstream": "datalayers_tcp:8361",
+    "enabled": true
+  },
+  {
+    "name": "datalayers_tls",
+    "listen": "0.0.0.0:8362",
+    "upstream": "datalayers_tls:8362",
+    "enabled": true
   }
 ]

+ 2 - 2
apps/emqx_bridge_datalayers/src/emqx_bridge_datalayers.erl

@@ -101,7 +101,7 @@ connector_values_v(datalayers_api_v1) ->
 basic_connector_values() ->
     #{
         enable => true,
-        server => <<"127.0.0.1:8086">>,
+        server => <<"127.0.0.1:8361">>,
         ssl => #{enable => false}
     }.
 
@@ -129,7 +129,7 @@ values(common, Protocol, SupportUint, TypeOpts) ->
             batch_size => 100,
             batch_time => <<"20ms">>
         },
-        server => <<"127.0.0.1:8086">>,
+        server => <<"127.0.0.1:8361">>,
         ssl => #{enable => false}
     },
     maps:merge(TypeOpts, CommonConfigs).

+ 24 - 27
apps/emqx_bridge_datalayers/src/emqx_bridge_datalayers_connector.erl

@@ -215,8 +215,7 @@ on_format_query_result(Result) ->
     emqx_bridge_http_connector:on_format_query_result(Result).
 
 on_get_status(_InstId, #{client := Client}) ->
-    %case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
-    case influxdb:is_alive(Client) of
+    case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
         true ->
             connected;
         false ->
@@ -362,31 +361,29 @@ do_start_client(InstId, ClientConfig, Config) ->
         {ok, Client} ->
             case influxdb:is_alive(Client, true) of
                 true ->
-                    State = #{client => Client, channels => #{}},
-                    {ok, State};
-                %case influxdb:check_auth(Client) of
-                %    ok ->
-                %        State = #{client => Client, channels => #{}},
-                %        ?SLOG(info, #{
-                %            msg => "starting_datalayers_connector_success",
-                %            connector => InstId,
-                %            client => redact_auth(Client),
-                %            state => redact_auth(State)
-                %        }),
-                %        {ok, State};
-                %    Error ->
-                %        ?tp(datalayers_connector_start_failed, #{error => auth_error}),
-                %        ?SLOG(warning, #{
-                %            msg => "failed_to_start_datalayers_connector",
-                %            error => Error,
-                %            connector => InstId,
-                %            client => redact_auth(Client),
-                %            reason => auth_error
-                %        }),
-                %        %% no leak
-                %        _ = influxdb:stop_client(Client),
-                %        {error, connect_ok_but_auth_failed}
-                %end;
+                    case influxdb:check_auth(Client) of
+                        ok ->
+                            State = #{client => Client, channels => #{}},
+                            ?SLOG(info, #{
+                                msg => "starting_datalayers_connector_success",
+                                connector => InstId,
+                                client => redact_auth(Client),
+                                state => redact_auth(State)
+                            }),
+                            {ok, State};
+                        Error ->
+                            ?tp(datalayers_connector_start_failed, #{error => auth_error}),
+                            ?SLOG(warning, #{
+                                msg => "failed_to_start_datalayers_connector",
+                                error => Error,
+                                connector => InstId,
+                                client => redact_auth(Client),
+                                reason => auth_error
+                            }),
+                            %% no leak
+                            _ = influxdb:stop_client(Client),
+                            {error, connect_ok_but_auth_failed}
+                    end;
                 {false, Reason} ->
                     ?tp(datalayers_connector_start_failed, #{
                         error => datalayers_client_not_alive, reason => Reason

File diff suppressed because it is too large
+ 253 - 392
apps/emqx_bridge_datalayers/test/emqx_bridge_influxdb_SUITE.erl


+ 59 - 43
apps/emqx_bridge_datalayers/test/emqx_bridge_influxdb_connector_SUITE.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_influxdb_connector_SUITE).
+-module(emqx_bridge_datalayers_connector_SUITE).
 
 -compile(nowarn_export_all).
 -compile(export_all).
@@ -11,7 +11,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--define(INFLUXDB_RESOURCE_MOD, emqx_bridge_influxdb_connector).
+-define(DATALAYERS_RESOURCE_MOD, emqx_bridge_datalayers_connector).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -20,35 +20,51 @@ groups() ->
     [].
 
 init_per_suite(Config) ->
-    InfluxDBTCPHost = os:getenv("INFLUXDB_APIV2_TCP_HOST", "toxiproxy"),
-    InfluxDBTCPPort = list_to_integer(os:getenv("INFLUXDB_APIV2_TCP_PORT", "8086")),
-    InfluxDBTLSHost = os:getenv("INFLUXDB_APIV2_TLS_HOST", "toxiproxy"),
-    InfluxDBTLSPort = list_to_integer(os:getenv("INFLUXDB_APIV2_TLS_PORT", "8087")),
-    Servers = [{InfluxDBTCPHost, InfluxDBTCPPort}, {InfluxDBTLSHost, InfluxDBTLSPort}],
+    DatalayersTCPHost = os:getenv("DATALAYERS_TCP_HOST", "toxiproxy"),
+    DatalayersTCPPort = list_to_integer(os:getenv("DATALAYERS_TCP_PORT", "8361")),
+    DatalayersTLSHost = os:getenv("DATALAYERS_TLS_HOST", "toxiproxy"),
+    DatalayersTLSPort = list_to_integer(os:getenv("DATALAYERS_TLS_PORT", "8362")),
+    Servers = [{DatalayersTCPHost, DatalayersTCPPort}, {DatalayersTLSHost, DatalayersTLSPort}],
     case emqx_common_test_helpers:is_all_tcp_servers_available(Servers) of
         true ->
             Apps = emqx_cth_suite:start(
                 [
                     emqx_conf,
-                    emqx_bridge_influxdb,
+                    emqx_bridge_datalayers,
                     emqx_bridge
                 ],
                 #{work_dir => emqx_cth_suite:work_dir(Config)}
             ),
-            [
-                {apps, Apps},
-                {influxdb_tcp_host, InfluxDBTCPHost},
-                {influxdb_tcp_port, InfluxDBTCPPort},
-                {influxdb_tls_host, InfluxDBTLSHost},
-                {influxdb_tls_port, InfluxDBTLSPort}
-                | Config
-            ];
+            EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv1">>,
+            EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin),
+            EHttpcPoolOpts = [
+                {host, DatalayersTCPHost},
+                {port, DatalayersTCPPort},
+                {pool_size, 1},
+                {transport, tcp},
+                {transport_opts, []}
+            ],
+
+            {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts),
+
+            NewConfig =
+                [
+                    {apps, Apps},
+                    {datalayers_host, DatalayersTCPHost},
+                    {datalayers_port, DatalayersTCPPort},
+                    {datalayers_tls_host, DatalayersTLSHost},
+                    {datalayers_tls_port, DatalayersTLSPort},
+                    {ehttpc_pool_name, EHttpcPoolName}
+                    | Config
+                ],
+            emqx_bridge_datalayers_SUITE:ensure_database(NewConfig),
+            NewConfig;
         false ->
             case os:getenv("IS_CI") of
                 "yes" ->
-                    throw(no_influxdb);
+                    throw(no_datalayers);
                 _ ->
-                    {skip, no_influxdb}
+                    {skip, no_datalayers}
             end
     end.
 
@@ -68,19 +84,19 @@ end_per_testcase(_, _Config) ->
 % %%------------------------------------------------------------------------------
 
 t_lifecycle(Config) ->
-    Host = ?config(influxdb_tcp_host, Config),
-    Port = ?config(influxdb_tcp_port, Config),
+    Host = ?config(datalayers_host, Config),
+    Port = ?config(datalayers_port, Config),
     perform_lifecycle_check(
-        <<"emqx_bridge_influxdb_connector_SUITE">>,
-        influxdb_connector_config(Host, Port, false, <<"verify_none">>)
+        <<"emqx_bridge_datalayers_connector_SUITE">>,
+        datalayers_connector_config(Host, Port, false, <<"verify_none">>)
     ).
 
 perform_lifecycle_check(PoolName, InitialConfig) ->
     {ok, #{config := CheckedConfig}} =
-        emqx_resource:check_config(?INFLUXDB_RESOURCE_MOD, InitialConfig),
+        emqx_resource:check_config(?DATALAYERS_RESOURCE_MOD, InitialConfig),
     % We need to add a write_syntax to the config since the connector
     % expects this
-    FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
+    FullConfig = CheckedConfig#{write_syntax => datalayers_write_syntax()},
     {ok, #{
         id := ResourceId,
         state := #{client := #{pool := ReturnedPoolName}} = State,
@@ -88,7 +104,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
     }} = emqx_resource:create_local(
         PoolName,
         ?CONNECTOR_RESOURCE_GROUP,
-        ?INFLUXDB_RESOURCE_MOD,
+        ?DATALAYERS_RESOURCE_MOD,
         FullConfig,
         #{}
     ),
@@ -101,7 +117,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         emqx_resource:get_instance(PoolName),
     ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
     %% install actions to the connector
-    ActionConfig = influxdb_action_config(),
+    ActionConfig = datalayers_action_config(),
     ChannelId = <<"test_channel">>,
     ?assertEqual(
         ok,
@@ -150,9 +166,9 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
 
 t_tls_verify_none(Config) ->
     PoolName = <<"testpool-1">>,
-    Host = ?config(influxdb_tls_host, Config),
-    Port = ?config(influxdb_tls_port, Config),
-    InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>),
+    Host = ?config(datalayers_tls_host, Config),
+    Port = ?config(datalayers_tls_port, Config),
+    InitialConfig = datalayers_connector_config(Host, Port, true, <<"verify_none">>),
     ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
     ?assertEqual(connected, ValidStatus),
     InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
@@ -161,9 +177,9 @@ t_tls_verify_none(Config) ->
 
 t_tls_verify_peer(Config) ->
     PoolName = <<"testpool-2">>,
-    Host = ?config(influxdb_tls_host, Config),
-    Port = ?config(influxdb_tls_port, Config),
-    InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>),
+    Host = ?config(datalayers_tls_host, Config),
+    Port = ?config(datalayers_tls_port, Config),
+    InitialConfig = datalayers_connector_config(Host, Port, true, <<"verify_peer">>),
     %% This works without a CA-cert & friends since we are using a mock
     ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
     ?assertEqual(connected, ValidStatus),
@@ -173,7 +189,7 @@ t_tls_verify_peer(Config) ->
 
 perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
     {ok, #{config := CheckedConfig}} =
-        emqx_resource:check_config(?INFLUXDB_RESOURCE_MOD, InitialConfig),
+        emqx_resource:check_config(?DATALAYERS_RESOURCE_MOD, InitialConfig),
     % Meck handling of TLS opt handling so that we can inject custom
     % verification returns
     meck:new(emqx_tls_lib, [passthrough, no_link]),
@@ -192,14 +208,14 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
     try
         % We need to add a write_syntax to the config since the connector
         % expects this
-        FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
+        FullConfig = CheckedConfig#{write_syntax => datalayers_write_syntax()},
         {ok, #{
             config := #{ssl := #{enable := SslEnabled}},
             status := Status
         }} = emqx_resource:create_local(
             PoolName,
             ?CONNECTOR_RESOURCE_GROUP,
-            ?INFLUXDB_RESOURCE_MOD,
+            ?DATALAYERS_RESOURCE_MOD,
             FullConfig,
             #{}
         ),
@@ -216,14 +232,14 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
 % %% Helpers
 % %%------------------------------------------------------------------------------
 
-influxdb_connector_config(Host, Port, SslEnabled, Verify) ->
+datalayers_connector_config(Host, Port, SslEnabled, Verify) ->
     Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
     ConnectorConf = #{
         <<"parameters">> => #{
-            <<"influxdb_type">> => <<"influxdb_api_v2">>,
-            <<"bucket">> => <<"mqtt">>,
-            <<"org">> => <<"emqx">>,
-            <<"token">> => <<"abcdefg">>
+            <<"datalayers_type">> => <<"datalayers_api_v1">>,
+            <<"database">> => <<"mqtt">>,
+            <<"username">> => <<"admin">>,
+            <<"password">> => <<"public">>
         },
         <<"server">> => Server,
         <<"ssl">> => #{
@@ -233,10 +249,10 @@ influxdb_connector_config(Host, Port, SslEnabled, Verify) ->
     },
     #{<<"config">> => ConnectorConf}.
 
-influxdb_action_config() ->
+datalayers_action_config() ->
     #{
         parameters => #{
-            write_syntax => influxdb_write_syntax(),
+            write_syntax => datalayers_write_syntax(),
             precision => ms
         }
     }.
@@ -253,7 +269,7 @@ custom_verify() ->
             {fail, unexpected_call_to_verify_fun}
     end.
 
-influxdb_write_syntax() ->
+datalayers_write_syntax() ->
     [
         #{
             measurement => "${topic}",

+ 0 - 392
apps/emqx_bridge_datalayers/test/emqx_bridge_influxdb_tests.erl

@@ -1,392 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
--module(emqx_bridge_influxdb_tests).
-
--include_lib("eunit/include/eunit.hrl").
-
--define(INVALID_LINES, [
-    "   ",
-    " \n",
-    "  \n\n\n  ",
-    "\n",
-    "  \n\n   \n  \n",
-    "measurement",
-    "measurement ",
-    "measurement,tag",
-    "measurement field",
-    "measurement,tag field",
-    "measurement,tag field ${timestamp}",
-    "measurement,tag=",
-    "measurement,tag=tag1",
-    "measurement,tag =",
-    "measurement field=",
-    "measurement field= ",
-    "measurement field = ",
-    "measurement, tag = field = ",
-    "measurement, tag = field = ",
-    "measurement, tag = tag_val field = field_val",
-    "measurement, tag = tag_val field = field_val ${timestamp}",
-    "measurement,= = ${timestamp}",
-    "measurement,t=a, f=a, ${timestamp}",
-    "measurement,t=a,t1=b, f=a,f1=b, ${timestamp}",
-    "measurement,t=a,t1=b, f=a,f1=b,",
-    "measurement,t=a, t1=b, f=a,f1=b,",
-    "measurement,t=a,,t1=b, f=a,f1=b,",
-    "measurement,t=a,,t1=b f=a,,f1=b",
-    "measurement,t=a,,t1=b f=a,f1=b ${timestamp}",
-    "measurement, f=a,f1=b",
-    "measurement, f=a,f1=b ${timestamp}",
-    "measurement,, f=a,f1=b ${timestamp}",
-    "measurement,, f=a,f1=b",
-    "measurement,, f=a,f1=b,, ${timestamp}",
-    "measurement f=a,f1=b,, ${timestamp}",
-    "measurement,t=a f=a,f1=b,, ${timestamp}",
-    "measurement,t=a f=a,f1=b,, ",
-    "measurement,t=a f=a,f1=b,,",
-    "measurement, t=a  f=a,f1=b",
-    "measurement,t=a f=a, f1=b",
-    "measurement,t=a f=a, f1=b ${timestamp}",
-    "measurement, t=a  f=a, f1=b ${timestamp}",
-    "measurement,t= a f=a,f1=b ${timestamp}",
-    "measurement,t= a f=a,f1 =b ${timestamp}",
-    "measurement, t = a f = a,f1 = b ${timestamp}",
-    "measurement,t=a f=a,f1=b \n ${timestamp}",
-    "measurement,t=a \n f=a,f1=b \n ${timestamp}",
-    "measurement,t=a \n f=a,f1=b \n ",
-    "\n measurement,t=a \n f=a,f1=b \n ${timestamp}",
-    "\n measurement,t=a \n f=a,f1=b \n",
-    %% not escaped backslash in a quoted field value is invalid
-    "measurement,tag=1 field=\"val\\1\""
-]).
-
--define(VALID_LINE_PARSED_PAIRS, [
-    {"m1,tag=tag1 field=field1 ${timestamp1}", #{
-        measurement => "m1",
-        tags => [{"tag", "tag1"}],
-        fields => [{"field", "field1"}],
-        timestamp => "${timestamp1}"
-    }},
-    {"m2,tag=tag2 field=field2", #{
-        measurement => "m2",
-        tags => [{"tag", "tag2"}],
-        fields => [{"field", "field2"}],
-        timestamp => undefined
-    }},
-    {"m3 field=field3 ${timestamp3}", #{
-        measurement => "m3",
-        tags => [],
-        fields => [{"field", "field3"}],
-        timestamp => "${timestamp3}"
-    }},
-    {"m4 field=field4", #{
-        measurement => "m4",
-        tags => [],
-        fields => [{"field", "field4"}],
-        timestamp => undefined
-    }},
-    {"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}",
-        #{
-            measurement => "m5",
-            tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
-            fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
-            timestamp => "${timestamp5}"
-        }},
-    {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{
-        measurement => "m6",
-        tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-        fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
-        timestamp => undefined
-    }},
-    {"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"",
-        #{
-            measurement => "m7",
-            tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}],
-            fields => [
-                {"field", {quoted, "field7"}},
-                {"field_a", "field7a"},
-                {"field_b", {quoted, "field7b"}}
-            ],
-            timestamp => undefined
-        }},
-    {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}",
-        #{
-            measurement => "m8",
-            tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
-            fields => [
-                {"field", {quoted, "field8"}},
-                {"field_a", "field8a"},
-                {"field_b", {quoted, "field8b"}}
-            ],
-            timestamp => "${timestamp8}"
-        }},
-    {
-        "m8a,tag=tag8,tag_a=\"${tag8a}\",tag_b=tag8b field=\"${field8}\","
-        "field_a=field8a,field_b=\"${field8b}\" ${timestamp8}",
-        #{
-            measurement => "m8a",
-            tags => [{"tag", "tag8"}, {"tag_a", "\"${tag8a}\""}, {"tag_b", "tag8b"}],
-            fields => [
-                {"field", {quoted, "${field8}"}},
-                {"field_a", "field8a"},
-                {"field_b", {quoted, "${field8b}"}}
-            ],
-            timestamp => "${timestamp8}"
-        }
-    },
-    {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
-        #{
-            measurement => "m9",
-            tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
-            fields => [
-                {"field", {quoted, "field9"}}, {"field_a", "field9a"}, {"field_b", {quoted, ""}}
-            ],
-            timestamp => "${timestamp9}"
-        }},
-    {"m10 field=\"\" ${timestamp10}", #{
-        measurement => "m10",
-        tags => [],
-        fields => [{"field", {quoted, ""}}],
-        timestamp => "${timestamp10}"
-    }}
-]).
-
--define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [
-    {"\n  m1,tag=tag1  field=field1  ${timestamp1} \n", #{
-        measurement => "m1",
-        tags => [{"tag", "tag1"}],
-        fields => [{"field", "field1"}],
-        timestamp => "${timestamp1}"
-    }},
-    {"  m2,tag=tag2  field=field2  ", #{
-        measurement => "m2",
-        tags => [{"tag", "tag2"}],
-        fields => [{"field", "field2"}],
-        timestamp => undefined
-    }},
-    {" m3  field=field3   ${timestamp3}  ", #{
-        measurement => "m3",
-        tags => [],
-        fields => [{"field", "field3"}],
-        timestamp => "${timestamp3}"
-    }},
-    {" \n m4  field=field4\n ", #{
-        measurement => "m4",
-        tags => [],
-        fields => [{"field", "field4"}],
-        timestamp => undefined
-    }},
-    {" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b   field=field5,field_a=field5a,field_b=field5b    ${timestamp5}  \n",
-        #{
-            measurement => "m5",
-            tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
-            fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
-            timestamp => "${timestamp5}"
-        }},
-    {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b  field=field6,field_a=field6a,field_b=field6b\n  ", #{
-        measurement => "m6",
-        tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-        fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
-        timestamp => undefined
-    }}
-]).
-
--define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [
-    {"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{
-        measurement => "m =1,",
-        tags => [{",tag =", "=tag 1,"}],
-        fields => [{",fie ld ", " field,1"}],
-        timestamp => "${timestamp1}"
-    }},
-    {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{
-        measurement => "m2",
-        tags => [{"tag", "tag2"}],
-        fields => [{"field", {quoted, "field \"2\",\n"}}],
-        timestamp => undefined
-    }},
-    {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{
-        measurement => "m 3",
-        tags => [],
-        fields => [{"field", {quoted, "field3"}}],
-        timestamp => "${payload.timestamp 3}"
-    }},
-    {"m4 field=\"\\\"field\\\\4\\\"\"", #{
-        measurement => "m4",
-        tags => [],
-        fields => [{"field", {quoted, "\"field\\4\""}}],
-        timestamp => undefined
-    }},
-    {
-        "m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
-        "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
-        #{
-            measurement => "m5,mA",
-            tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
-            fields => [
-                {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
-            ],
-            timestamp => "${timestamp5}"
-        }
-    },
-    {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
-        #{
-            measurement => "m6",
-            tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-            fields => [
-                {"field", {quoted, "field6"}},
-                {"field_a", {quoted, "field6a"}},
-                {"field_b", {quoted, "field6b"}}
-            ],
-            timestamp => undefined
-        }},
-    {
-        "\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\","
-        "field_a=field7a,field_b=\"field7b\\\\\n\"",
-        #{
-            measurement => "  m7  ",
-            tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
-            fields => [
-                {"field", {quoted, "field7"}},
-                {"field_a", "field7a"},
-                {"field_b", {quoted, "field7b\\\n"}}
-            ],
-            timestamp => undefined
-        }
-    },
-    {
-        "m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,"
-        "field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
-        #{
-            measurement => "m8",
-            tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
-            fields => [
-                {"field", {quoted, "field8"}},
-                {"field_a", "field8a"},
-                {"field_b", {quoted, "\"field\" = 8b"}}
-            ],
-            timestamp => "${timestamp8}"
-        }
-    },
-    {"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
-        #{
-            measurement => "m\\9",
-            tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
-            fields => [
-                {"field=field", {quoted, "field9"}},
-                {"field_a", "field9a"},
-                {"field_b", {quoted, ""}}
-            ],
-            timestamp => "${timestamp9}"
-        }},
-    {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{
-        measurement => "m,10",
-        tags => [],
-        %% backslash should not be un-escaped in tag key
-        fields => [{"\"field\\\\\"", {quoted, ""}}],
-        timestamp => "${timestamp10}"
-    }}
-]).
-
--define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [
-    {" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\,   \\,fie\\ ld\\ =\\ field\\,1  ${timestamp1}  ", #{
-        measurement => "m =1,",
-        tags => [{",tag =", "=tag 1,"}],
-        fields => [{",fie ld ", " field,1"}],
-        timestamp => "${timestamp1}"
-    }},
-    {" m2,tag=tag2   field=\"field \\\"2\\\",\n\"  ", #{
-        measurement => "m2",
-        tags => [{"tag", "tag2"}],
-        fields => [{"field", {quoted, "field \"2\",\n"}}],
-        timestamp => undefined
-    }},
-    {"  m\\ 3   field=\"field3\"   ${payload.timestamp\\ 3}  ", #{
-        measurement => "m 3",
-        tags => [],
-        fields => [{"field", {quoted, "field3"}}],
-        timestamp => "${payload.timestamp 3}"
-    }},
-    {"   m4       field=\"\\\"field\\\\4\\\"\"    ", #{
-        measurement => "m4",
-        tags => [],
-        fields => [{"field", {quoted, "\"field\\4\""}}],
-        timestamp => undefined
-    }},
-    {
-        " m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b   \\ field\\ =field5,"
-        "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b   ${timestamp5}    ",
-        #{
-            measurement => "m5,mA",
-            tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
-            fields => [
-                {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
-            ],
-            timestamp => "${timestamp5}"
-        }
-    },
-    {"  m6,tag=tag6,tag_a=tag6a,tag_b=tag6b   field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"  ",
-        #{
-            measurement => "m6",
-            tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-            fields => [
-                {"field", {quoted, "field6"}},
-                {"field_a", {quoted, "field6a"}},
-                {"field_b", {quoted, "field6b"}}
-            ],
-            timestamp => undefined
-        }}
-]).
-
-invalid_write_syntax_line_test_() ->
-    [?_assertThrow(_, to_influx_lines(L)) || L <- ?INVALID_LINES].
-
-invalid_write_syntax_multiline_test_() ->
-    LinesList = [
-        join("\n", ?INVALID_LINES),
-        join("\n\n\n", ?INVALID_LINES),
-        join("\n\n", lists:reverse(?INVALID_LINES))
-    ],
-    [?_assertThrow(_, to_influx_lines(Lines)) || Lines <- LinesList].
-
-valid_write_syntax_test_() ->
-    test_pairs(?VALID_LINE_PARSED_PAIRS).
-
-valid_write_syntax_with_extra_spaces_test_() ->
-    test_pairs(?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS).
-
-valid_write_syntax_escaped_chars_test_() ->
-    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS).
-
-valid_write_syntax_escaped_chars_with_extra_spaces_test_() ->
-    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS).
-
-test_pairs(PairsList) ->
-    {Lines, AllExpected} = lists:unzip(PairsList),
-    JoinedLines = join("\n", Lines),
-    JoinedLines1 = join("\n\n\n", Lines),
-    JoinedLines2 = join("\n\n", lists:reverse(Lines)),
-    SingleLineTests =
-        [
-            ?_assertEqual([Expected], to_influx_lines(Line))
-         || {Line, Expected} <- PairsList
-        ],
-    JoinedLinesTests =
-        [
-            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines)),
-            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines1)),
-            ?_assertEqual(lists:reverse(AllExpected), to_influx_lines(JoinedLines2))
-        ],
-    SingleLineTests ++ JoinedLinesTests.
-
-join(Sep, LinesList) ->
-    lists:flatten(lists:join(Sep, LinesList)).
-
-to_influx_lines(RawLines) ->
-    OldLevel = emqx_logger:get_primary_log_level(),
-    try
-        %% mute error logs from this call
-        emqx_logger:set_primary_log_level(none),
-        emqx_bridge_influxdb:to_influx_lines(RawLines)
-    after
-        emqx_logger:set_primary_log_level(OldLevel)
-    end.

+ 16 - 12
scripts/ct/run.sh

@@ -247,18 +247,22 @@ for dep in ${CT_DEPS}; do
         otel)
             FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' )
             ;;
-	elasticsearch)
-	    FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' )
-	    ;;
-	azurite)
-	    FILES+=( '.ci/docker-compose-file/docker-compose-azurite.yaml' )
-	    ;;
-	couchbase)
-	    FILES+=( '.ci/docker-compose-file/docker-compose-couchbase.yaml' )
-	    ;;
-	kdc)
-	    FILES+=( '.ci/docker-compose-file/docker-compose-kdc.yaml' )
-	    ;;
+	    elasticsearch)
+	        FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' )
+	        ;;
+	    azurite)
+	        FILES+=( '.ci/docker-compose-file/docker-compose-azurite.yaml' )
+	        ;;
+	    couchbase)
+	        FILES+=( '.ci/docker-compose-file/docker-compose-couchbase.yaml' )
+	        ;;
+	    kdc)
+	        FILES+=( '.ci/docker-compose-file/docker-compose-kdc.yaml' )
+	        ;;
+        datalayers)
+            FILES+=( '.ci/docker-compose-file/docker-compose-datalayers-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-datalayers-tls.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1