Просмотр исходного кода

feat(emqx_coap): add emqx_coap_api

1. add a request api for emqx_coap
2. fix some emqx_coap logic error
lafirest 4 лет назад
Родитель
Сommit
89f48f89eb

+ 0 - 1
apps/emqx_gateway/etc/emqx_gateway.conf

@@ -59,7 +59,6 @@ gateway.coap {
   ## When publishing or subscribing, prefix all topics with a mountpoint string.
   ## When publishing or subscribing, prefix all topics with a mountpoint string.
   mountpoint = ""
   mountpoint = ""
 
 
-  heartbeat = 30s
   notify_type = qos
   notify_type = qos
 
 
   ## if true, you need to establish a connection before use
   ## if true, you need to establish a connection before use

+ 145 - 0
apps/emqx_gateway/src/coap/emqx_coap_api.erl

@@ -0,0 +1,145 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_api).
+
+-behaviour(minirest_api).
+
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+%% API
+-export([api_spec/0]).
+
+-export([request/2]).
+
+-define(PREFIX, "/gateway/coap/:clientid").
+-define(DEF_WAIT_TIME, 10).
+
+-import(emqx_mgmt_util, [ schema/1
+                        , schema/2
+                        , object_schema/1
+                        , object_schema/2
+                        , error_schema/2
+                        , properties/1]).
+
+%%--------------------------------------------------------------------
+%%  API
+%%--------------------------------------------------------------------
+api_spec() ->
+    {[request_api()], []}.
+
+request_api() ->
+    Metadata = #{post => request_method_meta()},
+    {?PREFIX ++ "/request", Metadata, request}.
+
+request(post, #{body := Body, bindings := Bindings}) ->
+    ClientId = maps:get(clientid, Bindings, undefined),
+
+    Method = maps:get(<<"method">>, Body, <<"get">>),
+    CT = maps:get(<<"content_type">>, Body, <<"text/plain">>),
+    Token = maps:get(<<"token">>, Body, <<>>),
+    Payload = maps:get(<<"payload">>, Body, <<>>),
+    WaitTime =  maps:get(<<"timeout">>, Body, ?DEF_WAIT_TIME),
+
+    Payload2 = parse_payload(CT, Payload),
+    ReqType = erlang:binary_to_atom(Method),
+
+    Msg = emqx_coap_message:request(con,
+                                    ReqType, Payload2, #{content_format => CT}),
+
+    Msg2 = Msg#coap_message{token = Token},
+
+    case call_client(ClientId, Msg2, timer:seconds(WaitTime)) of
+        timeout ->
+            {504};
+        not_found ->
+            {404};
+        Response ->
+            {200, format_to_response(CT, Response)}
+    end.
+
+%%--------------------------------------------------------------------
+%%  Internal functions
+%%--------------------------------------------------------------------
+request_parameters() ->
+    [#{name => clientid,
+       in => path,
+       schema => #{type => string},
+       required => true}].
+
+request_properties() ->
+    properties([ {token, string, "message token, can be empty"}
+               , {method, string, "request method type", ["get", "put", "post", "delete"]}
+               , {timeout, integer, "timespan for response"}
+               , {content_type, string, "payload type",
+                  [<<"text/plain">>, <<"application/json">>, <<"application/octet-stream">>]}
+               , {payload, string, "payload"}]).
+
+coap_message_properties() ->
+    properties([ {id, integer, "message id"}
+               , {token, string, "message token, can be empty"}
+               , {method, string, "response code"}
+               , {payload, string, "payload"}]).
+
+request_method_meta() ->
+    #{description => <<"lookup matching messages">>,
+      parameters => request_parameters(),
+      'requestBody' =>  object_schema(request_properties(),
+                                      <<"request payload, binary must encode by base64">>),
+      responses => #{
+                     <<"200">> => object_schema(coap_message_properties()),
+                     <<"404">> => schema(<<"NotFound">>),
+                     <<"504">> => schema(<<"Timeout">>)
+                    }}.
+
+
+format_to_response(ContentType, #coap_message{id = Id,
+                                              token = Token,
+                                              method = Method,
+                                              payload = Payload}) ->
+    #{id => Id,
+      token => Token,
+      method => format_to_binary(Method),
+      payload => format_payload(ContentType, Payload)}.
+
+format_to_binary(Obj) ->
+    erlang:list_to_binary(io_lib:format("~p", [Obj])).
+
+format_payload(<<"application/octet-stream">>, Payload) ->
+    base64:encode(Payload);
+
+format_payload(_, Payload) ->
+    Payload.
+
+parse_payload(<<"application/octet-stream">>, Body) ->
+    base64:decode(Body);
+
+parse_payload(_, Body) ->
+    Body.
+
+call_client(ClientId, Msg, Timeout) ->
+    case emqx_gateway_cm_registry:lookup_channels(coap, ClientId) of
+        [Channel | _] ->
+            RequestId = emqx_coap_channel:send_request(Channel, Msg),
+            case gen_server:wait_response(RequestId, Timeout) of
+             {reply, Reply} ->
+                 Reply;
+             _ ->
+                 timeout
+            end;
+        _ ->
+            not_found
+    end.

