Sfoglia il codice sorgente

Add session proper test cases (#2768)

* Add session proper test cases
* Remove useless mockers
Gilbert 6 anni fa
parent
commit
abb4b07665
3 ha cambiato i file con 361 aggiunte e 22 eliminazioni
  1. 6 7
      src/emqx_protocol.erl
  2. 28 15
      src/emqx_session.erl
  3. 327 0
      test/prop_emqx_session.erl

+ 6 - 7
src/emqx_protocol.erl

@@ -193,8 +193,8 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) ->
             puback(QoS, PacketId, ReasonCode, NPState)
     end;
 
-handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
-    case emqx_session:puback(PacketId, ReasonCode, Session) of
+handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), PState = #protocol{session = Session}) ->
+    case emqx_session:puback(PacketId, Session) of
         {ok, Publishes, NSession} ->
             handle_out({publish, Publishes}, PState#protocol{session = NSession});
         {ok, NSession} ->
@@ -204,7 +204,7 @@ handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses
     end;
 
 handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
-    case emqx_session:pubrec(PacketId, ReasonCode, Session) of
+    case emqx_session:pubrec(PacketId, Session) of
         {ok, NSession} ->
             handle_out({pubrel, PacketId}, PState#protocol{session = NSession});
         {error, ReasonCode1} ->
@@ -212,15 +212,15 @@ handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses
     end;
 
 handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
-    case emqx_session:pubrel(PacketId, ReasonCode, Session) of
+    case emqx_session:pubrel(PacketId, Session) of
         {ok, NSession} ->
             handle_out({pubcomp, PacketId}, PState#protocol{session = NSession});
         {error, ReasonCode1} ->
             handle_out({pubcomp, PacketId, ReasonCode1}, PState)
     end;
 
-handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
-    case emqx_session:pubcomp(PacketId, ReasonCode, Session) of
+handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), PState = #protocol{session = Session}) ->
+    case emqx_session:pubcomp(PacketId, Session) of
         {ok, Publishes, NSession} ->
             handle_out({publish, Publishes}, PState#protocol{session = NSession});
         {ok, NSession} ->
@@ -905,4 +905,3 @@ sp(false) -> 0.
 
 flag(true)  -> 1;
 flag(false) -> 0.
-

+ 28 - 15
src/emqx_session.erl

@@ -63,10 +63,10 @@
         ]).
 
 -export([ publish/3
-        , puback/3
-        , pubrec/3
-        , pubrel/3
-        , pubcomp/3
+        , puback/2
+        , pubrec/2
+        , pubrel/2
+        , pubcomp/2
         ]).
 
 -export([deliver/2]).
@@ -80,6 +80,9 @@
         , get_env/3
         ]).
 
+%% For test case
+-export([set_pkt_id/2]).
+
 -record(session, {
           %% Clean Start Flag
           clean_start :: boolean(),
@@ -167,8 +170,8 @@ init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight,
 init_mqueue(Zone) ->
     emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000),
                        store_qos0 => get_env(Zone, mqueue_store_qos0, true),
-                       priorities => get_env(Zone, mqueue_priorities),
-                       default_priority => get_env(Zone, mqueue_default_priority)
+                       priorities => get_env(Zone, mqueue_priorities, none),
+                       default_priority => get_env(Zone, mqueue_default_priority, lowest)
                       }).
 
 %%--------------------------------------------------------------------
