Sfoglia il codice sorgente

Merge pull request #5660 from lafirest/fix/emqx_lwm2m

fix(emqx_lwm2m): fix some error and incomplete function
lafirest 4 anni fa
parent
commit
35cc0d972d

+ 9 - 6
apps/emqx_gateway/etc/emqx_gateway.conf

@@ -61,6 +61,9 @@ gateway.coap {
 
 
   heartbeat = 30s
   heartbeat = 30s
   notify_type = qos
   notify_type = qos
+
+  ## if true, you need to establish a connection before use
+  connection_required = false
   subscribe_qos = qos0
   subscribe_qos = qos0
   publish_qos = qos1
   publish_qos = qos1
 
 
@@ -128,7 +131,7 @@ gateway.lwm2m {
   enable_stats = true
   enable_stats = true
 
 
   ## When publishing or subscribing, prefix all topics with a mountpoint string.
   ## When publishing or subscribing, prefix all topics with a mountpoint string.
-  mountpoint = "lwm2m"
+  mountpoint = "lwm2m/%u"
 
 
   xml_dir = "{{ platform_etc_dir }}/lwm2m_xml"
   xml_dir = "{{ platform_etc_dir }}/lwm2m_xml"
 
 
@@ -143,27 +146,27 @@ gateway.lwm2m {
 
 
   translators {
   translators {
     command  {
     command  {
-      topic = "dn/#"
+      topic = "/dn/#"
       qos = 0
       qos = 0
     }
     }
 
 
     response {
     response {
-      topic = "up/resp"
+      topic = "/up/resp"
       qos = 0
       qos = 0
     }
     }
 
 
     notify {
     notify {
-      topic = "up/notify"
+      topic = "/up/notify"
       qos = 0
       qos = 0
     }
     }
 
 
     register {
     register {
-      topic = "up/resp"
+      topic = "/up/resp"
       qos = 0
       qos = 0
     }
     }
 
 
     update {
     update {
-      topic = "up/resp"
+      topic = "/up/resp"
       qos = 0
       qos = 0
     }
     }
   }
   }

+ 13 - 4
apps/emqx_gateway/src/coap/README.md

@@ -414,21 +414,30 @@ Server will return token **X** in payload
 
 
 2. Update Connection
 2. Update Connection
 ```
 ```
-coap-client -m put -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&username=admin&password=public&token=X"
+coap-client -m put -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&token=X"
 ```
 ```
 
 
 3. Publish
 3. Publish
 ```
 ```
-coap-client -m post -e "Hellow" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public"
+coap-client -m post -e "Hellow" "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public"
 ```
 ```
 if you want to publish with auth, you must first establish a connection, and then post publish request on the same socket, so libcoap client can't simulation publish with a token
 if you want to publish with auth, you must first establish a connection, and then post publish request on the same socket, so libcoap client can't simulation publish with a token
 
 
+```
+coap-client -m post -e "Hellow" "coap://127.0.0.1/ps/coap/test?clientid=123&token=X"
+```
+
 4. Subscribe
 4. Subscribe
 ```
 ```
 coap-client -m get -s 60 -O 6,0x00 -o - -T "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public"
 coap-client -m get -s 60 -O 6,0x00 -o - -T "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public"
 ```
 ```
+**Or**
 
 
+```
+coap-client -m get -s 60 -O 6,0x00 -o - -T "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&token=X"
+```
 5. Close Connection
 5. Close Connection
 ```
 ```
-coap-client -m delete -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&username=admin&password=public&token=X"
-```
+coap-client -m delete -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&token=X
+```
+

+ 10 - 6
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -55,6 +55,8 @@
                   %% Timer
                   %% Timer
                   timers :: #{atom() => disable | undefined | reference()},
                   timers :: #{atom() => disable | undefined | reference()},
 
 
+                  connection_required :: boolean(),
+
                   conn_state :: idle | connected,
                   conn_state :: idle | connected,
 
 
                   token :: binary() | undefined
                   token :: binary() | undefined
@@ -63,6 +65,8 @@
 -type channel() :: #channel{}.
 -type channel() :: #channel{}.
 -define(TOKEN_MAXIMUM, 4294967295).
 -define(TOKEN_MAXIMUM, 4294967295).
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
+-define(DEF_IDLE_TIME, timer:seconds(30)).
+-define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)).
 
 
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -110,13 +114,14 @@ init(ConnInfo = #{peername := {PeerHost, _},
                     }
                     }
                   ),
                   ),
 
 
-    Heartbeat = emqx:get_config([gateway, coap, idle_timeout]),
+    Heartbeat = ?GET_IDLE_TIME(Config),
     #channel{ ctx = Ctx
     #channel{ ctx = Ctx
             , conninfo = ConnInfo
             , conninfo = ConnInfo
             , clientinfo = ClientInfo
             , clientinfo = ClientInfo
             , timers = #{}
             , timers = #{}
             , session = emqx_coap_session:new()
             , session = emqx_coap_session:new()
             , keepalive = emqx_keepalive:init(Heartbeat)
             , keepalive = emqx_keepalive:init(Heartbeat)
+            , connection_required = maps:get(connection_required, Config, false)
             , conn_state = idle
             , conn_state = idle
             }.
             }.
 
 
@@ -216,13 +221,12 @@ make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) ->
 ensure_keepalive_timer(Channel) ->
 ensure_keepalive_timer(Channel) ->
     ensure_keepalive_timer(fun ensure_timer/4, Channel).
     ensure_keepalive_timer(fun ensure_timer/4, Channel).
 
 
-ensure_keepalive_timer(Fun, Channel) ->
-    Heartbeat = emqx:get_config([gateway, coap, idle_timeout]),
+ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
+    Heartbeat = emqx_keepalive:info(interval, KeepAlive),
     Fun(keepalive, Heartbeat, keepalive, Channel).
     Fun(keepalive, Heartbeat, keepalive, Channel).
 
 
-check_auth_state(Msg, Channel) ->
-    Enable = emqx:get_config([gateway, coap, enable_stats]),
-    check_token(Enable, Msg, Channel).
+check_auth_state(Msg, #channel{connection_required = Required} = Channel) ->
+    check_token(Required, Msg, Channel).
 
 
 check_token(true,
 check_token(true,
             Msg,
             Msg,

+ 5 - 3
apps/emqx_gateway/src/coap/emqx_coap_medium.erl

@@ -29,13 +29,14 @@
         , reply/2, reply/3, reply/4]).
         , reply/2, reply/3, reply/4]).
 
 
 %%-type result() :: map() | empty.
 %%-type result() :: map() | empty.
