Kaynağa Gözat

perf(buffer_worker): avoid calling `ets:info/2`

(Almost?) fixes https://emqx.atlassian.net/browse/EMQX-9637

During the course of performance tests comparing the performance of
e5.0.3 and e4.4.16 regarding the webhook bridge in sync mode, we
observed that the throughput in e5.0.3 (sync) was much lower than in
e4.4.16: ~ 9 k msgs / s vs. ~ 50 k msgs / s, respectively.

Analyzing `observer_cli` output, we noticed that a lot of the time
both buffer workers and ehttpc processes was spent in `ets:info/2`.
That function was called to check the size of the inflight table when
updating metrics and checking if the inflight table was full.  Other
uses of `ets:info/2` were contained inside the arguments to some
`?tp/2` macro usages (https://github.com/kafka4beam/snabbkaffe/pull/60).

By using a specific record to track the size of the table, we managed
to improve the bridge performance to ~ 45 k msgs / s in sync mode.
Thales Macedo Garitezi 2 yıl önce
ebeveyn
işleme
8aa7c014e7

+ 1 - 1
apps/emqx/rebar.config

@@ -33,7 +33,7 @@
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
-    {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}}
+    {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
 ]}.
 
 {plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}.

+ 11 - 12
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -482,14 +482,16 @@ flush(Data0) ->
     Data1 = cancel_flush_timer(Data0),
     CurrentCount = queue_count(Q0),
     IsFull = is_inflight_full(InflightTID),
-    ?tp(buffer_worker_flush, #{
+    ?tp_ignore_side_effects_in_prod(buffer_worker_flush, #{
         queued => CurrentCount,
         is_inflight_full => IsFull,
         inflight => inflight_count(InflightTID)
     }),
     case {CurrentCount, IsFull} of
         {0, _} ->
-            ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}),
+            ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
+                inflight => inflight_count(InflightTID)
+            }),
             {keep_state, Data1};
         {_, true} ->
             ?tp(buffer_worker_flush_but_inflight_full, #{}),
@@ -620,7 +622,7 @@ do_flush(
                     }),
                     flush_worker(self());
                 false ->
-                    ?tp(buffer_worker_queue_drained, #{
+                    ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
                         inflight => inflight_count(InflightTID)
                     }),
                     ok
@@ -701,7 +703,7 @@ do_flush(#{queue := Q1} = Data0, #{
             Data2 =
                 case {CurrentCount > 0, CurrentCount >= BatchSize} of
                     {false, _} ->
-                        ?tp(buffer_worker_queue_drained, #{
+                        ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
                             inflight => inflight_count(InflightTID)
                         }),
                         Data1;
@@ -1279,13 +1281,10 @@ append_queue(Id, Index, Q, Queries) ->
 %% the inflight queue for async query
 -define(MAX_SIZE_REF, max_size).
 -define(SIZE_REF, size).
+-define(BATCH_COUNT_REF, batch_count).
 -define(INITIAL_TIME_REF, initial_time).
 -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
 
-%% NOTE
-%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝
--define(INFLIGHT_META_ROWS, 4).
-
 inflight_new(InfltWinSZ, Id, Index) ->
     TableId = ets:new(
         emqx_resource_buffer_worker_inflight_tab,
@@ -1295,6 +1294,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
     %% we use this counter because we might deal with batches as
     %% elements.
     inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
+    inflight_append(TableId, {?BATCH_COUNT_REF, 0}, Id, Index),
     inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
     inflight_append(
         TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index
@@ -1344,10 +1344,7 @@ is_inflight_full(InflightTID) ->
     Size >= MaxSize.
 
 inflight_count(InflightTID) ->
-    case ets:info(InflightTID, size) of
-        undefined -> 0;
-        Size -> max(0, Size - ?INFLIGHT_META_ROWS)
-    end.
+    emqx_utils_ets:lookup_value(InflightTID, ?BATCH_COUNT_REF, 0).
 
 inflight_num_msgs(InflightTID) ->
     [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
@@ -1476,12 +1473,14 @@ update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
 
 inc_inflight(InflightTID, Count) ->
     _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
+    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
     ok.
 
 dec_inflight(_InflightTID, 0) ->
     ok;
 dec_inflight(InflightTID, Count) when Count > 0 ->
     _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
     ok.
 
 %%==============================================================================

+ 4 - 9
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -2327,7 +2327,7 @@ t_expiration_retry(_Config) ->
             resume_interval => 300
         }
     ),
-    do_t_expiration_retry(single).
+    do_t_expiration_retry().
 
 t_expiration_retry_batch(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
@@ -2344,9 +2344,9 @@ t_expiration_retry_batch(_Config) ->
             resume_interval => 300
         }
     ),
-    do_t_expiration_retry(batch).
+    do_t_expiration_retry().
 
-do_t_expiration_retry(IsBatch) ->
+do_t_expiration_retry() ->
     ResumeInterval = 300,
     ?check_trace(
         begin
@@ -2399,15 +2399,10 @@ do_t_expiration_retry(IsBatch) ->
                     ResumeInterval * 10
                 ),
 
-            SuccessEventKind =
-                case IsBatch of
-                    batch -> buffer_worker_retry_inflight_succeeded;
-                    single -> buffer_worker_flush_ack
-                end,
             {ok, {ok, _}} =
                 ?wait_async_action(
                     emqx_resource:simple_sync_query(?ID, resume),
-                    #{?snk_kind := SuccessEventKind},
+                    #{?snk_kind := buffer_worker_retry_inflight_succeeded},
                     ResumeInterval * 5
                 ),
 

+ 2 - 0
changes/ce/perf-10573.en.md

@@ -0,0 +1,2 @@
+Improved performance of Webhook bridge when using synchronous query mode.
+This also should improve the performance of other bridges when they are configured with no batching.

+ 1 - 1
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl

@@ -524,7 +524,7 @@ t_write_failure(Config) ->
                             send_message(Config, SentData)
                     end,
                     #{?snk_kind := buffer_worker_flush_nack},
-                    1_000
+                    10_000
                 )
         end),
         fun(Trace0) ->

+ 1 - 1
mix.exs

@@ -71,7 +71,7 @@ defmodule EMQXUmbrella.MixProject do
       {:telemetry, "1.1.0"},
       # in conflict by emqtt and hocon
       {:getopt, "1.0.2", override: true},
-      {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.7", override: true},
+      {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
       {:hocon, github: "emqx/hocon", tag: "0.38.2", override: true},
       {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},

+ 1 - 1
rebar.config

@@ -74,7 +74,7 @@
     , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
     , {getopt, "1.0.2"}
-    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}}
+    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.38.2"}}}
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}