Просмотр исходного кода

Add more test cases for connection, channel and session modules

Feng Lee 6 лет назад
Родитель
Сommit
7117dde879

+ 8 - 4
src/emqx_channel.erl

@@ -615,13 +615,13 @@ handle_out(connack, {?RC_SUCCESS, SP, ConnPkt},
     AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
     case maybe_resume_session(Channel2) of
         ignore ->
-            {ok, [{enter, connected}, {outgoing, AckPacket}], Channel2};
+            {ok, [{connack, AckPacket}], Channel2};
         {ok, Publishes, NSession} ->
             Channel3 = Channel2#channel{session  = NSession,
                                         resuming = false,
                                         pendings = []},
             {ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3),
-            {ok, [{enter, connected}, {outgoing, [AckPacket|Packets]}], Channel3}
+            {ok, [{connack, AckPacket}, {outgoing, Packets}], Channel3}
     end;
 
 handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
@@ -880,7 +880,7 @@ interval(alive_timer, #channel{keepalive = KeepAlive}) ->
 interval(retry_timer, #channel{session = Session}) ->
     emqx_session:info(retry_interval, Session);
 interval(await_timer, #channel{session = Session}) ->
-    emqx_session:info(await_rel_timeout, Session);
+    emqx_session:info(awaiting_rel_timeout, Session);
 interval(expire_timer, #channel{conninfo = ConnInfo}) ->
     timer:seconds(maps:get(expiry_interval, ConnInfo));
 interval(will_timer, #channel{will_msg = WillMsg}) ->
@@ -1068,13 +1068,13 @@ check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
     end.
 
 %% Check Pub Alias
+%% TODO: Fixme later
 check_pub_alias(#mqtt_packet{
                    variable = #mqtt_packet_publish{
                                  properties = #{'Topic-Alias' := AliasId}
                                 }
                   },
                 #channel{alias_maximum = Limits}) ->
-    %% TODO: Move to Protocol
     case (Limits == undefined)
             orelse (Max = maps:get(inbound, Limits, 0)) == 0
                 orelse (AliasId > Max) of
@@ -1219,10 +1219,14 @@ reply(Reply, Channel) ->
     {reply, Reply, Channel}.
 
 -compile({inline, [shutdown/2]}).
+shutdown(success, Channel) ->
+    shutdown(normal, Channel);
 shutdown(Reason, Channel) ->
     {shutdown, Reason, Channel}.
 
 -compile({inline, [shutdown/3]}).
+shutdown(success, Reply, Channel) ->
+    shutdown(normal, Reply, Channel);
 shutdown(Reason, Reply, Channel) ->
     {shutdown, Reason, Reply, Channel}.
 

+ 14 - 3
src/emqx_connection.erl

@@ -516,9 +516,20 @@ send(IoData, State = #state{transport = Transport,
 %%--------------------------------------------------------------------
 %% Handle Info
 
-handle_info({enter, _}, State = #state{active_n  = ActiveN,
-                                       sockstate = SockSt,
-                                       channel   = Channel}) ->
+handle_info({connack, ConnAck}, State = #state{active_n  = ActiveN,
+                                                sockstate = SockSt,
+                                                channel   = Channel}) ->
+    NState = handle_outgoing(ConnAck, State),
+    ChanAttrs = emqx_channel:attrs(Channel),
+    SockAttrs = #{active_n  => ActiveN,
+                  sockstate => SockSt
+                 },
+    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
+    handle_info({register, Attrs, stats(State)}, NState);
+
+handle_info({enter, disconnected}, State = #state{active_n  = ActiveN,
+                                                sockstate = SockSt,
+                                                channel   = Channel}) ->
     ChanAttrs = emqx_channel:attrs(Channel),
     SockAttrs = #{active_n  => ActiveN,
                   sockstate => SockSt

+ 18 - 13
src/emqx_session.erl

@@ -116,7 +116,7 @@
           %% Max Packets Awaiting PUBREL
           max_awaiting_rel :: non_neg_integer(),
           %% Awaiting PUBREL Timeout
-          await_rel_timeout :: timeout(),
+          awaiting_rel_timeout :: timeout(),
           %% Deliver Stats
           deliver_stats :: emqx_types:stats(),
           %% Created at
@@ -135,7 +135,7 @@
                     mqueue_max,
                     retry_interval,
                     awaiting_rel_max,
-                    await_rel_timeout,
+                    awaiting_rel_timeout,
                     created_at
                    ]).
 
