Просмотр исходного кода

feat(gw): support the sub/unsub operation

JianBo He 4 лет назад
Родитель
Сommit
1748de5ee3

+ 37 - 13
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -18,6 +18,8 @@
 
 -behaviour(minirest_api).
 
+-include_lib("emqx/include/logger.hrl").
+
 %% minirest behaviour callbacks
 -export([api_spec/0]).
 
@@ -92,20 +94,22 @@ clients(get, #{ bindings := #{name := GwName0}
     end.
 
 clients_insta(get, #{ bindings := #{name := GwName0,
-                                    clientid := ClientId}
+                                    clientid := ClientId0}
                     }) ->
     GwName = binary_to_existing_atom(GwName0),
-    TabName = emqx_gateway_cm:tabname(info, GwName),
-    %% XXX: We need a lookuo function for it instead of a query
-    #{data := Data} = emqx_mgmt_api:cluster_query(
-                        #{<<"clientid">> => ClientId},
-                        TabName, ?CLIENT_QS_SCHEMA, ?query_fun
-                       ),
-    case Data of
+    ClientId = emqx_mgmt_util:urldecode(ClientId0),
+
+    case emqx_gateway_http:lookup_client(GwName, ClientId,
+                                         {?MODULE, format_channel_info}) of
         [ClientInfo] ->
             {200, ClientInfo};
+        [ClientInfo|_More] ->
+            ?LOG(warning, "More than one client info was returned on ~s",
+                          [ClientId]),
+            {200, ClientInfo};
         [] ->
             return_http_error(404, <<"Gateway or ClientId not found">>)
+
     end;
 
 clients_insta(delete, #{ bindings := #{name := GwName0,
@@ -113,7 +117,7 @@ clients_insta(delete, #{ bindings := #{name := GwName0,
                        }) ->
     GwName = binary_to_existing_atom(GwName0),
     ClientId = emqx_mgmt_util:urldecode(ClientId0),
-    emqx_gateway_http:client_kickout(GwName, ClientId),
+    emqx_gateway_http:kickout_client(GwName, ClientId),
     {200}.
 
 subscriptions(get, #{ bindings := #{name := GwName0,
@@ -121,8 +125,7 @@ subscriptions(get, #{ bindings := #{name := GwName0,
                     }) ->
     GwName = binary_to_existing_atom(GwName0),
     ClientId = emqx_mgmt_util:urldecode(ClientId0),
-    emqx_gateway_http:client_subscriptions(GwName, ClientId),
-    {200, []};
+    {200, emqx_gateway_http:list_client_subscriptions(GwName, ClientId)};
 
 subscriptions(post, #{ bindings := #{name := GwName0,
                                      clientid := ClientId0},
@@ -131,8 +134,7 @@ subscriptions(post, #{ bindings := #{name := GwName0,
     GwName = binary_to_existing_atom(GwName0),
     ClientId = emqx_mgmt_util:urldecode(ClientId0),
 
-    case {maps:get(<<"topic">>, Body, undefined),
-          maps:get(<<"qos">>, Body, 0)} of
+    case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
         {undefined, _} ->
             %% FIXME: more reasonable error code??
             return_http_error(404, <<"Request paramter missed: topic">>);
@@ -156,6 +158,23 @@ subscriptions(delete, #{ bindings := #{name := GwName0,
     _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
     {200}.
 
+%%--------------------------------------------------------------------
+%% Utils
+
+subopts(Req) ->
+    #{ qos => maps:get(<<"qos">>, Req, 0)
+     , rap => maps:get(<<"rap">>, Req, 0)
+     , nl => maps:get(<<"nl">>, Req, 0)
+     , rh => maps:get(<<"rh">>, Req, 0)
+     , sub_prop => extra_sub_prop(maps:get(<<"sub_prop">>, Req, #{}))
+     }.
+
+extra_sub_prop(Props) ->
+    maps:filter(
+      fun(_, V) -> V =/= undefined end,
+      #{subid => maps:get(<<"subid">>, Props, undefined)}
+     ).
+
 %%--------------------------------------------------------------------
 %% query funcs
 
@@ -576,6 +595,10 @@ properties_client() ->
       ]).
 
 properties_subscription() ->
+    ExtraProps = [ {subid, integer,
+                    <<"Only stomp protocol, an uniquely identity for "
+                      "the subscription. range: 1-65535.">>}
+                 ],
     emqx_mgmt_util:properties(
      [ {topic, string,
         <<"Topic Fillter">>}
@@ -587,4 +610,5 @@ properties_subscription() ->
         <<"Retain as Published option, enum: 0, 1">>}
      , {rh, integer,
         <<"Retain Handling option, enum: 0, 1, 2">>}
+     , {sub_prop, object, ExtraProps}
      ]).