+ 51 - 18
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -25,7 +25,10 @@
 -export([ info/1
 -export([ info/1
         , info/2
         , info/2
         , stats/1
         , stats/1
-        , validator/4]).
+        , validator/4
+        , metrics_inc/2
+        , run_hooks/3
+        , send_request/2]).
 
 
 -export([ init/2
 -export([ init/2
         , handle_in/2
         , handle_in/2
@@ -57,7 +60,7 @@
 
 
                   connection_required :: boolean(),
                   connection_required :: boolean(),
 
 
-                  conn_state :: idle | connected,
+                  conn_state :: idle | connected | disconnected,
 
 
                   token :: binary() | undefined
                   token :: binary() | undefined
                  }).
                  }).
@@ -99,7 +102,7 @@ init(ConnInfo = #{peername := {PeerHost, _},
                   sockname := {_, SockPort}},
                   sockname := {_, SockPort}},
      #{ctx := Ctx} = Config) ->
      #{ctx := Ctx} = Config) ->
     Peercert = maps:get(peercert, ConnInfo, undefined),
     Peercert = maps:get(peercert, ConnInfo, undefined),
-    Mountpoint = maps:get(mountpoint, Config, undefined),
+    Mountpoint = maps:get(mountpoint, Config, <<>>),
     ClientInfo = set_peercert_infos(
     ClientInfo = set_peercert_infos(
                    Peercert,
                    Peercert,
                    #{ zone => default
                    #{ zone => default
@@ -128,6 +131,10 @@ init(ConnInfo = #{peername := {PeerHost, _},
 validator(Type, Topic, Ctx, ClientInfo) ->
 validator(Type, Topic, Ctx, ClientInfo) ->
     emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
     emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
 
 
+-spec send_request(pid(), emqx_coap_message()) -> any().
+send_request(Channel, Request) ->
+    gen_server:send_request(Channel, {?FUNCTION_NAME, Request}).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 %% Handle incoming packet
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -143,8 +150,9 @@ handle_in(Msg, ChannleT) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle Delivers from broker to client
 %% Handle Delivers from broker to client
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-handle_deliver(Delivers, Channel) ->
-    call_session(deliver, Delivers, Channel).
+handle_deliver(Delivers, #channel{session = Session,
+                                  ctx = Ctx} = Channel) ->
+    handle_result(emqx_coap_session:deliver(Delivers, Ctx, Session), Channel).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle timeout
 %% Handle timeout
@@ -155,7 +163,7 @@ handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel
             Channel2 = ensure_keepalive_timer(fun make_timer/4, Channel),
             Channel2 = ensure_keepalive_timer(fun make_timer/4, Channel),
             {ok, Channel2#channel{keepalive = NewKeepAlive}};
             {ok, Channel2#channel{keepalive = NewKeepAlive}};
         {error, timeout} ->
         {error, timeout} ->
-            {shutdown, timeout, Channel}
+            {shutdown, timeout, ensure_disconnected(keepalive_timeout, Channel)}
     end;
     end;
 
 
 handle_timeout(_, {transport, Msg}, Channel) ->
 handle_timeout(_, {transport, Msg}, Channel) ->
@@ -170,6 +178,10 @@ handle_timeout(_, _, Channel) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle call
 %% Handle call
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
+handle_call({send_request, Msg}, From, Channel) ->
+    Result = call_session(handle_out, {{send_request, From}, Msg}, Channel),
+    erlang:setelement(1, Result, noreply);
+
 handle_call(Req, _From, Channel) ->
 handle_call(Req, _From, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     ?LOG(error, "Unexpected call: ~p", [Req]),
     {reply, ignored, Channel}.
     {reply, ignored, Channel}.
@@ -184,6 +196,9 @@ handle_cast(Req, Channel) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle Info
 %% Handle Info
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
+handle_info({subscribe, _}, Channel) ->
+    {ok, Channel};
+
 handle_info(Info, Channel) ->
 handle_info(Info, Channel) ->
     ?LOG(error, "Unexpected info: ~p", [Info]),
     ?LOG(error, "Unexpected info: ~p", [Info]),
     {ok, Channel}.
     {ok, Channel}.
@@ -191,8 +206,10 @@ handle_info(Info, Channel) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Terminate
 %% Terminate
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-terminate(_Reason, _Channel) ->
-    ok.
+terminate(Reason, #channel{clientinfo = ClientInfo,
+                           ctx = Ctx,
+                           session = Session}) ->
+    run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
@@ -242,17 +259,17 @@ check_token(true,
             try_takeover(CState, DesireId, Msg, Channel);
             try_takeover(CState, DesireId, Msg, Channel);
         _ ->
         _ ->
             Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
             Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
-            {ok, {outgoing, Reply}, Msg}
+            {ok, {outgoing, Reply}, Channel}
     end;
     end;
 
 
 check_token(false, Msg, Channel) ->
 check_token(false, Msg, Channel) ->
     case emqx_coap_message:get_option(uri_query, Msg) of
     case emqx_coap_message:get_option(uri_query, Msg) of
         #{<<"clientid">> := _} ->
         #{<<"clientid">> := _} ->
             Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
             Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
-            {ok, {outgoing, Reply}, Msg};
+            {ok, {outgoing, Reply}, Channel};
         #{<<"token">> := _} ->
         #{<<"token">> := _} ->
             Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
             Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
-            {ok, {outgoing, Reply}, Msg};
+            {ok, {outgoing, Reply}, Channel};
         _ ->
         _ ->
             call_session(handle_request, Msg, Channel)
             call_session(handle_request, Msg, Channel)
     end.
     end.
@@ -322,11 +339,9 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
             {error, Reason}
             {error, Reason}
     end.
     end.
 
 
-fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) ->
+fix_mountpoint(_Packet, #{mountpoint := <<>>} = ClientInfo) ->
     {ok, ClientInfo};
     {ok, ClientInfo};
 fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
 fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
-    %% TODO: Enrich the varibale replacement????
-    %%       i.e: ${ClientInfo.auth_result.productKey}
     Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
     Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
     {ok, ClientInfo#{mountpoint := Mountpoint1}}.
     {ok, ClientInfo#{mountpoint := Mountpoint1}}.
 
 
@@ -338,6 +353,7 @@ ensure_connected(Channel = #channel{ctx = Ctx,
                          , proto_ver => <<"1">>
                          , proto_ver => <<"1">>
                          },
                          },
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
+    _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]),
     Channel#channel{conninfo = NConnInfo}.
     Channel#channel{conninfo = NConnInfo}.
 
 
 process_connect(#channel{ctx = Ctx,
 process_connect(#channel{ctx = Ctx,
@@ -374,19 +390,32 @@ run_hooks(Ctx, Name, Args, Acc) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name),
     emqx_gateway_ctx:metrics_inc(Ctx, Name),
     emqx_hooks:run_fold(Name, Args, Acc).
     emqx_hooks:run_fold(Name, Args, Acc).
 
 
+metrics_inc(Name, Ctx) ->
+    emqx_gateway_ctx:metrics_inc(Ctx, Name).
+
+ensure_disconnected(Reason, Channel = #channel{
+                                         ctx = Ctx,
+                                         conninfo = ConnInfo,
+                                         clientinfo = ClientInfo}) ->
+    NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
+    ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]),
+    Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Call Chain
 %% Call Chain
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-call_session(Fun,
-             Msg,
-             #channel{session = Session} = Channel) ->
+call_session(Fun, Msg, #channel{session = Session} = Channel) ->
+    Result = emqx_coap_session:Fun(Msg, Session),
+    handle_result(Result, Channel).
+
+handle_result(Result, Channel) ->
     iter([ session, fun process_session/4
     iter([ session, fun process_session/4
          , proto, fun process_protocol/4
          , proto, fun process_protocol/4
          , reply, fun process_reply/4
          , reply, fun process_reply/4
          , out, fun process_out/4
          , out, fun process_out/4
          , fun process_nothing/3
          , fun process_nothing/3
          ],
          ],
-         emqx_coap_session:Fun(Msg, Session),
+         Result,
          Channel).
          Channel).
 
 
 call_handler(request, Msg, Result,
 call_handler(request, Msg, Result,
@@ -406,6 +435,10 @@ call_handler(request, Msg, Result,
          maps:merge(Result, HandlerResult),
          maps:merge(Result, HandlerResult),
          Channel);
          Channel);
 
 
+call_handler(response, {{send_request, From}, Response}, Result, Channel, Iter) ->
+    gen_server:reply(From, Response),
+    iter(Iter, Result, Channel);
+
 call_handler(_, _, Result, Channel, Iter) ->
 call_handler(_, _, Result, Channel, Iter) ->
     iter(Iter, Result, Channel).
     iter(Iter, Result, Channel).
 
 

+ 7 - 3
apps/emqx_gateway/src/coap/emqx_coap_session.erl

@@ -33,7 +33,7 @@
         , handle_response/2
         , handle_response/2
         , handle_out/2
         , handle_out/2
         , set_reply/2
         , set_reply/2
-        , deliver/2
+        , deliver/3
         , timeout/2]).
         , timeout/2]).
 
 
 -export_type([session/0]).
 -export_type([session/0]).
@@ -66,6 +66,7 @@
                     ]).
                     ]).
 
 
 -import(emqx_coap_medium, [iter/3]).
 -import(emqx_coap_medium, [iter/3]).
