Преглед изворни кода

feat(lwm2m): fix check dialyzer fail

Turtle пре 4 година
родитељ
комит
bde9d052a5

+ 1 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl

@@ -23,6 +23,7 @@
 -export([ mqtt2coap/2
         , coap2mqtt/4
         , ack2mqtt/1
+        , extract_path/1
         ]).
 
 -export([path_list/1]).

+ 49 - 25
apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl

@@ -103,6 +103,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
                 emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
             end),
             emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
+	    emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
 
             {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
         {error, Error} ->
@@ -120,10 +121,8 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
     _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
     Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
 
-update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
-        life_timer = LifeTimer, register_info = RegInfo,
-        coap_pid = CoapPid}) ->
-
+update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
+                                                    coap_pid = CoapPid, endpoint_name = Epn}) ->
     UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
 
     _ = case proplists:get_value(update_msg_publish_condition,
@@ -134,6 +133,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
             %% - report the registration info update, but only when objectList is updated.
             case NewRegInfo of
                 #{<<"objectList">> := _} ->
+		    emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
                     send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
                 _ -> ok
             end
@@ -151,7 +151,8 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
                            register_info = UpdatedRegInfo}.
 
 replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
-                                                     coap_pid = CoapPid}) ->
+                                                     coap_pid = CoapPid,
+                                                     endpoint_name = EndpointName}) ->
     _ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
 
     %% - flush cached donwlink commands
@@ -161,7 +162,7 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
     UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
                             maps:get(<<"lt">>, NewRegInfo), LifeTimer),
 
-    _ = send_auto_observe(CoapPid, NewRegInfo),
+    _ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
 
     ?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
     Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
@@ -174,15 +175,20 @@ send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
     Lwm2mState.
 
 auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
-                                       coap_pid = CoapPid}) ->
-    _ = send_auto_observe(CoapPid, RegInfo),
+                                       coap_pid = CoapPid,
+                                       endpoint_name = EndpointName}) ->
+    _ = send_auto_observe(CoapPid, RegInfo, EndpointName),
     Lwm2mState.
 
-deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) ->
+deliver(#message{topic = Topic, payload = Payload},
+        Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
+                                  register_info = RegInfo,
+                                  started_at = StartedAt,
+                                  endpoint_name = EndpointName}) ->
     IsCacheMode = is_cache_mode(RegInfo, StartedAt),
     ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
     AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
-    deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode),
+    deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
     Lwm2mState.
 
 get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
@@ -238,20 +244,21 @@ time_now() -> erlang:system_time(millisecond).
 %% Deliver downlink message to coap
 %%--------------------------------------------------------------------
 
-deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode) when is_binary(JsonData)->
+deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
     try
         TermData = emqx_json:decode(JsonData, [return_maps]),
-        deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode)
+        deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
     catch
         C:R:Stack ->
             ?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
                 [JsonData, {C, R}, Stack])
     end;
 
-deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermData) ->
+deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
     ?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
     {CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
-
+    MsgType = maps:get(<<"msgType">>, Ref),
+    emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
     case CacheMode of
         false ->
             do_deliver_to_coap(CoapPid, CoapRequest, Ref);
@@ -266,7 +273,12 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermDat
 send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
     do_send_to_broker(EventType, Payload, Lwm2mState).
 
-do_send_to_broker(EventType, Payload, Lwm2mState) ->
+do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
+    ReqPath = maps:get(<<"reqPath">>, Data, undefined),
+    Code = maps:get(<<"code">>, Data, undefined),
+    CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
+    Content = maps:get(<<"content">>, Data, undefined),
+    emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
     NewPayload = maps:put(<<"msgType">>, EventType, Payload),
     Topic = uplink_topic(EventType, Lwm2mState),
     publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
@@ -281,7 +293,7 @@ auto_observe_object_list(Expected, Registered) ->
     Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
     lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
 
-send_auto_observe(CoapPid, RegInfo) ->
+send_auto_observe(CoapPid, RegInfo, EndpointName) ->
     %% - auto observe the objects
     case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
         false ->
@@ -292,25 +304,37 @@ send_auto_observe(CoapPid, RegInfo) ->
                             maps:get(<<"objectList">>, RegInfo, [])
                            ),
             AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
-            auto_observe(AlternatePath, Objectlists, CoapPid)
+            auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
     end.
 
-auto_observe(AlternatePath, ObjectList, CoapPid) ->
+auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
     ?LOG(info, "Auto Observe on: ~p", [ObjectList]),
     erlang:spawn(fun() ->
-            observe_object_list(AlternatePath, ObjectList, CoapPid)
+            observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
         end).
 
-observe_object_list(AlternatePath, ObjectList, CoapPid) ->
+observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
     lists:foreach(fun(ObjectPath) ->
-        observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100)
+        [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
+        case ObjId of
+            <<"19">> ->
+                [ObjInsId | _LastPath1] = LastPath,
+                case ObjInsId of
+                    <<"0">> ->
+                        observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
+                    _ ->
+                        observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
+                end;
+            _ ->
+                observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
+        end
     end, ObjectList).
 
-observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval) ->
-    observe_object(AlternatePath, ObjectPath, CoapPid),
+observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
+    observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
     timer:sleep(Interval).
 
-observe_object(AlternatePath, ObjectPath, CoapPid) ->
+observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
     Payload = #{
         <<"msgType">> => <<"observe">>,
         <<"data">> => #{
@@ -318,7 +342,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
         }
     },
     ?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
-    deliver_to_coap(AlternatePath, Payload, CoapPid, false).
+    deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
 
 do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
     erlang:spawn(fun() ->