Browse Source

feat(sys): support client events notification

JianBo He 4 years ago
parent
commit
ba6cfd595b
3 changed files with 235 additions and 36 deletions
  1. 45 16
      apps/emqx/etc/emqx.conf
  2. 53 11
      apps/emqx/src/emqx_schema.erl
  3. 137 9
      apps/emqx/src/emqx_sys.erl

+ 45 - 16
apps/emqx/etc/emqx.conf

@@ -1009,22 +1009,6 @@ zones.default {
 ## Broker
 ##==================================================================
 broker {
-  ## System interval of publishing $SYS messages.
-  ##
-  ## @doc broker.sys_msg_interval
-  ## ValueType: Duration | disabled
-  ## Default: 1m
-  sys_msg_interval = 1m
-
-  ## System heartbeat interval of publishing following heart beat message:
-  ##  - "$SYS/brokers/<node>/uptime"
-  ##  - "$SYS/brokers/<node>/datetime"
-  ##
-  ## @doc broker.sys_heartbeat_interval
-  ## ValueType: Duration
-  ## Default: 30s | disabled
-  sys_heartbeat_interval = 30s
-
   ## Session locking strategy in a cluster.
   ##
   ## @doc broker.session_locking_strategy
@@ -1095,6 +1079,51 @@ broker {
   perf.trie_compaction = true
 }
 
+##==================================================================
+## System Topic
+##==================================================================
+
+sys_topic {
+  ## System interval of publishing $SYS messages.
+  ##
+  ## @doc broker.sys_msg_interval
+  ## ValueType: Duration | disabled
+  ## Default: 1m
+  sys_msg_interval = 1m
+
+  ## System heartbeat interval of publishing following heart beat message:
+  ##  - "$SYS/brokers/<node>/uptime"
+  ##  - "$SYS/brokers/<node>/datetime"
+  ##
+  ## @doc broker.sys_heartbeat_interval
+  ## ValueType: Duration
+  ## Default: 30s | disabled
+  sys_heartbeat_interval = 30s
+
+  ## Whether to enable Client lifecycle event messages publish.
+  ## The following options are not only for enabling MQTT client event messages
+  ## publish but also for Gateway clients. However, these kinds of clients type
+  ## are distinguished by the Topic prefix:
+  ## - For the MQTT client, its event topic format is:
+  ##    $SYS/broker/<node>/clients/<clientid>/<event>
+  ## - For the Gateway client, it is
+  ##    $SYS/broker/<node>/gateway/<gateway-name>/clients/<clientid>/<event>
+  sys_event_messages {
+    ## Enable to publish client connected event messages.
+    ##  - Topic: "$SYS/broker/<node>/clients/<clientid>/connected"
+    client_connected = true
+    ## Enable to publish client disconnected event messages.
+    ##  - Topic: "$SYS/broker/<node>/clients/<clientid>/disconnected"
+    client_disconnected = true
+    ## Enable to publish event message that client subscribed a topic successfully.
+    ##  - Topic: "$SYS/broker/<node>/clients/<clientid>/subscribed"
+    client_subscribed = false
+    ## Enable to publish event message that client unsubscribed a topic successfully.
+    ##  - Topic: "$SYS/broker/<node>/clients/<clientid>/unsubscribed"
+    client_unsubscribed = false
+  }
+}
+
 ##==================================================================
 ## System Monitor
 ##==================================================================

+ 53 - 11
apps/emqx/src/emqx_schema.erl

@@ -144,6 +144,9 @@ roots(medium) ->
     [ {"broker",
        sc(ref("broker"),
          #{})}
+    , {"sys_topic",
+       sc(ref("sys_topic"),
+          #{})}
     , {"rate_limit",
        sc(ref("rate_limit"),
           #{})}
@@ -857,17 +860,7 @@ fields("deflate_opts") ->
     ];
 
 fields("broker") ->
-    [ {"sys_msg_interval",
-       sc(hoconsc:union([disabled, duration()]),
-          #{ default => "1m"
-           })
-      }
-    , {"sys_heartbeat_interval",
-       sc(hoconsc:union([disabled, duration()]),
-          #{ default => "30s"
-           })
-      }
-    , {"enable_session_registry",
+    [ {"enable_session_registry",
        sc(boolean(),
           #{ default => true
            })
@@ -909,6 +902,55 @@ fields("broker_perf") ->
            })}
     ];
 
+fields("sys_topic") ->
+    [ {"sys_msg_interval",
+       sc(hoconsc:union([disabled, duration()]),
+          #{ default => "1m"
+           })
+      }
+    , {"sys_heartbeat_interval",
+       sc(hoconsc:union([disabled, duration()]),
+          #{ default => "30s"
+           })
+      }
+    , {"sys_event_messages",
+       sc(ref("event_names"),
+          #{ desc =>
+           """Whether to enable Client lifecycle event messages publish.<br/>
+The following options are not only for enabling MQTT client event messages
+publish but also for Gateway clients. However, these kinds of clients type
+are distinguished by the Topic prefix:
+- For the MQTT client, its event topic format is:<br/>
+  <code>$SYS/broker/<node>/clients/<clientid>/<event></code><br/>
+- For the Gateway client, it is
+  <code>$SYS/broker/<node>/gateway/<gateway-name>/clients/<clientid>/<event></code>"""
+           })
+      }
+    ];
+
+fields("event_names") ->
+    [ {"client_connected",
+       sc(boolean(),
+          #{default => true
+           })
+      }
+    , {"client_disconnected",
+       sc(boolean(),
+          #{default => true
+           })
+      }
+    , {"client_subscribed",
+       sc(boolean(),
+          #{default => false
+           })
+      }
+    , {"client_unsubscribed",
+       sc(boolean(),
+          #{default => false
+           })
+      }
+    ];
+
 fields("sysmon") ->
     [ {"vm",
        sc(ref("sysmon_vm"),

+ 137 - 9
apps/emqx/src/emqx_sys.erl

@@ -43,6 +43,12 @@
         , terminate/2
         ]).
 
+-export([ on_client_connected/2
+        , on_client_disconnected/3
+        , on_client_subscribed/3
+        , on_client_unsubscribed/3
+        ]).
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -67,9 +73,9 @@
         , sysdescr % Broker description
         ]).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% APIs
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 -spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
 start_link() ->
