Przeglądaj źródła

fix(gw): fix the subscription apis bugs

JianBo He 4 lat temu
rodzic
commit
40d34ccd85

+ 15 - 5
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -120,13 +120,22 @@ clients_insta(delete, #{ bindings := #{name := GwName0,
     emqx_gateway_http:kickout_client(GwName, ClientId),
     {200}.
 
+%% FIXME:
+%% List the subscription without mountpoint, but has SubOpts,
+%% for example, share group ...
 subscriptions(get, #{ bindings := #{name := GwName0,
                                     clientid := ClientId0}
                     }) ->
     GwName = binary_to_existing_atom(GwName0),
     ClientId = emqx_mgmt_util:urldecode(ClientId0),
-    {200, emqx_gateway_http:list_client_subscriptions(GwName, ClientId)};
+    case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
+        {error, Reason} ->
+            return_http_error(404, Reason);
+        {ok, Subs} ->
+            {200, Subs}
+    end;
 
+%% Create the subscription without mountpoint
 subscriptions(post, #{ bindings := #{name := GwName0,
                                      clientid := ClientId0},
                        body := Body
@@ -147,6 +156,7 @@ subscriptions(post, #{ bindings := #{name := GwName0,
             end
     end;
 
+%% Remove the subscription without mountpoint
 subscriptions(delete, #{ bindings := #{name := GwName0,
                                        clientid := ClientId0,
                                        topic := Topic0
@@ -166,10 +176,10 @@ subopts(Req) ->
      , rap => maps:get(<<"rap">>, Req, 0)
      , nl => maps:get(<<"nl">>, Req, 0)
      , rh => maps:get(<<"rh">>, Req, 0)
-     , sub_prop => extra_sub_prop(maps:get(<<"sub_prop">>, Req, #{}))
+     , sub_props => extra_sub_props(maps:get(<<"sub_props">>, Req, #{}))
      }.
 
-extra_sub_prop(Props) ->
+extra_sub_props(Props) ->
     maps:filter(
       fun(_, V) -> V =/= undefined end,
       #{subid => maps:get(<<"subid">>, Props, undefined)}
@@ -595,7 +605,7 @@ properties_client() ->
       ]).
 
 properties_subscription() ->
-    ExtraProps = [ {subid, integer,
+    ExtraProps = [ {subid, string,
                     <<"Only stomp protocol, an uniquely identity for "
                       "the subscription. range: 1-65535.">>}
                  ],
@@ -610,5 +620,5 @@ properties_subscription() ->
         <<"Retain as Published option, enum: 0, 1">>}
      , {rh, integer,
         <<"Retain Handling option, enum: 0, 1, 2">>}
-     , {sub_prop, object, ExtraProps}
+     , {sub_props, object, ExtraProps}
      ]).

+ 4 - 0
apps/emqx_gateway/src/emqx_gateway_cm.erl

@@ -48,6 +48,10 @@
         , connection_closed/2
         ]).
 
+-export([ with_channel/3
+        , lookup_channels/2
+        ]).
+
 %% Internal funcs for getting tabname by GatewayId
 -export([cmtabs/1, tabname/2]).
 

+ 27 - 45
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -141,63 +141,44 @@ kickout_client(Node, GwName, ClientId) ->
      | {ok, list()}.
 list_client_subscriptions(GwName, ClientId) ->
     %% Get the subscriptions from session-info
-    case emqx_gateway_cm:get_chan_info(GwName, ClientId) of
-        undefined ->
-            {error, not_found};
-        Infos ->
-            Subs = maps:get(subscriptions, Infos, #{}),
-            maps:fold(fun(K, V, Acc) ->
-                [maps:merge(
-                   #{topic => K},
-                   maps:with([qos, nl, rap, rh], V))
-                 |Acc]
-            end, [], Subs)
-    end.
+    with_channel(GwName, ClientId,
+        fun(Pid) ->
+            Subs = emqx_gateway_conn:call(
+                     Pid, 
+                     subscriptions, ?DEFAULT_CALL_TIMEOUT),
+            {ok, lists:map(fun({Topic, SubOpts}) ->
+                     SubOpts#{topic => Topic}
+                 end, Subs)}
+        end).
 
 -spec client_subscribe(gateway_name(), emqx_type:clientid(),
                        emqx_type:topic(), emqx_type:subopts())
     -> {error, any()}
      | ok.
 client_subscribe(GwName, ClientId, Topic, SubOpts) ->
-    case emqx_gateway_cm:lookup_channels(GwName, ClientId) of
-        [] -> {error, not_found};
-        [Pid] ->
-            %% fixed conn module?
+    with_channel(GwName, ClientId,
+        fun(Pid) ->
             emqx_gateway_conn:call(
               Pid, {subscribe, Topic, SubOpts},
               ?DEFAULT_CALL_TIMEOUT
-             );
-        Pids ->
-            ?LOG(warning, "More than one client process ~p was found "
-                          "clientid ~s", [Pids, ClientId]),
-            _ = [
-                 emqx_gateway_conn:call(
-                  Pid, {subscribe, Topic, SubOpts},
-                  ?DEFAULT_CALL_TIMEOUT
-                 ) || Pid <- Pids],
-            ok
-    end.
+             )
+        end).
 
 -spec client_unsubscribe(gateway_name(),
                          emqx_type:clientid(), emqx_type:topic())
     -> {error, any()}
      | ok.
 client_unsubscribe(GwName, ClientId, Topic) ->
-    case emqx_gateway_cm:lookup_channels(GwName, ClientId) of
-        [] -> {error, not_found};
-        [Pid] ->
+    with_channel(GwName, ClientId,
+        fun(Pid) ->
             emqx_gateway_conn:call(
-              Pid, {unsubscribe, Topic},
-              ?DEFAULT_CALL_TIMEOUT);
-        Pids ->
-            ?LOG(warning, "More than one client process ~p was found "
-                          "clientid ~s", [Pids, ClientId]),
-            _ = [
-                 emqx_gateway_conn:call(
-                   Pid, {unsubscribe, Topic},
-                   ?DEFAULT_CALL_TIMEOUT
-                 ) || Pid <- Pids],
-            ok
+              Pid, {unsubscribe, Topic}, ?DEFAULT_CALL_TIMEOUT)
+        end).
+
+with_channel(GwName, ClientId, Fun) ->
+    case emqx_gateway_cm:with_channel(GwName, ClientId, Fun) of
+        undefined -> {error, not_found};
+        Res -> Res
     end.
 
 %%--------------------------------------------------------------------