+-import(emqx_coap_channel, [metrics_inc/2]).
 
 
 %%%-------------------------------------------------------------------
 %%%-------------------------------------------------------------------
 %%% API
 %%% API
@@ -147,13 +148,16 @@ set_reply(Msg, #session{transport_manager = TM} = Session) ->
     TM2 = emqx_coap_tm:set_reply(Msg, TM),
     TM2 = emqx_coap_tm:set_reply(Msg, TM),
     Session#session{transport_manager = TM2}.
     Session#session{transport_manager = TM2}.
 
 
-deliver(Delivers, #session{observe_manager = OM,
-                           transport_manager = TM} = Session) ->
+deliver(Delivers, Ctx, #session{observe_manager = OM,
+                                transport_manager = TM} = Session) ->
     Fun = fun({_, Topic, Message}, {OutAcc, OMAcc, TMAcc} = Acc) ->
     Fun = fun({_, Topic, Message}, {OutAcc, OMAcc, TMAcc} = Acc) ->
                   case emqx_coap_observe_res:res_changed(Topic, OMAcc) of
                   case emqx_coap_observe_res:res_changed(Topic, OMAcc) of
                       undefined ->
                       undefined ->
+                          metrics_inc('delivery.dropped', Ctx),
+                          metrics_inc('delivery.dropped.no_subid', Ctx),
                           Acc;
                           Acc;
                       {Token, SeqId, OM2} ->
                       {Token, SeqId, OM2} ->
