فهرست منبع

refactor(style): improve all types declaration

- Add dialyzer for code analysis
- Correct all the module type declarations
- Use `-type()` to declaration a type instead of `-opaque`
  (It is advantageous to the code dialyzer)

BROKEN CHANGES:
- Add a ?DEFAULT_SUBOPTS for emqx_broker:subscribe/1,2,3
- Remove the needless internal function `emqx_vm:port_info(PortTerm, Keys)`
JianBo He 5 سال پیش
والد
کامیت
e3489b9d46

+ 4 - 0
.github/workflows/run_test_case.yaml

@@ -13,6 +13,10 @@ jobs:
       
         steps:
         - uses: actions/checkout@v1
+        - name: Code dialyzer
+          run: |
+            make dialyzer
+            rm -f rebar.lock
         - name: Run tests
           run: |
             make xref

+ 4 - 0
Makefile

@@ -14,6 +14,10 @@ RUN_NODE_NAME = emqxdebug@127.0.0.1
 .PHONY: all
 all: compile
 
+.PHONY: dialyzer
+dialyzer:
+	@rebar3 dialyzer
+
 .PHONY: tests
 tests: eunit ct
 

+ 6 - 5
include/emqx.hrl

@@ -99,7 +99,7 @@
           node_id        :: trie_node_id(),
           edge_count = 0 :: non_neg_integer(),
           topic          :: binary() | undefined,
-          flags          :: list(atom())
+          flags          :: list(atom()) | undefined
         }).
 
 -record(trie_edge, {
@@ -121,7 +121,8 @@
           severity  :: notice | warning | error | critical,
           title     :: iolist(),
           summary   :: iolist(),
-          timestamp :: erlang:timestamp()
+          %% Timestamp (Unit: millisecond)
+          timestamp :: integer() | undefined
         }).
 
 %%--------------------------------------------------------------------
@@ -130,11 +131,11 @@
 
 -record(plugin, {
           name           :: atom(),
-          dir            :: string(),
+          dir            :: string() | undefined,
           descr          :: string(),
-          vendor         :: string(),
+          vendor         :: string() | undefined,
           active = false :: boolean(),
-          info           :: map(),
+          info   = #{}   :: map(),
           type           :: atom()
         }).
 

+ 2 - 0
include/types.hrl

@@ -22,3 +22,5 @@
 
 -type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}).
 
+-type(mfargs() :: {module(), atom(), [term()]}).
+

+ 5 - 3
src/emqx.erl

@@ -146,7 +146,7 @@ unsubscribe(Topic) ->
 -spec(topics() -> list(emqx_topic:topic())).
 topics() -> emqx_router:topics().
 
--spec(subscribers(emqx_topic:topic() | string()) -> list(emqx_types:subscriber())).
+-spec(subscribers(emqx_topic:topic() | string()) -> [pid()]).
 subscribers(Topic) ->
     emqx_broker:subscribers(iolist_to_binary(Topic)).
 
@@ -168,7 +168,9 @@ subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) ->
 hook(HookPoint, Action) ->
     emqx_hooks:add(HookPoint, Action).
 
--spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer())
+-spec(hook(emqx_hooks:hookpoint(),
+           emqx_hooks:action(),
+           emqx_hooks:filter() | integer() | list())
       -> ok | {error, already_exists}).
 hook(HookPoint, Action, Priority) when is_integer(Priority) ->
     emqx_hooks:add(HookPoint, Action, Priority);
@@ -182,7 +184,7 @@ hook(HookPoint, Action, InitArgs) when is_list(InitArgs) ->
 hook(HookPoint, Action, Filter, Priority) ->
     emqx_hooks:add(HookPoint, Action, Filter, Priority).
 
--spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok).
+-spec(unhook(emqx_hooks:hookpoint(), function() | {module(), atom()}) -> ok).
 unhook(HookPoint, Action) ->
     emqx_hooks:del(HookPoint, Action).
 

+ 1 - 1
src/emqx_boot.erl

@@ -20,7 +20,7 @@
 
 -define(BOOT_MODULES, [router, broker, listeners]).
 
--spec(is_enabled(all|list(router|broker|listeners)) -> boolean()).
+-spec(is_enabled(all|router|broker|listeners) -> boolean()).
 is_enabled(Mod) ->
     (BootMods = boot_modules()) =:= all orelse lists:member(Mod, BootMods).
 

+ 10 - 8
src/emqx_broker.erl

@@ -21,6 +21,7 @@
 -include("emqx.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include("emqx_mqtt.hrl").
 
 -logger_header("[Broker]").
 
@@ -118,12 +119,13 @@ subscribe(Topic) when is_binary(Topic) ->
 
 -spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
 subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
-    subscribe(Topic, SubId, #{qos => 0});
+    subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
 subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
     subscribe(Topic, undefined, SubOpts).
 
 -spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
-subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) ->
+subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) ->
+    SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
     case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
         false -> %% New
             ok = emqx_broker_helper:register_sub(SubPid, SubId),
