Переглянути джерело

feat(lwm2m): add emqx_lwm2m http API

Turtle 4 роки тому
батько
коміт
ecec9bd2f6

+ 6 - 16
apps/emqx_lwm2m/src/emqx_lwm2m.appup.src

@@ -1,23 +1,13 @@
 %% -*-: erlang -*-
 {VSN,
   [
-    {"4.3.1", [
-      {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
-    ]},
-    {"4.3.0", [
-      {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []},
-      {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
-    ]},
-    {<<".*">>, []}
+    {<<"4.3.[0-1]">>, [
+      {restart_application, emqx_lwm2m}
+    ]}
   ],
   [
-    {"4.3.1", [
-      {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
-    ]},
-    {"4.3.0", [
-      {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []},
-      {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
-    ]},
-   {<<".*">>, []}
+    {<<"4.3.[0-1]">>, [
+      {restart_application, emqx_lwm2m}
+    ]}
   ]
 }.

+ 162 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_api.erl

@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_lwm2m_api).
+
+-import(minirest,  [return/1]).
+
+-rest_api(#{name   => list,
+            method => 'GET',
+            path   => "/lwm2m_channels/",
+            func   => list,
+            descr  => "A list of all lwm2m channel"
+           }).
+
+-rest_api(#{name   => list,
+            method => 'GET',
+            path   => "/nodes/:atom:node/lwm2m_channels/",
+            func   => list,
+            descr  => "A list of lwm2m channel of a node"
+           }).
+
+-rest_api(#{name   => lookup_cmd,
+            method => 'GET',
+            path   => "/lookup_cmd/:bin:ep/",
+            func   => lookup_cmd,
+            descr  => "Send a lwm2m downlink command"
+           }).
+
+-rest_api(#{name   => lookup_cmd,
+            method => 'GET',
+            path   => "/nodes/:atom:node/lookup_cmd/:bin:ep/",
+            func   => lookup_cmd,
+            descr  => "Send a lwm2m downlink command of a node"
+           }).
+
+-export([ list/2
+        , lookup_cmd/2
+        ]).
+
+list(#{node := Node }, Params) ->
+    case Node = node() of
+        true -> list(#{}, Params);
+        _ -> rpc_call(Node, list, [#{}, Params])
+    end;
+
+list(#{}, _Params) ->
+    Channels = emqx_lwm2m_cm:all_channels(),
+    return({ok, format(Channels)}).
+
+lookup_cmd(#{ep := Ep, node := Node}, Params) ->
+    case Node = node() of
+        true -> lookup_cmd(#{ep => Ep}, Params);
+        _ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params])
+    end;
+
+lookup_cmd(#{ep := Ep}, Params) ->
+    MsgType = proplists:get_value(<<"msgType">>, Params),
+    Path0 = proplists:get_value(<<"path">>, Params),
+    case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of
+        [] -> return({ok, []});
+        [{_, undefined} | _] -> return({ok, []});
+        [{{IMEI, Path, MsgType}, undefined}] ->
+            return({ok, [{imei, IMEI},
+                         {'msgType', IMEI},
+                         {'code', <<"6.01">>},
+                         {'codeMsg', <<"reply_not_received">>},
+                         {'path', Path}]});
+        [{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] ->
+            Payload1 = format_cmd_content(Content, MsgType),
+            return({ok, [{imei, IMEI},
+                         {'msgType', IMEI},
+                         {'code', Code},
+                         {'codeMsg', CodeMsg},
+                         {'path', Path}] ++ Payload1})
+    end.
+
+rpc_call(Node, Fun, Args) ->
+    case rpc:call(Node, ?MODULE, Fun, Args) of
+        {badrpc, Reason} -> {error, Reason};
+        Res -> Res
+    end.
+
+format(Channels) ->
+    lists:map(fun({IMEI, #{lifetime := LifeTime,
+                           peername := Peername,
+                           version := Version,
+                           reg_info := RegInfo}}) ->
+        ObjectList = lists:map(fun(Path) ->
+            [ObjId | _] = path_list(Path),
+            case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
+                {error, _} ->
+                    {Path, Path};
+                ObjDefinition ->
+                    ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition),
+                    {Path, list_to_binary(ObjectName)}
+            end
+        end, maps:get(<<"objectList">>, RegInfo)),
+        {IpAddr, Port} = Peername,
+        [{imei, IMEI},
+         {lifetime, LifeTime},
+         {ip_address, iolist_to_binary(ntoa(IpAddr))},
+         {port, Port},
+         {version, Version},
+         {'objectList', ObjectList}]
+    end, Channels).
+
+format_cmd_content(undefined, _MsgType) -> [];
+format_cmd_content(Content, <<"discover">>) ->
+    [H | Content1] = Content,
+    {_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H),
+    [ObjId | _]= path_list(HObjId),
+    ObjectList = case Content1 of
+        [Content2 | _] ->
+            {_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2),
+            ObjL;
+        [] -> []
+    end,
+    R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
+        {error, _} ->
+            lists:map(fun(Object) -> {Object, Object} end, ObjectList);
+        ObjDefinition ->
+            lists:map(fun(Object) ->
+                [_, _,  ResId| _] = path_list(Object),
+                Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of
+                    "E" -> [{operations, list_to_binary("E")}];
+                    Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))},
+                             {operations, list_to_binary(Oper)}]
+                end,
+                [{path, Object},
+                 {name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))}
+                ] ++ Operations
+            end, ObjectList)
+    end,
+    [{content, R}];
+format_cmd_content(Content, _) ->
+    [{content, Content}].
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+    inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
+ntoa(IP) ->
+    inet_parse:ntoa(IP).
+
+path_list(Path) ->
+    case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
+        [ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId];
+        [ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId];
+        [ObjId, ObjInsId] -> [ObjId, ObjInsId];
+        [ObjId] -> [ObjId]
+    end.