+                          metrics_inc('messages.delivered', Ctx),
                           Msg = mqtt_to_coap(Message, Token, SeqId),
                           Msg = mqtt_to_coap(Message, Token, SeqId),
                           #{out := Out, tm := TM2} = emqx_coap_tm:handle_out(Msg, TMAcc),
                           #{out := Out, tm := TM2} = emqx_coap_tm:handle_out(Msg, TMAcc),
                           {Out ++ OutAcc, OM2, TM2}
                           {Out ++ OutAcc, OM2, TM2}

+ 8 - 4
apps/emqx_gateway/src/coap/emqx_coap_tm.erl

@@ -108,6 +108,9 @@ handle_response(#coap_message{type = Type, id = MsgId, token = Token} = Msg, TM)
     end.
     end.
 
 
 %% send to a client, msg can be request/piggyback/separate/notify
 %% send to a client, msg can be request/piggyback/separate/notify
+handle_out({Ctx, Msg}, TM) ->
+    handle_out(Msg, Ctx, TM);
+
 handle_out(Msg, TM) ->
 handle_out(Msg, TM) ->
     handle_out(Msg, undefined, TM).
     handle_out(Msg, undefined, TM).
 
 
@@ -119,8 +122,8 @@ handle_out(#coap_message{token = Token} = MsgT, Ctx, TM) ->
     %% TODO why find by token ?
     %% TODO why find by token ?
     case find_machine_by_keys([Id, TokenId], TM2) of
     case find_machine_by_keys([Id, TokenId], TM2) of
         undefined ->
         undefined ->
-            {Machine, TM3} = new_out_machine(Id, Msg, TM),
-            process_event(out, {Ctx, Msg}, TM3, Machine);
+            {Machine, TM3} = new_out_machine(Id, Ctx, Msg, TM2),
+            process_event(out, Msg, TM3, Machine);
         _ ->
         _ ->
             %% ignore repeat send
             %% ignore repeat send
             empty()
             empty()
@@ -293,9 +296,10 @@ new_in_machine(MachineId, #{seq_id := SeqId} = Manager) ->
                        SeqId => Machine,
                        SeqId => Machine,
                        MachineId => SeqId}}.
                        MachineId => SeqId}}.
 
 
