Просмотр исходного кода

Merge pull request #6815 from k32/bpapi-gateway

Bpapi gateway
k32 4 лет назад
Родитель
Сommit
f67278b13d

+ 2 - 1
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -304,10 +304,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _},
 %% format funcs
 
 format_channel_info({_, Infos, Stats} = R) ->
+    Node = maps:get(node, Infos, node()),
     ClientInfo = maps:get(clientinfo, Infos, #{}),
     ConnInfo = maps:get(conninfo, Infos, #{}),
     SessInfo = maps:get(session, Infos, #{}),
-    FetchX = [ {node, ClientInfo, node()}
+    FetchX = [ {node, ClientInfo, Node}
              , {clientid, ClientInfo}
              , {username, ClientInfo}
              , {proto_name, ConnInfo}

+ 97 - 38
apps/emqx_gateway/src/emqx_gateway_cm.erl

@@ -36,6 +36,7 @@
         , register_channel/4
         , unregister_channel/2
         , insert_channel_info/4
+        , lookup_by_clientid/2
         , set_chan_info/3
         , set_chan_info/4
         , get_chan_info/2
@@ -63,6 +64,20 @@
         , code_change/3
         ]).
 
+%% RPC targets
+-export([ do_lookup_by_clientid/2
+        , do_get_chan_info/3
+        , do_set_chan_info/4
+        , do_get_chan_stats/3
+        , do_set_chan_stats/4
+        , do_discard_session/3
+        , do_kick_session/3
+        , do_get_chann_conn_mod/3
+        ]).
+
+-export_type([ gateway_name/0
+             ]).
+
 -record(state, {
           gwname    :: gateway_name(), %% Gateway Name
           locker    :: pid(),          %% ClientId Locker for CM
@@ -146,16 +161,38 @@ get_chan_info(GwName, ClientId) ->
             get_chan_info(GwName, ClientId, ChanPid)
         end).
 
--spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
+-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) ->
+          [pid()].
+do_lookup_by_clientid(GwName, ClientId) ->
+    ChanTab = emqx_gateway_cm:tabname(chan, GwName),
+    [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)].
+
+-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid())
       -> emqx_types:infos() | undefined.
-get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_info(GwName, ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
-    try ets:lookup_element(tabname(info, GwName), Chan, 2)
+    try
+        Info = ets:lookup_element(tabname(info, GwName), Chan, 2),
+        Info#{node => node()}
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:infos() | undefined.
 get_chan_info(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)).
+
+-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) ->
+          [pid()].
+lookup_by_clientid(GwName, ClientId) ->
+    Nodes = mria_mnesia:running_nodes(),
+    case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of
+        {Pids, []} ->
+            lists:append(Pids);
+        {_, _BadNodes} ->
+            error(badrpc)
+    end.
 
 %% @doc Update infos of the channel.
 -spec set_chan_info(gateway_name(),
@@ -164,18 +201,23 @@ get_chan_info(GwName, ClientId, ChanPid) ->
 set_chan_info(GwName, ClientId, Infos) ->
     set_chan_info(GwName, ClientId, self(), Infos).
 
--spec set_chan_info(gateway_name(),
-                    emqx_types:clientid(),
-                    pid(),
-                    emqx_types:infos()) -> boolean().
-set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() ->
+-spec do_set_chan_info(gateway_name(),
+                       emqx_types:clientid(),
+                       pid(),
+                       emqx_types:infos()) -> boolean().
+do_set_chan_info(GwName, ClientId, ChanPid, Infos) ->
     Chan = {ClientId, ChanPid},
     try ets:update_element(tabname(info, GwName), Chan, {2, Infos})
     catch
         error:badarg -> false
-    end;
+    end.
+
+-spec set_chan_info(gateway_name(),
+                    emqx_types:clientid(),
+                    pid(),
+                    emqx_types:infos()) -> boolean().
 set_chan_info(GwName, ClientId, ChanPid, Infos) ->
-    rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_info(GwName, ClientId, ChanPid, Infos)).
 
 %% @doc Get channel's stats.
 -spec get_chan_stats(gateway_name(), emqx_types:clientid())
@@ -186,16 +228,19 @@ get_chan_stats(GwName, ClientId) ->
             get_chan_stats(GwName, ClientId, ChanPid)
         end).
 
--spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
+-spec do_get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
       -> emqx_types:stats() | undefined.
