Просмотр исходного кода

Merge branch 'develop' into bump-version

tigercl 6 лет назад
Родитель
Сommit
6288cd1345

+ 5 - 7
include/emqx.hrl

@@ -154,15 +154,13 @@
 %% Banned
 %%--------------------------------------------------------------------
 
--type(banned_who() ::  {clientid,  binary()}
-                     | {username,   binary()}
-                     | {ip_address, inet:ip_address()}).
-
 -record(banned, {
-          who    :: banned_who(),
-          reason :: binary(),
+          who    :: {clientid,  binary()}
+                  | {username,   binary()}
+                  | {ip_address, inet:ip_address()},
           by     :: binary(),
-          desc   :: binary(),
+          reason :: binary(),
+          at     :: integer(),
           until  :: integer()
         }).
 

+ 29 - 11
src/emqx_banned.erl

@@ -33,7 +33,7 @@
 -export([start_link/0, stop/0]).
 
 -export([ check/1
-        , add/1
+        , create/1
         , delete/1
         , info/1
         ]).
@@ -74,21 +74,39 @@ start_link() ->
 stop() -> gen_server:stop(?MODULE).
 
 -spec(check(emqx_types:clientinfo()) -> boolean()).
-check(#{clientid := ClientId,
-        username := Username,
-        peerhost := IPAddr}) ->
-    ets:member(?BANNED_TAB, {clientid, ClientId})
-        orelse ets:member(?BANNED_TAB, {username, Username})
-            orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).
-
--spec(add(emqx_types:banned()) -> ok).
-add(Banned) when is_record(Banned, banned) ->
+check(ClientInfo) ->
+    do_check({clientid, maps:get(clientid, ClientInfo, undefined)})
+        orelse do_check({username, maps:get(username, ClientInfo, undefined)})
+            orelse do_check({peerhost, maps:get(peerhost, ClientInfo, undefined)}).
+
+do_check({_, undefined}) ->
+    false;
+do_check(Who) when is_tuple(Who) ->
+    case mnesia:dirty_read(?BANNED_TAB, Who) of
+        [] -> false;
+        [#banned{until = Until}] ->
+            Until > erlang:system_time(millisecond)
+    end.
+
+-spec(create(emqx_types:banned()) -> ok).
+create(#{who    := Who,
+         by     := By,
+         reason := Reason,
+         at     := At,
+         until  := Until}) ->
+    mnesia:dirty_write(?BANNED_TAB, #banned{who = Who,
+                                            by = By,
+                                            reason = Reason,
+                                            at = At,
+                                            until = Until});
+create(Banned) when is_record(Banned, banned) ->
     mnesia:dirty_write(?BANNED_TAB, Banned).
 
 -spec(delete({clientid, emqx_types:clientid()}
            | {username, emqx_types:username()}
            | {peerhost, emqx_types:peerhost()}) -> ok).
-delete(Key) -> mnesia:dirty_delete(?BANNED_TAB, Key).
+delete(Who) ->
+    mnesia:dirty_delete(?BANNED_TAB, Who).
 
 info(InfoKey) ->
     mnesia:table_info(?BANNED_TAB, InfoKey).

+ 1 - 9
src/emqx_channel.erl

@@ -219,7 +219,6 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
                    fun enrich_client/2,
                    fun set_logger_meta/2,
                    fun check_banned/2,
-                   fun check_flapping/2,
                    fun auth_connect/2], ConnPkt, Channel) of
         {ok, NConnPkt, NChannel} ->
             process_connect(NConnPkt, NChannel);
@@ -942,7 +941,7 @@ set_logger_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
     emqx_logger:set_metadata_clientid(ClientId).
 
 %%--------------------------------------------------------------------
-%% Check banned/flapping
+%% Check banned
 %%--------------------------------------------------------------------
 
 check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
@@ -951,13 +950,6 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
         false -> ok
     end.
 
