Przeglądaj źródła

Add more test cases

Feng Lee 6 lat temu
rodzic
commit
aecda09b9a
2 zmienionych plików z 200 dodań i 31 usunięć
  1. 31 17
      src/emqx_session.erl
  2. 169 14
      test/emqx_session_SUITE.erl

+ 31 - 17
src/emqx_session.erl

@@ -100,27 +100,25 @@
           max_subscriptions :: non_neg_integer(),
           %% Upgrade QoS?
           upgrade_qos :: boolean(),
-          %% Client <- Broker:
-          %% Inflight QoS1, QoS2 messages sent to the client but unacked.
+          %% Client <- Broker: QoS1/2 messages sent to the client but unacked.
           inflight :: emqx_inflight:inflight(),
-          %% All QoS1, QoS2 messages published to when client is disconnected.
-          %% QoS 1 and QoS 2 messages pending transmission to the Client.
+          %% All QoS1/2 messages published to when client is disconnected,
+          %% or QoS1/2 messages pending transmission to the Client.
           %%
-          %% Optionally, QoS 0 messages pending transmission to the Client.
+          %% Optionally, QoS0 messages pending transmission to the Client.
           mqueue :: emqx_mqueue:mqueue(),
           %% Next packet id of the session
           next_pkt_id = 1 :: emqx_types:packet_id(),
           %% Retry interval for redelivering QoS1/2 messages
           retry_interval :: timeout(),
-          %% Client -> Broker:
-          %% Inflight QoS2 messages received from client and waiting for pubrel.
+          %% Client -> Broker: QoS2 messages received from client and waiting for pubrel.
           awaiting_rel :: map(),
           %% Max Packets Awaiting PUBREL
           max_awaiting_rel :: non_neg_integer(),
           %% Awaiting PUBREL Timeout
           await_rel_timeout :: timeout(),
-          %% Enqueue Count
-          enqueue_cnt :: non_neg_integer(),
+          %% Deliver Stats
+          deliver_stats :: emqx_types:stats(),
           %% Created at
           created_at :: pos_integer()
          }).
@@ -131,7 +129,9 @@
 
 -define(DEFAULT_BATCH_N, 1000).
 
--define(ATTR_KEYS, [inflight_max,
+-define(ATTR_KEYS, [inflight_cnt,
+                    inflight_max,
+                    mqueue_len,
                     mqueue_max,
                     retry_interval,
                     awaiting_rel_max,
@@ -168,7 +168,7 @@
                     ]).
 
 %%--------------------------------------------------------------------
-%% Init a session
+%% Init a Session
 %%--------------------------------------------------------------------
 
 %% @doc Init a session.
@@ -184,10 +184,10 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
              awaiting_rel      = #{},
              max_awaiting_rel  = get_env(Zone, max_awaiting_rel, 100),
              await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
-             enqueue_cnt       = 0,
              created_at        = erlang:system_time(second)
             }.
 
+%% @private init mq
 init_mqueue(Zone) ->
     emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000),
                        store_qos0 => get_env(Zone, mqueue_store_qos0, true),