@@ -151,7 +151,7 @@
                     next_pkt_id,
                     awaiting_rel,
                     awaiting_rel_max,
-                    await_rel_timeout,
+                    awaiting_rel_timeout,
                     created_at
                    ]).
 
@@ -183,7 +183,7 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
              retry_interval    = get_env(Zone, retry_interval, 0),
              awaiting_rel      = #{},
              max_awaiting_rel  = get_env(Zone, max_awaiting_rel, 100),
-             await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
+             awaiting_rel_timeout = get_env(Zone, awaiting_rel_timeout, 3600*1000),
              created_at        = erlang:system_time(second)
             }.
 
@@ -236,13 +236,17 @@ info(mqueue_dropped, #session{mqueue = MQueue}) ->
 info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
     PacketId;
 info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
-    maps:values(AwaitingRel);
-info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
     AwaitingRel;
+info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
+    maps:size(AwaitingRel);
 info(awaiting_rel_max, #session{max_awaiting_rel = MaxAwaitingRel}) ->
     MaxAwaitingRel;
-info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
+info(awaiting_rel_timeout, #session{awaiting_rel_timeout = Timeout}) ->
     Timeout;
+info(enqueue_cnt, #session{deliver_stats = undefined}) ->
+    0;
+info(enqueue_cnt, #session{deliver_stats = Stats}) ->
+    maps:get(enqueue_cnt, Stats, 0);
 info(deliver_stats, #session{deliver_stats = Stats}) ->
     Stats;
 info(created_at, #session{created_at = CreatedAt}) ->
@@ -338,8 +342,7 @@ unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) -
 %%--------------------------------------------------------------------
 
 -spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
-      -> {ok, emqx_types:publish_result()} |
-         {ok, emqx_types:publish_result(), session()} |
+      -> {ok, emqx_types:publish_result(), session()} |
          {error, emqx_types:reason_code()}).
 publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
     case is_awaiting_full(Session) of
@@ -350,8 +353,8 @@ publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
     end;
 
 %% Publish QoS0/1 directly
-publish(_PacketId, Msg, _Session) ->
-    {ok, emqx_broker:publish(Msg)}.
+publish(_PacketId, Msg, Session) ->
+    {ok, emqx_broker:publish(Msg), Session}.
 
 is_awaiting_full(#session{max_awaiting_rel = 0}) ->
     false;
@@ -621,11 +624,11 @@ expire_awaiting_rel([], _Now, Session) ->
 
 expire_awaiting_rel([{PacketId, Ts} | More], Now,
                     Session = #session{awaiting_rel = AwaitingRel,
-                                       await_rel_timeout = Timeout}) ->
+                                       awaiting_rel_timeout = Timeout}) ->
     case (timer:now_diff(Now, Ts) div 1000) of
         Age when Age >= Timeout ->
             ok = emqx_metrics:inc('messages.qos2.expired'),
-            ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
+            ?LOG(warning, "Dropped qos2 packet ~s for awaiting_rel_timeout", [PacketId]),
             Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)},
             expire_awaiting_rel(More, Now, Session1);
         Age ->
@@ -648,6 +651,8 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
 
 inc_deliver_stats(Key, Session) ->
     inc_deliver_stats(Key, 1, Session).
+inc_deliver_stats(Key, I, Session = #session{deliver_stats = undefined}) ->
+    Session#session{deliver_stats = #{Key => I}};
 inc_deliver_stats(Key, I, Session = #session{deliver_stats = Stats}) ->
     NStats = maps:update_with(Key, fun(V) -> V+I end, I, Stats),
     Session#session{deliver_stats = NStats}.

+ 11 - 2
src/emqx_ws_connection.erl

@@ -93,7 +93,9 @@ info(sockname, #state{sockname = Sockname}) ->
 info(sockstate, #state{sockstate = SockSt}) ->
     SockSt;
 info(channel, #state{channel = Channel}) ->
-    emqx_channel:info(Channel).
+    emqx_channel:info(Channel);
+info(stop_reason, #state{stop_reason = Reason}) ->
+    Reason.
 
 -spec(stats(pid()|state()) -> emqx_types:stats()).
 stats(WsPid) when is_pid(WsPid) ->
@@ -290,7 +292,14 @@ handle_call(From, Req, State = #state{channel = Channel}) ->
 %%--------------------------------------------------------------------
 %% Handle Info
 
-handle_info({enter, _}, State = #state{channel = Channel}) ->
+handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
+    ChanAttrs = emqx_channel:attrs(Channel),
+    SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
+    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
+    ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
+    reply(enqueue(ConnAck, State));
+
+handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
     ChanAttrs = emqx_channel:attrs(Channel),
     SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
     Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),

+ 102 - 59
test/emqx_channel_SUITE.erl

@@ -51,21 +51,34 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(_TestCase, Config) ->
+    %% CM Meck
+    ok = meck:new(emqx_cm, [passthrough, no_history]),
+    %% Access Control Meck
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(emqx_access_control, authenticate,
+                     fun(_) -> {ok, #{auth_result => success}} end),
+    ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
+    %% Broker Meck
     ok = meck:new(emqx_broker, [passthrough, no_history]),
+    %% Hooks Meck
     ok = meck:new(emqx_hooks, [passthrough, no_history]),
     ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
     ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
+    %% Session Meck
     ok = meck:new(emqx_session, [passthrough, no_history]),
+    %% Metrics
     ok = meck:new(emqx_metrics, [passthrough, no_history]),
     ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
     ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
     Config.
 
 end_per_testcase(_TestCase, Config) ->
+    ok = meck:unload(emqx_access_control),
     ok = meck:unload(emqx_metrics),
     ok = meck:unload(emqx_session),
     ok = meck:unload(emqx_broker),
     ok = meck:unload(emqx_hooks),
+    ok = meck:unload(emqx_cm),
     Config.
 
 %%--------------------------------------------------------------------
@@ -106,9 +119,12 @@ t_chan_init(_) ->
 %%--------------------------------------------------------------------
 
 t_handle_in_connect_packet_sucess(_) ->
-    ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}),
-    {ok, {connack, ConnAck}, Channel}
-      = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel()),
+    ok = meck:expect(emqx_cm, open_session,
+                     fun(true, _ClientInfo, _ConnInfo) ->
+                             {ok, #{session => session(), present => false}}
+                     end),
+    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, 0)}], Channel}
+        = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel(#{conn_state => idle})),
     ClientInfo = emqx_channel:info(clientinfo, Channel),
     ?assertMatch(#{clientid := <<"clientid">>,
                    username := <<"username">>
@@ -117,8 +133,8 @@ t_handle_in_connect_packet_sucess(_) ->
 
 t_handle_in_unexpected_connect_packet(_) ->
     Channel = emqx_channel:set_field(conn_state, connected, channel()),
-    Result = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel),
-    ?assertEqual({shutdown, protocol_error, Channel}, Result).
+    {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Channel}
+        = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
 
 t_handle_in_qos0_publish(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
@@ -133,15 +149,16 @@ t_handle_in_qos1_publish(_) ->
     Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
     {ok, ?PUBACK_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
     ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
-    ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
+    ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
 
 t_handle_in_qos2_publish(_) ->
     ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
+    ok = meck:expect(emqx_session, info, fun(awaiting_rel_timeout, _Session) -> 300000 end),
     Channel = channel(#{conn_state => connected}),
     Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
     {ok, ?PUBREC_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
     ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
-    ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
+    ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
 
 t_handle_in_puback_ok(_) ->
     Msg = emqx_message:make(<<"t">>, <<"payload">>),
@@ -156,18 +173,16 @@ t_handle_in_puback_id_in_use(_) ->
                      fun(_, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_IN_USE}
                      end),
-    Channel = channel(#{conn_state => connected}),
-    PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS),
-    {ok, Channel} = emqx_channel:handle_in(PubAck, Channel).
+    {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
 
 t_handle_in_puback_id_not_found(_) ->
     ok = meck:expect(emqx_session, puback,
                      fun(_, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
                      end),
-    Channel = channel(#{conn_state => connected}),
-    PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS),
-    {ok, Channel} = emqx_channel:handle_in(PubAck, Channel).
+    {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
 
 t_handle_in_pubrec_ok(_) ->
     Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
@@ -183,18 +198,20 @@ t_handle_in_pubrec_id_in_use(_) ->
                      fun(_, Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_IN_USE}
                      end),
-    Channel = channel(#{conn_state => connected}),
     {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel}
-        = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
+        = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
+                 emqx_channel:info(pub_stats, Channel)).
 
 t_handle_in_pubrec_id_not_found(_) ->
     ok = meck:expect(emqx_session, pubrec,
                      fun(_, Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
                      end),
-    Channel = channel(#{conn_state => connected}),
     {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel}
-        = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
+        = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
+                 emqx_channel:info(pub_stats, Channel)).
 
 t_handle_in_pubrel_ok(_) ->
     ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
@@ -209,15 +226,13 @@ t_handle_in_pubrel_not_found_error(_) ->
                      fun(_PacketId, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
                      end),
-    Channel = channel(#{conn_state => connected}),
-    {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel}
-        = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
+    {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
+        = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
 
 t_handle_in_pubcomp_ok(_) ->
     ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
-    Channel = channel(#{conn_state => connected}),
-    {ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
-    ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
+    {ok, Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
 
 t_handle_in_pubcomp_not_found_error(_) ->
     ok = meck:expect(emqx_session, pubcomp,
@@ -270,35 +285,38 @@ t_handle_in_frame_error(_) ->
     {shutdown, frame_too_large, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), _}
         = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
     ConnectedChan = channel(#{conn_state => connected}),
-    {shutdown, frame_too_large, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _}
+    {shutdown, malformed_Packet, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _}
         = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
     DisconnectedChan = channel(#{conn_state => disconnected}),
     {ok, DisconnectedChan}
         = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
 
+%% TODO:
 t_handle_in_expected_packet(_) ->
-    {ok, _Chan} = emqx_channel:handle_in(packet, channel()).
+    {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _Chan}
+        = emqx_channel:handle_in(packet, channel()).
 
 t_process_connect(_) ->
     ok = meck:expect(emqx_cm, open_session,
                      fun(true, _ClientInfo, _ConnInfo) ->
                              {ok, #{session => session(), present => false}}
                      end),
-    ConnPkt = connpkt(),
-    {ok, ?CONNACK_PACKET(?RC_SUCCESS), ConnPkt, _}
-        = emqx_channel:process_connect(ConnPkt, channel()).
+    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Channel}
+        = emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
 
-t_handle_publish(_) ->
-    Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
-    {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel}
-        = emqx_channel:handle_publish(Publish, channel()).
+t_handle_publish_qos0(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
+    {ok, _Channel} = emqx_channel:handle_publish(Publish, channel()).
 
 t_process_publish_qos1(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
-    {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel}
+    {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel}
         = emqx_channel:process_publish(1, Msg, channel()).
 
 t_process_subscribe(_) ->
+    ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
     TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
     {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, channel()).
 
@@ -312,11 +330,17 @@ t_process_unsubscribe(_) ->
 %%--------------------------------------------------------------------
 
 t_handle_out_delivers(_) ->
-    ok = emqx_meck:expect(emqx_session, deliver,
-                          fun(Delivers, Session) ->
-                                  Msgs = [Msg || {deliver, _, Msg} <- Delivers],
-                                  [{publish, PacketId, Msg} || {PacketId, Msg} <- lists:zip(lists:seq(1, length(Msgs)), Msgs)]
-                          end),
+    WithPacketId = fun(Msgs) ->
+                           lists:zip(lists:seq(1, length(Msgs)), Msgs)
+                   end,
+    ok = meck:expect(emqx_session, deliver,
+                     fun(Delivers, Session) ->
+                             Msgs = [Msg || {deliver, _, Msg} <- Delivers],
+                             Publishes = [{publish, PacketId, Msg}
+                                          || {PacketId, Msg} <- WithPacketId(Msgs)],
+                             {ok, Publishes, Session}
+                     end),
+    ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20000 end),
     Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>),
     Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>),
     Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
@@ -334,7 +358,7 @@ t_handle_out_publishes(_) ->
 
 t_handle_out_publish(_) ->
     Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
-    {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, <<"payload">>), _Channel}
+    {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan}
         = emqx_channel:handle_out({publish, 1, Msg}, channel()).
 
 t_handle_out_publish_nl(_) ->
@@ -345,13 +369,12 @@ t_handle_out_publish_nl(_) ->
     {ok, Channel} = emqx_channel:handle_out(Publish, Channel).
 
 t_handle_out_connack_sucess(_) ->
-    Channel = channel(#{conn_state => connected}),
-    {ok, {connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}, _Chan}
-        = emqx_channel:handle_out({connack, ?RC_SUCCESS, 0, connpkt()}, Channel).
+    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan}
+        = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()).
 
 t_handle_out_connack_failure(_) ->
     {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan}
-        = emqx_channel:handle_out({connack, ?RC_NOT_AUTHORIZED, connpkt()}, channel()).
+        = emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
 
 t_handle_out_puback(_) ->
     Channel = channel(#{conn_state => connected}),
@@ -362,33 +385,33 @@ t_handle_out_puback(_) ->
 t_handle_out_pubrec(_) ->
     Channel = channel(#{conn_state => connected}),
     {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), NChannel}
-        = emqx_channel:handle_out({pubrec, 1, ?RC_SUCCESS}, Channel),
+        = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel),
     ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
 
 t_handle_out_pubrel(_) ->
     Channel = channel(#{conn_state => connected}),
     {ok, ?PUBREL_PACKET(1), Channel1}
-        = emqx_channel:handle_out({pubrel, 1, ?RC_SUCCESS}, Channel),
+        = emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
     {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), Channel2}
-        = emqx_channel:handle_out({pubrel, 2, ?RC_SUCCESS}, Channel1),
+        = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1),
     ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)).
 
 t_handle_out_pubcomp(_) ->
     {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel}
-        = emqx_channel:handle_out({pubcomp, 2, ?RC_SUCCESS}, channel()),
+        = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()),
     ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)).
 
 t_handle_out_suback(_) ->
     {ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel}
-        = emqx_channel:handle_out({suback, 1, [?QOS_2]}, channel()).
+        = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
 
 t_handle_out_unsuback(_) ->
     {ok, ?UNSUBACK_PACKET(1, [?RC_SUCCESS]), _Channel}
-        = emqx_channel:handle_out({unsuback, 1, [?RC_SUCCESS]}, channel()).
+        = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
 
 t_handle_out_disconnect(_) ->
     {shutdown, normal, ?DISCONNECT_PACKET(?RC_SUCCESS), _Chan}
-        = emqx_channel:handle_out({disconnect, ?RC_SUCCESS}, channel()).
+        = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
 
 t_handle_out_unexpected(_) ->
     {ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()).
@@ -430,26 +453,40 @@ t_handle_info_unsubscribe(_) ->
     {ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()).
 
 t_handle_info_sock_closed(_) ->
-    {ok, _Chan} = emqx_channel:handle_out({sock_closed, reason}, channel(#{conn_state => disconnected})).
+    {ok, _Chan} = emqx_channel:handle_out({sock_closed, reason},
+                                          channel(#{conn_state => disconnected})).
 
 %%--------------------------------------------------------------------
 %% Test cases for handle_timeout
 %%--------------------------------------------------------------------
 
 t_handle_timeout_emit_stats(_) ->
-    {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {emit_stats, []}, channel()).
+    ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{stats_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(TRef, {emit_stats, []}, Channel).
 
 t_handle_timeout_keepalive(_) ->
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()),
     {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, channel()).
 
 t_handle_timeout_retry_delivery(_) ->
-    {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), retry_delivery, channel()).
+    ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, channel()).
 
 t_handle_timeout_expire_awaiting_rel(_) ->
-    {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()).
+    ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel).
 
 t_handle_timeout_expire_session(_) ->
-    {shutdown, expired, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()).
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{expire_timer => TRef}, channel()),
+    {shutdown, expired, _Chan} = emqx_channel:handle_timeout(TRef, expire_session, Channel).
 
 t_handle_timeout_will_message(_) ->
     {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), will_message, channel()).
@@ -471,7 +508,6 @@ t_check_flapping(_) ->
     ok = emqx_channel:check_flapping(connpkt(), channel()).
 
 t_auth_connect(_) ->
-    ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {ok, #{}} end),
     {ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()).
 
 t_process_alias(_) ->
@@ -481,15 +517,22 @@ t_process_alias(_) ->
         = emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
 
 t_check_pub_acl(_) ->
+    ok = meck:new(emqx_zone, [passthrough, no_history]),
+    ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end),
     Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
-    ok = emqx_channel:check_pub_acl(Publish, channel()).
+    ok = emqx_channel:check_pub_acl(Publish, channel()),
+    ok = meck:unload(emqx_zone).
 
 t_check_pub_alias(_) ->
     Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
-    ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, channel()).
+    Channel = emqx_channel:set_field(alias_maximum, #{inbound => 10}, channel()),
+    ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel).
 
 t_check_subscribe(_) ->
-    ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()).
+    ok = meck:new(emqx_zone, [passthrough, no_history]),
+    ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end),
+    ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()),
+    ok = meck:unload(emqx_zone).
 
 t_enrich_caps(_) ->
     ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),

+ 35 - 31
test/emqx_session_SUITE.erl

@@ -48,7 +48,7 @@ end_per_testcase(_TestCase, Config) ->
 %%--------------------------------------------------------------------
 
 t_session_init(_) ->
-    Session = emqx_session:init(#{zone => external}, #{receive_maximum => 64}),
+    Session = emqx_session:init(#{zone => zone}, #{receive_maximum => 64}),
     ?assertEqual(#{}, emqx_session:info(subscriptions, Session)),
     ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)),
     ?assertEqual(0, emqx_session:info(subscriptions_max, Session)),
@@ -67,30 +67,28 @@ t_session_init(_) ->
 %%--------------------------------------------------------------------
 
 t_session_info(_) ->
+    Info = emqx_session:info(session()),
     ?assertMatch(#{subscriptions := #{},
                    subscriptions_max := 0,
                    upgrade_qos := false,
-                   inflight := 0,
-                   inflight_max := 64,
+                   inflight_max := 0,
                    retry_interval := 0,
                    mqueue_len := 0,
                    mqueue_max := 1000,
                    mqueue_dropped := 0,
                    next_pkt_id := 1,
-                   awaiting_rel := 0,
-                   awaiting_rel_max := 0,
-                   await_rel_timeout := 3600000
-                  }, emqx_session:info(session())).
+                   awaiting_rel := #{},
+                   awaiting_rel_max := 100,
+                   awaiting_rel_timeout := 3600000
+                  }, Info).
 
 t_session_attrs(_) ->
     Attrs = emqx_session:attrs(session()),
-    io:format("~p~n", [Attrs]),
-    error('TODO').
+    io:format("~p~n", [Attrs]).
 
 t_session_stats(_) ->
     Stats = emqx_session:stats(session()),
-    io:format("~p~n", [Stats]),
-    error('TODO').
+    io:format("~p~n", [Stats]).
 
 %%--------------------------------------------------------------------
 %% Test cases for pub/sub
@@ -125,7 +123,7 @@ t_publish_qos2(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>),
     {ok, [], Session} = emqx_session:publish(1, Msg, session()),
-    ?assertEqual(awaiting_rel_cnt, emqx_session:info(awaiting_rel_cnt, Session)).
+    ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)).
 
 t_publish_qos1(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
@@ -151,7 +149,7 @@ t_puback(_) ->
     Inflight = emqx_inflight:insert(1, {Msg, os:timestamp()}, emqx_inflight:new()),
     Session = set_field(inflight, Inflight, session()),
     {ok, Msg, NSession} = emqx_session:puback(1, Session),
-    ?assertEqual([], emqx_session:info(inflight, NSession)).
+    ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)).
 
 t_puback_error_packet_id_in_use(_) ->
     Inflight = emqx_inflight:insert(1, {pubrel, os:timestamp()}, emqx_inflight:new()),
