Sfoglia il codice sorgente

Merge pull request #8658 from HJianBo/fix-coap-topic

fix(coap): remove the leading `/` in assembling publish topic
JianBo He 3 anni fa
parent
commit
0167baf24c

+ 1 - 0
CHANGES-5.0.md

@@ -6,6 +6,7 @@
 * Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867)
 * Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867)
 * Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887)
 * Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887)
 * Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893)
 * Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893)
+* Fix the extra / prefix when CoAP gateway parsing client topics. [#8658](https://github.com/emqx/emqx/pull/8658)
 * Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857)
 * Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857)
 
 
 ## Enhancements
 ## Enhancements

+ 6 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -361,7 +361,12 @@ handle_response(Headers, Body) ->
                 _ ->
                 _ ->
                     ignore
                     ignore
             end;
             end;
-        {error, _Reason} ->
+        {error, Reason} ->
+            ?TRACE_AUTHN_PROVIDER(
+                error,
+                "parse_http_response_failed",
+                #{content_type => ContentType, body => Body, reason => Reason}
+            ),
             ignore
             ignore
     end.
     end.
 
 

+ 6 - 1
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -153,7 +153,7 @@ init(
             mountpoint => Mountpoint
             mountpoint => Mountpoint
         }
         }
     ),
     ),
-
+    %% FIXME: it should coap.hearbeat instead of idle_timeout?
     Heartbeat = ?GET_IDLE_TIME(Config),
     Heartbeat = ?GET_IDLE_TIME(Config),
     #channel{
     #channel{
         ctx = Ctx,
         ctx = Ctx,
@@ -447,6 +447,7 @@ enrich_conninfo(
         conninfo = ConnInfo
         conninfo = ConnInfo
     }
     }
 ) ->
 ) ->
+    %% FIXME: generate a random clientid if absent
     case Queries of
     case Queries of
         #{<<"clientid">> := ClientId} ->
         #{<<"clientid">> := ClientId} ->
             Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
             Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
@@ -467,6 +468,9 @@ enrich_clientinfo(
     {Queries, Msg},
     {Queries, Msg},
     Channel = #channel{clientinfo = ClientInfo0}
     Channel = #channel{clientinfo = ClientInfo0}
 ) ->
 ) ->
+    %% FIXME:
+    %% 1. generate a random clientid if absent;
+    %% 2. assgin username, password to `undefined` if absent
     case Queries of
     case Queries of
         #{
         #{
             <<"username">> := UserName,
             <<"username">> := UserName,
@@ -542,6 +546,7 @@ process_connect(
         )
         )
     of
     of
         {ok, _Sess} ->
         {ok, _Sess} ->
+            %% FIXME: Token in cluster wide?
             RandVal = rand:uniform(?TOKEN_MAXIMUM),
             RandVal = rand:uniform(?TOKEN_MAXIMUM),
             Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)),
             Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)),
             NResult = Result#{events => [{event, connected}]},
             NResult = Result#{events => [{event, connected}]},

+ 25 - 30
apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl

@@ -69,17 +69,7 @@ handle_method(_, _, Msg, _, _) ->
 check_topic([]) ->
 check_topic([]) ->
     error;
     error;
 check_topic(Path) ->
 check_topic(Path) ->
-    Sep = <<"/">>,
-    {ok,
-        emqx_http_lib:uri_decode(
-            lists:foldl(
-                fun(Part, Acc) ->
-                    <<Acc/binary, Sep/binary, Part/binary>>
-                end,
-                <<>>,
-                Path
-            )
-        )}.
+    {ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}.
 
 
 get_sub_opts(#coap_message{options = Opts} = Msg) ->
 get_sub_opts(#coap_message{options = Opts} = Msg) ->
     SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts),
     SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts),
@@ -124,25 +114,30 @@ get_publish_qos(Msg) ->
     end.
     end.
 
 
 apply_publish_opts(Msg, MQTTMsg) ->
 apply_publish_opts(Msg, MQTTMsg) ->
