Selaa lähdekoodia

feat(resource): resource batch/async/queue config schema

JimMoen 3 vuotta sitten
vanhempi
commit
22a4ca311c

+ 119 - 0
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -0,0 +1,119 @@
+emqx_resource_schema {
+  batch {
+    desc {
+      en: """
+Configuration of batch query.</br>
+Batch requests are made immediately when the number of requests reaches the `batch_size`, or also immediately when the number of requests is less than the batch request size but the maximum batch_time has been reached.
+"""
+      zh: """
+批量请求配置。</br>
+请求数达到批量请求大小时立刻进行批量请求,或当请求数不足批量请求数大小,但已经达到最大批量等待时间时也立即进行批量请求。
+"""
+    }
+    label {
+      en: """batch"""
+      zh: """批量请求"""
+    }
+  }
+
+  queue {
+    desc {
+      en: """Configuration of queue."""
+      zh: """请求队列配置"""
+    }
+    label {
+      en: """queue"""
+      zh: """请求队列"""
+    }
+  }
+
+  query_mode {
+    desc {
+      en: """Query mode. Optional 'sync/async', default 'sync'."""
+      zh: """请求模式。可选 '同步/异步',默认为'同步'模式。"""
+    }
+    label {
+      en: """query_mode"""
+      zh: """query_mode"""
+    }
+  }
+
+  enable_batch {
+    desc {
+      en: """Batch mode enabled."""
+      zh: """启用批量模式。"""
+    }
+    label {
+      en: """enable_batch"""
+      zh: """启用批量模式"""
+    }
+  }
+
+  enable_queue {
+    desc {
+      en: """Queue mode enabled."""
+      zh: """启用队列模式。"""
+    }
+    label {
+      en: """enable_queue"""
+      zh: """启用队列模式"""
+    }
+  }
+
+  resume_interval {
+    desc {
+      en: """Resume time interval when resource down."""
+      zh: """资源不可用时的重试时间"""
+    }
+    label {
+      en: """resume_interval"""
+      zh: """恢复时间"""
+    }
+  }
+
+  async_inflight_window {
+    desc {
+      en: """Async queyr inflight window."""
+      zh: """异步请求飞行队列窗口大小"""
+    }
+    label {
+      en: """async_inflight_window"""
+      zh: """异步请求飞行队列窗口"""
+    }
+  }
+
+  batch_size {
+    desc {
+      en: """Maximum batch count."""
+      zh: """批量请求大小"""
+    }
+    label {
+      en: """batch_size"""
+      zh: """批量请求大小"""
+    }
+  }
+
+  batch_time {
+    desc {
+      en: """Maximum batch waiting interval."""
+      zh: """最大批量请求等待时间。"""
+    }
+    label {
+      en: """batch_time"""
+      zh: """批量等待间隔"""
+    }
+  }
+
+  queue_max_bytes {
+    desc {
+      en: """Maximum queue storage size in bytes."""
+      zh: """消息队列的最大长度,以字节计。"""
+    }
+    label {
+      en: """queue_max_bytes"""
+      zh: """队列最大长度"""
+    }
+  }
+
+
+}

+ 12 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -60,5 +60,17 @@
     | {error, term()}
     | {resource_down, term()}.
 
+%% count
+-define(DEFAULT_BATCH_SIZE, 100).
+%% milliseconds
+-define(DEFAULT_BATCH_TIME, 10).
+
+%% bytes
+-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
+
+-define(DEFAULT_INFLIGHT, 100).
+
+-define(RESUME_INTERVAL, 15000).
+
 -define(TEST_ID_PREFIX, "_test_:").
 -define(RES_METRICS, resource_metrics).

+ 4 - 13
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -52,13 +52,6 @@
 
 -export([reply_after_query/6, batch_reply_after_query/6]).
 
--define(RESUME_INTERVAL, 15000).
-
-%% count
--define(DEFAULT_BATCH_SIZE, 100).
-%% milliseconds
--define(DEFAULT_BATCH_TIME, 10).
-
 -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
 
 -define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}).
@@ -69,8 +62,6 @@
     {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
 ).
 -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
--define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
--define(DEFAULT_INFLIGHT, 100).
 
 -type id() :: binary().
 -type query() :: {query, from(), request()}.
@@ -122,7 +113,7 @@ init({Id, Index, Opts}) ->
     Name = name(Id, Index),
     BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
     Queue =
