Преглед изворни кода

Add batch commit for metrics

周子博 пре 7 година
родитељ
комит
194dbc02c8

+ 9 - 0
etc/emqx.conf

@@ -479,6 +479,15 @@ mqtt.shared_subscription = true
 ## Value: true | false
 ## Value: true | false
 mqtt.ignore_loop_deliver = false
 mqtt.ignore_loop_deliver = false
 
 
+##--------------------------------------------------------------------
+## Metric
+##--------------------------------------------------------------------
+
+## Commit interval for metric
+##
+## Value: Duration
+metric.commit_interval = 10s
+
 ##--------------------------------------------------------------------
 ##--------------------------------------------------------------------
 ## Zones
 ## Zones
 ##--------------------------------------------------------------------
 ##--------------------------------------------------------------------

+ 9 - 0
priv/emqx.schema

@@ -616,6 +616,15 @@ end}.
   {datatype, {enum, [true, false]}}
   {datatype, {enum, [true, false]}}
 ]}.
 ]}.
 
 
+%%--------------------------------------------------------------------
+%% Metirc
+%%--------------------------------------------------------------------
+%% @doc Commit interval for metric
+{mapping, "metric.commit_interval", "emqx.metric_commit_interval", [
+  {default, "10s"},
+  {datatype, {duration, ms}}
+]}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Zones
 %% Zones
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 7 - 2
src/emqx_broker.erl

@@ -321,14 +321,14 @@ topics() -> emqx_router:topics().
 
 
 init([Pool, Id]) ->
 init([Pool, Id]) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
+    MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
+    emqx_metrics:start_timer(MetricCommitInterval, MetricCommitInterval div 2, {metric_commit, MetricCommitInterval}),
     {ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}.
     {ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}.
 
 
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->
     emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
     emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
     {reply, ignored, State}.
 
 
-
-
 handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) ->
 handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) ->
     Subscriber = {SubPid, SubId},
     Subscriber = {SubPid, SubId},
     case ets:member(?SUBOPTION, {Topic, Subscriber}) of
     case ets:member(?SUBOPTION, {Topic, Subscriber}) of
@@ -373,6 +373,11 @@ handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{submap = Su
             {noreply, State}
             {noreply, State}
     end;
     end;
 
 
+handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
+    emqx_metrics:commit(),
+    emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
+    {noreply, State};
+
 handle_info(Info, State) ->
 handle_info(Info, State) ->
     emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
     emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
     {noreply, State}.
     {noreply, State}.

+ 8 - 0
src/emqx_connection.erl

@@ -154,6 +154,10 @@ init([Transport, RawSocket, Options]) ->
             ok = emqx_misc:init_proc_mng_policy(Zone),
             ok = emqx_misc:init_proc_mng_policy(Zone),
 
 
             emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
             emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
+            MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
+            emqx_metrics:start_timer(MetricCommitInterval, 
+                                     MetricCommitInterval div 2, 
+                                     {metric_commit, MetricCommitInterval}),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                   State, self(), IdleTimout);
                                   State, self(), IdleTimout);
         {error, Reason} ->
         {error, Reason} ->
@@ -228,6 +232,10 @@ handle_info({timeout, Timer, emit_stats},
             ?LOG(warning, "shutdown due to ~p", [Reason]),
             ?LOG(warning, "shutdown due to ~p", [Reason]),
             shutdown(Reason, NewState)
             shutdown(Reason, NewState)
     end;
     end;
+handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
+    emqx_metrics:commit(),
+    emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
+    {noreply, State};
 handle_info(timeout, State) ->
 handle_info(timeout, State) ->
     shutdown(idle_timeout, State);
     shutdown(idle_timeout, State);
 
 

+ 43 - 6
src/emqx_metrics.erl

@@ -18,7 +18,8 @@
 
 
 -export([start_link/0]).
 -export([start_link/0]).
 -export([new/1, all/0]).
 -export([new/1, all/0]).
--export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]).
+-export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2, commit/0]).
+-export([start_timer/2, start_timer/3]).
 %% Received/sent metrics
 %% Received/sent metrics
 -export([received/1, sent/1]).
 -export([received/1, sent/1]).
 
 
@@ -133,10 +134,8 @@ inc(Metric, Val) when is_atom(Metric) ->
 
 
 %% @doc Increase metric value
 %% @doc Increase metric value
 -spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
 -spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
-inc(gauge, Metric, Val) ->
-    update_counter(key(gauge, Metric), {2, Val});
-inc(counter, Metric, Val) ->
-    update_counter(key(counter, Metric), {2, Val}).
+inc(Type, Metric, Val) ->
+    hold(Type, Metric, Val).
 
 
 %% @doc Decrease metric value
 %% @doc Decrease metric value
 -spec(dec(gauge, atom()) -> integer()).
 -spec(dec(gauge, atom()) -> integer()).
@@ -146,7 +145,7 @@ dec(gauge, Metric) ->
 %% @doc Decrease metric value
 %% @doc Decrease metric value
 -spec(dec(gauge, atom(), pos_integer()) -> integer()).
 -spec(dec(gauge, atom(), pos_integer()) -> integer()).
 dec(gauge, Metric, Val) ->
 dec(gauge, Metric, Val) ->
-    update_counter(key(gauge, Metric), {2, -Val}).
+    hold(gauge, Metric, -Val).
 
 
 %% @doc Set metric value
 %% @doc Set metric value
 set(Metric, Val) when is_atom(Metric) ->
 set(Metric, Val) when is_atom(Metric) ->
@@ -154,6 +153,44 @@ set(Metric, Val) when is_atom(Metric) ->
 set(gauge, Metric, Val) ->
 set(gauge, Metric, Val) ->
     ets:insert(?TAB, {key(gauge, Metric), Val}).
     ets:insert(?TAB, {key(gauge, Metric), Val}).
 
 
