Просмотр исходного кода

Merge pull request #6456 from HJianBo/gw-review-r1

JianBo He 4 лет назад
Родитель
Сommit
db08cee20f
24 измененных файлов с 513 добавлено и 227 удалено
  1. 6 0
      apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl
  2. 140 37
      apps/emqx_gateway/src/coap/emqx_coap_channel.erl
  3. 8 5
      apps/emqx_gateway/src/coap/emqx_coap_frame.erl
  4. 2 2
      apps/emqx_gateway/src/coap/emqx_coap_medium.erl
  5. 5 5
      apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl
  6. 4 2
      apps/emqx_gateway/src/coap/emqx_coap_session.erl
  7. 4 3
      apps/emqx_gateway/src/coap/emqx_coap_tm.erl
  8. 3 4
      apps/emqx_gateway/src/coap/emqx_coap_transport.erl
  9. 2 2
      apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl
  10. 1 1
      apps/emqx_gateway/src/coap/include/emqx_coap.hrl
  11. 0 2
      apps/emqx_gateway/src/emqx_gateway.erl
  12. 10 10
      apps/emqx_gateway/src/emqx_gateway_api.erl
  13. 2 2
      apps/emqx_gateway/src/emqx_gateway_api_authn.erl
  14. 30 17
      apps/emqx_gateway/src/emqx_gateway_api_clients.erl
  15. 3 1
      apps/emqx_gateway/src/emqx_gateway_conf.erl
  16. 20 15
      apps/emqx_gateway/src/emqx_gateway_http.erl
  17. 2 2
      apps/emqx_gateway/src/emqx_gateway_schema.erl
  18. 1 1
      apps/emqx_gateway/src/emqx_gateway_utils.erl
  19. 22 11
      apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl
  20. 147 25
      apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl
  21. 83 62
      apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl
  22. 3 3
      apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl
  23. 14 14
      apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl
  24. 1 1
      apps/emqx_gateway/test/emqx_stomp_SUITE.erl

+ 6 - 0
apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl

@@ -32,6 +32,12 @@
 
 -type serialize_options() :: map().
 
+-export_type([ parse_state/0
+             , parse_result/0
+             , serialize_options/0
+             , frame/0
+             ]).
+
 %% Callbacks
 
 %% @doc Initial the frame parser states

+ 140 - 37
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -25,7 +25,8 @@
         , validator/4
         , metrics_inc/2
         , run_hooks/3
-        , send_request/2]).
+        , send_request/2
+        ]).
 
 -export([ init/2
         , handle_in/2
@@ -48,59 +49,76 @@
 -define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
 
 -record(channel, {
-                  %% Context
-                  ctx           :: emqx_gateway_ctx:context(),
-                  %% Connection Info
-                  conninfo      :: emqx_types:conninfo(),
-                  %% Client Info
-                  clientinfo    :: emqx_types:clientinfo(),
-                  %% Session
-                  session       :: emqx_coap_session:session() | undefined,
-                  %% Keepalive
-                  keepalive     :: emqx_keepalive:keepalive() | undefined,
-                  %% Timer
-                  timers :: #{atom() => disable | undefined | reference()},
-
-                  connection_required :: boolean(),
-
-                  conn_state :: idle | connected | disconnected,
-
-                  token :: binary() | undefined
-                 }).
+          %% Context
+          ctx           :: emqx_gateway_ctx:context(),
+          %% Connection Info
+          conninfo      :: emqx_types:conninfo(),
+          %% Client Info
+          clientinfo    :: emqx_types:clientinfo(),
+          %% Session
+          session       :: emqx_coap_session:session() | undefined,
+          %% Keepalive
+          keepalive     :: emqx_keepalive:keepalive() | undefined,
+          %% Timer
+          timers :: #{atom() => disable | undefined | reference()},
+          %% Connection mode
+          connection_required :: boolean(),
+          %% Connection State
+          conn_state :: conn_state(),
+          %% Session token to identity this connection
+          token :: binary() | undefined
+         }).
 
 -type channel() :: #channel{}.
+
+-type conn_state() :: idle | connecting | connected | disconnected.
+
+-type reply() :: {outgoing, coap_message()}
+               | {outgoing, [coap_message()]}
+               | {event, conn_state()|updated}
+               | {close, Reason :: atom()}.
+
+-type replies() :: reply() | [reply()].
+
 -define(TOKEN_MAXIMUM, 4294967295).
+
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
+
 -define(DEF_IDLE_TIME, timer:seconds(30)).
 -define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)).
 
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
+
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
 
+-spec info(channel()) -> emqx_types:infos().
 info(Channel) ->
     maps:from_list(info(?INFO_KEYS, Channel)).
 
