|
@@ -473,10 +473,10 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
|
|
|
State = #state{sockpid = SockPid, channel = _Channel}) ->
|
|
State = #state{sockpid = SockPid, channel = _Channel}) ->
|
|
|
?LOG(debug, "RECV ~p", [Data], State),
|
|
?LOG(debug, "RECV ~p", [Data], State),
|
|
|
Oct = iolist_size(Data),
|
|
Oct = iolist_size(Data),
|
|
|
- emqx_pd:inc_counter(recv_oct, Oct),
|
|
|
|
|
|
|
+ inc_counter(recv_oct, Oct),
|
|
|
try emqx_sn_frame:parse(Data) of
|
|
try emqx_sn_frame:parse(Data) of
|
|
|
{ok, Msg} ->
|
|
{ok, Msg} ->
|
|
|
- emqx_pd:inc_counter(recv_cnt, 1),
|
|
|
|
|
|
|
+ inc_counter(recv_cnt, 1),
|
|
|
?LOG(info, "RECV ~s at state ~s",
|
|
?LOG(info, "RECV ~s at state ~s",
|
|
|
[emqx_sn_frame:format(Msg), StateName], State),
|
|
[emqx_sn_frame:format(Msg), StateName], State),
|
|
|
{keep_state, State, next_event({incoming, Msg})}
|
|
{keep_state, State, next_event({incoming, Msg})}
|
|
@@ -597,8 +597,8 @@ handle_info(Info, State = #state{channel = Channel}) ->
|
|
|
handle_return(emqx_channel:handle_info(Info, Channel), State).
|
|
handle_return(emqx_channel:handle_info(Info, Channel), State).
|
|
|
|
|
|
|
|
handle_ping(_PingReq, State) ->
|
|
handle_ping(_PingReq, State) ->
|
|
|
- emqx_pd:inc_counter(recv_oct, 2),
|
|
|
|
|
- emqx_pd:inc_counter(recv_msg, 1),
|
|
|
|
|
|
|
+ inc_counter(recv_oct, 2),
|
|
|
|
|
+ inc_counter(recv_msg, 1),
|
|
|
ok = send_message(?SN_PINGRESP_MSG(), State),
|
|
ok = send_message(?SN_PINGRESP_MSG(), State),
|
|
|
{keep_state, State}.
|
|
{keep_state, State}.
|
|
|
|
|
|
|
@@ -1075,18 +1075,24 @@ transform_fun() ->
|
|
|
fun(Packet, State) -> transform(Packet, FunMsgIdToTopicId, State) end.
|
|
fun(Packet, State) -> transform(Packet, FunMsgIdToTopicId, State) end.
|
|
|
|
|
|
|
|
inc_incoming_stats(Type) ->
|
|
inc_incoming_stats(Type) ->
|
|
|
- emqx_pd:inc_counter(recv_pkt, 1),
|
|
|
|
|
|
|
+ inc_counter(recv_pkt, 1),
|
|
|
case Type == ?PUBLISH of
|
|
case Type == ?PUBLISH of
|
|
|
true ->
|
|
true ->
|
|
|
- emqx_pd:inc_counter(recv_msg, 1),
|
|
|
|
|
- emqx_pd:inc_counter(incoming_pubs, 1);
|
|
|
|
|
|
|
+ inc_counter(recv_msg, 1),
|
|
|
|
|
+ inc_counter(incoming_pubs, 1);
|
|
|
false -> ok
|
|
false -> ok
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
inc_outgoing_stats(Type) ->
|
|
inc_outgoing_stats(Type) ->
|
|
|
- emqx_pd:inc_counter(send_pkt, 1),
|
|
|
|
|
- (Type == ?SN_PUBLISH)
|
|
|
|
|
- andalso emqx_pd:inc_counter(send_msg, 1).
|
|
|
|
|
|
|
+ inc_counter(send_pkt, 1),
|
|
|
|
|
+ case Type =:= ?SN_PUBLISH of
|
|
|
|
|
+ true -> inc_counter(send_msg, 1);
|
|
|
|
|
+ false -> ok
|
|
|
|
|
+ end.
|
|
|
|
|
|
|
|
next_event(Content) ->
|
|
next_event(Content) ->
|
|
|
{next_event, cast, Content}.
|
|
{next_event, cast, Content}.
|
|
|
|
|
+
|
|
|
|
|
+inc_counter(Key, Inc) ->
|
|
|
|
|
+ _ = emqx_pd:inc_counter(Key, Inc),
|
|
|
|
|
+ ok.
|