Преглед изворни кода

Merge pull request #10740 from thalesmg/perf-buffer-worker-periodic-metrics-v50

perf(buffer_worker): update metrics periodically rather than immediately (v5.0)
Thales Macedo Garitezi пре 2 године
родитељ
комит
1c4c7fad92

+ 15 - 11
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -100,17 +100,21 @@
     ?assertMetrics(Pat, true, BridgeID)
 ).
 -define(assertMetrics(Pat, Guard, BridgeID),
-    ?assertMatch(
-        #{
-            <<"metrics">> := Pat,
-            <<"node_metrics">> := [
-                #{
-                    <<"node">> := _,
-                    <<"metrics">> := Pat
-                }
-            ]
-        } when Guard,
-        request_bridge_metrics(BridgeID)
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        ?assertMatch(
+            #{
+                <<"metrics">> := Pat,
+                <<"node_metrics">> := [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> := Pat
+                    }
+                ]
+            } when Guard,
+            request_bridge_metrics(BridgeID)
+        )
     )
 ).
 

+ 0 - 0
apps/emqx_bridge_clickhouse/etc/emqx_bridge_clickhouse.conf


+ 11 - 46
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl

@@ -288,6 +288,7 @@ gcp_pubsub_config(Config) ->
             "  pipelining = ~b\n"
             "  resource_opts = {\n"
             "    request_timeout = 500ms\n"
+            "    metrics_flush_interval = 700ms\n"
             "    worker_pool_size = 1\n"
             "    query_mode = ~s\n"
             "    batch_size = ~b\n"
@@ -529,12 +530,14 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
     end.
 
 receive_all_events(EventName, Timeout) ->
-    receive_all_events(EventName, Timeout, []).
+    receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []).
 