--spec new_out_machine(state_machine_key(), emqx_coap_message(), manager()) ->
+-spec new_out_machine(state_machine_key(), any(), emqx_coap_message(), manager()) ->
           {state_machine(), manager()}.
           {state_machine(), manager()}.
 new_out_machine(MachineId,
 new_out_machine(MachineId,
+                Ctx,
                 #coap_message{type = Type, token = Token, options = Opts},
                 #coap_message{type = Type, token = Token, options = Opts},
                 #{seq_id := SeqId} = Manager) ->
                 #{seq_id := SeqId} = Manager) ->
     Observe = maps:get(observe, Opts, undefined),
     Observe = maps:get(observe, Opts, undefined),
@@ -305,7 +309,7 @@ new_out_machine(MachineId,
                             , observe = Observe
                             , observe = Observe
                             , state = idle
                             , state = idle
                             , timers = #{}
                             , timers = #{}
-                            , transport = emqx_coap_transport:new()},
+                            , transport = emqx_coap_transport:new(Ctx)},
 
 
     Manager2 = Manager#{seq_id := SeqId + 1,
     Manager2 = Manager#{seq_id := SeqId + 1,
                         SeqId => Machine,
                         SeqId => Machine,

+ 6 - 5
apps/emqx_gateway/src/coap/emqx_coap_transport.erl

@@ -20,7 +20,7 @@
 
 
 -type transport() :: #transport{}.
 -type transport() :: #transport{}.
 
 
--export([ new/0, idle/3, maybe_reset/3, set_cache/2
+-export([ new/0, new/1, idle/3, maybe_reset/3, set_cache/2
         , maybe_resend_4request/3, wait_ack/3, until_stop/3
         , maybe_resend_4request/3, wait_ack/3, until_stop/3
         , observe/3, maybe_resend_4response/3]).
         , observe/3, maybe_resend_4response/3]).
 
 
@@ -33,9 +33,13 @@
 
 
 -spec new() -> transport().
 -spec new() -> transport().
 new() ->
 new() ->
+    new(undefined).
+
+new(ReqCtx) ->
     #transport{cache = undefined,
     #transport{cache = undefined,
                retry_interval = 0,
                retry_interval = 0,
-               retry_count = 0}.
+               retry_count = 0,
+               req_context = ReqCtx}.
 
 
 idle(in,
 idle(in,
      #coap_message{type = non, method = Method} = Msg,
      #coap_message{type = non, method = Method} = Msg,
@@ -62,9 +66,6 @@ idle(in,
                         timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]})
                         timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]})
     end;
     end;
 
 
-idle(out, {Ctx, Msg}, Transport) ->
-    idle(out, Msg, Transport#transport{req_context = Ctx});
-
 idle(out, #coap_message{type = non} = Msg, _) ->
 idle(out, #coap_message{type = non} = Msg, _) ->
     out(Msg, #{next => maybe_reset,
     out(Msg, #{next => maybe_reset,
                timeouts => [{stop_timeout, ?NON_LIFETIME}]});
                timeouts => [{stop_timeout, ?NON_LIFETIME}]});

+ 18 - 10
apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl

@@ -24,11 +24,14 @@
 
 
 -import(emqx_coap_message, [response/2, response/3]).
 -import(emqx_coap_message, [response/2, response/3]).
 -import(emqx_coap_medium, [reply/2, reply/3]).
 -import(emqx_coap_medium, [reply/2, reply/3]).
+-import(emqx_coap_channel, [run_hooks/3]).
 
 
 -define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}).
 -define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}).
 -define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}).
 -define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}).
 -define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}).
 -define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}).
 
 