@@ -215,9 +217,8 @@ safe_publish(Msg) when is_record(Msg, message) ->
     catch
         _:Error:Stk->
             ?LOG(error, "Publish error: ~0p~n~s~n~0p",
-                 [Error, emqx_message:format(Msg), Stk])
-    after
-        []
+                 [Error, emqx_message:format(Msg), Stk]),
+            []
     end.
 
 -compile({inline, [delivery/1]}).
@@ -283,7 +284,7 @@ forward(Node, To, Delivery, sync) ->
 dispatch(Topic, #delivery{message = Msg}) ->
     case subscribers(Topic) of
         [] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
-              ok = inc_dropped_cnt(Topic),
+              ok = inc_dropped_cnt(Msg),
               {error, no_subscribers};
         [Sub] -> %% optimize?
             dispatch(Sub, Topic, Msg);
@@ -322,7 +323,8 @@ inc_dropped_cnt(Msg) ->
     end.
 
 -compile({inline, [subscribers/1]}).
--spec(subscribers(emqx_topic:topic()) -> [pid()]).
+-spec(subscribers(emqx_topic:topic() | {shard, emqx_topic:topic(), non_neg_integer()})
+      -> [pid()]).
 subscribers(Topic) when is_binary(Topic) ->
     lookup_value(?SUBSCRIBER, Topic, []);
 subscribers(Shard = {shard, _Topic, _I})  ->
@@ -367,7 +369,7 @@ subscriptions(SubId) ->
         undefined -> []
     end.
 
--spec(subscribed(pid(), emqx_topic:topic()) -> boolean()).
+-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()).
 subscribed(SubPid, Topic) when is_pid(SubPid) ->
     ets:member(?SUBOPTION, {SubPid, Topic});
 subscribed(SubId, Topic) when ?is_subid(SubId) ->

+ 1 - 1
src/emqx_broker_helper.erl

@@ -75,7 +75,7 @@ register_sub(SubPid, SubId) when is_pid(SubPid) ->
 lookup_subid(SubPid) when is_pid(SubPid) ->
     emqx_tables:lookup_value(?SUBMON, SubPid).
 
--spec(lookup_subpid(emqx_types:subid()) -> pid()).
+-spec(lookup_subpid(emqx_types:subid()) -> maybe(pid())).
 lookup_subpid(SubId) ->
     emqx_tables:lookup_value(?SUBID, SubId).
 

+ 7 - 6
src/emqx_channel.erl

@@ -62,9 +62,9 @@
           %% MQTT ClientInfo
           clientinfo :: emqx_types:clientinfo(),
           %% MQTT Session
-          session :: emqx_session:session(),
+          session :: maybe(emqx_session:session()),
           %% Keepalive
-          keepalive :: emqx_keepalive:keepalive(),
+          keepalive :: maybe(emqx_keepalive:keepalive()),
           %% MQTT Will Msg
           will_msg :: maybe(emqx_types:message()),
           %% MQTT Topic Aliases
@@ -108,6 +108,8 @@
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
 
+-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
+
 %%--------------------------------------------------------------------
 %% Info, Attrs and Caps
 %%--------------------------------------------------------------------
@@ -818,7 +820,8 @@ return_unsuback(Packet, Channel) ->
 
 -spec(handle_call(Req :: term(), channel())
       -> {reply, Reply :: term(), channel()}
-       | {shutdown, Reason :: term(), Reply :: term(), channel()}).
+       | {shutdown, Reason :: term(), Reply :: term(), channel()}
+       | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
 handle_call(kick, Channel) ->
     Channel1 = ensure_disconnected(kicked, Channel),
     disconnect_and_shutdown(kicked, ok, Channel1);