--define(DEFINE_DEF(Name), Name(Msg) -> Name(Msg, #{})).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 empty() -> #{}.
 empty() -> #{}.
 
 
-?DEFINE_DEF(reset).
+reset(Msg) ->
+    reset(Msg, #{}).
 
 
 reset(Msg, Result) ->
 reset(Msg, Result) ->
     out(emqx_coap_message:reset(Msg), Result).
     out(emqx_coap_message:reset(Msg), Result).
@@ -49,7 +50,8 @@ out(Msg, #{out := Outs} = Result) ->
 out(Msg, Result) ->
 out(Msg, Result) ->
     Result#{out => [Msg]}.
     Result#{out => [Msg]}.
 
 
-?DEFINE_DEF(proto_out).
+proto_out(Proto) ->
+    proto_out(Proto, #{}).
 
 
 proto_out(Proto, Resut) ->
 proto_out(Proto, Resut) ->
     Resut#{proto => Proto}.
     Resut#{proto => Proto}.

+ 1 - 0
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -83,6 +83,7 @@ fields(mqttsn_predefined) ->
 
 
 fields(coap_structs) ->
 fields(coap_structs) ->
     [ {heartbeat, sc(duration(), undefined, <<"30s">>)}
     [ {heartbeat, sc(duration(), undefined, <<"30s">>)}
+    , {connection_required, sc(boolean(), undefined, false)}
     , {notify_type, sc(union([non, con, qos]), undefined, qos)}
     , {notify_type, sc(union([non, con, qos]), undefined, qos)}
     , {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)}
     , {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)}
     , {publish_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)}
     , {publish_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)}

+ 57 - 35
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl

@@ -24,8 +24,7 @@
 -export([ info/1
 -export([ info/1
         , info/2
         , info/2
         , stats/1
         , stats/1
-        , validator/2
-        , validator/4
+        , with_context/2
         , do_takeover/3]).
         , do_takeover/3]).
 
 
 -export([ init/2
 -export([ init/2
@@ -53,11 +52,10 @@
                   %% Timer
                   %% Timer
                   timers :: #{atom() => disable | undefined | reference()},
                   timers :: #{atom() => disable | undefined | reference()},
 
 
-                  validator :: function()
+                  with_context :: function()
                  }).
                  }).
 
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
-
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -109,16 +107,13 @@ init(ConnInfo = #{peername := {PeerHost, _},
             , clientinfo = ClientInfo
             , clientinfo = ClientInfo
             , timers = #{}
             , timers = #{}
             , session = emqx_lwm2m_session:new()
             , session = emqx_lwm2m_session:new()
-            , validator = validator(Ctx, ClientInfo)
+            , with_context = with_context(Ctx, ClientInfo)
             }.
             }.
 
 
-validator(_Type, _Topic, _Ctx, _ClientInfo) ->
-    allow.
-                                                %emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
 
 
-validator(Ctx, ClientInfo) ->
+with_context(Ctx, ClientInfo) ->
     fun(Type, Topic) ->
     fun(Type, Topic) ->
-            validator(Type, Topic, Ctx, ClientInfo)
+            with_context(Type, Topic, Ctx, ClientInfo)
     end.
     end.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -137,7 +132,10 @@ handle_deliver(Delivers, Channel) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle timeout
 %% Handle timeout
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-handle_timeout(_, lifetime, Channel) ->
+handle_timeout(_, lifetime, #channel{ctx = Ctx,
+                                     clientinfo = ClientInfo,
+                                     conninfo = ConnInfo} = Channel) ->
+    ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, timeout, ConnInfo]),
     {shutdown, timeout, Channel};
     {shutdown, timeout, Channel};
 
 
 handle_timeout(_, {transport, _} = Msg, Channel) ->
 handle_timeout(_, {transport, _} = Msg, Channel) ->
@@ -166,6 +164,10 @@ handle_cast(Req, Channel) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle Info
 %% Handle Info
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
+handle_info({subscribe, _AutoSubs}, Channel) ->
+    %% not need handle this message
+    {ok, Channel};
+
 handle_info(Info, Channel) ->
 handle_info(Info, Channel) ->
     ?LOG(error, "Unexpected info: ~p", [Info]),
     ?LOG(error, "Unexpected info: ~p", [Info]),
     {ok, Channel}.
     {ok, Channel}.
@@ -173,8 +175,12 @@ handle_info(Info, Channel) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Terminate
 %% Terminate
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-terminate(_Reason, #channel{session = Session}) ->
-    emqx_lwm2m_session:on_close(Session).
+terminate(Reason, #channel{ctx = Ctx,
+                           clientinfo = ClientInfo,
+                           session = Session}) ->
+    MountedTopic = emqx_lwm2m_session:on_close(Session),
+    _ = run_hooks(Ctx, 'session.unsubscribe', [ClientInfo, MountedTopic, #{}]),
+    run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
@@ -220,12 +226,12 @@ do_connect(Req, Result, Channel, Iter) ->
            Req,
            Req,
            Channel) of
            Channel) of
         {ok, _Input, #channel{session = Session,
         {ok, _Input, #channel{session = Session,
-                              validator = Validator} = NChannel} ->
+                              with_context = WithContext} = NChannel} ->
             case emqx_lwm2m_session:info(reg_info, Session) of
             case emqx_lwm2m_session:info(reg_info, Session) of
                 undefined ->
                 undefined ->
                     process_connect(ensure_connected(NChannel), Req, Result, Iter);
                     process_connect(ensure_connected(NChannel), Req, Result, Iter);
                 _ ->
                 _ ->
-                    NewResult = emqx_lwm2m_session:reregister(Req, Validator, Session),
+                    NewResult = emqx_lwm2m_session:reregister(Req, WithContext, Session),
                     iter(Iter, maps:merge(Result, NewResult), NChannel)
                     iter(Iter, maps:merge(Result, NewResult), NChannel)
             end;
             end;
         {error, ReasonCode, NChannel} ->
         {error, ReasonCode, NChannel} ->
@@ -251,7 +257,7 @@ check_lwm2m_version(#coap_message{options = Opts},
               end,
               end,
     if IsValid ->
     if IsValid ->
             NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond)
             NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond)
