Sfoglia il codice sorgente

chore(gw): clients http implement skeleton

JianBo He 4 anni fa
parent
commit
14b39224d4

+ 4 - 4
apps/emqx_gateway/src/emqx_gateway.erl

@@ -25,7 +25,7 @@
         , post_config_update/4
         ]).
 
-%% APIs
+%% Gateway APIs
 -export([ registered_gateway/0
         , load/2
         , unload/1
@@ -48,7 +48,7 @@ registered_gateway() ->
     emqx_gateway_registry:list().
 
 %%--------------------------------------------------------------------
-%% Gateway Instace APIs
+%% Gateway APIs
 
 -spec list() -> [gateway()].
 list() ->
@@ -96,7 +96,8 @@ update_rawconf(RawName, RawConfDiff) ->
 %%--------------------------------------------------------------------
 %% Config Handler
 
--spec pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) ->
+-spec pre_config_update(emqx_config:update_request(),
+                        emqx_config:raw_config()) ->
     {ok, emqx_config:update_request()} | {error, term()}.
 pre_config_update({RawName, RawConfDiff}, RawConf) ->
     {ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}.
@@ -117,4 +118,3 @@ post_config_update({RawName, _}, NewConfig, OldConfig, _AppEnvs) ->
 %%--------------------------------------------------------------------
 %% Internal funcs
 %%--------------------------------------------------------------------
-

+ 8 - 13
apps/emqx_gateway/src/emqx_gateway_api.erl

@@ -20,8 +20,13 @@
 
 -compile(nowarn_unused_function).
 
--import(emqx_mgmt_util, [ schema/1
-                        ]).
+-import(emqx_mgmt_util,
+        [ schema/1
+        ]).
+
+-import(emqx_gateway_http,
+        [ return_http_error/2
+        ]).
 
 %% minirest behaviour callbacks
 -export([api_spec/0]).
@@ -342,7 +347,7 @@ gateway(get, Request) ->
                  undefined -> all;
                  S0 -> binary_to_existing_atom(S0, utf8)
              end,
-    {200, emqx_gateway_intr:gateways(Status)}.
+    {200, emqx_gateway_http:gateways(Status)}.
 
 gateway_insta(delete, #{bindings := #{name := Name0}}) ->
     Name = binary_to_existing_atom(Name0),
@@ -380,13 +385,3 @@ gateway_insta(put, #{body := RawConfsIn,
 
 gateway_insta_stats(get, _Req) ->
     return_http_error(401, <<"Implement it later (maybe 5.1)">>).
-
-return_http_error(Code, Msg) ->
-    emqx_json:encode(
-      #{code => codestr(Code),
-        reason => emqx_gateway_utils:stringfy(Msg)
-       }).
-
-codestr(404) -> 'RESOURCE_NOT_FOUND';
-codestr(401) -> 'NOT_SUPPORTED_NOW';
-codestr(500) -> 'UNKNOW_ERROR'.

+ 155 - 57
apps/emqx_gateway/src/emqx_gateway_api_client.erl

@@ -13,8 +13,8 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
-%%
--module(emqx_gateway_api_client).
+
+-module(emqx_gateway_api_clients).
 
 -behaviour(minirest_api).
 
@@ -32,6 +32,10 @@
         , format_channel_info/1
         ]).
 
+-import(emqx_gateway_http,
+        [ return_http_error/2
+        ]).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -46,17 +50,14 @@ apis() ->
     , {"/gateway/:name/clients/:clientid/subscriptions/:topic", subscriptions}
     ].
 
-
 -define(CLIENT_QS_SCHEMA,
     [ {<<"node">>, atom}
     , {<<"clientid">>, binary}
     , {<<"username">>, binary}
-    %%, {<<"zone">>, atom}
     , {<<"ip_address">>, ip}
     , {<<"conn_state">>, atom}
     , {<<"clean_start">>, atom}
-    %%, {<<"proto_name">>, binary}
-    %%, {<<"proto_ver">>, integer}
+    , {<<"proto_ver">>, integer}
     , {<<"like_clientid">>, binary}
     , {<<"like_username">>, binary}
     , {<<"gte_created_at">>, timestamp}
@@ -90,14 +91,69 @@ clients(get, #{ bindings := #{name := GwName0}
             {200, Response}
     end.
 
-clients_insta(get, _Req) ->
-    {200, <<"{}">>};
-clients_insta(delete, _Req) ->
+clients_insta(get, #{ bindings := #{name := GwName0,
+                                    clientid := ClientId}
+                    }) ->
+    GwName = binary_to_existing_atom(GwName0),
+    TabName = emqx_gateway_cm:tabname(info, GwName),
+    %% XXX: We need a lookuo function for it instead of a query
+    #{data := Data} = emqx_mgmt_api:cluster_query(
+                        #{<<"clientid">> => ClientId},
+                        TabName, ?CLIENT_QS_SCHEMA, ?query_fun
+                       ),
+    case Data of
+        [ClientInfo] ->
+            {200, ClientInfo};
+        [] ->
+            return_http_error(404, <<"Gateway or ClientId not found">>)
+    end;
+
+clients_insta(delete, #{ bindings := #{name := GwName0,
+                                       clientid := ClientId0}
+                       }) ->
+    GwName = binary_to_existing_atom(GwName0),
+    ClientId = emqx_mgmt_util:urldecode(ClientId0),
+    emqx_gateway_http:client_kickout(GwName, ClientId),
     {200}.
 