+ 153 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl

@@ -0,0 +1,153 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_lwm2m_cm).
+
+-export([start_link/0]).
+
+-export([ register_channel/5
+        , update_reg_info/2
+        , unregister_channel/1
+        ]).
+
+-export([ lookup_channel/1
+        , all_channels/0
+        ]).
+
+-export([ register_cmd/3
+        , register_cmd/4
+        , lookup_cmd/3
+        , lookup_cmd_by_imei/1
+        ]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)).
+
+%% Server name
+-define(CM, ?MODULE).
+
+-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel).
+-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd).
+
+%% Batch drain
+-define(BATCH_SIZE, 100000).
+
+%% @doc Start the channel manager.
+start_link() ->
+    gen_server:start_link({local, ?CM}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) ->
+    Info = #{
+        reg_info => RegInfo,
+        lifetime => LifeTime,
+        version => Ver,
+        peername => Peername
+    },
+    true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}),
+    cast({registered, {IMEI, self()}}).
+
+update_reg_info(IMEI, RegInfo) ->
+    case lookup_channel(IMEI) of
+        [{_, RegInfo0}] ->
+            true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}),
+            ok;
+        [] ->
+            ok
+    end.
+
+unregister_channel(IMEI) when is_binary(IMEI) ->
+    true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI),
+    ok.
+
+lookup_channel(IMEI) ->
+    ets:lookup(?LWM2M_CHANNEL_TAB, IMEI).
+
+all_channels() ->
+    ets:tab2list(?LWM2M_CHANNEL_TAB).
+
+register_cmd(IMEI, Path, Type) ->
+    true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}).
+
+register_cmd(_IMEI, undefined, _Type, _Result) ->
+    ok;
+register_cmd(IMEI, Path, Type, Result) ->
+    true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}).
+
+lookup_cmd(IMEI, Path, Type) ->
+    ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}).
+
+lookup_cmd_by_imei(IMEI) ->
+    ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]).
+
+%% @private
+cast(Msg) -> gen_server:cast(?CM, Msg).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}],
+    ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]),
+    ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]),
+    {ok, #{chan_pmon => emqx_pmon:new()}}.
+
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) ->
+    PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon),
+    {noreply, State#{chan_pmon := PMon1}};
+
+handle_cast(Msg, State) ->
+    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
+    ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
+    {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
+    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
+    {noreply, State#{chan_pmon := PMon1}};
+
+handle_info(Info, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    emqx_stats:cancel_update(chan_stats).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+clean_down({_ChanPid, IMEI}) ->
+    unregister_channel(IMEI).

+ 41 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl

@@ -0,0 +1,41 @@
+
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_lwm2m_cm_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    CM = #{id => emqx_lwm2m_cm,
+           start => {emqx_lwm2m_cm, start_link, []},
+           restart => permanent,
+           shutdown => 5000,
+           type => worker,
+           modules => [emqx_lwm2m_cm]},
+    SupFlags = #{strategy => one_for_one,
+                 intensity => 100,
+                 period => 10
+                },
+    {ok, {SupFlags, [CM]}}.
+

+ 8 - 1
apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl

@@ -29,4 +29,11 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init(_Args) ->
-    {ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db)] }}.
+    CmSup = #{id => emqx_lwm2m_cm_sup,
+              start => {emqx_lwm2m_cm_sup, start_link, []},
+              restart => permanent,
+              shutdown => infinity,
+              type => supervisor,
+              modules => [emqx_lwm2m_cm_sup]
+            },
+    {ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db), CmSup] }}.

+ 11 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl

@@ -21,9 +21,11 @@
 
 -export([ get_obj_def/2
         , get_object_id/1
+        , get_object_name/1
         , get_object_and_resource_id/2
         , get_resource_type/2
         , get_resource_name/2
+        , get_resource_operations/2
         ]).
 
 -define(LOG(Level, Format, Args),
@@ -42,6 +44,10 @@ get_object_id(ObjDefinition) ->
     [#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
     ObjectId.
 
+get_object_name(ObjDefinition) ->
+    [#xmlText{value=ObjectName}] = xmerl_xpath:string("Name/text()", ObjDefinition),
+    ObjectName.
+
 
 get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
     ResourceNameString = binary_to_list(ResourceNameBinary),
@@ -60,3 +66,8 @@ get_resource_name(ResourceIdInt, ObjDefinition) ->
     ResourceIdString = integer_to_list(ResourceIdInt),
     [#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
     Name.
+
+get_resource_operations(ResourceIdInt, ObjDefinition) ->
+    ResourceIdString = integer_to_list(ResourceIdInt),
+    [#xmlText{value=Operations}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Operations/text()", ObjDefinition),
+    Operations.

+ 5 - 3
apps/emqx_lwm2m/src/emqx_lwm2m_xml_object_db.erl

@@ -58,7 +58,7 @@ find_objectid(ObjectId) ->
                         false -> ObjectId
                     end,
     case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
-        [] -> error(no_xml_definition);
+        [] -> {error, no_xml_definition};
         [{ObjectId, Xml}] -> Xml
     end.
 
@@ -121,8 +121,10 @@ load(BaseDir) ->
                true  -> BaseDir++"*.xml";
                false -> BaseDir++"/*.xml"
            end,
-    AllXmlFiles = filelib:wildcard(Wild),
-    load_loop(AllXmlFiles).
+    case filelib:wildcard(Wild) of
+        [] -> error(no_xml_files_found, BaseDir);
+        AllXmlFiles -> load_loop(AllXmlFiles)
+    end.
 
 load_loop([]) ->
     ok;