Jelajahi Sumber

refactor: separate function to handle `frame_error`

JimMoen 1 tahun lalu
induk
melakukan
6db1c0a446

+ 34 - 25
apps/emqx/src/emqx_channel.erl

@@ -563,29 +563,8 @@ handle_in(
     process_disconnect(ReasonCode, Properties, NChannel);
     process_disconnect(ReasonCode, Properties, NChannel);
 handle_in(?AUTH_PACKET(), Channel) ->
 handle_in(?AUTH_PACKET(), Channel) ->
     handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
     handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
-    shutdown(shutdown_count(frame_error, Reason), Channel);
-handle_in(
-    {frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
-) ->
-    shutdown(
-        shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
-    );
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
-    shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
-handle_in(
-    {frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
-) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
-->
-    handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
-->
-    handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
-    ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
-    {ok, Channel};
+handle_in({frame_error, Reason}, Channel) ->
+    handle_frame_error(Reason, Channel);
 handle_in(Packet, Channel) ->
 handle_in(Packet, Channel) ->
     ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
     ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
@@ -1017,6 +996,37 @@ not_nacked({deliver, _Topic, Msg}) ->
             true
             true
     end.
     end.
 
 
+%%--------------------------------------------------------------------
+%% Handle Frame Error
+%%--------------------------------------------------------------------
+
+handle_frame_error(
+    Reason,
+    Channel = #channel{conn_state = idle}
+) ->
+    shutdown(shutdown_count(frame_error, Reason), Channel);
+handle_frame_error(
+    #{cause := frame_too_large} = R, Channel = #channel{conn_state = connecting}
+) ->
+    shutdown(
+        shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
+    );
+handle_frame_error(Reason, Channel = #channel{conn_state = connecting}) ->
+    shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
+handle_frame_error(
+    #{cause := frame_too_large}, Channel = #channel{conn_state = ConnState}
+) when
+    ConnState =:= connected orelse ConnState =:= reauthenticating
+->
+    handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
+handle_frame_error(Reason, Channel = #channel{conn_state = ConnState}) when
+    ConnState =:= connected orelse ConnState =:= reauthenticating
+->
+    handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
+handle_frame_error(Reason, Channel = #channel{conn_state = disconnected}) ->
+    ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
+    {ok, Channel}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle outgoing packet
 %% Handle outgoing packet
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -2629,8 +2639,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
     NAliases = maps:put(Topic, AliasId, Aliases),
     NAliases = maps:put(Topic, AliasId, Aliases),
     TopicAliases#{outbound => NAliases}.
     TopicAliases#{outbound => NAliases}.
 
 
--compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
-
+-compile({inline, [reply/2, shutdown/2, shutdown/3]}).
 reply(Reply, Channel) ->
 reply(Reply, Channel) ->
     {reply, Reply, Channel}.
     {reply, Reply, Channel}.
 
 

+ 1 - 1
apps/emqx/src/emqx_frame.erl

@@ -1134,7 +1134,7 @@ validate_connect_reserved(0) -> ok;
 validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
 validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
 
 
 %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
 %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
-validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos);
+validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
 %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
 %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
 validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
 validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
 %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
 %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]

+ 4 - 4
apps/emqx/src/emqx_quic_connection.erl

@@ -62,7 +62,7 @@
     streams := [{pid(), quicer:stream_handle()}],
     streams := [{pid(), quicer:stream_handle()}],
     %% New stream opts
     %% New stream opts
     stream_opts := map(),
     stream_opts := map(),
-    %% If conneciton is resumed from session ticket
+    %% If connection is resumed from session ticket
     is_resumed => boolean(),
     is_resumed => boolean(),
     %% mqtt message serializer config
     %% mqtt message serializer config
     serialize => undefined,
     serialize => undefined,
@@ -70,8 +70,8 @@
 }.
 }.
 -type cb_ret() :: quicer_lib:cb_ret().
 -type cb_ret() :: quicer_lib:cb_ret().
 
 
-%% @doc  Data streams initializions are started in parallel with control streams, data streams are blocked
-%%       for the activation from control stream after it is accepted as a legit conneciton.
+%% @doc  Data streams initializations are started in parallel with control streams, data streams are blocked
+%%       for the activation from control stream after it is accepted as a legit connection.
 %%       For security, the initial number of allowed data streams from client should be limited by
 %%       For security, the initial number of allowed data streams from client should be limited by
 %%       'peer_bidi_stream_count` & 'peer_unidi_stream_count`
 %%       'peer_bidi_stream_count` & 'peer_unidi_stream_count`
 -spec activate_data_streams(pid(), {
 -spec activate_data_streams(pid(), {
@@ -80,7 +80,7 @@
 activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
 activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
     gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
     gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
 
 
-%% @doc conneciton owner init callback
+%% @doc connection owner init callback
 -spec init(map()) -> {ok, cb_state()}.
 -spec init(map()) -> {ok, cb_state()}.
 init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
 init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
     init(S#{stream_opts := maps:from_list(SOpts)});
     init(S#{stream_opts := maps:from_list(SOpts)});