-receive_all_events(EventName, Timeout, Acc) ->
+receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
+    lists:reverse(Acc);
+receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
     receive
         {telemetry, #{name := [_, _, EventName]} = Event} ->
-            receive_all_events(EventName, Timeout, [Event | Acc])
+            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
     after Timeout ->
         lists:reverse(Acc)
     end.
@@ -557,8 +560,9 @@ wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when
     ok;
 wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
     receive
-        {telemetry, #{name := [_, _, EventName]}} ->
-            wait_n_events(TelemetryTable, ResourceId, NEvents - 1, Timeout, EventName)
+        {telemetry, #{name := [_, _, EventName], measurements := #{counter_inc := Inc}} = Event} ->
+            ct:pal("telemetry event: ~p", [Event]),
+            wait_n_events(TelemetryTable, ResourceId, NEvents - Inc, Timeout, EventName)
     after Timeout ->
         RecordedEvents = ets:tab2list(TelemetryTable),
         CurrentMetrics = current_metrics(ResourceId),
@@ -575,7 +579,6 @@ t_publish_success(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
-    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     ?check_trace(
         create_bridge(Config),
@@ -604,17 +607,6 @@ t_publish_success(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
-    ExpectedInflightEvents =
-        case QueryMode of
-            sync -> 1;
-            async -> 3
-        end,
-    wait_telemetry_event(
-        TelemetryTable,
-        inflight,
-        ResourceId,
-        #{n_events => ExpectedInflightEvents, timeout => 5_000}
-    ),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -635,7 +627,6 @@ t_publish_success_local_topic(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
-    QueryMode = ?config(query_mode, Config),
     LocalTopic = <<"local/topic">>,
     {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
     assert_empty_metrics(ResourceId),
@@ -654,17 +645,6 @@ t_publish_success_local_topic(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
-    ExpectedInflightEvents =
-        case QueryMode of
-            sync -> 1;
-            async -> 3
-        end,
-    wait_telemetry_event(
-        TelemetryTable,
-        inflight,
-        ResourceId,
-        #{n_events => ExpectedInflightEvents, timeout => 5_000}
-    ),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -696,7 +676,6 @@ t_publish_templated(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
-    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     PayloadTemplate = <<
         "{\"payload\": \"${payload}\","
@@ -742,17 +721,6 @@ t_publish_templated(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
-    ExpectedInflightEvents =
-        case QueryMode of
-            sync -> 1;
-            async -> 3
-        end,
-    wait_telemetry_event(
-        TelemetryTable,
-        inflight,
-        ResourceId,
-        #{n_events => ExpectedInflightEvents, timeout => 5_000}
-    ),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -1089,9 +1057,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
         %% message as dropped; and since it never considers the
         %% response expired, this succeeds.
         econnrefused ->
-            wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
-                timeout => 10_000, n_events => 1
-            }),
             %% even waiting, hard to avoid flakiness... simpler to just sleep
             %% a bit until stabilization.
             ct:sleep(200),
@@ -1111,8 +1076,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 CurrentMetrics
             );
         timeout ->
-            wait_until_gauge_is(inflight, 0, _Timeout = 400),
-            wait_until_gauge_is(queuing, 0, _Timeout = 400),
+            wait_until_gauge_is(inflight, 0, _Timeout = 1_000),
+            wait_until_gauge_is(queuing, 0, _Timeout = 1_000),
             assert_metrics(
                 #{
                     dropped => 0,

+ 4 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -103,6 +103,10 @@
 -define(HEALTHCHECK_INTERVAL, 15000).
 -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
 
+%% milliseconds
+-define(DEFAULT_METRICS_FLUSH_INTERVAL, 5_000).
+-define(DEFAULT_METRICS_FLUSH_INTERVAL_RAW, <<"5s">>).
+
 %% milliseconds
 -define(START_TIMEOUT, 5000).
 -define(START_TIMEOUT_RAW, <<"5s">>).

Разлика између датотека није приказана због своје велике величине
+ 302 - 204
apps/emqx_resource/src/emqx_resource_buffer_worker.erl


+ 6 - 0
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -44,6 +44,7 @@ fields("creation_opts") ->
         {worker_pool_size, fun worker_pool_size/1},
         {health_check_interval, fun health_check_interval/1},
         {resume_interval, fun resume_interval/1},
+        {metrics_flush_interval, fun metrics_flush_interval/1},
         {start_after_created, fun start_after_created/1},
         {start_timeout, fun start_timeout/1},
         {auto_restart_interval, fun auto_restart_interval/1},
@@ -77,6 +78,11 @@ resume_interval(desc) -> ?DESC("resume_interval");
 resume_interval(required) -> false;
 resume_interval(_) -> undefined.
 
+metrics_flush_interval(type) -> emqx_schema:duration_ms();
+metrics_flush_interval(importance) -> ?IMPORTANCE_HIDDEN;
+metrics_flush_interval(required) -> false;
+metrics_flush_interval(_) -> undefined.
+
 health_check_interval(type) -> emqx_schema:duration_ms();
 health_check_interval(desc) -> ?DESC("health_check_interval");
 health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;

+ 70 - 16
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -316,7 +316,11 @@ t_query_counter_async_query(_) ->
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, register => true},
-        #{query_mode => async, batch_size => 1}
+        #{
+            query_mode => async,
+            batch_size => 1,
+            metrics_flush_interval => 50
+        }
     ),
     ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
     NMsgs = 1_000,
@@ -350,7 +354,11 @@ t_query_counter_async_query(_) ->
         end
     ),
-    #{counters := C} = emqx_resource:get_metrics(?ID),
-    ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        %% re-read metrics on every attempt; binding them once before the
+        %% retry would make the retry assert on a stale snapshot forever
+        ?assertMatch(
+            #{counters := #{matched := 1002, 'success' := 1002, 'failed' := 0}},
+            emqx_resource:get_metrics(?ID)
+        )
+    ),
     ok = emqx_resource:remove_local(?ID).
 
 t_query_counter_async_callback(_) ->
@@ -1171,6 +1179,7 @@ t_unblock_only_required_buffer_workers(_) ->
         #{
             query_mode => async,
             batch_size => 5,
+            metrics_flush_interval => 50,
             batch_time => 100
         }
     ),
@@ -1219,6 +1228,7 @@ t_retry_batch(_Config) ->
             batch_size => 5,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1318,6 +1328,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
             worker_pool_size => NumBufferWorkers,
             buffer_mode => volatile_offload,
             buffer_seg_bytes => 100,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1354,10 +1365,16 @@ t_delete_and_re_create_with_same_name(_Config) ->
 
             %% ensure that stuff got enqueued into disk
             tap_metrics(?LINE),
-            Queuing1 = emqx_resource_metrics:queuing_get(?ID),
-            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
-            ?assert(Queuing1 > 0),
-            ?assertEqual(2, Inflight1),
+            ?retry(
+                _Sleep = 300,
+                _Attempts0 = 20,
+                ?assert(emqx_resource_metrics:queuing_get(?ID) > 0)
+            ),
+            ?retry(
+                _Sleep = 300,
+                _Attempts0 = 20,
+                ?assertEqual(2, emqx_resource_metrics:inflight_get(?ID))
+            ),
 
             %% now, we delete the resource
             process_flag(trap_exit, true),
@@ -1409,6 +1426,7 @@ t_always_overflow(_Config) ->
             batch_size => 1,
             worker_pool_size => 1,
             max_buffer_bytes => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1446,6 +1464,7 @@ t_retry_sync_inflight(_Config) ->
             query_mode => sync,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1496,6 +1515,7 @@ t_retry_sync_inflight_batch(_Config) ->
             batch_size => 2,
             batch_time => 200,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1546,6 +1566,7 @@ t_retry_async_inflight(_Config) ->
             query_mode => async,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1590,6 +1611,7 @@ t_retry_async_inflight_full(_Config) ->
             inflight_window => AsyncInflightWindow,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1653,6 +1675,7 @@ t_async_reply_multi_eval(_Config) ->
             batch_size => 3,
             batch_time => 10,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1667,7 +1690,7 @@ t_async_reply_multi_eval(_Config) ->
         #{}
     ),
     ?retry(
-        ResumeInterval,
+        2 * ResumeInterval,
         TotalTime div ResumeInterval,
         begin
             Metrics = tap_metrics(?LINE),
@@ -1683,7 +1706,7 @@ t_async_reply_multi_eval(_Config) ->
                 failed := Failed
             } = Counters,
             ?assertEqual(TotalQueries, Matched - 1),
-            ?assertEqual(Matched, Success + Dropped + LateReply + Failed)
+            ?assertEqual(Matched, Success + Dropped + LateReply + Failed, #{counters => Counters})
         end
     ).
 
@@ -1700,6 +1723,7 @@ t_retry_async_inflight_batch(_Config) ->
             batch_size => 2,
             batch_time => 200,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1745,6 +1769,7 @@ t_async_pool_worker_death(_Config) ->
             query_mode => async,
             batch_size => 1,
             worker_pool_size => NumBufferWorkers,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1768,8 +1793,11 @@ t_async_pool_worker_death(_Config) ->
             inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts),
             {ok, _} = snabbkaffe:receive_events(SRef0),
 
