Procházet zdrojové kódy

feat(dsx): Create a stats callback for the DSX worker

ieQu1 před 1 rokem
rodič
revize
769893cabc
1 změnil soubory, kde provedl 36 přidání a 3 odebrání
  1. 36 3
      apps/emqx_durable_storage/src/emqx_dsx.erl

+ 36 - 3
apps/emqx_durable_storage/src/emqx_dsx.erl

@@ -26,7 +26,7 @@
 %% Convenience API:
 %% Convenience API:
 -export([c/0, c/1, stop/0, more/0]).
 -export([c/0, c/1, stop/0, more/0]).
 %% Callback definitions:
 %% Callback definitions:
--export([print/0, null/0]).
+-export([print/0, null/0, stats/1, create_stats_worker/1]).
 
 
 %% behaviour callbacks:
 %% behaviour callbacks:
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@@ -37,6 +37,7 @@
 -export_type([]).
 -export_type([]).
 
 
 -include("emqx_ds.hrl").
 -include("emqx_ds.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
 
 
 %%================================================================================
 %%================================================================================
 %% Type declarations
 %% Type declarations
@@ -55,6 +56,7 @@
 %% API functions
 %% API functions
 %%================================================================================
 %%================================================================================
 
 
+%%%% Print callback
 -spec print() -> callback().
 -spec print() -> callback().
 print() ->
 print() ->
     fun(Stream, Msgs, _) ->
     fun(Stream, Msgs, _) ->
@@ -66,12 +68,43 @@ print() ->
         )
         )
     end.
     end.
 
 
+%%%% Null callback
 -spec null() -> callback().
 -spec null() -> callback().
 null() ->
 null() ->
-    fun(_Stream, _Msgs, _State) ->
-        ok
+    fun(_, _, _) ->
+        undefined
     end.
     end.
 
 
+%%%% Stats callback
+%%
+%% @doc How to use this module in benchmark mode:
+%%
+%% 1. Start statistics server (`create_stats_worker(foo)')
+%%
+%% 2. Create clients with stats callback, with name of the worker passed:
+%% `emqx_dsx:start_link(#{topic => ..., callback => stats(foo)})'
+%%
+%% 3. Get stats by calling
+%% `emqx_metrics_worker:get_metrics(foo, foo).'.
+create_stats_worker(Name) ->
+    {ok, _} = emqx_metrics_worker:start_link(Name),
+    Metrics = [{counter, n_batches}, {counter, n_msgs}, {counter, n_bytes}, {slide, lag}],
+    ok = emqx_metrics_worker:create_metrics(Name, Name, Metrics).
+
+-spec stats(emqx_metrics_worker:handler_name()) -> callback().
+stats(Worker) ->
+    fun(_Stream, Msgs, _) ->
+        emqx_metrics_worker:inc(Worker, Worker, n_batches),
+        lists:foreach(fun(Msg) -> update_stats(Worker, Msg) end, Msgs)
+    end.
+
+update_stats(Worker, {_Key, Msg}) ->
+    emqx_metrics_worker:inc(Worker, Worker, n_msgs),
+    emqx_metrics_worker:inc(Worker, Worker, n_bytes, emqx_message:estimate_size(Msg)),
+    Lag = emqx_message:timestamp_now() - Msg#message.timestamp,
+    emqx_metrics_worker:observe(Worker, Worker, lag, Lag).
+
+%%%%%
 -spec start_link(#{
 -spec start_link(#{
     topic := binary(),
     topic := binary(),
     name => term(),
     name => term(),