-    maps:fold(
-        fun
-            (<<"retain">>, V, Acc) ->
-                Val = erlang:binary_to_atom(V),
-                emqx_message:set_flag(retain, Val, Acc);
-            (<<"expiry">>, V, Acc) ->
-                Val = erlang:binary_to_integer(V),
-                Props = emqx_message:get_header(properties, Acc),
-                emqx_message:set_header(
-                    properties,
-                    Props#{'Message-Expiry-Interval' => Val},
-                    Acc
-                );
-            (_, _, Acc) ->
-                Acc
-        end,
-        MQTTMsg,
-        emqx_coap_message:get_option(uri_query, Msg)
-    ).
+    case emqx_coap_message:get_option(uri_query, Msg) of
+        undefined ->
+            MQTTMsg;
+        Qs ->
+            maps:fold(
+                fun
+                    (<<"retain">>, V, Acc) ->
+                        Val = erlang:binary_to_atom(V),
+                        emqx_message:set_flag(retain, Val, Acc);
+                    (<<"expiry">>, V, Acc) ->
+                        Val = erlang:binary_to_integer(V),
+                        Props = emqx_message:get_header(properties, Acc),
+                        emqx_message:set_header(
+                            properties,
+                            Props#{'Message-Expiry-Interval' => Val},
+                            Acc
+                        );
+                    (_, _, Acc) ->
+                        Acc
+                end,
+                MQTTMsg,
+                Qs
+            )
+    end.
 
 
 subscribe(#coap_message{token = <<>>} = Msg, _, _, _) ->
 subscribe(#coap_message{token = <<>>} = Msg, _, _, _) ->
     reply({error, bad_request}, <<"observe without token">>, Msg);
     reply({error, bad_request}, <<"observe without token">>, Msg);

+ 54 - 26
apps/emqx_gateway/test/emqx_coap_SUITE.erl

@@ -143,12 +143,15 @@ t_connection_with_authn_failed(_) ->
     ok.
     ok.
 
 
 t_publish(_) ->
 t_publish(_) ->
-    Action = fun(Channel, Token) ->
-        Topic = <<"/abc">>,
+    %% can publish to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can publish to a `/` leading topic
+        <<"/abc">>
+    ],
+    Action = fun(Topic, Channel, Token) ->
         Payload = <<"123">>,
         Payload = <<"123">>,
-
-        TopicStr = binary_to_list(Topic),
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
 
 
         %% Sub topic first
         %% Sub topic first
         emqx:subscribe(Topic),
         emqx:subscribe(Topic),
@@ -164,24 +167,28 @@ t_publish(_) ->
             ?assert(false)
             ?assert(false)
         end
         end
     end,
     end,
-    with_connection(Action).
+    with_connection(Topics, Action).
 
 
 t_subscribe(_) ->
 t_subscribe(_) ->
-    Topic = <<"/abc">>,
-    Fun = fun(Channel, Token) ->
-        TopicStr = binary_to_list(Topic),
+    %% can subscribe to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can subscribe to a `/` leading topic
+        <<"/abc">>
+    ],
+    Fun = fun(Topic, Channel, Token) ->
         Payload = <<"123">>,
         Payload = <<"123">>,
-
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
         Req = make_req(get, Payload, [{observe, 0}]),
         Req = make_req(get, Payload, [{observe, 0}]),
         {ok, content, _} = do_request(Channel, URI, Req),
         {ok, content, _} = do_request(Channel, URI, Req),
         ?LOGT("observer topic:~ts~n", [Topic]),
         ?LOGT("observer topic:~ts~n", [Topic]),
 
 
+        %% ensure subscribe succeed
         timer:sleep(100),
         timer:sleep(100),
         [SubPid] = emqx:subscribers(Topic),
         [SubPid] = emqx:subscribers(Topic),
         ?assert(is_pid(SubPid)),
         ?assert(is_pid(SubPid)),
 
 
-        %% Publish a message
+        %% publish a message
         emqx:publish(emqx_message:make(Topic, Payload)),
         emqx:publish(emqx_message:make(Topic, Payload)),
         {ok, content, Notify} = with_response(Channel),
         {ok, content, Notify} = with_response(Channel),
         ?LOGT("observer get Notif=~p", [Notify]),
         ?LOGT("observer get Notif=~p", [Notify]),
@@ -191,18 +198,27 @@ t_subscribe(_) ->
         ?assertEqual(Payload, PayloadRecv)
         ?assertEqual(Payload, PayloadRecv)
     end,
     end,
 
 
-    with_connection(Fun),
-    timer:sleep(100),
+    with_connection(Topics, Fun),
 
 
-    ?assertEqual([], emqx:subscribers(Topic)).
+    %% subscription removed if coap client disconnected
+    timer:sleep(100),
+    lists:foreach(
+        fun(Topic) ->
+            ?assertEqual([], emqx:subscribers(Topic))
+        end,
+        Topics
+    ).
 
 
 t_un_subscribe(_) ->
 t_un_subscribe(_) ->
-    Topic = <<"/abc">>,
-    Fun = fun(Channel, Token) ->
-        TopicStr = binary_to_list(Topic),
+    %% can unsubscribe to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can unsubscribe to a `/` leading topic
+        <<"/abc">>
+    ],
+    Fun = fun(Topic, Channel, Token) ->
         Payload = <<"123">>,
         Payload = <<"123">>,
-
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
 
 
         Req = make_req(get, Payload, [{observe, 0}]),
         Req = make_req(get, Payload, [{observe, 0}]),
         {ok, content, _} = do_request(Channel, URI, Req),
         {ok, content, _} = do_request(Channel, URI, Req),