+ 101 - 24
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -18,17 +18,20 @@
 -module(emqx_gateway_http).
 
 -include("include/emqx_gateway.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 %% Mgmt APIs - gateway
 -export([ gateways/1
         ]).
 
 %% Mgmt APIs - clients
--export([ client_lookup/2
-        , client_kickout/2
+-export([ lookup_client/3
+        , lookup_client/4
+        , kickout_client/2
+        , kickout_client/3
+        , list_client_subscriptions/2
         , client_subscribe/4
         , client_unsubscribe/3
-        , client_subscriptions/2
         ]).
 
 %% Utils for http, swagger, etc.
@@ -44,6 +47,8 @@
          , listeners => []
          }.
 
+-define(DEFAULT_CALL_TIMEOUT, 15000).
+
 %%--------------------------------------------------------------------
 %% Mgmt APIs - gateway
 %%--------------------------------------------------------------------
@@ -96,41 +101,104 @@ listener_name(GwName, Type, LisName) ->
 %% Mgmt APIs - clients
 %%--------------------------------------------------------------------
 
--spec client_lookup(gateway_name(), emqx_type:clientid())
-    -> {ok, {emqx_types:infos(), emqx_types:stats()}}
-     | {error, any()}.
-client_lookup(_GwName, _ClientId) ->
-    %% FIXME: The Gap between `ClientInfo in HTTP-API` and
-    %% ClientInfo defination
-    todo.
+-spec lookup_client(gateway_name(), emqx_type:clientid(), function()) -> list().
+lookup_client(GwName, ClientId, FormatFun) ->
+    lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
+                  || Node <- ekka_mnesia:running_nodes()]).
+
+lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() ->
+    ChanTab = emqx_gateway_cm:tabname(chan, GwName),
+    InfoTab = emqx_gateway_cm:tabname(info, GwName),
+
+    lists:append(lists:map(
+      fun(Key) ->
+        lists:map(fun M:F/1, ets:lookup(InfoTab, Key))
+      end, ets:lookup(ChanTab, ClientId)));
+
+lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
+    rpc_call(Node, lookup_client,
+             [Node, GwName, {clientid, ClientId}, FormatFun]).
 
--spec client_kickout(gateway_name(), emqx_type:clientid())
+-spec kickout_client(gateway_name(), emqx_type:clientid())
     -> {error, any()}
      | ok.
-client_kickout(GwName, ClientId) ->
-    emqx_gateway_cm:kick_session(GwName, ClientId).
+kickout_client(GwName, ClientId) ->
+    Results = [kickout_client(Node, GwName, ClientId)
+               || Node <- ekka_mnesia:running_nodes()],
+    case lists:any(fun(Item) -> Item =:= ok end, Results) of
+        true  -> ok;
+        false -> lists:last(Results)
+    end.
+
+kickout_client(Node, GwName, ClientId) when Node =:= node() ->
+    emqx_gateway_cm:kick_session(GwName, ClientId);
+
+kickout_client(Node, GwName, ClientId) ->
+    rpc_call(Node, kickout_client, [Node, GwName, ClientId]).
 
--spec client_subscriptions(gateway_name(), emqx_type:clientid())
+-spec list_client_subscriptions(gateway_name(), emqx_type:clientid())
     -> {error, any()}
-     | {ok, list()}.     %% FIXME: #{<<"t/1">> =>
-                         %%           #{nl => 0,qos => 0,rap => 0,rh => 0,
-                         %%             sub_props => #{}}
-client_subscriptions(_GwName, _ClientId) ->
-    todo.
+     | {ok, list()}.
+list_client_subscriptions(GwName, ClientId) ->
+    %% Get the subscriptions from session-info
+    case emqx_gateway_cm:get_chan_info(GwName, ClientId) of
+        undefined ->
+            {error, not_found};
+        Infos ->
+            Subs = maps:get(subscriptions, Infos, #{}),
+            maps:fold(fun(K, V, Acc) ->
+                [maps:merge(
+                   #{topic => K},
+                   maps:with([qos, nl, rap, rh], V))
+                 |Acc]
+            end, [], Subs)
+    end.
 
 -spec client_subscribe(gateway_name(), emqx_type:clientid(),
-                       emqx_type:topic(), emqx_type:qos())
+                       emqx_type:topic(), emqx_type:subopts())
     -> {error, any()}
      | ok.