-check_flapping(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
-    case emqx_zone:enable_flapping_detect(Zone)
-         andalso emqx_flapping:check(ClientInfo) of
-        true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
-        false -> ok
-    end.
-
 %%--------------------------------------------------------------------
 %% Auth Connect
 %%--------------------------------------------------------------------

+ 20 - 73
src/emqx_flapping.erl

@@ -27,7 +27,7 @@
 -export([start_link/0, stop/0]).
 
 %% API
--export([check/1, detect/1]).
+-export([detect/1]).
 
 %% gen_server callbacks
 -export([ init/1
@@ -54,8 +54,7 @@
           clientid   :: emqx_types:clientid(),
           peerhost   :: emqx_types:peerhost(),
           started_at :: pos_integer(),
-          detect_cnt :: pos_integer(),
-          banned_at  :: pos_integer()
+          detect_cnt :: pos_integer()
          }).
 
 -opaque(flapping() :: #flapping{}).
@@ -68,27 +67,14 @@ start_link() ->
 
 stop() -> gen_server:stop(?MODULE).
 
-%% @doc Check flapping when a MQTT client connected.
--spec(check(emqx_types:clientinfo()) -> boolean()).
-check(#{clientid := ClientId}) ->
-    check(ClientId, get_policy()).
-
-check(ClientId, #{banned_interval := Interval}) ->
-    case ets:lookup(?FLAPPING_TAB, {banned, ClientId}) of
-        [] -> false;
-        [#flapping{banned_at = BannedAt}] ->
-            now_diff(BannedAt) < Interval
-    end.
-
 %% @doc Detect flapping when a MQTT client disconnected.
 -spec(detect(emqx_types:clientinfo()) -> boolean()).
 detect(Client) -> detect(Client, get_policy()).
 
-detect(#{clientid := ClientId, peerhost := PeerHost},
-       Policy = #{threshold := Threshold}) ->
+detect(#{clientid := ClientId, peerhost := PeerHost}, Policy = #{threshold := Threshold}) ->
     try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of
         Cnt when Cnt < Threshold -> false;
-        _Cnt -> case ets:lookup(?FLAPPING_TAB, ClientId) of
+        _Cnt -> case ets:take(?FLAPPING_TAB, ClientId) of
                     [Flapping] ->
                         ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}),
                         true;
@@ -118,52 +104,44 @@ now_diff(TS) -> erlang:system_time(millisecond) - TS.
 %%--------------------------------------------------------------------
 
 init([]) ->
-    #{duration := Duration, banned_interval := Interval} = get_policy(),
     ok = emqx_tables:new(?FLAPPING_TAB, [public, set,
                                          {keypos, 2},
                                          {read_concurrency, true},
                                          {write_concurrency, true}
                                         ]),
-    State = #{time => max(Duration, Interval) + 1, tref => undefined},
-    {ok, ensure_timer(State), hibernate}.
+    {ok, #{}, hibernate}.
 
 handle_call(Req, _From, State) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
-handle_cast({detected, Flapping = #flapping{clientid   = ClientId,
-                                            peerhost   = PeerHost,
-                                            started_at = StartedAt,
-                                            detect_cnt = DetectCnt},
-             #{duration := Duration}}, State) ->
-    case (Interval = now_diff(StartedAt)) < Duration of
+handle_cast({detected, #flapping{clientid   = ClientId,
+                                 peerhost   = PeerHost,
+                                 started_at = StartedAt,
+                                 detect_cnt = DetectCnt},
+             #{duration := Duration, banned_interval := Interval}}, State) ->
+    case now_diff(StartedAt) < Duration of
         true -> %% Flapping happened:(
-            %% Log first
             ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
-                 [ClientId, inet:ntoa(PeerHost), DetectCnt, Duration]),
-            %% Banned.
-            BannedFlapping = Flapping#flapping{clientid  = {banned, ClientId},
-                                               banned_at = erlang:system_time(millisecond)
-                                              },
-            alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}),
-            ets:insert(?FLAPPING_TAB, BannedFlapping);
+                 [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]),
+            Now = erlang:system_time(millisecond),
+            Banned = #banned{who    = {clientid, ClientId},
+                             by     = <<"flapping detector">>,
+                             reason = <<"flapping is detected">>,
+                             at     = Now,
+                             until  = Now + Interval},
+            alarm_handler:set_alarm({{flapping_detected, ClientId}, Banned}),
+            emqx_banned:create(Banned);
         false ->
             ?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
                  [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval])
     end,
-    ets:delete_object(?FLAPPING_TAB, Flapping),
     {noreply, State};
 
 handle_cast(Msg, State) ->
     ?LOG(error, "Unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) ->
-    with_flapping_tab(fun expire_flapping/2,
-                      [erlang:system_time(millisecond),
-                       get_policy()]),
-    {noreply, ensure_timer(State#{tref => undefined}), hibernate};
-
 handle_info(Info, State) ->
     ?LOG(error, "Unexpected info: ~p", [Info]),
     {noreply, State}.
@@ -173,34 +151,3 @@ terminate(_Reason, _State) ->
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-ensure_timer(State = #{time := Time, tref := undefined}) ->
-    State#{tref => emqx_misc:start_timer(Time, expire_flapping)};
-ensure_timer(State) -> State.
-
-with_flapping_tab(Fun, Args) ->
-    case ets:info(?FLAPPING_TAB, size) of
-        undefined -> ok;
-        0         -> ok;
-        _Size     -> erlang:apply(Fun, Args)
-    end.
-
-expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) ->
-    case ets:select(?FLAPPING_TAB,
-                    [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
-                      [{'<', '$1', NowTime-Duration}], ['$_']},
-                     {#flapping{clientid = {banned, '_'}, banned_at = '$1', _ = '_'},
-                      [{'<', '$1', NowTime-Interval}], ['$_']}]) of
-        [] -> ok;
-        Flappings ->
-            lists:foreach(fun(Flapping = #flapping{clientid = {banned, ClientId}}) ->
-                              ets:delete_object(?FLAPPING_TAB, Flapping),
-                              alarm_handler:clear_alarm({flapping_detected, ClientId});
-                             (_) -> ok
-                            end, Flappings)
-    end.
-

+ 5 - 1
src/emqx_plugins.erl

@@ -32,6 +32,10 @@
         , load_expand_plugin/1
         ]).
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -82,7 +86,7 @@ load_expand_plugin(PluginDir) ->
     init_expand_plugin_config(PluginDir),
     Ebin = filename:join([PluginDir, "ebin"]),
     code:add_patha(Ebin),
-    Modules = filelib:wildcard(filename:join([Ebin ++ "*.beam"])),
+    Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
     lists:foreach(fun(Mod) ->
         Module = list_to_atom(filename:basename(Mod, ".beam")),
         code:load_file(Module)

+ 1 - 1
src/emqx_session.erl

@@ -157,7 +157,7 @@
 
 -define(STATS_KEYS, [subscriptions_cnt,
                      subscriptions_max,
-                     inflight,
+                     inflight_cnt,
                      inflight_max,
                      mqueue_len,
                      mqueue_max,

+ 1 - 1
src/emqx_sup.erl

@@ -43,7 +43,7 @@ start_link() ->
     supervisor:start_link({local, ?SUP}, ?MODULE, []).
 
 -spec(start_child(supervisor:child_spec()) -> startchild_ret()).
-start_child(ChildSpec) when is_tuple(ChildSpec) ->
+start_child(ChildSpec) when is_map(ChildSpec) ->
     supervisor:start_child(?SUP, ChildSpec).
 
 -spec(start_child(module(), worker | supervisor) -> startchild_ret()).

+ 6 - 1
src/emqx_sys.erl

@@ -45,6 +45,11 @@
         , terminate/2
         ]).
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
 -import(emqx_topic, [systop/1]).
 -import(emqx_misc, [start_timer/2]).
 
@@ -192,7 +197,7 @@ uptime(hours, H) when H < 24 ->
 uptime(hours, H) ->
     [uptime(days, H div 24), integer_to_list(H rem 24), " hours, "];
 uptime(days, D) ->
-    [integer_to_list(D), " days,"].
+    [integer_to_list(D), " days, "].
 
 publish(uptime, Uptime) ->
     safe_publish(systop(uptime), Uptime);

+ 111 - 111
src/emqx_vm.erl

@@ -19,6 +19,7 @@
 -export([ schedulers/0
         , scheduler_usage/1
         , microsecs/0
+        , system_info_keys/0
         , get_system_info/0
         , get_system_info/1
         , get_memory/0
@@ -26,11 +27,12 @@
         , loads/0
         ]).
 
--export([ get_process_list/0
+-export([ process_info_keys/0
         , get_process_info/0
         , get_process_info/1
-        , get_process_gc/0
-        , get_process_gc/1
+        , process_gc_info_keys/0
+        , get_process_gc_info/0
+        , get_process_gc_info/1
         , get_process_group_leader_info/1
         , get_process_limit/0
         ]).
@@ -62,86 +64,83 @@
                           sl_alloc,
                           ll_alloc,
                           fix_alloc,
-                          std_alloc]).
-
--define(PROCESS_LIST, [initial_call,
-                       reductions,
-                       memory,
-                       message_queue_len,
-                       current_function]).
-
--define(PROCESS_INFO, [initial_call,
-                       current_function,
-                       registered_name,
-                       status,
-                       message_queue_len,
-                       group_leader,
-                       priority,
-                       trap_exit,
-                       reductions,
-                       %%binary,
-                       last_calls,
-                       catchlevel,
-                       trace,
-                       suspending,
-                       sequential_trace_token,
-                       error_handler]).
-
--define(PROCESS_GC, [memory,
-                     total_heap_size,
-                     heap_size,
-                     stack_size,
-                     min_heap_size]).
-                     %fullsweep_after]).
-
--define(SYSTEM_INFO, [allocated_areas,
-                      allocator,
-                      alloc_util_allocators,
-                      build_type,
-                      check_io,
-                      compat_rel,
-                      creation,
-                      debug_compiled,
-                      dist,
-                      dist_ctrl,
-                      driver_version,
-                      elib_malloc,
-                      dist_buf_busy_limit,
-                      %fullsweep_after, % included in garbage_collection
-                      garbage_collection,
-                      %global_heaps_size, % deprecated
-                      heap_sizes,
-                      heap_type,
-                      info,
-                      kernel_poll,
-                      loaded,
-                      logical_processors,
-                      logical_processors_available,
-                      logical_processors_online,
-                      machine,
-                      %min_heap_size, % included in garbage_collection
-                      %min_bin_vheap_size, % included in garbage_collection
-                      modified_timing_level,
-                      multi_scheduling,
-                      multi_scheduling_blockers,
-                      otp_release,
-                      port_count,
-                      process_count,
-                      process_limit,
-                      scheduler_bind_type,
-                      scheduler_bindings,
-                      scheduler_id,
-                      schedulers,
-                      schedulers_online,
-                      smp_support,
-                      system_version,
-                      system_architecture,
-                      threads,
-                      thread_pool_size,
-                      trace_control_word,
-                      update_cpu_info,
-                      version,
-                      wordsize]).
+                          std_alloc
+                         ]).
+
+-define(PROCESS_INFO_KEYS, [initial_call,
+                            current_function,
+                            registered_name,
+                            status,
+                            message_queue_len,
+                            group_leader,
+                            priority,
+                            trap_exit,
+                            reductions,
+                            %%binary,
+                            last_calls,
+                            catchlevel,
+                            trace,
+                            suspending,
+                            sequential_trace_token,
+                            error_handler
+                           ]).
+
+-define(PROCESS_GC_KEYS, [memory,
+                          total_heap_size,
+                          heap_size,
+                          stack_size,
+                          min_heap_size
+                         ]).
+
+-define(SYSTEM_INFO_KEYS, [allocated_areas,
+                           allocator,
+                           alloc_util_allocators,
+                           build_type,
+                           check_io,
+                           compat_rel,
+                           creation,
+                           debug_compiled,
+                           dist,
+                           dist_ctrl,
+                           driver_version,
+                           elib_malloc,
+                           dist_buf_busy_limit,
+                           %fullsweep_after, % included in garbage_collection
+                           garbage_collection,
+                           %global_heaps_size, % deprecated
+                           heap_sizes,
+                           heap_type,
+                           info,
+                           kernel_poll,
+                           loaded,
+                           logical_processors,
+                           logical_processors_available,
+                           logical_processors_online,
+                           machine,
+                           %min_heap_size, % included in garbage_collection
+                           %min_bin_vheap_size, % included in garbage_collection
+                           modified_timing_level,
+                           multi_scheduling,
+                           multi_scheduling_blockers,
+                           otp_release,
+                           port_count,
+                           process_count,
+                           process_limit,
+                           scheduler_bind_type,
+                           scheduler_bindings,
+                           scheduler_id,
+                           schedulers,
+                           schedulers_online,
+                           smp_support,
+                           system_version,
+                           system_architecture,
+                           threads,
+                           thread_pool_size,
+                           trace_control_word,
+                           update_cpu_info,
+                           version,
+                           wordsize
+                          ]).
 
 -define(SOCKET_OPTS, [active,
                       broadcast,
@@ -166,7 +165,8 @@
                       send_timeout,
                       send_timeout_close,
                       sndbuf,
-                      tos]).
+                      tos
+                     ]).
 
 schedulers() ->
     erlang:system_info(schedulers).