@@ -206,10 +187,11 @@ client_unsubscribe(GwName, ClientId, Topic) ->
 
 -spec return_http_error(integer(), binary()) -> binary().
 return_http_error(Code, Msg) ->
-    emqx_json:encode(
-      #{code => codestr(Code),
-        reason => emqx_gateway_utils:stringfy(Msg)
-       }).
+    {Code, emqx_json:encode(
+             #{code => codestr(Code),
+               reason => emqx_gateway_utils:stringfy(Msg)
+              })
+    }.
 
 codestr(404) -> 'RESOURCE_NOT_FOUND';
 codestr(401) -> 'NOT_SUPPORTED_NOW';

+ 2 - 1
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -209,5 +209,6 @@ default_subopts() ->
     #{rh  => 0, %% Retain Handling
       rap => 0, %% Retain as Publish
       nl  => 0, %% No Local
-      qos => 0  %% QoS
+      qos => 0, %% QoS
+      is_new => true
      }.

+ 30 - 22
apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl

@@ -393,11 +393,9 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers),
                 [] ->
                     ErrMsg = "Permission denied",
                     handle_out(error, {receipt_id(Headers), ErrMsg}, Channel);
-                [MountedTopic|_] ->
-                    NChannel1 = NChannel#channel{
-                                  subscriptions = [{SubId, MountedTopic, Ack}
-                                                   | Subs]
-                                 },
+                [{MountedTopic, SubOpts}|_] ->
+                    NSubs = [{SubId, MountedTopic, Ack, SubOpts}|Subs],
+                    NChannel1 = NChannel#channel{subscriptions = NSubs},
                     handle_out(receipt, receipt_id(Headers), NChannel1)
             end;
         {error, ErrMsg, NChannel} ->