-                                 , proto_name => <<"lwm2m">>
+                                 , proto_name => <<"LwM2M">>
                                  , proto_ver => Ver
                                  , proto_ver => Ver
                                  },
                                  },
             {ok, Channel#channel{conninfo = NConnInfo}};
             {ok, Channel#channel{conninfo = NConnInfo}};
@@ -274,7 +280,7 @@ enrich_clientinfo(#coap_message{options = Options} = Msg,
     Query = maps:get(uri_query, Options, #{}),
     Query = maps:get(uri_query, Options, #{}),
     case Query of
     case Query of
         #{<<"ep">> := Epn} ->
         #{<<"ep">> := Epn} ->
-            UserName = maps:get(<<"imei">>, Query, undefined),
+            UserName = maps:get(<<"imei">>, Query, Epn),
             Password = maps:get(<<"password">>, Query, undefined),
             Password = maps:get(<<"password">>, Query, undefined),
             ClientId = maps:get(<<"device_id">>, Query, Epn),
             ClientId = maps:get(<<"device_id">>, Query, Epn),
             ClientInfo =
             ClientInfo =
@@ -298,7 +304,7 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
     case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of
     case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of
         {ok, NClientInfo} ->
         {ok, NClientInfo} ->
             {ok, Channel#channel{clientinfo = NClientInfo,
             {ok, Channel#channel{clientinfo = NClientInfo,
-                                 validator = validator(Ctx, ClientInfo)}};
+                                 with_context = with_context(Ctx, ClientInfo)}};
         {error, Reason} ->
         {error, Reason} ->
             ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
             ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
                  [ClientId, Username, Reason]),
                  [ClientId, Username, Reason]),
@@ -308,14 +314,13 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
 fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) ->
 fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) ->
     {ok, ClientInfo};
     {ok, ClientInfo};
 fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
 fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
-    %% TODO: Enrich the varibale replacement????
-    %%       i.e: ${ClientInfo.auth_result.productKey}
     Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
     Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
     {ok, ClientInfo#{mountpoint := Mountpoint1}}.
     {ok, ClientInfo#{mountpoint := Mountpoint1}}.
 
 
 ensure_connected(Channel = #channel{ctx = Ctx,
 ensure_connected(Channel = #channel{ctx = Ctx,
                                     conninfo = ConnInfo,
                                     conninfo = ConnInfo,
                                     clientinfo = ClientInfo}) ->
                                     clientinfo = ClientInfo}) ->
+    _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]),
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]),
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]),
     Channel.
     Channel.
 
 
@@ -323,7 +328,7 @@ process_connect(Channel = #channel{ctx = Ctx,
                                    session = Session,
                                    session = Session,
                                    conninfo = ConnInfo,
                                    conninfo = ConnInfo,
                                    clientinfo = ClientInfo,
                                    clientinfo = ClientInfo,
-                                   validator = Validator},
+                                   with_context = WithContext},
                 Msg, Result, Iter) ->
                 Msg, Result, Iter) ->
     %% inherit the old session
     %% inherit the old session
     SessFun = fun(_,_) -> #{} end,
     SessFun = fun(_,_) -> #{} end,
@@ -336,7 +341,8 @@ process_connect(Channel = #channel{ctx = Ctx,
            emqx_lwm2m_session
            emqx_lwm2m_session
           ) of
           ) of
         {ok, _} ->
         {ok, _} ->
-            NewResult = emqx_lwm2m_session:init(Msg, Validator, Session),
+            Mountpoint = maps:get(mountpoint, ClientInfo, <<>>),
+            NewResult = emqx_lwm2m_session:init(Msg, Mountpoint, WithContext, Session),
             iter(Iter, maps:merge(Result, NewResult), Channel);
             iter(Iter, maps:merge(Result, NewResult), Channel);
         {error, Reason} ->
         {error, Reason} ->
             ?LOG(error, "Failed to open session du to ~p", [Reason]),
             ?LOG(error, "Failed to open session du to ~p", [Reason]),
@@ -358,13 +364,34 @@ gets([H | T], Map) ->
 gets([], Val) ->
 gets([], Val) ->
     Val.
     Val.
 
 