+% -spec(hold(counter | gauge, atom(), inc_dec | assign, integer()) -> integer()).
+hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge ->
+    NewMetrics = case get(metrics) of
+                     undefined ->
+                         #{Metric => {Type, Val}};
+                     Metrics ->
+                         {Type, Count} = maps:get(Metric, Metrics, {Type, 0}),
+                         Metrics#{Metric => {Type, Count + Val}}
+                 end,
+    put(metrics, NewMetrics).
+
+commit() ->
+    case get(metrics) of
+        undefined ->
+            ok;
+        Metrics ->
+            maps:fold(fun(Metric, {Type, Val}, Acc) -> 
+                          update_counter(key(Type, Metric), {2, Val}),
+                          Acc
+                      end, 0, Metrics),
+            put(metrics, #{})
+    end.
+
+-spec(start_timer(integer(), term()) -> reference() | undefined).
+start_timer(Interval, Msg) ->
+    start_timer(Interval, 0, Msg).
+
+-spec(start_timer(integer(), integer(), term()) -> reference() | undefined).
+start_timer(Interval, MaxJitter, Msg) when Interval > 0 ->
+    emqx_misc:start_timer((Interval + case MaxJitter >= 1 of 
+                                          true ->
+                                              rand:uniform(MaxJitter);
+                                          false ->
+                                              0
+                                      end), Msg);
+start_timer(_Interval, _Jitter, _Msg) ->
+    undefined.
+
 %% @doc Metric key
 %% @doc Metric key
 key(gauge, Metric) ->
 key(gauge, Metric) ->
     {Metric, 0};
     {Metric, 0};

+ 9 - 0
src/emqx_session.erl

@@ -377,6 +377,10 @@ init([Parent, #{zone                := Zone,
     ok = emqx_gc:init(GcPolicy),
     ok = emqx_gc:init(GcPolicy),
     ok = emqx_misc:init_proc_mng_policy(Zone),
     ok = emqx_misc:init_proc_mng_policy(Zone),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
+    MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
+    emqx_metrics:start_timer(MetricCommitInterval,
+                             MetricCommitInterval div 2,
+                             {metric_commit, MetricCommitInterval}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
 
 
 init_mqueue(Zone) ->
 init_mqueue(Zone) ->
@@ -624,6 +628,11 @@ handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, wil
     send_willmsg(WillMsg),
     send_willmsg(WillMsg),
     {noreply, State#state{will_msg = undefined}};
     {noreply, State#state{will_msg = undefined}};
 
 
+handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
+    emqx_metrics:commit(),
+    emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
+    {noreply, State};
+
 handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
 handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
     send_willmsg(WillMsg),
     send_willmsg(WillMsg),
     {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
     {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};

+ 9 - 0
src/emqx_ws_connection.erl

@@ -137,6 +137,10 @@ websocket_init(#state{request = Req, options = Options}) ->
     lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
     lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
 
 
     emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
     emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
+    MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
+    emqx_metrics:start_timer(MetricCommitInterval, 
+                             MetricCommitInterval div 2, 
+                             {metric_commit, MetricCommitInterval}),
     {ok, #state{peername     = Peername,
     {ok, #state{peername     = Peername,
                 sockname     = Sockname,
                 sockname     = Sockname,
                 parser_state = ParserState,
                 parser_state = ParserState,
@@ -226,6 +230,11 @@ websocket_info({timeout, Timer, emit_stats},
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     {ok, State#state{stats_timer = undefined}, hibernate};
     {ok, State#state{stats_timer = undefined}, hibernate};
 
 
+websocket_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
+    emqx_metrics:commit(),
+    emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
+    {ok, State, hibernate};
+
 websocket_info({keepalive, start, Interval}, State) ->
 websocket_info({keepalive, start, Interval}, State) ->
     ?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
     ?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
     case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
     case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of

+ 4 - 0
test/emqx_metrics_SUITE.erl

@@ -29,11 +29,15 @@ t_inc_dec_metrics(_) ->
     emqx_metrics:inc(counter, 'bytes/received', 2),
     emqx_metrics:inc(counter, 'bytes/received', 2),
     emqx_metrics:inc({gauge, 'messages/retained'}, 2),
     emqx_metrics:inc({gauge, 'messages/retained'}, 2),
     emqx_metrics:inc(gauge, 'messages/retained', 2),
     emqx_metrics:inc(gauge, 'messages/retained', 2),
+    emqx_metrics:commit(),
     {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
     {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
     emqx_metrics:dec(gauge, 'messages/retained'),
     emqx_metrics:dec(gauge, 'messages/retained'),
     emqx_metrics:dec(gauge, 'messages/retained', 1),
     emqx_metrics:dec(gauge, 'messages/retained', 1),
+    emqx_metrics:commit(),
     2 = emqx_metrics:val('messages/retained'),
     2 = emqx_metrics:val('messages/retained'),
     emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
     emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
+    emqx_metrics:commit(),
     {1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
     {1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
     emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
     emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
+    emqx_metrics:commit(),
     {1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.
     {1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.

+ 1 - 0
test/emqx_session_SUITE.erl

@@ -45,6 +45,7 @@ ignore_loop(_Config) ->
     application:set_env(emqx, mqtt_ignore_loop_deliver, false).
     application:set_env(emqx, mqtt_ignore_loop_deliver, false).
 
 
 t_session_all(_) ->
 t_session_all(_) ->
+    application:set_env(emqx, metric_commit_interval, 10),
     ClientId = <<"ClientId">>,
     ClientId = <<"ClientId">>,
     {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
     {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
     {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
     {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),