@@ -178,16 +178,16 @@ microsecs() ->
 loads() ->
     [{load1,  ftos(avg1()/256)},
      {load5,  ftos(avg5()/256)},
-     {load15, ftos(avg15()/256)}].
+     {load15, ftos(avg15()/256)}
+    ].
+
+system_info_keys() -> ?SYSTEM_INFO_KEYS.
 
 get_system_info() ->
-    [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO].
+    [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO_KEYS].
 
 get_system_info(Key) ->
-    try erlang:system_info(Key) catch
-    error:badarg->undefined
-    end.
-%% conversion functions for erlang:system_info(Key)
+    try erlang:system_info(Key) catch error:badarg-> undefined end.
 
 format_system_info(allocated_areas, List) ->
     [convert_allocated_areas(Value) || Value <- List];
@@ -221,8 +221,9 @@ convert_allocated_areas({Key, Value}) ->
 
 mem_info() ->
     Dataset = memsup:get_system_memory_data(),
-    [{total_memory, proplists:get_value(total_memory, Dataset)},
-     {used_memory, proplists:get_value(total_memory, Dataset) - proplists:get_value(free_memory, Dataset)}].
+    Total = proplists:get_value(total_memory, Dataset),
+    Free = proplists:get_value(free_memory, Dataset),
+    [{total_memory, Total}, {used_memory, Total - Free}].
 
 ftos(F) ->
     S = io_lib:format("~.2f", [F]), S.
@@ -300,24 +301,24 @@ container_value(Props, Pos, Type, Container) ->
     TypeProps = proplists:get_value(Type, Props),
     element(Pos, lists:keyfind(Container, 1, TypeProps)).
 
