Преглед изворни кода

Merge pull request #9886 from zhongwencool/mongo-connection-default-async

fix: remove async mode from mongodb/redis/mysql/pgsql bridge
zhongwencool пре 3 година
родитељ
комит
ee852d8204

+ 11 - 0
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
     }
   }
 
+  query_mode_sync_only {
+    desc {
+      en: """Query mode. Only support 'sync'."""
+      zh: """请求模式。目前只支持同步模式。"""
+    }
+    label {
+      en: """Query mode"""
+      zh: """请求模式"""
+    }
+  }
+
   request_timeout {
     desc {
       en: """Timeout for requests.  If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""

+ 29 - 5
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -30,16 +30,25 @@ namespace() -> "resource_schema".
 
 roots() -> [].
 
+fields("resource_opts_sync_only") ->
+    [
+        {resource_opts,
+            mk(
+                ref(?MODULE, "creation_opts_sync_only"),
+                resource_opts_meta()
+            )}
+    ];
+fields("creation_opts_sync_only") ->
+    Fields0 = fields("creation_opts"),
+    Fields1 = lists:keydelete(async_inflight_window, 1, Fields0),
+    QueryMod = {query_mode, fun query_mode_sync_only/1},
+    lists:keyreplace(query_mode, 1, Fields1, QueryMod);
 fields("resource_opts") ->
     [
         {resource_opts,
             mk(
                 ref(?MODULE, "creation_opts"),
-                #{
-                    required => false,
-                    default => #{},
-                    desc => ?DESC(<<"resource_opts">>)
-                }
+                resource_opts_meta()
             )}
     ];
 fields("creation_opts") ->
@@ -59,6 +68,13 @@ fields("creation_opts") ->
         {max_queue_bytes, fun max_queue_bytes/1}
     ].
 
+resource_opts_meta() ->
+    #{
+        required => false,
+        default => #{},
+        desc => ?DESC(<<"resource_opts">>)
+    }.
+
 worker_pool_size(type) -> non_neg_integer();
 worker_pool_size(desc) -> ?DESC("worker_pool_size");
 worker_pool_size(default) -> ?WORKER_POOL_SIZE;
@@ -95,6 +111,12 @@ query_mode(default) -> async;
 query_mode(required) -> false;
 query_mode(_) -> undefined.
 
+query_mode_sync_only(type) -> enum([sync]);
+query_mode_sync_only(desc) -> ?DESC("query_mode_sync_only");
+query_mode_sync_only(default) -> sync;
+query_mode_sync_only(required) -> false;
+query_mode_sync_only(_) -> undefined.
+
 request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
 request_timeout(desc) -> ?DESC("request_timeout");
 request_timeout(default) -> <<"15s">>;
@@ -139,4 +161,6 @@ max_queue_bytes(required) -> false;
 max_queue_bytes(_) -> undefined.
 
 desc("creation_opts") ->
+    ?DESC("creation_opts");
+desc("creation_opts_sync_only") ->
     ?DESC("creation_opts").

+ 2 - 27
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl

@@ -39,7 +39,7 @@ fields("config") ->
         {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
         {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
         {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
-    ] ++ fields("resource_opts");
+    ] ++ emqx_resource_schema:fields("resource_opts_sync_only");
 fields(mongodb_rs) ->
     emqx_connector_mongo:fields(rs) ++ fields("config");
 fields(mongodb_sharded) ->
@@ -69,32 +69,7 @@ fields("get_sharded") ->
 fields("get_single") ->
     emqx_bridge_schema:status_fields() ++
         fields(mongodb_single) ++
-        type_and_name_fields(mongodb_single);
-fields("creation_opts") ->
-    lists:map(
-        fun
-            ({query_mode, _FieldSchema}) ->
-                {query_mode,
-                    mk(
-                        enum([sync, async]),
-                        #{
-                            desc => ?DESC(emqx_resource_schema, "query_mode"),
-                            default => sync
-                        }
-                    )};
-            (Field) ->
-                Field
-        end,
-        emqx_resource_schema:fields("creation_opts")
-    );
-fields("resource_opts") ->
-    [
-        {resource_opts,
-            mk(
-                ref(?MODULE, "creation_opts"),
-                #{default => #{}, desc => ?DESC(emqx_resource_schema, "resource_opts")}
-            )}
-    ].
+        type_and_name_fields(mongodb_single).
 
 conn_bridge_examples(Method) ->
     [

+ 1 - 6
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl

@@ -98,8 +98,7 @@ fields("config") ->
         (emqx_connector_mysql:fields(config) --
             emqx_connector_schema_lib:prepare_statement_fields());
 fields("creation_opts") ->
-    Opts = emqx_resource_schema:fields("creation_opts"),
-    [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
+    emqx_resource_schema:fields("creation_opts_sync_only");
 fields("post") ->
     [type_field(), name_field() | fields("config")];
 fields("put") ->
@@ -118,10 +117,6 @@ desc(_) ->
 
 %% -------------------------------------------------------------------------------------------------
 %% internal
-is_hidden_opts(Field) ->
-    lists:member(Field, [
-        async_inflight_window
-    ]).
 
 type_field() ->
     {type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}.

+ 1 - 7
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl

@@ -100,8 +100,7 @@ fields("config") ->
         (emqx_connector_pgsql:fields(config) --
             emqx_connector_schema_lib:prepare_statement_fields());
 fields("creation_opts") ->
-    Opts = emqx_resource_schema:fields("creation_opts"),
-    [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
+    emqx_resource_schema:fields("creation_opts_sync_only");
 fields("post") ->
     fields("post", pgsql);
 fields("put") ->
@@ -122,11 +121,6 @@ desc(_) ->
     undefined.
 
 %% -------------------------------------------------------------------------------------------------
-%% internal
-is_hidden_opts(Field) ->
-    lists:member(Field, [
-        async_inflight_window
-    ]).
 
 type_field(Type) ->
     {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.

+ 3 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl

@@ -181,10 +181,10 @@ resource_fields(Type) ->
 resource_creation_fields("redis_cluster") ->
     % TODO
     % Cluster bridge is currently incompatible with batching.
-    Fields = emqx_resource_schema:fields("creation_opts"),
-    lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]);
+    Fields = emqx_resource_schema:fields("creation_opts_sync_only"),
+    lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time, enable_batch]);
 resource_creation_fields(_) ->
-    emqx_resource_schema:fields("creation_opts").
+    emqx_resource_schema:fields("creation_opts_sync_only").
 
 desc("config") ->
     ?DESC("desc_config");

+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl

@@ -509,7 +509,7 @@ redis_connect_configs() ->
 toxiproxy_redis_bridge_config() ->
     Conf0 = ?REDIS_TOXYPROXY_CONNECT_CONFIG#{
         <<"resource_opts">> => #{
-            <<"query_mode">> => <<"async">>,
+            <<"query_mode">> => <<"sync">>,
             <<"worker_pool_size">> => <<"1">>,
             <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
             <<"health_check_interval">> => <<"1s">>,
@@ -537,7 +537,7 @@ resource_configs() ->
             <<"start_timeout">> => <<"15s">>
         },
         batch_on => #{
-            <<"query_mode">> => <<"async">>,
+            <<"query_mode">> => <<"sync">>,
             <<"worker_pool_size">> => <<"1">>,
             <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
             <<"start_timeout">> => <<"15s">>