Преглед изворни кода

fix(test): update test cases for emqx_connection_SUITE

Shawn пре 4 година
родитељ
комит
042ff2e0d7

+ 1 - 1
apps/emqx/src/emqx_misc.erl

@@ -198,7 +198,7 @@ check_oom(Policy) ->
 
 -spec(check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}).
 check_oom(_Pid, #{enable := false}) -> ok;
-check_oom(Pid, #{message_queue_len := MaxQLen,
+check_oom(Pid, #{max_message_queue_len := MaxQLen,
                  max_heap_size := MaxHeapSize}) ->
     case process_info(Pid, [message_queue_len, total_heap_size]) of
         undefined -> ok;

+ 18 - 17
apps/emqx/src/emqx_session.erl

@@ -92,8 +92,6 @@
 
 -export_type([session/0]).
 
--import(emqx_zone, [get_env/3]).
-
 -record(session, {
           %% Client’s Subscriptions.
           subscriptions :: map(),
@@ -159,27 +157,28 @@
 %%--------------------------------------------------------------------
 
 -spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()).
-init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
-    #session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
+init(#{zone := Zone, listener := Listener}, #{receive_maximum := MaxInflight}) ->
+    #session{max_subscriptions = get_conf(Zone, Listener, max_subscriptions),
              subscriptions     = #{},
-             upgrade_qos       = get_env(Zone, upgrade_qos, false),
+             upgrade_qos       = get_conf(Zone, Listener, upgrade_qos),
              inflight          = emqx_inflight:new(MaxInflight),
-             mqueue            = init_mqueue(Zone),
+             mqueue            = init_mqueue(Zone, Listener),
              next_pkt_id       = 1,
-             retry_interval    = timer:seconds(get_env(Zone, retry_interval, 0)),
+             retry_interval    = timer:seconds(get_conf(Zone, Listener, retry_interval)),
              awaiting_rel      = #{},
-             max_awaiting_rel  = get_env(Zone, max_awaiting_rel, 100),
-             await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)),
+             max_awaiting_rel  = get_conf(Zone, Listener, max_awaiting_rel),
+             await_rel_timeout = timer:seconds(get_conf(Zone, Listener, await_rel_timeout)),
              created_at        = erlang:system_time(millisecond)
             }.
 
 %% @private init mq
-init_mqueue(Zone) ->
-    emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000),
-                       store_qos0 => get_env(Zone, mqueue_store_qos0, true),
-                       priorities => get_env(Zone, mqueue_priorities, none),
-                       default_priority => get_env(Zone, mqueue_default_priority, lowest)
-                      }).
+init_mqueue(Zone, Listener) ->
+    emqx_mqueue:init(#{
+        max_len => get_conf(Zone, Listener, max_mqueue_len),
+        store_qos0 => get_conf(Zone, Listener, mqueue_store_qos0),
+        priorities => get_conf(Zone, Listener, mqueue_priorities),
+        default_priority => get_conf(Zone, Listener, mqueue_default_priority)
+    }).
 
 %%--------------------------------------------------------------------
 %% Info, Stats
@@ -253,7 +252,7 @@ subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
     end.
 
 -compile({inline, [is_subscriptions_full/1]}).
-is_subscriptions_full(#session{max_subscriptions = 0}) ->
+is_subscriptions_full(#session{max_subscriptions = infinity}) ->
     false;
 is_subscriptions_full(#session{subscriptions = Subs,
                                max_subscriptions = MaxLimit}) ->
@@ -302,7 +301,7 @@ publish(_PacketId, Msg, Session) ->
     {ok, emqx_broker:publish(Msg), Session}.
 
 -compile({inline, [is_awaiting_full/1]}).
-is_awaiting_full(#session{max_awaiting_rel = 0}) ->
+is_awaiting_full(#session{max_awaiting_rel = infinity}) ->
     false;
 is_awaiting_full(#session{awaiting_rel = AwaitingRel,
                           max_awaiting_rel = MaxLimit}) ->
@@ -697,3 +696,5 @@ set_field(Name, Value, Session) ->
     Pos = emqx_misc:index_of(Name, record_info(fields, session)),
     setelement(Pos+1, Session, Value).
 
+get_conf(Zone, Listener, Key) ->
+    emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]).