-get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_stats(GwName, ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try ets:lookup_element(tabname(info, GwName), Chan, 3)
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:stats() | undefined.
 get_chan_stats(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_stats(GwName, ClientId, ChanPid)).
 
 -spec set_chan_stats(gateway_name(),
                      emqx_types:clientid(),
@@ -203,19 +248,23 @@ get_chan_stats(GwName, ClientId, ChanPid) ->
 set_chan_stats(GwName, ClientId, Stats) ->
     set_chan_stats(GwName, ClientId, self(), Stats).
 
+-spec do_set_chan_stats(gateway_name(),
+                        emqx_types:clientid(),
+                        pid(),
+                        emqx_types:stats()) -> boolean().
+do_set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
+    Chan = {ClientId, ChanPid},
+    try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
+    catch
+        error:badarg -> false
+    end.
+
 -spec set_chan_stats(gateway_name(),
                      emqx_types:clientid(),
                      pid(),
                      emqx_types:stats()) -> boolean().
-set_chan_stats(GwName, ClientId, ChanPid, Stats)
-  when node(ChanPid) == node() ->
-    Chan = {ClientId, self()},
-    try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
-    catch
-        error:badarg -> false
-    end;
 set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
-    rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_stats(GwName, ClientId, ChanPid, Stats)).
 
 -spec connection_closed(gateway_name(), emqx_types:clientid()) -> true.
 connection_closed(GwName, ClientId) ->
@@ -297,11 +346,11 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
 discard_session(GwName, ClientId) when is_binary(ClientId) ->
     case lookup_channels(GwName, ClientId) of
         [] -> ok;
-        ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids)
+        ChanPids -> lists:foreach(fun(Pid) -> safe_discard_session(GwName, ClientId, Pid) end, ChanPids)
     end.
 
 %% @private
-do_discard_session(GwName, ClientId, Pid) ->
+safe_discard_session(GwName, ClientId, Pid) ->
     try
         discard_session(GwName, ClientId, Pid)
     catch
@@ -315,17 +364,20 @@ do_discard_session(GwName, ClientId, Pid) ->
             ok
     end.
 
-%% @private
-discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_discard_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
+do_discard_session(GwName, ClientId, ChanPid) ->
     case get_chann_conn_mod(GwName, ClientId, ChanPid) of
         undefined -> ok;
         ConnMod when is_atom(ConnMod) ->
             ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
-    end;
+    end.
 
 %% @private
+-spec discard_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
 discard_session(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:discard_session(GwName, ClientId, ChanPid)).
 
 -spec kick_session(gateway_name(), emqx_types:clientid())
     -> {error, any()}
@@ -346,16 +398,20 @@ kick_session(GwName, ClientId) ->
             kick_session(GwName, ClientId, ChanPid)
     end.
 
-kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_kick_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
+do_kick_session(GwName, ClientId, ChanPid) ->
     case get_chan_info(GwName, ClientId, ChanPid) of
         #{conninfo := #{conn_mod := ConnMod}} ->
             ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
         undefined ->
             {error, not_found}
-    end;
+    end.
 
+-spec kick_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
 kick_session(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:kick_session(GwName, ClientId, ChanPid)).
 
 with_channel(GwName, ClientId, Fun) ->
     case lookup_channels(GwName, ClientId) of
@@ -369,14 +425,17 @@ with_channel(GwName, ClientId, Fun) ->
 lookup_channels(GwName, ClientId) ->
     emqx_gateway_cm_registry:lookup_channels(GwName, ClientId).
 
-get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
+do_get_chann_conn_mod(GwName, ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
 get_chann_conn_mod(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)).
 
 %% Locker
 
@@ -398,8 +457,8 @@ locker_unlock(Locker, ClientId) ->
     ekka_locker:release(Locker, ClientId, quorum).
 
 %% @private
-rpc_call(Node, Fun, Args) ->
-    case rpc:call(Node, ?MODULE, Fun, Args) of
+wrap_rpc(Ret) ->
+    case Ret of
         {badrpc, Reason} -> error(Reason);
         Res -> Res
     end.

+ 15 - 36
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -47,9 +47,7 @@
 
 %% Mgmt APIs - clients
 -export([ lookup_client/3
-        , lookup_client/4
         , kickout_client/2
-        , kickout_client/3
         , list_client_subscriptions/2
         , client_subscribe/4
         , client_unsubscribe/3
@@ -231,41 +229,28 @@ confexp({error, Reason}) -> error(Reason).
 %%--------------------------------------------------------------------
 
 -spec lookup_client(gateway_name(),
-                    emqx_types:clientid(), {atom(), atom()}) -> list().
-lookup_client(GwName, ClientId, FormatFun) ->
-    lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
-                  || Node <- mria_mnesia:running_nodes()]).
-
-lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() ->
-    ChanTab = emqx_gateway_cm:tabname(chan, GwName),
-    InfoTab = emqx_gateway_cm:tabname(info, GwName),
-
-    lists:append(lists:map(
-      fun(Key) ->
-        lists:map(fun M:F/1, ets:lookup(InfoTab, Key))
-      end, ets:lookup(ChanTab, ClientId)));
-
-lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
-    rpc_call(Node, lookup_client,
-             [Node, GwName, {clientid, ClientId}, FormatFun]).
+                    emqx_types:clientid(), {module(), atom()}) -> list().
+lookup_client(GwName, ClientId, {M, F}) ->
+    [begin
+         Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid),
+         Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid),
+         M:F({{ClientId, Pid}, Info, Stats})
+     end
+     || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)].
 
 -spec kickout_client(gateway_name(), emqx_types:clientid())
     -> {error, any()}
      | ok.
 kickout_client(GwName, ClientId) ->
