Преглед изворни кода

Merge pull request #6410 from lafirest/fix/lw_cmd_record

fix(emqx_lwm2m): limit the max size of cmd record
lafirest пре 4 година
родитељ
комит
f25c8cdf0f
1 измењених фајлова са 19 додато и 5 уклоњено
  1. 19 5
      apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl

+ 19 - 5
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl

@@ -51,7 +51,8 @@
 -type cmd_code_msg() :: binary().
 -type cmd_code_content() :: list(map()).
 -type cmd_result() :: undefined | {cmd_code(), cmd_code_msg(), cmd_code_content()}.
--type cmd_record() :: #{cmd_record_key() => cmd_result()}.
+-type cmd_record() :: #{cmd_record_key() => cmd_result(),
+                        queue := queue:queue()}.
 
 -record(session, { coap :: emqx_coap_tm:manager()
                  , queue :: queue:queue(queued_request())
@@ -75,6 +76,8 @@
                         <<"7">>, <<"9">>, <<"15">>]).
 
 -define(CMD_KEY(Path, Type), {Path, Type}).
+-define(MAX_RECORD_SIZE, 100).
+-define(RECORD_SIZE(R), (erlang:map_size(R) - 1)).
 
 %% uplink and downlink topic configuration
 -define(lwm2m_up_dm_topic,  {<<"/v1/up/dm">>, 0}).
@@ -114,7 +117,7 @@ new() ->
             , created_at = erlang:system_time(millisecond)
             , is_cache_mode = false
             , mountpoint = <<>>
-            , cmd_record = #{}
+            , cmd_record = #{queue => queue:new()}
             , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
 
 -spec init(emqx_coap_message(), binary(), function(), session()) -> map().
@@ -756,6 +759,17 @@ record_response(EventType, #{<<"data">> := Data}, Session) ->
     Content = maps:get(<<"content">>, Data, undefined),
     record_cmd(ReqPath, EventType, {Code, CodeMsg, Content}, Session).
 
-record_cmd(Path, Type, Result, #session{cmd_record = Record} = Session) ->
-    Record2 = Record#{?CMD_KEY(Path, Type) => Result},
-    Session#session{cmd_record = Record2}.
+record_cmd(Path, Type, Result, #session{cmd_record = #{queue := Queue} =  Record} = Session) ->
+    Key = ?CMD_KEY(Path, Type),
+    Record2 = Record#{Key => Result},
+    Queue2 = queue:in(Key, Queue),
+    Record3 = check_record_size(Record2, Queue2),
+    Session#session{cmd_record = Record3}.
+
+check_record_size(Record, Queue) when ?RECORD_SIZE(Record) =< ?MAX_RECORD_SIZE ->
+    Record#{queue := Queue};
+
+check_record_size(Record, Queue) ->
+    {{value, Key}, Queue2} = queue:out(Queue),
+    Record2 = maps:remove(Key, Record),
+    Record2#{queue := Queue2}.