-get_process_list()->
-    [get_process_list(Pid) || Pid <- processes()].
-
-get_process_list(Pid) when is_pid(Pid) ->
-    [{pid, Pid} | [process_info(Pid, Key) || Key <- ?PROCESS_LIST]].
+process_info_keys() ->
+    ?PROCESS_INFO_KEYS.
 
 get_process_info() ->
-    [get_process_info(Pid) || Pid <- processes()].
+    get_process_info(self()).
 get_process_info(Pid) when is_pid(Pid) ->
-    process_info(Pid, ?PROCESS_INFO).
+    process_info(Pid, ?PROCESS_INFO_KEYS).
+
+process_gc_info_keys() ->
+    ?PROCESS_GC_KEYS.
 
-get_process_gc() ->
-    [get_process_gc(Pid) || Pid <- processes()].
-get_process_gc(Pid) when is_pid(Pid) ->
-    process_info(Pid, ?PROCESS_GC).
+get_process_gc_info() ->
+    get_process_gc_info(self()).
+get_process_gc_info(Pid) when is_pid(Pid) ->
+    process_info(Pid, ?PROCESS_GC_KEYS).
 
 get_process_group_leader_info(LeaderPid) when is_pid(LeaderPid) ->
-    [{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO)].
+    [{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO_KEYS)].
 
 get_process_limit() ->
     erlang:system_info(process_limit).
@@ -446,8 +447,7 @@ ports_type_count(Types) ->
 
 mapping(Entries) ->
     mapping(Entries, []).
-mapping([], Acc) ->
-    Acc;
+mapping([], Acc) -> Acc;
 mapping([{owner, V}|Entries], Acc) when is_pid(V) ->
     OwnerInfo = process_info(V),
     Owner = proplists:get_value(registered_name, OwnerInfo, undefined),
@@ -470,10 +470,10 @@ cpu_util() ->
 compat_windows(Fun) ->
     case os:type() of
         {win32, nt} -> 0;
-        _Other -> handle_error(Fun())
+        _Type ->
+            case catch Fun() of
+                Val when is_number(Val) -> Val;
+                _Error -> 0
+            end
     end.
 
-handle_error(Value) when is_number(Value) ->
-    Value;
-handle_error({error, _Reason}) ->
-    0.

+ 7 - 0
test/emqx_SUITE.erl

@@ -32,6 +32,13 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+t_restart(_) ->
+    ConfFile = "test.config",
+    Data = "[{emqx_statsd,[{interval,15000},{push_gateway,\"http://127.0.0.1:9091\"}]}].",
+    file:write_file(ConfFile, list_to_binary(Data)),
+    emqx:restart(ConfFile),
+    file:delete(ConfFile).
+
 t_stop_start(_) ->
     emqx:stop(),
     false = emqx:is_running(node()),

+ 39 - 9
test/emqx_access_control_SUITE.erl

@@ -19,22 +19,52 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include("emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 all() -> emqx_ct:all(?MODULE).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules([router, broker]),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
-end_per_testcase(_TestCase, Config) ->
-    Config.
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_authenticate(_) ->
+    emqx_zone:set_env(zone, allow_anonymous, false),
+    ?assertMatch({error, _}, emqx_access_control:authenticate(clientinfo())),
+    emqx_zone:set_env(zone, allow_anonymous, true),
+    ?assertMatch({ok, _}, emqx_access_control:authenticate(clientinfo())).
+
+t_check_acl(_) ->
+    emqx_zone:set_env(zone, acl_nomatch, deny),
+    application:set_env(emqx, enable_acl_cache, false),
+    Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
+    ?assertEqual(deny, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)),
 
-% t_authenticate(_) ->
-%     error('TODO').
+    emqx_zone:set_env(zone, acl_nomatch, allow),
+    application:set_env(emqx, enable_acl_cache, true),
+    Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
+    ?assertEqual(allow, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)).
 
-% t_check_acl(_) ->
-%     error('TODO').
+t_reload_acl(_) ->
+    ?assertEqual(ok, emqx_access_control:reload_acl()).
 
-% t_reload_acl(_) ->
-%     error('TODO').
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
 
+clientinfo() -> clientinfo(#{}).
+clientinfo(InitProps) ->
+    maps:merge(#{zone       => zone,
+                 protocol   => mqtt,
+                 peerhost   => {127,0,0,1},
+                 clientid   => <<"clientid">>,
+                 username   => <<"username">>,
+                 password   => <<"passwd">>,
+                 is_superuser => false,
+                 peercert   => undefined,
+                 mountpoint => undefined
+                }, InitProps).

+ 67 - 7
test/emqx_access_rule_SUITE.erl

@@ -23,15 +23,75 @@
 
 all() -> emqx_ct:all(?MODULE).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules([router, broker]),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
-end_per_testcase(_TestCase, Config) ->
-    Config.
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_compile(_) ->
+    Rule1 = {allow, all, pubsub, <<"%u">>},
+    Compile1 = {allow, all, pubsub, [{pattern,[<<"%u">>]}]},
+
+    Rule2 = {allow, {ipaddr, "127.0.0.1"}, pubsub, <<"%c">>},
+    Compile2 = {allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, pubsub, [{pattern,[<<"%c">>]}]},
+
+    Rule3 = {allow, {'and', [{client, <<"testClient">>}, {user, <<"testUser">>}]}, pubsub, [<<"testTopics1">>,  <<"testTopics2">>]},
+    Compile3 = {allow, {'and', [{client, <<"testClient">>}, {user, <<"testUser">>}]}, pubsub, [[<<"testTopics1">>],  [<<"testTopics2">>]]},
 
-% t_compile(_) ->
-%     error('TODO').
+    Rule4 = {allow, {'or', [{client, all}, {user, all}]}, pubsub, [ <<"testTopics1">>,  <<"testTopics2">>]},
+    Compile4 = {allow, {'or', [{client, all}, {user, all}]}, pubsub, [[<<"testTopics1">>],  [<<"testTopics2">>]]},
 
-% t_match(_) ->
-%     error('TODO').
+    ?assertEqual(Compile1, emqx_access_rule:compile(Rule1)),
+    ?assertEqual(Compile2, emqx_access_rule:compile(Rule2)),
+    ?assertEqual(Compile3, emqx_access_rule:compile(Rule3)),
+    ?assertEqual(Compile4, emqx_access_rule:compile(Rule4)).
 