@@ -219,16 +235,15 @@ t_un_subscribe(_) ->
         ?assertEqual([], emqx:subscribers(Topic))
         ?assertEqual([], emqx:subscribers(Topic))
     end,
     end,
 
 
-    with_connection(Fun).
+    with_connection(Topics, Fun).
 
 
 t_observe_wildcard(_) ->
 t_observe_wildcard(_) ->
     Fun = fun(Channel, Token) ->
     Fun = fun(Channel, Token) ->
         %% resolve_url can't process wildcard with #
         %% resolve_url can't process wildcard with #
-        Topic = <<"/abc/+">>,
-        TopicStr = binary_to_list(Topic),
+        Topic = <<"abc/+">>,
         Payload = <<"123">>,
         Payload = <<"123">>,
 
 
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
         Req = make_req(get, Payload, [{observe, 0}]),
         Req = make_req(get, Payload, [{observe, 0}]),
         {ok, content, _} = do_request(Channel, URI, Req),
         {ok, content, _} = do_request(Channel, URI, Req),
         ?LOGT("observer topic:~ts~n", [Topic]),
         ?LOGT("observer topic:~ts~n", [Topic]),
@@ -238,7 +253,7 @@ t_observe_wildcard(_) ->
         ?assert(is_pid(SubPid)),
         ?assert(is_pid(SubPid)),
 
 
         %% Publish a message
         %% Publish a message
-        PubTopic = <<"/abc/def">>,
+        PubTopic = <<"abc/def">>,
         emqx:publish(emqx_message:make(PubTopic, Payload)),
         emqx:publish(emqx_message:make(PubTopic, Payload)),
         {ok, content, Notify} = with_response(Channel),
         {ok, content, Notify} = with_response(Channel),
 
 
@@ -320,7 +335,7 @@ t_clients_get_subscription_api(_) ->
 
 
         {200, [Subs]} = request(get, Path),
         {200, [Subs]} = request(get, Path),
 
 
-        ?assertEqual(<<"/coap/observe">>, maps:get(topic, Subs)),
+        ?assertEqual(<<"coap/observe">>, maps:get(topic, Subs)),
 
 
         observe(Channel, Token, false),
         observe(Channel, Token, false),
 
 
@@ -386,6 +401,9 @@ observe(Channel, Token, false) ->
     {ok, nocontent, _Data} = do_request(Channel, URI, Req),
     {ok, nocontent, _Data} = do_request(Channel, URI, Req),
     ok.
     ok.
 
 
+pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
+    ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.
+
 make_req(Method) ->
 make_req(Method) ->
     make_req(Method, <<>>).
     make_req(Method, <<>>).
 
 
@@ -442,6 +460,16 @@ with_connection(Action) ->
     end,
     end,
     do(Fun).
     do(Fun).
 
 
+with_connection(Checks, Action) ->
+    Fun = fun(Channel) ->
+        Token = connection(Channel),
+        timer:sleep(100),
+        lists:foreach(fun(E) -> Action(E, Channel, Token) end, Checks),
+        disconnection(Channel, Token),
+        timer:sleep(100)
+    end,
+    do(Fun).
+
 receive_deliver(Wait) ->
 receive_deliver(Wait) ->
     receive
     receive
         {deliver, _, Msg} ->
         {deliver, _, Msg} ->

+ 2 - 2
apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl

@@ -98,7 +98,7 @@ t_case_coap_publish(_) ->
     Prefix = Mod:ps_prefix(),
     Prefix = Mod:ps_prefix(),
     Fun = fun(Channel, Token, Topic, Checker) ->
     Fun = fun(Channel, Token, Topic, Checker) ->
         TopicStr = binary_to_list(Topic),
         TopicStr = binary_to_list(Topic),
-        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
 
         Req = Mod:make_req(post, <<>>),
         Req = Mod:make_req(post, <<>>),
         Checker(Mod:do_request(Channel, URI, Req))
         Checker(Mod:do_request(Channel, URI, Req))
@@ -114,7 +114,7 @@ t_case_coap_subscribe(_) ->
     Prefix = Mod:ps_prefix(),
     Prefix = Mod:ps_prefix(),
     Fun = fun(Channel, Token, Topic, Checker) ->
     Fun = fun(Channel, Token, Topic, Checker) ->
         TopicStr = binary_to_list(Topic),
         TopicStr = binary_to_list(Topic),
-        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
 
         Req = Mod:make_req(get, <<>>, [{observe, 0}]),
         Req = Mod:make_req(get, <<>>, [{observe, 0}]),
         Checker(Mod:do_request(Channel, URI, Req))
         Checker(Mod:do_request(Channel, URI, Req))