@@ -936,9 +939,6 @@ handle_timeout(_TRef, retry_delivery,
     case emqx_session:retry(Session) of
         {ok, NSession} ->
             {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
-        {ok, Publishes, NSession} ->
-            NChannel = Channel#channel{session = NSession},
-            handle_out(publish, Publishes, reset_timer(retry_timer, NChannel));
         {ok, Publishes, Timeout, NSession} ->
             NChannel = Channel#channel{session = NSession},
             handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
@@ -1013,6 +1013,7 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
 %% Terminate
 %%--------------------------------------------------------------------
 
+-spec(terminate(any(), channel()) -> ok).
 terminate(_, #channel{conn_state = idle}) -> ok;
 terminate(normal, Channel) ->
     run_terminate_hook(normal, Channel);

+ 7 - 6
src/emqx_cm.erl

@@ -107,7 +107,7 @@ start_link() ->
 register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) ->
     Chan = {ClientId, ChanPid = self()},
     true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
-    register_channel(ClientId, ChanPid, ConnInfo);
+    register_channel_(ClientId, ChanPid, ConnInfo).
 
 %% @private
 %% @doc Register a channel with pid and conn_mod.
@@ -117,7 +117,7 @@ register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) ->
 %% the conn_mod first for taking up the clientid access right.
 %%
 %% Note that: It should be called on a lock transaction
-register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
+register_channel_(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
     Chan = {ClientId, ChanPid},
     true = ets:insert(?CHAN_TAB, Chan),
     true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
@@ -211,7 +211,7 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId),
                      Session = create_session(ClientInfo, ConnInfo),
-                     register_channel(ClientId, Self, ConnInfo),
+                     register_channel_(ClientId, Self, ConnInfo),
                      {ok, #{session => Session, present => false}}
                  end,
     emqx_cm_locker:trans(ClientId, CleanStart);
@@ -223,13 +223,13 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
                           {ok, ConnMod, ChanPid, Session} ->
                               ok = emqx_session:resume(ClientInfo, Session),
                               Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
-                              register_channel(ClientId, Self, ConnInfo),
+                              register_channel_(ClientId, Self, ConnInfo),
                               {ok, #{session  => Session,
                                      present  => true,
                                      pendings => Pendings}};
                           {error, not_found} ->
                               Session = create_session(ClientInfo, ConnInfo),
-                              register_channel(ClientId, Self, ConnInfo),
+                              register_channel_(ClientId, Self, ConnInfo),
                               {ok, #{session => Session, present => false}}
                       end
                   end,
@@ -243,7 +243,8 @@ create_session(ClientInfo, ConnInfo) ->
 
 %% @doc Try to takeover a session.
 -spec(takeover_session(emqx_types:clientid())
-      -> {ok, emqx_session:session()} | {error, Reason :: term()}).
+      -> {error, term()}
+       | {ok, atom(), pid(), emqx_session:session()}).
 takeover_session(ClientId) ->
     case lookup_channels(ClientId) of
         [] -> {error, not_found};

+ 7 - 0
src/emqx_connection.erl

@@ -103,6 +103,13 @@
 
 -define(ENABLED(X), (X =/= undefined)).
 
+-dialyzer({no_match, [info/2]}).
+-dialyzer({nowarn_function, [ init/4
+                            , init_state/3
+                            , run_loop/2
+                            , system_terminate/4
+                            ]}).
+
 -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
       -> {ok, pid()}).
 start_link(Transport, Socket, Options) ->

+ 1 - 1
src/emqx_ctl.erl

@@ -159,7 +159,7 @@ format(Msg) ->
 format(Format, Args) ->
     lists:flatten(io_lib:format(Format, Args)).
 
--spec(format_usage([cmd_usage()]) -> ok).
+-spec(format_usage([cmd_usage()]) -> [string()]).
 format_usage(UsageList) ->
     lists:map(
         fun({CmdParams, Desc}) ->

+ 5 - 3
src/emqx_frame.erl

@@ -42,10 +42,10 @@
                      version => emqx_types:version()
                     }).
 
--opaque(parse_state() :: {none, options()} | cont_fun()).
+-type(parse_state() :: {none, options()} | cont_fun()).
 
--opaque(parse_result() :: {more, cont_fun()}
-                        | {ok, emqx_types:packet(), binary(), parse_state()}).
+-type(parse_result() :: {more, cont_fun()}
+                      | {ok, emqx_types:packet(), binary(), parse_state()}).
 
 -type(cont_fun() :: fun((binary()) -> parse_result())).
 
@@ -59,6 +59,8 @@
           version     => ?MQTT_PROTO_V4
          }).
 
+-dialyzer({no_match, [serialize_utf8_string/2]}).
+
 %%--------------------------------------------------------------------
 %% Init Parse State
 %%--------------------------------------------------------------------

+ 0 - 13
src/emqx_gen_mod.erl

@@ -16,21 +16,8 @@
 
 -module(emqx_gen_mod).
 
--ifdef(use_specs).
-
 -callback(load(Opts :: any()) -> ok | {error, term()}).
 
 -callback(unload(State :: term()) -> term()).
 
 -callback(description() -> any()).
-
--else.
-
--export([behaviour_info/1]).
-
-behaviour_info(callbacks) ->
-    [{load, 1}, {unload, 1}];
-behaviour_info(_Other) ->
-    undefined.
-
--endif.

+ 4 - 4
src/emqx_hooks.erl

@@ -60,12 +60,12 @@
 %%     equal priority values.
 
 -type(hookpoint() :: atom()).
--type(action() :: function() | mfa()).
--type(filter() :: function() | mfa()).
+-type(action() :: function() | {function(), [term()]} | mfargs()).
+-type(filter() :: function() | mfargs()).
 
 -record(callback, {
           action :: action(),
-          filter :: filter(),
+          filter :: maybe(filter()),
           priority :: integer()
          }).
 