+with_context(publish, [Topic, Msg], Ctx, ClientInfo) ->
+    case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
+        allow ->
+            emqx:publish(Msg);
+        _ ->
+            ?LOG(error, "topic:~p not allow to publish ", [Topic])
+    end;
+
+with_context(subscribe, [Topic, Opts], Ctx, #{username := UserName} = ClientInfo) ->
+    case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
+        allow ->
+            run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, UserName]),
+            ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, UserName]),
+            emqx:subscribe(Topic, UserName, Opts);
+        _ ->
+            ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic])
+    end;
+
+with_context(metrics, Name, Ctx, _ClientInfo) ->
+    emqx_gateway_ctx:metrics_inc(Ctx, Name).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Call Chain
 %% Call Chain
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 call_session(Fun,
 call_session(Fun,
              Msg,
              Msg,
              #channel{session = Session,
              #channel{session = Session,
-                      validator = Validator} = Channel) ->
+                      with_context = WithContext} = Channel) ->
     iter([ session, fun process_session/4
     iter([ session, fun process_session/4
          , proto, fun process_protocol/4
          , proto, fun process_protocol/4
          , return, fun process_return/4
          , return, fun process_return/4
@@ -373,7 +400,7 @@ call_session(Fun,
          , out, fun process_out/4
          , out, fun process_out/4
          , fun process_nothing/3
          , fun process_nothing/3
          ],
          ],
-         emqx_lwm2m_session:Fun(Msg, Validator, Session),
+         emqx_lwm2m_session:Fun(Msg, WithContext, Session),
          Channel).
          Channel).
 
 
 process_session(Session, Result, Channel, Iter) ->
 process_session(Session, Result, Channel, Iter) ->
@@ -384,8 +411,8 @@ process_protocol({request, Msg}, Result, Channel, Iter) ->
     handle_request_protocol(Method, Msg, Result, Channel, Iter);
     handle_request_protocol(Method, Msg, Result, Channel, Iter);
 
 
 process_protocol(Msg, Result,
 process_protocol(Msg, Result,
-                 #channel{validator = Validator, session = Session} = Channel, Iter) ->
-    ProtoResult = emqx_lwm2m_session:handle_protocol_in(Msg, Validator, Session),
+                 #channel{with_context = WithContext, session = Session} = Channel, Iter) ->
+    ProtoResult = emqx_lwm2m_session:handle_protocol_in(Msg, WithContext, Session),
     iter(Iter, maps:merge(Result, ProtoResult), Channel).
     iter(Iter, maps:merge(Result, ProtoResult), Channel).
 
 
 handle_request_protocol(post, #coap_message{options = Opts} = Msg,
 handle_request_protocol(post, #coap_message{options = Opts} = Msg,
@@ -415,10 +442,10 @@ handle_request_protocol(delete, #coap_message{options = Opts} = Msg,
     end.
     end.
 
 
 do_update(Location, Msg, Result,
 do_update(Location, Msg, Result,
-          #channel{session = Session, validator = Validator} = Channel, Iter) ->
+          #channel{session = Session, with_context = WithContext} = Channel, Iter) ->
     case check_location(Location, Channel) of
     case check_location(Location, Channel) of
         true ->
         true ->
-            NewResult = emqx_lwm2m_session:update(Msg, Validator, Session),
+            NewResult = emqx_lwm2m_session:update(Msg, WithContext, Session),
             iter(Iter, maps:merge(Result, NewResult), Channel);
             iter(Iter, maps:merge(Result, NewResult), Channel);
         _ ->
         _ ->
             iter(Iter, reply({error, not_found}, Msg, Result), Channel)
             iter(Iter, reply({error, not_found}, Msg, Result), Channel)
@@ -438,13 +465,8 @@ process_out(Outs, Result, Channel, _) ->
                 Reply ->
                 Reply ->
                     [Reply | Outs2]
                     [Reply | Outs2]
             end,
             end,
-    %% emqx_gateway_conn bug, work around
-    case Outs3 of
-        [] ->
-            {ok, Channel};
-        _ ->
-            {ok, {outgoing, Outs3}, Channel}
-    end.
+
+    {ok, {outgoing, Outs3}, Channel}.
 
 
 process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
 process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
     Session2 = emqx_lwm2m_session:set_reply(Reply, Session),
     Session2 = emqx_lwm2m_session:set_reply(Reply, Session),

+ 14 - 14
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl

@@ -30,20 +30,20 @@
 
 
 -define(STANDARD, 1).
 -define(STANDARD, 1).
 
 
-%-type msg_type() :: <<"create">>
-%                  | <<"delete">>
-%                  | <<"read">>
-%                  | <<"write">>
-%                  | <<"execute">>
-%                  | <<"discover">>
-%                  | <<"write-attr">>
-%                  | <<"observe">>
-%                  | <<"cancel-observe">>.
-%
-                                                %-type cmd() :: #{ <<"msgType">> := msg_type()
-                                                %                , <<"data">> := maps()
-                                                %                %% more keys?
-                                                %                }.
+%%-type msg_type() :: <<"create">>
+%%                  | <<"delete">>
+%%                  | <<"read">>
+%%                  | <<"write">>
+%%                  | <<"execute">>
+%%                  | <<"discover">>
+%%                  | <<"write-attr">>
+%%                  | <<"observe">>
+%%                  | <<"cancel-observe">>.
+%%
+%%-type cmd() :: #{ <<"msgType">> := msg_type()
+%%                , <<"data">> := maps()
+%%                %%%% more keys?
+%%                }.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% APIs
 %% APIs

+ 93 - 172
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl

@@ -22,7 +22,7 @@
 -include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl").
 -include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl").
 
 
 %% API
 %% API
--export([new/0, init/3, update/3, reregister/3, on_close/1]).
+-export([new/0, init/4, update/3, reregister/3, on_close/1]).
 
 
 -export([ info/1
 -export([ info/1
         , info/2
         , info/2
@@ -47,9 +47,10 @@
                  , wait_ack :: request_context() | undefined
                  , wait_ack :: request_context() | undefined
                  , endpoint_name :: binary() | undefined
                  , endpoint_name :: binary() | undefined
                  , location_path :: list(binary()) | undefined
                  , location_path :: list(binary()) | undefined
-                 , headers :: map() | undefined
                  , reg_info :: map() | undefined
                  , reg_info :: map() | undefined
                  , lifetime :: non_neg_integer() | undefined
                  , lifetime :: non_neg_integer() | undefined
+                 , is_cache_mode :: boolean()
+                 , mountpoint :: binary()
                  , last_active_at :: non_neg_integer()
                  , last_active_at :: non_neg_integer()
                  }).
                  }).
 
 
@@ -61,7 +62,7 @@
                         <<"7">>, <<"9">>, <<"15">>]).
                         <<"7">>, <<"9">>, <<"15">>]).
 
 
 %% uplink and downlink topic configuration
 %% uplink and downlink topic configuration
--define(lwm2m_up_dm_topic,  {<<"v1/up/dm">>, 0}).
+-define(lwm2m_up_dm_topic,  {<<"/v1/up/dm">>, 0}).
 
 
 %% steal from emqx_session
 %% steal from emqx_session
 -define(INFO_KEYS, [subscriptions,
 -define(INFO_KEYS, [subscriptions,
@@ -95,41 +96,44 @@ new() ->
     #session{ coap = emqx_coap_tm:new()
     #session{ coap = emqx_coap_tm:new()
             , queue = queue:new()
             , queue = queue:new()
             , last_active_at = ?NOW
             , last_active_at = ?NOW
+            , is_cache_mode = false
+            , mountpoint = <<>>
             , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
             , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
 
 
--spec init(emqx_coap_message(), function(), session()) -> map().
-init(#coap_message{options = Opts, payload = Payload} = Msg, Validator, Session) ->
+-spec init(emqx_coap_message(), binary(), function(), session()) -> map().
+init(#coap_message{options = Opts,
+                   payload = Payload} = Msg, MountPoint, WithContext, Session) ->
     Query = maps:get(uri_query, Opts),
     Query = maps:get(uri_query, Opts),
     RegInfo = append_object_list(Query, Payload),
     RegInfo = append_object_list(Query, Payload),
-    Headers = get_headers(RegInfo),
     LifeTime = get_lifetime(RegInfo),
     LifeTime = get_lifetime(RegInfo),
     Epn = maps:get(<<"ep">>, Query),
     Epn = maps:get(<<"ep">>, Query),
     Location = [?PREFIX, Epn],
     Location = [?PREFIX, Epn],
 
 
-    Result = return(register_init(Validator,
-                                  Session#session{headers = Headers,
-                                                  endpoint_name = Epn,
-                                                  location_path = Location,
-                                                  reg_info = RegInfo,
-                                                  lifetime = LifeTime,
-                                                  queue = queue:new()})),
+    NewSession = Session#session{endpoint_name = Epn,
+                                 location_path = Location,
+                                 reg_info = RegInfo,
+                                 lifetime = LifeTime,
+                                 mountpoint = MountPoint,
+                                 is_cache_mode = is_psm(RegInfo) orelse is_qmode(RegInfo),
+                                 queue = queue:new()},
 
 
+    Result = return(register_init(WithContext, NewSession)),
     Reply = emqx_coap_message:piggyback({ok, created}, Msg),
     Reply = emqx_coap_message:piggyback({ok, created}, Msg),
     Reply2 = emqx_coap_message:set(location_path, Location, Reply),
     Reply2 = emqx_coap_message:set(location_path, Location, Reply),
     reply(Reply2, Result#{lifetime => true}).
     reply(Reply2, Result#{lifetime => true}).
 
 
-reregister(Msg, Validator, Session) ->
-    update(Msg, Validator, <<"register">>, Session).
+reregister(Msg, WithContext, Session) ->
+    update(Msg, WithContext, <<"register">>, Session).
 
 
-update(Msg, Validator, Session) ->
-    update(Msg, Validator, <<"update">>, Session).
+update(Msg, WithContext, Session) ->
+    update(Msg, WithContext, <<"update">>, Session).
 
 
--spec on_close(session()) -> ok.
-on_close(#session{endpoint_name = Epn}) ->
+-spec on_close(session()) -> binary().
+on_close(Session) ->
     #{topic := Topic} = downlink_topic(),
     #{topic := Topic} = downlink_topic(),
-    MountedTopic = mount(Topic, mountpoint(Epn)),
+    MountedTopic = mount(Topic, Session),
     emqx:unsubscribe(MountedTopic),
     emqx:unsubscribe(MountedTopic),
-    ok.
+    MountedTopic.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Info, Stats
 %% Info, Stats
@@ -194,15 +198,15 @@ stats(Session) -> info(?STATS_KEYS, Session).
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-handle_coap_in(Msg, _Validator, Session) ->
+handle_coap_in(Msg, _WithContext, Session) ->
     call_coap(case emqx_coap_message:is_request(Msg) of
     call_coap(case emqx_coap_message:is_request(Msg) of
                   true -> handle_request;
                   true -> handle_request;
                   _ -> handle_response
                   _ -> handle_response
               end,
               end,
               Msg, Session#session{last_active_at = ?NOW}).
               Msg, Session#session{last_active_at = ?NOW}).
 
 
-handle_deliver(Delivers, _Validator, Session) ->
-    return(deliver(Delivers, Session)).
+handle_deliver(Delivers, WithContext, Session) ->
+    return(deliver(Delivers, WithContext, Session)).
 
 
 timeout({transport, Msg}, _, Session) ->
 timeout({transport, Msg}, _, Session) ->
     call_coap(timeout, Msg, Session).
     call_coap(timeout, Msg, Session).
@@ -214,17 +218,17 @@ set_reply(Msg, #session{coap = Coap} = Session) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Protocol Stack
 %% Protocol Stack
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-handle_protocol_in({response, CtxMsg}, Validator, Session) ->
-    return(handle_coap_response(CtxMsg, Validator, Session));
+handle_protocol_in({response, CtxMsg}, WithContext, Session) ->
+    return(handle_coap_response(CtxMsg, WithContext, Session));
 
 
-handle_protocol_in({ack, CtxMsg}, Validator, Session) ->
-    return(handle_ack(CtxMsg, Validator, Session));
+handle_protocol_in({ack, CtxMsg}, WithContext, Session) ->
+    return(handle_ack(CtxMsg, WithContext, Session));
 
 
-handle_protocol_in({ack_failure, CtxMsg}, Validator, Session) ->
-    return(handle_ack_failure(CtxMsg, Validator, Session));
+handle_protocol_in({ack_failure, CtxMsg}, WithContext, Session) ->
+    return(handle_ack_failure(CtxMsg, WithContext, Session));
 
 
-handle_protocol_in({reset, CtxMsg}, Validator, Session) ->
-    return(handle_ack_reset(CtxMsg, Validator, Session)).
+handle_protocol_in({reset, CtxMsg}, WithContext, Session) ->
+    return(handle_ack_reset(CtxMsg, WithContext, Session)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Register
 %% Register
@@ -302,50 +306,6 @@ delink(Str) ->
     Ltrim = binary_util:ltrim(Str, $<),
     Ltrim = binary_util:ltrim(Str, $<),
     binary_util:rtrim(Ltrim, $>).
     binary_util:rtrim(Ltrim, $>).
 
 
-get_headers(RegInfo) ->
-    lists:foldl(fun(K, Acc) ->
-                        get_header(K, RegInfo, Acc)
-                end,
-                extract_module_params(RegInfo),
-                [<<"apn">>, <<"im">>, <<"ct">>, <<"mv">>, <<"mt">>]).
-
-get_header(Key, RegInfo, Headers) ->
-    case maps:get(Key, RegInfo, undefined) of
-        undefined ->
-            Headers;
-        Val ->
-            AtomKey = erlang:binary_to_atom(Key),
-            Headers#{AtomKey => Val}
-    end.
-
-extract_module_params(RegInfo) ->
-    Keys = [<<"module">>, <<"sv">>, <<"chip">>, <<"imsi">>, <<"iccid">>],
-    case lists:any(fun(K) -> maps:get(K, RegInfo, undefined) =:= undefined end, Keys) of
-        true -> #{module_params => undefined};
-        false ->
-            Extras = [<<"rsrp">>, <<"sinr">>, <<"txpower">>, <<"cellid">>],
-            case lists:any(fun(K) -> maps:get(K, RegInfo, undefined) =:= undefined end, Extras) of
-                true ->
-                    #{module_params =>
-                          #{module => maps:get(<<"module">>, RegInfo),
-                            softversion => maps:get(<<"sv">>, RegInfo),
-                            chiptype => maps:get(<<"chip">>, RegInfo),
-                            imsi => maps:get(<<"imsi">>, RegInfo),
-                            iccid => maps:get(<<"iccid">>, RegInfo)}};
-                false ->
-                    #{module_params =>
-                          #{module => maps:get(<<"module">>, RegInfo),
-                            softversion => maps:get(<<"sv">>, RegInfo),
-                            chiptype => maps:get(<<"chip">>, RegInfo),
-                            imsi => maps:get(<<"imsi">>, RegInfo),
-                            iccid => maps:get(<<"iccid">>, RegInfo),
-                            rsrp => maps:get(<<"rsrp">>, RegInfo),
-                            sinr => maps:get(<<"sinr">>, RegInfo),
-                            txpower => maps:get(<<"txpower">>, RegInfo),
-                            cellid => maps:get(<<"cellid">>, RegInfo)}}
-            end
-    end.
-
 get_lifetime(#{<<"lt">> := LT}) ->
 get_lifetime(#{<<"lt">> := LT}) ->
     case LT of
     case LT of
         0 -> emqx:get_config([gateway, lwm2m, lifetime_max]);
         0 -> emqx:get_config([gateway, lwm2m, lifetime_max]);
@@ -362,7 +322,7 @@ get_lifetime(_, OldRegInfo) ->
 
 
 -spec update(emqx_coap_message(), function(), binary(), session()) -> map().
 -spec update(emqx_coap_message(), function(), binary(), session()) -> map().
 update(#coap_message{options = Opts, payload = Payload} = Msg,
 update(#coap_message{options = Opts, payload = Payload} = Msg,
-       Validator,
+       WithContext,
        CmdType,
        CmdType,
        #session{reg_info = OldRegInfo} = Session) ->
        #session{reg_info = OldRegInfo} = Session) ->
     Query = maps:get(uri_query, Opts),
     Query = maps:get(uri_query, Opts),
@@ -370,58 +330,51 @@ update(#coap_message{options = Opts, payload = Payload} = Msg,
     UpdateRegInfo = maps:merge(OldRegInfo, RegInfo),
     UpdateRegInfo = maps:merge(OldRegInfo, RegInfo),
     LifeTime = get_lifetime(UpdateRegInfo, OldRegInfo),
     LifeTime = get_lifetime(UpdateRegInfo, OldRegInfo),
 
 
-    Session2 = proto_subscribe(Validator,
-                               Session#session{reg_info = UpdateRegInfo,
-                                               lifetime = LifeTime}),
+    NewSession = Session#session{reg_info = UpdateRegInfo,
+                                 is_cache_mode =
+                                     is_psm(UpdateRegInfo) orelse is_qmode(UpdateRegInfo),
+                                 lifetime = LifeTime},
+
+    Session2 = proto_subscribe(WithContext, NewSession),
     Session3 = send_dl_msg(Session2),
     Session3 = send_dl_msg(Session2),
     RegPayload = #{<<"data">> => UpdateRegInfo},
     RegPayload = #{<<"data">> => UpdateRegInfo},
-    Session4 = send_to_mqtt(#{}, CmdType, RegPayload, Validator, Session3),
+    Session4 = send_to_mqtt(#{}, CmdType, RegPayload, WithContext, Session3),
 
 
     Result = return(Session4),
     Result = return(Session4),
 
 
     Reply = emqx_coap_message:piggyback({ok, changed}, Msg),
     Reply = emqx_coap_message:piggyback({ok, changed}, Msg),
     reply(Reply, Result#{lifetime => true}).
     reply(Reply, Result#{lifetime => true}).
 
 
-register_init(Validator, #session{reg_info = RegInfo,
-                                  endpoint_name = Epn} = Session) ->
-
+register_init(WithContext, #session{reg_info = RegInfo} = Session) ->
     Session2 = send_auto_observe(RegInfo, Session),
     Session2 = send_auto_observe(RegInfo, Session),
     %% - subscribe to the downlink_topic and wait for commands
     %% - subscribe to the downlink_topic and wait for commands
     #{topic := Topic, qos := Qos} = downlink_topic(),
     #{topic := Topic, qos := Qos} = downlink_topic(),
-    MountedTopic = mount(Topic, mountpoint(Epn)),
-    Session3 = subscribe(MountedTopic, Qos, Validator, Session2),
+    MountedTopic = mount(Topic, Session),
+    Session3 = subscribe(MountedTopic, Qos, WithContext, Session2),
     Session4 = send_dl_msg(Session3),
     Session4 = send_dl_msg(Session3),
 
 
     %% - report the registration info
     %% - report the registration info
     RegPayload = #{<<"data">> => RegInfo},
     RegPayload = #{<<"data">> => RegInfo},
-    send_to_mqtt(#{}, <<"register">>, RegPayload, Validator, Session4).
+    send_to_mqtt(#{}, <<"register">>, RegPayload, WithContext, Session4).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Subscribe
 %% Subscribe
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-proto_subscribe(Validator, #session{endpoint_name = Epn, wait_ack = WaitAck} = Session) ->
+proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) ->
     #{topic := Topic, qos := Qos} = downlink_topic(),
     #{topic := Topic, qos := Qos} = downlink_topic(),
-    MountedTopic = mount(Topic, mountpoint(Epn)),
+    MountedTopic = mount(Topic, Session),
     Session2 = case WaitAck of
     Session2 = case WaitAck of
                    undefined ->
                    undefined ->
                        Session;
                        Session;
                    Ctx ->
                    Ctx ->
                        MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>),
                        MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>),
-                       send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, Validator, Session)
+                       send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, WithContext, Session)
                end,
                end,
-    subscribe(MountedTopic, Qos, Validator, Session2).
-
-subscribe(Topic, Qos, Validator,
-          #session{headers = Headers, endpoint_name = EndpointName} = Session) ->
-    case Validator(subscribe, Topic) of
-        allow ->
-            ClientId = maps:get(device_id, Headers, undefined),
-            Opts = get_sub_opts(Qos),
-            ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, EndpointName]),
-            emqx:subscribe(Topic, ClientId, Opts);
-        _ ->
-            ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic])
-    end,
+    subscribe(MountedTopic, Qos, WithContext, Session2).
+
+subscribe(Topic, Qos, WithContext, Session) ->
+    Opts = get_sub_opts(Qos),
+    WithContext(subscribe, [Topic, Opts]),
     Session.
     Session.
 
 
 send_auto_observe(RegInfo, Session) ->
 send_auto_observe(RegInfo, Session) ->
@@ -486,7 +439,7 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType},
                                     type = CoapMsgType,
                                     type = CoapMsgType,
                                     payload = CoapMsgPayload,
                                     payload = CoapMsgPayload,
                                     options = CoapMsgOpts}},
                                     options = CoapMsgOpts}},
-                     Validator,
+                     WithContext,
                      Session) ->
                      Session) ->
     MqttPayload = emqx_lwm2m_cmd:coap_to_mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ctx),
     MqttPayload = emqx_lwm2m_cmd:coap_to_mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ctx),
     {ReqPath, _} = emqx_lwm2m_cmd:path_list(emqx_lwm2m_cmd:extract_path(Ctx)),
     {ReqPath, _} = emqx_lwm2m_cmd:path_list(emqx_lwm2m_cmd:extract_path(Ctx)),