+-spec info(list(atom())|atom(), channel()) -> term().
 info(Keys, Channel) when is_list(Keys) ->
     [{Key, info(Key, Channel)} || Key <- Keys];
 
 info(conninfo, #channel{conninfo = ConnInfo}) ->
     ConnInfo;
-info(conn_state, #channel{conn_state = CState}) ->
-    CState;
+info(conn_state, #channel{conn_state = ConnState}) ->
+    ConnState;
 info(clientinfo, #channel{clientinfo = ClientInfo}) ->
     ClientInfo;
 info(session, #channel{session = Session}) ->
-    emqx_misc:maybe_apply(fun emqx_session:info/1, Session);
+    emqx_misc:maybe_apply(fun emqx_coap_session:info/1, Session);
 info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
     ClientId;
 info(ctx, #channel{ctx = Ctx}) ->
     Ctx.
 
+-spec stats(channel()) -> emqx_types:stats().
 stats(_) ->
     [].
 
+-spec init(map(), map()) -> channel().
 init(ConnInfoT = #{peername := {PeerHost, _},
                    sockname := {_, SockPort}},
      #{ctx := Ctx} = Config) ->
@@ -126,8 +144,8 @@ init(ConnInfoT = #{peername := {PeerHost, _},
                     }
                   ),
 
-    %% because it is possible to disconnect after init, and then trigger the $event.disconnected hook
-    %% and these two fields are required in the hook
+    %% because it is possible to disconnect after init, and then trigger the
+    %% $event.disconnected hook and these two fields are required in the hook
     ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>},
 
     Heartbeat = ?GET_IDLE_TIME(Config),
@@ -144,13 +162,19 @@ init(ConnInfoT = #{peername := {PeerHost, _},
 validator(Type, Topic, Ctx, ClientInfo) ->
     emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
 
--spec send_request(pid(), emqx_coap_message()) -> any().
+-spec send_request(pid(), coap_message()) -> any().
 send_request(Channel, Request) ->
     gen_server:send_request(Channel, {?FUNCTION_NAME, Request}).
 
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 %%--------------------------------------------------------------------
+
+-spec handle_in(coap_message() | {frame_error, any()}, channel())
+      -> {ok, channel()}
+       | {ok, replies(), channel()}
+       | {shutdown, Reason :: term(), channel()}
+       | {shutdown, Reason :: term(), replies(), channel()}.
 handle_in(Msg, ChannleT) ->
     Channel = ensure_keepalive_timer(ChannleT),
     case emqx_coap_message:is_request(Msg) of
@@ -170,6 +194,7 @@ handle_deliver(Delivers, #channel{session = Session,
 %%--------------------------------------------------------------------
 %% Handle timeout
 %%--------------------------------------------------------------------
+
 handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) ->
     case emqx_keepalive:check(NewVal, KeepAlive) of
         {ok, NewKeepAlive} ->
@@ -191,10 +216,72 @@ handle_timeout(_, _, Channel) ->
 %%--------------------------------------------------------------------
 %% Handle call
 %%--------------------------------------------------------------------
+
+-spec(handle_call(Req :: term(), From :: term(), channel())
+    -> {reply, Reply :: term(), channel()}
+     | {reply, Reply :: term(), replies(), channel()}
+     | {shutdown, Reason :: term(), Reply :: term(), channel()}
+     | {shutdown, Reason :: term(), Reply :: term(), coap_message(), channel()}).
 handle_call({send_request, Msg}, From, Channel) ->
     Result = call_session(handle_out, {{send_request, From}, Msg}, Channel),
     erlang:setelement(1, Result, noreply);
 
+handle_call({subscribe, Topic, SubOpts}, _From,
+            Channel = #channel{
+                         ctx = Ctx,
+                         clientinfo = ClientInfo
+                                    = #{clientid := ClientId,
+                                        mountpoint := Mountpoint},
+                         session = Session}) ->
+    Token = maps:get(token,
+                     maps:get(sub_props, SubOpts, #{}),
+                     <<>>),
+    NSubOpts = maps:merge(
+                 emqx_gateway_utils:default_subopts(),
+                 SubOpts),
+    MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
+    _ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts),
+
+    _ = run_hooks(Ctx, 'session.subscribed',
+                  [ClientInfo, MountedTopic, NSubOpts]),
+    %% modifty session state
+    SubReq = {Topic, Token},
+    TempMsg = #coap_message{type = non},
+    Result  = emqx_coap_session:process_subscribe(
+                SubReq, TempMsg, #{}, Session),
+    NSession = maps:get(session, Result),
+    {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}};
+
+handle_call({unsubscribe, Topic}, _From,
+            Channel = #channel{
+                         ctx = Ctx,
+                         clientinfo = ClientInfo
+                                    = #{mountpoint := Mountpoint},
+                         session = Session}) ->
+    MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
+    ok = emqx_broker:unsubscribe(MountedTopic),
+    _ = run_hooks(Ctx, 'session.unsubscribe',
+                  [ClientInfo, MountedTopic, #{}]),
+
+    %% modifty session state
+    UnSubReq = Topic,
+    TempMsg = #coap_message{type = non},
+    Result  = emqx_coap_session:process_subscribe(
+                UnSubReq, TempMsg, #{}, Session),
+    NSession = maps:get(session, Result),
+    {reply, ok, Channel#channel{session = NSession}};
+
+handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
+    Subs = emqx_coap_session:info(subscriptions, Session),
+    {reply, {ok, maps:to_list(Subs)}, Channel};
+
+handle_call(kick, _From, Channel) ->
+    NChannel = ensure_disconnected(kicked, Channel),
+    shutdown_and_reply(kicked, ok, NChannel);
+
+handle_call(discard, _From, Channel) ->
+    shutdown_and_reply(discarded, ok, Channel);
+
 handle_call(Req, _From, Channel) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, Channel}.
@@ -202,6 +289,9 @@ handle_call(Req, _From, Channel) ->
 %%--------------------------------------------------------------------
 %% Handle Cast
 %%--------------------------------------------------------------------
+
+-spec handle_cast(Req :: term(), channel())
+      -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
 handle_cast(Req, Channel) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
     {ok, Channel}.
@@ -209,9 +299,9 @@ handle_cast(Req, Channel) ->
 %%--------------------------------------------------------------------
 %% Handle Info
 %%--------------------------------------------------------------------
-handle_info({subscribe, _}, Channel) ->
-    {ok, Channel};
 
+-spec(handle_info(Info :: term(), channel())
+      -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
 handle_info(Info, Channel) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {ok, Channel}.
@@ -352,15 +442,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
     Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
     {ok, ClientInfo#{mountpoint := Mountpoint1}}.
 
-ensure_connected(Channel = #channel{ctx = Ctx,
-                                    conninfo = ConnInfo,
-                                    clientinfo = ClientInfo}) ->
-    NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond)
-                         },
-    ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]),
-    Channel#channel{conninfo = NConnInfo}.
-
 process_connect(#channel{ctx = Ctx,
                          session = Session,
                          conninfo = ConnInfo,
@@ -401,6 +482,21 @@ run_hooks(Ctx, Name, Args, Acc) ->
 metrics_inc(Name, Ctx) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name).
 
+%%--------------------------------------------------------------------
+%% Ensure connected
+
+ensure_connected(Channel = #channel{ctx = Ctx,
+                                    conninfo = ConnInfo,
+                                    clientinfo = ClientInfo}) ->
+    NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond)
+                         },
+    _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]),
+    ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
+    Channel#channel{conninfo = NConnInfo, conn_state = connected}.
+
+%%--------------------------------------------------------------------
+%% Ensure disconnected
+
 ensure_disconnected(Reason, Channel = #channel{
                                          ctx = Ctx,
                                          conninfo = ConnInfo,
@@ -409,9 +505,16 @@ ensure_disconnected(Reason, Channel = #channel{
     ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]),
     Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
 
+shutdown_and_reply(Reason, Reply, Channel) ->
+    {shutdown, Reason, Reply, Channel}.
+
+%shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
+%    {shutdown, Reason, Reply, OutPkt, Channel}.
+
 %%--------------------------------------------------------------------
 %% Call Chain
 %%--------------------------------------------------------------------
+
 call_session(Fun, Msg, #channel{session = Session} = Channel) ->
     Result = emqx_coap_session:Fun(Msg, Session),
     handle_result(Result, Channel).

+ 8 - 5
apps/emqx_gateway/src/coap/emqx_coap_frame.erl

@@ -18,17 +18,15 @@
 
 -behaviour(emqx_gateway_frame).
 
-%% emqx_gateway_frame API
+%% emqx_gateway_frame callbacks
 -export([ initial_parse_state/1
         , serialize_opts/0
         , serialize_pkt/2
         , parse/2
         , format/1
         , type/1
-        , is_message/1]).
-
-%% API
--export([]).
+        , is_message/1
+        ]).
 
 -include("include/emqx_coap.hrl").
 -include("apps/emqx/include/types.hrl").
@@ -58,9 +56,11 @@
 %% API
 %%--------------------------------------------------------------------
 
+-spec initial_parse_state(map()) -> emqx_gateway_frame:parse_state().
 initial_parse_state(_) ->
     #{}.
 
+-spec serialize_opts() -> emqx_gateway_frame:serialize_options().
 serialize_opts() ->
     #{}.
 
@@ -235,6 +235,9 @@ method_to_class_code(Method) ->
 %%--------------------------------------------------------------------
 %% parse
 %%--------------------------------------------------------------------