@@ -112,7 +112,7 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
     add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
 
 %% @doc Unregister a callback.
--spec(del(hookpoint(), action()) -> ok).
+-spec(del(hookpoint(), function() | {module(), atom()}) -> ok).
 del(HookPoint, Action) ->
     gen_server:cast(?SERVER, {del, HookPoint, Action}).
 

+ 1 - 1
src/emqx_logger.erl

@@ -221,7 +221,7 @@ trans([{eof, L} | AST], LogHeader, ResAST) ->
 trans([{attribute, _, module, _Mod} = M | AST], Header, ResAST) ->
     trans(AST, Header, [export_header_fun(), M | ResAST]);
 trans([{attribute, _, logger_header, Header} | AST], _, ResAST) ->
-    io_lib:printable_list(Header) orelse error({invalid_string, Header}),
+    io_lib:printable_list(Header) orelse erlang:error({invalid_string, Header}),
     trans(AST, Header, ResAST);
 trans([F | AST], LogHeader, ResAST) ->
     trans(AST, LogHeader, [F | ResAST]).

+ 1 - 3
src/emqx_logger_formatter.erl

@@ -253,9 +253,7 @@ format_time(SysTime,#{})
     {Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond),
     format_time({Date, {H, Mi, S, Ms}}).
 format_time({{Y, M, D}, {H, Mi, S, Ms}}) ->
-    io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]);
-format_time({{Y, M, D}, {H, Mi, S}}) ->
-    io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]).
+    io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]).
 
 format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) ->
     atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A);

+ 1 - 3
src/emqx_message.erl

@@ -278,7 +278,7 @@ to_map(#message{
      }.
 
 %% @doc Message to tuple list
--spec(to_list(emqx_types:message()) -> map()).
+-spec(to_list(emqx_types:message()) -> list()).
 to_list(Msg) ->
     lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
 
@@ -290,8 +290,6 @@ format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, h
     io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
                   [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
 
-format(_, undefined) ->
-    "";
 format(flags, Flags) ->
     io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
 format(headers, Headers) ->

+ 9 - 3
src/emqx_misc.erl

@@ -62,11 +62,17 @@ maybe_apply(_Fun, undefined) -> undefined;
 maybe_apply(Fun, Arg) when is_function(Fun) ->
     erlang:apply(Fun, [Arg]).
 
--spec(compose(list(F)) -> G when F :: fun((any()) -> any()),
-                                 G :: fun((any()) -> any())).
+-spec(compose(list(F)) -> G
+  when F :: fun((any()) -> any()),
+       G :: fun((any()) -> any())).
 compose([F|More]) -> compose(F, More).
 
--spec(compose(fun((X) -> Y), fun((Y) -> Z)) -> fun((X) -> Z)).
+-spec(compose(F, G|[Gs]) -> C
+  when F :: fun((X1) -> X2),
+       G :: fun((X2) -> X3),
+       Gs :: [fun((Xn) -> Xn1)],
+       C :: fun((X1) -> Xm),
+       X3 :: any(), Xn :: any(), Xn1 :: any(), Xm :: any()).
 compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end;
 compose(F, [G]) -> compose(F, G);
 compose(F, [G|More]) -> compose(compose(F, G), More).

+ 2 - 5
src/emqx_mod_acl_internal.erl

@@ -33,8 +33,6 @@
         , description/0
         ]).
 
--define(MFA(M, F, A), {M, F, A}).
-
 -type(acl_rules() :: #{publish   => [emqx_access_rule:rule()],
                        subscribe => [emqx_access_rule:rule()]}).
 
@@ -44,11 +42,10 @@
 
 load(_Env) ->
     Rules = rules_from_file(emqx:get_env(acl_file)),
-    emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]),  -1).
+    emqx_hooks:add('client.check_acl', {?MODULE, check_acl, [Rules]},  -1).
 
 unload(_Env) ->