+t_match(_) ->
+    ClientInfo1 = #{zone => external,
+                    clientid => <<"testClient">>,
+                    username => <<"TestUser">>,
+                    peerhost => {127,0,0,1}
+                   },
+    ClientInfo2 = #{zone => external,
+                    clientid => <<"testClient">>,
+                    username => <<"TestUser">>,
+                    peerhost => {192,168,0,10}
+                   },
+    ClientInfo3 = #{zone => external,
+                    clientid => <<"testClient">>,
+                    username => <<"TestUser">>,
+                    peerhost => undefined
+                   },
+    ?assertEqual({matched, deny}, emqx_access_rule:match([], [], {deny, all})),
+    ?assertEqual({matched, allow}, emqx_access_rule:match([], [], {allow, all})),
+    ?assertEqual(nomatch, emqx_access_rule:match(ClientInfo1, <<"Test/Topic">>,
+                                                    emqx_access_rule:compile({allow, {user, all}, pubsub, []}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Test/Topic">>,
+                                                            emqx_access_rule:compile({allow, {client, all}, pubsub, ["$SYS/#", "#"]}))),
+    ?assertEqual(nomatch, emqx_access_rule:match(ClientInfo3, <<"Test/Topic">>,
+                                                    emqx_access_rule:compile({allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Test/Topic">>,
+                                                            emqx_access_rule:compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo2, <<"Test/Topic">>,
+                                                            emqx_access_rule:compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"d/e/f/x">>,
+                                                            emqx_access_rule:compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]}))),
+    ?assertEqual(nomatch, emqx_access_rule:match(ClientInfo1, <<"d/e/f/x">>,
+                                                    emqx_access_rule:compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"testTopics/testClient">>,
+                                                            emqx_access_rule:compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"clients/testClient">>,
+                                                            emqx_access_rule:compile({allow, all, pubsub, ["clients/%c"]}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(#{username => <<"user2">>}, <<"users/user2/abc/def">>,
+                                                            emqx_access_rule:compile({allow, all, subscribe, ["users/%u/#"]}))),
+    ?assertEqual({matched, deny}, emqx_access_rule:match(ClientInfo1, <<"d/e/f">>,
+                                                            emqx_access_rule:compile({deny, all, subscribe, ["$SYS/#", "#"]}))),
+    ?assertEqual(nomatch, emqx_access_rule:match(ClientInfo1, <<"Topic">>,
+                                                    emqx_access_rule:compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Topic">>,
+                                                            emqx_access_rule:compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}))),
+    ?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Topic">>,
+                                                            emqx_access_rule:compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}))).

+ 9 - 10
test/emqx_banned_SUITE.erl

@@ -38,20 +38,20 @@ end_per_suite(_Config) ->
 
 t_add_delete(_) ->
     Banned = #banned{who = {clientid, <<"TestClient">>},
-                     reason = <<"test">>,
                      by = <<"banned suite">>,
-                     desc = <<"test">>,
+                     reason = <<"test">>,
+                     at = erlang:system_time(second),
                      until = erlang:system_time(second) + 1000
                     },
-    ok = emqx_banned:add(Banned),
+    ok = emqx_banned:create(Banned),
     ?assertEqual(1, emqx_banned:info(size)),
     ok = emqx_banned:delete({clientid, <<"TestClient">>}),
     ?assertEqual(0, emqx_banned:info(size)).
 
 t_check(_) ->
-    ok = emqx_banned:add(#banned{who = {clientid, <<"BannedClient">>}}),
-    ok = emqx_banned:add(#banned{who = {username, <<"BannedUser">>}}),
-    ok = emqx_banned:add(#banned{who = {ipaddr, {192,168,0,1}}}),
+    ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}}),
+    ok = emqx_banned:create(#banned{who = {username, <<"BannedUser">>}}),
+    ok = emqx_banned:create(#banned{who = {peerhost, {192,168,0,1}}}),
     ?assertEqual(3, emqx_banned:info(size)),
     ClientInfo1 = #{clientid => <<"BannedClient">>,
                     username => <<"user">>,
@@ -75,7 +75,7 @@ t_check(_) ->
     ?assertNot(emqx_banned:check(ClientInfo4)),
     ok = emqx_banned:delete({clientid, <<"BannedClient">>}),
     ok = emqx_banned:delete({username, <<"BannedUser">>}),
-    ok = emqx_banned:delete({ipaddr, {192,168,0,1}}),
+    ok = emqx_banned:delete({peerhost, {192,168,0,1}}),
     ?assertNot(emqx_banned:check(ClientInfo1)),
     ?assertNot(emqx_banned:check(ClientInfo2)),
     ?assertNot(emqx_banned:check(ClientInfo3)),
@@ -84,9 +84,8 @@ t_check(_) ->
 
 t_unused(_) ->
     {ok, Banned} = emqx_banned:start_link(),
-    ok = emqx_banned:add(#banned{who = {clientid, <<"BannedClient">>},
-                                 until = erlang:system_time(second)
-                                }),
+    ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>},
+                                    until = erlang:system_time(second)}),
     ?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
     ?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
     ?assertEqual(ok, Banned ! ok),

+ 15 - 13
test/emqx_cm_locker_SUITE.erl

@@ -23,21 +23,23 @@
 
 all() -> emqx_ct:all(?MODULE).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
-end_per_testcase(_TestCase, Config) ->
-    Config.
-
-% t_start_link(_) ->
-%     error('TODO').
-
-% t_trans(_) ->
-%     error('TODO').
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
 
-% t_lock(_) ->
-%     error('TODO').
+t_start_link(_) ->
+    emqx_cm_locker:start_link().
 
-% t_unlock(_) ->
-%     error('TODO').
+t_trans(_) ->
+    ok = emqx_cm_locker:trans(undefined, fun(_) -> ok end, []),
+    ok = emqx_cm_locker:trans(<<"clientid">>, fun(_) -> ok end).
 