+
+-spec parse(binary(), emqx_gateway_frame:parse_state())
+    -> emqx_gateway_frame:parse_result().
 parse(<<?VERSION:2, Type:2, 0:4, 0:3, 0:5, MsgId:16>>, ParseState) ->
     {ok,
      #coap_message{ type = decode_type(Type)

+ 2 - 2
apps/emqx_gateway/src/coap/emqx_coap_medium.erl

@@ -53,8 +53,8 @@ out(Msg, Result) ->
 proto_out(Proto) ->
     proto_out(Proto, #{}).
 
-proto_out(Proto, Resut) ->
-    Resut#{proto => Proto}.
+proto_out(Proto, Result) ->
+    Result#{proto => Proto}.
 
 reply(Method, Req) when not is_record(Method, coap_message) ->
     reply(Method, <<>>, Req);

+ 5 - 5
apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl

@@ -23,7 +23,6 @@
 
 -define(MAX_SEQ_ID, 16777215).
 
--type topic() :: binary().
 -type token() :: binary().
 -type seq_id() :: 0 .. ?MAX_SEQ_ID.
 
@@ -31,7 +30,7 @@
                 , seq_id := seq_id()
                 }.
 
--type manager() :: #{topic => res()}.
+-type manager() :: #{emqx_types:topic() => res()}.
 
 %%--------------------------------------------------------------------
 %% API
@@ -40,7 +39,7 @@
 new_manager() ->
     #{}.
 
--spec insert(topic(), token(), manager()) -> {seq_id(), manager()}.
+-spec insert(emqx_types:topic(), token(), manager()) -> {seq_id(), manager()}.
 insert(Topic, Token, Manager) ->
     Res = case maps:get(Topic, Manager, undefined) of
               undefined ->
@@ -50,11 +49,11 @@ insert(Topic, Token, Manager) ->
           end,
     {maps:get(seq_id, Res), Manager#{Topic => Res}}.
 
--spec remove(topic(), manager()) -> manager().
+-spec remove(emqx_types:topic(), manager()) -> manager().
 remove(Topic, Manager) ->
     maps:remove(Topic, Manager).
 
--spec res_changed(topic(), manager()) -> undefined | {token(), seq_id(), manager()}.
+-spec res_changed(emqx_types:topic(), manager()) -> undefined | {token(), seq_id(), manager()}.
 res_changed(Topic, Manager) ->
     case maps:get(Topic, Manager, undefined) of
         undefined ->
@@ -73,6 +72,7 @@ foreach(F, Manager) ->
               Manager),
     ok.
 
+-spec subscriptions(manager()) -> [emqx_types:topic()].
 subscriptions(Manager) ->
     maps:keys(Manager).
 

+ 4 - 2
apps/emqx_gateway/src/coap/emqx_coap_session.erl

@@ -22,7 +22,8 @@
 
 %% API
 -export([ new/0
-        , process_subscribe/4]).
+        , process_subscribe/4
+        ]).
 
 -export([ info/1
         , info/2
@@ -90,7 +91,8 @@ info(Session) ->
 info(Keys, Session) when is_list(Keys) ->
     [{Key, info(Key, Session)} || Key <- Keys];
 info(subscriptions, #session{observe_manager = OM}) ->
-    emqx_coap_observe_res:subscriptions(OM);
+    Topics = emqx_coap_observe_res:subscriptions(OM),
+    lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics);
 info(subscriptions_cnt, #session{observe_manager = OM}) ->
     erlang:length(emqx_coap_observe_res:subscriptions(OM));
 info(subscriptions_max, _) ->

+ 4 - 3
apps/emqx_gateway/src/coap/emqx_coap_tm.erl

@@ -63,7 +63,7 @@
 
 -type event_result(State) ::
         #{next => State,
-          outgoing => emqx_coap_message(),
+          outgoing => coap_message(),
           timeouts => list(ttimeout()),
           has_sub  => undefined | sub_register(),
           transport => emqx_coap_transport:transprot()}.
@@ -75,12 +75,13 @@
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
+
+-spec new() -> manager().
 new() ->
     #{ seq_id => 1
      , next_msg_id => rand:uniform(?MAX_MESSAGE_ID)
      }.
 
-%% client request
 handle_request(#coap_message{id = MsgId} = Msg, TM) ->
     Id = {in, MsgId},
     case find_machine(Id, TM) of
@@ -296,7 +297,7 @@ new_in_machine(MachineId, #{seq_id := SeqId} = Manager) ->
                        SeqId => Machine,
                        MachineId => SeqId}}.
 
--spec new_out_machine(state_machine_key(), any(), emqx_coap_message(), manager()) ->
+-spec new_out_machine(state_machine_key(), any(), coap_message(), manager()) ->
           {state_machine(), manager()}.
 new_out_machine(MachineId,
                 Ctx,

+ 3 - 4
apps/emqx_gateway/src/coap/emqx_coap_transport.erl

@@ -11,7 +11,7 @@
 
 -type request_context() :: any().
 
--record(transport, { cache :: undefined | emqx_coap_message()
+-record(transport, { cache :: undefined | coap_message()
                    , req_context :: request_context()
                    , retry_interval :: non_neg_integer()
                    , retry_count :: non_neg_integer()
@@ -26,7 +26,6 @@
 
 -export_type([transport/0]).
 
--import(emqx_coap_message, [reset/1]).
 -import(emqx_coap_medium, [ empty/0, reset/2, proto_out/2
                           , out/1, out/2, proto_out/1
                           , reply/2]).
@@ -166,7 +165,7 @@ observe(in,
         {error, _} ->
             #{next => stop};
         _ ->
-            reset(Message)
+            emqx_coap_message:reset(Message)
     end.
 
 until_stop(_, _, _) ->
@@ -187,5 +186,5 @@ on_response(#coap_message{type = Type} = Message,
                       out(Ack, #{next => NextState,
                                  transport => Transport#transport{cache = Ack}}));
        true ->
-            reset(Message)
+            emqx_coap_message:reset(Message)
     end.

+ 2 - 2
apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl

@@ -28,7 +28,7 @@
 
 -define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}).
 -define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}).
--define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}).
+-define(SUBOPTS, #{qos => 0, rh => 1, rap => 0, nl => 0, is_new => false}).
 
 %% TODO maybe can merge this code into emqx_coap_session, simplify the call chain
 
@@ -146,7 +146,7 @@ subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) ->
             SubOpts = get_sub_opts(Msg),
             MountTopic = mount(CInfo, Topic),
             emqx_broker:subscribe(MountTopic, ClientId, SubOpts),
-            run_hooks(Ctx, 'session.subscribed', [CInfo, Topic, SubOpts]),
+            run_hooks(Ctx, 'session.subscribed', [CInfo, MountTopic, SubOpts]),
             ?SUB(MountTopic, Token, Msg);
         _ ->
             reply({error, unauthorized}, Msg)

+ 1 - 1
apps/emqx_gateway/src/coap/include/emqx_coap.hrl

@@ -73,4 +73,4 @@
                       , options = #{}
                       , payload = <<>>}).
 
--type emqx_coap_message() :: #coap_message{}.
+-type coap_message() :: #coap_message{}.

+ 0 - 2
apps/emqx_gateway/src/emqx_gateway.erl

@@ -16,8 +16,6 @@
 
 -module(emqx_gateway).
 
--behaviour(emqx_config_handler).
-
 -include("include/emqx_gateway.hrl").
 
 %% Gateway APIs

+ 10 - 10
apps/emqx_gateway/src/emqx_gateway_api.erl

@@ -497,11 +497,11 @@ examples_gateway_confs() ->
              , auto_observe => false
              , update_msg_publish_condition => <<"always">>
              , translators =>
-                #{ command => #{topic => <<"/dn/#">>}
-                 , response => #{topic => <<"/up/resp">>}
-                 , notify => #{topic => <<"/up/notify">>}
-                 , register => #{topic => <<"/up/resp">>}
-                 , update => #{topic => <<"/up/resp">>}
+                #{ command => #{topic => <<"dn/#">>}
+                 , response => #{topic => <<"up/resp">>}
+                 , notify => #{topic => <<"up/notify">>}
+                 , register => #{topic => <<"up/resp">>}
+                 , update => #{topic => <<"up/resp">>}
                  }
              , listeners =>
                 [ #{ type => <<"udp">>
@@ -599,11 +599,11 @@ examples_update_gateway_confs() ->
              , auto_observe => false
              , update_msg_publish_condition => <<"always">>
              , translators =>
-                #{ command => #{topic => <<"/dn/#">>}
-                 , response => #{topic => <<"/up/resp">>}
-                 , notify => #{topic => <<"/up/notify">>}
-                 , register => #{topic => <<"/up/resp">>}
-                 , update => #{topic => <<"/up/resp">>}
+                #{ command => #{topic => <<"dn/#">>}
+                 , response => #{topic => <<"up/resp">>}
+                 , notify => #{topic => <<"up/notify">>}
+                 , register => #{topic => <<"up/resp">>}
+                 , update => #{topic => <<"up/resp">>}
                  }
              }
          }

