Przeglądaj źródła

Merge pull request #10871 from HJianBo/fix-coap-bugs

fix(coap): to better handle coap requests in connection mode
JianBo He 2 lat temu
rodzic
commit
8ce0132569

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway, [
     {description, "The Gateway management application"},
-    {vsn, "0.1.18"},
+    {vsn, "0.1.19"},
     {registered, []},
     {mod, {emqx_gateway_app, []}},
     {applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},

+ 1 - 0
apps/emqx_gateway/src/emqx_gateway_cm.erl

@@ -33,6 +33,7 @@
 -export([
     open_session/5,
     open_session/6,
+    discard_session/2,
     kick_session/2,
     kick_session/3,
     takeover_session/2,

+ 86 - 35
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -304,6 +304,8 @@ handle_call(
 handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
     Subs = emqx_coap_session:info(subscriptions, Session),
     {reply, {ok, maps:to_list(Subs)}, Channel};
+handle_call({check_token, ReqToken}, _From, Channel = #channel{token = Token}) ->
+    {reply, ReqToken == Token, Channel};
 handle_call(kick, _From, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),
     shutdown_and_reply(kicked, ok, NChannel);
@@ -319,6 +321,9 @@ handle_call(Req, _From, Channel) ->
 
 -spec handle_cast(Req :: term(), channel()) ->
     ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
+handle_cast(close, Channel) ->
+    ?SLOG(info, #{msg => "close_connection"}),
+    shutdown(normal, Channel);
 handle_cast(Req, Channel) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
     {ok, Channel}.
@@ -376,18 +381,54 @@ ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
     Heartbeat = emqx_keepalive:info(interval, KeepAlive),
     Fun(keepalive, Heartbeat, keepalive, Channel).
 
-check_auth_state(Msg, #channel{connection_required = Required} = Channel) ->
-    check_token(Required, Msg, Channel).
+check_auth_state(Msg, #channel{connection_required = true} = Channel) ->
+    case is_create_connection_request(Msg) of
+        true ->
+            call_session(handle_request, Msg, Channel);
+        false ->
+            URIQuery = emqx_coap_message:get_option(uri_query, Msg, #{}),
+            case maps:get(<<"token">>, URIQuery, undefined) of
+                undefined ->
+                    ?SLOG(debug, #{msg => "token_required_in_conn_mode", message => Msg});
+                _ ->
+                    check_token(Msg, Channel)
+            end
+    end.
+
+is_create_connection_request(Msg = #coap_message{method = Method}) when
+    is_atom(Method) andalso Method =/= undefined
+->
+    URIPath = emqx_coap_message:get_option(uri_path, Msg, []),
+    case URIPath of
+        [<<"mqtt">>, <<"connection">>] when Method == post ->
+            true;
+        _ ->
+            false
+    end;
+is_create_connection_request(_Msg) ->
+    false.
+
+is_delete_connection_request(Msg = #coap_message{method = Method}) when
+    is_atom(Method) andalso Method =/= undefined
+->
+    URIPath = emqx_coap_message:get_option(uri_path, Msg, []),
+    case URIPath of
+        [<<"mqtt">>, <<"connection">>] when Method == delete ->
+            true;
+        _ ->
+            false
+    end;
+is_delete_connection_request(_Msg) ->
+    false.
 
 check_token(
-    true,
     Msg,
     #channel{
         token = Token,
-        clientinfo = ClientInfo,
-        conn_state = CState
+        clientinfo = ClientInfo
     } = Channel
 ) ->
+    IsDeleteConn = is_delete_connection_request(Msg),
     #{clientid := ClientId} = ClientInfo,
     case emqx_coap_message:get_option(uri_query, Msg) of
         #{
@@ -395,37 +436,33 @@ check_token(
             <<"token">> := Token
         } ->
             call_session(handle_request, Msg, Channel);
-        #{<<"clientid">> := DesireId} ->
-            try_takeover(CState, DesireId, Msg, Channel);
-        _ ->
-            Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
-            {ok, {outgoing, Reply}, Channel}
-    end;
-check_token(false, Msg, Channel) ->
-    call_session(handle_request, Msg, Channel).
-
-try_takeover(idle, DesireId, Msg, Channel) ->
-    case emqx_coap_message:get_option(uri_path, Msg, []) of
-        [<<"mqtt">>, <<"connection">> | _] ->
-            %% may be is a connect request
-            %% TODO need check repeat connect, unless we implement the
-            %% udp connection baseon the clientid
-            call_session(handle_request, Msg, Channel);
-        _ ->
-            case emqx_conf:get([gateway, coap, ?AUTHN], undefined) of
+        #{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} ->
+            case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of
+                undefined when IsDeleteConn ->
+                    Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
+                    {shutdown, normal, Reply, Channel};
                 undefined ->
-                    call_session(handle_request, Msg, Channel);
-                _ ->
-                    do_takeover(DesireId, Msg, Channel)
-            end
-    end;
-try_takeover(_, DesireId, Msg, Channel) ->
-    do_takeover(DesireId, Msg, Channel).
-
-do_takeover(_DesireId, Msg, Channel) ->
-    %% TODO completed the takeover, now only reset the message
-    Reset = emqx_coap_message:reset(Msg),
-    {ok, {outgoing, Reset}, Channel}.
+                    ?SLOG(info, #{
+                        msg => "remote_connection_not_found",
+                        clientid => ReqClientId,
+                        token => ReqToken
+                    }),
+                    Reply = emqx_coap_message:reset(Msg),
+                    {shutdown, normal, Reply, Channel};
+                false ->
+                    ?SLOG(info, #{
+                        msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken
+                    }),
+                    Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
+                    {shutdown, normal, Reply, Channel};
+                true ->
+                    call_session(handle_request, Msg, Channel)
+            end;
+        _ ->
+            ErrMsg = <<"Missing token or clientid in connection mode">>,
+            Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
+            {shutdown, normal, Reply, Channel}
+    end.
 
 run_conn_hooks(
     Input,
@@ -612,6 +649,9 @@ ensure_disconnected(
     end,
     Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
 
+shutdown(Reason, Channel) ->
+    {shutdown, Reason, Channel}.
+
 shutdown_and_reply(Reason, Reply, Channel) ->
     {shutdown, Reason, Reply, Channel}.
 
@@ -759,6 +799,17 @@ process_connection(
         Channel
     );
 process_connection({close, Msg}, _, Channel, _) ->
+    Queries = emqx_coap_message:get_option(uri_query, Msg),
+    case maps:get(<<"clientid">>, Queries, undefined) of
+        undefined ->
+            ok;
+        ClientId ->
+            %% XXX: A cluster-level connection shutdown needs to be performed here.
+            %%
+            %% due to the possibility that the current close request may be
+            %% from a CoAP client from another IP + Port tuple
+            emqx_gateway_cm:cast(coap, ClientId, close)
+    end,
     Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
     NChannel = ensure_disconnected(normal, Channel),
     {shutdown, normal, Reply, NChannel}.

+ 1 - 1
apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_coap, [
     {description, "CoAP Gateway"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 2 - 0
changes/ce/fix-10871.en.md

@@ -0,0 +1,2 @@
+Fixes for connection deletion and message publishing requests not taking effect
+issues once the connection has been created in a different UDP port first.