+t_lock_unlocak(_) ->
+    {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
+    {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
+    {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>),
+    {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>).

+ 5 - 5
test/emqx_flapping_SUITE.erl

@@ -45,13 +45,13 @@ t_detect_check(_) ->
                    peerhost => {127,0,0,1}
                   },
     false = emqx_flapping:detect(ClientInfo),
-    false = emqx_flapping:check(ClientInfo),
+    false = emqx_banned:check(ClientInfo),
     false = emqx_flapping:detect(ClientInfo),
-    false = emqx_flapping:check(ClientInfo),
+    false = emqx_banned:check(ClientInfo),
     true = emqx_flapping:detect(ClientInfo),
     timer:sleep(100),
-    true = emqx_flapping:check(ClientInfo),
-    timer:sleep(300),
-    false = emqx_flapping:check(ClientInfo),
+    true = emqx_banned:check(ClientInfo),
+    timer:sleep(200),
+    false = emqx_banned:check(ClientInfo),
     ok = emqx_flapping:stop().
 

+ 38 - 13
test/emqx_mod_acl_internal_SUITE.erl

@@ -19,28 +19,53 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include("emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 all() -> emqx_ct:all(?MODULE).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([emqx]),
     Config.
 
-end_per_testcase(_TestCase, Config) ->
-    Config.
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([emqx]).
+
+t_load_unload(_) ->
+    ?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])),
+    ?assertEqual(ok, emqx_mod_acl_internal:unload([])),
+    ?assertEqual(ok, emqx_mod_acl_internal:load([])).
 
-% t_load(_) ->
-%     error('TODO').
+t_all_rules(_) ->
+    application:set_env(emqx, acl_file, ""),
+    ?assertMatch(#{}, emqx_mod_acl_internal:all_rules()),
 
-% t_unload(_) ->
-%     error('TODO').
+    application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "etc/acl.conf")),
+    ?assertMatch(#{publish := _, subscribe := _}, emqx_mod_acl_internal:all_rules()).
 
-% t_all_rules(_) ->
-%     error('TODO').
+t_check_acl(_) ->
+    Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]},
+    ?assertEqual({ok, allow}, emqx_mod_acl_internal:check_acl(clientinfo(), publish,  <<"t">>, [], Rules)),
+    ?assertEqual({ok, deny}, emqx_mod_acl_internal:check_acl(clientinfo(), subscribe,  <<"t">>, [], Rules)),
+    ?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect,  <<"t">>, [], Rules)).
 
-% t_check_acl(_) ->
-%     error('TODO').
+t_reload_acl(_) ->
+    ?assertEqual(ok, emqx_mod_acl_internal:reload_acl()).
 
-% t_reload_acl(_) ->
-%     error('TODO').
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
 
+clientinfo() -> clientinfo(#{}).
+clientinfo(InitProps) ->
+    maps:merge(#{zone       => zone,
+                 protocol   => mqtt,
+                 peerhost   => {127,0,0,1},
+                 clientid   => <<"clientid">>,
+                 username   => <<"username">>,
+                 password   => <<"passwd">>,
+                 is_superuser => false,
+                 peercert   => undefined,
+                 mountpoint => undefined
+                }, InitProps).

+ 31 - 9
test/emqx_mod_subscription_SUITE.erl

@@ -19,22 +19,44 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include("emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 all() -> emqx_ct:all(?MODULE).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([emqx]),
     Config.
 
-end_per_testcase(_TestCase, Config) ->
-    Config.
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([emqx]).
+
+t_load(_) ->
+    ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}])).
+
+t_on_client_connected(_) ->
+    {ok, C} = emqtt:start_link([{host, "localhost"},
+                            {clientid, "myclient"},
+                            {username, "admin"}]),
+    {ok, _} = emqtt:connect(C),
+    emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
+    {ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
+    ?assertEqual(<<"connected/myclient/admin">>, Topic),
+    ?assertEqual(<<"Hello world">>, Payload),
+    ok = emqtt:disconnect(C).
 
-% t_load(_) ->
-%     error('TODO').
+t_unload(_) ->
+    ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, ?QOS_0}])).
 
-% t_on_client_connected(_) ->
-%     error('TODO').
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
 
-% t_unload(_) ->
-%     error('TODO').
+receive_publish(Timeout) ->
+    receive
+        {publish, Publish} -> {ok, Publish}
+    after
+        Timeout -> {error, timeout}
+    end.
 