-    Rules = rules_from_file(emqx:get_env(acl_file)),
-    emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
+    emqx_hooks:del('client.check_acl', {?MODULE, check_acl}).
 
 reload(_Env) ->
     emqx_acl_cache:is_enabled() andalso (

+ 3 - 11
src/emqx_mod_delayed.erl

@@ -61,7 +61,7 @@ load(_Env) ->
 
 -spec(unload(list()) -> ok).
 unload(_Env) ->
-    emqx:unhook('message.publish', {?MODULE, on_message_publish, []}),
+    emqx:unhook('message.publish', {?MODULE, on_message_publish}),
     emqx_mod_sup:stop_child(?MODULE).
 
 description() ->
@@ -83,21 +83,13 @@ on_message_publish(Msg = #message{id = Id, topic = <<"$delayed/", Topic/binary>>
                     end
             end,
     PubMsg = Msg#message{topic = Topic1},
-    Headers = case PubMsg#message.headers of
-        undefined -> #{};
-        Headers0 -> Headers0
-    end,
-    ok = store(#delayed_message{key = {PubAt, delayed_mid(Id)}, msg = PubMsg}),
+    Headers = PubMsg#message.headers,
+    ok = store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}),
     {stop, PubMsg#message{headers = Headers#{allow_publish => false}}};
 
 on_message_publish(Msg) ->
     {ok, Msg}.
 
-%% @private
-delayed_mid(undefined) ->
-    emqx_guid:gen();
-delayed_mid(MsgId) -> MsgId.
-
 %%--------------------------------------------------------------------
 %% Start delayed publish server
 %%--------------------------------------------------------------------

+ 6 - 6
src/emqx_mod_topic_metrics.erl

@@ -97,14 +97,14 @@
 
 load(_Env) ->
     emqx_mod_sup:start_child(?MODULE, worker),
-    emqx:hook('message.publish', fun ?MODULE:on_message_publish/1, []),
-    emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, []),
-    emqx:hook('message.delivered', fun ?MODULE:on_message_delivered/2, []).
+    emqx:hook('message.publish',   {?MODULE, on_message_publish, []}),
+    emqx:hook('message.dropped',   {?MODULE, on_message_dropped, []}),
+    emqx:hook('message.delivered', {?MODULE, on_message_delivered, []}).
 
 unload(_Env) ->
-    emqx:unhook('message.publish', fun ?MODULE:on_message_publish/1),
-    emqx:unhook('message.dropped', fun ?MODULE:on_message_dropped/3),
-    emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2),
+    emqx:unhook('message.publish',   {?MODULE, on_message_publish}),
+    emqx:unhook('message.dropped',   {?MODULE, on_message_dropped}),
+    emqx:unhook('message.delivered', {?MODULE, on_message_delivered}),
     emqx_mod_sup:stop_child(?MODULE).
 
 description() ->

+ 4 - 3
src/emqx_mqtt_caps.erl

