Procházet zdrojové kódy

feat(gw): implement clients list http-api

JianBo He před 4 roky
rodič
revize
52b6d620ee
1 změnil soubory, kde provedl 200 přidání a 11 odebrání
  1. 200 11
      apps/emqx_gateway/src/emqx_gateway_api_client.erl

+ 200 - 11
apps/emqx_gateway/src/emqx_gateway_api_client.erl

@@ -21,11 +21,21 @@
 %% minirest behaviour callbacks
 -export([api_spec/0]).
 
+%% http handlers
 -export([ clients/2
         , clients_insta/2
         , subscriptions/2
         ]).
 
+%% internal exports (for client query)
+-export([ query/4
+        , format_channel_info/1
+        ]).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
 api_spec() ->
     {metadata(apis()), []}.
 
@@ -36,8 +46,49 @@ apis() ->
     , {"/gateway/:name/clients/:clientid/subscriptions/:topic", subscriptions}
     ].
 
-clients(get, _Req) ->
-    {200, []}.
+
+-define(CLIENT_QS_SCHEMA,
+    [ {<<"node">>, atom}
+    , {<<"clientid">>, binary}
+    , {<<"username">>, binary}
+    %%, {<<"zone">>, atom}
+    , {<<"ip_address">>, ip}
+    , {<<"conn_state">>, atom}
+    , {<<"clean_start">>, atom}
+    %%, {<<"proto_name">>, binary}
+    %%, {<<"proto_ver">>, integer}
+    , {<<"like_clientid">>, binary}
+    , {<<"like_username">>, binary}
+    , {<<"gte_created_at">>, timestamp}
+    , {<<"lte_created_at">>, timestamp}
+    , {<<"gte_connected_at">>, timestamp}
+    , {<<"lte_connected_at">>, timestamp}
+    ]).
+
+-define(query_fun, {?MODULE, query}).
+-define(format_fun, {?MODULE, format_channel_info}).
+
+clients(get, #{ bindings := #{name := GwName0}
+              , query_string := Qs
+              }) ->
+    GwName = binary_to_existing_atom(GwName0),
+    TabName = emqx_gateway_cm:tabname(info, GwName),
+    case maps:get(<<"node">>, Qs, undefined) of
+        undefined ->
+            Response = emqx_mgmt_api:cluster_query(
+                         Qs, TabName,
+                         ?CLIENT_QS_SCHEMA, ?query_fun
+                        ),
+            {200, Response};
+        Node1 ->
+            Node = binary_to_atom(Node1, utf8),
+            ParamsWithoutNode = maps:without([<<"node">>], Qs),
+            Response = emqx_mgmt_api:node_query(
+                         Node, ParamsWithoutNode,
+                         TabName, ?CLIENT_QS_SCHEMA, ?query_fun
+                        ),
+            {200, Response}
+    end.
 
 clients_insta(get, _Req) ->
     {200, <<"{}">>};
@@ -49,6 +100,145 @@ subscriptions(get, _Req) ->
 subscriptions(delete, _Req) ->
     {200}.
 
