فهرست منبع

refactor(gw-lwm2m): refine lwm2m

JianBo He 4 سال پیش
والد
کامیت
b16cf44bf6

+ 30 - 0
apps/emqx_gateway/etc/emqx_gateway.conf

@@ -127,4 +127,34 @@ gateway: {
         #listener.udp.1: {}
         #listener.dtls.1: {}
     }
+
+    lwm2m_xml_dir: "{{ platform_etc_dir }}/lwm2m_xml"
+
+    lwm2m.1: {
+
+        lifetime_min: 1s
+
+        lifetime_max: 86400s
+
+        qmode_time_windonw: 22
+
+        auto_observe: false
+
+        mountpoint: "lwm2m/%e/"
+
+        ## always | contains_object_list
+        update_msg_publish_condition: contains_object_list
+
+        translators: {
+            command: "dn/#"
+            response: "up/resp"
+            notify: "up/notify"
+            register: "up/resp"
+            update: "up/resp"
+        }
+
+        listener.udp.1 {
+            bind: 5783
+        }
+    }
 }

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -3,7 +3,7 @@
   {vsn, "0.1.0"},
   {registered, []},
   {mod, {emqx_gateway_app, []}},
-  {applications, [kernel, stdlib, grpc]},
+  {applications, [kernel, stdlib, grpc, lwm2m_coap]},
   {env, []},
   {modules, []},
   {licenses, ["Apache 2.0"]},

+ 7 - 2
apps/emqx_gateway/src/emqx_gateway_app.erl

@@ -44,7 +44,8 @@ load_default_gateway_applications() ->
 
 gateway_type_searching() ->
     %% FIXME: Hardcoded apps
-    [emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl, emqx_coap_impl].
+    [emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl,
+     emqx_coap_impl, emqx_lwm2m_impl].
 
 load(Mod) ->
     try
@@ -81,10 +82,14 @@ create_gateway_by_default([{Type, Name, Confs}|More]) ->
     create_gateway_by_default(More).
 
 zipped_confs() ->
-    All = maps:to_list(emqx_config:get([gateway])),
+    All = maps:to_list(
+            maps:without(exclude_options(), emqx_config:get([gateway]))),
     lists:append(lists:foldr(
       fun({Type, Gws}, Acc) ->
         {Names, Confs} = lists:unzip(maps:to_list(Gws)),
         Types = [ Type || _ <- lists:seq(1, length(Names))],
         [lists:zip3(Types, Names, Confs) | Acc]
       end, [], All)).
+
+exclude_options() ->
+   [lwm2m_xml_dir].

+ 22 - 2
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -34,8 +34,10 @@ structs() -> ["gateway"].
 fields("gateway") ->
     [{stomp, t(ref(stomp))},
      {mqttsn, t(ref(mqttsn))},
-     {exproto, t(ref(exproto))},
-     {coap, t(ref(coap))}
+     {coap, t(ref(coap))},
+     {lwm2m, t(ref(lwm2m))},
+     {lwm2m_xml_dir, t(string())},
+     {exproto, t(ref(exproto))}
     ];
 
 fields(stomp) ->
@@ -74,6 +76,21 @@ fields(mqttsn_predefined) ->
     , {topic, t(string())}
     ];
 
+fields(lwm2m) ->
+    [{"$id", t(ref(lwm2m_structs))}
+    ];
+
+fields(lwm2m_structs) ->
+    [ {lifetime_min, t(duration())}
+    , {lifetime_max, t(duration())}
+    , {qmode_time_windonw, t(integer())}
+    , {auto_observe, t(boolean())}
+    , {mountpoint, t(string())}
+    , {update_msg_publish_condition, t(union([always, contains_object_list]))}
+    , {translators, t(ref(translators))}
+    , {listener, t(ref(udp_listener_group))}
+    ];
+
 fields(exproto) ->
     [{"$id", t(ref(exproto_structs))}];
 
@@ -100,6 +117,9 @@ fields(clientinfo_override) ->
     , {clientid, t(string())}
     ];
 