+%% TODO maybe can merge this code into emqx_coap_session, simplify the call chain
+
 handle_request(Path, #coap_message{method = Method} = Msg, Ctx, CInfo) ->
 handle_request(Path, #coap_message{method = Method} = Msg, Ctx, CInfo) ->
     case check_topic(Path) of
     case check_topic(Path) of
         {ok, Topic} ->
         {ok, Topic} ->
@@ -42,7 +45,7 @@ handle_method(get, Topic, Msg, Ctx, CInfo) ->
         0 ->
         0 ->
             subscribe(Msg, Topic, Ctx, CInfo);
             subscribe(Msg, Topic, Ctx, CInfo);
         1 ->
         1 ->
-            unsubscribe(Msg, Topic, CInfo);
+            unsubscribe(Msg, Topic, Ctx, CInfo);
         _ ->
         _ ->
             reply({error, bad_request}, <<"invalid observe value">>, Msg)
             reply({error, bad_request}, <<"invalid observe value">>, Msg)
     end;
     end;
@@ -51,8 +54,9 @@ handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx, CInfo) -
     case emqx_coap_channel:validator(publish, Topic, Ctx, CInfo) of
     case emqx_coap_channel:validator(publish, Topic, Ctx, CInfo) of
         allow ->
         allow ->
             #{clientid := ClientId} = CInfo,
             #{clientid := ClientId} = CInfo,
+            MountTopic = mount(CInfo, Topic),
             QOS = get_publish_qos(Msg),
             QOS = get_publish_qos(Msg),
-            MQTTMsg = emqx_message:make(ClientId, QOS, Topic, Payload),
+            MQTTMsg = emqx_message:make(ClientId, QOS, MountTopic, Payload),
             MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg),
             MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg),
             _ = emqx_broker:publish(MQTTMsg2),
             _ = emqx_broker:publish(MQTTMsg2),
             reply({ok, changed}, Msg);
             reply({ok, changed}, Msg);
@@ -139,15 +143,19 @@ subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) ->
         allow ->
         allow ->
             #{clientid := ClientId} = CInfo,
             #{clientid := ClientId} = CInfo,
             SubOpts = get_sub_opts(Msg),
             SubOpts = get_sub_opts(Msg),
-            emqx_broker:subscribe(Topic, ClientId, SubOpts),
-            emqx_hooks:run('session.subscribed',
-                           [CInfo, Topic, SubOpts]),
-            ?SUB(Topic, Token, Msg);
+            MountTopic = mount(CInfo, Topic),
+            emqx_broker:subscribe(MountTopic, ClientId, SubOpts),
+            run_hooks(Ctx, 'session.subscribed', [CInfo, Topic, SubOpts]),
+            ?SUB(MountTopic, Token, Msg);
         _ ->
         _ ->
             reply({error, unauthorized}, Msg)
             reply({error, unauthorized}, Msg)
     end.
     end.
 
 