-    Results = [kickout_client(Node, GwName, ClientId)
-               || Node <- mria_mnesia:running_nodes()],
-    case lists:any(fun(Item) -> Item =:= ok end, Results) of
-        true  -> ok;
-        false -> lists:last(Results)
+    Results = [emqx_gateway_cm:kick_session(GwName, ClientId, Pid)
+               || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)],
+    IsOk = lists:any(fun(Item) -> Item =:= ok end, Results),
+    case {IsOk, Results} of
+        {true , _ } -> ok;
+        {_    , []} -> {error, not_found};
+        {false, _ } -> lists:last(Results)
     end.
 
-kickout_client(Node, GwName, ClientId) when Node =:= node() ->
-    emqx_gateway_cm:kick_session(GwName, ClientId);
-
-kickout_client(Node, GwName, ClientId) ->
-    rpc_call(Node, kickout_client, [Node, GwName, ClientId]).
-
 -spec list_client_subscriptions(gateway_name(), emqx_types:clientid())
     -> {error, any()}
      | {ok, list()}.
@@ -459,9 +444,3 @@ to_list(B) when is_binary(B) ->
 
 %%--------------------------------------------------------------------
 %% Internal funcs
-
-rpc_call(Node, Fun, Args) ->
-    case rpc:call(Node, ?MODULE, Fun, Args) of
-        {badrpc, Reason} -> {error, Reason};
-        Res -> Res
-    end.

+ 77 - 0
apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl

@@ -0,0 +1,77 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_cm_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , get_chan_info/3
+        , set_chan_info/4
+        , get_chan_stats/3
+        , set_chan_stats/4
+        , discard_session/3
+        , kick_session/3
+        , get_chann_conn_mod/3
+        , lookup_by_clientid/3
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec lookup_by_clientid([node()], emqx_gateway_cm:gateway_name(), emqx_types:clientid()) ->
+          emqx_rpc:multicall_return([pid()]).
+lookup_by_clientid(Nodes, GwName, ClientId) ->
+    rpc:multicall(Nodes, emqx_gateway_cm, do_lookup_by_clientid, [GwName, ClientId]).
+
+-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:infos() | undefined | {badrpc, _}.
+get_chan_info(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_info, [GwName, ClientId, ChanPid]).
+
+-spec set_chan_info(emqx_gateway_cm:gateway_name(),
+                    emqx_types:clientid(),
+                    pid(),
+                    emqx_types:infos()) -> boolean() | {badrpc, _}.
+set_chan_info(GwName, ClientId, ChanPid, Infos) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_info, [GwName, ClientId, ChanPid, Infos]).
+
+-spec get_chan_stats(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:stats() | undefined | {badrpc, _}.
+get_chan_stats(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_stats, [GwName, ClientId, ChanPid]).
+
+-spec set_chan_stats(emqx_gateway_cm:gateway_name(),
+                     emqx_types:clientid(),
+                     pid(),
+                     emqx_types:stats()) -> boolean() | {badrpc, _}.
+set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_stats, [GwName, ClientId, ChanPid, Stats]).
+
+-spec discard_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _.
+discard_session(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_discard_session, [GwName, ClientId, ChanPid]).
+
+-spec kick_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _.
+kick_session(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_kick_session, [GwName, ClientId, ChanPid]).
+
+-spec get_chann_conn_mod(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> atom() | {badrpc, _}.
+get_chann_conn_mod(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chann_conn_mod, [GwName, ClientId, ChanPid]).

+ 2 - 1
apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl

@@ -125,7 +125,7 @@ t_get_set_chan_info_stats(_) ->
       #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
 
     %% Info: get/set
-    NInfo = #{newinfo => true},
+    NInfo = #{newinfo => true, node => node()},
     emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
     ?assertEqual(
        NInfo,
@@ -200,6 +200,7 @@ t_kick_session(_) ->
         100 ->
             ?assert(false, "waiting kick msg timeout")
     end,
+    ?assertMatch({error, not_found}, emqx_gateway_http:kickout_client(?GWNAME, <<"i-dont-exist">>)),
     meck:unload(emqx_gateway_cm_registry).
 
 t_unexpected_handle(Conf) ->