@@ -369,14 +372,17 @@ do_publish(PacketId, Msg = #message{timestamp = Ts},
 %% Client -> Broker: PUBACK
 %%--------------------------------------------------------------------
 
--spec(puback(emqx_types:packet_id(), emqx_types:reason_code(), session())
+-spec(puback(emqx_types:packet_id(), session())
       -> {ok, session()} | {ok, list(publish()), session()} |
          {error, emqx_types:reason_code()}).
-puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
+puback(PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
             dequeue(Session#session{inflight = Inflight1});
+        {value, {_OtherPub, _Ts}} ->
+            ?LOG(warning, "The PacketId has been used, PacketId: ~p", [PacketId]),
+            {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
             ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
             ok = emqx_metrics:inc('packets.puback.missed'),
@@ -387,9 +393,9 @@ puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
 %% Client -> Broker: PUBREC
 %%--------------------------------------------------------------------
 
--spec(pubrec(emqx_types:packet_id(), emqx_types:reason_code(), session())
+-spec(pubrec(emqx_types:packet_id(), session())
       -> {ok, session()} | {error, emqx_types:reason_code()}).
-pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
+pubrec(PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
             Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight),
@@ -408,9 +414,9 @@ pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
 %% Client -> Broker: PUBREL
 %%--------------------------------------------------------------------
 
--spec(pubrel(emqx_types:packet_id(), emqx_types:reason_code(), session())
+-spec(pubrel(emqx_types:packet_id(), session())
       -> {ok, session()} | {error, emqx_types:reason_code()}).
-pubrel(PacketId, _ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) ->
+pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
     case maps:take(PacketId, AwaitingRel) of
         {_Ts, AwaitingRel1} ->
             {ok, Session#session{awaiting_rel = AwaitingRel1}};
@@ -424,10 +430,10 @@ pubrel(PacketId, _ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) ->
 %% Client -> Broker: PUBCOMP
 %%--------------------------------------------------------------------
 
--spec(pubcomp(emqx_types:packet_id(), emqx_types:reason_code(), session())
+-spec(pubcomp(emqx_types:packet_id(), session())
       -> {ok, session()} | {ok, list(publish()), session()} |
          {error, emqx_types:reason_code()}).
-pubcomp(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
+pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:contain(PacketId, Inflight) of
         true ->
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
@@ -658,7 +664,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
             Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)},
             expire_awaiting_rel(More, Now, Session1);
         Age ->
-            ensure_await_rel_timer(Timeout - max(0, Age), Session)
+            {ok, ensure_await_rel_timer(Timeout - max(0, Age), Session)}
     end.
 
 %%--------------------------------------------------------------------
@@ -671,3 +677,10 @@ next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) ->
 next_pkt_id(Session = #session{next_pkt_id = Id}) ->
     Session#session{next_pkt_id = Id + 1}.
 
+%%---------------------------------------------------------------------
+%% For Test case
+%%---------------------------------------------------------------------
+
+
+set_pkt_id(Session, PktId) ->
+    Session#session{next_pkt_id = PktId}.

+ 327 - 0
test/prop_emqx_session.erl

@@ -0,0 +1,327 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(prop_emqx_session).
+
+-include("emqx_mqtt.hrl").
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(mock_modules,
+        [ emqx_metrics
+        , emqx_broker
+        , emqx_misc
+        , emqx_message
+        , emqx_hooks
+        , emqx_zone
+        , emqx_pd
+        ]).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%%%%%%%%%%%%%%%%%
+%%% Properties %%%
+%%%%%%%%%%%%%%%%%%
+prop_session_pub(opts) -> [{numtests, 1000}].
+
+prop_session_pub() ->
+    emqx_logger:set_log_level(emergency),
+
+    ?SETUP(fun() ->
+                   ok = load(?mock_modules),
+                   fun() -> ok = unload(?mock_modules) end
+           end,
+           ?FORALL({Session, OpList}, {session(), session_op_list()},
+                   begin
+                       try
+                           apply_ops(Session, OpList),
+                           true
+                       after
+                           ok
+                       end
+                   end)).
+
+%%%%%%%%%%%%%%%
+%%% Helpers %%%
+%%%%%%%%%%%%%%%
+
+apply_ops(Session, []) ->
+    ?assertEqual(session, element(1, Session));
+apply_ops(Session, [Op | Rest]) ->
+    NSession = apply_op(Session, Op),
+    apply_ops(NSession, Rest).
+
+apply_op(Session, info) ->
+    Info = emqx_session:info(Session),
+    ?assert(is_map(Info)),
+    ?assertEqual(16, maps:size(Info)),
+    Session;
+apply_op(Session, attrs) ->
+    Attrs = emqx_session:attrs(Session),
+    ?assert(is_map(Attrs)),
+    ?assertEqual(3, maps:size(Attrs)),
+    Session;
+apply_op(Session, stats) ->
+    Stats = emqx_session:stats(Session),
+    ?assert(is_list(Stats)),
+    ?assertEqual(9, length(Stats)),
+    Session;
+apply_op(Session, {subscribe, {Client, TopicFilter, SubOpts}}) ->
+    case emqx_session:subscribe(Client, TopicFilter, SubOpts, Session) of
+        {ok, NSession} ->
+            NSession;
+        {error, ?RC_QUOTA_EXCEEDED} ->
+            Session
+    end;
+apply_op(Session, {unsubscribe, {Client, TopicFilter}}) ->
+    case emqx_session:unsubscribe(Client, TopicFilter, Session) of
+        {ok, NSession} ->
+            NSession;
+        {error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
+            Session
+    end;
+apply_op(Session, {publish, {PacketId, Msg}}) ->
+    case emqx_session:publish(PacketId, Msg, Session) of
+        {ok, _Msg} ->
+            Session;
+        {ok, _Deliver, NSession} ->
+            NSession;
+        {error, _ErrorCode} ->
+            Session
+    end;
+apply_op(Session, {puback, PacketId}) ->
+    case emqx_session:puback(PacketId, Session) of
+        {ok, _Msg} ->
+            Session;
+        {ok, _Deliver, NSession} ->
+            NSession;
+        {error, _ErrorCode} ->
+            Session
+    end;
+apply_op(Session, {pubrec, PacketId}) ->
+    case emqx_session:pubrec(PacketId, Session) of
+        {ok, NSession} ->
+            NSession;
+        {error, _ErrorCode} ->
+            Session
+    end;
+apply_op(Session, {pubrel, PacketId}) ->
+    case emqx_session:pubrel(PacketId, Session) of
+        {ok, NSession} ->
+            NSession;
+        {error, _ErrorCode} ->
+            Session
+    end;
+apply_op(Session, {pubcomp, PacketId}) ->
+    case emqx_session:pubcomp(PacketId, Session) of
+        {ok, _Msgs} ->
+            Session;
+        {ok, _Msgs, NSession} ->
+            NSession;
+        {error, _ErrorCode} ->
+            Session
+    end;
+apply_op(Session, {deliver, Delivers}) ->
+    {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session),
+    NSession;
+apply_op(Session, {timeout, {TRef, TimeoutMsg}}) ->
+    case emqx_session:timeout(TRef, TimeoutMsg, Session) of
+        {ok, NSession} ->
+            NSession;
+        {ok, _Msg, NSession} ->
+            NSession
+    end.
+
+%%%%%%%%%%%%%%%%%%
+%%% Generators %%%
+%%%%%%%%%%%%%%%%%%
+session_op_list() ->
+    Union = [info,
+             attrs,
+             stats,
+             {subscribe, sub_args()},
+             {unsubscribe, unsub_args()},
+             {publish, publish_args()},
+             {puback, puback_args()},
+             {pubrec, pubrec_args()},
+             {pubrel, pubrel_args()},
+             {pubcomp, pubcomp_args()},
+             {deliver, deliver_args()},
+             {timeout, timeout_args()}
+            ],
+    list(?LAZY(oneof(Union))).
+
+deliver_args() ->
+    list({deliver, topic(), message()}).
+
+timeout_args() ->
+    {tref(), timeout_msg()}.
+
+sub_args() ->
+    ?LET({ClientId, TopicFilter, SubOpts},
+         {clientid(), topic(), sub_opts()},
+         {#{client_id => ClientId}, TopicFilter, SubOpts}).
+
+unsub_args() ->
+    ?LET({ClientId, TopicFilter},
+         {clientid(), topic()},
+         {#{client_id => ClientId}, TopicFilter}).
+
+publish_args() ->
+    ?LET({PacketId, Message},
+         {packetid(), message()},
+         {PacketId, Message}).
+
+puback_args() ->
+    packetid().
+
+pubrec_args() ->
+    packetid().
+
+pubrel_args() ->
+    packetid().
+
+pubcomp_args() ->
+    packetid().
+
+timeout_msg() ->
+    oneof([retry_delivery, check_awaiting_rel]).
+
+tref() -> oneof([tref, undefined]).
+
+sub_opts() ->
+    ?LET({RH, RAP, NL, QOS, SHARE, SUBID},
+         {rh(), rap(), nl(), qos(), share(), subid()}
+        , make_subopts(RH, RAP, NL, QOS, SHARE, SUBID)).
+
+message() ->
+    ?LET({QoS, Topic, Payload},
+         {qos(), topic(), payload()},
+         emqx_message:make(proper, QoS, Topic, Payload)).
+
+subid() -> integer().
+
+rh() -> oneof([0, 1, 2]).
+
+rap() -> oneof([0, 1]).
+
+nl() -> oneof([0, 1]).
+
+qos() -> oneof([0, 1, 2]).
+
+share() -> binary().
+
+clientid() -> binary().
+
+topic() -> ?LET(No, choose(1, 10), begin
+                                       NoBin = integer_to_binary(No),
+                                       <<"topic/", NoBin/binary>>
+                                   end).
+
+payload() -> binary().
+
+packetid() -> choose(1, 30).
+
+zone() ->
+    ?LET(Zone, [{max_subscriptions, max_subscription()},
+                {upgrade_qos, upgrade_qos()},
+                {retry_interval, retry_interval()},
+                {max_awaiting_rel, max_awaiting_rel()},
+                {await_rel_timeout, await_rel_timeout()}]
+        , maps:from_list(Zone)).
+
+max_subscription() -> frequency([{33, 0},
+                                 {33, 1},
+                                 {34, choose(0,10)}]).
+
+upgrade_qos() -> bool().
+
+retry_interval() -> ?LET(Interval, choose(0, 20), Interval*1000).
+
+max_awaiting_rel() -> choose(0, 10).
+
+await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000).
+
+max_inflight() -> choose(0, 10).
+
+expiry_interval() -> ?LET(EI, choose(1, 10), EI * 3600).
+
+option() ->
+    ?LET(Option, [{max_inflight, max_inflight()},
+                  {expiry_interval, expiry_interval()}]
+        , maps:from_list(Option)).
+
+cleanstart() -> bool().
+
+session() ->
+    ?LET({CleanStart, Zone, Options},
+         {cleanstart(), zone(), option()},
+         begin
+             Session = emqx_session:init(CleanStart, #{zone => Zone}, Options),
+             emqx_session:set_pkt_id(Session, 16#ffff)
+         end).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Internal functions %%%
+%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+make_subopts(RH, RAP, NL, QOS, SHARE, SubId) ->
+    #{rh => RH,
+      rap => RAP,
+      nl => NL,
+      qos => QOS,
+      share => SHARE,
+      subid => SubId}.
+
+
+load(Modules) ->
+    [mock(Module) || Module <- Modules],
+    ok.
+
+unload(Modules) ->
+    lists:foreach(fun(Module) ->
+                          ok = meck:unload(Module)
+                  end, Modules),
+    ok.
+
+mock(Module) ->
+    ok = meck:new(Module, [passthrough, no_history]),
+    do_mock(Module, expect(Module)).
+
+do_mock(emqx_metrics, Expect) ->
+    Expect(inc, fun(_Anything) -> ok end);
+do_mock(emqx_broker, Expect) ->
+    Expect(subscribe, fun(_, _, _) -> ok end),
+    Expect(set_subopts, fun(_, _) -> ok end),
+    Expect(unsubscribe, fun(_) -> ok end),
+    Expect(publish, fun(_) -> ok end);
+do_mock(emqx_misc, Expect) ->
+    Expect(start_timer, fun(_, _) -> tref end);
+do_mock(emqx_message, Expect) ->
+    Expect(set_header, fun(_Hdr, _Val, Msg) -> Msg end),
+    Expect(is_expired, fun(_Msg) -> (rand:uniform(16) > 8) end);
+do_mock(emqx_hooks, Expect) ->
+    Expect(run, fun(_Hook, _Args) -> ok end);
+do_mock(emqx_zone, Expect) ->
+    Expect(get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end);
+do_mock(emqx_pd, Expect) ->
+    Expect(update_counter, fun(_stats, _num) -> ok end).
+
+expect(Module) ->
+    fun(OldFun, NewFun) ->
+            ok = meck:expect(Module, OldFun, NewFun)
+    end.