@@ -220,6 +220,8 @@ info(subscriptions_max, #session{max_subscriptions = MaxSubs}) ->
 info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) ->
     UpgradeQoS;
 info(inflight, #session{inflight = Inflight}) ->
+    Inflight;
+info(inflight_cnt, #session{inflight = Inflight}) ->
     emqx_inflight:size(Inflight);
 info(inflight_max, #session{inflight = Inflight}) ->
     emqx_inflight:max_size(Inflight);
@@ -234,13 +236,15 @@ info(mqueue_dropped, #session{mqueue = MQueue}) ->
 info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
     PacketId;
 info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
-    maps:size(AwaitingRel);
+    maps:values(AwaitingRel);
+info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
+    AwaitingRel;
 info(awaiting_rel_max, #session{max_awaiting_rel = MaxAwaitingRel}) ->
     MaxAwaitingRel;
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
-info(enqueue_cnt, #session{enqueue_cnt = Cnt}) ->
-    Cnt;
+info(deliver_stats, #session{deliver_stats = Stats}) ->
+    Stats;
 info(created_at, #session{created_at = CreatedAt}) ->
     CreatedAt.
 
@@ -506,7 +510,7 @@ enqueue(Delivers, Session = #session{subscriptions = Subs}) when is_list(Deliver
             || {deliver, Topic, Msg} <- Delivers],
     lists:foldl(fun enqueue/2, Session, Msgs);
 
-enqueue(Msg, Session = #session{mqueue = Q, enqueue_cnt = Cnt})
+enqueue(Msg, Session = #session{mqueue = Q})
   when is_record(Msg, message) ->
     {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
     if is_record(Dropped, message) ->
@@ -514,7 +518,7 @@ enqueue(Msg, Session = #session{mqueue = Q, enqueue_cnt = Cnt})
                 [emqx_message:format(Dropped)]);
        true -> ok
     end,
-    Session#session{mqueue = NewQ, enqueue_cnt = Cnt+1}.
+    inc_deliver_stats(enqueue_cnt, Session#session{mqueue = NewQ}).
 
 %%--------------------------------------------------------------------
 %% Awaiting ACK for QoS1/QoS2 Messages
@@ -638,3 +642,13 @@ next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) ->
 next_pkt_id(Session = #session{next_pkt_id = Id}) ->
     Session#session{next_pkt_id = Id + 1}.
 
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+inc_deliver_stats(Key, Session) ->
+    inc_deliver_stats(Key, 1, Session).
+inc_deliver_stats(Key, I, Session = #session{deliver_stats = Stats}) ->
+    NStats = maps:update_with(Key, fun(V) -> V+I end, I, Stats),
+    Session#session{deliver_stats = NStats}.
+

+ 169 - 14
test/emqx_session_SUITE.erl

@@ -23,22 +23,73 @@
 -include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-import(emqx_session, [set_field/3]).
+
 all() -> emqx_ct:all(?MODULE).
 
+%%--------------------------------------------------------------------
+%% CT callbacks
+%%--------------------------------------------------------------------
+
+init_per_testcase(_TestCase, Config) ->
+    %% Meck Broker
+    ok = meck:new(emqx_broker, [passthrough, no_history]),
+    ok = meck:new(emqx_hooks, [passthrough, no_history]),
+    ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
+    Config.
+
+end_per_testcase(_TestCase, Config) ->
+    ok = meck:unload(emqx_broker),
+    ok = meck:unload(emqx_hooks),
+    Config.
+
+%%--------------------------------------------------------------------
+%% Test cases for session init
+%%--------------------------------------------------------------------
+
 t_session_init(_) ->
-    error('TODO').
+    Session = emqx_session:init(#{zone => external}, #{receive_maximum => 64}),
+    ?assertEqual(#{}, emqx_session:info(subscriptions, Session)),
+    ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)),
+    ?assertEqual(0, emqx_session:info(subscriptions_max, Session)),
+    ?assertEqual(false, emqx_session:info(upgrade_qos, Session)),
+    ?assertEqual(0, emqx_session:info(inflight_cnt, Session)),
+    ?assertEqual(64, emqx_session:info(inflight_max, Session)),
+    ?assertEqual(1, emqx_session:info(next_pkt_id, Session)),
+    ?assertEqual(0, emqx_session:info(retry_interval, Session)),
+    ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
+    ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
+    ?assertEqual(3600000, emqx_session:info(awaiting_rel_timeout, Session)),
+    ?assert(is_integer(emqx_session:info(created_at, Session))).
 
 %%--------------------------------------------------------------------
-%% Test cases for info/stats
+%% Test cases for session info/stats
 %%--------------------------------------------------------------------
 
 t_session_info(_) ->
-    error('TODO').
+    ?assertMatch(#{subscriptions := #{},
+                   subscriptions_max := 0,
+                   upgrade_qos := false,
+                   inflight := 0,
+                   inflight_max := 64,
+                   retry_interval := 0,
+                   mqueue_len := 0,
+                   mqueue_max := 1000,
+                   mqueue_dropped := 0,
+                   next_pkt_id := 1,
+                   awaiting_rel := 0,
+                   awaiting_rel_max := 0,
+                   await_rel_timeout := 3600000
+                  }, emqx_session:info(session())).
 
 t_session_attrs(_) ->
+    Attrs = emqx_session:attrs(session()),
+    io:format("~p~n", [Attrs]),
     error('TODO').
 
 t_session_stats(_) ->
+    Stats = emqx_session:stats(session()),
+    io:format("~p~n", [Stats]),
     error('TODO').
 
 %%--------------------------------------------------------------------
@@ -46,36 +97,115 @@ t_session_stats(_) ->
 %%--------------------------------------------------------------------
 
 t_subscribe(_) ->
-    error('TODO').
+    ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
+    ok = meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end),
+    {ok, Session} = emqx_session:subscribe(
+                      clientinfo(), <<"#">>, subopts(), session()),
+    ?assertEqual(1, emqx_session:info(subscriptions_cnt, Session)).
+
+t_is_subscriptions_full_false(_) ->
+    Session = session(#{max_subscriptions => 0}),
+    ?assertNot(emqx_session:is_subscriptions_full(Session)).
+
+t_is_subscriptions_full_true(_) ->
+    Session = session(#{max_subscriptions => 1}),
+    ?assertNot(emqx_session:is_subscriptions_full(Session)),
+    Subs = #{<<"t1">> => subopts(), <<"t2">> => subopts()},
+    NSession = set_field(subscriptions, Subs, Session),
+    ?assert(emqx_session:is_subscriptions_full(NSession)).
 
 t_unsubscribe(_) ->
-    error('TODO').
+    ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
+    Session = session(#{subscriptions => #{<<"#">> => subopts()}}),
+    {ok, NSession} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session),
+    Error = emqx_session:unsubscribe(clientinfo(), <<"#">>, NSession),
+    ?assertEqual({error, ?RC_NO_SUBSCRIPTION_EXISTED}, Error).
 
-t_publish_qos0(_) ->
-    error('TODO').
+t_publish_qos2(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>),
+    {ok, [], Session} = emqx_session:publish(1, Msg, session()),
+    ?assertEqual(awaiting_rel_cnt, emqx_session:info(awaiting_rel_cnt, Session)).
 
 t_publish_qos1(_) ->
-    error('TODO').
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
+    {ok, [], Session} = emqx_session:publish(1, Msg, session()).
 
-t_publish_qos2(_) ->
-    error('TODO').
+t_publish_qos0(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
+    {ok, [], Session} = emqx_session:publish(0, Msg, session()).
+
+t_is_awaiting_full_false(_) ->
+    ?assertNot(emqx_session:is_awaiting_full(session(#{max_awaiting_rel => 0}))).
+
+t_is_awaiting_full_true(_) ->
+    Session = session(#{max_awaiting_rel => 1,
+                        awaiting_rel => #{1 => 1}
+                       }),
+    ?assert(emqx_session:is_awaiting_full(Session)).
 
 t_puback(_) ->
-    error('TODO').
+    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
+    Inflight = emqx_inflight:insert(1, {Msg, os:timestamp()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {ok, Msg, NSession} = emqx_session:puback(1, Session),
+    ?assertEqual([], emqx_session:info(inflight, NSession)).
+
+t_puback_error_packet_id_in_use(_) ->
+    Inflight = emqx_inflight:insert(1, {pubrel, os:timestamp()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session).
+
+t_puback_error_packet_id_not_found(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()).
 
 t_pubrec(_) ->
-    error('TODO').
+    Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
+    Inflight = emqx_inflight:insert(2, {Msg, os:timestamp()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {ok, Msg, NSession} = emqx_session:pubrec(2, Session),
+    ?assertMatch([{pubrel, _}], emqx_session:info(inflight, NSession)).
+
+t_pubrec_error_packet_id_in_use(_) ->
+    Inflight = emqx_inflight:insert(1, {pubrel, ts()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, session()).
+
+t_pubrec_error_packet_id_not_found(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()).
 
 t_pubrel(_) ->
-    error('TODO').
+    Session = set_field(awaiting_rel, #{1 => os:timestamp()}, session()),
+    {ok, NSession} = emqx_session:pubrel(1, Session),
+    ?assertEqual(#{}, emqx_session:info(awaiting_rel, NSession)).
+
+t_pubrel_id_not_found(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()).
 
 t_pubcomp(_) ->
-    error('TODO').
+    Inflight = emqx_inflight:insert(2, {pubrel, os:timestamp()}, emqx_inflight:new()),
+    Session = emqx_session:set_field(inflight, Inflight, session()),
+    {ok, NSession} = emqx_session:pubcomp(2, Session),
+    ?assertEqual([], emqx_session:info(inflight, NSession)).
+
+t_pubcomp_id_not_found(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(2, session()).
 
 %%--------------------------------------------------------------------
 %% Test cases for deliver/retry
 %%--------------------------------------------------------------------
 
+t_dequeue(_) ->
+    {ok, Session} = emqx_session:dequeue(session()).
+
+t_bach_n(_) ->
+    error('TODO').
+
+t_dequeue_with_msgs(_) ->
+    error('TODO').
+
 t_deliver(_) ->
     error('TODO').
 
@@ -101,3 +231,28 @@ t_redeliver(_) ->
 t_expire(_) ->
     error('TODO').
 
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+session() -> session(#{}).
+session(InitFields) when is_map(InitFields) ->
+    maps:fold(fun(Field, Value, Session) ->
+                      emqx_session:set_field(Field, Value, Session)
+              end,
+              emqx_session:init(#{zone => zone}, #{receive_maximum => 0}),
+              InitFields).
+
+
+clientinfo() -> clientinfo(#{}).
+clientinfo(Init) ->
+    maps:merge(#{clientid => <<"clientid">>,
+                 username => <<"username">>
+                }, Init).
+
+subopts() -> subopts(#{}).
+subopts(Init) ->
+    maps:merge(?DEFAULT_SUBOPTS, Init).
+
+ts() -> erlang:system_time(second).
+