+ 2 - 2
apps/emqx_gateway/src/emqx_gateway_api_authn.erl

@@ -194,8 +194,8 @@ schema("/gateway/:name/authentication/users") ->
           , responses =>
               ?STANDARD_RESP(
                  #{ 200 => emqx_dashboard_swagger:schema_with_example(
-                             ref(emqx_authn_api, response_user),
-                             emqx_authn_api:response_user_examples())
+                             ref(emqx_authn_api, response_users),
+                             emqx_authn_api:response_users_example())
                   })
           },
        post =>

+ 30 - 17
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -87,8 +87,7 @@ paths() ->
     , {<<"lte_lifetime">>, timestamp}
     ]).
 
--define(query_fun, {?MODULE, query}).
--define(format_fun, {?MODULE, format_channel_info}).
+-define(QUERY_FUN, {?MODULE, query}).
 
 clients(get, #{ bindings := #{name := Name0}
               , query_string := Params
@@ -99,14 +98,14 @@ clients(get, #{ bindings := #{name := Name0}
             undefined ->
                 Response = emqx_mgmt_api:cluster_query(
                              Params, TabName,
-                             ?CLIENT_QS_SCHEMA, ?query_fun),
+                             ?CLIENT_QS_SCHEMA, ?QUERY_FUN),
                 emqx_mgmt_util:generate_response(Response);
             Node1 ->
                 Node = binary_to_atom(Node1, utf8),
                 ParamsWithoutNode = maps:without([<<"node">>], Params),
                 Response = emqx_mgmt_api:node_query(
                              Node, ParamsWithoutNode,
-                             TabName, ?CLIENT_QS_SCHEMA, ?query_fun),
+                             TabName, ?CLIENT_QS_SCHEMA, ?QUERY_FUN),
                 emqx_mgmt_util:generate_response(Response)
         end
     end).
@@ -148,6 +147,10 @@ subscriptions(get, #{ bindings := #{name := Name0,
     ClientId = emqx_mgmt_util:urldecode(ClientId0),
     with_gateway(Name0, fun(GwName, _) ->
         case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
+            {error, nosupport} ->
+                return_http_error(405, <<"Not support to list subscriptions">>);
+            {error, noimpl} ->
+                return_http_error(501, <<"Not implemented now">>);
             {error, Reason} ->
                 return_http_error(500, Reason);
             {ok, Subs} ->
@@ -165,13 +168,21 @@ subscriptions(post, #{ bindings := #{name := Name0,
         case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
             {undefined, _} ->
                 return_http_error(400, "Miss topic property");
-            {Topic, QoS} ->
+            {Topic, SubOpts} ->
                 case emqx_gateway_http:client_subscribe(
-                       GwName, ClientId, Topic, QoS) of
+                       GwName, ClientId, Topic, SubOpts) of
+                    {error, nosupport} ->
+                        return_http_error(
+                          405,
+                          <<"Not support to add a subscription">>);
+                    {error, noimpl} ->
+                        return_http_error(
+                          501,
+                          <<"Not implemented now">>);
                     {error, Reason} ->
                         return_http_error(404, Reason);
-                    ok ->
-                        {204}
+                    {ok, {NTopic, NSubOpts}}->
+                        {201, maps:merge(NSubOpts, #{topic => NTopic})}
                 end
         end
     end);
@@ -193,12 +204,16 @@ subscriptions(delete, #{ bindings := #{name := Name0,
 %% Utils
 
 subopts(Req) ->
-    #{ qos => maps:get(<<"qos">>, Req, 0)
-     , rap => maps:get(<<"rap">>, Req, 0)
-     , nl => maps:get(<<"nl">>, Req, 0)
-     , rh => maps:get(<<"rh">>, Req, 0)
-     , sub_props => extra_sub_props(maps:get(<<"sub_props">>, Req, #{}))
-     }.
+    SubOpts = #{ qos => maps:get(<<"qos">>, Req, 0)
+               , rap => maps:get(<<"rap">>, Req, 0)
+               , nl => maps:get(<<"nl">>, Req, 0)
+               , rh => maps:get(<<"rh">>, Req, 1)
+               },
+    SubProps = extra_sub_props(maps:get(<<"sub_props">>, Req, #{})),
+    case maps:size(SubProps) of
+        0 -> SubOpts;
+        _ -> maps:put(sub_props, SubProps, SubOpts)
+    end.
 
 extra_sub_props(Props) ->
     maps:filter(
@@ -444,8 +459,7 @@ schema("/gateway/:name/clients/:clientid/subscriptions") ->
      , post =>
         #{ description => <<"Create a subscription membership">>
          , parameters => params_client_insta()
-         %% FIXME:
-         , requestBody => emqx_dashboard_swagger:schema_with_examples(
+         , 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
                             ref(subscription),
                             examples_subsctiption())
          , responses =>
@@ -878,5 +892,4 @@ example_general_subscription() ->
      , nl => 0
      , rap => 0
      , rh => 0
-     , sub_props => #{}
      }.

+ 3 - 1
apps/emqx_gateway/src/emqx_gateway_conf.erl

@@ -17,6 +17,8 @@
 %% @doc The gateway configuration management module
 -module(emqx_gateway_conf).
 
+-behaviour(emqx_config_handler).
+
 %% Load/Unload
 -export([ load/0
         , unload/0
@@ -270,7 +272,7 @@ ret_gw(GwName, {ok, #{raw_config := GwConf}}) ->
                 lists:map(fun({LName, LConf}) ->
                     do_convert_listener2(GwName, LType, LName, LConf)
                 end, maps:to_list(SubConf)),
-            [NLConfs|Acc]
+            [NLConfs | Acc]
         end, [], maps:to_list(LsConf)),
     {ok, maps:merge(GwConf1, #{<<"listeners">> => NLsConf})};
 ret_gw(_GwName, Err) -> Err.

+ 20 - 15
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -235,7 +235,7 @@ confexp({error, already_exist}) ->
 %%--------------------------------------------------------------------
 
 -spec lookup_client(gateway_name(),
-                    emqx_type:clientid(), {atom(), atom()}) -> list().
+                    emqx_types:clientid(), {atom(), atom()}) -> list().
 lookup_client(GwName, ClientId, FormatFun) ->
     lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
                   || Node <- mria_mnesia:running_nodes()]).
@@ -253,7 +253,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
     rpc_call(Node, lookup_client,
              [Node, GwName, {clientid, ClientId}, FormatFun]).
 
--spec kickout_client(gateway_name(), emqx_type:clientid())
+-spec kickout_client(gateway_name(), emqx_types:clientid())
     -> {error, any()}
      | ok.
 kickout_client(GwName, ClientId) ->
@@ -270,25 +270,28 @@ kickout_client(Node, GwName, ClientId) when Node =:= node() ->
 kickout_client(Node, GwName, ClientId) ->
     rpc_call(Node, kickout_client, [Node, GwName, ClientId]).
 
--spec list_client_subscriptions(gateway_name(), emqx_type:clientid())
+-spec list_client_subscriptions(gateway_name(), emqx_types:clientid())
     -> {error, any()}
      | {ok, list()}.
 list_client_subscriptions(GwName, ClientId) ->
-    %% Get the subscriptions from session-info
     with_channel(GwName, ClientId,
         fun(Pid) ->
-            Subs = emqx_gateway_conn:call(
-                     Pid,
-                     subscriptions, ?DEFAULT_CALL_TIMEOUT),
-            {ok, lists:map(fun({Topic, SubOpts}) ->
-                     SubOpts#{topic => Topic}
-                 end, Subs)}
+            case emqx_gateway_conn:call(
+                   Pid,
+                   subscriptions, ?DEFAULT_CALL_TIMEOUT) of
+                {ok, Subs} ->
+                    {ok, lists:map(fun({Topic, SubOpts}) ->
+                        SubOpts#{topic => Topic}
+                    end, Subs)};
+                {error, Reason} ->
+                    {error, Reason}
+            end
         end).
 
--spec client_subscribe(gateway_name(), emqx_type:clientid(),
-                       emqx_type:topic(), emqx_type:subopts())
+-spec client_subscribe(gateway_name(), emqx_types:clientid(),
+                       emqx_types:topic(), emqx_types:subopts())
     -> {error, any()}
-     | ok.
+     | {ok, {emqx_types:topic(), emqx_types:subopts()}}.
 client_subscribe(GwName, ClientId, Topic, SubOpts) ->
     with_channel(GwName, ClientId,
         fun(Pid) ->
@@ -299,7 +302,7 @@ client_subscribe(GwName, ClientId, Topic, SubOpts) ->
         end).
 
 -spec client_unsubscribe(gateway_name(),
-                         emqx_type:clientid(), emqx_type:topic())
+                         emqx_types:clientid(), emqx_types:topic())
     -> {error, any()}
      | ok.
 client_unsubscribe(GwName, ClientId, Topic) ->
@@ -330,7 +333,9 @@ return_http_error(Code, Msg) ->
 codestr(400) -> 'BAD_REQUEST';
 codestr(401) -> 'NOT_SUPPORTED_NOW';
 codestr(404) -> 'RESOURCE_NOT_FOUND';
-codestr(500) -> 'UNKNOW_ERROR'.
+codestr(405) -> 'METHOD_NOT_ALLOWED';
+codestr(500) -> 'UNKNOW_ERROR';
+codestr(501) -> 'NOT_IMPLEMENTED'.
 
 -spec with_authn(binary(), function()) -> any().
 with_authn(GwName0, Fun) ->

+ 2 - 2
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -449,7 +449,7 @@ it has two purposes:
        sc(ref(clientinfo_override),
           #{ desc => ""
            })}
-    , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME,  authentication_schema()}
+    , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()}
     ].
 
 common_listener_opts() ->
@@ -468,7 +468,7 @@ common_listener_opts() ->
        sc(integer(),
           #{ default => 1000
            })}