+fields(translators) ->
+    [{"$name", t(string())}];
+
 fields(udp_listener_group) ->
     [ {udp, t(ref(udp_listener))}
     , {dtls, t(ref(dtls_listener))}

+ 6 - 3
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_coap_resource.erl

@@ -363,9 +363,12 @@ check_epn(undefined) -> false;
 check_epn(_)         -> true.
 
 check_lifetime(undefined) -> false;
-check_lifetime(LifeTime) when is_integer(LifeTime) ->
-    Max = proplists:get_value(lifetime_max, lwm2m_coap_responder:options(), 315360000),
-    Min = proplists:get_value(lifetime_min, lwm2m_coap_responder:options(), 0),
+check_lifetime(LifeTime0) when is_integer(LifeTime0) ->
+    LifeTime = timer:seconds(LifeTime0),
+    Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
+    Max = maps:get(lifetime_max, Envs, 315360000),
+    Min = maps:get(lifetime_min, Envs, 0),
+
     if
         LifeTime >= Min, LifeTime =< Max ->
             true;

+ 173 - 0
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl

@@ -0,0 +1,173 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc The LwM2M Gateway Implement interface
+-module(emqx_lwm2m_impl).
+
+-behavior(emqx_gateway_impl).
+
+%% APIs
+-export([ load/0
+        , unload/0
+        ]).
+
+-export([]).
+
+-export([ init/1
+        , on_insta_create/3
+        , on_insta_update/4
+        , on_insta_destroy/3
+        ]).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+load() ->
+    RegistryOptions = [ {cbkmod, ?MODULE}
+                      ],
+    emqx_gateway_registry:load(lwm2m, RegistryOptions, []).
+
+unload() ->
+    %% XXX:
+    lwm2m_coap_server_registry:remove_handler(
+      [<<"rd">>],
+      emqx_lwm2m_coap_resource, undefined
+     ),
+    emqx_gateway_registry:unload(lwm2m).
+
+init(_) ->
+    %% Handler
+    _ = lwm2m_coap_server:start_registry(),
+    lwm2m_coap_server_registry:add_handler(
+      [<<"rd">>],
+      emqx_lwm2m_coap_resource, undefined
+     ),
+    %% Xml registry
+    {ok, _} = emqx_lwm2m_xml_object_db:start_link(
+                emqx_config:get([gateway, lwm2m_xml_dir])
+              ),
+
+    %% XXX: Self managed table?
+    %% TODO: Improve it later
+    {ok, _} = emqx_lwm2m_cm:start_link(),
+
+    GwState = #{},
+    {ok, GwState}.
+
+%% TODO: deinit
+
+%%--------------------------------------------------------------------
+%% emqx_gateway_registry callbacks
+%%--------------------------------------------------------------------
+
+on_insta_create(_Insta = #{ id := InstaId,
+                            rawconf := RawConf
+                          }, Ctx, _GwState) ->
+    Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
+    ListenerPids = lists:map(fun(Lis) ->
+                     start_listener(InstaId, Ctx, Lis)
+                   end, Listeners),
+    {ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
+
+on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
+    InstaId = maps:get(id, NewInsta),
+    try
+        %% XXX: 1. How hot-upgrade the changes ???
+        %% XXX: 2. Check the New confs first before destroy old instance ???
+        on_insta_destroy(OldInsta, GwInstaState, GwState),
+        on_insta_create(NewInsta, Ctx, GwState)
+    catch
+        Class : Reason : Stk ->
+            logger:error("Failed to update stomp instance ~s; "
+                         "reason: {~0p, ~0p} stacktrace: ~0p",
+                         [InstaId, Class, Reason, Stk]),
+            {error, {Class, Reason}}
+    end.
+
+on_insta_destroy(_Insta = #{ id := InstaId,
+                             rawconf := RawConf
+                           }, _GwInstaState, _GwState) ->
+    Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
+    lists:foreach(fun(Lis) ->
+        stop_listener(InstaId, Lis)
+    end, Listeners).
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
+    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
+    case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
+        {ok, Pid} ->
+            io:format("Start lwm2m ~s:~s listener on ~s successfully.~n",
+                      [InstaId, Type, ListenOnStr]),
+            Pid;
+        {error, Reason} ->
+            io:format(standard_error,
+                      "Failed to start lwm2m ~s:~s listener on ~s: ~0p~n",
+                      [InstaId, Type, ListenOnStr, Reason]),
+            throw({badconf, Reason})
+    end.
+
+start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
+    Name = name(InstaId, udp),
+    NCfg = Cfg#{ctx => Ctx},
+    NSocketOpts = merge_default(SocketOpts),
+    Options = [{config, NCfg}|NSocketOpts],
+    case Type of
+        udp ->
+            lwm2m_coap_server:start_udp(Name, ListenOn, Options);
+        dtls ->
+            lwm2m_coap_server:start_dtls(Name, ListenOn, Options)
+    end.
+
+name(InstaId, Type) ->
+    list_to_atom(lists:concat([InstaId, ":", Type])).
+
+merge_default(Options) ->
+    Default = emqx_gateway_utils:default_udp_options(),
+    case lists:keytake(udp_options, 1, Options) of
+        {value, {udp_options, TcpOpts}, Options1} ->
+            [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
+             | Options1];
+        false ->
+            [{udp_options, Default} | Options]
+    end.
+
+stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
+    StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
+    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
+    case StopRet of
+        ok -> io:format("Stop lwm2m ~s:~s listener on ~s successfully.~n",
+                        [InstaId, Type, ListenOnStr]);
+        {error, Reason} ->
+            io:format(standard_error,
+                      "Failed to stop lwm2m ~s:~s listener on ~s: ~0p~n",
+                      [InstaId, Type, ListenOnStr, Reason]
+                     )
+    end,
+    StopRet.
+
+stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
+    Name = name(InstaId, Type),
+    case Type of
+        udp ->
+            lwm2m_coap_server:stop_udp(Name, ListenOn);
+        dtls ->
+            lwm2m_coap_server:stop_dtls(Name, ListenOn)
+    end.

