فهرست منبع

Subscribe or unsubscribe via HTTP API skip ACL checking

zhouzb 5 سال پیش
والد
کامیت
f456f40c59
1فایلهای تغییر یافته به همراه37 افزوده شده و 0 حذف شده
  1. 37 0
      src/emqx_channel.erl

+ 37 - 0
src/emqx_channel.erl

@@ -538,6 +538,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
         {error, RC} -> {RC, Channel}
     end.
 
+-compile({inline, [process_force_subscribe/2]}).
+process_force_subscribe(Subscriptions, Channel =
+             #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
+                      session = Session}) ->
+    lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) ->
+                    NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
+                    NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc),
+                    case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
+                        {ok, NSession} ->
+                            {ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}};
+                        {error, ReasonCode} ->
+                            {ReasonCodes ++ [ReasonCode], ChannelAcc}
+                    end
+                end, {[], Channel}, Subscriptions).
+
 %%--------------------------------------------------------------------
 %% Process Unsubscribe
 %%--------------------------------------------------------------------
@@ -563,6 +578,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
         {error, RC} -> {RC, Channel}
     end.
 
+-compile({inline, [process_force_unsubscribe/2]}).
+process_force_unsubscribe(Subscriptions, Channel =
+               #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
+                        session = Session}) ->
+    lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
+                    NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
+                    case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
+                        {ok, NSession} ->
+                            {ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
+                        {error, ReasonCode} ->
+                            {ReasonCodes ++ [ReasonCode], ChannelAcc}
+                    end
+                end, {[], Channel}, Subscriptions).
+
 %%--------------------------------------------------------------------
 %% Process Disconnect
 %%--------------------------------------------------------------------
@@ -818,6 +847,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
     {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
     {ok, NChannel};
 
+handle_info({force_subscribe, TopicFilters}, Channel) ->
+    {_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel),
+    {ok, NChannel};
+
 handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
     TopicFilters1 = run_hooks('client.unsubscribe',
                               [ClientInfo, #{'Internal' => true}],
@@ -826,6 +859,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
     {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
     {ok, NChannel};
 
+handle_info({force_unsubscribe, TopicFilters}, Channel) ->
+    {_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel),
+    {ok, NChannel};
+
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);