Просмотр исходного кода

Make use of BUMP_PACKET_ID the only way to generate packet IDs

spring2maz 7 лет назад
Родитель
Сommit
efc9e34033
5 измененных файлов с 27 добавлено и 14 удалено
  1. 2 0
      Makefile
  2. 0 5
      include/emqx_mqtt.hrl
  3. 17 5
      src/emqx_client.erl
  4. 2 2
      src/portal/emqx_portal_mqtt.erl
  5. 6 2
      test/emqx_portal_SUITE.erl

+ 2 - 0
Makefile

@@ -39,6 +39,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
 			emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
 			emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message
 
+CT_SUITES = emqx_portal
+
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
 

+ 0 - 5
include/emqx_mqtt.hrl

@@ -176,11 +176,6 @@
 
 -define(MAX_PACKET_ID, 16#ffff).
 -define(MAX_PACKET_SIZE, 16#fffffff).
--define(BUMP_PACKET_ID(Base, Bump),
-        case Base + Bump of
-            __I__ when __I__ > ?MAX_PACKET_ID -> __I__ - ?MAX_PACKET_ID;
-            __I__ -> __I__
-        end).
 
 %%--------------------------------------------------------------------
 %% MQTT Frame Mask

+ 17 - 5
src/emqx_client.erl

@@ -35,6 +35,7 @@
 -export([pubcomp/2, pubcomp/3, pubcomp/4]).
 -export([subscriptions/1]).
 -export([info/1, stop/1]).
+-export([next_packet_id/1, next_packet_id/2]).
 %% For test cases
 -export([pause/1, resume/1]).
 
@@ -421,6 +422,19 @@ disconnect(Client, ReasonCode) ->
 disconnect(Client, ReasonCode, Properties) ->
     gen_statem:call(Client, {disconnect, ReasonCode, Properties}).
 
+-spec next_packet_id(packet_id()) -> packet_id().
+next_packet_id(?MAX_PACKET_ID) -> 1;
+next_packet_id(Id) -> Id + 1.
+
+-spec next_packet_id(packet_id(), integer()) -> packet_id().
+next_packet_id(Id, Bump) ->
+    true = (Bump < ?MAX_PACKET_ID div 2), %% assert
+    N = Id + Bump,
+    case N > ?MAX_PACKET_ID of
+        true -> N - ?MAX_PACKET_ID;
+        false -> N
+    end.
+
 %%------------------------------------------------------------------------------
 %% For test cases
 %%------------------------------------------------------------------------------
@@ -1354,7 +1368,7 @@ send(Packet, State = #state{socket = Sock, proto_ver = Ver})
     Data = emqx_frame:serialize(Packet, #{version => Ver}),
     emqx_logger:debug("SEND Data: ~p", [Data]),
     case emqx_client_sock:send(Sock, Data) of
-        ok  -> {ok, next_packet_id(State)};
+        ok  -> {ok, bump_last_packet_id(State)};
         Error -> Error
     end.
 
@@ -1397,8 +1411,6 @@ assign_packet_id([H | T], Id) ->
 assign_packet_id([], _Id) ->
     [].
 
-next_packet_id(State = #state{last_packet_id = Id}) ->
-    State#state{last_packet_id = next_packet_id(Id)};
-next_packet_id(16#ffff) -> 1;
-next_packet_id(Id) -> Id + 1.
+bump_last_packet_id(State = #state{last_packet_id = Id}) ->
+    State#state{last_packet_id = next_packet_id(Id)}.
 

+ 2 - 2
src/portal/emqx_portal_mqtt.erl

@@ -103,7 +103,7 @@ safe_stop(Pid, StopF, Timeout) ->
 send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, Batch) ->
     case emqx_client:publish(ClientPid, Batch) of
         {ok, BasePktId} ->
-            LastPktId = ?BUMP_PACKET_ID(BasePktId, length(Batch) - 1),
+            LastPktId = emqx_client:next_packet_id(BasePktId, length(Batch) - 1),
             AckCollector ! ?SENT(?RANGE(BasePktId, LastPktId)),
             %% return last pakcet id as batch reference
             {ok, LastPktId};
@@ -145,7 +145,7 @@ match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, PktId) | Sent]) ->
     ok = emqx_portal:handle_ack(Parent, PktId),
     match_acks(Parent, Acked, Sent);
 match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, Max) | Sent]) ->
-    match_acks(Parent, Acked, [?RANGE(PktId + 1, Max) | Sent]).
+    match_acks(Parent, Acked, [?RANGE(emqx_client:next_packet_id(PktId), Max) | Sent]).
 
 %% When puback for QoS-1 message is received from remote MQTT broker
 %% NOTE: no support for QoS-2

+ 6 - 2
test/emqx_portal_SUITE.erl

@@ -156,6 +156,7 @@ t_mqtt(Config) when is_list(Config) ->
                       end, Msgs),
         ok = receive_and_match_messages(Ref, Msgs),
         ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2),
+        timer:sleep(200),
         Msgs2 = lists:seq(Max + 1, Max * 2),
         lists:foreach(fun(I) ->
                               Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
@@ -181,8 +182,11 @@ do_receive_and_match_messages(_Ref, []) -> ok;
 do_receive_and_match_messages(Ref, [I | Rest]) ->
     receive
         {Ref, timeout} -> erlang:error(timeout);
-        {Ref, [#{payload := P}]} ->
-            ?assertEqual(I, binary_to_integer(P)),
+        {Ref, [#{payload := P} = Msg]} ->
+            case I =:= binary_to_integer(P) of
+                true -> ok;
+                false -> throw({unexpected, Msg, [I | Rest]})
+            end,
             do_receive_and_match_messages(Ref, Rest)
     end.