@@ -166,14 +164,14 @@ t_pubrec(_) ->
     Inflight = emqx_inflight:insert(2, {Msg, os:timestamp()}, emqx_inflight:new()),
     Session = set_field(inflight, Inflight, session()),
     {ok, Msg, NSession} = emqx_session:pubrec(2, Session),
-    ?assertMatch([{pubrel, _}], emqx_session:info(inflight, NSession)).
+    ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, NSession))).
 
-t_pubrec_error_packet_id_in_use(_) ->
+t_pubrec_packet_id_in_use_error(_) ->
     Inflight = emqx_inflight:insert(1, {pubrel, ts()}, emqx_inflight:new()),
     Session = set_field(inflight, Inflight, session()),
-    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, session()).
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session).
 
-t_pubrec_error_packet_id_not_found(_) ->
+t_pubrec_packet_id_not_found_error(_) ->
     {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()).
 
 t_pubrel(_) ->
@@ -188,7 +186,7 @@ t_pubcomp(_) ->
     Inflight = emqx_inflight:insert(2, {pubrel, os:timestamp()}, emqx_inflight:new()),
     Session = emqx_session:set_field(inflight, Inflight, session()),
     {ok, NSession} = emqx_session:pubcomp(2, Session),
-    ?assertEqual([], emqx_session:info(inflight, NSession)).
+    ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)).
 
 t_pubcomp_id_not_found(_) ->
     {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(2, session()).
@@ -200,36 +198,39 @@ t_pubcomp_id_not_found(_) ->
 t_dequeue(_) ->
     {ok, Session} = emqx_session:dequeue(session()).
 
-t_bach_n(_) ->
-    error('TODO').
-
-t_dequeue_with_msgs(_) ->
-    error('TODO').
-
 t_deliver(_) ->
-    error('TODO').
+    Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
+    {ok, Publishes, _Session} = emqx_session:deliver(Delivers, session()),
+    ?assertEqual(2, length(Publishes)).
 
 t_enqueue(_) ->
-    error('TODO').
+    Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
+    Session = emqx_session:enqueue(Delivers, session()),
+    ?assertEqual(2, emqx_session:info(mqueue_len, Session)).
 
 t_retry(_) ->
-    error('TODO').
+    {ok, _Session} = emqx_session:retry(session()).
 
 %%--------------------------------------------------------------------
 %% Test cases for takeover/resume
 %%--------------------------------------------------------------------
 
 t_takeover(_) ->
-    error('TODO').
+    ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
+    Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
+    ok = emqx_session:takeover(Session).
 
 t_resume(_) ->
-    error('TODO').
+    ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
+    Subs = #{<<"t">> => ?DEFAULT_SUBOPTS},
+    Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
+    ok = emqx_session:resume(<<"clientid">>, Session).
 
 t_redeliver(_) ->
-    error('TODO').
+    {ok, [], _Session} = emqx_session:redeliver(session()).
 
 t_expire(_) ->
-    error('TODO').
+    {ok, _Session} = emqx_session:expire(awaiting_rel, session()).
 
 %%--------------------------------------------------------------------
 %% Helper functions
@@ -254,5 +255,8 @@ subopts() -> subopts(#{}).
 subopts(Init) ->
     maps:merge(?DEFAULT_SUBOPTS, Init).
 
+delivery(QoS, Topic) ->
+    {deliver, Topic, emqx_message:make(test, QoS, Topic, <<"payload">>)}.
+
 ts() -> erlang:system_time(second).
 

+ 29 - 14
test/emqx_ws_connection_SUITE.erl

@@ -120,8 +120,8 @@ t_websocket_handle_ping_pong(_) ->
 
 t_websocket_handle_bad_frame(_) ->
     with_ws_conn(fun(WsConn) ->
-                         {stop, {shutdown, unexpected_ws_frame}, WsConn}
-                            = websocket_handle({badframe, <<>>}, WsConn)
+                         {stop, WsConn1} = websocket_handle({badframe, <<>>}, WsConn),
+                         ?assertEqual({shutdown, unexpected_ws_frame}, stop_reason(WsConn1))
                  end).
 
 t_websocket_info_call(_) ->
@@ -132,11 +132,11 @@ t_websocket_info_call(_) ->
                  end).
 
 t_websocket_info_cast(_) ->
-    with_ws_conn(fun(WsConn) ->
-                         websocket_info({cast, msg}, WsConn)
-                 end).
+    ok = meck:expect(emqx_channel, handle_info, fun(_Msg, Channel) -> {ok, Channel} end),
+    with_ws_conn(fun(WsConn) -> websocket_info({cast, msg}, WsConn) end).
 
 t_websocket_info_incoming(_) ->
+    ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
     with_ws_conn(fun(WsConn) ->
                          Connect = ?CONNECT_PACKET(
                                       #mqtt_packet_connect{proto_ver   = ?MQTT_PROTO_V5,
@@ -146,14 +146,18 @@ t_websocket_info_incoming(_) ->
                                                            keepalive   = 60}),
                          {ok, WsConn1} = websocket_info({incoming, Connect}, WsConn),
                          Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
-                         {ok, WsConn2} = websocket_info({incoming, Publish}, WsConn1)
+                         {ok, _WsConn2} = websocket_info({incoming, Publish}, WsConn1)
                  end).
 
 t_websocket_info_deliver(_) ->
     with_ws_conn(fun(WsConn) ->
-                         Msg = emqx_message:make(<<"topic">>, <<"payload">>),
-                         Deliver = {deliver, <<"#">>, Msg},
-                         {ok, WsConn1} = websocket_info(Deliver, WsConn)
+                         ok = meck:expect(emqx_channel, handle_out,
+                                          fun(Delivers, Channel) ->
+                                                  Packets = [emqx_message:to_packet(1, Msg) || {deliver, _, Msg} <- Delivers],
+                                                  {ok, {outgoing, Packets}, Channel}
+                                          end),
+                         Deliver = {deliver, <<"#">>, emqx_message:make(<<"topic">>, <<"payload">>)},
+                         {reply, {binary, _Data}, _WsConn1} = websocket_info(Deliver, WsConn)
                  end).
 
 t_websocket_info_timeout(_) ->
