Преглед изворни кода

Merge pull request #10257 from HJianBo/gw-lwm2m-fixes

Fixed the issue where `auto_observe` was not working in LwM2M Gateway
JianBo He пре 2 година
родитељ
комит
fee4ff6a07

+ 2 - 1
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl

@@ -138,7 +138,7 @@ mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"discover">>, <<"dat
             [
             [
                 {uri_path, FullPathList},
                 {uri_path, FullPathList},
                 {uri_query, QueryList},
                 {uri_query, QueryList},
-                {'accept', ?LWM2M_FORMAT_LINK}
+                {accept, ?LWM2M_FORMAT_LINK}
             ]
             ]
         ),
         ),
         InputCmd
         InputCmd
@@ -241,6 +241,7 @@ empty_ack_to_mqtt(Ref) ->
 coap_failure_to_mqtt(Ref, MsgType) ->
 coap_failure_to_mqtt(Ref, MsgType) ->
     make_base_response(maps:put(<<"msgType">>, MsgType, Ref)).
     make_base_response(maps:put(<<"msgType">>, MsgType, Ref)).
 
 
+%% TODO: application/link-format
 content_to_mqtt(CoapPayload, <<"text/plain">>, Ref) ->
 content_to_mqtt(CoapPayload, <<"text/plain">>, Ref) ->
     emqx_lwm2m_message:text_to_json(extract_path(Ref), CoapPayload);
     emqx_lwm2m_message:text_to_json(extract_path(Ref), CoapPayload);
 content_to_mqtt(CoapPayload, <<"application/octet-stream">>, Ref) ->
 content_to_mqtt(CoapPayload, <<"application/octet-stream">>, Ref) ->

+ 23 - 9
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl

@@ -15,11 +15,12 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 -module(emqx_lwm2m_session).
 -module(emqx_lwm2m_session).
 
 