+ 5 - 2
apps/emqx/test/emqx_channel_SUITE.erl

@@ -151,7 +151,7 @@ default_zone_conf() ->
                 conn_congestion =>
                     #{enable_alarm => true, min_alarm_sustain_duration => 60000},
                 flapping_detect =>
-                    #{ban_time => 300000,enable => true,
+                    #{ban_time => 300000,enable => false,
                     max_count => 15,window_time => 60000},
                 force_gc =>
                     #{bytes => 16777216,count => 16000,
@@ -168,6 +168,9 @@ default_zone_conf() ->
         }
     }.
 
+set_default_zone_conf() ->
+    emqx_config:put(default_zone_conf()).
+
 %%--------------------------------------------------------------------
 %% CT Callbacks
 %%--------------------------------------------------------------------
@@ -207,7 +210,7 @@ end_per_suite(_Config) ->
                 ]).
 
 init_per_testcase(_TestCase, Config) ->
-    emqx_config:put(default_zone_conf()),
+    set_default_zone_conf(),
     Config.
 
 end_per_testcase(_TestCase, Config) ->

+ 21 - 24
apps/emqx/test/emqx_connection_SUITE.erl

@@ -57,6 +57,7 @@ init_per_suite(Config) ->
     ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
     ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end),
 
+    emqx_channel_SUITE:set_default_zone_conf(),
     Config.
 
 end_per_suite(_Config) ->
@@ -120,14 +121,13 @@ t_info(_) ->
                     end
                 end),
     #{sockinfo := SockInfo} = emqx_connection:info(CPid),
-    ?assertMatch(#{active_n := 100,
-                    peername := {{127,0,0,1},3456},
+    ?assertMatch(#{ peername := {{127,0,0,1},3456},
                     sockname := {{127,0,0,1},1883},
                     sockstate := idle,
                     socktype := tcp}, SockInfo).
 
 t_info_limiter(_) ->
-    St = st(#{limiter => emqx_limiter:init(external, [])}),
+    St = st(#{limiter => emqx_limiter:init(default, [])}),
     ?assertEqual(undefined, emqx_connection:info(limiter, St)).
 
 t_stats(_) ->
@@ -219,8 +219,10 @@ t_handle_msg_deliver(_) ->
 
 t_handle_msg_inet_reply(_) ->
     ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
-    ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))),
-    ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))),
+    emqx_config:put_listener_conf(default, mqtt_tcp, [tcp, active_n], 0),
+    ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())),
+    emqx_config:put_listener_conf(default, mqtt_tcp, [tcp, active_n], 100),
+    ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())),
     ?assertMatch({stop, {shutdown, for_testing}, _St},
                  handle_msg({inet_reply, for_testing, {error, for_testing}}, st())).
 
@@ -331,12 +333,12 @@ t_ensure_rate_limit(_) ->
     ?assertEqual(undefined, emqx_connection:info(limiter, State)),
 
     ok = meck:expect(emqx_limiter, check,
-                     fun(_, _) -> {ok, emqx_limiter:init(external, [])} end),
+                     fun(_, _) -> {ok, emqx_limiter:init(default, [])} end),
     State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
     ?assertEqual(undefined, emqx_connection:info(limiter, State1)),
 
     ok = meck:expect(emqx_limiter, check,
-                     fun(_, _) -> {pause, 3000, emqx_limiter:init(external, [])} end),
+                     fun(_, _) -> {pause, 3000, emqx_limiter:init(default, [])} end),
     State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
     ?assertEqual(undefined, emqx_connection:info(limiter, State2)),
     ?assertEqual(blocked, emqx_connection:info(sockstate, State2)).