@@ -101,10 +107,13 @@ datetime() ->
             "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
 
 sys_interval() ->
-    emqx:get_config([broker, sys_msg_interval]).
+    emqx:get_config([sys_topic, sys_msg_interval]).
 
 sys_heatbeat_interval() ->
-    emqx:get_config([broker, sys_heartbeat_interval]).
+    emqx:get_config([sys_topic, sys_heartbeat_interval]).
+
+sys_event_message() ->
+    emqx:get_config([sys_topic, sys_event_messages]).
 
 %% @doc Get sys info
 -spec(info() -> list(tuple())).
@@ -114,12 +123,13 @@ info() ->
      {uptime, uptime()},
      {datetime, datetime()}].
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% gen_server callbacks
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 init([]) ->
     State = #state{sysdescr   = iolist_to_binary(sysdescr())},
+    load_event_hooks(),
     {ok, heartbeat(tick(State))}.
 
 heartbeat(State) ->
@@ -127,6 +137,14 @@ heartbeat(State) ->
 tick(State) ->
     State#state{ticker = start_timer(sys_interval(), tick)}.
 
+load_event_hooks() ->
+    maps:foreach(
+      fun(_, false) -> ok;
+         (K, true) ->
+            {HookPoint, Fun} = hook_and_fun(K),
+            emqx_hooks:put(HookPoint, {?MODULE, Fun, []})
+      end, sys_event_message()).
+
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
@@ -153,11 +171,81 @@ handle_info(Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
+    unload_event_hooks(),
     lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]).
 