@@ -495,46 +448,43 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType},
             {[<<"5">>| _], _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
             {[<<"5">>| _], _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
                 %% this is a notification for status update during NB firmware upgrade.
                 %% this is a notification for status update during NB firmware upgrade.
                 %% need to reply to DM http callbacks
                 %% need to reply to DM http callbacks
-                send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, Validator, Session);
+                send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, WithContext, Session);
             {_ReqPath, _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
             {_ReqPath, _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
                 %% this is actually a notification, correct the msgType
                 %% this is actually a notification, correct the msgType
-                send_to_mqtt(Ctx, <<"notify">>, MqttPayload, Validator, Session);
+                send_to_mqtt(Ctx, <<"notify">>, MqttPayload, WithContext, Session);
             _ ->
             _ ->
-                send_to_mqtt(Ctx, EventType, MqttPayload, Validator, Session)
+                send_to_mqtt(Ctx, EventType, MqttPayload, WithContext, Session)
         end,
         end,
     send_dl_msg(Ctx, Session2).
     send_dl_msg(Ctx, Session2).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Ack
 %% Ack
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-handle_ack({Ctx, _}, Validator, Session) ->
+handle_ack({Ctx, _}, WithContext, Session) ->
     Session2 = send_dl_msg(Ctx, Session),
     Session2 = send_dl_msg(Ctx, Session),
     MqttPayload = emqx_lwm2m_cmd:empty_ack_to_mqtt(Ctx),
     MqttPayload = emqx_lwm2m_cmd:empty_ack_to_mqtt(Ctx),
-    send_to_mqtt(Ctx, <<"ack">>, MqttPayload, Validator, Session2).
+    send_to_mqtt(Ctx, <<"ack">>, MqttPayload, WithContext, Session2).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Ack Failure(Timeout/Reset)
 %% Ack Failure(Timeout/Reset)
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-handle_ack_failure({Ctx, _}, Validator, Session) ->
-    handle_ack_failure(Ctx, <<"coap_timeout">>, Validator, Session).
+handle_ack_failure({Ctx, _}, WithContext, Session) ->
+    handle_ack_failure(Ctx, <<"coap_timeout">>, WithContext, Session).
 
 
-handle_ack_reset({Ctx, _}, Validator, Session) ->
-    handle_ack_failure(Ctx, <<"coap_reset">>, Validator, Session).
+handle_ack_reset({Ctx, _}, WithContext, Session) ->
+    handle_ack_failure(Ctx, <<"coap_reset">>, WithContext, Session).
 
 
-handle_ack_failure(Ctx, MsgType, Validator, Session) ->
+handle_ack_failure(Ctx, MsgType, WithContext, Session) ->
     Session2 = may_send_dl_msg(coap_timeout, Ctx, Session),
     Session2 = may_send_dl_msg(coap_timeout, Ctx, Session),
     MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, MsgType),
     MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, MsgType),
