Просмотр исходного кода

fix: rename queue_max_bytes -> max_queue_bytes

Shawn 3 лет назад
Родитель
Сommit
9e50866cd0

+ 1 - 1
apps/emqx/src/emqx_metrics_worker.erl

@@ -173,7 +173,7 @@ get_metrics(Name, Id) ->
 inc(Name, Id, Metric) ->
     inc(Name, Id, Metric, 1).
 
--spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
+-spec inc(handler_name(), metric_id(), atom(), integer()) -> ok.
 inc(Name, Id, Metric, Val) ->
     counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val).
 

+ 1 - 1
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -143,7 +143,7 @@ emqx_resource_schema {
     }
   }
 
-  queue_max_bytes {
+  max_queue_bytes {
     desc {
       en: """Maximum queue storage."""
       zh: """消息队列的最大长度。"""

+ 6 - 3
apps/emqx_resource/include/emqx_resource.hrl

@@ -68,7 +68,7 @@
     batch_size => pos_integer(),
     batch_time => pos_integer(),
     enable_queue => boolean(),
-    queue_max_bytes => pos_integer(),
+    max_queue_bytes => pos_integer(),
     query_mode => query_mode(),
     resume_interval => pos_integer(),
     async_inflight_window => pos_integer()
@@ -81,8 +81,11 @@
 
 -define(WORKER_POOL_SIZE, 16).
 
--define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
--define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>).
+-define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024).
+-define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>).
+
+-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024 * 1024).
+-define(DEFAULT_QUEUE_SIZE_RAW, <<"100GB">>).
 
 %% count
 -define(DEFAULT_BATCH_SIZE, 100).

+ 6 - 2
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -577,8 +577,12 @@ maybe_alarm(_Status, ResId) ->
     ).
 
 maybe_resume_resource_workers(connected) ->
-    {_, Pid, _, _} = supervisor:which_children(emqx_resource_worker_sup),
-    emqx_resource_worker:resume(Pid);
+    lists:foreach(
+        fun({_, Pid, _, _}) ->
+            emqx_resource_worker:resume(Pid)
+        end,
+        supervisor:which_children(emqx_resource_worker_sup)
+    );
 maybe_resume_resource_workers(_) ->
     ok.
 

+ 21 - 15
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -123,13 +123,15 @@ init({Id, Index, Opts}) ->
             true ->
                 replayq:open(#{
                     dir => disk_queue_dir(Id, Index),
-                    seg_bytes => maps:get(queue_max_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
+                    seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
+                    max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
                     sizer => fun ?MODULE:estimate_size/1,
                     marshaller => fun ?MODULE:queue_item_marshaller/1
                 });
             false ->
                 undefined
         end,
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', replayq:count(Queue)),
     ok = inflight_new(Name),
     St = #{
         id => Id,
@@ -323,23 +325,27 @@ flush(
     end.
 
 maybe_append_queue(Id, undefined, _Items) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
     undefined;
 maybe_append_queue(Id, Q, Items) ->
-    case replayq:overflow(Q) of
-        Overflow when Overflow =< 0 ->
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'),
-            replayq:append(Q, Items);
-        Overflow ->
-            PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
-            {Q1, QAckRef, Items} = replayq:pop(Q, PopOpts),
-            ok = replayq:ack(Q1, QAckRef),
-            Dropped = length(Items),
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped),
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
-            ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
-            Q1
-    end.
+    Q2 =
+        case replayq:overflow(Q) of
+            Overflow when Overflow =< 0 ->
+                Q;
+            Overflow ->
+                PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
+                {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
+                ok = replayq:ack(Q1, QAckRef),
+                Dropped = length(Items2),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
+                ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
+                Q1
+        end,
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'),
+    replayq:append(Q2, Items).
 
 batch_reply_caller(Id, BatchResult, Batch) ->
     lists:foldl(

+ 6 - 6
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -53,7 +53,7 @@ fields("creation_opts") ->
         {batch_size, fun batch_size/1},
         {batch_time, fun batch_time/1},
         {enable_queue, fun enable_queue/1},
-        {max_queue_bytes, fun queue_max_bytes/1}
+        {max_queue_bytes, fun max_queue_bytes/1}
     ].
 
 worker_pool_size(type) -> pos_integer();
@@ -110,11 +110,11 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW;
 batch_time(required) -> false;
 batch_time(_) -> undefined.
 
-queue_max_bytes(type) -> emqx_schema:bytesize();
-queue_max_bytes(desc) -> ?DESC("queue_max_bytes");
-queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
-queue_max_bytes(required) -> false;
-queue_max_bytes(_) -> undefined.
+max_queue_bytes(type) -> emqx_schema:bytesize();
+max_queue_bytes(desc) -> ?DESC("max_queue_bytes");
+max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
+max_queue_bytes(required) -> false;
+max_queue_bytes(_) -> undefined.
 
 desc("creation_opts") ->
     ?DESC("creation_opts").

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -522,6 +522,6 @@ inc_action_metrics(R, RuleId) ->
 is_ok_result(ok) ->
     true;
 is_ok_result(R) when is_tuple(R) ->
-    ok = erlang:element(1, R);
+    ok == erlang:element(1, R);
 is_ok_result(ok) ->
     false.