Explorar el Código

refactor(emqx_cm): Move lookup_client/1 to emqx_cm

k32 hace 4 años
padre
commit
428eeeffc0

+ 22 - 0
apps/emqx/src/emqx_cm.erl

@@ -56,6 +56,8 @@
 
 -export([ lookup_channels/1
         , lookup_channels/2
+
+        , lookup_client/1
         ]).
 
 %% Test/debug interface
@@ -80,8 +82,16 @@
         , get_connected_client_count/0
         ]).
 
+-export_type([ channel_info/0
+             ]).
+
 -type(chan_pid() :: pid()).
 
+-type(channel_info() :: { _Chan :: {emqx_types:clientid(), pid()}
+                        , _Info :: emqx_types:infos()
+                        , _Stats :: emqx_types:stats()
+                        }).
+
 %% Tables for channel management.
 -define(CHAN_TAB, emqx_channel).
 -define(CHAN_CONN_TAB, emqx_channel_conn).
@@ -502,6 +512,18 @@ lookup_channels(global, ClientId) ->
 lookup_channels(local, ClientId) ->
     [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
 
+-spec lookup_client({clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
+          [channel_info()].
+lookup_client({username, Username}) ->
+    MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'}
+                 , [{'=:=','$1', Username}]
+                 , ['$_']
+                 }],
+    ets:select(emqx_channel_info, MatchSpec);
+lookup_client({clientid, ClientId}) ->
+    [Rec || Key <- ets:lookup(emqx_channel, ClientId)
+          , Rec <- ets:lookup(emqx_channel_info, Key)].
+
 %% @private
 rpc_call(Node, Fun, Args, Timeout) ->
     case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of

+ 6 - 0
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -24,6 +24,7 @@
         , forward_async/3
         , client_subscriptions/2
 
+        , lookup_client/2
         , kickout_client/2
         ]).
 
@@ -51,3 +52,8 @@ client_subscriptions(Node, ClientId) ->
 -spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
 kickout_client(Node, ClientId) ->
     rpc:call(Node, emqx_cm, kick_session, [ClientId]).
+
+-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
+          [emqx_cm:channel_info()] | {badrpc, _}.
+lookup_client(Node, Key) ->
+    rpc:call(Node, emqx_cm, lookup_client, [Key]).

+ 12 - 22
apps/emqx_management/src/emqx_mgmt.erl

@@ -121,6 +121,12 @@
 
 -elvis([{elvis_style, god_modules, disable}]).
 
+-export_type([listener_manage_op/0]).
+
+-type listener_manage_op() :: start_listener
+                            | stop_listener
+                            | restart_listener.
+
 %% TODO: remove these function after all api use minirest version 1.X
 return() ->
     ok.
@@ -241,24 +247,11 @@ lookup_client({username, Username}, FormatFun) ->
     lists:append([lookup_client(Node, {username, Username}, FormatFun)
                   || Node <- mria_mnesia:running_nodes()]).
 
-lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
-    lists:append(lists:map(
-      fun(Key) ->
-        lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key))
-      end, ets:lookup(emqx_channel, ClientId)));
-
-lookup_client(Node, {clientid, ClientId}, FormatFun) ->
-    rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]);
-
-lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() ->
-    MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'}
-                 , [{'=:=','$1', Username}]
-                 , ['$_']
-                 }],
-    lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec));
-
-lookup_client(Node, {username, Username}, FormatFun) ->
-    rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
+lookup_client(Node, Key, {M, F}) ->
+    case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of
+        {error, Err} -> {error, Err};
+        L            -> lists:map(fun M:F/1, L)
+    end.
 
 kickout_client({ClientID, FormatFun}) ->
     case lookup_client({clientid, ClientID}, FormatFun) of
@@ -464,10 +457,7 @@ listener_id_filter(Id, Listeners) ->
     Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
     lists:filter(Filter, Listeners).
 
-
--spec manage_listener( Operation :: start_listener
-                                  | stop_listener
-                                  | restart_listener
+-spec manage_listener( listener_manage_op()
                      , Param :: map()) ->
           ok | {error, Reason :: term()}.
 manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->