Просмотр исходного кода

feat(bridge): support async mode resource options

JianBo He 3 лет назад
Родитель
Сommit
0aa10702db

+ 8 - 0
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -20,6 +20,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -import(hoconsc, [mk/2, array/1, enum/1]).
 
@@ -237,6 +238,13 @@ mqtt_main_example() ->
         keepalive => <<"300s">>,
         retry_interval => <<"15s">>,
         max_inflight => 100,
+        resource_opts => #{
+            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+            query_mode => sync,
+            enable_queue => false,
+            max_queue_bytes => ?DEFAULT_QUEUE_SIZE
+        },
         ssl => #{
             enable => false
         }

+ 24 - 21
apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl

@@ -12,16 +12,29 @@
 namespace() -> "bridge_mqtt".
 
 roots() -> [].
+
 fields("config") ->
+    %% enable
     emqx_bridge_schema:common_bridge_fields() ++
+        [
+            {resource_opts,
+                mk(
+                    ref(?MODULE, "creation_opts"),
+                    #{
+                        required => false,
+                        default => #{},
+                        desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                    }
+                )}
+        ] ++
         emqx_connector_mqtt_schema:fields("config");
+fields("creation_opts") ->
+    Opts = emqx_resource_schema:fields("creation_opts"),
+    [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
 fields("post") ->
-    [
-        type_field(),
-        name_field()
-    ] ++ emqx_connector_mqtt_schema:fields("config");
+    [type_field(), name_field() | fields("config")];
 fields("put") ->
-    emqx_connector_mqtt_schema:fields("config");
+    fields("config");
 fields("get") ->
     emqx_bridge_schema:metrics_status_fields() ++ fields("config").
 
@@ -31,22 +44,12 @@ desc(_) ->
     undefined.
 
 %%======================================================================================
+%% internal
+is_hidden_opts(Field) ->
+    lists:member(Field, [enable_batch, batch_size, batch_time]).
+
 type_field() ->
-    {type,
-        mk(
-            mqtt,
-            #{
-                required => true,
-                desc => ?DESC("desc_type")
-            }
-        )}.
+    {type, mk(mqtt, #{required => true, desc => ?DESC("desc_type")})}.
 
 name_field() ->
-    {name,
-        mk(
-            binary(),
-            #{
-                required => true,
-                desc => ?DESC("desc_name")
-            }
-        )}.
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 1 - 1
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -141,7 +141,7 @@ on_message_received(Msg, HookPoint, ResId) ->
     emqx:run_hook(HookPoint, [Msg]).
 
 %% ===================================================================
-callback_mode() -> always_sync.
+callback_mode() -> async_if_possible.
 
 on_start(InstId, Conf) ->
     InstanceId = binary_to_atom(InstId, utf8),

+ 1 - 1
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl

@@ -138,7 +138,7 @@ send(#{client_pid := ClientPid}, Msg) ->
     emqtt:publish(ClientPid, Msg).
 
 send_async(#{client_pid := ClientPid}, Msg, Callback) ->
-    emqtt:publish_async(ClientPid, Msg, Callback).
+    emqtt:publish_async(ClientPid, Msg, infinity, Callback).
 
 handle_publish(Msg, undefined, _Opts) ->
     ?SLOG(error, #{

+ 2 - 2
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -306,8 +306,8 @@ connected({call, From}, {send_to_remote, Msg}, State) ->
             {keep_state_and_data, [[reply, From, {error, Reason}]]}
     end;
 connected(cast, {send_to_remote_async, Msg, Callback}, State) ->
-    {_, NewState} = do_send_async(State, Msg, Callback),
-    {keep_state, NewState};
+    _ = do_send_async(State, Msg, Callback),
+    {keep_state, State};
 connected(
     info,
     {disconnected, Conn, Reason},