Просмотр исходного кода

Merge remote-tracking branch 'origin/develop'

zhanghongtong 6 лет назад
Родитель
Сommit
dc5799e4fd

+ 9 - 5
src/emqx_access_control.erl

@@ -34,9 +34,9 @@
 
 
 -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
 -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
 authenticate(ClientInfo = #{zone := Zone}) ->
 authenticate(ClientInfo = #{zone := Zone}) ->
-    case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
+    case run_hooks('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
         Result = #{auth_result := success} ->
         Result = #{auth_result := success} ->
-	        {ok, Result};
+            {ok, Result};
 	    Result ->
 	    Result ->
             {error, maps:get(auth_result, Result, unknown_error)}
             {error, maps:get(auth_result, Result, unknown_error)}
     end.
     end.
@@ -61,7 +61,7 @@ check_acl_cache(ClientInfo, PubSub, Topic) ->
 
 
 do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
 do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
     Default = emqx_zone:get_env(Zone, acl_nomatch, deny),
     Default = emqx_zone:get_env(Zone, acl_nomatch, deny),
-    case emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
+    case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
         allow  -> allow;
         allow  -> allow;
         _Other -> deny
         _Other -> deny
     end.
     end.
@@ -73,7 +73,11 @@ reload_acl() ->
 
 
 default_auth_result(Zone) ->
 default_auth_result(Zone) ->
     case emqx_zone:get_env(Zone, allow_anonymous, false) of
     case emqx_zone:get_env(Zone, allow_anonymous, false) of
-	    true  -> #{auth_result => success, anonymous => true};
-	    false -> #{auth_result => not_authorized, anonymous => false}
+        true  -> #{auth_result => success, anonymous => true};
+        false -> #{auth_result => not_authorized, anonymous => false}
     end.
     end.
 
 
+-compile({inline, [run_hooks/3]}).
+run_hooks(Name, Args, Acc) ->
+    ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
+

+ 3 - 3
src/emqx_broker.erl

@@ -230,7 +230,7 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
 -spec(route([emqx_types:route_entry()], emqx_types:delivery())
 -spec(route([emqx_types:route_entry()], emqx_types:delivery())
       -> emqx_types:publish_result()).
       -> emqx_types:publish_result()).
 route([], #delivery{message = Msg}) ->
 route([], #delivery{message = Msg}) ->
-    emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
+    ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
     ok = inc_dropped_cnt(Msg),
     ok = inc_dropped_cnt(Msg),
     [];
     [];
 
 
@@ -282,8 +282,8 @@ forward(Node, To, Delivery, sync) ->
 -spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
 -spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
 dispatch(Topic, #delivery{message = Msg}) ->
 dispatch(Topic, #delivery{message = Msg}) ->
     case subscribers(Topic) of
     case subscribers(Topic) of
-        [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
-              _ = inc_dropped_cnt(Topic),
+        [] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
+              ok = inc_dropped_cnt(Topic),
               {error, no_subscribers};
               {error, no_subscribers};
         [Sub] -> %% optimize?
         [Sub] -> %% optimize?
             dispatch(Sub, Topic, Msg);
             dispatch(Sub, Topic, Msg);

+ 95 - 79
src/emqx_channel.erl

@@ -45,7 +45,7 @@
         , terminate/2
         , terminate/2
         ]).
         ]).
 
 
-%% export for ct
+%% Exports for CT
 -export([set_field/3]).
 -export([set_field/3]).
 
 
 -import(emqx_misc,
 -import(emqx_misc,
@@ -204,9 +204,10 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) ->
 
 
 handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
 handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
     case pipeline([fun enrich_conninfo/2,
     case pipeline([fun enrich_conninfo/2,
+                   fun run_conn_hooks/2,
                    fun check_connect/2,
                    fun check_connect/2,
                    fun enrich_client/2,
                    fun enrich_client/2,
-                   fun set_logger_meta/2,
+                   fun set_log_meta/2,
                    fun check_banned/2,
                    fun check_banned/2,
                    fun auth_connect/2
                    fun auth_connect/2
                   ], ConnPkt, Channel#channel{conn_state = connecting}) of
                   ], ConnPkt, Channel#channel{conn_state = connecting}) of
@@ -288,11 +289,13 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
 handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
 handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
           Channel = #channel{clientinfo = ClientInfo}) ->
           Channel = #channel{clientinfo = ClientInfo}) ->
     case emqx_packet:check(Packet) of
     case emqx_packet:check(Packet) of
-        ok -> TopicFilters1 = enrich_subid(Properties, parse_topic_filters(TopicFilters)),
-              TopicFilters2 = emqx_hooks:run_fold('client.subscribe',
-                                                  [ClientInfo, Properties],
-                                                  TopicFilters1),
-              {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
+        ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
+              TopicFilters2 = enrich_subid(Properties, TopicFilters1),
+              TopicFilters3 = run_hooks('client.subscribe',
+                                        [ClientInfo, Properties],
+                                        TopicFilters2
+                                       ),
+              {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
               handle_out(suback, {PacketId, ReasonCodes}, NChannel);
               handle_out(suback, {PacketId, ReasonCodes}, NChannel);
         {error, ReasonCode} ->
         {error, ReasonCode} ->
             handle_out(disconnect, ReasonCode, Channel)
             handle_out(disconnect, ReasonCode, Channel)
@@ -301,9 +304,10 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
 handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
 handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
           Channel = #channel{clientinfo = ClientInfo}) ->
           Channel = #channel{clientinfo = ClientInfo}) ->
     case emqx_packet:check(Packet) of
     case emqx_packet:check(Packet) of
-        ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
-                                                  [ClientInfo, Properties],
-                                                  parse_topic_filters(TopicFilters)),
+        ok -> TopicFilters1 = run_hooks('client.unsubscribe',
+                                        [ClientInfo, Properties],
+                                        parse_topic_filters(TopicFilters)
+                                       ),
               {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
               {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
               handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
               handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
         {error, ReasonCode} ->
         {error, ReasonCode} ->
@@ -549,26 +553,24 @@ not_nacked({deliver, _Topic, Msg}) ->
        | {ok, replies(), channel()}
        | {ok, replies(), channel()}
        | {shutdown, Reason :: term(), channel()}
        | {shutdown, Reason :: term(), channel()}
        | {shutdown, Reason :: term(), replies(), channel()}).
        | {shutdown, Reason :: term(), replies(), channel()}).
-handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
+handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
     AckProps = run_fold([fun enrich_connack_caps/2,
     AckProps = run_fold([fun enrich_connack_caps/2,
                          fun enrich_server_keepalive/2,
                          fun enrich_server_keepalive/2,
                          fun enrich_assigned_clientid/2
                          fun enrich_assigned_clientid/2
                         ], #{}, Channel),
                         ], #{}, Channel),
-    AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
+    AckPacket = run_hooks('client.connack', [ConnInfo],
+                          ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps)),
     return_connack(AckPacket,
     return_connack(AckPacket,
                    ensure_keepalive(AckProps,
                    ensure_keepalive(AckProps,
                                     ensure_connected(ConnPkt, Channel)));
                                     ensure_connected(ConnPkt, Channel)));
 
 
-handle_out(connack, {ReasonCode, _ConnPkt},
-           Channel = #channel{conninfo   = ConnInfo,
-                              clientinfo = ClientInfo}) ->
-    ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
-    AckPacket = ?CONNACK_PACKET(
-                   case maps:get(proto_ver, ConnInfo) of
-                       ?MQTT_PROTO_V5 -> ReasonCode;
-                       _Other -> emqx_reason_codes:compat(connack, ReasonCode)
-                   end),
-    shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);
+handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
+    AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
+                                    ?MQTT_PROTO_V5 -> ReasonCode;
+                                    _ -> emqx_reason_codes:compat(connack, ReasonCode)
+                                end),
+    AckPacket1 = run_hooks('client.connack', [ConnInfo], AckPacket),
+    shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel);
 
 
 %% Optimize?
 %% Optimize?
 handle_out(publish, [], Channel) ->
 handle_out(publish, [], Channel) ->
@@ -625,10 +627,7 @@ handle_out(Type, Data, Channel) ->
 %% Return ConnAck
 %% Return ConnAck
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-return_connack(AckPacket, Channel = #channel{conninfo   = ConnInfo,
-                                             clientinfo = ClientInfo
-                                            }) ->
-    ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
+return_connack(AckPacket, Channel) ->
     Replies = [{event, connected}, {connack, AckPacket}],
     Replies = [{event, connected}, {connack, AckPacket}],
     case maybe_resume_session(Channel) of
     case maybe_resume_session(Channel) of
         ignore -> {ok, Replies, Channel};
         ignore -> {ok, Replies, Channel};
@@ -735,16 +734,18 @@ handle_call(Req, Channel) ->
 -spec(handle_info(Info :: term(), channel())
 -spec(handle_info(Info :: term(), channel())
       -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
       -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
 handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
 handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
-    TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
-                                        [ClientInfo, #{'Internal' => true}],
-                                        parse_topic_filters(TopicFilters)),
+    TopicFilters1 = run_hooks('client.subscribe',
+                              [ClientInfo, #{'Internal' => true}],
+                              parse_topic_filters(TopicFilters)
+                             ),
     {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
     {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
     {ok, NChannel};
     {ok, NChannel};
 
 
 handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
 handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
-    TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
-                                        [ClientInfo, #{'Internal' => true}],
-                                        parse_topic_filters(TopicFilters)),
+    TopicFilters1 = run_hooks('client.unsubscribe',
+                              [ClientInfo, #{'Internal' => true}],
+                              parse_topic_filters(TopicFilters)
+                             ),
     {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
     {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
     {ok, NChannel};
     {ok, NChannel};
 
 
@@ -754,13 +755,12 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
     shutdown(Reason, Channel);
     shutdown(Reason, Channel);
 
 
-handle_info({sock_closed, Reason},
-            Channel = #channel{conn_state = connected,
-                               clientinfo = ClientInfo = #{zone := Zone}}) ->
+handle_info({sock_closed, Reason}, Channel =
+            #channel{conn_state = connected,
+                     clientinfo = ClientInfo = #{zone := Zone}}) ->
     emqx_zone:enable_flapping_detect(Zone)
     emqx_zone:enable_flapping_detect(Zone)
         andalso emqx_flapping:detect(ClientInfo),
         andalso emqx_flapping:detect(ClientInfo),
-    Channel1 = ensure_disconnected(
-                 mabye_publish_will_msg(Channel)),
+    Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
     case maybe_shutdown(Reason, Channel1) of
     case maybe_shutdown(Reason, Channel1) of
         {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
         {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
         Shutdown -> Shutdown
         Shutdown -> Shutdown
@@ -786,9 +786,11 @@ handle_info(Info, Channel) ->
       -> {ok, channel()}
       -> {ok, channel()}
        | {ok, replies(), channel()}
        | {ok, replies(), channel()}
        | {shutdown, Reason :: term(), channel()}).
        | {shutdown, Reason :: term(), channel()}).
-handle_timeout(TRef, {keepalive, StatVal},
-               Channel = #channel{keepalive = Keepalive,
-                                  timers = #{alive_timer := TRef}}) ->
+handle_timeout(_TRef, {keepalive, _StatVal},
+               Channel = #channel{keepalive = undefined}) ->
+    {ok, Channel};
+handle_timeout(_TRef, {keepalive, StatVal},
+               Channel = #channel{keepalive = Keepalive}) ->
     case emqx_keepalive:check(StatVal, Keepalive) of
     case emqx_keepalive:check(StatVal, Keepalive) of
         {ok, NKeepalive} ->
         {ok, NKeepalive} ->
             NChannel = Channel#channel{keepalive = NKeepalive},
             NChannel = Channel#channel{keepalive = NKeepalive},
@@ -797,9 +799,8 @@ handle_timeout(TRef, {keepalive, StatVal},
             handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
             handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
     end;
     end;
 
 
-handle_timeout(TRef, retry_delivery,
-               Channel = #channel{session = Session,
-                                  timers = #{retry_timer := TRef}}) ->
+handle_timeout(_TRef, retry_delivery,
+               Channel = #channel{session = Session}) ->
     case emqx_session:retry(Session) of
     case emqx_session:retry(Session) of
         {ok, NSession} ->
         {ok, NSession} ->
             {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
             {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
@@ -811,9 +812,8 @@ handle_timeout(TRef, retry_delivery,
             handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
             handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
     end;
     end;
 
 
-handle_timeout(TRef, expire_awaiting_rel,
-               Channel = #channel{session = Session,
-                                  timers = #{await_timer := TRef}}) ->
+handle_timeout(_TRef, expire_awaiting_rel,
+               Channel = #channel{session = Session}) ->
     case emqx_session:expire(awaiting_rel, Session) of
     case emqx_session:expire(awaiting_rel, Session) of
         {ok, Session} ->
         {ok, Session} ->
             {ok, clean_timer(await_timer, Channel#channel{session = Session})};
             {ok, clean_timer(await_timer, Channel#channel{session = Session})};
@@ -821,11 +821,10 @@ handle_timeout(TRef, expire_awaiting_rel,
             {ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
             {ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
     end;
     end;
 
 
-handle_timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) ->
+handle_timeout(_TRef, expire_session, Channel) ->
     shutdown(expired, Channel);
     shutdown(expired, Channel);
 
 
-handle_timeout(TRef, will_message, Channel = #channel{will_msg = WillMsg,
-                                                      timers = #{will_timer := TRef}}) ->
+handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
     (WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
     (WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
     {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
     {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
 
 
@@ -879,19 +878,19 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
 %% Terminate
 %% Terminate
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-terminate(_, #channel{conn_state = idle}) ->
-    ok;
-terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
-    ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
-terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, clientinfo = ClientInfo})
-    when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered ->
-    ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]);
-terminate(Reason, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, will_msg = WillMsg}) ->
-    case WillMsg of
-        undefined -> ok;
-        _ -> publish_will_msg(WillMsg)
-    end,
-    ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
+terminate(_, #channel{conn_state = idle}) -> ok;
+terminate(normal, Channel) ->
+    run_terminate_hook(normal, Channel);
+terminate({shutdown, Reason}, Channel)
+  when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered ->
+    run_terminate_hook(Reason, Channel);
+terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
+    (WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
+    run_terminate_hook(Reason, Channel).
+
+run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
+run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
+    emqx_session:terminate(ClientInfo, Reason, Session).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
@@ -940,6 +939,15 @@ expiry_interval(_ClientInfo, #mqtt_packet_connect{clean_start = true}) ->
 receive_maximum(#{zone := Zone}, ConnProps) ->
 receive_maximum(#{zone := Zone}, ConnProps) ->
     emqx_mqtt_props:get('Receive-Maximum', ConnProps, emqx_zone:max_inflight(Zone)).
     emqx_mqtt_props:get('Receive-Maximum', ConnProps, emqx_zone:max_inflight(Zone)).
 
 
+%%--------------------------------------------------------------------
+%% Run Connect Hooks
+
+run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
+    case run_hooks('client.connect', [ConnInfo], ConnPkt) of
+        Error = {error, _Reason} -> Error;
+        NConnPkt -> {ok, NConnPkt, Channel}
+    end.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Check Connect Packet
 %% Check Connect Packet
 
 
@@ -987,9 +995,9 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) ->
     {ok, ClientInfo#{mountpoint := MountPoint1}}.
     {ok, ClientInfo#{mountpoint := MountPoint1}}.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% Set logger metadata
+%% Set log metadata
 
 
-set_logger_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
+set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
     emqx_logger:set_metadata_clientid(ClientId).
     emqx_logger:set_metadata_clientid(ClientId).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -1172,6 +1180,19 @@ enrich_assigned_clientid(AckProps, #channel{conninfo   = ConnInfo,
         _Origin -> AckProps
         _Origin -> AckProps
     end.
     end.
 
 
+%%--------------------------------------------------------------------
+%% Ensure connected
+
+ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
+                                             clientinfo = ClientInfo}) ->
+    NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
+    ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
+    Channel#channel{conninfo   = NConnInfo,
+                    conn_state = connected,
+                    will_msg   = emqx_packet:will_msg(ConnPkt),
+                    alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
+                   }.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Init Alias Maximum
 %% Init Alias Maximum
 
 
@@ -1183,20 +1204,6 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver  = ?MQTT_PROTO_V5,
      };
      };
 init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
 init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
 
 
-%%--------------------------------------------------------------------
-%% Ensure connected
-
-ensure_connected(ConnPkt, Channel = #channel{conninfo   = ConnInfo,
-                                             clientinfo = ClientInfo}) ->
-    WillMsg = emqx_packet:will_msg(ConnPkt),
-    AliasMaximum = init_alias_maximum(ConnPkt, ClientInfo),
-    NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
-    Channel#channel{conninfo   = NConnInfo,
-                    will_msg   = WillMsg,
-                    conn_state = connected,
-                    alias_maximum = AliasMaximum
-                   }.
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Enrich Keepalive
 %% Enrich Keepalive
 
 
@@ -1255,8 +1262,10 @@ parse_topic_filters(TopicFilters) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Ensure disconnected
 %% Ensure disconnected
 
 
-ensure_disconnected(Channel = #channel{conninfo = ConnInfo}) ->
+ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
+                                               clientinfo = ClientInfo}) ->
     NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)},
     NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)},
+    ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
     Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
     Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -1286,6 +1295,13 @@ disconnect_reason(ReasonCode)  -> emqx_reason_codes:name(ReasonCode).
 %% Helper functions
 %% Helper functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-compile({inline, [run_hooks/2, run_hooks/3]}).
+run_hooks(Name, Args) ->
+    ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
+
+run_hooks(Name, Args, Acc) ->
+    ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
+
 -compile({inline, [find_alias/2, save_alias/3]}).
 -compile({inline, [find_alias/2, save_alias/3]}).
 
 
 find_alias(_AliasId, undefined) -> false;
 find_alias(_AliasId, undefined) -> false;

+ 9 - 3
src/emqx_cm.erl

@@ -206,7 +206,7 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
 open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
 open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
     CleanStart = fun(_) ->
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId),
                      ok = discard_session(ClientId),
-                     Session = emqx_session:init(ClientInfo, ConnInfo),
+                     Session = create_session(ClientInfo, ConnInfo),
                      {ok, #{session => Session, present => false}}
                      {ok, #{session => Session, present => false}}
                  end,
                  end,
     emqx_cm_locker:trans(ClientId, CleanStart);
     emqx_cm_locker:trans(ClientId, CleanStart);
@@ -215,18 +215,24 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
     ResumeStart = fun(_) ->
     ResumeStart = fun(_) ->
                       case takeover_session(ClientId) of
                       case takeover_session(ClientId) of
                           {ok, ConnMod, ChanPid, Session} ->
                           {ok, ConnMod, ChanPid, Session} ->
-                              ok = emqx_session:resume(ClientId, Session),
+                              ok = emqx_session:resume(ClientInfo, Session),
                               Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
                               Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
                               {ok, #{session  => Session,
                               {ok, #{session  => Session,
                                      present  => true,
                                      present  => true,
                                      pendings => Pendings}};
                                      pendings => Pendings}};
                           {error, not_found} ->
                           {error, not_found} ->
-                              Session = emqx_session:init(ClientInfo, ConnInfo),
+                              Session = create_session(ClientInfo, ConnInfo),
                               {ok, #{session => Session, present => false}}
                               {ok, #{session => Session, present => false}}
                       end
                       end
                   end,
                   end,
     emqx_cm_locker:trans(ClientId, ResumeStart).
     emqx_cm_locker:trans(ClientId, ResumeStart).
 
 
+create_session(ClientInfo, ConnInfo) ->
+    Session = emqx_session:init(ClientInfo, ConnInfo),
+    ok = emqx_metrics:inc('session.created'),
+    ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
+    Session.
+
 %% @doc Try to takeover a session.
 %% @doc Try to takeover a session.
 -spec(takeover_session(emqx_types:clientid())
 -spec(takeover_session(emqx_types:clientid())
       -> {ok, emqx_session:session()} | {error, Reason :: term()}).
       -> {ok, emqx_session:session()} | {error, Reason :: term()}).

+ 4 - 4
src/emqx_connection.erl

@@ -459,17 +459,17 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle timeout
 %% Handle timeout
 
 
-handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
+handle_timeout(_TRef, idle_timeout, State) ->
     shutdown(idle_timeout, State);
     shutdown(idle_timeout, State);
 
 
-handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) ->
+handle_timeout(_TRef, limit_timeout, State) ->
     NState = State#state{sockstate   = idle,
     NState = State#state{sockstate   = idle,
                          limit_timer = undefined
                          limit_timer = undefined
                         },
                         },
     handle_info(activate_socket, NState);
     handle_info(activate_socket, NState);
 
 
-handle_timeout(TRef, emit_stats, State =
-               #state{stats_timer = TRef, channel = Channel}) ->
+handle_timeout(_TRef, emit_stats, State =
+               #state{channel = Channel}) ->
     ClientId = emqx_channel:info(clientid, Channel),
     ClientId = emqx_channel:info(clientid, Channel),
     emqx_cm:set_chan_stats(ClientId, stats(State)),
     emqx_cm:set_chan_stats(ClientId, stats(State)),
     {ok, State#state{stats_timer = undefined}};
     {ok, State#state{stats_timer = undefined}};

+ 12 - 8
src/emqx_metrics.erl

@@ -157,7 +157,9 @@
 
 
 %% Client Lifecircle metrics
 %% Client Lifecircle metrics
 -define(CLIENT_METRICS,
 -define(CLIENT_METRICS,
-        [{counter, 'client.connected'},
+        [{counter, 'client.connect'},
+         {counter, 'client.connack'},
+         {counter, 'client.connected'},
          {counter, 'client.authenticate'},
          {counter, 'client.authenticate'},
          {counter, 'client.auth.anonymous'},
          {counter, 'client.auth.anonymous'},
          {counter, 'client.check_acl'},
          {counter, 'client.check_acl'},
@@ -512,13 +514,15 @@ reserved_idx('delivery.dropped.qos0_msg')    -> 121;
 reserved_idx('delivery.dropped.queue_full')  -> 122;
 reserved_idx('delivery.dropped.queue_full')  -> 122;
 reserved_idx('delivery.dropped.expired')     -> 123;
 reserved_idx('delivery.dropped.expired')     -> 123;
 
 
-reserved_idx('client.connected')             -> 200;
-reserved_idx('client.authenticate')          -> 201;
-reserved_idx('client.auth.anonymous')        -> 202;
-reserved_idx('client.check_acl')             -> 203;
-reserved_idx('client.subscribe')             -> 204;
-reserved_idx('client.unsubscribe')           -> 205;
-reserved_idx('client.disconnected')          -> 206;
+reserved_idx('client.connect')               -> 200;
+reserved_idx('client.connack')               -> 201;
+reserved_idx('client.connected')             -> 202;
+reserved_idx('client.authenticate')          -> 203;
+reserved_idx('client.auth.anonymous')        -> 204;
+reserved_idx('client.check_acl')             -> 205;
+reserved_idx('client.subscribe')             -> 206;
+reserved_idx('client.unsubscribe')           -> 207;
+reserved_idx('client.disconnected')          -> 208;
 
 
 reserved_idx('session.created')              -> 220;
 reserved_idx('session.created')              -> 220;
 reserved_idx('session.resumed')              -> 221;
 reserved_idx('session.resumed')              -> 221;

+ 40 - 29
src/emqx_mod_presence.erl

@@ -28,12 +28,12 @@
         , unload/1
         , unload/1
         ]).
         ]).
 
 
--export([ on_client_connected/4
+-export([ on_client_connected/3
         , on_client_disconnected/4
         , on_client_disconnected/4
         ]).
         ]).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
--export([ reason/1 ]).
+-export([reason/1]).
 -endif.
 -endif.
 
 
 load(Env) ->
 load(Env) ->
@@ -44,26 +44,12 @@ unload(_Env) ->
     emqx_hooks:del('client.connected',    {?MODULE, on_client_connected}),
     emqx_hooks:del('client.connected',    {?MODULE, on_client_connected}),
     emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
     emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
 
 
-on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
-    #{peerhost := PeerHost} = ClientInfo,
-    #{clean_start := CleanStart,
-      proto_name := ProtoName,
-      proto_ver := ProtoVer,
-      keepalive := Keepalive,
-      expiry_interval := ExpiryInterval} = ConnInfo,
-    ClientId = clientid(ClientInfo, ConnInfo),
-    Username = username(ClientInfo, ConnInfo),
-    Presence = #{clientid => ClientId,
-                 username => Username,
-                 ipaddress => ntoa(PeerHost),
-                 proto_name => ProtoName,
-                 proto_ver => ProtoVer,
-                 keepalive => Keepalive,
-                 connack => ConnAck,
-                 clean_start => CleanStart,
-                 expiry_interval => ExpiryInterval,
-                 ts => erlang:system_time(millisecond)
-                },
+%%--------------------------------------------------------------------
+%% Callbacks
+%%--------------------------------------------------------------------
+
+on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
+    Presence = connected_presence(ClientInfo, ConnInfo),
     case emqx_json:safe_encode(Presence) of
     case emqx_json:safe_encode(Presence) of
         {ok, Payload} ->
         {ok, Payload} ->
             emqx_broker:safe_publish(
             emqx_broker:safe_publish(
@@ -72,12 +58,12 @@ on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
             ?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
             ?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
     end.
     end.
 
 
-on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
-    ClientId = clientid(ClientInfo, ConnInfo),
-    Username = username(ClientInfo, ConnInfo),
+on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
+                       Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
     Presence = #{clientid => ClientId,
     Presence = #{clientid => ClientId,
                  username => Username,
                  username => Username,
                  reason => reason(Reason),
                  reason => reason(Reason),
+                 disconnected_at => DisconnectedAt,
                  ts => erlang:system_time(millisecond)
                  ts => erlang:system_time(millisecond)
                 },
                 },
     case emqx_json:safe_encode(Presence) of
     case emqx_json:safe_encode(Presence) of
@@ -88,11 +74,35 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
             ?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
             ?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
     end.
     end.
 
 
-clientid(#{clientid := undefined}, #{clientid := ClientId}) -> ClientId;
-clientid(#{clientid := ClientId}, _ConnInfo) -> ClientId.
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
 
 
-username(#{username := undefined}, #{username := Username}) -> Username;
-username(#{username := Username}, _ConnInfo) -> Username.
+connected_presence(#{peerhost := PeerHost,
+                     sockport := SockPort,
+                     clientid := ClientId,
+                     username := Username
+                    },
+                   #{clean_start := CleanStart,
+                     proto_name := ProtoName,
+                     proto_ver := ProtoVer,
+                     keepalive := Keepalive,
+                     connected_at := ConnectedAt,
+                     expiry_interval := ExpiryInterval
+                    }) ->
+    #{clientid => ClientId,
+      username => Username,
+      ipaddress => ntoa(PeerHost),
+      sockport => SockPort,
+      proto_name => ProtoName,
+      proto_ver => ProtoVer,
+      keepalive => Keepalive,
+      connack => 0, %% Deprecated?
+      clean_start => CleanStart,
+      expiry_interval => ExpiryInterval,
+      connected_at => ConnectedAt,
+      ts => erlang:system_time(millisecond)
+     }.
 
 
 make_msg(QoS, Topic, Payload) ->
 make_msg(QoS, Topic, Payload) ->
     emqx_message:set_flag(
     emqx_message:set_flag(
@@ -106,6 +116,7 @@ topic(disconnected, ClientId) ->
 
 
 qos(Env) -> proplists:get_value(qos, Env, 0).
 qos(Env) -> proplists:get_value(qos, Env, 0).
 
 
+-compile({inline, [reason/1]}).
 reason(Reason) when is_atom(Reason) -> Reason;
 reason(Reason) when is_atom(Reason) -> Reason;
 reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
 reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
 reason({Error, _}) when is_atom(Error) -> Error;
 reason({Error, _}) when is_atom(Error) -> Error;

+ 4 - 5
src/emqx_mod_subscription.erl

@@ -27,7 +27,7 @@
         ]).
         ]).
 
 
 %% APIs
 %% APIs
--export([on_client_connected/4]).
+-export([on_client_connected/3]).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Load/Unload Hook
 %% Load/Unload Hook
@@ -36,8 +36,7 @@
 load(Topics) ->
 load(Topics) ->
     emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
     emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
 
 
-on_client_connected(#{clientid := ClientId,
-                      username := Username}, ?RC_SUCCESS, _ConnInfo, Topics) ->
+on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo, Topics) ->
     Replace = fun(Topic) ->
     Replace = fun(Topic) ->
                       rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
                       rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
               end,
               end,
@@ -47,9 +46,9 @@ on_client_connected(#{clientid := ClientId,
 unload(_) ->
 unload(_) ->
     emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
     emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
 
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 
 rep(<<"%c">>, ClientId, Topic) ->
 rep(<<"%c">>, ClientId, Topic) ->
     emqx_topic:feed_var(<<"%c">>, ClientId, Topic);
     emqx_topic:feed_var(<<"%c">>, ClientId, Topic);

+ 96 - 0
src/emqx_packet.erl

@@ -27,8 +27,10 @@
         , retain/1
         , retain/1
         ]).
         ]).
 
 
+%% Field APIs
 -export([ proto_name/1
 -export([ proto_name/1
         , proto_ver/1
         , proto_ver/1
+        , info/2
         ]).
         ]).
 
 
 %% Check API
 %% Check API
@@ -95,6 +97,100 @@ proto_ver(?CONNECT_PACKET(ConnPkt)) ->
 proto_ver(#mqtt_packet_connect{proto_ver = Ver}) ->
 proto_ver(#mqtt_packet_connect{proto_ver = Ver}) ->
     Ver.
     Ver.
 
 
+%%--------------------------------------------------------------------
+%% Field Info
+%%--------------------------------------------------------------------
+
+info(proto_name, #mqtt_packet_connect{proto_name = Name}) ->
+    Name;
+info(proto_ver, #mqtt_packet_connect{proto_ver = Ver}) ->
+    Ver;
+info(is_bridge, #mqtt_packet_connect{is_bridge = IsBridge}) ->
+    IsBridge;
+info(clean_start, #mqtt_packet_connect{clean_start = CleanStart}) ->
+    CleanStart;
+info(will_flag, #mqtt_packet_connect{will_flag = WillFlag}) ->
+    WillFlag;
+info(will_qos, #mqtt_packet_connect{will_qos = WillQoS}) ->
+    WillQoS;
+info(will_retain, #mqtt_packet_connect{will_retain = WillRetain}) ->
+    WillRetain;
+info(keepalive, #mqtt_packet_connect{keepalive = KeepAlive}) ->
+    KeepAlive;
+info(properties, #mqtt_packet_connect{properties = Props}) ->
+    Props;
+info(clientid, #mqtt_packet_connect{clientid = ClientId}) ->
+    ClientId;
+info(will_props, #mqtt_packet_connect{will_props = WillProps}) ->
+    WillProps;
+info(will_topic, #mqtt_packet_connect{will_topic = WillTopic}) ->
+    WillTopic;
+info(will_payload, #mqtt_packet_connect{will_payload = Payload}) ->
+    Payload;
+info(username, #mqtt_packet_connect{username = Username}) ->
+    Username;
+info(password, #mqtt_packet_connect{password = Password}) ->
+    Password;
+
+info(ack_flags, #mqtt_packet_connack{ack_flags = Flags}) ->
+    Flags;
+info(reason_code, #mqtt_packet_connack{reason_code = RC}) ->
+    RC;
+info(properties, #mqtt_packet_connack{properties = Props}) ->
+    Props;
+
+info(topic_name, #mqtt_packet_publish{topic_name = Topic}) ->
+    Topic;
+info(packet_id, #mqtt_packet_publish{packet_id = PacketId}) ->
+    PacketId;
+info(properties, #mqtt_packet_publish{properties = Props}) ->
+    Props;
+
+info(packet_id, #mqtt_packet_puback{packet_id = PacketId}) ->
+    PacketId;
+info(reason_code, #mqtt_packet_puback{reason_code = RC}) ->
+    RC;
+info(properties,  #mqtt_packet_puback{properties = Props}) ->
+    Props;
+
+info(packet_id, #mqtt_packet_subscribe{packet_id = PacketId}) ->
+    PacketId;
+info(properties, #mqtt_packet_subscribe{properties = Props}) ->
+    Props;
+info(topic_filters, #mqtt_packet_subscribe{topic_filters = Topics}) ->
+    Topics;
+
+info(packet_id, #mqtt_packet_suback{packet_id = PacketId}) ->
+    PacketId;
+info(properties, #mqtt_packet_suback{properties = Props}) ->
+    Props;
+info(reason_codes, #mqtt_packet_suback{reason_codes = RCs}) ->
+    RCs;
+
+info(packet_id, #mqtt_packet_unsubscribe{packet_id = PacketId}) ->
+    PacketId;
+info(properties, #mqtt_packet_unsubscribe{properties = Props}) ->
+    Props;
+info(topic_filters, #mqtt_packet_unsubscribe{topic_filters = Topics}) ->
+    Topics;
+
+info(packet_id, #mqtt_packet_unsuback{packet_id = PacketId}) ->
+    PacketId;
+info(properties, #mqtt_packet_unsuback{properties = Props}) ->
+    Props;
+info(reason_codes, #mqtt_packet_unsuback{reason_codes = RCs}) ->
+    RCs;
+
+info(reason_code, #mqtt_packet_disconnect{reason_code = RC}) ->
+    RC;
+info(properties, #mqtt_packet_disconnect{properties = Props}) ->
+    Props;
+
+info(reason_code, #mqtt_packet_auth{reason_code = RC}) ->
+    RC;
+info(properties, #mqtt_packet_auth{properties = Props}) ->
+    Props.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Check MQTT Packet
 %% Check MQTT Packet
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 18 - 3
src/emqx_session.erl

@@ -76,6 +76,7 @@
 -export([ deliver/2
 -export([ deliver/2
         , enqueue/2
         , enqueue/2
         , retry/1
         , retry/1
+        , terminate/3
         ]).
         ]).
 
 
 -export([ takeover/1
 -export([ takeover/1
@@ -604,11 +605,13 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
 takeover(#session{subscriptions = Subs}) ->
 takeover(#session{subscriptions = Subs}) ->
     lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
     lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
 
 
--spec(resume(emqx_types:clientid(), session()) -> ok).
-resume(ClientId, #session{subscriptions = Subs}) ->
+-spec(resume(emqx_types:clientinfo(), session()) -> ok).
+resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = Subs}) ->
     lists:foreach(fun({TopicFilter, SubOpts}) ->
     lists:foreach(fun({TopicFilter, SubOpts}) ->
                       ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
                       ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
-                  end, maps:to_list(Subs)).
+                  end, maps:to_list(Subs)),
+    ok = emqx_metrics:inc('session.resumed'),
+    emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
 
 
 -spec(replay(session()) -> {ok, replies(), session()}).
 -spec(replay(session()) -> {ok, replies(), session()}).
 replay(Session = #session{inflight = Inflight}) ->
 replay(Session = #session{inflight = Inflight}) ->
@@ -626,6 +629,18 @@ replay(Inflight) ->
                       {PacketId, emqx_message:set_flag(dup, true, Msg)}
                       {PacketId, emqx_message:set_flag(dup, true, Msg)}
               end, emqx_inflight:to_list(Inflight)).
               end, emqx_inflight:to_list(Inflight)).
 
 
+-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
+terminate(ClientInfo, discarded, Session) ->
+    run_hook('session.discarded', [ClientInfo, info(Session)]);
+terminate(ClientInfo, takeovered, Session) ->
+    run_hook('session.takeovered', [ClientInfo, info(Session)]);
+terminate(ClientInfo, Reason, Session) ->
+    run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
+
+-compile({inline, [run_hook/2]}).
+run_hook(Name, Args) ->
+    ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Inc message/delivery expired counter
 %% Inc message/delivery expired counter
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 6 - 1
test/emqx_connection_SUITE.erl

@@ -44,6 +44,10 @@ init_per_suite(Config) ->
     ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
     ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
     ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
     ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
     ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
     ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
+    %% Meck Hooks
+    ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
+    ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end),
     Config.
     Config.
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
@@ -53,6 +57,7 @@ end_per_suite(_Config) ->
     ok = meck:unload(emqx_limiter),
     ok = meck:unload(emqx_limiter),
     ok = meck:unload(emqx_pd),
     ok = meck:unload(emqx_pd),
     ok = meck:unload(emqx_metrics),
     ok = meck:unload(emqx_metrics),
+    ok = meck:unload(emqx_hooks),
     ok.
     ok.
 
 
 init_per_testcase(_TestCase, Config) ->
 init_per_testcase(_TestCase, Config) ->
@@ -424,4 +429,4 @@ channel(InitFields) ->
               maps:merge(#{clientinfo => ClientInfo,
               maps:merge(#{clientinfo => ClientInfo,
                            session    => Session,
                            session    => Session,
                            conn_state => connected
                            conn_state => connected
-                          }, InitFields)).
+                          }, InitFields)).

+ 79 - 0
test/emqx_packet_SUITE.erl

@@ -78,6 +78,85 @@ t_proto_ver(_) ->
               ?assertEqual(Ver, emqx_packet:proto_ver(ConnPkt))
               ?assertEqual(Ver, emqx_packet:proto_ver(ConnPkt))
       end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]).
       end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]).
 
 
+t_connect_info(_) ->
+    ConnPkt = #mqtt_packet_connect{will_flag = true,
+                                   clientid = <<"clientid">>,
+                                   username = <<"username">>,
+                                   will_retain = true,
+                                   will_qos = ?QOS_2,
+                                   will_topic = <<"topic">>,
+                                   will_props = undefined,
+                                   will_payload = <<"payload">>
+                                  },
+    ?assertEqual(<<"MQTT">>, emqx_packet:info(proto_name, ConnPkt)),
+    ?assertEqual(4, emqx_packet:info(proto_ver, ConnPkt)),
+    ?assertEqual(false, emqx_packet:info(is_bridge, ConnPkt)),
+    ?assertEqual(true, emqx_packet:info(clean_start, ConnPkt)),
+    ?assertEqual(true, emqx_packet:info(will_flag, ConnPkt)),
+    ?assertEqual(?QOS_2, emqx_packet:info(will_qos, ConnPkt)),
+    ?assertEqual(true, emqx_packet:info(will_retain, ConnPkt)),
+    ?assertEqual(0, emqx_packet:info(keepalive, ConnPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, ConnPkt)),
+    ?assertEqual(<<"clientid">>, emqx_packet:info(clientid, ConnPkt)),
+    ?assertEqual(undefined, emqx_packet:info(will_props, ConnPkt)),
+    ?assertEqual(<<"topic">>, emqx_packet:info(will_topic, ConnPkt)),
+    ?assertEqual(<<"payload">>, emqx_packet:info(will_payload, ConnPkt)),
+    ?assertEqual(<<"username">>, emqx_packet:info(username, ConnPkt)),
+    ?assertEqual(undefined, emqx_packet:info(password, ConnPkt)).
+
+t_connack_info(_) ->
+    AckPkt = #mqtt_packet_connack{ack_flags = 0, reason_code = 0},
+    ?assertEqual(0, emqx_packet:info(ack_flags, AckPkt)),
+    ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)).
+
+t_publish_info(_) ->
+    PubPkt = #mqtt_packet_publish{topic_name = <<"t">>, packet_id = 1},
+    ?assertEqual(1, emqx_packet:info(packet_id, PubPkt)),
+    ?assertEqual(<<"t">>, emqx_packet:info(topic_name, PubPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, PubPkt)).
+
+t_puback_info(_) ->
+    AckPkt = #mqtt_packet_puback{packet_id = 1, reason_code = 0},
+    ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)),
+    ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)).
+
+t_subscribe_info(_) ->
+    TopicFilters = [{<<"t/#">>, #{}}],
+    SubPkt = #mqtt_packet_subscribe{packet_id = 1, topic_filters = TopicFilters},
+    ?assertEqual(1, emqx_packet:info(packet_id, SubPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, SubPkt)),
+    ?assertEqual(TopicFilters, emqx_packet:info(topic_filters, SubPkt)).
+
+t_suback_info(_) ->
+    SubackPkt = #mqtt_packet_suback{packet_id = 1, reason_codes = [0]},
+    ?assertEqual(1, emqx_packet:info(packet_id, SubackPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, SubackPkt)),
+    ?assertEqual([0], emqx_packet:info(reason_codes, SubackPkt)).
+
+t_unsubscribe_info(_) ->
+    UnsubPkt = #mqtt_packet_unsubscribe{packet_id = 1, topic_filters = [<<"t/#">>]},
+    ?assertEqual(1, emqx_packet:info(packet_id, UnsubPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, UnsubPkt)),
+    ?assertEqual([<<"t/#">>], emqx_packet:info(topic_filters, UnsubPkt)).
+
+t_unsuback_info(_) ->
+    AckPkt = #mqtt_packet_unsuback{packet_id = 1, reason_codes = [0]},
+    ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)),
+    ?assertEqual([0], emqx_packet:info(reason_codes, AckPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)).
+
+t_disconnect_info(_) ->
+    DisconnPkt = #mqtt_packet_disconnect{reason_code = 0},
+    ?assertEqual(0, emqx_packet:info(reason_code, DisconnPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, DisconnPkt)).
+
+t_auth_info(_) ->
+    AuthPkt = #mqtt_packet_auth{reason_code = 0},
+    ?assertEqual(0, emqx_packet:info(reason_code, AuthPkt)),
+    ?assertEqual(undefined, emqx_packet:info(properties, AuthPkt)).
+
 t_check_publish(_) ->
 t_check_publish(_) ->
     Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
     Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
     ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),
     ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),

+ 1 - 2
test/emqx_session_SUITE.erl

@@ -30,7 +30,6 @@ all() -> emqx_ct:all(?MODULE).
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    %% Broker
     ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker],
     ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker],
                   [passthrough, no_history, no_link]),
                   [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
     ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
@@ -329,7 +328,7 @@ t_takeover(_) ->
 t_resume(_) ->
 t_resume(_) ->
     ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
     ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
     Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
     Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
-    ok = emqx_session:resume(<<"clientid">>, Session).
+    ok = emqx_session:resume(#{clientid => <<"clientid">>}, Session).
 
 
 t_replay(_) ->
 t_replay(_) ->
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],