+ 26 - 12
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl

@@ -75,7 +75,8 @@ call(Pid, Msg, Timeout) ->
     end.
 
 init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
-    Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""),
+    Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
+    Mountpoint = iolist_to_binary(maps:get(mountpoint, Envs, "")),
     Lwm2mState = #lwm2m_state{peername = Peername,
                               endpoint_name = EndpointName,
                               version = Ver,
@@ -89,7 +90,10 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
         ok ->
             _ = run_hooks('client.connack', [conninfo(Lwm2mState), success], undefined),
 
-            Sockport = proplists:get_value(port, lwm2m_coap_responder:options(), 5683),
+            %% FIXME:
+            Sockport = 5683,
+            %Sockport = proplists:get_value(port, lwm2m_coap_responder:options(), 5683),
+
             ClientInfo1 = maps:put(sockport, Sockport, ClientInfo),
             Lwm2mState1 = Lwm2mState#lwm2m_state{started_at = time_now(),
                                                  mountpoint = maps:get(mountpoint, ClientInfo1)},
@@ -124,8 +128,10 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
                                                     coap_pid = CoapPid, endpoint_name = Epn}) ->
     UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
 
-    _ = case proplists:get_value(update_msg_publish_condition,
-            lwm2m_coap_responder:options(), contains_object_list) of
+    Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
+
+    _ = case maps:get(update_msg_publish_condition,
+                      Envs, contains_object_list) of
         always ->
             send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
         contains_object_list ->
@@ -294,7 +300,8 @@ auto_observe_object_list(Expected, Registered) ->
 
 send_auto_observe(CoapPid, RegInfo, EndpointName) ->
     %% - auto observe the objects
-    case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
+    Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}), 
+    case proplists:get_value(auto_observe, Envs, false) of
         false ->
             ?LOG(info, "Auto Observe Disabled", []);
         TrueOrObjList ->
