Просмотр исходного кода

fix(coap): Fixed that incorrect sub-options were displayed in CoAP gateways

firest 1 год назад
Родитель
Сommit
52f0f403f3

+ 6 - 0
apps/emqx_gateway_coap/include/emqx_coap.hrl

@@ -91,4 +91,10 @@
     {<<"r">>, <<"retain">>}
 ]).
 
+-type sub_data() :: #{
+    topic := emqx_types:topic(),
+    token := binary(),
+    subopts := emqx_types:subopts()
+}.
+
 -endif.

+ 2 - 3
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -264,7 +264,7 @@ handle_call(
         [ClientInfo, MountedTopic, NSubOpts]
     ),
     %% modify session state
-    SubReq = {Topic, Token},
+    SubReq = #{topic => Topic, token => Token, subopts => NSubOpts},
     TempMsg = #coap_message{type = non},
     %% FIXME: The subopts is not used for emqx_coap_session
     Result = emqx_coap_session:process_subscribe(
@@ -438,14 +438,13 @@ check_token(
             <<"token">> := Token
         } ->
             call_session(handle_request, Msg, Channel);
-        Any ->
+        _ ->
             %% This channel is create by this DELETE command, so here can safely close this channel
             case Token =:= undefined andalso is_delete_connection_request(Msg) of
                 true ->
                     Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
                     {shutdown, normal, Reply, Channel};
                 false ->
-                    io:format(">>> C1:~p, T1:~p~nC2:~p~n", [ClientId, Token, Any]),
                     ErrMsg = <<"Missing token or clientid in connection mode">>,
                     Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
                     {ok, {outgoing, Reply}, Channel}

+ 19 - 10
apps/emqx_gateway_coap/src/emqx_coap_observe_res.erl

@@ -16,10 +16,12 @@
 
 -module(emqx_coap_observe_res).
 
+-include("emqx_coap.hrl").
+
 %% API
 -export([
     new_manager/0,
-    insert/3,
+    insert/2,
     remove/2,
     res_changed/2,
     foreach/2,
@@ -34,7 +36,8 @@
 
 -type res() :: #{
     token := token(),
-    seq_id := seq_id()
+    seq_id := seq_id(),
+    subopts := emqx_types:subopts()
 }.
 
 -type manager() :: #{emqx_types:topic() => res()}.
@@ -46,12 +49,12 @@
 new_manager() ->
     #{}.
 
--spec insert(emqx_types:topic(), token(), manager()) -> {seq_id(), manager()}.
-insert(Topic, Token, Manager) ->
+-spec insert(sub_data(), manager()) -> {seq_id(), manager()}.
+insert(#{topic := Topic, token := Token, subopts := SubOpts}, Manager) ->
     Res =
         case maps:get(Topic, Manager, undefined) of
             undefined ->
-                new_res(Token);
+                new_res(Token, SubOpts);
             Any ->
                 Any
         end,
@@ -84,18 +87,24 @@ foreach(F, Manager) ->
     ),
     ok.
 
--spec subscriptions(manager()) -> [emqx_types:topic()].
+-spec subscriptions(manager()) -> _.
 subscriptions(Manager) ->
-    maps:keys(Manager).
+    maps:map(
+        fun(_Topic, #{subopts := SubOpts}) ->
+            SubOpts
+        end,
+        Manager
+    ).
 
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
--spec new_res(token()) -> res().
-new_res(Token) ->
+-spec new_res(token(), emqx_types:subopts()) -> res().
+new_res(Token, SubOpts) ->
     #{
         token => Token,
-        seq_id => 0
+        seq_id => 0,
+        subopts => SubOpts
     }.
 
 -spec res_changed(res()) -> res().

+ 11 - 2
apps/emqx_gateway_coap/src/emqx_coap_pubsub_handler.erl

@@ -28,7 +28,16 @@
 -import(emqx_coap_channel, [run_hooks/3]).
 
 -define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}).
--define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}).
+-define(SUB(Topic, Token, Opts, Msg), #{
+    subscribe => {
+        #{
+            topic => Topic,
+            token => Token,
+            subopts => Opts
+        },
+        Msg
+    }
+}).
 -define(SUBOPTS, #{qos => 0, rh => 1, rap => 0, nl => 0, is_new => false}).
 
 %% TODO maybe can merge this code into emqx_coap_session, simplify the call chain
@@ -172,7 +181,7 @@ subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) ->
             MountTopic = mount(CInfo, Topic),
             emqx_broker:subscribe(MountTopic, ClientId, SubOpts),
             run_hooks(Ctx, 'session.subscribed', [CInfo, MountTopic, SubOpts]),
-            ?SUB(MountTopic, Token, Msg);
+            ?SUB(MountTopic, Token, SubOpts, Msg);
         _ ->
             reply({error, unauthorized}, Msg)
     end.

+ 6 - 9
apps/emqx_gateway_coap/src/emqx_coap_session.erl

@@ -100,14 +100,9 @@ info(Session) ->
 info(Keys, Session) when is_list(Keys) ->
     [{Key, info(Key, Session)} || Key <- Keys];
 info(subscriptions, #session{observe_manager = OM}) ->
-    Topics = emqx_coap_observe_res:subscriptions(OM),
-    lists:foldl(
-        fun(T, Acc) -> Acc#{T => emqx_gateway_utils:default_subopts()} end,
-        #{},
-        Topics
-    );
+    emqx_coap_observe_res:subscriptions(OM);
 info(subscriptions_cnt, #session{observe_manager = OM}) ->
-    erlang:length(emqx_coap_observe_res:subscriptions(OM));
+    maps:size(emqx_coap_observe_res:subscriptions(OM));
 info(subscriptions_max, _) ->
     infinity;
 info(upgrade_qos, _) ->
@@ -229,8 +224,10 @@ process_subscribe(
     case Sub of
         undefined ->
             Result;
-        {Topic, Token} ->
-            {SeqId, OM2} = emqx_coap_observe_res:insert(Topic, Token, OM),
+        #{
+            topic := _Topic
+        } = SubData ->
+            {SeqId, OM2} = emqx_coap_observe_res:insert(SubData, OM),
             Replay = emqx_coap_message:piggyback({ok, content}, Msg),
             Replay2 = Replay#coap_message{options = #{observe => SeqId}},
             Result#{

+ 2 - 0
changes/ce/fix-13607.en.md

@@ -0,0 +1,2 @@
+Fixed that the CoAP subscription QoS displayed through the API was inconsistent with the actual QoS.
+