-    , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME,  authentication_schema()}
+    , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()}
     , {mountpoint,
        sc(binary(),
           #{ default => undefined

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -321,7 +321,7 @@ default_udp_options() ->
     [binary].
 
 default_subopts() ->
-    #{rh  => 0, %% Retain Handling
+    #{rh  => 1, %% Retain Handling
       rap => 0, %% Retain as Publish
       nl  => 0, %% No Local
       qos => 0, %% QoS

+ 22 - 11
apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl

@@ -334,13 +334,14 @@ handle_call({subscribe_from_client, TopicFilter, Qos}, _From,
         deny ->
             {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
         _ ->
-            {ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
+            {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
             {reply, ok, NChannel}
     end;
 
 handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
-    {ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
-    {reply, ok, NChannel};
+    {ok,
+     [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
+    {reply, {ok, {NTopicFilter, NSubOpts}}, NChannel};
 
 handle_call({unsubscribe_from_client, TopicFilter}, _From,
             Channel = #channel{conn_state = connected}) ->
@@ -351,6 +352,9 @@ handle_call({unsubscribe, Topic}, _From, Channel) ->
     {ok, NChannel} = do_unsubscribe([Topic], Channel),
     {reply, ok, NChannel};
 
+handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
+    {reply, {ok, maps:to_list(Subs)}, Channel};
+
 handle_call({publish, Topic, Qos, Payload}, _From,
             Channel = #channel{
                          ctx = Ctx,
@@ -369,7 +373,10 @@ handle_call({publish, Topic, Qos, Payload}, _From,
     end;
 
 handle_call(kick, _From, Channel) ->
-    {shutdown, kicked, ok, Channel};
+    {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)};
+
+handle_call(discard, _From, Channel) ->
+    {shutdown, discarded, ok, Channel};
 
 handle_call(Req, _From, Channel) ->
     ?SLOG(warning, #{ msg => "unexpected_call"
@@ -431,11 +438,12 @@ terminate(Reason, Channel) ->
 %%--------------------------------------------------------------------
 
 do_subscribe(TopicFilters, Channel) ->
-    NChannel = lists:foldl(
-        fun({TopicFilter, SubOpts}, ChannelAcc) ->
-            do_subscribe(TopicFilter, SubOpts, ChannelAcc)
-        end, Channel, parse_topic_filters(TopicFilters)),
-    {ok, NChannel}.
+    {MadeSubs, NChannel} = lists:foldl(
+        fun({TopicFilter, SubOpts}, {MadeSubs, ChannelAcc}) ->
+            {Sub, Channel1} = do_subscribe(TopicFilter, SubOpts, ChannelAcc),
+            {MadeSubs ++ [Sub], Channel1}
+        end, {[], Channel}, parse_topic_filters(TopicFilters)),
+    {ok, MadeSubs, NChannel}.
 
 %% @private
 do_subscribe(TopicFilter, SubOpts, Channel =
@@ -445,17 +453,20 @@ do_subscribe(TopicFilter, SubOpts, Channel =
     NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
     NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
     SubId = maps:get(clientid, ClientInfo, undefined),
+    %% XXX: is_new?
     IsNew = not maps:is_key(NTopicFilter, Subs),
     case IsNew of
         true ->
             ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
             ok = emqx_hooks:run('session.subscribed',
                                 [ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]),
-            Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}};
+            {{NTopicFilter, NSubOpts},
+             Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}};
         _ ->
             %% Update subopts
             ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
-            Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}
+            {{NTopicFilter, NSubOpts},
+             Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}}
     end.
 
 do_unsubscribe(TopicFilters, Channel) ->

+ 147 - 25
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl

@@ -51,11 +51,26 @@
           clientinfo   :: emqx_types:clientinfo(),
           %% Session
           session      :: emqx_lwm2m_session:session() | undefined,
+          %% Channl State
+          %% TODO: is there need
+          conn_state   :: conn_state(),
           %% Timer
           timers       :: #{atom() => disable | undefined | reference()},
+          %% FIXME: don't store anonymouse func
           with_context :: function()
          }).
 
+-type channel() :: #channel{}.
+
+-type conn_state() :: idle | connecting | connected | disconnected.
+
+-type reply() :: {outgoing, coap_message()}
+               | {outgoing, [coap_message()]}
+               | {event, conn_state()|updated}
+               | {close, Reason :: atom()}.
+
+-type replies() :: reply() | [reply()].
+
 %% TODO:
 -define(DEFAULT_OVERRIDE,
         #{ clientid => <<"">>  %% Generate clientid by default
@@ -79,8 +94,8 @@ info(Keys, Channel) when is_list(Keys) ->
 
 info(conninfo, #channel{conninfo = ConnInfo}) ->
     ConnInfo;
-info(conn_state, _) ->
-    connected;
+info(conn_state, #channel{conn_state = ConnState}) ->
+    ConnState;
 info(clientinfo, #channel{clientinfo = ClientInfo}) ->
     ClientInfo;
 info(session, #channel{session = Session}) ->
@@ -125,15 +140,10 @@ init(ConnInfoT = #{peername := {PeerHost, _},
             , clientinfo = ClientInfo
             , timers = #{}
             , session = emqx_lwm2m_session:new()
-              %% FIXME: don't store anonymouse func
+            , conn_state = idle
             , with_context = with_context(Ctx, ClientInfo)
             }.
 
-with_context(Ctx, ClientInfo) ->
-    fun(Type, Topic) ->
-            with_context(Type, Topic, Ctx, ClientInfo)
-    end.
-
 lookup_cmd(Channel, Path, Action) ->
     gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}).
 
@@ -143,9 +153,15 @@ send_cmd(Channel, Cmd) ->
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 %%--------------------------------------------------------------------
-handle_in(Msg, ChannleT) ->
-    Channel = update_life_timer(ChannleT),
-    call_session(handle_coap_in, Msg, Channel).
+
+-spec handle_in(coap_message() | {frame_error, any()}, channel())
+      -> {ok, channel()}
+       | {ok, replies(), channel()}
+       | {shutdown, Reason :: term(), channel()}
+       | {shutdown, Reason :: term(), replies(), channel()}.
+handle_in(Msg, Channle) ->
+    NChannel = update_life_timer(Channle),
+    call_session(handle_coap_in, Msg, NChannel).
 
 %%--------------------------------------------------------------------
 %% Handle Delivers from broker to client
@@ -174,7 +190,9 @@ handle_timeout(_, _, Channel) ->
 %%--------------------------------------------------------------------
 %% Handle call
 %%--------------------------------------------------------------------
-handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) ->
+
+handle_call({lookup_cmd, Path, Type}, _From,
+            Channel = #channel{session = Session}) ->
     Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session),
     {reply, {ok, Result}, Channel};
 
@@ -182,6 +200,66 @@ handle_call({send_cmd, Cmd}, _From, Channel) ->
     {ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel),
     {reply, ok, Outs, Channel2};
 
+handle_call({subscribe, Topic, SubOpts}, _From,
+            Channel = #channel{
+                         ctx = Ctx,
+                         clientinfo = ClientInfo
+                                    = #{clientid := ClientId,
+                                        mountpoint := Mountpoint},
+                         session = Session}) ->
+    NSubOpts = maps:merge(
+                 emqx_gateway_utils:default_subopts(),
+                 SubOpts),
+    MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
+    _ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts),
+
+    _ = run_hooks(Ctx, 'session.subscribed',
+                  [ClientInfo, MountedTopic, NSubOpts]),
+    %% modifty session state
+    Subs = emqx_lwm2m_session:info(subscriptions, Session),
+    NSubs = maps:put(MountedTopic, NSubOpts, Subs),
+    NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session),
+    {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}};
+
+handle_call({unsubscribe, Topic}, _From,
+            Channel = #channel{
+                         ctx = Ctx,
+                         clientinfo = ClientInfo
+                                    = #{mountpoint := Mountpoint},
+                         session = Session}) ->
+    MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
+    ok = emqx_broker:unsubscribe(MountedTopic),
+    _ = run_hooks(Ctx, 'session.unsubscribe',
+                  [ClientInfo, MountedTopic, #{}]),
+    %% modifty session state
+    Subs = emqx_lwm2m_session:info(subscriptions, Session),
+    NSubs = maps:remove(MountedTopic, Subs),
+    NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session),
+    {reply, ok, Channel#channel{session = NSession}};
+
+handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
+    Subs = maps:to_list(emqx_lwm2m_session:info(subscriptions, Session)),
+    {reply, {ok, Subs}, Channel};
+
+handle_call(kick, _From, Channel) ->
+    NChannel = ensure_disconnected(kicked, Channel),
+    shutdown_and_reply(kicked, ok, NChannel);
+
+handle_call(discard, _From, Channel) ->
+    shutdown_and_reply(discarded, ok, Channel);
+
+%% TODO: No Session Takeover
+%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) ->
+%    reply(Session, Channel#channel{takeover = true});
+%
+%handle_call({takeover, 'end'}, _From, Channel = #channel{session  = Session,
+%                                                  pendings = Pendings}) ->
+%    ok = emqx_session:takeover(Session),
+%    %% TODO: Should not drain deliver here (side effect)
+%    Delivers = emqx_misc:drain_deliver(),
+%    AllPendings = lists:append(Delivers, Pendings),
+%    shutdown_and_reply(takenover, AllPendings, Channel);
+
 handle_call(Req, _From, Channel) ->
     ?SLOG(error, #{ msg => "unexpected_call"
                   , call => Req
@@ -223,6 +301,41 @@ terminate(Reason, #channel{ctx = Ctx,
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Ensure connected
+
+ensure_connected(Channel = #channel{
+                              ctx = Ctx,
+                              conninfo = ConnInfo,
+                              clientinfo = ClientInfo}) ->
+    _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]),
+
+    NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
+    ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
+    Channel#channel{
+      conninfo = NConnInfo,
+      conn_state = connected
+     }.
+
+%%--------------------------------------------------------------------
+%% Ensure disconnected
+
+ensure_disconnected(Reason, Channel = #channel{
+                                         ctx = Ctx,
+                                         conninfo = ConnInfo,
+                                         clientinfo = ClientInfo}) ->
+    NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
+    ok = run_hooks(Ctx, 'client.disconnected',
+                   [ClientInfo, Reason, NConnInfo]),
+    Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
+
+shutdown_and_reply(Reason, Reply, Channel) ->
+    {shutdown, Reason, Reply, Channel}.
+
+%shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
+%    {shutdown, Reason, Reply, OutPkt, Channel}.
+
 set_peercert_infos(NoSSL, ClientInfo)
   when NoSSL =:= nossl;
        NoSSL =:= undefined ->
@@ -319,6 +432,7 @@ enrich_clientinfo(#coap_message{options = Options} = Msg,
     Query = maps:get(uri_query, Options, #{}),
     case Query of
         #{<<"ep">> := Epn, <<"lt">> := Lifetime} ->
+            %% FIXME: the following keys is not belong standrad protocol
             Username = maps:get(<<"imei">>, Query, Epn),
             Password = maps:get(<<"password">>, Query, undefined),
             ClientId = maps:get(<<"device_id">>, Query, Epn),
@@ -363,13 +477,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
     Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
     {ok, ClientInfo#{mountpoint := Mountpoint1}}.
 
-ensure_connected(Channel = #channel{ctx = Ctx,
-                                    conninfo = ConnInfo,
-                                    clientinfo = ClientInfo}) ->
-    _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]),
-    ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]),
-    Channel.
-
 process_connect(Channel = #channel{ctx = Ctx,
                                    session = Session,
                                    conninfo = ConnInfo,
@@ -418,29 +525,44 @@ gets([H | T], Map) ->
 gets([], Val) ->
     Val.
 
+%%--------------------------------------------------------------------
+%% With Context
+
+with_context(Ctx, ClientInfo) ->
+    fun(Type, Topic) ->
+        with_context(Type, Topic, Ctx, ClientInfo)
+    end.
+
 with_context(publish, [Topic, Msg], Ctx, ClientInfo) ->
     case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
         allow ->
-            emqx:publish(Msg);
+            _ = emqx_broker:publish(Msg),
+            ok;
         _ ->
             ?SLOG(error, #{ msg => "publish_denied"
                           , topic => Topic
-                          })
+                          }),
+            {error, deny}
     end;
 
