Переглянути джерело

fix(pulsar action): use `sync_timeout` for simple sync query timeout

Fixes https://emqx.atlassian.net/browse/EMQX-13215
Thales Macedo Garitezi 1 рік тому
батько
коміт
0891ad11bf

+ 6 - 1
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -702,7 +702,7 @@ do_query_with_enabled_config(
     ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeType),
     ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeType),
     ConnectorResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
     ConnectorResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
     QueryOpts = maps:merge(
     QueryOpts = maps:merge(
-        emqx_bridge:query_opts(Config),
+        query_opts(BridgeType, Config),
         QueryOpts0#{
         QueryOpts0#{
             connector_resource_id => ConnectorResId,
             connector_resource_id => ConnectorResId,
             query_mode => QueryMode
             query_mode => QueryMode
@@ -721,6 +721,11 @@ do_query_with_enabled_config(
 send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
 send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
     query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).
     query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).
 
 
+query_opts(ActionOrSourceType, Config) ->
+    ConnectorType = connector_type(ActionOrSourceType),
+    Mod = emqx_connector_resource:connector_to_resource_type(ConnectorType),
+    emqx_resource:get_query_opts(Mod, Config).
+
 -spec health_check(BridgeType :: term(), BridgeName :: term()) ->
 -spec health_check(BridgeType :: term(), BridgeName :: term()) ->
     #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
     #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
 health_check(BridgeType, BridgeName) ->
 health_check(BridgeType, BridgeName) ->

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pulsar, [
 {application, emqx_bridge_pulsar, [
     {description, "EMQX Pulsar Bridge"},
     {description, "EMQX Pulsar Bridge"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 6 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -13,6 +13,7 @@
     resource_type/0,
     resource_type/0,
     callback_mode/0,
     callback_mode/0,
     query_mode/1,
     query_mode/1,
+    query_opts/1,
     on_start/2,
     on_start/2,
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
@@ -65,6 +66,11 @@ query_mode(#{resource_opts := #{query_mode := sync}}) ->
 query_mode(_Config) ->
 query_mode(_Config) ->
     simple_async_internal_buffer.
     simple_async_internal_buffer.
 
 
+query_opts(#{resource_opts := #{query_mode := sync}, parameters := #{sync_timeout := Timeout}}) ->
+    #{timeout => Timeout};
+query_opts(_) ->
+    #{}.
+
 -spec on_start(resource_id(), config()) -> {ok, state()}.
 -spec on_start(resource_id(), config()) -> {ok, state()}.
 on_start(InstanceId, Config) ->
 on_start(InstanceId, Config) ->
     #{servers := Servers0, ssl := SSL} = Config,
     #{servers := Servers0, ssl := SSL} = Config,

+ 14 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -98,6 +98,7 @@
     get_callback_mode/1,
     get_callback_mode/1,
     get_resource_type/1,
     get_resource_type/1,
     get_callback_mode/2,
     get_callback_mode/2,
+    get_query_opts/2,
     %% start the instance
     %% start the instance
     call_start/3,
     call_start/3,
     %% verify if the resource is working normally
     %% verify if the resource is working normally
@@ -163,7 +164,8 @@
     on_get_channels/1,
     on_get_channels/1,
     query_mode/1,
     query_mode/1,
     on_format_query_result/1,
     on_format_query_result/1,
-    callback_mode/1
+    callback_mode/1,
+    query_opts/1
 ]).
 ]).
 
 
 %% when calling emqx_resource:start/1
 %% when calling emqx_resource:start/1
@@ -215,6 +217,8 @@
 
 
 -callback query_mode(Config :: term()) -> query_mode().
 -callback query_mode(Config :: term()) -> query_mode().
 
 
+-callback query_opts(Config :: term()) -> #{timeout => timeout()}.
+
 %% This callback handles the installation of a specified channel.
 %% This callback handles the installation of a specified channel.
 %%
 %%
 %% If the channel cannot be successfully installed, the callback shall
 %% If the channel cannot be successfully installed, the callback shall
@@ -516,6 +520,15 @@ get_callback_mode(Mod, State) ->
             undefined
             undefined
     end.
     end.
 
 
+-spec get_query_opts(module(), map()) -> #{timeout => timeout()}.
+get_query_opts(Mod, ActionOrSourceConfig) ->
+    case erlang:function_exported(Mod, query_opts, 1) of
+        true ->
+            Mod:query_opts(ActionOrSourceConfig);
+        false ->
+            emqx_bridge:query_opts(ActionOrSourceConfig)
+    end.
+
 -spec call_start(resource_id(), module(), resource_config()) ->
 -spec call_start(resource_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
     {ok, resource_state()} | {error, Reason :: term()}.
 call_start(ResId, Mod, Config) ->
 call_start(ResId, Mod, Config) ->