+ 90 - 20
test/emqx_plugins_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include("emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 all() -> emqx_ct:all(?MODULE).
@@ -30,7 +31,7 @@ init_per_suite(Config) ->
 
     DataPath = proplists:get_value(data_dir, Config),
     AppPath = filename:join([DataPath, "emqx_mini_plugin"]),
-    Cmd = lists:flatten(io_lib:format("cd ~s && make", [AppPath])),
+    Cmd = lists:flatten(io_lib:format("cd ~s && make && cp -r etc _build/default/lib/emqx_mini_plugin/", [AppPath])),
 
     ct:pal("Executing ~s~n", [Cmd]),
     ct:pal("~n ~s~n", [os:cmd(Cmd)]),
@@ -43,21 +44,6 @@ init_per_suite(Config) ->
 
     Config.
     
-% t_load_expand_plugin(_) ->
-%     error('TODO').
-
-% t_list(_) ->
-%     error('TODO').
-
-% t_find_plugin(_) ->
-%     error('TODO').
-
-% t_unload(_) ->
-%     error('TODO').
-
-% t_init(_) ->
-%     error('TODO').
-
 set_sepecial_cfg(_) ->
     ExpandPath = filename:dirname(code:lib_dir(emqx_mini_plugin)),
 
@@ -69,8 +55,92 @@ end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
 t_load(_) ->
-    {error, load_app_fail} = emqx_plugins:load_expand_plugin("./not_existed_path/"),
+    ?assertEqual([], emqx_plugins:load()),
+    ?assertEqual([], emqx_plugins:unload()),
+
+    ?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)),
+    ?assertMatch({ok, _}, emqx_plugins:load(emqx_mini_plugin)),
+    ?assertEqual({error, already_started}, emqx_plugins:load(emqx_mini_plugin)),
+    ?assertEqual(ok, emqx_plugins:unload(emqx_mini_plugin)),
+    ?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)),
+
+    application:set_env(emqx, expand_plugins_dir, undefined),
+    application:set_env(emqx, plugins_loaded_file, undefined),
+    ?assertEqual(ignore, emqx_plugins:load()),
+    ?assertEqual(ignore, emqx_plugins:unload()).
+
+
+t_init_config(_) ->
+    ConfFile = "emqx_mini_plugin.config",
+    Data = "[{emqx_mini_plugin,[{mininame ,test}]}].",
+    file:write_file(ConfFile, list_to_binary(Data)),
+    ?assertEqual(ok, emqx_plugins:init_config(ConfFile)),
+    file:delete(ConfFile),
+    ?assertEqual({ok,test}, application:get_env(emqx_mini_plugin, mininame)).
+
+t_load_expand_plugin(_) ->
+    ?assertEqual({error, load_app_fail}, emqx_plugins:load_expand_plugin("./not_existed_path/")).
+
+t_list(_) ->
+    ?assertMatch([{plugin, _, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()).
+
+t_find_plugin(_) ->
+    ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)).
+
+t_plugin_type(_) ->
+    ?assertEqual(auth, emqx_plugins:plugin_type(auth)),
+    ?assertEqual(protocol, emqx_plugins:plugin_type(protocol)),
+    ?assertEqual(backend, emqx_plugins:plugin_type(backend)),
+    ?assertEqual(bridge, emqx_plugins:plugin_type(bridge)),
+    ?assertEqual(feature, emqx_plugins:plugin_type(undefined)).
+
+t_with_loaded_file(_) ->
+    ?assertMatch({error, _}, emqx_plugins:with_loaded_file("./not_existed_path/", fun(_) -> ok end)).
+
+t_plugin_loaded(_) ->
+    ?assertEqual(ok, emqx_plugins:plugin_loaded(emqx_mini_plugin, false)),
+    ?assertEqual(ok, emqx_plugins:plugin_loaded(emqx_mini_plugin, true)).
+
+t_plugin_unloaded(_) ->
+    ?assertEqual(ok, emqx_plugins:plugin_unloaded(emqx_mini_plugin, false)),
+    ?assertEqual(ok, emqx_plugins:plugin_unloaded(emqx_mini_plugin, true)).
+
+t_plugin(_) ->
+    try
+        emqx_plugins:plugin(not_existed_plugin, undefined)
+    catch
+        _Error:Reason:_Stacktrace ->
+            ?assertEqual({plugin_not_found,not_existed_plugin}, Reason)
+    end,
+    ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin, undefined)).
+
+t_filter_plugins(_) ->
+    ?assertEqual([name1, name2], emqx_plugins:filter_plugins([name1, {name2,true}, {name3, false}])).
+
+t_load_plugin(_) ->
+    ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
+    ok = meck:expect(application, load, fun(already_loaded_app) -> {error, {already_loaded, already_loaded_app}};
+                                            (error_app) -> {error, error};
+                                            (_) -> ok end),
+    ok = meck:expect(application, ensure_all_started, fun(already_loaded_app) -> {error, {already_loaded_app, already_loaded}};
+                                                            (error_app) -> {error, error};
+                                                            (App) -> {ok, App} end),
+
+    ?assertMatch({error, _}, emqx_plugins:load_plugin(#plugin{name = already_loaded_app}, true)),
+    ?assertMatch({ok, _}, emqx_plugins:load_plugin(#plugin{name = normal}, true)),
+    ?assertMatch({error,_}, emqx_plugins:load_plugin(#plugin{name = error_app}, true)),
+
+    ok = meck:unload(application).
+
+t_unload_plugin(_) ->
+    ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
+    ok = meck:expect(application, stop, fun(not_started_app) -> {error, {not_started, not_started_app}};
+                                            (error_app) -> {error, error};
+                                            (_) -> ok end),
+
+    ?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app, true)),
+    ?assertEqual(ok, emqx_plugins:unload_plugin(normal, true)),
+    ?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app, true)),
+
+    ok = meck:unload(application).
 
-    {error, not_started} = emqx_plugins:unload(emqx_mini_plugin),
-    {ok, _} = emqx_plugins:load(emqx_mini_plugin),
-    ok = emqx_plugins:unload(emqx_mini_plugin).

+ 27 - 10
test/emqx_shared_sub_SUITE.erl

@@ -29,6 +29,9 @@
         emqx_ct_helpers:wait_for(
           ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
 
+-define(ack, shared_sub_ack).
+-define(no_ack, no_ack).
+
 all() -> emqx_ct:all(?SUITE).
 
 init_per_suite(Config) ->
@@ -39,17 +42,22 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
     
-% t_is_ack_required(_) ->
-%     error('TODO').
+t_is_ack_required(_) ->
+    ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
 
-% t_maybe_nack_dropped(_) ->
-%     error('TODO').
+t_maybe_nack_dropped(_) ->
+    ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
+    ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{shared_dispatch_ack => {self(), for_test}}})),
+    ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
 