-%%-----------------------------------------------------------------------------
+unload_event_hooks() ->
+    lists:foreach(fun(K, _) ->
+        {HookPoint, Fun} = hook_and_fun(K),
+        emqx_hooks:del(HookPoint, {?MODULE, Fun})
+    end, sys_event_message()).
+
+%%--------------------------------------------------------------------
+%% hook callbacks
+%%--------------------------------------------------------------------
+
+on_client_connected(ClientInfo, ConnInfo) ->
+    Payload0 = common_infos(ClientInfo, ConnInfo),
+    Payload = Payload0#{
+                keepalive       => maps:get(keepalive, ConnInfo, 0),
+                clean_start     => maps:get(clean_start, ConnInfo, true),
+                expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
+               },
+    publish(connected, Payload).
+
+on_client_disconnected(ClientInfo, Reason,
+                       ConnInfo = #{disconnected_at := DisconnectedAt}) ->
+
+    Payload0 = common_infos(ClientInfo, ConnInfo),
+    Payload = Payload0#{
+                reason => reason(Reason),
+                disconnected_at => DisconnectedAt
+               },
+    publish(disconnected, Payload).
+
+-compile({inline, [reason/1]}).
+reason(Reason) when is_atom(Reason) -> Reason;
+reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
+reason({Error, _}) when is_atom(Error) -> Error;
+reason(_) -> internal_error.
+
+on_client_subscribed(_ClientInfo = #{clientid := ClientId,
+                                     username := Username,
+                                     protocol := Protocol},
+                     Topic, SubOpts) ->
+    Payload = #{clientid => ClientId,
+                username => Username,
+                protocol => Protocol,
+                topic => Topic,
+                subopts => SubOpts,
+                ts => erlang:system_time(millisecond)
+               },
+    publish(subscribed, Payload).
+
+on_client_unsubscribed(_ClientInfo = #{clientid := ClientId,
+                                       username := Username,
+                                       protocol := Protocol},
+                       Topic, _SubOpts) ->
+    Payload = #{clientid => ClientId,
+                username => Username,
+                protocol => Protocol,
+                topic => Topic,
+                ts => erlang:system_time(millisecond)
+               },
+    publish(unsubscribed, Payload).
+
+%%--------------------------------------------------------------------
 %% Internal functions
-%%-----------------------------------------------------------------------------
+%%--------------------------------------------------------------------
+
+hook_and_fun(client_connected) ->
+    {'client.connected', on_client_connected};
+hook_and_fun(client_disconnected) ->
+    {'client.disconnected', on_client_disconnected};
+hook_and_fun(client_subscribed) ->
+    {'session.subscribed', on_client_subscribed};
+hook_and_fun(client_unsubscribed) ->
+    {'session.unsubscribed', on_client_unsubscribed}.
 
 publish_any(Name, Value) ->
     _ = publish(Name, Value),
@@ -179,7 +267,11 @@ publish(stats, Stats) ->
      || {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)];
 publish(metrics, Metrics) ->
     [safe_publish(systop(metric_topic(Name)), integer_to_binary(Val))
-     || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)].
+     || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)];
+publish(Event, Payload) when Event == connected; Event == disconnected;
+                             Event == subscribed; Event == unsubscribed ->
+    Topic = event_topic(Event, Payload),
+    safe_publish(Topic, emqx_json:encode(Payload)).
 
 metric_topic(Name) ->
     lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]).
@@ -191,3 +283,39 @@ safe_publish(Topic, Flags, Payload) ->
       emqx_message:set_flags(
         maps:merge(#{sys => true}, Flags),
         emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))).
+
+common_infos(
+  _ClientInfo = #{clientid := ClientId,
+                  username := Username,
+                  peerhost := PeerHost,
+                  sockport := SockPort,
+                  protocol := Protocol
+                 },
+  _ConnInfo = #{proto_name := ProtoName,
+                proto_ver := ProtoVer,
+                connected_at := ConnectedAt
+               }) ->
+    #{clientid => ClientId,
+      username => Username,
+      ipaddress => ntoa(PeerHost),
+      sockport => SockPort,
+      protocol => Protocol,
+      proto_name => ProtoName,
+      proto_ver => ProtoVer,
+      connected_at => ConnectedAt,
+      ts => erlang:system_time(millisecond)
+     }.
+
+ntoa(undefined) -> undefined;
+ntoa({IpAddr, Port}) ->
+    iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
+ntoa(IpAddr) ->
+    iolist_to_binary(inet:ntoa(IpAddr)).
+
+event_topic(Event, #{clientid := ClientId, protocol := mqtt}) ->
+    iolist_to_binary(
+      [systop("clients"), "/", ClientId, "/", atom_to_binary(Event)]);
+event_topic(Event, #{clientid := ClientId, protocol := GwName}) ->
+    iolist_to_binary(
+      [systop("gateway"), "/", atom_to_binary(GwName),
+       "/clients/", ClientId, "/", atom_to_binary(Event)]).