-client_subscribe(_GwName, _ClientId, _Topic, _QoS) ->
-    todo.
+client_subscribe(GwName, ClientId, Topic, SubOpts) ->
+    case emqx_gateway_cm:lookup_channels(GwName, ClientId) of
+        [] -> {error, not_found};
+        [Pid] ->
+            %% fixed conn module?
+            emqx_gateway_conn:call(
+              Pid, {subscribe, Topic, SubOpts},
+              ?DEFAULT_CALL_TIMEOUT
+             );
+        Pids ->
+            ?LOG(warning, "More than one client process ~p was found "
+                          "clientid ~s", [Pids, ClientId]),
+            _ = [
+                 emqx_gateway_conn:call(
+                  Pid, {subscribe, Topic, SubOpts},
+                  ?DEFAULT_CALL_TIMEOUT
+                 ) || Pid <- Pids],
+            ok
+    end.
 
 -spec client_unsubscribe(gateway_name(),
                          emqx_type:clientid(), emqx_type:topic())
     -> {error, any()}
      | ok.
-client_unsubscribe(_GwName, _ClientId, _Topic) ->
-    todo.
+client_unsubscribe(GwName, ClientId, Topic) ->
+    case emqx_gateway_cm:lookup_channels(GwName, ClientId) of
+        [] -> {error, not_found};
+        [Pid] ->
+            emqx_gateway_conn:call(
+              Pid, {unsubscribe, Topic},
+              ?DEFAULT_CALL_TIMEOUT);
+        Pids ->
+            ?LOG(warning, "More than one client process ~p was found "
+                          "clientid ~s", [Pids, ClientId]),
+            _ = [
+                 emqx_gateway_conn:call(
+                   Pid, {unsubscribe, Topic},
+                   ?DEFAULT_CALL_TIMEOUT
+                 ) || Pid <- Pids],
+            ok
+    end.
 
 %%--------------------------------------------------------------------
 %% Utils
@@ -146,3 +214,12 @@ return_http_error(Code, Msg) ->
 codestr(404) -> 'RESOURCE_NOT_FOUND';
 codestr(401) -> 'NOT_SUPPORTED_NOW';
 codestr(500) -> 'UNKNOW_ERROR'.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+
+rpc_call(Node, Fun, Args) ->
+    case rpc:call(Node, ?MODULE, Fun, Args) of
+        {badrpc, Reason} -> {error, Reason};
+        Res -> Res
+    end.

+ 10 - 8
apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl

@@ -310,7 +310,7 @@ handle_call({start_timer, keepalive, Interval},
     NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
     {reply, ok, ensure_keepalive(NChannel)};
 
-handle_call({subscribe, TopicFilter, Qos},
+handle_call({subscribe_from_client, TopicFilter, Qos},
             Channel = #channel{
                          ctx = Ctx,
                          conn_state = connected,
@@ -323,11 +323,19 @@ handle_call({subscribe, TopicFilter, Qos},
             {reply, ok, NChannel}
     end;
 
-handle_call({unsubscribe, TopicFilter},
+handle_call({subscribe, Topic, SubOpts}, Channel) ->
+    {ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
+    {reply, ok, NChannel};
+
+handle_call({unsubscribe_from_client, TopicFilter},
             Channel = #channel{conn_state = connected}) ->
     {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
     {reply, ok, NChannel};
 
+handle_call({unsubscribe, Topic}, Channel) ->
+    {ok, NChannel} = do_unsubscribe([Topic], Channel),
+    {reply, ok, NChannel};
+
 handle_call({publish, Topic, Qos, Payload},
             Channel = #channel{
                          ctx = Ctx,
@@ -363,12 +371,6 @@ handle_cast(Req, Channel) ->
 -spec handle_info(any(), channel())
     -> {ok, channel()}
      | {shutdown, Reason :: term(), channel()}.
-handle_info({subscribe, TopicFilters}, Channel) ->
-    do_subscribe(TopicFilters, Channel);
-
-handle_info({unsubscribe, TopicFilters}, Channel) ->
-    do_unsubscribe(TopicFilters, Channel);
-
 handle_info({sock_closed, Reason},
             Channel = #channel{rqueue = Queue, inflight = Inflight}) ->
     case queue:len(Queue) =:= 0

+ 2 - 2
apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl

@@ -96,7 +96,7 @@ publish(Req, Md) ->
 subscribe(Req = #{conn := Conn, topic := Topic, qos := Qos}, Md)
   when ?IS_QOS(Qos) ->
     ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
-    {ok, response(call(Conn, {subscribe, Topic, Qos})), Md};
+    {ok, response(call(Conn, {subscribe_from_client, Topic, Qos})), Md};
 
 subscribe(Req, Md) ->
     ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
@@ -107,7 +107,7 @@ subscribe(Req, Md) ->
      | {error, grpc_cowboy_h:error_response()}.
 unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) ->
     ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
-    {ok, response(call(Conn, {unsubscribe, Topic})), Md}.
+    {ok, response(call(Conn, {unsubscribe_from_client, Topic})), Md}.
 
 %%--------------------------------------------------------------------
 %% Internal funcs

+ 6 - 12
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -1102,6 +1102,12 @@ message_to_packet(MsgId, Message,
        | {shutdown, Reason :: term(), Reply :: term(), channel()}
        | {shutdown, Reason :: term(), Reply :: term(),
           emqx_types:packet(), channel()}.
+handle_call({subscribe, _Topic, _Subopts}, Channel) ->
+    reply({error, not_supported_now}, Channel);
+
+handle_call({unsubscribe, _Topic}, Channel) ->
+    reply({error, not_supported_now}, Channel);
+
 handle_call(kick, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),
     shutdown_and_reply(kicked, ok, NChannel);
@@ -1150,18 +1156,6 @@ handle_cast(_Req, Channel) ->
 -spec handle_info(Info :: term(), channel())
       -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
 
-%% XXX: Received from the emqx-management ???
-%handle_info({subscribe, TopicFilters}, Channel ) ->
-%    {_, NChannel} = lists:foldl(
-%        fun({TopicFilter, SubOpts}, {_, ChannelAcc}) ->
-%            do_subscribe(TopicFilter, SubOpts, ChannelAcc)
-%        end, {[], Channel}, parse_topic_filters(TopicFilters)),
-%    {ok, NChannel};
-%
-%handle_info({unsubscribe, TopicFilters}, Channel) ->
-%    {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
-%    {ok, NChannel};
-
 handle_info({sock_closed, Reason},
             Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);

+ 44 - 12
apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl

@@ -626,6 +626,50 @@ handle_out(receipt, ReceiptId, Channel) ->
       -> {reply, Reply :: term(), channel()}
        | {shutdown, Reason :: term(), Reply :: term(), channel()}
        | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}).
+handle_call({subscribe, Topic, SubOpts},
+            Channel = #channel{
+                         subscriptions = Subs
+                        }) ->
+    case maps:get(subid,
+                  maps:get(sub_prop, SubOpts, #{}),
+                  undefined) of
+        undefined ->
+            reply({error, no_subid}, Channel);
+        SubId ->
+            case emqx_misc:pipeline(
+                 [ fun parse_topic_filter/2
+                 , fun check_subscribed_status/2
+                 ], {SubId, {Topic, SubOpts}}, Channel) of
+                {ok, {_, TopicFilter}, NChannel} ->
+                    [MountedTopic] = do_subscribe([TopicFilter], NChannel),
+                    NChannel1 = NChannel#channel{
+                                  subscriptions =
+                                    [{SubId, MountedTopic, <<"auto">>}|Subs]
+                                 },
+                    reply(ok, NChannel1);
+                {error, ErrMsg, NChannel} ->
+                    ?LOG(error, "Failed to subscribe topic ~s, reason: ~s",
+                                [Topic, ErrMsg]),
+                    reply({error, ErrMsg}, NChannel)
+            end
+    end;
+
+handle_call({unsubscribe, Topic},
+            Channel = #channel{
+                         ctx = Ctx,
+                         clientinfo = ClientInfo = #{mountpoint := Mountpoint},
+                         subscriptions = Subs
+                        }) ->
+    {ParsedTopic, _SubOpts} = emqx_topic:parse(Topic),
+    MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
+    ok = emqx_broker:unsubscribe(MountedTopic),
+    _ = run_hooks(Ctx, 'session.unsubscribe',
+                  [ClientInfo, MountedTopic, #{}]),
+    reply(ok,
+          Channel#channel{
+            subscriptions = lists:keydelete(MountedTopic, 2, Subs)}
+         );
+
 handle_call(kick, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),
     Frame = error_frame(undefined, <<"Kicked out">>),
@@ -678,18 +722,6 @@ handle_cast(_Req, Channel) ->
 -spec(handle_info(Info :: term(), channel())
       -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
 
-%% XXX: Received from the emqx-management ???
-%handle_info({subscribe, TopicFilters}, Channel ) ->
-%    {_, NChannel} = lists:foldl(
-%        fun({TopicFilter, SubOpts}, {_, ChannelAcc}) ->
-%            do_subscribe(TopicFilter, SubOpts, ChannelAcc)
-%        end, {[], Channel}, parse_topic_filters(TopicFilters)),
-%    {ok, NChannel};
-%
-%handle_info({unsubscribe, TopicFilters}, Channel) ->
-%    {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
-%    {ok, NChannel};
-
 handle_info({sock_closed, Reason},
             Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);