-% t_nack_no_connection(_) ->
-%     error('TODO').
+t_nack_no_connection(_) ->
+    ?assertEqual(ok, emqx_shared_sub:nack_no_connection(#message{headers = #{shared_dispatch_ack => {self(), for_test}}})),
+    ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end).
 
-% t_maybe_ack(_) ->
-%     error('TODO').
+t_maybe_ack(_) ->
+    ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
+    ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(#message{headers = #{shared_dispatch_ack => {self(), for_test}}})),
+    ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
 
 % t_subscribers(_) ->
 %     error('TODO').
@@ -239,14 +247,23 @@ last_message(ExpectedPayload, Pids) ->
         <<"not yet?">>
     end.
     
-% t_dispatch(_) ->
-%     error('TODO').
+t_dispatch(_) ->
+    ok = ensure_config(random),
+    Topic = <<"foo">>,
+    ?assertEqual({error, no_subscribers}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})),
+    emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
+    ?assertEqual(ok, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
 
 % t_unsubscribe(_) ->
 %     error('TODO').
 
 % t_subscribe(_) ->
 %     error('TODO').
+t_uncovered_func(_) ->
+    ignored = gen_server:call(emqx_shared_sub, ignored),
+    ok = gen_server:cast(emqx_shared_sub, ignored),
+    ignored = emqx_shared_sub ! ignored,
+    {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
 
 %%--------------------------------------------------------------------
 %% help functions

+ 39 - 0
test/emqx_sup_SUITE.erl

@@ -0,0 +1,39 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_sup_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_child(_) ->
+    ?assertMatch({error, _}, emqx_sup:start_child(undef, worker)),
+    ?assertMatch({error, not_found}, emqx_sup:stop_child(undef)),
+    ?assertMatch({error, _}, emqx_sup:start_child(emqx_broker_sup, supervisor)),
+    ?assertEqual(ok, emqx_sup:stop_child(emqx_broker_sup)),
+    ?assertMatch({ok, _}, emqx_sup:start_child(emqx_broker_sup, supervisor)).

+ 5 - 2
test/emqx_sys_SUITE.erl

@@ -49,8 +49,11 @@ end_per_suite(_Config) ->
 % t_sysdescr(_) ->
 %     error('TODO').
 
-% t_uptime(_) ->
-%     error('TODO').
+t_uptime(_) ->
+    ?assertEqual(<<"1 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 1))),
+    ?assertEqual(<<"1 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 60))),
+    ?assertEqual(<<"1 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 3600))),
+    ?assertEqual(<<"1 days, 0 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 86400))).
 
 % t_datetime(_) ->
 %     error('TODO').

+ 14 - 0
test/emqx_sys_mon_SUITE.erl

@@ -69,11 +69,25 @@ init_per_testcase(t_sys_mon2, Config) ->
             ok;
            (_) -> ok
         end),
+    Config;
+init_per_testcase(_, Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_testcase(_, _Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+t_procinfo(_) ->
+    ok = meck:new(emqx_vm, [passthrough, no_history]),
+    ok = meck:expect(emqx_vm, get_process_info, fun(_) -> undefined end),
+    ok = meck:expect(emqx_vm, get_process_gc, fun(_) -> ok end),
+    ?assertEqual(undefined, emqx_sys_mon:procinfo([])),
+    ok = meck:expect(emqx_vm, get_process_info, fun(_) -> ok end),
+    ok = meck:expect(emqx_vm, get_process_gc, fun(_) -> undefined end),
+    ?assertEqual(undefined, emqx_sys_mon:procinfo([])),
+    ok = meck:unload(emqx_vm).
+
 t_sys_mon(_Config) ->
     lists:foreach(
       fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) ->

+ 9 - 95
test/emqx_vm_SUITE.erl

@@ -21,115 +21,29 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
--define(SYSTEM_INFO, [allocated_areas,
-                      allocator,
-                      alloc_util_allocators,
-                      build_type,
-                      check_io,
-                      compat_rel,
-                      creation,
-                      debug_compiled,
-                      dist,
-                      dist_ctrl,
-                      driver_version,
-                      elib_malloc,
-                      dist_buf_busy_limit,
-                      %fullsweep_after, % included in garbage_collection
-                      garbage_collection,
-                      %global_heaps_size, % deprecated
-                      heap_sizes,
-                      heap_type,
-                      info,
-                      kernel_poll,
-                      loaded,
-                      logical_processors,
-                      logical_processors_available,
-                      logical_processors_online,
-                      machine,
-                      %min_heap_size, % included in garbage_collection
-                      %min_bin_vheap_size, % included in garbage_collection
-                      modified_timing_level,
-                      multi_scheduling,
-                      multi_scheduling_blockers,
-                      otp_release,
-                      port_count,
-                      process_count,
-                      process_limit,
-                      scheduler_bind_type,
-                      scheduler_bindings,
-                      scheduler_id,
-                      schedulers,
-                      schedulers_online,
-                      smp_support,
-                      system_version,
-                      system_architecture,
-                      threads,
-                      thread_pool_size,
-                      trace_control_word,
-                      update_cpu_info,
-                      version,
-                      wordsize]).
-
--define(PROCESS_INFO, [initial_call,
-                       current_function,
-                       registered_name,
-                       status,
-                       message_queue_len,
-                       group_leader,
-                       priority,
-                       trap_exit,
-                       reductions,
-                       %%binary,
-                       last_calls,
-                       catchlevel,
-                       trace,
-                       suspending,
-                       sequential_trace_token,
-                       error_handler]).
-
--define(PROCESS_GC, [memory,
-                     total_heap_size,
-                     heap_size,
-                     stack_size,
-                     min_heap_size]).
-                     %fullsweep_after]).
-
 all() -> emqx_ct:all(?MODULE).
 
 t_load(_Config) ->
-    ?assertMatch([{load1, _},
-                  {load5, _},
-                  {load15, _}
-                 ], emqx_vm:loads()).
+    ?assertMatch([{load1, _}, {load5, _}, {load15, _}], emqx_vm:loads()).
 
 t_systeminfo(_Config) ->
-   Keys =  [Key || {Key, _} <- emqx_vm:get_system_info()],
-   ?SYSTEM_INFO = Keys,
-   ?assertEqual(undefined, emqx_vm:get_system_info(undefined)).
+    ?assertEqual(emqx_vm:system_info_keys(),
+                 [Key || {Key, _} <- emqx_vm:get_system_info()]),
+    ?assertEqual(undefined, emqx_vm:get_system_info(undefined)).
 
 t_mem_info(_Config) ->
     application:ensure_all_started(os_mon),
     MemInfo = emqx_vm:mem_info(),
-    [{total_memory, _},
-     {used_memory, _}]= MemInfo,
+    [{total_memory, _}, {used_memory, _}]= MemInfo,
     application:stop(os_mon).
 
-t_process_list(_Config) ->
-    Pid = self(),
-    ProcessInfo = emqx_vm:get_process_list(),
-    true = lists:member({pid, Pid}, lists:concat(ProcessInfo)).
-
 t_process_info(_Config) ->
-    ProcessInfos = emqx_vm:get_process_info(),
-    ProcessInfo = lists:last(ProcessInfos),
-    Keys = [K || {K, _V}<- ProcessInfo],
-    ?PROCESS_INFO = Keys.
+    ProcessInfo = emqx_vm:get_process_info(),
+    ?assertEqual(emqx_vm:process_info_keys(), [K || {K, _V}<- ProcessInfo]).
 
 t_process_gc(_Config) ->
-    ProcessGcs = emqx_vm:get_process_gc(),
-    ProcessGc = lists:last(ProcessGcs),
-    Keys = [K || {K, _V}<- ProcessGc],
-    ?PROCESS_GC = Keys.
+    GcInfo = emqx_vm:get_process_gc_info(),
+    ?assertEqual(emqx_vm:process_gc_info_keys(), [K || {K, _V}<- GcInfo]).
 
 t_get_ets_list(_Config) ->
     ets:new(test, [named_table]),