소스 검색

feat(coap): use content-sensitive udp proxy for coap

firest 1 년 전
부모
커밋
854754eb60

+ 1 - 1
apps/emqx/rebar.config

@@ -27,7 +27,7 @@
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
-    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
+    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}},

+ 46 - 6
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -57,7 +57,7 @@
 
 -record(state, {
     %% TCP/SSL/UDP/DTLS Wrapped Socket
-    socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
+    socket :: {esockd_transport, esockd:socket()} | {udp, _, _} | {esockd_udp_proxy, _, _},
     %% Peername of the connection
     peername :: emqx_types:peername(),
     %% Sockname of the connection
@@ -122,6 +122,9 @@ start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
 start_link(esockd_transport, Sock, Options) ->
     Socket = {esockd_transport, Sock},
     Args = [self(), Socket, undefined, Options] ++ callback_modules(Options),
+    {ok, proc_lib:spawn_link(?MODULE, init, Args)};
+start_link(Socket = {esockd_udp_proxy, _ProxyId, _Sock}, Peername, Options) ->
+    Args = [self(), Socket, Peername, Options] ++ callback_modules(Options),
     {ok, proc_lib:spawn_link(?MODULE, init, Args)}.
 
 callback_modules(Options) ->
@@ -196,10 +199,14 @@ esockd_peername({udp, _SockPid, _Sock}, Peername) ->
     Peername;
 esockd_peername({esockd_transport, Sock}, _Peername) ->
     {ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]),
+    Peername;
+esockd_peername({esockd_udp_proxy, _ProxyId, _Sock}, Peername) ->
     Peername.
 
 esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
     {ok, Socket};
+esockd_wait(Socket = {esockd_udp_proxy, _ProxyId, _Sock}) ->
+    {ok, Socket};
 esockd_wait({esockd_transport, Sock}) ->
     case esockd_transport:wait(Sock) of
         {ok, NSock} -> {ok, {esockd_transport, NSock}};
@@ -211,29 +218,41 @@ esockd_close({udp, _SockPid, _Sock}) ->
     %%gen_udp:close(Sock);
     ok;
 esockd_close({esockd_transport, Sock}) ->
-    esockd_transport:fast_close(Sock).
+    esockd_transport:fast_close(Sock);
+esockd_close({esockd_udp_proxy, ProxyId, _Sock}) ->
+    esockd_udp_proxy:close(ProxyId).
 
 esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
     nossl;
 esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
     esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
 esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
-    esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
+    esockd_transport:ensure_ok_or_exit(Fun, [Socket]);
+esockd_ensure_ok_or_exit(Fun, {esockd_udp_proxy, _ProxyId, Sock}) ->
+    esockd_transport:ensure_ok_or_exit(Fun, [Sock]).
 
 esockd_type({udp, _, _}) ->
     udp;
 esockd_type({esockd_transport, Socket}) ->
-    esockd_transport:type(Socket).
+    esockd_transport:type(Socket);
+esockd_type({esockd_udp_proxy, _ProxyId, Sock}) when is_port(Sock) ->
+    udp;
+esockd_type({esockd_udp_proxy, _ProxyId, _Sock}) ->
+    ssl.
 
 esockd_setopts({udp, _, _}, _) ->
     ok;
 esockd_setopts({esockd_transport, Socket}, Opts) ->
     %% FIXME: DTLS works??
+    esockd_transport:setopts(Socket, Opts);
+esockd_setopts({esockd_udp_proxy, _ProxyId, Socket}, Opts) ->
     esockd_transport:setopts(Socket, Opts).
 
 esockd_getstat({udp, _SockPid, Sock}, Stats) ->
     inet:getstat(Sock, Stats);
 esockd_getstat({esockd_transport, Sock}, Stats) ->
+    esockd_transport:getstat(Sock, Stats);
+esockd_getstat({esockd_udp_proxy, _ProxyId, Sock}, Stats) ->
     esockd_transport:getstat(Sock, Stats).
 
 esockd_send(Data, #state{
@@ -242,7 +261,9 @@ esockd_send(Data, #state{
 }) ->
     gen_udp:send(Sock, Ip, Port, Data);
 esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
-    esockd_transport:send(Sock, Data).
+    esockd_transport:send(Sock, Data);
+esockd_send(Data, #state{socket = {esockd_udp_proxy, ProxyId, _Sock}}) ->
+    esockd_udp_proxy:send(ProxyId, Data).
 
 keepalive_stats(recv) ->
     emqx_pd:get_counter(recv_pkt);
@@ -250,7 +271,8 @@ keepalive_stats(send) ->
     emqx_pd:get_counter(send_pkt).
 
 is_datadram_socket({esockd_transport, _}) -> false;
-is_datadram_socket({udp, _, _}) -> true.
+is_datadram_socket({udp, _, _}) -> true;
+is_datadram_socket({esockd_udp_proxy, _ProxyId, Sock}) -> erlang:is_port(Sock).
 
 %%--------------------------------------------------------------------
 %% callbacks
@@ -461,6 +483,21 @@ handle_msg({'$gen_cast', Req}, State) ->
     with_channel(handle_cast, [Req], State);
 handle_msg({datagram, _SockPid, Data}, State) ->
     parse_incoming(Data, State);
+handle_msg(
+    {{esockd_udp_proxy, _ProxyId, _Socket} = NSock, Data, Packets},
+    State = #state{
+        chann_mod = ChannMod,
+        channel = Channel
+    }
+) ->
+    ?SLOG(debug, #{msg => "RECV_data", data => Data}),
+    Oct = iolist_size(Data),
+    inc_counter(incoming_bytes, Oct),
+    Ctx = ChannMod:info(ctx, Channel),
+    ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
+
+    NState = State#state{socket = NSock},
+    {ok, next_incoming_msgs(Packets), NState};
 handle_msg({Inet, _Sock, Data}, State) when
     Inet == tcp;
     Inet == ssl
@@ -508,6 +545,9 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
 handle_msg({close, Reason}, State) ->
     ?tp(debug, force_socket_close, #{reason => Reason}),
     handle_info({sock_closed, Reason}, close_socket(State));
+handle_msg(udp_proxy_closed, State) ->
+    ?tp(debug, udp_proxy_closed, #{reason => normal}),
+    handle_info({sock_closed, normal}, close_socket(State));
 handle_msg(
     {event, connected},
     State = #state{

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway, [
     {description, "The Gateway management application"},
-    {vsn, "0.1.33"},
+    {vsn, "0.1.34"},
     {registered, []},
     {mod, {emqx_gateway_app, []}},
     {applications, [

+ 18 - 1
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -139,6 +139,16 @@ fields(websocket) ->
 fields(udp_listener) ->
     [
         %% some special configs for udp listener
+        {health_check,
+            sc(
+                ref(udp_health_check),
+                #{
+                    desc => ?DESC(
+                        udp_health_check
+                    ),
+                    required => false
+                }
+            )}
     ] ++
         udp_opts() ++
         common_listener_opts();
@@ -175,7 +185,12 @@ fields(dtls_opts) ->
             versions => dtls_all_available
         },
         _IsRanchListener = false
-    ).
+    );
+fields(udp_health_check) ->
+    [
+        {request, sc(binary(), #{desc => ?DESC(udp_health_check_request), required => false})},
+        {reply, sc(binary(), #{desc => ?DESC(udp_health_check_reply), required => false})}
+    ].
 
 desc(gateway) ->
     "EMQX Gateway configuration root.";
@@ -201,6 +216,8 @@ desc(dtls_opts) ->
     "Settings for DTLS protocol.";
 desc(websocket) ->
     "Websocket options";
+desc(udp_health_check) ->
+    "UDP health check";
 desc(_) ->
     undefined.
 

+ 8 - 2
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -151,7 +151,12 @@ find_sup_child(Sup, ChildId) ->
     {ok, [pid()]}
     | {error, term()}
 when
-    ModCfg :: #{frame_mod := atom(), chann_mod := atom(), connection_mod => atom()}.
+    ModCfg :: #{
+        frame_mod := atom(),
+        chann_mod := atom(),
+        connection_mod => atom(),
+        esockd_proxy_opts => map()
+    }.
 start_listeners(Listeners, GwName, Ctx, ModCfg) ->
     start_listeners(Listeners, GwName, Ctx, ModCfg, []).
 
@@ -519,7 +524,8 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) ->
             max_connections,
             max_conn_rate,
             proxy_protocol,
-            proxy_protocol_timeout
+            proxy_protocol_timeout,
+            health_check
         ],
         Opts0
     ),

+ 1 - 0
apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl

@@ -91,6 +91,7 @@ end_per_suite(Config) ->
 %%------------------------------------------------------------------------------
 
 t_case_coap(_) ->
+    emqx_coap_SUITE:restart_coap_with_connection_mode(false),
     Login = fun(URI, Checker) ->
         Action = fun(Channel) ->
             Req = emqx_coap_SUITE:make_req(post),

+ 2 - 44
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -410,19 +410,6 @@ is_create_connection_request(Msg = #coap_message{method = Method}) when
 is_create_connection_request(_Msg) ->
     false.
 
-is_delete_connection_request(Msg = #coap_message{method = Method}) when
-    is_atom(Method) andalso Method =/= undefined
-->
-    URIPath = emqx_coap_message:get_option(uri_path, Msg, []),
-    case URIPath of
-        [<<"mqtt">>, <<"connection">>] when Method == delete ->
-            true;
-        _ ->
-            false
-    end;
-is_delete_connection_request(_Msg) ->
-    false.
-
 check_token(
     Msg,
     #channel{
@@ -430,7 +417,6 @@ check_token(
         clientinfo = ClientInfo
     } = Channel
 ) ->
-    IsDeleteConn = is_delete_connection_request(Msg),
     #{clientid := ClientId} = ClientInfo,
     case emqx_coap_message:extract_uri_query(Msg) of
         #{
@@ -438,39 +424,10 @@ check_token(
             <<"token">> := Token
         } ->
             call_session(handle_request, Msg, Channel);
-        #{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} ->
-            case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of
-                undefined when IsDeleteConn ->
-                    Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
-                    {shutdown, normal, Reply, Channel};
-                undefined ->
-                    ?SLOG(info, #{
-                        msg => "remote_connection_not_found",
-                        clientid => ReqClientId,
-                        token => ReqToken
-                    }),
-                    Reply = emqx_coap_message:reset(Msg),
-                    {shutdown, normal, Reply, Channel};
-                false ->
-                    ?SLOG(info, #{
-                        msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken
-                    }),
-                    Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
-                    {shutdown, normal, Reply, Channel};
-                true ->
-                    %% hack: since each message request can spawn a new connection
-                    %% process, we can't rely on the `inc_incoming_stats' call in
-                    %% `emqx_gateway_conn:handle_incoming' to properly keep track of
-                    %% bumping incoming requests for an existing channel.  Since this
-                    %% number is used by keepalive, we have to bump it inside the
-                    %% requested channel/connection pid so heartbeats actually work.
-                    emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt),
-                    call_session(handle_request, Msg, Channel)
-            end;
         _ ->
             ErrMsg = <<"Missing token or clientid in connection mode">>,
             Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
-            {shutdown, normal, Reply, Channel}
+            {ok, {outgoing, Reply}, Channel}
     end.
 
 run_conn_hooks(
@@ -785,6 +742,7 @@ process_connection(
 ) when
     ConnState == connected
 ->
+    %% TODO should take over the session here
     Queries = emqx_coap_message:extract_uri_query(Req),
     ErrMsg0 =
         case Queries of

+ 59 - 0
apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl

@@ -0,0 +1,59 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_proxy_conn).
+
+-behaviour(esockd_udp_proxy_connection).
+
+-include("emqx_coap.hrl").
+
+-export([initialize/1, create/3, get_connection_id/4, dispatch/3, close/2]).
+
+%%--------------------------------------------------------------------
+%% Callbacks
+%%--------------------------------------------------------------------
+initialize(_Opts) ->
+    emqx_coap_frame:initial_parse_state(#{}).
+
+create(Transport, Peer, Opts) ->
+    emqx_gateway_conn:start_link(Transport, Peer, Opts).
+
+get_connection_id(_Transport, _Peer, State, Data) ->
+    case parse_incoming(Data, [], State) of
+        {[Msg | _] = Packets, NState} ->
+            case emqx_coap_message:extract_uri_query(Msg) of
+                #{
+                    <<"clientid">> := ClientId
+                } ->
+                    {ok, ClientId, Packets, NState};
+                _ ->
+                    invalid
+            end;
+        _Error ->
+            invalid
+    end.
+
+dispatch(Pid, _State, Packet) ->
+    erlang:send(Pid, Packet).
+
+close(Pid, _State) ->
+    erlang:send(Pid, udp_proxy_closed).
+
+parse_incoming(<<>>, Packets, State) ->
+    {Packets, State};
+parse_incoming(Data, Packets, State) ->
+    {ok, Packet, Rest, NParseState} = emqx_coap_frame:parse(Data, State),
+    parse_incoming(Rest, [Packet | Packets], NParseState).

+ 14 - 3
apps/emqx_gateway_coap/src/emqx_gateway_coap.erl

@@ -20,7 +20,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_gateway/include/emqx_gateway.hrl").
 
-%% define a gateway named stomp
+%% define a gateway named coap
 -gateway(#{
     name => coap,
     callback_module => ?MODULE,
@@ -58,10 +58,11 @@ on_gateway_load(
     Ctx
 ) ->
     Listeners = normalize_config(Config),
-    ModCfg = #{
+    ModCfg = maps:merge(connection_opts(Config), #{
         frame_mod => emqx_coap_frame,
         chann_mod => emqx_coap_channel
-    },
+    }),
+
     case
         start_listeners(
             Listeners, GwName, Ctx, ModCfg
@@ -105,3 +106,13 @@ on_gateway_unload(
 ) ->
     Listeners = normalize_config(Config),
     stop_listeners(GwName, Listeners).
+
+connection_opts(#{connection_required := false}) ->
+    #{};
+connection_opts(_) ->
+    #{
+        connection_mod => esockd_udp_proxy,
+        esockd_proxy_opts => #{
+            connection_mod => emqx_coap_proxy_conn
+        }
+    }.

+ 20 - 11
apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

@@ -330,7 +330,8 @@ t_publish(_) ->
                 ?assertEqual(Payload, Msg#message.payload)
         after 500 ->
             ?assert(false)
-        end
+        end,
+        true
     end,
     with_connection(Topics, Action).
 
@@ -360,7 +361,9 @@ t_publish_with_retain_qos_expiry(_) ->
                 ?assertEqual(Payload, Msg#message.payload)
         after 500 ->
             ?assert(false)
-        end
+        end,
+
+        true
     end,
     with_connection(Topics, Action),
 
@@ -392,7 +395,8 @@ t_subscribe(_) ->
 
         #coap_content{payload = PayloadRecv} = Notify,
 
-        ?assertEqual(Payload, PayloadRecv)
+        ?assertEqual(Payload, PayloadRecv),
+        true
     end,
 
     with_connection(Topics, Fun),
@@ -431,7 +435,8 @@ t_subscribe_with_qos_opt(_) ->
 
         #coap_content{payload = PayloadRecv} = Notify,
 
-        ?assertEqual(Payload, PayloadRecv)
+        ?assertEqual(Payload, PayloadRecv),
+        true
     end,
 
     with_connection(Topics, Fun),
@@ -468,7 +473,8 @@ t_un_subscribe(_) ->
         {ok, nocontent, _} = do_request(Channel, URI, UnReq),
         ?LOGT("un observer topic:~ts~n", [Topic]),
         timer:sleep(100),
-        ?assertEqual([], emqx:subscribers(Topic))
+        ?assertEqual([], emqx:subscribers(Topic)),
+        true
     end,
 
     with_connection(Topics, Fun).
@@ -497,7 +503,8 @@ t_observe_wildcard(_) ->
 
         #coap_content{payload = PayloadRecv} = Notify,
 
-        ?assertEqual(Payload, PayloadRecv)
+        ?assertEqual(Payload, PayloadRecv),
+        true
     end,
 
     with_connection(Fun).
@@ -530,7 +537,8 @@ t_clients_api(_) ->
         {204, _} =
             request(delete, "/gateways/coap/clients/client1"),
         timer:sleep(200),
-        {200, #{data := []}} = request(get, "/gateways/coap/clients")
+        {200, #{data := []}} = request(get, "/gateways/coap/clients"),
+        false
     end,
     with_connection(Fun).
 
@@ -560,7 +568,8 @@ t_clients_subscription_api(_) ->
 
         {204, _} = request(delete, Path ++ "/tx"),
 
-        {200, []} = request(get, Path)
+        {200, []} = request(get, Path),
+        true
     end,
     with_connection(Fun).
 
@@ -578,7 +587,8 @@ t_clients_get_subscription_api(_) ->
 
         observe(Channel, Token, false),
 
-        {200, []} = request(get, Path)
+        {200, []} = request(get, Path),
+        true
     end,
     with_connection(Fun).
 
@@ -773,8 +783,7 @@ with_connection(Action) ->
     Fun = fun(Channel) ->
         Token = connection(Channel),
         timer:sleep(100),
-        Action(Channel, Token),
-        disconnection(Channel, Token),
+        _ = Action(Channel, Token) andalso disconnection(Channel, Token),
         timer:sleep(100)
     end,
     do(Fun).

+ 1 - 1
mix.exs

@@ -175,7 +175,7 @@ defmodule EMQXUmbrella.MixProject do
   end
 
   def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
-  def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}
+  def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true}
   def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
   def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.1", override: true}
   def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}

+ 1 - 1
rebar.config

@@ -81,7 +81,7 @@
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
-    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
+    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
     {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},

+ 9 - 0
rel/i18n/emqx_gateway_schema.hocon

@@ -195,4 +195,13 @@ Relevant when the EMQX cluster is deployed behind a load-balancer."""
 fields_ws_opts_proxy_address_header.label:
 """Proxy address header"""
 
+udp_health_check.desc:
+"""Some Cloud platform use a `request-reply` mechanism to check whether a UDP port is healthy, here can configure this pair."""
+
+udp_health_check_request.desc:
+"""The content of the request."""
+
+udp_health_check_reply.desc:
+"""The content to reply."""
+
 }