@@ -415,7 +413,7 @@ handle_in(?PACKET(?CMD_UNSUBSCRIBE, Headers),
     SubId = header(<<"id">>, Headers),
     {ok, NChannel} =
         case lists:keyfind(SubId, 1, Subs) of
-            {SubId, MountedTopic, _Ack} ->
+            {SubId, MountedTopic, _Ack, _SubOpts} ->
                 Topic = emqx_mountpoint:unmount(Mountpoint, MountedTopic),
                 %% XXX: eval the return topics?
                 _ = run_hooks(Ctx, 'client.unsubscribe',
@@ -539,15 +537,16 @@ trans_pipeline([{Func, Args}|More], Outgoings, Channel) ->
 %% Subs
 
 parse_topic_filter({SubId, Topic}, Channel) ->
-    TopicFilter = emqx_topic:parse(Topic),
-    {ok, {SubId, TopicFilter}, Channel}.
+    {ParsedTopic, SubOpts} = emqx_topic:parse(Topic),
+    NSubOpts = SubOpts#{sub_props => #{subid => SubId}},
+    {ok, {SubId, {ParsedTopic, NSubOpts}}, Channel}.
 
-check_subscribed_status({SubId, TopicFilter},
+check_subscribed_status({SubId, {ParsedTopic, _SubOpts}},
                         #channel{
                            subscriptions = Subs,
                            clientinfo = #{mountpoint := Mountpoint}
                           }) ->
-    MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter),
+    MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
     case lists:keyfind(SubId, 1, Subs) of
         {SubId, MountedTopic, _Ack} ->
             ok;
@@ -557,11 +556,11 @@ check_subscribed_status({SubId, TopicFilter},
             ok
     end.
 
-check_sub_acl({_SubId, TopicFilter},
+check_sub_acl({_SubId, {ParsedTopic, _SubOpts}},
               #channel{
                  ctx = Ctx,
                  clientinfo = ClientInfo}) ->
-    case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of
+    case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, ParsedTopic) of
         deny -> {error, "ACL Deny"};
         allow -> ok
     end.
@@ -571,17 +570,17 @@ do_subscribe(TopicFilters, Channel) ->
 
 do_subscribe([], _Channel, Acc) ->
     lists:reverse(Acc);
-do_subscribe([{TopicFilter, Option}|More],
+do_subscribe([{ParsedTopic, SubOpts0}|More],
              Channel = #channel{
                           ctx = Ctx,
                           clientinfo = ClientInfo
                                      = #{clientid := ClientId,
                                          mountpoint := Mountpoint}}, Acc) ->
-    SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), Option),
-    MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter),
+    SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts0),
+    MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
     _ = emqx_broker:subscribe(MountedTopic, ClientId, SubOpts),
     run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]),
-    do_subscribe(More, Channel, [MountedTopic|Acc]).
+    do_subscribe(More, Channel, [{MountedTopic, SubOpts}|Acc]).
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packet
@@ -631,7 +630,7 @@ handle_call({subscribe, Topic, SubOpts},
                          subscriptions = Subs
                         }) ->
     case maps:get(subid,
-                  maps:get(sub_prop, SubOpts, #{}),
+                  maps:get(sub_props, SubOpts, #{}),
                   undefined) of
         undefined ->
             reply({error, no_subid}, Channel);
@@ -641,11 +640,12 @@ handle_call({subscribe, Topic, SubOpts},
                  , fun check_subscribed_status/2
                  ], {SubId, {Topic, SubOpts}}, Channel) of
                 {ok, {_, TopicFilter}, NChannel} ->
-                    [MountedTopic] = do_subscribe([TopicFilter], NChannel),
-                    NChannel1 = NChannel#channel{
-                                  subscriptions =
-                                    [{SubId, MountedTopic, <<"auto">>}|Subs]
-                                 },
+                    [{MountedTopic, NSubOpts}] = do_subscribe(
+                                                   [TopicFilter],
+                                                   NChannel
+                                                  ),
+                    NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts}|Subs],
+                    NChannel1 = NChannel#channel{subscriptions = NSubs},
                     reply(ok, NChannel1);
                 {error, ErrMsg, NChannel} ->
                     ?LOG(error, "Failed to subscribe topic ~s, reason: ~s",
@@ -670,6 +670,14 @@ handle_call({unsubscribe, Topic},
             subscriptions = lists:keydelete(MountedTopic, 2, Subs)}
          );
 
+%% Reply :: [{emqx_types:topic(), emqx_types:subopts()}]
+handle_call(subscriptions, Channel = #channel{subscriptions = Subs}) ->
+    Reply = lists:map(
+              fun({_SubId, Topic, _Ack, SubOpts}) ->
+                {Topic, SubOpts}
+              end, Subs),
+    reply(Reply, Channel);
+
 handle_call(kick, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),
     Frame = error_frame(undefined, <<"Kicked out">>),