Преглед изворни кода

Optimize metrics using counters and persistent_term (#2597)

* Optimize emqx_metrics module using persistent_term and counters in OTP 22
Feng Lee пре 6 година
родитељ
комит
f801ff1e53

+ 4 - 4
src/emqx_broker.erl

@@ -295,7 +295,7 @@ dispatch({shard, I}, Topic, Msg) ->
 inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
     ok;
 inc_dropped_cnt(_Topic) ->
-    emqx_metrics:inc('messages/dropped').
+    emqx_metrics:inc('messages.dropped').
 
 -spec(subscribers(emqx_topic:topic()) -> [pid()]).
 subscribers(Topic) when is_binary(Topic) ->
@@ -377,9 +377,9 @@ topics() ->
 %%------------------------------------------------------------------------------
 
 stats_fun() ->
-    safe_update_stats(?SUBSCRIBER, 'subscribers/count', 'subscribers/max'),
-    safe_update_stats(?SUBSCRIPTION, 'subscriptions/count', 'subscriptions/max'),
-    safe_update_stats(?SUBOPTION, 'suboptions/count', 'suboptions/max').
+    safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'),
+    safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'),
+    safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max').
 
 safe_update_stats(Tab, Stat, MaxStat) ->
     case ets:info(Tab, size) of

+ 1 - 1
src/emqx_cm.erl

@@ -205,5 +205,5 @@ clean_down({Pid, ClientId}) ->
 stats_fun() ->
     case ets:info(?CONN_TAB, size) of
         undefined -> ok;
-        Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
+        Size -> emqx_stats:setstat('connections.count', 'connections.max', Size)
     end.

+ 2 - 3
src/emqx_connection.erl

@@ -219,7 +219,7 @@ connected(enter, _, _State) ->
 
 %% Handle Input
 connected(cast, {incoming, Packet = ?PACKET(Type)}, State) ->
-    _ = emqx_metrics:received(Packet),
+    ok = emqx_metrics:inc_recv(Packet),
     (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
     handle_packet(Packet, fun(NState) ->
                               {keep_state, NState}
@@ -296,7 +296,7 @@ handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
     Oct = iolist_size(Data),
     ?LOG(debug, "[Connection] RECV ~p", [Data]),
     emqx_pd:update_counter(incoming_bytes, Oct),
-    emqx_metrics:trans(inc, 'bytes/received', Oct),
+    ok = emqx_metrics:inc('bytes.received', Oct),
     NState = ensure_stats_timer(maybe_gc({1, Oct}, State)),
     process_incoming(Data, [], NState);
 
@@ -336,7 +336,6 @@ handle(info, {timeout, Timer, emit_stats},
        State = #state{stats_timer = Timer,
                       proto_state = ProtoState,
                       gc_state = GcState}) ->
-    emqx_metrics:commit(),
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     NState = State#state{stats_timer = undefined},
     Limits = erlang:get(force_shutdown_policy),

+ 324 - 214
src/emqx_metrics.erl

@@ -20,30 +20,31 @@
 -include("types.hrl").
 -include("emqx_mqtt.hrl").
 
--export([start_link/0]).
+-export([ start_link/0
+        , stop/0
+        ]).
 
 -export([ new/1
+        , new/2
         , all/0
         ]).
 
 -export([ val/1
         , inc/1
         , inc/2
-        , inc/3
+        , dec/1
         , dec/2
-        , dec/3
         , set/2
         ]).
 
 -export([ trans/2
         , trans/3
-        , trans/4
         , commit/0
         ]).
 
-%% Received/sent metrics
--export([ received/1
-        , sent/1
+%% Inc received/sent metrics
+-export([ inc_recv/1
+        , inc_sent/1
         ]).
 
 %% gen_server callbacks
@@ -55,267 +56,320 @@
         , code_change/3
         ]).
 
-%% Bytes sent and received of Broker
+-opaque(metric_idx() :: 1..1024).
+
+-type(metric_name() :: atom() | string() | binary()).
+
+-export_type([metric_idx/0]).
+
+-define(MAX_SIZE, 1024).
+-define(RESERVED_IDX, 256).
+-define(TAB, ?MODULE).
+-define(SERVER, ?MODULE).
+
+%% Bytes sent and received of broker
 -define(BYTES_METRICS, [
-    {counter, 'bytes/received'}, % Total bytes received
-    {counter, 'bytes/sent'}      % Total bytes sent
+    {counter, 'bytes.received'}, % Total bytes received
+    {counter, 'bytes.sent'}      % Total bytes sent
 ]).
 
 %% Packets sent and received of broker
 -define(PACKET_METRICS, [
-    {counter, 'packets/received'},              % All Packets received
-    {counter, 'packets/sent'},                  % All Packets sent
-    {counter, 'packets/connect'},               % CONNECT Packets received
-    {counter, 'packets/connack'},               % CONNACK Packets sent
-    {counter, 'packets/publish/received'},      % PUBLISH packets received
-    {counter, 'packets/publish/sent'},          % PUBLISH packets sent
-    {counter, 'packets/puback/received'},       % PUBACK packets received
-    {counter, 'packets/puback/sent'},           % PUBACK packets sent
-    {counter, 'packets/puback/missed'},         % PUBACK packets missed
-    {counter, 'packets/pubrec/received'},       % PUBREC packets received
-    {counter, 'packets/pubrec/sent'},           % PUBREC packets sent
-    {counter, 'packets/pubrec/missed'},         % PUBREC packets missed
-    {counter, 'packets/pubrel/received'},       % PUBREL packets received
-    {counter, 'packets/pubrel/sent'},           % PUBREL packets sent
-    {counter, 'packets/pubrel/missed'},         % PUBREL packets missed
-    {counter, 'packets/pubcomp/received'},      % PUBCOMP packets received
-    {counter, 'packets/pubcomp/sent'},          % PUBCOMP packets sent
-    {counter, 'packets/pubcomp/missed'},        % PUBCOMP packets missed
-    {counter, 'packets/subscribe'},             % SUBSCRIBE Packets received
-    {counter, 'packets/suback'},                % SUBACK packets sent
-    {counter, 'packets/unsubscribe'},           % UNSUBSCRIBE Packets received
-    {counter, 'packets/unsuback'},              % UNSUBACK Packets sent
-    {counter, 'packets/pingreq'},               % PINGREQ packets received
-    {counter, 'packets/pingresp'},              % PINGRESP Packets sent
-    {counter, 'packets/disconnect/received'},   % DISCONNECT Packets received
-    {counter, 'packets/disconnect/sent'},       % DISCONNECT Packets sent
-    {counter, 'packets/auth'}                   % Auth Packets received
+    {counter, 'packets.received'},              % All Packets received
+    {counter, 'packets.sent'},                  % All Packets sent
+    {counter, 'packets.connect.received'},      % CONNECT Packets received
+    {counter, 'packets.connack.sent'},          % CONNACK Packets sent
+    {counter, 'packets.connack.error'},         % CONNACK error sent
+    {counter, 'packets.connack.auth_error'},    % CONNACK auth_error sent
+    {counter, 'packets.publish.received'},      % PUBLISH packets received
+    {counter, 'packets.publish.sent'},          % PUBLISH packets sent
+    {counter, 'packets.publish.error'},         % PUBLISH failed for error
+    {counter, 'packets.publish.auth_error'},    % PUBLISH failed for auth error
+    {counter, 'packets.puback.received'},       % PUBACK packets received
+    {counter, 'packets.puback.sent'},           % PUBACK packets sent
+    {counter, 'packets.puback.missed'},         % PUBACK packets missed
+    {counter, 'packets.pubrec.received'},       % PUBREC packets received
+    {counter, 'packets.pubrec.sent'},           % PUBREC packets sent
+    {counter, 'packets.pubrec.missed'},         % PUBREC packets missed
+    {counter, 'packets.pubrel.received'},       % PUBREL packets received
+    {counter, 'packets.pubrel.sent'},           % PUBREL packets sent
+    {counter, 'packets.pubrel.missed'},         % PUBREL packets missed
+    {counter, 'packets.pubcomp.received'},      % PUBCOMP packets received
+    {counter, 'packets.pubcomp.sent'},          % PUBCOMP packets sent
+    {counter, 'packets.pubcomp.missed'},        % PUBCOMP packets missed
+    {counter, 'packets.subscribe.received'},    % SUBSCRIBE Packets received
+    {counter, 'packets.subscribe.error'},       % SUBSCRIBE error
+    {counter, 'packets.subscribe.auth_error'},  % SUBSCRIBE failed for not auth
+    {counter, 'packets.suback.sent'},           % SUBACK packets sent
+    {counter, 'packets.unsubscribe.received'},  % UNSUBSCRIBE Packets received
+    {counter, 'packets.unsubscribe.error'},     % UNSUBSCRIBE error
+    {counter, 'packets.unsuback.sent'},         % UNSUBACK Packets sent
+    {counter, 'packets.pingreq.received'},      % PINGREQ packets received
+    {counter, 'packets.pingresp.sent'},         % PINGRESP Packets sent
+    {counter, 'packets.disconnect.received'},   % DISCONNECT Packets received
+    {counter, 'packets.disconnect.sent'},       % DISCONNECT Packets sent
+    {counter, 'packets.auth.received'},         % Auth Packets received
+    {counter, 'packets.auth.sent'}              % Auth Packets sent
 ]).
 
 %% Messages sent and received of broker
 -define(MESSAGE_METRICS, [
-    {counter, 'messages/received'},      % All Messages received
-    {counter, 'messages/sent'},          % All Messages sent
-    {counter, 'messages/qos0/received'}, % QoS0 Messages received
-    {counter, 'messages/qos0/sent'},     % QoS0 Messages sent
-    {counter, 'messages/qos1/received'}, % QoS1 Messages received
-    {counter, 'messages/qos1/sent'},     % QoS1 Messages sent
-    {counter, 'messages/qos2/received'}, % QoS2 Messages received
-    {counter, 'messages/qos2/expired'},  % QoS2 Messages expired
-    {counter, 'messages/qos2/sent'},     % QoS2 Messages sent
-    {counter, 'messages/qos2/dropped'},  % QoS2 Messages dropped
-    {gauge,   'messages/retained'},      % Messagea retained
-    {counter, 'messages/dropped'},       % Messages dropped
-    {counter, 'messages/expired'},       % Messages expired
-    {counter, 'messages/forward'}        % Messages forward
+    {counter, 'messages.received'},      % All Messages received
+    {counter, 'messages.sent'},          % All Messages sent
+    {counter, 'messages.qos0.received'}, % QoS0 Messages received
+    {counter, 'messages.qos0.sent'},     % QoS0 Messages sent
+    {counter, 'messages.qos1.received'}, % QoS1 Messages received
+    {counter, 'messages.qos1.sent'},     % QoS1 Messages sent
+    {counter, 'messages.qos2.received'}, % QoS2 Messages received
+    {counter, 'messages.qos2.expired'},  % QoS2 Messages expired
+    {counter, 'messages.qos2.sent'},     % QoS2 Messages sent
+    {counter, 'messages.qos2.dropped'},  % QoS2 Messages dropped
+    {gauge,   'messages.retained'},      % Messagea retained
+    {counter, 'messages.dropped'},       % Messages dropped
+    {counter, 'messages.expired'},       % Messages expired
+    {counter, 'messages.forward'}        % Messages forward
 ]).
 
--define(TAB, ?MODULE).
--define(SERVER, ?MODULE).
+-record(state, {next_idx = 1}).
+
+-record(metric, {name, type, idx}).
 
 %% @doc Start the metrics server.
 -spec(start_link() -> startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
+-spec(stop() -> ok).
+stop() ->
+    gen_server:stop(?SERVER).
+
 %%------------------------------------------------------------------------------
 %% Metrics API
 %%------------------------------------------------------------------------------
 
-new({gauge, Name}) ->
-    ets:insert(?TAB, {{Name, 0}, 0});
-
-new({counter, Name}) ->
-    Schedulers = lists:seq(1, emqx_vm:schedulers()),
-    ets:insert(?TAB, [{{Name, I}, 0} || I <- Schedulers]).
+-spec(new(metric_name()) -> ok).
+new(Name) ->
+    new(counter, Name).
+
+-spec(new(gauge|counter, metric_name()) -> ok).
+new(gauge, Name) ->
+    create(gauge, Name);
+new(counter, Name) ->
+    create(counter, Name).
+
+%% @private
+create(Type, Name) ->
+    case gen_server:call(?SERVER, {create, Type, Name}) of
+        {ok, _Idx} -> ok;
+        {error, Reason} -> error(Reason)
+    end.
 
 %% @doc Get all metrics
--spec(all() -> [{atom(), non_neg_integer()}]).
+-spec(all() -> [{metric_name(), non_neg_integer()}]).
 all() ->
-    maps:to_list(
-        ets:foldl(
-            fun({{Metric, _N}, Val}, Map) ->
-                    case maps:find(Metric, Map) of
-                        {ok, Count} -> maps:put(Metric, Count+Val, Map);
-                        error -> maps:put(Metric, Val, Map)
-                    end
-            end, #{}, ?TAB)).
+    CRef = persistent_term:get(?MODULE),
+    [{Name, counters:get(CRef, Idx)}
+     || #metric{name = Name, idx = Idx} <- ets:tab2list(?TAB)].
 
 %% @doc Get metric value
--spec(val(atom()) -> non_neg_integer()).
-val(Metric) ->
-    lists:sum(ets:select(?TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
+-spec(val(metric_name()) -> maybe(non_neg_integer())).
+val(Name) ->
+    case ets:lookup(?TAB, Name) of
+        [#metric{idx = Idx}] ->
+            CRef = persistent_term:get(?MODULE),
+            counters:get(CRef, Idx);
+        [] -> undefined
+    end.
 
 %% @doc Increase counter
--spec(inc(atom()) -> non_neg_integer()).
-inc(Metric) ->
-    inc(counter, Metric, 1).
-
-%% @doc Increase metric value
--spec(inc({counter | gauge, atom()} | atom(), pos_integer()) -> non_neg_integer()).
-inc({gauge, Metric}, Val) ->
-    inc(gauge, Metric, Val);
-inc({counter, Metric}, Val) ->
-    inc(counter, Metric, Val);
-inc(Metric, Val) when is_atom(Metric) ->
-    inc(counter, Metric, Val).
+-spec(inc(metric_name()) -> ok).
+inc(Name) ->
+    inc(Name, 1).
 
 %% @doc Increase metric value
--spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
-inc(Type, Metric, Val) ->
-    update_counter(key(Type, Metric), {2, Val}).
+-spec(inc(metric_name(), pos_integer()) -> ok).
+inc(Name, Value) ->
+    update_counter(Name, Value).
 
 %% @doc Decrease metric value
--spec(dec(gauge, atom()) -> integer()).
-dec(gauge, Metric) ->
-    dec(gauge, Metric, 1).
+-spec(dec(metric_name()) -> ok).
+dec(Name) ->
+    dec(Name, 1).
 
 %% @doc Decrease metric value
--spec(dec(gauge, atom(), pos_integer()) -> integer()).
-dec(gauge, Metric, Val) ->
-    update_counter(key(gauge, Metric), {2, -Val}).
+-spec(dec(metric_name(), pos_integer()) -> ok).
+dec(Name, Value) ->
+    update_counter(Name, -Value).
 
 %% @doc Set metric value
-set(Metric, Val) when is_atom(Metric) ->
-    set(gauge, Metric, Val).
-set(gauge, Metric, Val) ->
-    ets:insert(?TAB, {key(gauge, Metric), Val}).
-
-trans(inc, Metric) ->
-    trans(inc, {counter, Metric}, 1).
-
-trans(Opt, {gauge, Metric}, Val) ->
-    trans(Opt, gauge, Metric, Val);
-trans(inc, {counter, Metric}, Val) ->
-    trans(inc, counter, Metric, Val);
-trans(inc, Metric, Val) when is_atom(Metric) ->
-    trans(inc, counter, Metric, Val);
-trans(dec, gauge, Metric) ->
-    trans(dec, gauge, Metric, 1).
-
-trans(inc, Type, Metric, Val) ->
-    hold(Type, Metric, Val);
-trans(dec, gauge, Metric, Val) ->
-    hold(gauge, Metric, -Val).
-
-hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge ->
+-spec(set(metric_name(), integer()) -> ok).
+set(Name, Value) ->
+    CRef = persistent_term:get(?MODULE),
+    Idx = ets:lookup_element(?TAB, Name, 4),
+    counters:put(CRef, Idx, Value).
+
+-spec(trans(inc | dec, metric_name()) -> ok).
+trans(Op, Name) when Op =:= inc; Op =:= dec ->
+    trans(Op, Name, 1).
+
+-spec(trans(inc | dec, metric_name(), pos_integer()) -> ok).
+trans(inc, Name, Value) ->
+    cache(Name, Value);
+trans(dec, Name, Value) ->
+    cache(Name, -Value).
+
+-spec(cache(metric_name(), integer()) -> ok).
+cache(Name, Value) ->
     put('$metrics', case get('$metrics') of
                         undefined ->
-                            #{{Type, Metric} => Val};
+                            #{Name => Value};
                         Metrics ->
-                            maps:update_with({Type, Metric}, fun(Cnt) -> Cnt + Val end, Val, Metrics)
-                    end).
+                            maps:update_with(Name, fun(Cnt) -> Cnt + Value end, Value, Metrics)
+                    end),
+    ok.
 
+-spec(commit() -> ok).
 commit() ->
     case get('$metrics') of
         undefined -> ok;
         Metrics ->
-            maps:fold(fun({Type, Metric}, Val, _Acc) ->
-                          update_counter(key(Type, Metric), {2, Val})
-                      end, 0, Metrics),
-            erase('$metrics')
+            _ = erase('$metrics'),
+            lists:foreach(fun update_counter/1, maps:to_list(Metrics))
     end.
 
-%% @doc Metric key
-key(gauge, Metric) ->
-    {Metric, 0};
-key(counter, Metric) ->
-    {Metric, erlang:system_info(scheduler_id)}.
-
-update_counter(Key, UpOp) ->
-    ets:update_counter(?TAB, Key, UpOp).
-
-%%-----------------------------------------------------------------------------
-%% Received/Sent metrics
-%%-----------------------------------------------------------------------------
-
-%% @doc Count packets received.
--spec(received(emqx_mqtt_types:packet()) -> ok).
-received(Packet) ->
-    inc('packets/received'),
-    received1(Packet).
-received1(?PUBLISH_PACKET(QoS, _PktId)) ->
-    inc('packets/publish/received'),
-    inc('messages/received'),
-    qos_received(QoS);
-received1(?PACKET(Type)) ->
-    received2(Type).
-received2(?CONNECT) ->
-    inc('packets/connect');
-received2(?PUBACK) ->
-    inc('packets/puback/received');
-received2(?PUBREC) ->
-    inc('packets/pubrec/received');
-received2(?PUBREL) ->
-    inc('packets/pubrel/received');
-received2(?PUBCOMP) ->
-    inc('packets/pubcomp/received');
-received2(?SUBSCRIBE) ->
-    inc('packets/subscribe');
-received2(?UNSUBSCRIBE) ->
-    inc('packets/unsubscribe');
-received2(?PINGREQ) ->
-    inc('packets/pingreq');
-received2(?DISCONNECT) ->
-    inc('packets/disconnect/received');
-received2(_) ->
+update_counter({Name, Value}) ->
+    update_counter(Name, Value).
+
+update_counter(Name, Value) ->
+    CRef = persistent_term:get(?MODULE),
+    CIdx = case reserved_idx(Name) of
+               Idx when is_integer(Idx) -> Idx;
+               undefined ->
+                   ets:lookup_element(?TAB, Name, 4)
+           end,
+    counters:add(CRef, CIdx, Value).
+
+%%------------------------------------------------------------------------------
+%% Inc Received/Sent metrics
+%%------------------------------------------------------------------------------
+
+%% @doc Inc packets received.
+-spec(inc_recv(emqx_mqtt_types:packet()) -> ok).
+inc_recv(Packet) ->
+    inc('packets.received'),
+    do_inc_recv(Packet).
+
+do_inc_recv(?PACKET(?CONNECT)) ->
+    inc('packets.connect.received');
+do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) ->
+    inc('messages.received'),
+    case QoS of
+        ?QOS_0 -> inc('messages.qos0.received');
+        ?QOS_1 -> inc('messages.qos1.received');
+        ?QOS_2 -> inc('messages.qos2.received')
+    end,
+    inc('packets.publish.received');
+do_inc_recv(?PACKET(?PUBACK)) ->
+    inc('packets.puback.received');
+do_inc_recv(?PACKET(?PUBREC)) ->
+    inc('packets.pubrec.received');
+do_inc_recv(?PACKET(?PUBREL)) ->
+    inc('packets.pubrel.received');
+do_inc_recv(?PACKET(?PUBCOMP)) ->
+    inc('packets.pubcomp.received');
+do_inc_recv(?PACKET(?SUBSCRIBE)) ->
+    inc('packets.subscribe.received');
+do_inc_recv(?PACKET(?UNSUBSCRIBE)) ->
+    inc('packets.unsubscribe.received');
+do_inc_recv(?PACKET(?PINGREQ)) ->
+    inc('packets.pingreq.received');
+do_inc_recv(?PACKET(?DISCONNECT)) ->
+    inc('packets.disconnect.received');
+do_inc_recv(?PACKET(?AUTH)) ->
+    inc('packets.auth.received');
+do_inc_recv(_Packet) ->
     ignore.
-qos_received(?QOS_0) ->
-    inc('messages/qos0/received');
-qos_received(?QOS_1) ->
-    inc('messages/qos1/received');
-qos_received(?QOS_2) ->
-    inc('messages/qos2/received').
-
-%% @doc Count packets received. Will not count $SYS PUBLISH.
--spec(sent(emqx_mqtt_types:packet()) -> ignore | non_neg_integer()).
-sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
+
+%% @doc Inc packets sent. Will not count $SYS PUBLISH.
+-spec(inc_sent(emqx_mqtt_types:packet()) -> ok | ignore).
+inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
     ignore;
-sent(Packet) ->
-    inc('packets/sent'),
-    sent1(Packet).
-sent1(?PUBLISH_PACKET(QoS, _PktId)) ->
-    inc('packets/publish/sent'),
-    inc('messages/sent'),
-    qos_sent(QoS);
-sent1(?PACKET(Type)) ->
-    sent2(Type).
-sent2(?CONNACK) ->
-    inc('packets/connack');
-sent2(?PUBACK) ->
-    inc('packets/puback/sent');
-sent2(?PUBREC) ->
-    inc('packets/pubrec/sent');
-sent2(?PUBREL) ->
-    inc('packets/pubrel/sent');
-sent2(?PUBCOMP) ->
-    inc('packets/pubcomp/sent');
-sent2(?SUBACK) ->
-    inc('packets/suback');
-sent2(?UNSUBACK) ->
-    inc('packets/unsuback');
-sent2(?PINGRESP) ->
-    inc('packets/pingresp');
-sent2(?DISCONNECT) ->
-    inc('packets/disconnect/sent');
-sent2(_Type) ->
+inc_sent(Packet) ->
+    inc('packets.sent'),
+    do_inc_sent(Packet).
+
+do_inc_sent(?CONNACK_PACKET(ReasonCode)) ->
+    (ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'),
+    (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.connack.auth_error'),
+    (ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'),
+    inc('packets.connack.sent');
+
+do_inc_sent(?PUBLISH_PACKET(QoS, _PacketId)) ->
+    inc('messages.sent'),
+    case QoS of
+        ?QOS_0 -> inc('messages.qos0.sent');
+        ?QOS_1 -> inc('messages.qos1.sent');
+        ?QOS_2 -> inc('messages.qos2.sent')
+    end,
+    inc('packets.publish.sent');
+do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) ->
+    (ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'),
+    (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'),
+    inc('packets.puback.sent');
+do_inc_sent(?PUBREC_PACKET(_PacketId, ReasonCode)) ->
+    (ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'),
+    (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'),
+    inc('packets.pubrec.sent');
+do_inc_sent(?PACKET(?PUBREL)) ->
+    inc('packets.pubrel.sent');
+do_inc_sent(?PACKET(?PUBCOMP)) ->
+    inc('packets.pubcomp.sent');
+do_inc_sent(?PACKET(?SUBACK)) ->
+    inc('packets.suback.sent');
+do_inc_sent(?PACKET(?UNSUBACK)) ->
+    inc('packets.unsuback.sent');
+do_inc_sent(?PACKET(?PINGRESP)) ->
+    inc('packets.pingresp.sent');
+do_inc_sent(?PACKET(?DISCONNECT)) ->
+    inc('packets.disconnect.sent');
+do_inc_sent(?PACKET(?AUTH)) ->
+    inc('packets.auth.sent');
+do_inc_sent(_Packet) ->
     ignore.
-qos_sent(?QOS_0) ->
-    inc('messages/qos0/sent');
-qos_sent(?QOS_1) ->
-    inc('messages/qos1/sent');
-qos_sent(?QOS_2) ->
-    inc('messages/qos2/sent').
 
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
 
 init([]) ->
-    % Create metrics table
-    ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
-    lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
-    {ok, #{}, hibernate}.
+    % Create counters array
+    CRef = counters:new(?MAX_SIZE, [write_concurrency]),
+    ok = persistent_term:put(?MODULE, CRef),
+    % Create index mapping table
+    ok = emqx_tables:new(?TAB, [protected, {keypos, 2}, {read_concurrency, true}]),
+    % Store reserved indices
+    lists:foreach(fun({Type, Name}) ->
+                          Idx = reserved_idx(Name),
+                          Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
+                          true = ets:insert(?TAB, Metric),
+                          ok = counters:put(CRef, Idx, 0)
+                  end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
+    {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
+
+handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
+    ?LOG(error, "[Metrics] Failed to create ~s:~s for index exceeded.", [Type, Name]),
+    {reply, {error, metric_index_exceeded}, State};
+
+handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) ->
+    case ets:lookup(?TAB, Name) of
+        [#metric{idx = Idx}] ->
+            ?LOG(warning, "[Metrics] ~s already exists.", [Name]),
+            {reply, {ok, Idx}, State};
+        [] ->
+            Metric = #metric{name = Name, type = Type, idx = NextIdx},
+            true = ets:insert(?TAB, Metric),
+            {reply, {ok, NextIdx}, State#state{next_idx = NextIdx + 1}}
+    end;
 
 handle_call(Req, _From, State) ->
     ?LOG(error, "[Metrics] Unexpected call: ~p", [Req]),
@@ -329,9 +383,65 @@ handle_info(Info, State) ->
     ?LOG(error, "[Metrics] Unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(_Reason, #{}) ->
+terminate(_Reason, _State) ->
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+reserved_idx('bytes.received')               -> 01;
+reserved_idx('bytes.sent')                   -> 02;
+reserved_idx('packets.received')             -> 03;
+reserved_idx('packets.sent')                 -> 04;
+reserved_idx('packets.connect.received')     -> 05;
+reserved_idx('packets.connack.sent')         -> 06;
+reserved_idx('packets.connack.error')        -> 07;
+reserved_idx('packets.connack.auth_error')   -> 08;
+reserved_idx('packets.publish.received')     -> 09;
+reserved_idx('packets.publish.sent')         -> 10;
+reserved_idx('packets.publish.error')        -> 11;
+reserved_idx('packets.publish.auth_error')   -> 12;
+reserved_idx('packets.puback.received')      -> 13;
+reserved_idx('packets.puback.sent')          -> 14;
+reserved_idx('packets.puback.missed')        -> 15;
+reserved_idx('packets.pubrec.received')      -> 16;
+reserved_idx('packets.pubrec.sent')          -> 17;
+reserved_idx('packets.pubrec.missed')        -> 18;
+reserved_idx('packets.pubrel.received')      -> 19;
+reserved_idx('packets.pubrel.sent')          -> 20;
+reserved_idx('packets.pubrel.missed')        -> 21;
+reserved_idx('packets.pubcomp.received')     -> 22;
+reserved_idx('packets.pubcomp.sent')         -> 23;
+reserved_idx('packets.pubcomp.missed')       -> 24;
+reserved_idx('packets.subscribe.received')   -> 25;
+reserved_idx('packets.subscribe.error')      -> 26;
+reserved_idx('packets.subscribe.auth_error') -> 27;
+reserved_idx('packets.suback.sent')          -> 28;
+reserved_idx('packets.unsubscribe.received') -> 29;
+reserved_idx('packets.unsubscribe.error')    -> 30;
+reserved_idx('packets.unsuback.sent')        -> 31;
+reserved_idx('packets.pingreq.received')     -> 32;
+reserved_idx('packets.pingresp.sent')        -> 33;
+reserved_idx('packets.disconnect.received')  -> 34;
+reserved_idx('packets.disconnect.sent')      -> 35;
+reserved_idx('packets.auth.received')        -> 36;
+reserved_idx('packets.auth.sent')            -> 37;
+reserved_idx('messages.received')            -> 38;
+reserved_idx('messages.sent')                -> 39;
+reserved_idx('messages.qos0.received')       -> 40;
+reserved_idx('messages.qos0.sent')           -> 41;
+reserved_idx('messages.qos1.received')       -> 42;
+reserved_idx('messages.qos1.sent')           -> 43;
+reserved_idx('messages.qos2.received')       -> 44;
+reserved_idx('messages.qos2.expired')        -> 45;
+reserved_idx('messages.qos2.sent')           -> 46;
+reserved_idx('messages.qos2.dropped')        -> 47;
+reserved_idx('messages.retained')            -> 48;
+reserved_idx('messages.dropped')             -> 49;
+reserved_idx('messages.expired')             -> 50;
+reserved_idx('messages.forward')             -> 51;
+reserved_idx(_)                              -> undefined.

+ 2 - 2
src/emqx_protocol.erl

@@ -699,8 +699,8 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send})
             {ok, PState};
         {ok, Data} ->
             trace(send, Packet),
-            emqx_metrics:sent(Packet),
-            emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),
+            emqx_metrics:inc_sent(Packet),
+            ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
             {ok, inc_stats(send, Type, PState)};
         {error, Reason} ->
             {error, Reason}

+ 2 - 2
src/emqx_router_helper.erl

@@ -160,8 +160,8 @@ stats_fun() ->
     case ets:info(?ROUTE, size) of
         undefined -> ok;
         Size ->
-            emqx_stats:setstat('routes/count', 'routes/max', Size),
-            emqx_stats:setstat('topics/count', 'topics/max', Size)
+            emqx_stats:setstat('routes.count', 'routes.max', Size),
+            emqx_stats:setstat('topics.count', 'topics.max', Size)
     end.
 
 cleanup_routes(Node) ->

+ 7 - 9
src/emqx_session.erl

@@ -422,7 +422,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
               end;
           true ->
               ?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
-              emqx_metrics:trans(inc, 'messages/qos2/dropped'),
+              ok = emqx_metrics:inc('messages.qos2.dropped'),
               {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
       end);
 
@@ -434,7 +434,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
               {ok, ensure_stats_timer(acked(pubrec, PacketId, State))};
           false ->
               ?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]),
-              emqx_metrics:trans(inc, 'packets/pubrec/missed'),
+              ok = emqx_metrics:inc('packets.pubrec.missed'),
               {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
       end);
 
@@ -446,7 +446,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
               {ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})};
           error ->
               ?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]),
-              emqx_metrics:trans(inc, 'packets/pubrel/missed'),
+              ok = emqx_metrics:inc('packets.pubrel.missed'),
               {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
       end);
 
@@ -496,7 +496,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
               ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
           false ->
               ?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]),
-              emqx_metrics:trans(inc, 'packets/puback/missed'),
+              ok = emqx_metrics:inc('packets.puback.missed'),
               State
       end);
 
@@ -508,7 +508,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
               ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State)));
           false ->
               ?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]),
-              emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
+              ok = emqx_metrics:inc('packets.pubcomp.missed'),
               State
       end);
 