+%%--------------------------------------------------------------------
+%% query funcs
+
+query(Tab, {Qs, []}, Start, Limit) ->
+    Ms = qs2ms(Qs),
+    emqx_mgmt_api:select_table(Tab, Ms, Start, Limit,
+                               fun format_channel_info/1);
+
+query(Tab, {Qs, Fuzzy}, Start, Limit) ->
+    Ms = qs2ms(Qs),
+    MatchFun = match_fun(Ms, Fuzzy),
+    emqx_mgmt_api:traverse_table(Tab, MatchFun, Start, Limit,
+                                 fun format_channel_info/1).
+
+qs2ms(Qs) ->
+    {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
+    [{{'$1', MtchHead, '_'}, Conds, ['$_']}].
+
+qs2ms([], _, {MtchHead, Conds}) ->
+    {MtchHead, lists:reverse(Conds)};
+
+qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
+    NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
+    qs2ms(Rest, N, {NMtchHead, Conds});
+qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
+    Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
+    NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
+    NConds = put_conds(Qs, Holder, Conds),
+    qs2ms(Rest, N+1, {NMtchHead, NConds}).
+
+put_conds({_, Op, V}, Holder, Conds) ->
+    [{Op, Holder, V} | Conds];
+put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
+    [{Op2, Holder, V2},
+        {Op1, Holder, V1} | Conds].
+
+ms(clientid, X) ->
+    #{clientinfo => #{clientid => X}};
+ms(username, X) ->
+    #{clientinfo => #{username => X}};
+ms(zone, X) ->
+    #{clientinfo => #{zone => X}};
+ms(ip_address, X) ->
+    #{clientinfo => #{peerhost => X}};
+ms(conn_state, X) ->
+    #{conn_state => X};
+ms(clean_start, X) ->
+    #{conninfo => #{clean_start => X}};
+ms(proto_name, X) ->
+    #{conninfo => #{proto_name => X}};
+ms(proto_ver, X) ->
+    #{conninfo => #{proto_ver => X}};
+ms(connected_at, X) ->
+    #{conninfo => #{connected_at => X}};
+ms(created_at, X) ->
+    #{session => #{created_at => X}}.
+
+%%--------------------------------------------------------------------
+%% Match funcs
+
+match_fun(Ms, Fuzzy) ->
+    MsC = ets:match_spec_compile(Ms),
+    REFuzzy = lists:map(fun({K, like, S}) ->
+        {ok, RE} = re:compile(S),
+        {K, like, RE}
+                        end, Fuzzy),
+    fun(Rows) ->
+        case ets:match_spec_run(Rows, MsC) of
+            [] -> [];
+            Ls ->
+                lists:filter(fun(E) ->
+                    run_fuzzy_match(E, REFuzzy)
+                             end, Ls)
+        end
+    end.
+
+run_fuzzy_match(_, []) ->
+    true;
+run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) ->
+    Val = case maps:get(Key, ClientInfo, "") of
+              undefined -> "";
+              V -> V
+          end,
+    re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy).
+
+%%--------------------------------------------------------------------
+%% format funcs
+
+format_channel_info({_, ClientInfo, ClientStats}) ->
+    Fun =
+        fun
+            (_Key, Value, Current) when is_map(Value) ->
+                maps:merge(Current, Value);
+            (Key, Value, Current) ->
+                maps:put(Key, Value, Current)
+        end,
+    StatsMap = maps:without([memory, next_pkt_id, total_heap_size],
+        maps:from_list(ClientStats)),
+    ClientInfoMap0 = maps:fold(Fun, #{}, ClientInfo),
+    IpAddress      = peer_to_binary(maps:get(peername, ClientInfoMap0)),
+    Connected      = maps:get(conn_state, ClientInfoMap0) =:= connected,
+    ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0),
+    ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1),
+    ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
+    ClientInfoMap  = maps:put(connected, Connected, ClientInfoMap3),
+    RemoveList = [
+          auth_result
+        , peername
+        , sockname
+        , peerhost
+        , conn_state
+        , send_pend
+        , conn_props
+        , peercert
+        , sockstate
+        , subscriptions
+        , receive_maximum
+        , protocol
+        , is_superuser
+        , sockport
+        , anonymous
+        , mountpoint
+        , socktype
+        , active_n
+        , await_rel_timeout
+        , conn_mod
+        , sockname
+        , retry_interval
+        , upgrade_qos
+    ],
+    maps:without(RemoveList, ClientInfoMap).
+
+peer_to_binary({Addr, Port}) ->
+    AddrBinary = list_to_binary(inet:ntoa(Addr)),
+    PortBinary = integer_to_binary(Port),
+    <<AddrBinary/binary, ":", PortBinary/binary>>;
+peer_to_binary(Addr) ->
+    list_to_binary(inet:ntoa(Addr)).
+
 %%--------------------------------------------------------------------
 %% Swagger defines
 %%--------------------------------------------------------------------
@@ -112,7 +302,7 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) ->
      };
 swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->
     #{ description => <<"Unsubscribe the topic for client">>
-     , parameters => params_client_insta() ++ params_topic_name_in_path()
+     , parameters => params_topic_name_in_path() ++ params_client_insta()
      , responses =>
         #{ <<"404">> => schema_not_found()
          , <<"204">> => schema_no_content()
@@ -120,13 +310,13 @@ swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->
      }.
 
 params_client_query() ->
-    params_client_searching_in_qs()
-    ++ emqx_mgmt_util:page_params()
-    ++ params_gateway_name_in_path().
+    params_gateway_name_in_path()
+    ++ params_client_searching_in_qs()
+    ++ emqx_mgmt_util:page_params().
 
 params_client_insta() ->
-    params_gateway_name_in_path()
-    ++ params_clientid_in_path().
+    params_clientid_in_path()
+    ++ params_gateway_name_in_path().
 
 params_client_searching_in_qs() ->
     queries(
@@ -183,11 +373,10 @@ schema_no_content() ->
     #{description => <<"No Content">>}.
 
 schema_clients_list() ->
-    emqx_mgmt_util:array_schema(
+    emqx_mgmt_util:page_schema(
       #{ type => object
        , properties => properties_client()
-       },
-      <<"Client lists">>
+       }
      ).
 
 schema_client() ->