-subscriptions(get, _Req) ->
+subscriptions(get, #{ bindings := #{name := GwName0,
+                                    clientid := ClientId0}
+                    }) ->
+    GwName = binary_to_existing_atom(GwName0),
+    ClientId = emqx_mgmt_util:urldecode(ClientId0),
+    emqx_gateway_http:client_subscriptions(GwName, ClientId),
     {200, []};
-subscriptions(delete, _Req) ->
+
+subscriptions(post, #{ bindings := #{name := GwName0,
+                                     clientid := ClientId0},
+                       body := Body
+                    }) ->
+    GwName = binary_to_existing_atom(GwName0),
+    ClientId = emqx_mgmt_util:urldecode(ClientId0),
+
+    case {maps:get(<<"topic">>, Body, undefined),
+          maps:get(<<"qos">>, Body, 0)} of
+        {undefined, _} ->
+            %% FIXME: more reasonable error code??
+            return_http_error(404, <<"Request paramter missed: topic">>);
+        {Topic, QoS} ->
+            case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of
+                {error, Reason} ->
+                    return_http_error(404, Reason);
+                ok ->
+                    {200}
+            end
+    end;
+
+subscriptions(delete, #{ bindings := #{name := GwName0,
+                                       clientid := ClientId0,
+                                       topic := Topic0
+                                      }
+                       }) ->
+    GwName = binary_to_existing_atom(GwName0),
+    ClientId = emqx_mgmt_util:urldecode(ClientId0),
+    Topic = emqx_mgmt_util:urldecode(Topic0),
+    _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
     {200}.
 
 %%--------------------------------------------------------------------
@@ -148,8 +204,6 @@ ms(conn_state, X) ->
     #{conn_state => X};
 ms(clean_start, X) ->
     #{conninfo => #{clean_start => X}};
-ms(proto_name, X) ->
-    #{conninfo => #{proto_name => X}};
 ms(proto_ver, X) ->
     #{conninfo => #{proto_ver => X}};
 ms(connected_at, X) ->
@@ -188,49 +242,79 @@ run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) -
 %%--------------------------------------------------------------------
 %% format funcs
 
