Quellcode durchsuchen

fix(session): update testcases for emqx_session

Shawn vor 4 Jahren
Ursprung
Commit
a6408cee4f
2 geänderte Dateien mit 39 neuen und 39 gelöschten Zeilen
  1. 12 14
      src/emqx_session.erl
  2. 27 25
      test/emqx_session_SUITE.erl

+ 12 - 14
src/emqx_session.erl

@@ -392,7 +392,7 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
         true  -> {ok, Session};
         false ->
             {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
-            deliver(Msgs, [], Session#session{mqueue = Q1})
+            do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1})
     end.
 
 dequeue(_ClientInfo, 0, Msgs, Q) ->
@@ -422,22 +422,22 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1.
 -spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session())
       -> {ok, session()} | {ok, replies(), session()}).
 deliver(ClientInfo, [Deliver], Session) -> %% Optimize
-    Enrich = enrich_fun(Session),
-    deliver_msg(ClientInfo, Enrich(Deliver), Session);
+    Msg = enrich_delivers(Deliver, Session),
+    deliver_msg(ClientInfo, Msg, Session);
 
 deliver(ClientInfo, Delivers, Session) ->
-    Msgs = lists:map(enrich_fun(Session), Delivers),
-    deliver(ClientInfo, Msgs, [], Session).
+    Msgs = [enrich_delivers(D, Session) || D <- Delivers],
+    do_deliver(ClientInfo, Msgs, [], Session).
 
-deliver(_ClientInfo, [], Publishes, Session) ->
+do_deliver(_ClientInfo, [], Publishes, Session) ->
     {ok, lists:reverse(Publishes), Session};
 
-deliver(ClientInfo, [Msg | More], Acc, Session) ->
+do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
     case deliver_msg(ClientInfo, Msg, Session) of
         {ok, Session1} ->
-            deliver(More, Acc, Session1);
+            do_deliver(ClientInfo, More, Acc, Session1);
         {ok, [Publish], Session1} ->
-            deliver(More, [Publish|Acc], Session1)
+            do_deliver(ClientInfo, More, [Publish|Acc], Session1)
     end.
 
 deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
@@ -462,7 +462,7 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
 -spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(),
               session()) -> session()).
 enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
-    Msgs = lists:map(enrich_fun(Session), Delivers),
+    Msgs = [enrich_delivers(D, Session) || D <- Delivers],
     lists:foldl(fun(Msg, Session0) ->
             enqueue(ClientInfo, Msg, Session0)
         end, Session, Msgs);
@@ -487,10 +487,8 @@ log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
                  [emqx_message:format(Msg)])
     end.
 
-enrich_fun(Session = #session{subscriptions = Subs}) ->
-    fun({deliver, Topic, Msg}) ->
-            enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
-    end.
+enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) ->
+    enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
 
 maybe_ack(Msg) ->
     case emqx_shared_sub:is_ack_required(Msg) of

+ 27 - 25
test/emqx_session_SUITE.erl

@@ -165,7 +165,7 @@ t_puback(_) ->
     Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
     Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight, mqueue => mqueue()}),
-    {ok, Msg, Session1} = emqx_session:puback(1, Session),
+    {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
 
 t_puback_with_dequeue(_) ->
@@ -174,7 +174,7 @@ t_puback_with_dequeue(_) ->
     Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
     {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
     Session = session(#{inflight => Inflight, mqueue => Q}),
-    {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session),
+    {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
     ?assertEqual(0, emqx_session:info(mqueue_len, Session1)),
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
@@ -182,10 +182,10 @@ t_puback_with_dequeue(_) ->
 t_puback_error_packet_id_in_use(_) ->
     Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
     {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
-        emqx_session:puback(1, session(#{inflight => Inflight})).
+        emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})).
 
 t_puback_error_packet_id_not_found(_) ->
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()).
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()).
 
 t_pubrec(_) ->
     Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
@@ -213,17 +213,17 @@ t_pubrel_error_packetid_not_found(_) ->
 t_pubcomp(_) ->
     Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
-    {ok, Session1} = emqx_session:pubcomp(1, Session),
+    {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
 
 t_pubcomp_error_packetid_in_use(_) ->
     Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
     Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
-    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session).
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session).
 
 t_pubcomp_error_packetid_not_found(_) ->
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()).
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()).
 
 %%--------------------------------------------------------------------
 %% Test cases for deliver/retry
@@ -231,14 +231,16 @@ t_pubcomp_error_packetid_not_found(_) ->
 
 t_dequeue(_) ->
     Q = mqueue(#{store_qos0 => true}),
-    {ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})),
+    {ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})),
     Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>),
             emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
             emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>)
            ],
-    Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs),
+    Session1 = lists:foldl(fun(Msg, Session0) ->
+            emqx_session:enqueue(clientinfo(), Msg, Session0)
+        end, Session, Msgs),
     {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
-        emqx_session:dequeue(Session1),
+        emqx_session:dequeue(clientinfo(), Session1),
     ?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
@@ -253,7 +255,7 @@ t_deliver_qos0(_) ->
                        clientinfo(), <<"t1">>, subopts(), Session),
     Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]],
     {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
-        emqx_session:deliver(Deliveries, Session1),
+        emqx_session:deliver(clientinfo(), Deliveries, Session1),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
 
@@ -262,38 +264,38 @@ t_deliver_qos1(_) ->
     {ok, Session} = emqx_session:subscribe(
                        clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
     Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
-    {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session),
+    {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
-    {ok, Msg1, Session2} = emqx_session:puback(1, Session1),
+    {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
-    {ok, Msg2, Session3} = emqx_session:puback(2, Session2),
+    {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
 
 t_deliver_qos2(_) ->
     ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
     Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)],
     {ok, [{1, Msg1}, {2, Msg2}], Session} =
-        emqx_session:deliver(Delivers, session()),
+        emqx_session:deliver(clientinfo(), Delivers, session()),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session)),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
 
 t_deliver_one_msg(_) ->
     {ok, [{1, Msg}], Session} =
-        emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()),
+        emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
 
 t_deliver_when_inflight_is_full(_) ->
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
     Session = session(#{inflight => emqx_inflight:new(1)}),
-    {ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session),
+    {ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
     ?assertEqual(1, length(Publishes)),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
     ?assertEqual(1, emqx_session:info(mqueue_len, Session1)),
-    {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1),
+    {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
     ?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
@@ -301,18 +303,18 @@ t_deliver_when_inflight_is_full(_) ->
 
 t_enqueue(_) ->
     %% store_qos0 = true
-    Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()),
-    Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>),
+    Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()),
+    Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>),
                                      delivery(?QOS_2, <<"t2">>)], Session),
     ?assertEqual(3, emqx_session:info(mqueue_len, Session1)).
 
 t_retry(_) ->
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
     Session = session(#{retry_interval => 100}),
-    {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
+    {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
     ok = timer:sleep(200),
     Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
-    {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
+    {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
 
 %%--------------------------------------------------------------------
@@ -331,11 +333,11 @@ t_resume(_) ->
 
 t_replay(_) ->
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
-    {ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()),
+    {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()),
     Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
-    Session2 = emqx_session:enqueue(Msg, Session1),
+    Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
     Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
-    {ok, ReplayPubs, Session3} = emqx_session:replay(Session2),
+    {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
     ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
     ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).