فهرست منبع

Merge pull request #14068 from terry-xiaoyu/improve-log-readability-gateway-frame-error

fix: add handle_frame_error/2 to all gateway implementations
Xinyu Liu 1 سال پیش
والد
کامیت
ce433e752f

+ 16 - 34
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -490,7 +490,7 @@ handle_msg(
         channel = Channel
     }
 ) ->
-    ?SLOG(debug, #{msg => "RECV_data", data => Data}),
+    ?SLOG(debug, #{msg => "received_udp_proxy_data", data => Data}),
     Oct = iolist_size(Data),
     inc_counter(incoming_bytes, Oct),
     Ctx = ChannMod:info(ctx, Channel),
@@ -716,7 +716,7 @@ parse_incoming(
         channel = Channel
     }
 ) ->
-    ?SLOG(debug, #{msg => "RECV_data", data => Data}),
+    ?SLOG(debug, #{msg => "received_data", data => Data}),
     Oct = iolist_size(Data),
     inc_counter(incoming_bytes, Oct),
     Ctx = ChannMod:info(ctx, Channel),
@@ -726,14 +726,8 @@ parse_incoming(
 
 parse_incoming(<<>>, Packets, State) ->
     {Packets, State};
-parse_incoming(
-    Data,
-    Packets,
-    State = #state{
-        frame_mod = FrameMod,
-        parse_state = ParseState
-    }
-) ->
+parse_incoming(Data, Packets, State) ->
+    #state{frame_mod = FrameMod, parse_state = ParseState} = State,
     try FrameMod:parse(Data, ParseState) of
         {more, NParseState} ->
             {Packets, State#state{parse_state = NParseState}};
@@ -759,34 +753,22 @@ next_incoming_msgs(Packets) ->
 
 %%--------------------------------------------------------------------
 %% Handle incoming packet
-
-handle_incoming(
-    Packet,
-    State = #state{
-        channel = Channel,
-        frame_mod = FrameMod,
-        chann_mod = ChannMod
-    }
-) ->
+handle_incoming(Packet, State) ->
+    #state{channel = Channel, frame_mod = FrameMod, chann_mod = ChannMod} = State,
     Ctx = ChannMod:info(ctx, Channel),
     ok = inc_incoming_stats(Ctx, FrameMod, Packet),
-    ?SLOG(debug, #{
-        msg => "RECV_packet",
-        packet => FrameMod:format(Packet)
-    }),
+    do_handle_incoming(Packet, FrameMod, State).
+
+do_handle_incoming({frame_error, Reason}, _FrameMod, State) ->
+    with_channel(handle_frame_error, [Reason], State);
+do_handle_incoming(Packet, FrameMod, State) ->
+    ?SLOG(debug, #{msg => "packet_received", packet => FrameMod:format(Packet)}),
     with_channel(handle_in, [Packet], State).
 
 %%--------------------------------------------------------------------
 %% With Channel
 
-with_channel(
-    Fun,
-    Args,
-    State = #state{
-        chann_mod = ChannMod,
-        channel = Channel
-    }
-) ->
+with_channel(Fun, Args, State = #state{chann_mod = ChannMod, channel = Channel}) ->
     case erlang:apply(ChannMod, Fun, Args ++ [Channel]) of
         ok ->
             {ok, State};
@@ -842,7 +824,7 @@ serialize_and_inc_stats_fun(#state{
         try
             Data = FrameMod:serialize_pkt(Packet, Serialize),
             ?SLOG(debug, #{
-                msg => "SEND_packet",
+                msg => "send_packet",
                 %% XXX: optimize it, less cpu comsuption?
                 packet => FrameMod:format(Packet)
             }),
@@ -880,7 +862,7 @@ send(
         channel = Channel
     }
 ) ->
-    ?SLOG(debug, #{msg => "SEND_data", data => IoData}),
+    ?SLOG(debug, #{msg => "send_data", data => IoData}),
     Ctx = ChannMod:info(ctx, Channel),
     Oct = iolist_size(IoData),
     ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct),
@@ -921,7 +903,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
     end;
 handle_info({sock_error, Reason}, State) ->
     ?SLOG(debug, #{
-        msg => "sock_error",
+        msg => "gateway_sock_error",
         reason => Reason
     }),
     handle_info({sock_closed, Reason}, close_socket(State));

+ 4 - 0
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -32,6 +32,7 @@
 -export([
     init/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_deliver/2,
     handle_timeout/3,
     terminate/2
@@ -188,6 +189,9 @@ handle_in(Msg, Channel0) ->
             call_session(handle_response, Msg, Channel)
     end.
 
+handle_frame_error(Reason, Channel) ->
+    {shutdown, Reason, Channel}.
+
 %%--------------------------------------------------------------------
 %% Handle Delivers from broker to client
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_coap, [
     {description, "CoAP Gateway"},
-    {vsn, "0.1.11"},
+    {vsn, "0.1.12"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 4 - 0
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -32,6 +32,7 @@
 -export([
     init/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_deliver/2,
     handle_timeout/3,
     handle_call/3,
@@ -233,6 +234,9 @@ handle_in(Data, Channel) ->
     Req = #{bytes => Data},
     {ok, dispatch(on_received_bytes, Req, Channel)}.
 
+handle_frame_error(Reason, Channel) ->
+    {shutdown, Reason, Channel}.
+
 -spec handle_deliver(list(emqx_types:deliver()), channel()) ->
     {ok, channel()}
     | {shutdown, Reason :: term(), channel()}.

+ 1 - 1
apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_exproto, [
     {description, "ExProto Gateway"},
-    {vsn, "0.1.13"},
+    {vsn, "0.1.14"},
     {registered, []},
     {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_gbt32960, [
     {description, "GBT32960 Gateway"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 5 - 1
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -20,6 +20,7 @@
 -export([
     init/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_deliver/2,
     handle_timeout/3,
     terminate/2,
@@ -271,9 +272,12 @@ handle_in(Frame = #frame{cmd = Cmd}, Channel = #channel{inflight = Inflight}) ->
     _ = upstreaming(Frame, NChannel),
     {ok, [{outgoing, Outgoings}], NChannel};
 handle_in(Frame, Channel) ->
-    log(warning, #{msg => "unexpected_frame", frame => Frame}, Channel),
+    log(warning, #{msg => "unexpected_gbt32960_frame", frame => Frame}, Channel),
     {ok, Channel}.
 
+handle_frame_error(Reason, Channel) ->
+    shutdown(Reason, Channel).
+
 %%--------------------------------------------------------------------
 %% Handle out
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_jt808, [
     {description, "JT/T 808 Gateway"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 5 - 1
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -21,6 +21,7 @@
 -export([
     init/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_deliver/2,
     handle_timeout/3,
     handle_call/3,
@@ -237,9 +238,12 @@ handle_in(Frame = ?MSG(MType), Channel) when
     ?SLOG(debug, #{msg => "recv_frame", frame => Frame, info => "jt808_client_deregister"}),
     do_handle_in(Frame, Channel#channel{conn_state = disconnected});
 handle_in(Frame, Channel) ->
-    ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
+    ?SLOG(error, #{msg => "unexpected_lwm2m_frame", frame => Frame}),
     {shutdown, unexpected_frame, Channel}.
 
+handle_frame_error(Reason, Channel) ->
+    {shutdown, Reason, Channel}.
+
 %% @private
 do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = Inflight}) ->
     #{<<"body">> := #{<<"seq">> := Seq, <<"id">> := Id}} = Frame,

+ 1 - 1
apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_lwm2m, [
     {description, "LwM2M Gateway"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]},
     {env, []},

+ 4 - 0
apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl

@@ -37,6 +37,7 @@
 -export([
     init/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_deliver/2,
     handle_timeout/3,
     terminate/2
@@ -179,6 +180,9 @@ handle_in(Msg, Channle) ->
     NChannel = update_life_timer(Channle),
     call_session(handle_coap_in, Msg, NChannel).
 
+handle_frame_error(Error, Channel) ->
+    {shutdown, Error, Channel}.
+
 %%--------------------------------------------------------------------
 %% Handle Delivers from broker to client
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_mqttsn, [
     {description, "MQTT-SN Gateway"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 5 - 10
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -36,6 +36,7 @@
 -export([
     init/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_out/3,
     handle_deliver/2,
     handle_timeout/3,
@@ -440,7 +441,7 @@ ensure_keepalive_timer(Interval, Channel) ->
 %% Handle incoming packet
 %%--------------------------------------------------------------------
 
--spec handle_in(emqx_types:packet() | {frame_error, any()}, channel()) ->
+-spec handle_in(mqtt_sn_message(), channel()) ->
     {ok, channel()}
     | {ok, replies(), channel()}
     | {shutdown, Reason :: term(), channel()}
@@ -1017,15 +1018,9 @@ handle_in(
 ) ->
     AckPkt = ?SN_WILLMSGRESP_MSG(?SN_RC_ACCEPTED),
     NWillMsg = update_will_msg(WillMsg, Payload),
-    {ok, {outgoing, AckPkt}, Channel#channel{will_msg = NWillMsg}};
-handle_in(
-    {frame_error, Reason},
-    Channel = #channel{conn_state = _ConnState}
-) ->
-    ?SLOG(error, #{
-        msg => "unexpected_frame_error",
-        reason => Reason
-    }),
+    {ok, {outgoing, AckPkt}, Channel#channel{will_msg = NWillMsg}}.
+
+handle_frame_error(Reason, Channel) ->
     shutdown(Reason, Channel).
 
 after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) ->

+ 1 - 1
apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_ocpp, [
     {description, "OCPP-J 1.6 Gateway for EMQX"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [kernel, stdlib, jesse, emqx, emqx_gateway]},
     {env, []},

+ 9 - 4
apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

@@ -41,6 +41,7 @@
     init/2,
     authenticate/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_deliver/2,
     handle_out/3,
     handle_timeout/3,
@@ -436,16 +437,20 @@ handle_in(Frame = #{type := Type}, Channel) when
 ->
     _ = publish(Frame, Channel),
     try_deliver(Channel);
-handle_in({frame_error, {badjson, ReasonStr}}, Channel) ->
+handle_in(Frame, Channel) ->
+    ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
+    {ok, Channel}.
+
+handle_frame_error({badjson, ReasonStr}, Channel) ->
     shutdown({frame_error, {badjson, iolist_to_binary(ReasonStr)}}, Channel);
-handle_in({frame_error, {validation_faliure, Id, ReasonStr}}, Channel) ->
+handle_frame_error({validation_failure, Id, ReasonStr}, Channel) ->
     handle_out(
         dnstream,
         ?ERR_FRAME(Id, ?OCPP_ERR_FormationViolation, iolist_to_binary(ReasonStr)),
         Channel
     );
-handle_in(Frame, Channel) ->
-    ?SLOG(error, #{msg => "unexpected_incoming", frame => Frame}),
+handle_frame_error(Reason, Channel) ->
+    ?SLOG(error, #{msg => "ocpp_frame_error", reason => Reason}),
     {ok, Channel}.
 
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx_gateway_ocpp/src/emqx_ocpp_frame.erl

@@ -79,7 +79,7 @@ parse([?OCPP_MSG_TYPE_ID_CALL, Id, Action, Payload], Parser) ->
         ok ->
             {ok, Frame, <<>>, Parser};
         {error, ReasonStr} ->
-            error({validation_faliure, Id, ReasonStr})
+            error({validation_failure, Id, ReasonStr})
     end;
 %% CALLRESULT
 parse([?OCPP_MSG_TYPE_ID_CALLRESULT, Id, Payload], Parser) ->
@@ -93,7 +93,7 @@ parse([?OCPP_MSG_TYPE_ID_CALLRESULT, Id, Payload], Parser) ->
     %%    ok ->
     %%        {ok, Frame, <<>>, Parser};
     %%    {error, ReasonStr} ->
-    %%        error({validation_faliure, Id, ReasonStr})
+    %%        error({validation_failure, Id, ReasonStr})
     %%end;
     {ok, Frame, <<>>, Parser};
 %% CALLERROR

+ 1 - 1
apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_stomp, [
     {description, "Stomp Gateway"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 5 - 3
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -35,6 +35,7 @@
 -export([
     init/2,
     handle_in/2,
+    handle_frame_error/2,
     handle_out/3,
     handle_deliver/2,
     handle_timeout/3,
@@ -670,10 +671,11 @@ handle_in(
 ) ->
     NewVal = emqx_pd:get_counter(recv_pkt),
     NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, NewVal, Heartbeat),
-    {ok, Channel#channel{heartbeat = NewHeartbeat}};
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
+    {ok, Channel#channel{heartbeat = NewHeartbeat}}.
+
+handle_frame_error(Reason, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
+handle_frame_error(Reason, Channel = #channel{conn_state = _ConnState}) ->
     ErrMsg = io_lib:format("Frame error: ~0p", [Reason]),
     Frame = error_frame(undefined, ErrMsg),
     shutdown(Reason, Frame, Channel).

+ 1 - 0
changes/ce/fix-14068.en.md

@@ -0,0 +1 @@
+Add the `handle_frame_error/2` callback to all gateway implementation modules to handle message parsing errors.