+-include("src/coap/include/emqx_coap.hrl").
+-include("src/lwm2m/include/emqx_lwm2m.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
--include("src/coap/include/emqx_coap.hrl").
--include("src/lwm2m/include/emqx_lwm2m.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
 %% API
 %% API
 -export([
 -export([
@@ -513,12 +514,20 @@ observe_object_list(AlternatePath, ObjectList, Session) ->
             true ->
             true ->
                 Acc;
                 Acc;
             false ->
             false ->
-                try
-                    emqx_lwm2m_xml_object_db:find_objectid(binary_to_integer(ObjId)),
-                    observe_object(AlternatePath, ObjectPath, Acc)
-                catch
-                    error:no_xml_definition ->
-                        Acc
+                ObjId1 = binary_to_integer(ObjId),
+                case emqx_lwm2m_xml_object_db:find_objectid(ObjId1) of
+                    {error, no_xml_definition} ->
+                        ?tp(
+                            warning,
+                            ignore_observer_resource,
+                            #{
+                                reason => no_xml_definition,
+                                object_id => ObjId1
+                            }
+                        ),
+                        Acc;
+                    _ ->
+                        observe_object(AlternatePath, ObjectPath, Acc)
                 end
                 end
         end
         end
     end,
     end,
@@ -538,15 +547,20 @@ deliver_auto_observe_to_coap(AlternatePath, TermData, Session) ->
         path => AlternatePath,
         path => AlternatePath,
         data => TermData
         data => TermData
     }),
     }),
-    {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
+    {Req0, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
+    Req = alloc_token(Req0),
     maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
     maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
 
 
 is_auto_observe() ->
 is_auto_observe() ->
     emqx:get_config([gateway, lwm2m, auto_observe]).
     emqx:get_config([gateway, lwm2m, auto_observe]).
 
 
+alloc_token(Req = #coap_message{}) ->
+    Req#coap_message{token = crypto:strong_rand_bytes(4)}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Response
 %% Response
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
+
 handle_coap_response(
 handle_coap_response(
     {Ctx = #{<<"msgType">> := EventType}, #coap_message{
     {Ctx = #{<<"msgType">> := EventType}, #coap_message{
         method = CoapMsgMethod,
         method = CoapMsgMethod,

+ 5 - 2
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl

@@ -57,6 +57,7 @@
 start_link(XmlDir) ->
 start_link(XmlDir) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
     gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
 
 
+-spec find_objectid(integer()) -> {error, no_xml_definition} | xmerl:xmlElement().
 find_objectid(ObjectId) ->
 find_objectid(ObjectId) ->
     ObjectIdInt =
     ObjectIdInt =
         case is_list(ObjectId) of
         case is_list(ObjectId) of
@@ -65,9 +66,10 @@ find_objectid(ObjectId) ->
         end,
         end,
     case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
     case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
         [] -> {error, no_xml_definition};
         [] -> {error, no_xml_definition};
-        [{ObjectId, Xml}] -> Xml
+        [{_ObjectId, Xml}] -> Xml
     end.
     end.
 
 
+-spec find_name(string()) -> {error, no_xml_definition} | xmerl:xmlElement().
 find_name(Name) ->
 find_name(Name) ->
     NameBinary =
     NameBinary =
         case is_list(Name) of
         case is_list(Name) of
@@ -77,10 +79,11 @@ find_name(Name) ->
     case ets:lookup(?LWM2M_OBJECT_NAME_TO_ID_TAB, NameBinary) of
     case ets:lookup(?LWM2M_OBJECT_NAME_TO_ID_TAB, NameBinary) of
         [] ->
         [] ->
             {error, no_xml_definition};
             {error, no_xml_definition};
-        [{NameBinary, ObjectId}] ->
+        [{_NameBinary, ObjectId}] ->
             find_objectid(ObjectId)
             find_objectid(ObjectId)
     end.
     end.
 
 
+-spec stop() -> ok.
 stop() ->
 stop() ->
     gen_server:stop(?MODULE).
     gen_server:stop(?MODULE).
 
 

+ 32 - 1
apps/emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl

@@ -36,8 +36,39 @@
 -define(ERR_BAD_REQUEST, <<"Bad Request">>).
 -define(ERR_BAD_REQUEST, <<"Bad Request">>).
 -define(REG_PREFIX, <<"rd">>).
 -define(REG_PREFIX, <<"rd">>).
 
 
+%%--------------------------------------------------------------------
+%% Data formats for transferring resource information, defined in
+%% OMA-TS-LightweightM2M-V1_0_1-20170704-A
+
+%% 0: Plain text. 0 is numeric value used in CoAP Content-Format option.
+%% The plain text format is used for "Read" and "Write" operations on singular
+%% Resources. i.e: /3/0/0
+%%
+%% This data format has a Media Type of "text/plain".
 -define(LWM2M_FORMAT_PLAIN_TEXT, 0).
 -define(LWM2M_FORMAT_PLAIN_TEXT, 0).
+
+%% 40: Link format. 40 is numeric value used in CoAP Content-Format option.
+%%
 -define(LWM2M_FORMAT_LINK, 40).
 -define(LWM2M_FORMAT_LINK, 40).
+
+%% 42: Opaque. 41 is numeric value used in CoAP Content-Format option.
+%% The opaque format is used for "Read" and "Write" operations on singular
+%% Resources where the value of the Resource is an opaque binary value.
+%% i.e: firmware images or opaque value from top layer.
+%%
+%% This data format has a Media Type of "application/octet-stream".
 -define(LWM2M_FORMAT_OPAQUE, 42).
 -define(LWM2M_FORMAT_OPAQUE, 42).
+
+%% 11542: TLV. 11542 is numeric value used in CoAP Content-Format option.
+%% For "Read" and "Write" operation, the binary TLV format is used to represent
+%% an array of values or a single value using a compact binary representation.
+%%
+%% This data format has a Media Type of "application/vnd.oma.lwm2m+tlv".
 -define(LWM2M_FORMAT_TLV, 11542).
 -define(LWM2M_FORMAT_TLV, 11542).
--define(LWMWM_FORMAT_JSON, 11543).
+
+%% 11543: JSON. 11543 is numeric value used in CoAP Content-Format option.
+%% The client may support the JSON format for "Read" and "Write" operations to
+%% represent multiple resource or single resource values.
+%%
+%% This data format has a Media Type of "application/vnd.oma.lwm2m+json".
+-define(LWM2M_FORMAT_OMA_JSON, 11543).

+ 89 - 27
apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl

@@ -35,29 +35,7 @@
 -include("src/coap/include/emqx_coap.hrl").
 -include("src/coap/include/emqx_coap.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
-
--define(CONF_DEFAULT, <<
-    "\n"
-    "gateway.lwm2m {\n"
-    "  xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"\n"
-    "  lifetime_min = 1s\n"
-    "  lifetime_max = 86400s\n"
-    "  qmode_time_window = 22\n"
-    "  auto_observe = false\n"
-    "  mountpoint = \"lwm2m/${username}\"\n"
-    "  update_msg_publish_condition = contains_object_list\n"
-    "  translators {\n"
-    "    command = {topic = \"/dn/#\", qos = 0}\n"
-    "    response = {topic = \"/up/resp\", qos = 0}\n"
-    "    notify = {topic = \"/up/notify\", qos = 0}\n"
-    "    register = {topic = \"/up/resp\", qos = 0}\n"
-    "    update = {topic = \"/up/resp\", qos = 0}\n"
-    "  }\n"
-    "  listeners.udp.default {\n"
-    "    bind = 5783\n"
-    "  }\n"
-    "}\n"
->>).
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
 -record(coap_content, {content_format, payload = <<>>}).
 -record(coap_content, {content_format, payload = <<>>}).
 
 
@@ -99,7 +77,8 @@ groups() ->
             %% case06_register_wrong_lifetime, %% now, will ignore wrong lifetime
             %% case06_register_wrong_lifetime, %% now, will ignore wrong lifetime
             case07_register_alternate_path_01,
             case07_register_alternate_path_01,
             case07_register_alternate_path_02,
             case07_register_alternate_path_02,
-            case08_reregister
+            case08_reregister,
+            case09_auto_observe
         ]},
         ]},
         {test_grp_1_read, [RepeatOpt], [
         {test_grp_1_read, [RepeatOpt], [
             case10_read,
             case10_read,
@@ -164,8 +143,15 @@ end_per_suite(Config) ->
     emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_authn]),
     emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_authn]),
     Config.
     Config.
 
 
-init_per_testcase(_AllTestCase, Config) ->
-    ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
+init_per_testcase(TestCase, Config) ->
+    GatewayConfig =
+        case TestCase of
+            case09_auto_observe ->
+                default_config(#{auto_observe => true});
+            _ ->
+                default_config()
+        end,
+    ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, GatewayConfig),
 
 
     {ok, _} = application:ensure_all_started(emqx_gateway),
     {ok, _} = application:ensure_all_started(emqx_gateway),
     {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]),
     {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]),
@@ -187,7 +173,37 @@ end_per_testcase(_AllTestCase, Config) ->
     ok = application:stop(emqx_gateway).
     ok = application:stop(emqx_gateway).
 
 
 default_config() ->
 default_config() ->
-    ?CONF_DEFAULT.
+    default_config(#{}).
+
+default_config(Overrides) ->
+    iolist_to_binary(
+        io_lib:format(
+            "\n"
+            "gateway.lwm2m {\n"
+            "  xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"\n"
+            "  lifetime_min = 1s\n"
+            "  lifetime_max = 86400s\n"
+            "  qmode_time_window = 22\n"
+            "  auto_observe = ~w\n"
+            "  mountpoint = \"lwm2m/${username}\"\n"
+            "  update_msg_publish_condition = contains_object_list\n"
+            "  translators {\n"
+            "    command = {topic = \"/dn/#\", qos = 0}\n"
+            "    response = {topic = \"/up/resp\", qos = 0}\n"
+            "    notify = {topic = \"/up/notify\", qos = 0}\n"
+            "    register = {topic = \"/up/resp\", qos = 0}\n"
+            "    update = {topic = \"/up/resp\", qos = 0}\n"
+            "  }\n"
+            "  listeners.udp.default {\n"
+            "    bind = ~w\n"
+            "  }\n"
+            "}\n",
+            [
+                maps:get(auto_observe, Overrides, false),
+                maps:get(bind, Overrides, ?PORT)
+            ]
+        )
+    ).
 
 
 default_port() ->
 default_port() ->
     ?PORT.
     ?PORT.
@@ -762,6 +778,52 @@ case08_reregister(Config) ->
     %% verify the lwm2m client is still online
     %% verify the lwm2m client is still online
     ?assertEqual(ReadResult, test_recv_mqtt_response(ReportTopic)).
     ?assertEqual(ReadResult, test_recv_mqtt_response(ReportTopic)).
 
 
+case09_auto_observe(Config) ->
+    UdpSock = ?config(sock, Config),
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId1 = 15,
+    RespTopic = list_to_binary("lwm2m/" ++ Epn ++ "/up/resp"),
+    emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
+    timer:sleep(200),
+
+    ok = snabbkaffe:start_trace(),
+
+    %% step 1, device register ...
+    test_send_coap_request(
+        UdpSock,
+        post,
+        sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=345&lwm2m=1", [?PORT, Epn]),
+        #coap_content{
+            content_format = <<"text/plain">>,
+            payload = <<
+                "</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
+                "</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>,</lwm2m/59102/0>"
+            >>
+        },
+        [],
+        MsgId1
+    ),
+    #coap_message{method = Method1} = test_recv_coap_response(UdpSock),
+    ?assertEqual({ok, created}, Method1),
+
+    #coap_message{
+        method = Method2,
+        token = Token2,
+        options = Options2
+    } = test_recv_coap_request(UdpSock),
+    ?assertEqual(get, Method2),
+    ?assertNotEqual(<<>>, Token2),
+    ?assertMatch(
+        #{
+            observe := 0,
+            uri_path := [<<"lwm2m">>, <<"3">>, <<"0">>]
+        },
+        Options2
+    ),
+
+    {ok, _} = ?block_until(#{?snk_kind := ignore_observer_resource}, 1000),
+    ok.
+
 case10_read(Config) ->
 case10_read(Config) ->
     UdpSock = ?config(sock, Config),
     UdpSock = ?config(sock, Config),
     Epn = "urn:oma:lwm2m:oma:3",
     Epn = "urn:oma:lwm2m:oma:3",

+ 11 - 0
changes/ce/fix-10257.en.md

@@ -0,0 +1,11 @@
+Fixed the issue where `auto_observe` was not working in LwM2M Gateway.
+
+Before the fix, OBSERVE requests were sent without a token, causing failures
+that LwM2M clients could not handle.
+
+After the fix, LwM2M Gateway can correctly observe the resource list carried by
+client, furthermore, unknown resources will be ignored and printing the following
+warning log:
+```
+2023-03-28T18:50:27.771123+08:00 [warning] msg: ignore_observer_resource, mfa: emqx_lwm2m_session:observe_object_list/3, line: 522, peername: 127.0.0.1:56830, clientid: testlwm2mclient, object_id: 31024, reason: no_xml_definition
+```

+ 8 - 0
changes/ce/fix-10257.zh.md

@@ -0,0 +1,8 @@
+修复 LwM2M 网关 `auto_observe` 不工作的问题。
+
+在修复之前,下发的 OBSERVE 请求没有 Token 从而导致 LwM2M 客户端无法处理的失败。
+
+修复后,能正确监听 LwM2M 携带的资源列表、和会忽略未知的资源,并打印以下日志:
+```
+2023-03-28T18:50:27.771123+08:00 [warning] msg: ignore_observer_resource, mfa: emqx_lwm2m_session:observe_object_list/3, line: 522, peername: 127.0.0.1:56830, clientid: testlwm2mclient, object_id: 31024, reason: no_xml_definition
+```