-        case maps:get(queue_enabled, Opts, false) of
+        case maps:get(enable_queue, Opts, false) of
             true ->
                 replayq:open(#{
                     dir => disk_queue_dir(Id, Index),
@@ -144,7 +135,7 @@ init({Id, Index, Opts}) ->
         %%  if the resource worker is overloaded
         query_mode => maps:get(query_mode, Opts, sync),
         async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
-        batch_enabled => maps:get(batch_enabled, Opts, false),
+        enable_batch => maps:get(enable_batch, Opts, false),
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
@@ -270,14 +261,14 @@ drop_head(Q) ->
     ok = replayq:ack(Q1, AckRef),
     Q1.
 
-query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Left} = St0) ->
+query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) ->
     Acc1 = [?QUERY(From, Request) | Acc],
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
         true -> flush(St);
         false -> {keep_state, ensure_flush_timer(St)}
     end;
-query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) ->
+query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query_mode := QM} = St) ->
     QueryOpts = #{
         inflight_name => maps:get(name, St),
         inflight_window => maps:get(async_inflight_window, St)

+ 99 - 0
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -0,0 +1,99 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_schema).
+
+-include("emqx_resource.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([namespace/0, roots/0, fields/1]).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+
+namespace() -> "resource_schema".
+
+roots() -> [].
+
+fields('batch&async&queue') ->
+    [
+        {query_mode, fun query_mode/1},
+        {resume_interval, fun resume_interval/1},
+        {async_inflight_window, fun async_inflight_window/1},
+        {batch, mk(ref(?MODULE, batch), #{desc => ?DESC("batch")})},
+        {queue, mk(ref(?MODULE, queue), #{desc => ?DESC("queue")})}
+    ];
+fields(batch) ->
+    [
+        {enable_batch, fun enable_batch/1},
+        {batch_size, fun batch_size/1},
+        {batch_time, fun batch_time/1}
+    ];
+fields(queue) ->
+    [
+        {enable_queue, fun enable_queue/1},
+        {max_queue_bytes, fun queue_max_bytes/1}
+    ].
+
+query_mode(type) -> enum([sync, async]);
+query_mode(desc) -> ?DESC("query_mode");
+query_mode(default) -> sync;
+query_mode(required) -> false;
+query_mode(_) -> undefined.
+
+enable_batch(type) -> boolean();
+enable_batch(required) -> false;
+enable_batch(default) -> false;
+enable_batch(desc) -> ?DESC("enable_batch");
+enable_batch(_) -> undefined.
+
+enable_queue(type) -> boolean();
+enable_queue(required) -> false;
+enable_queue(default) -> false;
+enable_queue(desc) -> ?DESC("enable_queue");
+enable_queue(_) -> undefined.
+
+resume_interval(type) -> emqx_schema:duration_ms();
+resume_interval(desc) -> ?DESC("resume_interval");
+resume_interval(default) -> ?RESUME_INTERVAL;
+resume_interval(required) -> false;
+resume_interval(_) -> undefined.
+
+async_inflight_window(type) -> pos_integer();
+async_inflight_window(desc) -> ?DESC("async_inflight_window");
+async_inflight_window(default) -> ?DEFAULT_INFLIGHT;
+async_inflight_window(required) -> false;
+async_inflight_window(_) -> undefined.
+
+batch_size(type) -> pos_integer();
+batch_size(desc) -> ?DESC("batch_size");
+batch_size(default) -> ?DEFAULT_BATCH_SIZE;
+batch_size(required) -> false;
+batch_size(_) -> undefined.
+
+batch_time(type) -> emqx_schema:duration_ms();
+batch_time(desc) -> ?DESC("batch_time");
+batch_time(default) -> ?DEFAULT_BATCH_TIME;
+batch_time(required) -> false;
+batch_time(_) -> undefined.
+
+queue_max_bytes(type) -> emqx_schema:bytesize();
+queue_max_bytes(desc) -> ?DESC("queue_max_bytes");
+queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE;
+queue_max_bytes(required) -> false;
+queue_max_bytes(_) -> undefined.

+ 1 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -211,7 +211,7 @@ t_batch_query_counter(_) ->
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, register => true},
-        #{batch_enabled => true}
+        #{enable_batch => true}
     ),
 
     ?check_trace(