@@ -386,8 +388,7 @@ t_start_link_exit_on_activate(_) ->
 t_get_conn_info(_) ->
     with_conn(fun(CPid) ->
                       #{sockinfo := SockInfo} = emqx_connection:info(CPid),
-                      ?assertEqual(#{active_n => 100,
-                                     peername => {{127,0,0,1},3456},
+                      ?assertEqual(#{peername => {{127,0,0,1},3456},
                                      sockname => {{127,0,0,1},1883},
                                      sockstate => running,
                                      socktype => tcp
@@ -397,16 +398,12 @@ t_get_conn_info(_) ->
 t_oom_shutdown(init, Config) ->
     ok = snabbkaffe:start_trace(),
     ok = meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]),
-    ok = meck:new(emqx_zone, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_zone, oom_policy,
-                fun(_Zone) -> #{message_queue_len => 10, max_heap_size => 8000000} end),
     meck:expect(emqx_misc, check_oom,
                 fun(_) -> {shutdown, "fake_oom"} end),
     Config;
 t_oom_shutdown('end', _Config) ->
     snabbkaffe:stop(),
     meck:unload(emqx_misc),
-    meck:unload(emqx_zone),
     ok.
 
 t_oom_shutdown(_) ->
@@ -455,13 +452,11 @@ exit_on_activate_error(SockErr, Reason) ->
 with_conn(TestFun) ->
     with_conn(TestFun, #{trap_exit => false}).
 
-with_conn(TestFun, Options) when is_map(Options) ->
-    with_conn(TestFun, maps:to_list(Options));
-
-with_conn(TestFun, Options) ->
-    TrapExit = proplists:get_value(trap_exit, Options, false),
+with_conn(TestFun, Opts) when is_map(Opts) ->
+    TrapExit = maps:get(trap_exit, Opts, false),
     process_flag(trap_exit, TrapExit),
-    {ok, CPid} = emqx_connection:start_link(emqx_transport, sock, Options),
+    {ok, CPid} = emqx_connection:start_link(emqx_transport, sock,
+        maps:merge(Opts, #{zone => default, listener => mqtt_tcp})),
     TestFun(CPid),
     TrapExit orelse emqx_connection:stop(CPid),
     ok.
@@ -483,7 +478,8 @@ st() -> st(#{}, #{}).
 st(InitFields) when is_map(InitFields) ->
     st(InitFields, #{}).
 st(InitFields, ChannelFields) when is_map(InitFields) ->
-    St = emqx_connection:init_state(emqx_transport, sock, [#{zone => external}]),
+    St = emqx_connection:init_state(emqx_transport, sock, #{zone => default,
+        listener => mqtt_tcp}),
     maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end,
               emqx_connection:set_field(channel, channel(ChannelFields), St),
               InitFields
@@ -503,7 +499,8 @@ channel(InitFields) ->
                  receive_maximum => 100,
                  expiry_interval => 0
                 },
-    ClientInfo = #{zone       => zone,
+    ClientInfo = #{zone       => default,
+                   listener   => mqtt_tcp,
                    protocol   => mqtt,
                    peerhost   => {127,0,0,1},
                    clientid   => <<"clientid">>,
@@ -512,13 +509,13 @@ channel(InitFields) ->
                    peercert   => undefined,
                    mountpoint => undefined
                   },
-    Session = emqx_session:init(#{zone => external},
+    Session = emqx_session:init(#{zone => default, listener => mqtt_tcp},
                                 #{receive_maximum => 0}
                                ),
     maps:fold(fun(Field, Value, Channel) ->
-                      emqx_channel:set_field(Field, Value, Channel)
+                emqx_channel:set_field(Field, Value, Channel)
               end,
-              emqx_channel:init(ConnInfo, [{zone, zone}]),
+              emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}),
               maps:merge(#{clientinfo => ClientInfo,
                            session    => Session,
                            conn_state => connected