@@ -379,7 +386,12 @@ get_cached_downlink_messages() ->
 is_cache_mode(RegInfo, StartedAt) ->
     case is_psm(RegInfo) orelse is_qmode(RegInfo) of
         true ->
-            QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22),
+            Envs = proplists:get_value(
+                     config,
+                     lwm2m_coap_responder:options(),
+                     #{}
+                    ),
+            QModeTimeWind = maps:get(qmode_time_window, Envs, 22),
             Now = time_now(),
             if (Now - StartedAt) >= QModeTimeWind -> true;
                 true -> false
@@ -400,15 +412,17 @@ is_qmode(_) -> false.
 %%--------------------------------------------------------------------
 
 downlink_topic(EventType, Lwm2mState = #lwm2m_state{mountpoint = Mountpoint}) ->
-    Topics = proplists:get_value(topics, lwm2m_coap_responder:options(), []),
-    DnTopic = proplists:get_value(downlink_topic_key(EventType), Topics,
-                                  default_downlink_topic(EventType)),
+    Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
+    Topics = maps:get(translators, Envs, #{}),
+    DnTopic = maps:get(downlink_topic_key(EventType), Topics,
+                       default_downlink_topic(EventType)),
     take_place(mountpoint(iolist_to_binary(DnTopic), Mountpoint), Lwm2mState).
 
 uplink_topic(EventType, Lwm2mState = #lwm2m_state{mountpoint = Mountpoint}) ->
-    Topics = proplists:get_value(topics, lwm2m_coap_responder:options(), []),
-    UpTopic = proplists:get_value(uplink_topic_key(EventType), Topics,
-                                  default_uplink_topic(EventType)),
+    Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}),
+    Topics = maps:get(translators, Envs, #{}),
+    UpTopic = maps:get(uplink_topic_key(EventType), Topics,
+                       default_uplink_topic(EventType)),
     take_place(mountpoint(iolist_to_binary(UpTopic), Mountpoint), Lwm2mState).
 
 downlink_topic_key(EventType) when is_binary(EventType) ->

+ 5 - 7
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl

@@ -22,7 +22,7 @@
 % This module is for future use. Disabled now.
 
 %% API
--export([ start_link/0
+-export([ start_link/1
         , stop/0
         , find_name/1
         , find_objectid/1
@@ -49,8 +49,8 @@
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(XmlDir) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
 
 find_objectid(ObjectId) ->
     ObjectIdInt =   case is_list(ObjectId) of
@@ -85,10 +85,10 @@ stop() ->
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
 
-init([]) ->
+init([XmlDir]) ->
     _ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]),
     _ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]),
-    load(emqx_config:get([emqx_lwm2m, xml_dir])),
+    load(XmlDir),
     {ok, #state{}}.
 
 handle_call(_Request, _From, State) ->
@@ -108,8 +108,6 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-
-
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 0 - 1953
apps/emqx_gateway/src/lwm2m/test/emqx_lwm2m_SUITE.erl


تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 0 - 240
apps/emqx_gateway/src/lwm2m/test/emqx_tlv_SUITE.erl


+ 0 - 171
apps/emqx_gateway/src/lwm2m/test/test_mqtt_broker.erl

@@ -1,171 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(test_mqtt_broker).
-
-% -compile(nowarn_export_all).
-% -compile(export_all).
-
-% -define(LOGT(Format, Args), logger:debug("TEST_BROKER: " ++ Format, Args)).
-
-% -record(state, {subscriber}).
-
-% -include_lib("emqx/include/emqx.hrl").
-
-% -include_lib("emqx/include/emqx_mqtt.hrl").
-
-% -include_lib("eunit/include/eunit.hrl").
-
-% start(_, <<"attacker">>, _, _, _) ->
-%     {stop, auth_failure};
-% start(ClientId, Username, Password, _Channel, KeepaliveInterval) ->
-%     true = is_binary(ClientId),
-%     (true = ( is_binary(Username)) orelse (Username == undefined) ),
-%     (true = ( is_binary(Password)) orelse (Password == undefined) ),
-%     self() ! {keepalive, start, KeepaliveInterval},
-%     {ok, []}.
-
-% publish(Topic, Payload, Qos) ->
-%     ClientId = <<"lwm2m_test_suite">>,
-%     Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
-%     emqx:publish(Msg).
-
-% subscribe(Topic) ->
-%     gen_server:call(?MODULE, {subscribe, Topic, self()}).
-
-% unsubscribe(Topic) ->
-%     gen_server:call(?MODULE, {unsubscribe, Topic}).
-
-% get_subscrbied_topics() ->
-%     [Topic || {_Client, Topic} <- ets:tab2list(emqx_subscription)].
-
-% start_link() ->
-%     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-% stop() ->
-%     gen_server:stop(?MODULE).
-
-% init(_Param) ->
-%     {ok, #state{subscriber = []}}.
-
-% handle_call({subscribe, Topic, Proc}, _From, State=#state{subscriber = SubList}) ->
-%     ?LOGT("test broker subscribe Topic=~p, Pid=~p~n", [Topic, Proc]),
-%     is_binary(Topic) orelse error("Topic should be a binary"),
-%     {reply, {ok, []}, State#state{subscriber = [{Topic, Proc}|SubList]}};
-
-% handle_call(get_subscribed_topics, _From, State=#state{subscriber = SubList}) ->
-%     Response = subscribed_topics(SubList, []),
-%     ?LOGT("test broker get subscribed topics=~p~n", [Response]),
-%     {reply, Response, State};
-
-% handle_call({unsubscribe, Topic}, _From, State=#state{subscriber = SubList}) ->
-%     ?LOGT("test broker unsubscribe Topic=~p~n", [Topic]),
-%     is_binary(Topic) orelse error("Topic should be a binary"),
-%     NewSubList = proplists:delete(Topic, SubList),
-%     {reply, {ok, []}, State#state{subscriber = NewSubList}};
-
-
-% handle_call({publish, {Topic, Msg, MatchedTopicFilter}}, _From, State=#state{subscriber = SubList}) ->
-%     (is_binary(Topic) and is_binary(Msg)) orelse error("Topic and Msg should be binary"),
-%     Pid = proplists:get_value(MatchedTopicFilter, SubList),
-%     ?LOGT("test broker publish topic=~p, Msg=~p, Pid=~p, MatchedTopicFilter=~p, SubList=~p~n", [Topic, Msg, Pid, MatchedTopicFilter, SubList]),
-%     (Pid == undefined) andalso ?LOGT("!!!!! this topic ~p has never been subscribed, please specify a valid topic filter", [MatchedTopicFilter]),
-%     ?assertNotEqual(undefined, Pid),
-%     Pid ! {deliver, #message{topic = Topic, payload = Msg}},
-%     {reply, ok, State};
-
-% handle_call(stop, _From, State) ->
-%     {stop, normal, stopped, State};
-
-% handle_call(Req, _From, State) ->
-%     ?LOGT("test_broker_server: ignore call Req=~p~n", [Req]),
-%     {reply, {error, badreq}, State}.
-
-
-% handle_cast(Msg, State) ->
-%     ?LOGT("test_broker_server: ignore cast msg=~p~n", [Msg]),
-%     {noreply, State}.
-
-% handle_info(Info, State) ->
-%     ?LOGT("test_broker_server: ignore info=~p~n", [Info]),
-%     {noreply, State}.
-
-% terminate(Reason, _State) ->
-%     ?LOGT("test_broker_server: terminate Reason=~p~n", [Reason]),
-%     ok.
-
-% code_change(_OldVsn, State, _Extra) ->
-%     {ok, State}.
-
-
-
-
-% subscribed_topics([], Acc) ->
-%     Acc;
-% subscribed_topics([{Topic,_Pid}|T], Acc) ->
-%     subscribed_topics(T, [Topic|Acc]).
-
-
-
-
-% -record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
-
-% -type(keepalive() :: #keepalive{}).
-
-% %% @doc Start a keepalive
-% -spec(start(fun(), integer(), any()) -> undefined | keepalive()).
-% start(_, 0, _) ->
-%     undefined;
-% start(StatFun, TimeoutSec, TimeoutMsg) ->
-%     {ok, StatVal} = StatFun(),
-%     #keepalive{statfun = StatFun, statval = StatVal,
-%         tsec = TimeoutSec, tmsg = TimeoutMsg,
-%         tref = timer(TimeoutSec, TimeoutMsg)}.
-
-% %% @doc Check keepalive, called when timeout.
-% -spec(check(keepalive()) -> {ok, keepalive()} | {error, any()}).
-% check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
-%     case StatFun() of
-%         {ok, NewVal} ->
-%             if NewVal =/= LastVal ->
-%                 {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
-%                 Repeat < 1 ->
-%                     {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
-%                 true ->
-%                     {error, timeout}
-%             end;
-%         {error, Error} ->
-%             {error, Error}
-%     end.
-
-% resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
-%     KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
-
-% %% @doc Cancel Keepalive
-% -spec(cancel(keepalive()) -> ok).
-% cancel(#keepalive{tref = TRef}) ->
-%     cancel(TRef);
-% cancel(undefined) ->
-%     ok;
-% cancel(TRef) ->
-%         catch erlang:cancel_timer(TRef).
-
-% timer(Sec, Msg) ->
-%     erlang:send_after(timer:seconds(Sec), self(), Msg).
-
-
-% log(Format, Args) ->
-%     logger:debug(Format, Args).

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 1971 - 0
apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl


تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 238 - 0
apps/emqx_gateway/test/emqx_tlv_SUITE.erl


+ 171 - 0
apps/emqx_gateway/test/test_mqtt_broker.erl

@@ -0,0 +1,171 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(test_mqtt_broker).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-define(LOGT(Format, Args), logger:debug("TEST_BROKER: " ++ Format, Args)).
+
+-record(state, {subscriber}).
+
+-include_lib("emqx/include/emqx.hrl").
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+
+-include_lib("eunit/include/eunit.hrl").
+
+start(_, <<"attacker">>, _, _, _) ->
+    {stop, auth_failure};
+start(ClientId, Username, Password, _Channel, KeepaliveInterval) ->
+    true = is_binary(ClientId),
+    (true = ( is_binary(Username)) orelse (Username == undefined) ),
+    (true = ( is_binary(Password)) orelse (Password == undefined) ),
+    self() ! {keepalive, start, KeepaliveInterval},
+    {ok, []}.
+
+publish(Topic, Payload, Qos) ->
+    ClientId = <<"lwm2m_test_suite">>,
+    Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
+    emqx:publish(Msg).
+
+subscribe(Topic) ->
+    gen_server:call(?MODULE, {subscribe, Topic, self()}).
+
+unsubscribe(Topic) ->
+    gen_server:call(?MODULE, {unsubscribe, Topic}).
+
+get_subscrbied_topics() ->
+    [Topic || {_Client, Topic} <- ets:tab2list(emqx_subscription)].
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+stop() ->
+    gen_server:stop(?MODULE).
+
+init(_Param) ->
+    {ok, #state{subscriber = []}}.
+
+handle_call({subscribe, Topic, Proc}, _From, State=#state{subscriber = SubList}) ->
+    ?LOGT("test broker subscribe Topic=~p, Pid=~p~n", [Topic, Proc]),
+    is_binary(Topic) orelse error("Topic should be a binary"),
+    {reply, {ok, []}, State#state{subscriber = [{Topic, Proc}|SubList]}};
+
+handle_call(get_subscribed_topics, _From, State=#state{subscriber = SubList}) ->
+    Response = subscribed_topics(SubList, []),
+    ?LOGT("test broker get subscribed topics=~p~n", [Response]),
+    {reply, Response, State};
+
+handle_call({unsubscribe, Topic}, _From, State=#state{subscriber = SubList}) ->
+    ?LOGT("test broker unsubscribe Topic=~p~n", [Topic]),
+    is_binary(Topic) orelse error("Topic should be a binary"),
+    NewSubList = proplists:delete(Topic, SubList),
+    {reply, {ok, []}, State#state{subscriber = NewSubList}};
+
+
+handle_call({publish, {Topic, Msg, MatchedTopicFilter}}, _From, State=#state{subscriber = SubList}) ->
+    (is_binary(Topic) and is_binary(Msg)) orelse error("Topic and Msg should be binary"),
+    Pid = proplists:get_value(MatchedTopicFilter, SubList),
+    ?LOGT("test broker publish topic=~p, Msg=~p, Pid=~p, MatchedTopicFilter=~p, SubList=~p~n", [Topic, Msg, Pid, MatchedTopicFilter, SubList]),
+    (Pid == undefined) andalso ?LOGT("!!!!! this topic ~p has never been subscribed, please specify a valid topic filter", [MatchedTopicFilter]),
+    ?assertNotEqual(undefined, Pid),
+    Pid ! {deliver, #message{topic = Topic, payload = Msg}},
+    {reply, ok, State};
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State};
+
+handle_call(Req, _From, State) ->
+    ?LOGT("test_broker_server: ignore call Req=~p~n", [Req]),
+    {reply, {error, badreq}, State}.
+
+
+handle_cast(Msg, State) ->
+    ?LOGT("test_broker_server: ignore cast msg=~p~n", [Msg]),
+    {noreply, State}.
+
+handle_info(Info, State) ->
+    ?LOGT("test_broker_server: ignore info=~p~n", [Info]),
+    {noreply, State}.
+
+terminate(Reason, _State) ->
+    ?LOGT("test_broker_server: terminate Reason=~p~n", [Reason]),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+
+
+subscribed_topics([], Acc) ->
+    Acc;
+subscribed_topics([{Topic,_Pid}|T], Acc) ->
+    subscribed_topics(T, [Topic|Acc]).
+
+
+
+
+-record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
+
+-type(keepalive() :: #keepalive{}).
+
+%% @doc Start a keepalive
+-spec(start(fun(), integer(), any()) -> undefined | keepalive()).
+start(_, 0, _) ->
+    undefined;
+start(StatFun, TimeoutSec, TimeoutMsg) ->
+    {ok, StatVal} = StatFun(),
+    #keepalive{statfun = StatFun, statval = StatVal,
+        tsec = TimeoutSec, tmsg = TimeoutMsg,
+        tref = timer(TimeoutSec, TimeoutMsg)}.
+
+%% @doc Check keepalive, called when timeout.
+-spec(check(keepalive()) -> {ok, keepalive()} | {error, any()}).
+check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
+    case StatFun() of
+        {ok, NewVal} ->
+            if NewVal =/= LastVal ->
+                {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
+                Repeat < 1 ->
+                    {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
+                true ->
+                    {error, timeout}
+            end;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
+    KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
+
+%% @doc Cancel Keepalive
+-spec(cancel(keepalive()) -> ok).
+cancel(#keepalive{tref = TRef}) ->
+    cancel(TRef);
+cancel(undefined) ->
+    ok;
+cancel(TRef) ->
+        catch erlang:cancel_timer(TRef).
+
+timer(Sec, Msg) ->
+    erlang:send_after(timer:seconds(Sec), self(), Msg).
+
+
+log(Format, Args) ->
+    logger:debug(Format, Args).

+ 1 - 0
rebar.config.erl

@@ -346,6 +346,7 @@ relx_overlay(ReleaseType) ->
     , {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup
     , {copy, "bin/emqx_ctl", "bin/emqx_ctl-{{release_version}}"} %% for relup
     , {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript-{{release_version}}"} %% for relup
+    , {copy, "apps/emqx_gateway/src/lwm2m/lwm2m_xml", "etc/lwm2m_xml"}
     , {template, "bin/emqx.cmd", "bin/emqx.cmd"}
     , {template, "bin/emqx_ctl.cmd", "bin/emqx_ctl.cmd"}
     , {copy, "bin/nodetool", "bin/nodetool"}