-with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) ->
+with_context(subscribe, [Topic, Opts], Ctx, ClientInfo) ->
+    #{clientid := ClientId,
+      endpoint_name := EndpointName} = ClientInfo,
     case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
         allow ->
             run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]),
             ?SLOG(debug, #{ msg => "subscribe_topic_succeed"
                           , topic => Topic
-                          , endpoint_name => Username
+                          , clientid => ClientId
+                          , endpoint_name => EndpointName
                           }),
-            emqx:subscribe(Topic, Username, Opts);
+            emqx_broker:subscribe(Topic, ClientId, Opts),
+            ok;
         _ ->
             ?SLOG(error, #{ msg => "subscribe_denied"
                           , topic => Topic
-                          })
+                          }),
+            {error, deny}
     end;
 
 with_context(metrics, Name, Ctx, _ClientInfo) ->

+ 83 - 62
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl

@@ -25,9 +25,11 @@
 -export([ new/0, init/4, update/3, parse_object_list/1
         , reregister/3, on_close/1, find_cmd_record/3]).
 
+%% Info & Stats
 -export([ info/1
         , info/2
         , stats/1
+        , stats/2
         ]).
 
 -export([ handle_coap_in/3
@@ -37,12 +39,16 @@
         , send_cmd/3
         , set_reply/2]).
 