@@ -587,7 +587,6 @@ handle_info({timeout, Timer, emit_stats},
             State = #state{client_id = ClientId,
                            stats_timer = Timer,
                            gc_state = GcState}) ->
-    emqx_metrics:commit(),
     _ = emqx_sm:set_session_stats(ClientId, stats(State)),
     NewState = State#state{stats_timer = undefined},
     Limits = erlang:get(force_shutdown_policy),
@@ -652,7 +651,6 @@ terminate(Reason, #state{will_msg = WillMsg,
                          username = Username,
                          conn_pid = ConnPid,
                          old_conn_pid = OldConnPid}) ->
-    emqx_metrics:commit(),
     send_willmsg(WillMsg),
     [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
     ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
@@ -728,7 +726,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
                             {publish, {PacketId, Msg}} ->
                                 case emqx_message:is_expired(Msg) of
                                     true ->
-                                        emqx_metrics:trans(inc, 'messages/expired'),
+                                        ok = emqx_metrics:inc('messages.expired'),
                                         emqx_inflight:delete(PacketId, Inflight);
                                     false ->
                                         redeliver({PacketId, Msg}, State),
@@ -770,7 +768,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
     Timeout = get_env(Zone, await_rel_timeout),
     case (timer:now_diff(Now, Ts) div 1000) of
         Age when Age >= Timeout ->
-            emqx_metrics:trans(inc, 'messages/qos2/expired'),
+            ok = emqx_metrics:inc('messages.qos2.expired'),
             ?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
             expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
         Age ->

+ 1 - 1
src/emqx_shared_sub.erl

@@ -364,7 +364,7 @@ cleanup_down(SubPid) ->
         end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
 
 update_stats(State) ->
-    emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)),
+    emqx_stats:setstat('subscriptions.shared.count', 'subscriptions.shared.max', ets:info(?TAB, size)),
     State.
 
 %% Return 'true' if the subscriber process is alive AND not in the failed list

