Procházet zdrojové kódy

fix(iotdb): return error if set up a Thrift driver in async mode

firest před 1 rokem
rodič
revize
d45b324d38

+ 9 - 0
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -653,6 +653,15 @@ on_add_channel(
             Channels2 = Channels#{ChannelId => Channel},
             {ok, OldState#{channels := Channels2}}
     end;
+on_add_channel(
+    _InstanceId,
+    #{driver := thrift},
+    _ChannelId,
+    #{
+        resource_opts := #{query_mode := async}
+    }
+) ->
+    {error, <<"Thrift does not support async mode">>};
 on_add_channel(
     _InstanceId,
     #{driver := thrift, channels := Channels} = OldState,

+ 22 - 5
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -158,7 +158,7 @@ init_per_testcase(TestCase, Config0) ->
     >>,
     {_ConfigString, ConnectorConfig} = connector_config(TestCase, Name, Config0),
 
-    {_, ActionConfig} = action_config(Name, Config0),
+    {_, ActionConfig} = action_config(TestCase, Name, Config0),
     Config = [
         {connector_type, Type},
         {connector_name, Name},
@@ -196,6 +196,7 @@ bridge_config(TestCase, Config) ->
         (atom_to_binary(TestCase))/binary, UniqueNum/binary
     >>,
     ServerURL = iotdb_server_url(Host, Port),
+    QueryMode = query_mode(TestCase),
     ConfigString =
         io_lib:format(
             "bridges.~s.~s {\n"
@@ -210,7 +211,7 @@ bridge_config(TestCase, Config) ->
             "  resource_opts = {\n"
             "     health_check_interval = \"1s\"\n"
             "     request_ttl = 30s\n"
-            "     query_mode = \"async\"\n"
+            "     query_mode = \"~s\"\n"
             "     worker_pool_size = 1\n"
             "  }\n"
             "}\n",
@@ -218,7 +219,8 @@ bridge_config(TestCase, Config) ->
                 Type,
                 Name,
                 ServerURL,
-                Version
+                Version,
+                QueryMode
             ]
         ),
     {ok, InnerConfigMap} = hocon:binary(ConfigString),
@@ -313,8 +315,9 @@ is_error_check(Reason) ->
         ?assertEqual({error, Reason}, Result)
     end.
 
-action_config(Name, Config) ->
+action_config(TestCase, Name, Config) ->
     Type = ?config(bridge_type, Config),
+    QueryMode = query_mode(TestCase),
     ConfigString =
         io_lib:format(
             "actions.~s.~s {\n"
@@ -323,11 +326,15 @@ action_config(Name, Config) ->
             "  parameters = {\n"
             "     data = []\n"
             "  }\n"
+            "  resource_opts = {\n"
+            "     query_mode = \"~s\"\n"
+            "  }\n"
             "}\n",
             [
                 Type,
                 Name,
-                Name
+                Name,
+                QueryMode
             ]
         ),
     ct:pal("ActionConfig:~ts~n", [ConfigString]),
@@ -786,3 +793,13 @@ make_thrift_server(_, Config) ->
     Host = ?config(bridge_host, Config),
     Port = ?config(bridge_port, Config),
     lists:flatten(io_lib:format("~s:~p", [Host, Port])).
+
+query_mode(TestCase) ->
+    Name = erlang:atom_to_list(TestCase),
+    Tokens = string:tokens(Name, "_"),
+    case lists:member("async", Tokens) of
+        true ->
+            async;
+        _ ->
+            sync
+    end.

+ 3 - 0
changes/ee/fix-14125.en.md

@@ -0,0 +1,3 @@
+For IoTDB, since the Thrift never supported the `async` mode, now
+An error log will be printed if a Thrift driver is set up in `async` mode.
+