Browse Source

improve receive maximum in connect packet

周子博 7 năm trước cách đây
mục cha
commit
064db65206

+ 5 - 1
src/emqx_inflight.erl

@@ -14,7 +14,7 @@
 
 -module(emqx_inflight).
 
--export([new/1, contain/2, lookup/2, insert/3, update/3, delete/2, values/1,
+-export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2, values/1,
          to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]).
 
 -type(max_size() :: pos_integer()).
@@ -46,6 +46,10 @@ delete(Key, {?MODULE, MaxSize, Tree}) ->
 update(Key, Val, {?MODULE, MaxSize, Tree}) ->
     {?MODULE, MaxSize, gb_trees:update(Key, Val, Tree)}.
 
+-spec(update_size(integer(), inflight()) -> inflight()).
+update_size(MaxSize, {?MODULE, _OldMaxSize, Tree}) ->
+    {?MODULE, MaxSize, Tree}.
+
 -spec(is_full(inflight()) -> boolean()).
 is_full({?MODULE, 0, _Tree}) ->
     false;

+ 5 - 0
src/emqx_packet.erl

@@ -61,6 +61,11 @@ validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) ->
     ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid))
         andalso validate_properties(?PUBLISH, Properties);
 
+validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}})) ->
+    error(protocol_error);
+validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' := _}})) ->
+    true;
+
 validate(_Packet) ->
     true.
 

+ 28 - 15
src/emqx_protocol.erl

@@ -208,11 +208,8 @@ received(Packet = ?PACKET(Type), PState) ->
         true ->
             {Packet1, PState1} = preprocess_properties(Packet, PState),
             process_packet(Packet1, inc_stats(recv, Type, PState1));
-        {'EXIT', {topic_filters_invalid, _Stacktrace}} ->
-            deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
-            {error, topic_filters_invalid, PState};
         {'EXIT', {Reason, _Stacktrace}} ->
-            deliver({disconnect, ?RC_MALFORMED_PACKET}, PState),
+            deliver({disconnect, rc(Reason)}, PState),
             {error, Reason, PState}
     end.
 
@@ -593,17 +590,25 @@ try_open_session(#pstate{zone        = Zone,
         clean_start => CleanStart
     },
 
-    case emqx_sm:open_session(maps:put(expiry_interval, if 
-                                ProtoVer =:= ?MQTT_PROTO_V5 ->
-                                    maps:get('Session-Expiry-Interval', ConnProps, 0);
-                                true ->
-                                    case CleanStart of
-                                        true ->
-                                            0;
-                                        false ->
-                                            emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
-                                    end  
-                            end, SessAttrs)) of
+    MaxInflight = #{max_inflight => if 
+                                        ProtoVer =:= ?MQTT_PROTO_V5 ->
+                                            maps:get('Receive-Maximum', ConnProps, 65535);
+                                        true -> 
+                                            emqx_zone:get_env(Zone, max_inflight, 65535)
+                                    end},
+    SessionExpiryInterval = #{expiry_interval => if 
+                                                    ProtoVer =:= ?MQTT_PROTO_V5 ->
+                                                        maps:get('Session-Expiry-Interval', ConnProps, 0);
+                                                    true ->
+                                                        case CleanStart of
+                                                            true ->
+                                                                0;
+                                                            false ->
+                                                                emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
+                                                        end  
+                                                 end},
+
+    case emqx_sm:open_session(maps:merge(SessAttrs, maps:merge(MaxInflight, SessionExpiryInterval))) of
         {ok, SPid} ->
             {ok, SPid, false};
         Other -> Other
@@ -782,6 +787,14 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
     Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
     self() ! {keepalive, start, round(Secs * Backoff)}.
 
+rc(Reason) ->
+    case Reason of
+        protocol_error -> ?RC_PROTOCOL_ERROR;
+        topic_filters_invalid -> ?RC_TOPIC_FILTER_INVALID;
+        topic_name_invalid -> ?RC_TOPIC_NAME_INVALID;
+        _ -> ?RC_MALFORMED_PACKET
+    end.
+
 %%-----------------------------------------------------------------------------
 %% Parse topic filters
 %%-----------------------------------------------------------------------------

+ 9 - 3
src/emqx_session.erl

@@ -47,7 +47,7 @@
 -export([info/1, attrs/1]).
 -export([stats/1]).
 -export([resume/2, discard/2]).
--export([update_expiry_interval/2]).
+-export([update_expiry_interval/2, update_max_inflight/2]).
 -export([subscribe/2, subscribe/4]).
 -export([publish/3]).
 -export([puback/2, puback/3]).
@@ -318,6 +318,9 @@ discard(SPid, ByPid) ->
 update_expiry_interval(SPid, Interval) ->
     gen_server:cast(SPid, {expiry_interval, Interval * 1000}).
 
+update_max_inflight(SPid, MaxInflight) ->
+    gen_server:cast(SPid, {max_inflight, MaxInflight}).
+
 -spec(close(spid()) -> ok).
 close(SPid) ->
     gen_server:call(SPid, close, infinity).
@@ -331,10 +334,10 @@ init([Parent, #{zone            := Zone,
                 username        := Username,
                 conn_pid        := ConnPid,
                 clean_start     := CleanStart,
-                expiry_interval := ExpiryInterval}]) ->
+                expiry_interval := ExpiryInterval,
+                max_inflight    := MaxInflight}]) ->
     process_flag(trap_exit, true),
     true = link(ConnPid),
-    MaxInflight = get_env(Zone, max_inflight),
     IdleTimout = get_env(Zone, idle_timeout, 30000),
     State = #state{idle_timeout      = IdleTimout,
                    clean_start       = CleanStart,
@@ -543,6 +546,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id       = ClientId,
 handle_cast({expiry_interval, Interval}, State) ->
     {noreply, State#state{expiry_interval = Interval}};
 
+handle_cast({max_inflight, MaxInflight}, State) ->
+    {noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight)}};
+
 handle_cast(Msg, State) ->
     emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
     {noreply, State}.

+ 2 - 1
src/emqx_sm.erl

@@ -56,10 +56,11 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
                  end,
     emqx_sm_locker:trans(ClientId, CleanStart);
 
-open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
+open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid, max_inflight := MaxInflight}) ->
     ResumeStart = fun(_) ->
                       case resume_session(ClientId, ConnPid) of
                           {ok, SPid} ->
+                              emqx_session:update_max_inflight(SPid, MaxInflight),
                               {ok, SPid, true};
                           {error, not_found} ->
                               emqx_session_sup:start_session(SessAttrs)

+ 2 - 1
test/emqx_mock_client.erl

@@ -51,7 +51,8 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
                conn_pid         => ClientPid,
                clean_start      => true,
                username         => undefined,
-               expiry_interval  => 0
+               expiry_interval  => 0,
+               max_inflight     => 0
              },
     {ok, SessPid} = emqx_sm:open_session(Attrs),
     {reply, {ok, SessPid},

+ 1 - 1
test/emqx_sm_SUITE.erl

@@ -25,7 +25,7 @@ t_open_close_session(_) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
     Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
-              zone => internal, username => <<"zhou">>, expiry_interval => 0},
+              zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0},
     {ok, SPid} = emqx_sm:open_session(Attrs),
     [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
     SPid = emqx_sm:lookup_session_pid(<<"client">>),