@@ -71,8 +71,9 @@
                        }).
 
 -spec(check_pub(emqx_types:zone(),
-                #{qos => emqx_types:qos(),
-                  retain => boolean()})
+                #{qos := emqx_types:qos(),
+                  retain := boolean(),
+                  topic := emqx_topic:topic()})
       -> ok_or_error(emqx_types:reason_code())).
 check_pub(Zone, Flags) when is_map(Flags) ->
     do_check_pub(case maps:take(topic, Flags) of
@@ -156,4 +157,4 @@ with_env(Zone, Key, InitFun) ->
                      ok = emqx_zone:set_env(Zone, Key, Caps),
                      Caps;
         ZoneCaps  -> ZoneCaps
-    end.
+    end.

+ 14 - 17
src/emqx_mqtt_props.erl

@@ -128,24 +128,21 @@ name(16#29) -> 'Subscription-Identifier-Available';
 name(16#2A) -> 'Shared-Subscription-Available';
 name(Id)    -> error({unsupported_property, Id}).
 
--spec(filter(emqx_types:packet_type(), emqx_types:properties()|list())
+-spec(filter(emqx_types:packet_type(), emqx_types:properties())
       -> emqx_types:properties()).
-filter(PacketType, Props) when is_map(Props) ->
-    maps:from_list(filter(PacketType, maps:to_list(Props)));
-
-filter(PacketType, Props) when ?CONNECT =< PacketType,
-                               PacketType =< ?AUTH,
-                               is_list(Props) ->
-    Filter = fun(Name) ->
-                 case maps:find(id(Name), ?PROPS_TABLE) of
-                     {ok, {Name, _Type, 'ALL'}} ->
-                         true;
-                     {ok, {Name, _Type, AllowedTypes}} ->
-                         lists:member(PacketType, AllowedTypes);
-                     error -> false
-                 end
-             end,
-    [Prop || Prop = {Name, _} <- Props, Filter(Name)].
+filter(PacketType, Props) when is_map(Props),
+                               PacketType >= ?CONNECT,
+                               PacketType =< ?AUTH ->
+    F = fun(Name, _) ->
+            case maps:find(id(Name), ?PROPS_TABLE) of
+                {ok, {Name, _Type, 'ALL'}} ->
+                    true;
+                {ok, {Name, _Type, AllowedTypes}} ->
+                    lists:member(PacketType, AllowedTypes);
+                error -> false
+            end
+        end,
+    maps:filter(F, Props).
 
 -spec(validate(emqx_types:properties()) -> ok).
 validate(Props) when is_map(Props) ->

+ 1 - 1
src/emqx_mqueue.erl

@@ -101,7 +101,7 @@
           q          = ?PQUEUE:new()      :: pq()
          }).
 
--opaque(mqueue() :: #mqueue{}).
+-type(mqueue() :: #mqueue{}).
 
 -spec(init(options()) -> mqueue()).
 init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->

+ 0 - 3
src/emqx_os_mon.erl

@@ -144,9 +144,6 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
     NState =
     case emqx_vm:cpu_util() of %% TODO: should be improved?
         0 -> State#{timer := undefined};
-        {error, Reason} ->
-            ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]),
-            ensure_check_timer(State);
         Busy when Busy / 100 >= CPUHighWatermark ->
             alarm_handler:set_alarm({cpu_high_watermark, Busy}),
             ensure_check_timer(State#{is_cpu_alarm_set := true});

+ 4 - 3
src/emqx_packet.erl

@@ -484,10 +484,11 @@ format_variable(#mqtt_packet_suback{packet_id = PacketId,
 format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) ->
     io_lib:format("PacketId=~p", [PacketId]);
 
-format_variable(PacketId) when is_integer(PacketId) ->
-    io_lib:format("PacketId=~p", [PacketId]);
+format_variable(#mqtt_packet_auth{reason_code = ReasonCode}) ->
+    io_lib:format("ReasonCode=~p", [ReasonCode]);
 
-format_variable(undefined) -> undefined.
+format_variable(PacketId) when is_integer(PacketId) ->
+    io_lib:format("PacketId=~p", [PacketId]).
 
 format_password(undefined) -> undefined;
 format_password(_Password) -> '******'.

+ 5 - 0
src/emqx_plugins.erl

@@ -38,6 +38,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 -endif.
+
+-dialyzer({no_match, [ plugin_loaded/2
+                     , plugin_unloaded/2
+                     ]}).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------

+ 4 - 2
src/emqx_pool_sup.erl

@@ -18,6 +18,8 @@
 
 -behaviour(supervisor).
 
+-include("types.hrl").
+
 -export([spec/1, spec/2]).
 
 -export([ start_link/0
@@ -46,12 +48,12 @@ spec(ChildId, Args) ->
 start_link() ->
     start_link(?POOL, random, {?POOL, start_link, []}).
 
--spec(start_link(atom() | tuple(), atom(), mfa())
+-spec(start_link(atom() | tuple(), atom(), mfargs())
       -> {ok, pid()} | {error, term()}).
 start_link(Pool, Type, MFA) ->
     start_link(Pool, Type, emqx_vm:schedulers(), MFA).
 
--spec(start_link(atom() | tuple(), atom(), pos_integer(), mfa())
+-spec(start_link(atom() | tuple(), atom(), pos_integer(), mfargs())
       -> {ok, pid()} | {error, term()}).
 start_link(Pool, Type, Size, MFA) ->
     supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]).

+ 2 - 0
src/emqx_router_helper.erl

@@ -53,6 +53,8 @@
 -define(ROUTING_NODE, emqx_routing_node).
 -define(LOCK, {?MODULE, cleanup_routes}).
 
+-dialyzer({nowarn_function, [cleanup_routes/1]}).
+
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------

+ 8 - 10
src/emqx_session.erl

@@ -123,7 +123,7 @@
           created_at :: pos_integer()
          }).
 
--opaque(session() :: #session{}).
+-type(session() :: #session{}).
 
 -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}).
 
@@ -152,6 +152,7 @@
 
 -define(DEFAULT_BATCH_N, 1000).
 
+
 %%--------------------------------------------------------------------
 %% Init a Session
 %%--------------------------------------------------------------------
@@ -616,19 +617,16 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
 
 -spec(replay(session()) -> {ok, replies(), session()}).
 replay(Session = #session{inflight = Inflight}) ->
-    Pubs = replay(Inflight),
+    Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
+                             {pubrel, PacketId};
+                        ({PacketId, {Msg, _Ts}}) ->
+                             {PacketId, emqx_message:set_flag(dup, true, Msg)}
+                     end, emqx_inflight:to_list(Inflight)),
     case dequeue(Session) of
         {ok, NSession} -> {ok, Pubs, NSession};
         {ok, More, NSession} ->
             {ok, lists:append(Pubs, More), NSession}
-    end;
-
-replay(Inflight) ->
-    lists:map(fun({PacketId, {pubrel, _Ts}}) ->
-                      {pubrel, PacketId};
-                 ({PacketId, {Msg, _Ts}}) ->
-                      {PacketId, emqx_message:set_flag(dup, true, Msg)}
-              end, emqx_inflight:to_list(Inflight)).
+    end.
 
 -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
 terminate(ClientInfo, discarded, Session) ->

+ 2 - 2
src/emqx_stats.erl

@@ -58,7 +58,7 @@
 -record(update, {name, countdown, interval, func}).
 
 -record(state, {
-          timer   :: reference(),
+          timer   :: maybe(reference()),
           updates :: [#update{}],
           tick_ms :: timeout()
          }).
@@ -159,7 +159,7 @@ setstat(Stat, Val) when is_integer(Val) ->
 
 %% @doc Set stats with max value.
 -spec(setstat(Stat :: atom(), MaxStat :: atom(),
-              Val :: pos_integer()) -> boolean()).
+              Val :: pos_integer()) -> ok).
 setstat(Stat, MaxStat, Val) when is_integer(Val) ->
     cast({setstat, Stat, MaxStat, Val}).
 

+ 5 - 12
src/emqx_sys.erl

@@ -19,6 +19,7 @@
 -behaviour(gen_server).
 
 -include("emqx.hrl").
+-include("types.hrl").
 -include("logger.hrl").
 
 -logger_header("[SYS]").
@@ -53,20 +54,12 @@
 -import(emqx_topic, [systop/1]).
 -import(emqx_misc, [start_timer/2]).
 
--type(timeref() :: reference()).
-
--type(tickeref() :: reference()).
-
--type(version() :: string()).
-
--type(sysdescr() :: string()).
-
 -record(state,
         { start_time :: erlang:timestamp()
-        , heartbeat  :: timeref()
-        , ticker     :: tickeref()
-        , version    :: version()
-        , sysdescr   :: sysdescr()
+        , heartbeat  :: maybe(reference())
+        , ticker     :: maybe(reference())
+        , version    :: binary()
+        , sysdescr   :: binary()
         }).
 
 -define(APP, emqx).

+ 6 - 4
src/emqx_sys_mon.erl

@@ -18,8 +18,8 @@
 
 -behavior(gen_server).
 
--include("logger.hrl").
 -include("types.hrl").
+-include("logger.hrl").
 
 -logger_header("[SYSMON]").
 
@@ -171,9 +171,11 @@ handle_partition_event({partition, {healed, _Node}}) ->
 
 suppress(Key, SuccFun, State = #{events := Events}) ->
     case lists:member(Key, Events) of
-        true  -> {noreply, State};
-        false -> SuccFun(),
-                 {noreply, State#{events := [Key|Events]}}
+        true ->
+            {noreply, State};
+        false ->
+            SuccFun(),
+            {noreply, State#{events := [Key|Events]}}
     end.
 
 procinfo(Pid) ->

+ 2 - 1
src/emqx_topic.erl

@@ -43,7 +43,7 @@
 -type(topic() :: binary()).
 -type(word() :: '' | '+' | '#' | binary()).
 -type(words() :: list(word())).
--opaque(triple() :: {root | binary(), word(), binary()}).
+-type(triple() :: {root | binary(), word(), binary()}).
 
 -define(MAX_TOPIC_LEN, 4096).
 
@@ -184,6 +184,7 @@ word(<<"#">>) -> '#';
 word(Bin)     -> Bin.
 
 %% @doc '$SYS' Topic.
+-spec(systop(atom()|string()|binary()) -> topic()).
 systop(Name) when is_atom(Name); is_list(Name) ->
     iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name]));
 systop(Name) when is_binary(Name) ->

+ 3 - 1
src/emqx_tracer.erl

@@ -57,6 +57,8 @@
         L =:= info orelse
         L =:= debug).
 
+-dialyzer({nowarn_function, [install_trace_handler/3]}).
+
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
@@ -68,7 +70,7 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload})
     emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~0p", [Topic, Payload]).
 
 %% @doc Start to trace clientid or topic.
--spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
+-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}).
 start_trace(Who, all, LogFile) ->
     start_trace(Who, debug, LogFile);
 start_trace(Who, Level, LogFile) ->

+ 2 - 2
src/emqx_types.erl

@@ -136,7 +136,7 @@
                         is_bridge    := boolean(),
                         is_superuser := boolean(),
                         mountpoint   := maybe(binary()),
-                        ws_cookie    := maybe(list()),
+                        ws_cookie    => maybe(list()),
                         password     => maybe(binary()),
                         auth_result  => auth_result(),
                         anonymous    => boolean(),
@@ -201,7 +201,7 @@
 -type(caps() :: emqx_mqtt_caps:caps()).
 -type(attrs() :: #{atom() => term()}).
 -type(infos() :: #{atom() => term()}).
--type(stats() :: #{atom() => non_neg_integer()|stats()}).
+-type(stats() :: [{atom(), term()}]).
 
 -type(oom_policy() :: #{message_queue_len => non_neg_integer(),
                         max_heap_size => non_neg_integer()

+ 2 - 5
src/emqx_vm.erl

@@ -48,13 +48,13 @@
         , get_port_info/1
         ]).
 
+-export([cpu_util/0]).
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
 -endif.
 
--export([cpu_util/0]).
-
 -define(UTIL_ALLOCATORS, [temp_alloc,
                           eheap_alloc,
                           binary_alloc,
@@ -408,9 +408,6 @@ port_info(PortTerm, specific) ->
                     []
             end,
     {specific, Props};
-port_info(PortTerm, Keys) when is_list(Keys) ->
-    Port = transform_port(PortTerm),
-    [erlang:port_info(Port, Key) || Key <- Keys];
 port_info(PortTerm, Key) when is_atom(Key) ->
     Port = transform_port(PortTerm),
     erlang:port_info(Port, Key).

+ 5 - 2
src/emqx_ws_connection.erl

@@ -63,7 +63,7 @@
           %% Simulate the active_n opt
           active_n :: pos_integer(),
           %% Limiter
-          limiter :: emqx_limiter:limiter(),
+          limiter :: maybe(emqx_limiter:limiter()),
           %% Limit Timer
           limit_timer :: maybe(reference()),
           %% Parse State
@@ -81,7 +81,7 @@
           %% Idle Timeout
           idle_timeout :: timeout(),
           %% Idle Timer
-          idle_timer :: reference()
+          idle_timer :: maybe(reference())
         }).
 
 -type(state() :: #state{}).
@@ -95,6 +95,9 @@
 
 -define(ENABLED(X), (X =/= undefined)).
 
+-dialyzer({no_match, [info/2]}).
+-dialyzer({nowarn_function, [websocket_init/1]}).
+
 %%--------------------------------------------------------------------
 %% Info, Stats
 %%--------------------------------------------------------------------

+ 1 - 1
src/emqx_zone.erl

@@ -180,7 +180,7 @@ enable_flapping_detect(Zone) ->
 ignore_loop_deliver(Zone) ->
     get_env(Zone, ignore_loop_deliver, false).
 
--spec(server_keepalive(zone()) -> pos_integer()).
+-spec(server_keepalive(zone()) -> maybe(pos_integer())).
 server_keepalive(Zone) ->
     get_env(Zone, server_keepalive).
 

+ 4 - 1
test/emqx_SUITE.erl

@@ -77,7 +77,10 @@ t_emqx_pubsub_api(_) ->
     ?assertEqual([self()], emqx:subscribers(Topic)),
     ?assertEqual([self()], emqx:subscribers(Topic1)),
     ?assertEqual([self()], emqx:subscribers(Topic2)),
-    ?assertEqual([{Topic,#{qos => 0,subid => ClientId}}, {Topic1,#{qos => 1,subid => ClientId}}, {Topic2,#{qos => 2,subid => ClientId}}], emqx:subscriptions(self())),
+
+    ?assertEqual([{Topic,  #{nl => 0, qos => 0, rap => 0, rh => 0, subid => ClientId}},
+                  {Topic1, #{nl => 0, qos => 1, rap => 0, rh => 0, subid => ClientId}},
+                  {Topic2, #{nl => 0, qos => 2, rap => 0, rh => 0, subid => ClientId}}], emqx:subscriptions(self())),
     ?assertEqual(true, emqx:subscribed(self(), Topic)),
     ?assertEqual(true, emqx:subscribed(ClientId, Topic)),
     ?assertEqual(true, emqx:subscribed(self(), Topic1)),

+ 13 - 7
test/emqx_broker_SUITE.erl

@@ -59,12 +59,18 @@ t_subopts(_) ->
     ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)),
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
     timer:sleep(200),
-    ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
-    ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)),
+    ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
+                 emqx_broker:get_subopts(self(), <<"topic">>)),
+    ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
+                 emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)),
+
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}),
-    ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
-    ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})),
-    ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
+    ?assertEqual(#{nl => 0, qos => 2, rap => 0, rh => 0, subid => <<"clientid">>},
+                 emqx_broker:get_subopts(self(), <<"topic">>)),
+
+    ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})),
+    ?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>},
+                 emqx_broker:get_subopts(self(), <<"topic">>)),
     emqx_broker:unsubscribe(<<"topic">>).
 
 t_topics(_) ->
@@ -91,9 +97,9 @@ t_subscribers(_) ->
 t_subscriptions(_) ->
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
     ok = timer:sleep(100),
-    ?assertEqual(#{qos => 1, subid => <<"clientid">>},
+    ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
                  proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))),
-    ?assertEqual(#{qos => 1, subid => <<"clientid">>},
+    ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
                  proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))),
     emqx_broker:unsubscribe(<<"topic">>).
 

+ 1 - 2
test/emqx_vm_SUITE.erl

@@ -80,8 +80,7 @@ t_get_port_info(_Config) ->
     {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]),
     emqx_vm:get_port_info(),
     ok = gen_tcp:close(Sock),
-    [Port | _] = erlang:ports(),
-    [{connected, _}, {name, _}] = emqx_vm:port_info(Port, [connected, name]).
+    [Port | _] = erlang:ports().
 
 t_transform_port(_Config) ->
     [Port | _] = erlang:ports(),