+%% froce update subscriptions
+-export([ set_subscriptions/2
+        ]).
+
 -export_type([session/0]).
 
 -type request_context() :: map().
 
 -type timestamp() :: non_neg_integer().
--type queued_request() :: {timestamp(), request_context(), emqx_coap_message()}.
+-type queued_request() :: {timestamp(), request_context(), coap_message()}.
 
 -type cmd_path() :: binary().
 -type cmd_type() :: binary().
@@ -66,6 +72,7 @@
                  , last_active_at :: non_neg_integer()
                  , created_at :: non_neg_integer()
                  , cmd_record :: cmd_record()
+                 , subscriptions :: map()
                  }).
 
 -type session() :: #session{}.
@@ -83,7 +90,9 @@
 -define(lwm2m_up_dm_topic,  {<<"/v1/up/dm">>, 0}).
 
 %% steal from emqx_session
--define(INFO_KEYS, [subscriptions,
+-define(INFO_KEYS, [id,
+                    is_persistent,
+                    subscriptions,
                     upgrade_qos,
                     retry_interval,
                     await_rel_timeout,
@@ -99,7 +108,8 @@
                      mqueue_dropped,
                      next_pkt_id,
                      awaiting_rel_cnt,
-                     awaiting_rel_max
+                     awaiting_rel_max,
+                     latency_stats
                     ]).
 
 -define(OUT_LIST_KEY, out_list).
@@ -118,9 +128,11 @@ new() ->
             , is_cache_mode = false
             , mountpoint = <<>>
             , cmd_record = #{queue => queue:new()}
-            , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
+            , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])
+            , subscriptions = #{}
+            }.
 
--spec init(emqx_coap_message(), binary(), function(), session()) -> map().
+-spec init(coap_message(), binary(), function(), session()) -> map().
 init(#coap_message{options = Opts,
                    payload = Payload} = Msg, MountPoint, WithContext, Session) ->
     Query = maps:get(uri_query, Opts),
@@ -152,7 +164,7 @@ update(Msg, WithContext, Session) ->
 on_close(Session) ->
     #{topic := Topic} = downlink_topic(),
     MountedTopic = mount(Topic, Session),
-    emqx:unsubscribe(MountedTopic),
+    emqx_broker:unsubscribe(MountedTopic),
     MountedTopic.
 
 -spec find_cmd_record(cmd_path(), cmd_type(), session()) -> cmd_result().
@@ -169,55 +181,56 @@ info(Session) ->
 info(Keys, Session) when is_list(Keys) ->
     [{Key, info(Key, Session)} || Key <- Keys];
 