-    send_to_mqtt(Ctx, MsgType, MqttPayload, Validator, Session2).
+    send_to_mqtt(Ctx, MsgType, MqttPayload, WithContext, Session2).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Send To CoAP
 %% Send To CoAP
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-may_send_dl_msg(coap_timeout, Ctx, #session{headers = Headers,
-                                            reg_info = RegInfo,
-                                            wait_ack = WaitAck} = Session) ->
-    Lwm2mMode = maps:get(lwm2m_model, Headers, undefined),
-    case is_cache_mode(Lwm2mMode, RegInfo, Session) of
+may_send_dl_msg(coap_timeout, Ctx, #session{wait_ack = WaitAck} = Session) ->
+    case is_cache_mode(Session) of
         false -> send_dl_msg(Ctx, Session);
         false -> send_dl_msg(Ctx, Session);
         true ->
         true ->
             case WaitAck of
             case WaitAck of
@@ -545,14 +495,11 @@ may_send_dl_msg(coap_timeout, Ctx, #session{headers = Headers,
             end
             end
     end.
     end.
 
 
-is_cache_mode(Lwm2mMode, RegInfo, #session{last_active_at = LastActiveAt}) ->
-    case Lwm2mMode =:= psm orelse is_psm(RegInfo) orelse is_qmode(RegInfo) of
-        true ->
-            QModeTimeWind = emqx:get_config([gateway, lwm2m, qmode_time_window]),
-            Now = ?NOW,
-            (Now - LastActiveAt) >= QModeTimeWind;
-        false -> false
-    end.
+is_cache_mode(#session{is_cache_mode = IsCacheMode,
+                       last_active_at = LastActiveAt}) ->
+    IsCacheMode andalso
+                  ((?NOW - LastActiveAt) >=
+                       emqx:get_config([gateway, lwm2m, qmode_time_window])).
 
 
 is_psm(#{<<"apn">> := APN}) when APN =:= <<"Ctnb">>;
 is_psm(#{<<"apn">> := APN}) when APN =:= <<"Ctnb">>;
                                  APN =:= <<"psmA.eDRX0.ctnb">>;
                                  APN =:= <<"psmA.eDRX0.ctnb">>;
@@ -611,54 +558,27 @@ send_msg_not_waiting_ack(Ctx, Req, Session) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Send To MQTT
 %% Send To MQTT
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-send_to_mqtt(Ref, EventType, Payload, Validator, Session = #session{headers = Headers}) ->
+send_to_mqtt(Ref, EventType, Payload, WithContext, Session) ->
     #{topic := Topic, qos := Qos} = uplink_topic(EventType),
     #{topic := Topic, qos := Qos} = uplink_topic(EventType),
-    NHeaders = extract_ext_flags(Headers),
     Mheaders = maps:get(mheaders, Ref, #{}),
     Mheaders = maps:get(mheaders, Ref, #{}),
-    NHeaders1 = maps:merge(NHeaders, Mheaders),
-    proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, NHeaders1, Validator, Session).
+    proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, Mheaders, WithContext, Session).
 
 
 send_to_mqtt(Ctx, EventType, Payload, {Topic, Qos},
 send_to_mqtt(Ctx, EventType, Payload, {Topic, Qos},
-             Validator, #session{headers = Headers} = Session) ->
+             WithContext, Session) ->
     Mheaders = maps:get(mheaders, Ctx, #{}),
     Mheaders = maps:get(mheaders, Ctx, #{}),
-    NHeaders = extract_ext_flags(Headers),
-    NHeaders1 = maps:merge(NHeaders, Mheaders),
-    proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, NHeaders1, Validator, Session).
+    proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, Mheaders, WithContext, Session).
 
 
-proto_publish(Topic, Payload, Qos, Headers, Validator,
+proto_publish(Topic, Payload, Qos, Headers, WithContext,
               #session{endpoint_name = Epn} = Session) ->
               #session{endpoint_name = Epn} = Session) ->
-    MountedTopic = mount(Topic, mountpoint(Epn)),
-    _ = case Validator(publish, MountedTopic) of
-            allow ->
-                Msg = emqx_message:make(Epn, Qos, MountedTopic,
-                                        emqx_json:encode(Payload), #{}, Headers),
-                emqx:publish(Msg);
-            _ ->
-                ?LOG(error, "topic:~p not allow to publish ", [MountedTopic])
-        end,
+    MountedTopic = mount(Topic, Session),
+    Msg = emqx_message:make(Epn, Qos, MountedTopic,
+                            emqx_json:encode(Payload), #{}, Headers),
+    WithContext(publish, [MountedTopic, Msg]),
     Session.
     Session.
 
 
-mountpoint(Epn) ->
-    Prefix = emqx:get_config([gateway, lwm2m, mountpoint]),
-    <<Prefix/binary, "/", Epn/binary, "/">>.
-
-mount(Topic, MountPoint) when is_binary(Topic), is_binary(MountPoint) ->
+mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->
     <<MountPoint/binary, Topic/binary>>.
     <<MountPoint/binary, Topic/binary>>.
 
 
-extract_ext_flags(Headers) ->
-    Header0 = #{is_tr => maps:get(is_tr, Headers, true)},
-    check(Header0, Headers, [sota_type, appId, nbgwFlag]).
-
-check(Params, _Headers, []) -> Params;
-check(Params, Headers, [Key | Rest]) ->
-    case maps:get(Key, Headers, null) of
-        V when V == undefined; V == null ->
-            check(Params, Headers, Rest);
-        Value ->
-            Params1 = Params#{Key => Value},
-            check(Params1, Headers, Rest)
-    end.
-
 downlink_topic() ->
 downlink_topic() ->
     emqx:get_config([gateway, lwm2m, translators, command]).
     emqx:get_config([gateway, lwm2m, translators, command]).
 
 
@@ -678,29 +598,30 @@ uplink_topic(_) ->
 %% Deliver
 %% Deliver
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-deliver(Delivers, #session{headers = Headers, reg_info = RegInfo} = Session) ->
-    Lwm2mMode = maps:get(lwm2m_model, Headers, undefined),
-    IsCacheMode = is_cache_mode(Lwm2mMode, RegInfo, Session),
+deliver(Delivers, WithContext, #session{reg_info = RegInfo} = Session) ->
+    IsCacheMode = is_cache_mode(Session),
     AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
     AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
     lists:foldl(fun({deliver, _, MQTT}, Acc) ->
     lists:foldl(fun({deliver, _, MQTT}, Acc) ->
                         deliver_to_coap(AlternatePath,
                         deliver_to_coap(AlternatePath,
-                                        MQTT#message.payload, MQTT, IsCacheMode, Acc)
+                                        MQTT#message.payload, MQTT, IsCacheMode, WithContext, Acc)
                 end,
                 end,
                 Session,
                 Session,
                 Delivers).
                 Delivers).
 
 
-deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, Session) when is_binary(JsonData)->
+deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, WithContext, Session) when is_binary(JsonData)->
     try
     try
         TermData = emqx_json:decode(JsonData, [return_maps]),
         TermData = emqx_json:decode(JsonData, [return_maps]),
-        deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, Session)
+        deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session)
     catch
     catch
         ExClass:Error:ST ->
         ExClass:Error:ST ->
             ?LOG(error, "deliver_to_coap - Invalid JSON: ~0p, Exception: ~0p, stacktrace: ~0p",
             ?LOG(error, "deliver_to_coap - Invalid JSON: ~0p, Exception: ~0p, stacktrace: ~0p",
                  [JsonData, {ExClass, Error}, ST]),
                  [JsonData, {ExClass, Error}, ST]),
+            WithContext(metrics, 'delivery.dropped'),
             Session
             Session
     end;
     end;
 
 
-deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, Session) when is_map(TermData) ->
+deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session) when is_map(TermData) ->
+    WithContext(metrics, 'messages.delivered'),
     {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
     {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
     ExpiryTime = get_expiry_time(MQTT),
     ExpiryTime = get_expiry_time(MQTT),
     maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session).
     maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session).

+ 9 - 8
apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl

@@ -35,14 +35,14 @@ gateway.lwm2m {
   lifetime_max = 86400s
   lifetime_max = 86400s
   qmode_time_windonw = 22
   qmode_time_windonw = 22
   auto_observe = false
   auto_observe = false
-  mountpoint = \"lwm2m\"
+  mountpoint = \"lwm2m/%u\"
   update_msg_publish_condition = contains_object_list
   update_msg_publish_condition = contains_object_list
   translators {
   translators {
-    command = {topic = \"dn/#\", qos = 0}
-    response = {topic = \"up/resp\", qos = 0}
-    notify = {topic = \"up/notify\", qos = 0}
-    register = {topic = \"up/resp\", qos = 0}
-    update = {topic = \"up/resp\", qos = 0}
+    command = {topic = \"/dn/#\", qos = 0}
+    response = {topic = \"/up/resp\", qos = 0}
+    notify = {topic = \"/up/notify\", qos = 0}
+    register = {topic = \"/up/resp\", qos = 0}
+    update = {topic = \"/up/resp\", qos = 0}
   }
   }
   listeners.udp.default {
   listeners.udp.default {
     bind = 5783
     bind = 5783
@@ -66,7 +66,7 @@ all() ->
     , {group, test_grp_6_observe}
     , {group, test_grp_6_observe}
 
 
       %% {group, test_grp_8_object_19}
       %% {group, test_grp_8_object_19}
-      %% {group, test_grp_9_psm_queue_mode}
+    , {group, test_grp_9_psm_queue_mode}
     ].
     ].
 
 
 suite() -> [{timetrap, {seconds, 90}}].
 suite() -> [{timetrap, {seconds, 90}}].
@@ -1750,7 +1750,7 @@ server_cache_mode(Config, RegOption) ->
     verify_read_response_1(0, UdpSock),
     verify_read_response_1(0, UdpSock),
 
 
     %% server inters into PSM mode
     %% server inters into PSM mode
-    timer:sleep(2),
+    timer:sleep(2500),
 
 
     %% verify server caches downlink commands
     %% verify server caches downlink commands
     send_read_command_1(1, UdpSock),
     send_read_command_1(1, UdpSock),
@@ -1797,6 +1797,7 @@ verify_read_response_1(CmdId, UdpSock) ->
     ReadResult = emqx_json:encode(#{  <<"requestID">> => CmdId, <<"cacheID">> => CmdId,
     ReadResult = emqx_json:encode(#{  <<"requestID">> => CmdId, <<"cacheID">> => CmdId,
                                       <<"msgType">> => <<"read">>,
                                       <<"msgType">> => <<"read">>,
                                       <<"data">> => #{
                                       <<"data">> => #{
+                                                      <<"reqPath">> => <<"/3/0/0">>,
                                                       <<"code">> => <<"2.05">>,
                                                       <<"code">> => <<"2.05">>,
                                                       <<"codeMsg">> => <<"content">>,
                                                       <<"codeMsg">> => <<"content">>,
                                                       <<"content">> => [#{
                                                       <<"content">> => [#{