+ 2 - 2
src/emqx_sm.erl

@@ -284,8 +284,8 @@ clean_down(Session = {ClientId, SessPid}) ->
     end.
 
 stats_fun() ->
-    safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'),
-    safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max').
+    safe_update_stats(?SESSION_TAB, 'sessions.count', 'sessions.max'),
+    safe_update_stats(?SESSION_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max').
 
 safe_update_stats(Tab, Stat, MaxStat) ->
     case ets:info(Tab, size) of

+ 24 - 23
src/emqx_stats.erl

@@ -48,8 +48,8 @@
         ]).
 
 -record(update, {name, countdown, interval, func}).
--record(state, {timer, updates :: [#update{}],
-                tick_ms :: timeout()}).
+
+-record(state, {timer, updates :: [#update{}], tick_ms :: timeout()}).
 
 -type(stats() :: list({atom(), non_neg_integer()})).
 
@@ -57,41 +57,41 @@
 
 %% Connection stats
 -define(CONNECTION_STATS, [
-    'connections/count', % current connections
-    'connections/max'    % maximum connections connected
+    'connections.count', % current connections
+    'connections.max'    % maximum connections connected
 ]).
 
 %% Session stats
 -define(SESSION_STATS, [
-    'sessions/count',
-    'sessions/max',
-    'sessions/persistent/count',
-    'sessions/persistent/max'
+    'sessions.count',
+    'sessions.max',
+    'sessions.persistent.count',
+    'sessions.persistent.max'
 ]).
 
 %% Subscribers, Subscriptions stats
 -define(PUBSUB_STATS, [
-    'topics/count',
-    'topics/max',
-    'suboptions/count',
-    'suboptions/max',
-    'subscribers/count',
-    'subscribers/max',
-    'subscriptions/count',
-    'subscriptions/max',
-    'subscriptions/shared/count',
-    'subscriptions/shared/max'
+    'topics.count',
+    'topics.max',
+    'suboptions.count',
+    'suboptions.max',
+    'subscribers.count',
+    'subscribers.max',
+    'subscriptions.count',
+    'subscriptions.max',
+    'subscriptions.shared.count',
+    'subscriptions.shared.max'
 ]).
 
 -define(ROUTE_STATS, [
-    'routes/count',
-    'routes/max'
+    'routes.count',
+    'routes.max'
 ]).
 
 %% Retained stats
 -define(RETAINED_STATS, [
-    'retained/count',
-    'retained/max'
+    'retained.count',
+    'retained.max'
 ]).
 
 -define(TAB, ?MODULE).
@@ -181,7 +181,8 @@ start_timer(#state{tick_ms = Ms} = State) ->
     State#state{timer = emqx_misc:start_timer(Ms, tick)}.
 
 handle_call(stop, _From, State) ->
-    {stop, normal, _Reply = ok, State};
+    {stop, normal, ok, State};
+
 handle_call(Req, _From, State) ->
     ?LOG(error, "[Stats] Unexpected call: ~p", [Req]),
     {reply, ignored, State}.

+ 5 - 2
src/emqx_sys.erl

@@ -184,8 +184,11 @@ publish(stats, Stats) ->
     [safe_publish(systop(lists:concat(['stats/', Stat])), integer_to_binary(Val))
      || {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)];
 publish(metrics, Metrics) ->
-    [safe_publish(systop(lists:concat(['metrics/', Metric])), integer_to_binary(Val))
-     || {Metric, Val} <- Metrics, is_atom(Metric), is_integer(Val)].
+    [safe_publish(systop(metric_topic(Name)), integer_to_binary(Val))
+     || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)].
+
+metric_topic(Name) ->
+    lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]).
 
 safe_publish(Topic, Payload) ->
     safe_publish(Topic, #{}, Payload).

+ 2 - 3
src/emqx_ws_connection.erl

@@ -190,12 +190,12 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState,
     ?LOG(debug, "[WS Connection] RECV ~p", [Data]),
     BinSize = iolist_size(Data),
     emqx_pd:update_counter(recv_oct, BinSize),
-    emqx_metrics:trans(inc, 'bytes/received', BinSize),
+    ok = emqx_metrics:inc('bytes.received', BinSize),
     try emqx_frame:parse(iolist_to_binary(Data), ParseState) of
         {more, ParseState1} ->
             {ok, State#state{parse_state = ParseState1}};
         {ok, Packet, Rest} ->
-            emqx_metrics:received(Packet),
+            ok = emqx_metrics:inc_recv(Packet),
             emqx_pd:update_counter(recv_cnt, 1),
             case emqx_protocol:received(Packet, ProtoState) of
                 {ok, ProtoState1} ->
@@ -255,7 +255,6 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
 
 websocket_info({timeout, Timer, emit_stats},
                State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
-    emqx_metrics:commit(),
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     {ok, State#state{stats_timer = undefined}, hibernate};
 

+ 4 - 4
test/emqx_broker_SUITE.erl

@@ -157,13 +157,13 @@ start_session(_) ->
 %% Metric Group
 %%--------------------------------------------------------------------
 inc_dec_metric(_) ->
-    emqx_metrics:inc(gauge, 'messages/retained', 10),
-    emqx_metrics:dec(gauge, 'messages/retained', 10).
+    emqx_metrics:inc('messages.retained', 10),
+    emqx_metrics:dec('messages.retained', 10).
 
 %%--------------------------------------------------------------------
 %% Stats Group
 %%--------------------------------------------------------------------
 
 set_get_stat(_) ->
-    emqx_stats:setstat('retained/max', 99),
-    99 = emqx_stats:getstat('retained/max').
+    emqx_stats:setstat('retained.max', 99),
+    99 = emqx_stats:getstat('retained.max').

+ 48 - 32
test/emqx_metrics_SUITE.erl

@@ -18,41 +18,57 @@
 -compile(nowarn_export_all).
 
 -include("emqx_mqtt.hrl").
+-include_lib("eunit/include/eunit.hrl").
 
-all() -> [t_inc_dec_metrics, t_trans].
+all() -> [t_inc_dec, t_inc_recv, t_inc_sent, t_trans].
 
-t_inc_dec_metrics(_) ->
+t_inc_dec(_) ->
     {ok, _} = emqx_metrics:start_link(),
-    {0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
-    emqx_metrics:inc('bytes/received'),
-    emqx_metrics:inc({counter, 'bytes/received'}, 2),
-    emqx_metrics:inc(counter, 'bytes/received', 1),
-    emqx_metrics:inc('bytes/received', 1),
-    emqx_metrics:inc({gauge, 'messages/retained'}, 2),
-    emqx_metrics:inc(gauge, 'messages/retained', 2),
-    {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
-    emqx_metrics:dec(gauge, 'messages/retained'),
-    emqx_metrics:dec(gauge, 'messages/retained', 1),
-    2 = emqx_metrics:val('messages/retained'),
-    emqx_metrics:set('messages/retained', 3),
-    3 = emqx_metrics:val('messages/retained'),
-    emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
-    {1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
-    emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
-    {1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.
+    ?assertEqual(0, emqx_metrics:val('bytes.received')),
+    ?assertEqual(0, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:inc('bytes.received'),
+    ok = emqx_metrics:inc('bytes.received', 2),
+    ok = emqx_metrics:inc('bytes.received', 2),
+    ?assertEqual(5, emqx_metrics:val('bytes.received')),
+    ok = emqx_metrics:inc('messages.retained', 2),
+    ok = emqx_metrics:inc('messages.retained', 2),
+    ?assertEqual(4, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:dec('messages.retained'),
+    ok = emqx_metrics:dec('messages.retained', 1),
+    ?assertEqual(2, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:set('messages.retained', 3),
+    ?assertEqual(3, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:stop().
+
+t_inc_recv(_) ->
+    {ok, _} = emqx_metrics:start_link(),
+    ok = emqx_metrics:inc_recv(?PACKET(?CONNECT)),
+    ?assertEqual(1, emqx_metrics:val('packets.received')),
+    ?assertEqual(1, emqx_metrics:val('packets.connect.received')),
+    ok = emqx_metrics:stop().
+
+t_inc_sent(_) ->
+    {ok, _} = emqx_metrics:start_link(),
+    ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)),
+    ?assertEqual(1, emqx_metrics:val('packets.sent')),
+    ?assertEqual(1, emqx_metrics:val('packets.connack.sent')),
+    ok = emqx_metrics:stop().
 
 t_trans(_) ->
     {ok, _} = emqx_metrics:start_link(),
-    emqx_metrics:trans(inc, 'bytes/received'),
-    emqx_metrics:trans(inc, {counter, 'bytes/received'}, 2),
-    emqx_metrics:trans(inc, counter, 'bytes/received', 2),
-    emqx_metrics:trans(inc, {gauge, 'messages/retained'}, 2),
-    emqx_metrics:trans(inc, gauge, 'messages/retained', 2),
-    {0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
-    emqx_metrics:commit(),
-    {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
-    emqx_metrics:trans(dec, gauge, 'messages/retained'),
-    emqx_metrics:trans(dec, gauge, 'messages/retained', 1),
-    4 = emqx_metrics:val('messages/retained'),
-    emqx_metrics:commit(),
-    2 = emqx_metrics:val('messages/retained').
+    ok = emqx_metrics:trans(inc, 'bytes.received'),
+    ok = emqx_metrics:trans(inc, 'bytes.received', 2),
+    ?assertEqual(0, emqx_metrics:val('bytes.received')),
+    ok = emqx_metrics:trans(inc, 'messages.retained', 2),
+    ok = emqx_metrics:trans(inc, 'messages.retained', 2),
+    ?assertEqual(0, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:commit(),
+    ?assertEqual(3, emqx_metrics:val('bytes.received')),
+    ?assertEqual(4, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:trans(dec, 'messages.retained'),
+    ok = emqx_metrics:trans(dec, 'messages.retained', 1),
+    ?assertEqual(4, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:commit(),
+    ?assertEqual(2, emqx_metrics:val('messages.retained')),
+    ok = emqx_metrics:stop().
+

+ 17 - 17
test/emqx_stats_tests.erl

@@ -18,27 +18,27 @@
 
 get_state_test() ->
     with_proc(fun() ->
-        SetConnsCount = emqx_stats:statsfun('connections/count'),
+        SetConnsCount = emqx_stats:statsfun('connections.count'),
         SetConnsCount(1),
-        1 = emqx_stats:getstat('connections/count'),
-        emqx_stats:setstat('connections/count', 2),
-        2 = emqx_stats:getstat('connections/count'),
-        emqx_stats:setstat('connections/count', 'connections/max', 3),
+        1 = emqx_stats:getstat('connections.count'),
+        emqx_stats:setstat('connections.count', 2),
+        2 = emqx_stats:getstat('connections.count'),
+        emqx_stats:setstat('connections.count', 'connections.max', 3),
         timer:sleep(100),
-        3 = emqx_stats:getstat('connections/count'),
-        3 = emqx_stats:getstat('connections/max'),
-        emqx_stats:setstat('connections/count', 'connections/max', 2),
+        3 = emqx_stats:getstat('connections.count'),
+        3 = emqx_stats:getstat('connections.max'),
+        emqx_stats:setstat('connections.count', 'connections.max', 2),
         timer:sleep(100),
-        2 = emqx_stats:getstat('connections/count'),
-        3 = emqx_stats:getstat('connections/max'),
-        SetConns = emqx_stats:statsfun('connections/count', 'connections/max'),
+        2 = emqx_stats:getstat('connections.count'),
+        3 = emqx_stats:getstat('connections.max'),
+        SetConns = emqx_stats:statsfun('connections.count', 'connections.max'),
         SetConns(4),
         timer:sleep(100),
-        4 = emqx_stats:getstat('connections/count'),
-        4 = emqx_stats:getstat('connections/max'),
+        4 = emqx_stats:getstat('connections.count'),
+        4 = emqx_stats:getstat('connections.max'),
         Conns = emqx_stats:getstats(),
-        4 = proplists:get_value('connections/count', Conns),
-        4 = proplists:get_value('connections/max', Conns)
+        4 = proplists:get_value('connections.count', Conns),
+        4 = proplists:get_value('connections.max', Conns)
     end).
 
 update_interval_test() ->
@@ -46,10 +46,10 @@ update_interval_test() ->
     with_proc(fun() ->
         SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks
         emqx_stats:cancel_update(cm_stats),
-        UpdFun = fun() -> emqx_stats:setstat('connections/count',  1) end,
+        UpdFun = fun() -> emqx_stats:setstat('connections.count',  1) end,
         ok = emqx_stats:update_interval(stats_test, UpdFun),
         timer:sleep(SleepMs),
-        ?assertEqual(1, emqx_stats:getstat('connections/count'))
+        ?assertEqual(1, emqx_stats:getstat('connections.count'))
     end, TickMs).
 
 helper_test_() ->