-            Inflight0 = emqx_resource_metrics:inflight_get(?ID),
-            ?assertEqual(NumReqs, Inflight0),
+            ?retry(
+                _Sleep = 300,
+                _Attempts0 = 20,
+                ?assertEqual(NumReqs, emqx_resource_metrics:inflight_get(?ID))
+            ),
 
             %% grab one of the worker pids and kill it
             {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
@@ -1820,6 +1848,7 @@ t_expiration_sync_before_sending(_Config) ->
             query_mode => sync,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1837,6 +1866,7 @@ t_expiration_sync_batch_before_sending(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1853,6 +1883,7 @@ t_expiration_async_before_sending(_Config) ->
             query_mode => async,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1870,6 +1901,7 @@ t_expiration_async_batch_before_sending(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1950,6 +1982,7 @@ t_expiration_sync_before_sending_partial_batch(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 250,
             resume_interval => 1_000
         }
     ),
@@ -1968,6 +2001,7 @@ t_expiration_async_before_sending_partial_batch(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 250,
             resume_interval => 1_000
         }
     ),
@@ -2057,7 +2091,14 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
                 ],
                 ?of_kind(buffer_worker_flush_potentially_partial, Trace)
             ),
-            wait_until_gauge_is(inflight, 0, 500),
+            wait_until_gauge_is(
+                inflight,
+                #{
+                    expected_value => 0,
+                    timeout => 500,
+                    max_events => 10
+                }
+            ),
             Metrics = tap_metrics(?LINE),
             case QueryMode of
                 async ->
@@ -2933,8 +2974,15 @@ install_telemetry_handler(TestCase) ->
     put({?MODULE, telemetry_table}, Tid),
     Tid.
 
-wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
-    Events = receive_all_events(GaugeName, Timeout),
+wait_until_gauge_is(
+    GaugeName,
+    #{
+        expected_value := ExpectedValue,
+        timeout := Timeout,
+        max_events := MaxEvents
+    }
+) ->
+    Events = receive_all_events(GaugeName, Timeout, MaxEvents),
     case length(Events) > 0 andalso lists:last(Events) of
         #{measurements := #{gauge_set := ExpectedValue}} ->
             ok;
@@ -2948,12 +2996,18 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
     end.
 
 receive_all_events(EventName, Timeout) ->
-    receive_all_events(EventName, Timeout, []).
+    receive_all_events(EventName, Timeout, _MaxEvents = 50, _Count = 0, _Acc = []).
+
+receive_all_events(EventName, Timeout, MaxEvents) ->
+    receive_all_events(EventName, Timeout, MaxEvents, _Count = 0, _Acc = []).
 
-receive_all_events(EventName, Timeout, Acc) ->
+receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
+    lists:reverse(Acc);
+receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
     receive
         {telemetry, #{name := [_, _, EventName]} = Event} ->
-            receive_all_events(EventName, Timeout, [Event | Acc])
+            ct:pal("telemetry event: ~p", [Event]),
+            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
     after Timeout ->
         lists:reverse(Acc)
     end.