@@ -165,23 +169,31 @@ t_websocket_info_timeout(_) ->
 
 t_websocket_info_close(_) ->
     with_ws_conn(fun(WsConn) ->
-                         {stop, {shutdown, sock_error}, WsConn} = websocket_info({close, sock_error}, WsConn)
+                         {stop, WsConn1} = websocket_info({close, sock_error}, WsConn),
+                         ?assertEqual({shutdown, sock_error}, stop_reason(WsConn1))
                  end).
 
 t_websocket_info_shutdown(_) ->
     with_ws_conn(fun(WsConn) ->
-                         {stop, {shutdown, reason}, WsConn} = websocket_info({shutdown, reason}, WsConn)
+                         {stop, WsConn1} = websocket_info({shutdown, reason}, WsConn),
+                         ?assertEqual({shutdown, reason}, stop_reason(WsConn1))
                  end).
 
+
 t_websocket_info_stop(_) ->
     with_ws_conn(fun(WsConn) ->
-                         {stop, normal, WsConn} = websocket_info({stop, normal}, WsConn)
+                         {stop, WsConn1} = websocket_info({stop, normal}, WsConn),
+                         ?assertEqual(normal, stop_reason(WsConn1))
                  end).
 
 t_websocket_close(_) ->
+    ok = meck:expect(emqx_channel, handle_info,
+                     fun({sock_closed, badframe}, Channel) ->
+                             {shutdown, sock_closed, Channel}
+                     end),
     with_ws_conn(fun(WsConn) ->
-                         {stop, sock_closed, WsConn}
-                            = emqx_ws_connection:websocket_close(badframe, WsConn)
+                         {stop, WsConn1} = emqx_ws_connection:websocket_close(badframe, WsConn),
+                         ?assertEqual(sock_closed, stop_reason(WsConn1))
                  end).
 
 t_handle_call(_) ->
@@ -217,3 +229,6 @@ with_ws_conn(TestFun, Opts) ->
                      [req, emqx_misc:merge_opts([{zone, external}], Opts)]),
     TestFun(WsConn).
 
+stop_reason(WsConn) ->
+    emqx_ws_connection:info(stop_reason, WsConn).
+