ソースを参照

feat(iotdb): the Thrift driver supports multi-address

firest 1 年間 前
コミット
acbaf8ebe7

+ 1 - 1
apps/emqx_bridge_iotdb/rebar.config

@@ -10,7 +10,7 @@
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}},
     {emqx_bridge_http, {path, "../emqx_bridge_http"}},
-    {iotdb, {git, "https://github.com/emqx/iotdb-client-erl.git", {tag, "0.1.7"}}}
+    {iotdb, {git, "https://github.com/emqx/iotdb-client-erl.git", {tag, "0.1.8"}}}
 ]}.
 {plugins, [rebar3_path_deps]}.
 {project_plugins, [erlfmt]}.

+ 1 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_iotdb, [
     {description, "EMQX Enterprise Apache IoTDB Bridge"},
-    {vsn, "0.2.5"},
+    {vsn, "0.2.6"},
     {modules, [
         emqx_bridge_iotdb,
         emqx_bridge_iotdb_connector

+ 2 - 3
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -334,7 +334,7 @@ on_start(
                 2
         end,
 
-    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?THRIFT_HOST_OPTIONS),
+    Addresses = emqx_schema:parse_servers(Server, ?THRIFT_HOST_OPTIONS),
 
     DriverOpts = maps:merge(
         #{
@@ -357,8 +357,7 @@ on_start(
 
     IoTDBOpts = IoTDBOpts0#{
         version => Version,
-        host => Host,
-        port => Port,
+        addresses => Addresses,
         options => DriverOpts1
     },
 

+ 19 - 7
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -26,7 +26,7 @@ all() ->
     ].
 
 groups() ->
-    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    AllTCs = emqx_common_test_helpers:all(?MODULE) -- [t_thrift_auto_recon],
     Async = [
         t_async_device_id_missing,
         t_async_invalid_template,
@@ -39,7 +39,7 @@ groups() ->
         {iotdb110, AllTCs},
         {iotdb130, AllTCs},
         {legacy, AllTCs},
-        {thrift, AllTCs -- Async}
+        {thrift, (AllTCs -- Async) ++ [t_thrift_auto_recon]}
     ].
 
 init_per_suite(Config) ->
@@ -156,7 +156,7 @@ init_per_testcase(TestCase, Config0) ->
     Name = <<
         (atom_to_binary(TestCase))/binary, UniqueNum/binary
     >>,
-    {_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
+    {_ConfigString, ConnectorConfig} = connector_config(TestCase, Name, Config0),
 
     {_, ActionConfig} = action_config(Name, Config0),
     Config = [
@@ -333,7 +333,7 @@ action_config(Name, Config) ->
     ct:pal("ActionConfig:~ts~n", [ConfigString]),
     {ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
 
-connector_config(Name, Config) ->
+connector_config(TestCase, Name, Config) ->
     Host = ?config(bridge_host, Config),
     Port = ?config(bridge_port, Config),
     Type = ?config(bridge_type, Config),
@@ -342,11 +342,12 @@ connector_config(Name, Config) ->
     ConfigString =
         case ?config(test_group, Config) of
             thrift ->
+                Server = make_thrift_server(TestCase, Config),
                 io_lib:format(
                     "connectors.~s.~s {\n"
                     "  enable = true\n"
                     "  driver = \"thrift\"\n"
-                    "  server = \"~s:~p\"\n"
+                    "  server = \"~s\"\n"
                     "  protocol_version = \"~p\"\n"
                     "  username = \"root\"\n"
                     "  password = \"root\"\n"
@@ -356,8 +357,7 @@ connector_config(Name, Config) ->
                     [
                         Type,
                         Name,
-                        Host,
-                        Port,
+                        Server,
                         Version
                     ]
                 );
@@ -770,7 +770,19 @@ t_sync_query_unmatched_type(Config) ->
         Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
     ).
 
+t_thrift_auto_recon(Config) ->
+    emqx_bridge_v2_testlib:t_on_get_status(Config).
+
 is_empty(null) -> true;
 is_empty([]) -> true;
 is_empty([[]]) -> true;
 is_empty(_) -> false.
+
+make_thrift_server(t_thrift_auto_recon, Config) ->
+    Host = ?config(bridge_host, Config),
+    Port = ?config(bridge_port, Config),
+    lists:flatten(io_lib:format("127.0.0.1:9999,~s:~p", [Host, Port]));
+make_thrift_server(_, Config) ->
+    Host = ?config(bridge_host, Config),
+    Port = ?config(bridge_port, Config),
+    lists:flatten(io_lib:format("~s:~p", [Host, Port])).

+ 4 - 0
changes/ee/feat-14044.en.md

@@ -0,0 +1,4 @@
+The IoTDB Thrift driver supports multiple addresses in the `server` field.
+
+If the current connection is broken, The driver will try to connect to the next address.
+