+info(id, _) ->
+    undefined;
+info(is_persistent, _) ->
+    false;
+info(subscriptions, #session{subscriptions = Subs}) ->
+    Subs;
+info(upgrade_qos, _) ->
+    false;
+info(retry_interval, _) ->
+    0;
+info(await_rel_timeout, _) ->
+    infinity;
+info(created_at, #session{created_at = CreatedAt}) ->
+    CreatedAt;
+%% used for channel
 info(location_path, #session{location_path = Path}) ->
     Path;
-
 info(lifetime, #session{lifetime = LT}) ->
     LT;
-
 info(reg_info, #session{reg_info = RI}) ->
-    RI;
+    RI.
 
-info(subscriptions, _) ->
-    [];
-info(subscriptions_cnt, _) ->
-    0;
-info(subscriptions_max, _) ->
+-spec(stats(session()) -> emqx_types:stats()).
+stats(Session) -> stats(?STATS_KEYS, Session).
+
+stats(Keys, Session) when is_list(Keys) ->
+    [{Key, stats(Key, Session)} || Key <- Keys];
+
+stats(subscriptions_cnt, #session{subscriptions = Subs}) ->
+    maps:size(Subs);
+stats(subscriptions_max, _) ->
     infinity;
-info(upgrade_qos, _) ->
-    ?QOS_0;
-info(inflight, _) ->
-    emqx_inflight:new();
-info(inflight_cnt, _) ->
+stats(inflight_cnt, _) ->
     0;
-info(inflight_max, _) ->
+stats(inflight_max, _) ->
     0;
-info(retry_interval, _) ->
-    infinity;
-info(mqueue, _) ->
-    emqx_mqueue:init(#{max_len => 0, store_qos0 => false});
-info(mqueue_len, #session{queue = Queue}) ->
-    queue:len(Queue);
-info(mqueue_max, _) ->
+stats(mqueue_len, _) ->
     0;
-info(mqueue_dropped, _) ->
+stats(mqueue_max, _) ->
     0;
-info(next_pkt_id, _) ->
+stats(mqueue_dropped, _) ->
     0;
-info(awaiting_rel, _) ->
-    #{};
-info(awaiting_rel_cnt, _) ->
+stats(next_pkt_id, _) ->
     0;
-info(awaiting_rel_max, _) ->
-    infinity;
-info(await_rel_timeout, _) ->
+stats(awaiting_rel_cnt, _) ->
+    0;
+stats(awaiting_rel_max, _) ->
     infinity;
-info(created_at, #session{created_at = CreatedAt}) ->
-    CreatedAt.
-
-%% @doc Get stats of the session.
--spec(stats(session()) -> emqx_types:stats()).
-stats(Session) -> info(?STATS_KEYS, Session).
+stats(latency_stats, _) ->
+    #{}.
 
 %%--------------------------------------------------------------------
 %% API
@@ -242,6 +255,9 @@ set_reply(Msg, #session{coap = Coap} = Session) ->
 send_cmd(Cmd, _, Session) ->
     return(send_cmd_impl(Cmd, Session)).
 
+set_subscriptions(Subs, Session) ->
+    Session#session{subscriptions = Subs}.
+
 %%--------------------------------------------------------------------
 %% Protocol Stack
 %%--------------------------------------------------------------------
@@ -347,7 +363,7 @@ get_lifetime(#{<<"lt">> := _} = NewRegInfo, _) ->
 get_lifetime(_, OldRegInfo) ->
     get_lifetime(OldRegInfo).
 
--spec update(emqx_coap_message(), function(), binary(), session()) -> map().
+-spec update(coap_message(), function(), binary(), session()) -> map().
 update(#coap_message{options = Opts, payload = Payload} = Msg,
        WithContext,
        CmdType,
@@ -377,7 +393,10 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) ->
     %% - subscribe to the downlink_topic and wait for commands
     #{topic := Topic, qos := Qos} = downlink_topic(),
     MountedTopic = mount(Topic, Session),
-    Session3 = subscribe(MountedTopic, Qos, WithContext, Session2),
+    SubOpts = maps:merge(
+                emqx_gateway_utils:default_subopts(),
+                #{qos => Qos}),
+    Session3 = do_subscribe(MountedTopic, SubOpts, WithContext, Session2),
     Session4 = send_dl_msg(Session3),
 
     %% - report the registration info
@@ -387,22 +406,33 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) ->
 %%--------------------------------------------------------------------
 %% Subscribe
 %%--------------------------------------------------------------------
+
 proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) ->
     #{topic := Topic, qos := Qos} = downlink_topic(),
     MountedTopic = mount(Topic, Session),
-    Session2 = case WaitAck of
+    SubOpts = maps:merge(
+                emqx_gateway_utils:default_subopts(),
+                #{qos => Qos}),
+    NSession = case WaitAck of
                    undefined ->
                        Session;
                    Ctx ->
-                       MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>),
-                       send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, WithContext, Session)
+                       MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(
+                                       Ctx, <<"coap_timeout">>),
+                       send_to_mqtt(Ctx, <<"coap_timeout">>,
+                                    MqttPayload, WithContext, Session)
                end,
-    subscribe(MountedTopic, Qos, WithContext, Session2).
-
-subscribe(Topic, Qos, WithContext, Session) ->
-    Opts = get_sub_opts(Qos),
-    WithContext(subscribe, [Topic, Opts]),
-    Session.
+    do_subscribe(MountedTopic, SubOpts, WithContext, NSession).
+
+do_subscribe(Topic, SubOpts, WithContext,
+             Session = #session{subscriptions = Subs}) ->
+    case WithContext(subscribe, [Topic, SubOpts]) of
+        {error, _} ->
+            Session;
+        ok ->
+            NSubs = maps:put(Topic, SubOpts, Subs),
+            Session#session{subscriptions = NSubs}
+    end.
 
 send_auto_observe(RegInfo, Session) ->
     %% - auto observe the objects
@@ -449,15 +479,6 @@ deliver_auto_observe_to_coap(AlternatePath, TermData, Session) ->
     {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
     maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
 
-get_sub_opts(Qos) ->
-    #{
-      qos => Qos,
-      rap => 0,
-      nl => 0,
-      rh => 0,
-      is_new => false
-     }.
-
 is_auto_observe() ->
     emqx:get_config([gateway, lwm2m, auto_observe]).
 
@@ -609,7 +630,7 @@ proto_publish(Topic, Payload, Qos, Headers, WithContext,
     %% TODO: Append message metadata into headers
     Msg = emqx_message:make(Epn, Qos, MountedTopic,
                             emqx_json:encode(Payload), #{}, Headers),
-    WithContext(publish, [MountedTopic, Msg]),
+    _ = WithContext(publish, [MountedTopic, Msg]),
     Session.
 
 mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->

+ 3 - 3
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -1194,8 +1194,8 @@ handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
                 2 ->
                     case do_subscribe({?SN_INVALID_TOPIC_ID,
                                        Topic, SubOpts}, Channel) of
-                        {ok, _, NChannel} ->
-                            reply(ok, NChannel);
+                        {ok, {_, NTopicName, NSubOpts}, NChannel} ->
+                            reply({ok, {NTopicName, NSubOpts}}, NChannel);
                         {error, ?SN_EXCEED_LIMITATION} ->
                             reply({error, exceed_limitation}, Channel)
                     end;
@@ -1214,7 +1214,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) ->
     reply(ok, NChannel);
 
 handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
-    reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel);
+    reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel);
 
 handle_call(kick, _From, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),

+ 14 - 14
apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl

@@ -61,7 +61,7 @@
           session       :: undefined | map(),
           %% ClientInfo override specs
           clientinfo_override :: map(),
-          %% Connection Channel
+          %% Channel State
           conn_state    :: conn_state(),
           %% Heartbeat
           heartbeat     :: emqx_stomp_heartbeat:heartbeat(),
@@ -73,16 +73,16 @@
           transaction :: #{binary() => list()}
          }).
 
--type(channel() :: #channel{}).
+-type channel() :: #channel{}.
 
--type(conn_state() :: idle | connecting | connected | disconnected).
+-type conn_state() :: idle | connecting | connected | disconnected.
 
--type(reply() :: {outgoing, stomp_frame()}
+-type reply() :: {outgoing, stomp_frame()}
                | {outgoing, [stomp_frame()]}
                | {event, conn_state()|updated}
-               | {close, Reason :: atom()}).
+               | {close, Reason :: atom()}.
 
--type(replies() :: reply() | [reply()]).
+-type replies() :: reply() | [reply()].
 
 -define(TIMER_TABLE, #{
           incoming_timer => keepalive,
@@ -155,7 +155,7 @@ setting_peercert_infos(Peercert, ClientInfo) ->
 info(Channel) ->
     maps:from_list(info(?INFO_KEYS, Channel)).
 
--spec(info(list(atom())|atom(), channel()) -> term()).
+-spec info(list(atom())|atom(), channel()) -> term().
 info(Keys, Channel) when is_list(Keys) ->
     [{Key, info(Key, Channel)} || Key <- Keys];
 
@@ -174,7 +174,7 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
 info(ctx, #channel{ctx = Ctx}) ->
     Ctx.
 
--spec(stats(channel()) -> emqx_types:stats()).
+-spec stats(channel()) -> emqx_types:stats().
 stats(#channel{subscriptions = Subs}) ->
     [{subscriptions_cnt, length(Subs)}].
 
@@ -294,9 +294,9 @@ ensure_connected(Channel = #channel{
                               clientinfo = ClientInfo}) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{conninfo = NConnInfo,
-                 conn_state = connected
-                }.
+    Channel#channel{
+      conninfo = NConnInfo,
+      conn_state = connected}.
 
 process_connect(Channel = #channel{
                            ctx = Ctx,
@@ -660,7 +660,7 @@ handle_call({subscribe, Topic, SubOpts}, _From,
                                                   ),
                     NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts}|Subs],
                     NChannel1 = NChannel#channel{subscriptions = NSubs},
-                    reply(ok, NChannel1);
+                    reply({ok, {MountedTopic, NSubOpts}}, NChannel1);
                 {error, ErrMsg, NChannel} ->
                     ?SLOG(error, #{ msg => "failed_to_subscribe_topic"
                                   , topic => Topic
@@ -688,11 +688,11 @@ handle_call({unsubscribe, Topic}, _From,
 
 %% Reply :: [{emqx_types:topic(), emqx_types:subopts()}]
 handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
-    Reply = lists:map(
+    NSubs = lists:map(
               fun({_SubId, Topic, _Ack, SubOpts}) ->
                 {Topic, SubOpts}
               end, Subs),
-    reply(Reply, Channel);
+    reply({ok, NSubs}, Channel);
 
 handle_call(kick, _From, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),

+ 1 - 1
apps/emqx_gateway/test/emqx_stomp_SUITE.erl

@@ -395,7 +395,7 @@ t_rest_clienit_info(_) ->
         ?assertEqual(1, length(Subs)),
         assert_feilds_apperence([topic, qos], lists:nth(1, Subs)),
 
-        {204, _} = request(post, ClientPath ++ "/subscriptions",
+        {201, _} = request(post, ClientPath ++ "/subscriptions",
                            #{topic => <<"t/a">>, qos => 1,
                              sub_props => #{subid => <<"1001">>}}),