-unsubscribe(Msg, Topic, CInfo) ->
-    emqx_broker:unsubscribe(Topic),
-    emqx_hooks:run('session.unsubscribed', [CInfo, Topic, ?SUBOPTS]),
-    ?UNSUB(Topic, Msg).
+unsubscribe(Msg, Topic, Ctx, CInfo) ->
+    MountTopic = mount(CInfo, Topic),
+    emqx_broker:unsubscribe(MountTopic),
+    run_hooks(Ctx, 'session.unsubscribed', [CInfo, Topic, ?SUBOPTS]),
+    ?UNSUB(MountTopic, Msg).
+
+mount(#{mountpoint := Mountpoint}, Topic) ->
+    <<Mountpoint/binary, Topic/binary>>.

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_app.erl

@@ -83,4 +83,4 @@ load_gateway_by_default([{Type, Confs}|More]) ->
     load_gateway_by_default(More).
     load_gateway_by_default(More).
 
 
 confs() ->
 confs() ->
-    maps:to_list(emqx:get_config([gateway], [])).
+    maps:to_list(emqx:get_config([gateway], #{})).

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_metrics.erl

@@ -18,7 +18,7 @@
 
 
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 
--include("include/emqx_gateway.hrl").
+-include_lib("emqx_gateway/include/emqx_gateway.hrl").
 
 
 
 
 %% APIs
 %% APIs

+ 224 - 0
apps/emqx_gateway/test/emqx_coap_api_SUITE.erl

@@ -0,0 +1,224 @@
+%%--------------------------------------------------------------------
+%% Copyright (C) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(CONF_DEFAULT, <<"
+gateway.coap {
+              idle_timeout = 30s
+              enable_stats = false
+              mountpoint = \"\"
+              notify_type = qos
+              connection_required = true
+              subscribe_qos = qos1
+              publish_qos = qos1
+              authentication = undefined
+
+              listeners.udp.default {
+                                     bind = 5683
+                                    }
+             }
+                        ">>).
+
+-define(HOST, "127.0.0.1").
+-define(PORT, 5683).
+-define(CONN_URI, "coap://127.0.0.1/mqtt/connection?clientid=client1&username=admin&password=public").
+
+-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
+
+%%--------------------------------------------------------------------
+%% Setups
+%%--------------------------------------------------------------------
+
+all() ->
+    emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
+    Config.
+
+end_per_suite(Config) ->
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
+    Config.
+
+set_special_configs(emqx_gatewway) ->
+    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT);
+
+set_special_configs(_) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Cases
+%%--------------------------------------------------------------------
+t_send_request_api(_) ->
+    ClientId = start_client(),
+    timer:sleep(200),
+    Path = emqx_mgmt_api_test_util:api_path(["gateway/coap/client1/request"]),
+    Token = <<"atoken">>,
+    Payload = <<"simple echo this">>,
+    Req = #{token => Token,
+            payload => Payload,
+            timeout => 10,
+            content_type => <<"text/plain">>,
+            method => <<"get">>},
+    Auth = emqx_mgmt_api_test_util:auth_header_(),
+    {ok, Response} = emqx_mgmt_api_test_util:request_api(post,
+                                                         Path,
+                                                         "method=get",
+                                                         Auth,
+                                                         Req
+                                                        ),
+    #{<<"token">> := RToken, <<"payload">> := RPayload} =
+        emqx_json:decode(Response, [return_maps]),
+    ?assertEqual(Token, RToken),
+    ?assertEqual(Payload, RPayload),
+    erlang:exit(ClientId, kill),
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Internal Functions
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+start_client() ->
+    spawn(fun coap_client/0).
+
+coap_client() ->
+    {ok, CSock} = gen_udp:open(0, [binary, {active, false}]),
+    test_send_coap_request(CSock, post, <<>>, [], 1),
+    Response = test_recv_coap_response(CSock),
+    ?assertEqual({ok, created}, Response#coap_message.method),
+    echo_loop(CSock).
+
+echo_loop(CSock) ->
+    #coap_message{payload = Payload} = Req = test_recv_coap_request(CSock),
+    test_send_coap_response(CSock, ?HOST, ?PORT, {ok, content}, Payload, Req),
+    echo_loop(CSock).
+
+test_send_coap_request(UdpSock, Method, Content, Options, MsgId) ->
+    is_list(Options) orelse error("Options must be a list"),
+    case resolve_uri(?CONN_URI) of
+        {coap, {IpAddr, Port}, Path, Query} ->
+            Request0 = emqx_coap_message:request(con, Method, Content,
+                                                 [{uri_path, Path},
+                                                  {uri_query, Query} | Options]),
+            Request = Request0#coap_message{id = MsgId},
+            ?LOGT("send_coap_request Request=~p", [Request]),
+            RequestBinary = emqx_coap_frame:serialize_pkt(Request, undefined),
+            ?LOGT("test udp socket send to ~p:~p, data=~p", [IpAddr, Port, RequestBinary]),
+            ok = gen_udp:send(UdpSock, IpAddr, Port, RequestBinary);
+        {SchemeDiff, ChIdDiff, _, _} ->
+            error(lists:flatten(io_lib:format("scheme ~s or ChId ~s does not match with socket", [SchemeDiff, ChIdDiff])))
+    end.
+
+test_recv_coap_response(UdpSock) ->
+    {ok, {Address, Port, Packet}} = gen_udp:recv(UdpSock, 0, 2000),
+    {ok, Response, _, _} = emqx_coap_frame:parse(Packet, undefined),
+    ?LOGT("test udp receive from ~p:~p, data1=~p, Response=~p", [Address, Port, Packet, Response]),
+    #coap_message{type = ack, method = Method, id=Id, token = Token, options = Options, payload = Payload} = Response,
+    ?LOGT("receive coap response Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]),
+    Response.
+
+test_recv_coap_request(UdpSock) ->
+    case gen_udp:recv(UdpSock, 0) of
+        {ok, {_Address, _Port, Packet}} ->
+            {ok, Request, _, _} = emqx_coap_frame:parse(Packet, undefined),
+            #coap_message{type = con, method = Method, id=Id, token = Token, payload = Payload, options = Options} = Request,
+            ?LOGT("receive coap request Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]),
+            Request;
+        {error, Reason} ->
+            ?LOGT("test_recv_coap_request failed, Reason=~p", [Reason]),
+            timeout_test_recv_coap_request
+    end.
+
+test_send_coap_response(UdpSock, Host, Port, Code, Content, Request) ->
+    is_list(Host) orelse error("Host is not a string"),
+    {ok, IpAddr} = inet:getaddr(Host, inet),
+    Response = emqx_coap_message:piggyback(Code, Content, Request),
+    ?LOGT("test_send_coap_response Response=~p", [Response]),
+    Binary = emqx_coap_frame:serialize_pkt(Response, undefined),
+    ok = gen_udp:send(UdpSock, IpAddr, Port, Binary).
+
+resolve_uri(Uri) ->
+    {ok, #{scheme := Scheme,
+           host := Host,
+           port := PortNo,
+           path := Path} = URIMap} = emqx_http_lib:uri_parse(Uri),
+    Query = maps:get(query, URIMap, undefined),
+    {ok, PeerIP} = inet:getaddr(Host, inet),
+    {Scheme, {PeerIP, PortNo}, split_path(Path), split_query(Query)}.
+
+split_path([]) -> [];
+split_path([$/]) -> [];
+split_path([$/ | Path]) -> split_segments(Path, $/, []).
+
+split_query(undefined) -> #{};
+split_query(Path) ->
+    split_segments(Path, $&, []).
+
+split_segments(Path, Char, Acc) ->
+    case string:rchr(Path, Char) of
+        0 ->
+            [make_segment(Path) | Acc];
+        N when N > 0 ->
+            split_segments(string:substr(Path, 1, N-1), Char,
+                [make_segment(string:substr(Path, N+1)) | Acc])
+    end.
+
+make_segment(Seg) ->
+    list_to_binary(emqx_http_lib:uri_decode(Seg)).
+
+
+get_coap_path(Options) ->
+    get_path(Options, <<>>).
+
+get_coap_query(Options) ->
+    proplists:get_value(uri_query, Options, []).
+
+get_coap_observe(Options) ->
+    get_observe(Options).
+
+
+get_path([], Acc) ->
+    %?LOGT("get_path Acc=~p", [Acc]),
+    Acc;
+get_path([{uri_path, Path1}|T], Acc) ->
+    %?LOGT("Path=~p, Acc=~p", [Path1, Acc]),
+    get_path(T, join_path(Path1, Acc));
+get_path([{_, _}|T], Acc) ->
+    get_path(T, Acc).
+
+get_observe([]) ->
+    undefined;
+get_observe([{observe, V}|_T]) ->
+    V;
+get_observe([{_, _}|T]) ->
+    get_observe(T).
+
+join_path([], Acc) -> Acc;
+join_path([<<"/">>|T], Acc) ->
+    join_path(T, Acc);
+join_path([H|T], Acc) ->
+    join_path(T, <<Acc/binary, $/, H/binary>>).
+
+sprintf(Format, Args) ->
+    lists:flatten(io_lib:format(Format, Args)).

+ 17 - 10
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -21,23 +21,30 @@
 -define(BASE_PATH, "/api/v5").
 -define(BASE_PATH, "/api/v5").
 
 
 init_suite() ->
 init_suite() ->
+    init_suite([]).
+
+init_suite(Apps) ->
     ekka_mnesia:start(),
     ekka_mnesia:start(),
     application:load(emqx_management),
     application:load(emqx_management),
-    emqx_ct_helpers:start_apps([emqx_dashboard], fun set_special_configs/1).
+    emqx_ct_helpers:start_apps(Apps ++ [emqx_dashboard], fun set_special_configs/1).
+
 
 
 end_suite() ->
 end_suite() ->
+    end_suite([]).
+
+end_suite(Apps) ->
     application:unload(emqx_management),
     application:unload(emqx_management),
-    emqx_ct_helpers:stop_apps([emqx_dashboard]).
+    emqx_ct_helpers:stop_apps(Apps ++ [emqx_dashboard]).
 
 
 set_special_configs(emqx_dashboard) ->
 set_special_configs(emqx_dashboard) ->
     Config = #{
     Config = #{
-        default_username => <<"admin">>,
-        default_password => <<"public">>,
-        listeners => [#{
-            protocol => http,
-            port => 18083
-        }]
-    },
+               default_username => <<"admin">>,
+               default_password => <<"public">>,
+               listeners => [#{
+                               protocol => http,
+                               port => 18083
+                              }]
+              },
     emqx_config:put([emqx_dashboard], Config),
     emqx_config:put([emqx_dashboard], Config),
     ok;
     ok;
 set_special_configs(_App) ->
 set_special_configs(_App) ->
@@ -53,7 +60,7 @@ request_api(Method, Url, QueryParams, Auth) ->
     request_api(Method, Url, QueryParams, Auth, []).
     request_api(Method, Url, QueryParams, Auth, []).
 
 
 request_api(Method, Url, QueryParams, Auth, [])
 request_api(Method, Url, QueryParams, Auth, [])
-    when (Method =:= options) orelse
+  when (Method =:= options) orelse
          (Method =:= get) orelse
          (Method =:= get) orelse
          (Method =:= put) orelse
          (Method =:= put) orelse
          (Method =:= head) orelse
          (Method =:= head) orelse