-format_channel_info({_, ClientInfo, ClientStats}) ->
-    Fun =
-        fun
-            (_Key, Value, Current) when is_map(Value) ->
-                maps:merge(Current, Value);
-            (Key, Value, Current) ->
-                maps:put(Key, Value, Current)
-        end,
-    StatsMap = maps:without([memory, next_pkt_id, total_heap_size],
-        maps:from_list(ClientStats)),
-    ClientInfoMap0 = maps:fold(Fun, #{}, ClientInfo),
-    IpAddress      = peer_to_binary(maps:get(peername, ClientInfoMap0)),
-    Connected      = maps:get(conn_state, ClientInfoMap0) =:= connected,
-    ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0),
-    ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1),
-    ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
-    ClientInfoMap  = maps:put(connected, Connected, ClientInfoMap3),
-    RemoveList = [
-          auth_result
-        , peername
-        , sockname
-        , peerhost
-        , conn_state
-        , send_pend
-        , conn_props
-        , peercert
-        , sockstate
-        , subscriptions
-        , receive_maximum
-        , protocol
-        , is_superuser
-        , sockport
-        , anonymous
-        , mountpoint
-        , socktype
-        , active_n
-        , await_rel_timeout
-        , conn_mod
-        , sockname
-        , retry_interval
-        , upgrade_qos
-    ],
-    maps:without(RemoveList, ClientInfoMap).
+format_channel_info({_, Infos, Stats}) ->
+    ClientInfo = maps:get(clientinfo, Infos, #{}),
+    ConnInfo = maps:get(conninfo, Infos, #{}),
+    SessInfo = maps:get(session, Infos, #{}),
+    FetchX = [ {node, ClientInfo, node()}
+             , {clientid, ClientInfo}
+             , {username, ClientInfo}
+             , {proto_name, ConnInfo}
+             , {proto_ver, ConnInfo}
+             , {ip_address, {peername, ConnInfo, fun peer_to_binary/1}}
+             , {is_bridge, ClientInfo, false}
+             , {connected_at,
+                {connected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
+             , {disconnected_at,
+                {disconnected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
+             , {connected, {conn_state, Infos, fun conn_state_to_connected/1}}
+             , {keepalive, ClientInfo, 0}
+             , {clean_start, ConnInfo, true}
+             , {expiry_interval, ConnInfo, 0}
+             , {created_at,
+                {created_at, SessInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
+             , {subscriptions_cnt, Stats, 0}
+             , {subscriptions_max, Stats, infinity}
+             , {inflight_cnt, Stats, 0}
+             , {inflight_max, Stats, infinity}
+             , {mqueue_len, Stats, 0}
+             , {mqueue_max, Stats, infinity}
+             , {mqueue_dropped, Stats, 0}
+             , {awaiting_rel_cnt, Stats, 0}
+             , {awaiting_rel_max, Stats, infinity}
+             , {recv_oct, Stats, 0}
+             , {recv_cnt, Stats, 0}
+             , {recv_pkt, Stats, 0}
+             , {recv_msg, Stats, 0}
+             , {send_oct, Stats, 0}
+             , {send_cnt, Stats, 0}
+             , {send_pkt, Stats, 0}
+             , {send_msg, Stats, 0}
+             , {mailbox_len, Stats, 0}
+             , {heap_size, Stats, 0}
+             , {reductions, Stats, 0}
+             ],
+    eval(FetchX).
+
+eval(Ls) ->
+    eval(Ls, #{}).
+eval([], AccMap) ->
+    AccMap;
+eval([{K, Vx}|More], AccMap) ->
+    case valuex_get(K, Vx) of
+        undefined -> eval(More, AccMap#{K => null});
+        Value -> eval(More, AccMap#{K => Value})
+    end;
+eval([{K, Vx, Default}|More], AccMap) ->
+    case valuex_get(K, Vx) of
+        undefined -> eval(More, AccMap#{K => Default});
+        Value -> eval(More, AccMap#{K => Value})
+    end.
+
+valuex_get(K, Vx) when is_map(Vx); is_list(Vx) ->
+    key_get(K, Vx);
+valuex_get(_K, {InKey, Obj}) when is_map(Obj); is_list(Obj) ->
+    key_get(InKey, Obj);
+valuex_get(_K, {InKey, Obj, MappingFun}) when is_map(Obj); is_list(Obj) ->
+    case key_get(InKey, Obj) of
+        undefined -> undefined;
+        Val -> MappingFun(Val)
+    end.
+
+key_get(K, M) when is_map(M) ->
+    maps:get(K, M, undefined);
+key_get(K, L) when is_list(L) ->
+    proplists:get_value(K, L).
 
 peer_to_binary({Addr, Port}) ->
     AddrBinary = list_to_binary(inet:ntoa(Addr)),
@@ -239,6 +323,9 @@ peer_to_binary({Addr, Port}) ->
 peer_to_binary(Addr) ->
     list_to_binary(inet:ntoa(Addr)).
 
+conn_state_to_connected(connected) -> true;
+conn_state_to_connected(_) -> false.
+
 %%--------------------------------------------------------------------
 %% Swagger defines
 %%--------------------------------------------------------------------
@@ -325,6 +412,7 @@ params_client_searching_in_qs() ->
       , {username, string}
       , {ip_address, string}
       , {conn_state, string}
+      , {proto_ver, string}
       , {clean_start, boolean}
       , {like_clientid, string}
       , {like_username, string}
@@ -426,6 +514,10 @@ properties_client() ->
             "when connected is false">>}
       , {connected, boolean,
          <<"Whether the client is connected">>}
+      %% FIXME: the will_msg attribute is not a general attribute
+      %% for every protocol. But it should be returned to frontend if someone
+      %% want it
+      %%
       %, {will_msg, string,
       %   <<"Client will message">>}
       %, {zone, string,
@@ -488,5 +580,11 @@ properties_subscription() ->
      [ {topic, string,
         <<"Topic Fillter">>}
      , {qos, integer,
-        <<"QoS level">>}
+        <<"QoS level, enum: 0, 1, 2">>}
+     , {nl, integer,     %% FIXME: why not boolean?
+        <<"No Local option, enum: 0, 1">>}
+     , {rap, integer,
+        <<"Retain as Published option, enum: 0, 1">>}
+     , {rh, integer,
+        <<"Retain Handling option, enum: 0, 1, 2">>}
      ]).

+ 2 - 1
apps/emqx_gateway/src/emqx_gateway_cli.erl

@@ -51,7 +51,7 @@ is_cmd(Fun) ->
 
 gateway(["list"]) ->
     lists:foreach(fun(#{name := Name} = Gateway) ->
-        %% XXX: More infos: listeners?, connected?
+        %% TODO: More infos: listeners?, connected?
         Status = maps:get(status, Gateway, stopped),
         emqx_ctl:print("Gateway(name=~s, status=~s)~n",
                        [Name, Status])
@@ -106,6 +106,7 @@ gateway(_) ->
                    ]).
 
 'gateway-clients'(["list", Name]) ->
+    %% FIXME: page me. for example: --limit 100 --page 10 ???
     InfoTab = emqx_gateway_cm:tabname(info, Name),
     case ets:info(InfoTab) of
         undefined ->

+ 72 - 2
apps/emqx_gateway/src/emqx_gateway_intr.erl

@@ -15,11 +15,26 @@
 %%--------------------------------------------------------------------
 
 %% @doc Gateway Interface Module for HTTP-APIs
--module(emqx_gateway_intr).
+-module(emqx_gateway_http).
 
+-include("include/emqx_gateway.hrl").
+
+%% Mgmt APIs - gateway
 -export([ gateways/1
         ]).
 
+%% Mgmt APIs - clients
+-export([ client_lookup/2
+        , client_kickout/2
+        , client_subscribe/4
+        , client_unsubscribe/3
+        , client_subscriptions/2
+        ]).
+
+%% Utils for http, swagger, etc.
+-export([ return_http_error/2
+        ]).
+
 -type gateway_summary() ::
         #{ name := binary()
          , status := running | stopped | unloaded
@@ -30,7 +45,7 @@
          }.
 
 %%--------------------------------------------------------------------
-%% APIs
+%% Mgmt APIs - gateway
 %%--------------------------------------------------------------------
 
 -spec gateways(Status :: all | running | stopped | unloaded)
@@ -76,3 +91,58 @@ get_listeners_status(GwName, Config) ->
 %% @private
 listener_name(GwName, Type, LisName) ->
     list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])).
+
+%%--------------------------------------------------------------------
+%% Mgmt APIs - clients
+%%--------------------------------------------------------------------
+
+-spec client_lookup(gateway_name(), emqx_type:clientid())
+    -> {ok, {emqx_types:infos(), emqx_types:stats()}}
+     | {error, any()}.
+client_lookup(_GwName, _ClientId) ->
+    %% FIXME: The Gap between `ClientInfo in HTTP-API` and
+    %% ClientInfo defination
+    todo.
+
+-spec client_kickout(gateway_name(), emqx_type:clientid())
+    -> {error, any()}
+     | ok.
+client_kickout(GwName, ClientId) ->
+    emqx_gateway_cm:kick_session(GwName, ClientId).
+
+-spec client_subscriptions(gateway_name(), emqx_type:clientid())
+    -> {error, any()}
+     | {ok, list()}.     %% FIXME: #{<<"t/1">> =>
+                         %%           #{nl => 0,qos => 0,rap => 0,rh => 0,
+                         %%             sub_props => #{}}
+client_subscriptions(_GwName, _ClientId) ->
+    todo.
+
+-spec client_subscribe(gateway_name(), emqx_type:clientid(),
+                       emqx_type:topic(), emqx_type:qos())
+    -> {error, any()}
+     | ok.
+client_subscribe(_GwName, _ClientId, _Topic, _QoS) ->
+    todo.
+
+-spec client_unsubscribe(gateway_name(),
+                         emqx_type:clientid(), emqx_type:topic())
+    -> {error, any()}
+     | ok.
+client_unsubscribe(_GwName, _ClientId, _Topic) ->
+    todo.
+
+%%--------------------------------------------------------------------
+%% Utils
+%%--------------------------------------------------------------------
+
+-spec return_http_error(integer(), binary()) -> binary().
+return_http_error(Code, Msg) ->
+    emqx_json:encode(
+      #{code => codestr(Code),
+        reason => emqx_gateway_utils:stringfy(Msg)
+       }).
+
+codestr(404) -> 'RESOURCE_NOT_FOUND';
+codestr(401) -> 'NOT_SUPPORTED_NOW';
+codestr(500) -> 'UNKNOW_ERROR'.

+ 4 - 0
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -28,6 +28,7 @@
 
 -export([ apply/2
         , format_listenon/1
+        , unix_ts_to_rfc3339/1
         , unix_ts_to_rfc3339/2
         ]).
 
@@ -121,6 +122,9 @@ unix_ts_to_rfc3339(Key, Map) ->
                emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)}
     end.
 
+unix_ts_to_rfc3339(Ts) ->
+    emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>).
+
 -spec stringfy(term()) -> binary().
 stringfy(T) ->
     iolist_to_binary(io_lib:format("~0p", [T])).