Jelajahi Sumber

Merge pull request #5110 from Rory-Z/merge-4.3-to-5.0

turtleDeng 4 tahun lalu
induk
melakukan
d56c70bbea
43 mengubah file dengan 3297 tambahan dan 158 penghapusan
  1. 1 1
      .ci/fvt_tests/http_server/rebar.config
  2. 9 1
      .ci/fvt_tests/relup.lux
  3. 6 3
      .github/workflows/build_packages.yaml
  4. 10 0
      .github/workflows/build_slim_packages.yaml
  5. 2 2
      apps/emqx/etc/emqx.conf
  6. 2 0
      apps/emqx/include/emqx_mqtt.hrl
  7. 52 10
      apps/emqx/src/emqx.appup.src
  8. 3 0
      apps/emqx/src/emqx_cm.erl
  9. 8 5
      apps/emqx/src/emqx_shared_sub.erl
  10. 4 1
      apps/emqx/src/emqx_ws_connection.erl
  11. 16 0
      apps/emqx_dashboard/src/emqx_dashboard.appup.src
  12. 1 1
      apps/emqx_exhook/src/emqx_exhook.app.src
  13. 10 2
      apps/emqx_exhook/src/emqx_exhook.appup.src
  14. 8 1
      apps/emqx_exhook/src/emqx_exhook_server.erl
  15. 1 1
      apps/emqx_lwm2m/src/emqx_lwm2m.app.src
  16. 6 8
      apps/emqx_lwm2m/src/emqx_lwm2m.appup.src
  17. 162 0
      apps/emqx_lwm2m/src/emqx_lwm2m_api.erl
  18. 153 0
      apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl
  19. 41 0
      apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl
  20. 1 0
      apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl
  21. 2 2
      apps/emqx_lwm2m/src/emqx_lwm2m_coap_resource.erl
  22. 4 1
      apps/emqx_lwm2m/src/emqx_lwm2m_message.erl
  23. 49 25
      apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl
  24. 8 1
      apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl
  25. 11 0
      apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl
  26. 5 3
      apps/emqx_lwm2m/src/emqx_lwm2m_xml_object_db.erl
  27. 7 4
      apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl
  28. 1 1
      apps/emqx_rule_engine/src/emqx_rule_engine.app.src
  29. 21 5
      apps/emqx_rule_engine/src/emqx_rule_engine.appup.src
  30. 6 19
      apps/emqx_rule_engine/src/emqx_rule_registry.erl
  31. 6 0
      apps/emqx_sn/src/emqx_sn.appup.src
  32. 15 31
      apps/emqx_sn/src/emqx_sn_gateway.erl
  33. 0 13
      apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl
  34. 1 1
      apps/emqx_web_hook/src/emqx_web_hook.app.src
  35. 7 5
      apps/emqx_web_hook/src/emqx_web_hook.appup.src
  36. 11 3
      apps/emqx_web_hook/src/emqx_web_hook_actions.erl
  37. 8 5
      apps/emqx_web_hook/src/emqx_web_hook_app.erl
  38. 3 1
      deploy/docker/Dockerfile
  39. 1 1
      deploy/docker/README.md
  40. 2470 0
      priv/emqx.schema
  41. 1 1
      rebar.config
  42. 7 0
      rebar.config.erl
  43. 157 0
      scripts/inject-deps.escript

+ 1 - 1
.ci/fvt_tests/http_server/rebar.config

@@ -1,7 +1,7 @@
 {erl_opts, [debug_info]}.
 {erl_opts, [debug_info]}.
 {deps, 
 {deps, 
  [
  [
-    {minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.5"}}}
+    {minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.6"}}}
  ]}.
  ]}.
 
 
 {shell, [
 {shell, [

+ 9 - 1
.ci/fvt_tests/relup.lux

@@ -43,7 +43,7 @@
     !sed -i '/emqx_telemetry/d' data/loaded_plugins
     !sed -i '/emqx_telemetry/d' data/loaded_plugins
 
 
     !./bin/emqx start
     !./bin/emqx start
-    ?EMQ X (.*) is started successfully!
+    ?EMQ X .* is started successfully!
     ?SH-PROMPT
     ?SH-PROMPT
 
 
     !./bin/emqx_ctl cluster join emqx@127.0.0.1
     !./bin/emqx_ctl cluster join emqx@127.0.0.1
@@ -99,6 +99,10 @@
     """
     """
     ?SH-PROMPT
     ?SH-PROMPT
 
 
+    !./bin/emqx_ctl plugins list | grep emqx_management
+    ?Plugin\(emqx_management.*active=true\)
+    ?SH-PROMPT
+
 [shell emqx2]
 [shell emqx2]
     !echo "" > log/emqx.log.1
     !echo "" > log/emqx.log.1
     ?SH-PROMPT
     ?SH-PROMPT
@@ -120,6 +124,10 @@
     """
     """
     ?SH-PROMPT
     ?SH-PROMPT
 
 
+    !./bin/emqx_ctl plugins list | grep emqx_management
+    ?Plugin\(emqx_management.*active=true\)
+    ?SH-PROMPT
+
 [shell bench]
 [shell bench]
     ???publish complete
     ???publish complete
     ??SH-PROMPT:
     ??SH-PROMPT:

+ 6 - 3
.github/workflows/build_packages.yaml

@@ -83,6 +83,7 @@ jobs:
     - name: build
     - name: build
       env:
       env:
         PYTHON: python
         PYTHON: python
+        DIAGNOSTIC: 1
       run: |
       run: |
         $env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH"
         $env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH"
 
 
@@ -168,9 +169,11 @@ jobs:
     - name: build
     - name: build
       run: |
       run: |
         . $HOME/.kerl/${{ matrix.erl_otp }}/activate
         . $HOME/.kerl/${{ matrix.erl_otp }}/activate
-        make -C source ensure-rebar3
-        sudo cp source/rebar3 /usr/local/bin/rebar3
-        make -C source ${{ matrix.profile }}-zip
+        cd source
+        make ensure-rebar3
+        sudo cp rebar3 /usr/local/bin/rebar3
+        rm -rf _build/${{ matrix.profile }}/lib
+        make ${{ matrix.profile }}-zip
     - name: test
     - name: test
       run: |
       run: |
         cd source
         cd source

+ 10 - 0
.github/workflows/build_slim_packages.yaml

@@ -38,6 +38,11 @@ jobs:
       run: make ${EMQX_NAME}-zip
       run: make ${EMQX_NAME}-zip
     - name: build deb/rpm packages
     - name: build deb/rpm packages
       run: make ${EMQX_NAME}-pkg
       run: make ${EMQX_NAME}-pkg
+    - uses: actions/upload-artifact@v1
+      if: failure()
+      with:
+        name: rebar3.crashdump
+        path: ./rebar3.crashdump
     - name: pakcages test
     - name: pakcages test
       run: |
       run: |
         export CODE_PATH=$GITHUB_WORKSPACE
         export CODE_PATH=$GITHUB_WORKSPACE
@@ -94,6 +99,11 @@ jobs:
         make ensure-rebar3
         make ensure-rebar3
         sudo cp rebar3 /usr/local/bin/rebar3
         sudo cp rebar3 /usr/local/bin/rebar3
         make ${EMQX_NAME}-zip
         make ${EMQX_NAME}-zip
+    - uses: actions/upload-artifact@v1
+      if: failure()
+      with:
+        name: rebar3.crashdump
+        path: ./rebar3.crashdump
     - name: test
     - name: test
       run: |
       run: |
         pkg_name=$(basename _packages/${EMQX_NAME}/emqx-*.zip)
         pkg_name=$(basename _packages/${EMQX_NAME}/emqx-*.zip)

+ 2 - 2
apps/emqx/etc/emqx.conf

@@ -355,8 +355,8 @@ rpc.port_discovery = stateless
 ## Number of outgoing RPC connections.
 ## Number of outgoing RPC connections.
 ##
 ##
 ## Value: Interger [0-256]
 ## Value: Interger [0-256]
-## Defaults to NumberOfCPUSchedulers / 2 when set to 0
-#rpc.tcp_client_num = 0
+## Default = 1
+#rpc.tcp_client_num = 1
 
 
 ## RCP Client connect timeout.
 ## RCP Client connect timeout.
 ##
 ##

+ 2 - 0
apps/emqx/include/emqx_mqtt.hrl

@@ -30,11 +30,13 @@
 %% MQTT Protocol Version and Names
 %% MQTT Protocol Version and Names
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-define(MQTT_SN_PROTO_V1, 1).
 -define(MQTT_PROTO_V3, 3).
 -define(MQTT_PROTO_V3, 3).
 -define(MQTT_PROTO_V4, 4).
 -define(MQTT_PROTO_V4, 4).
 -define(MQTT_PROTO_V5, 5).
 -define(MQTT_PROTO_V5, 5).
 
 
 -define(PROTOCOL_NAMES, [
 -define(PROTOCOL_NAMES, [
+    {?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
     {?MQTT_PROTO_V3, <<"MQIsdp">>},
     {?MQTT_PROTO_V3, <<"MQIsdp">>},
     {?MQTT_PROTO_V4, <<"MQTT">>},
     {?MQTT_PROTO_V4, <<"MQTT">>},
     {?MQTT_PROTO_V5, <<"MQTT">>}]).
     {?MQTT_PROTO_V5, <<"MQTT">>}]).

+ 52 - 10
apps/emqx/src/emqx.appup.src

@@ -1,12 +1,31 @@
 %% -*- mode: erlang -*-
 %% -*- mode: erlang -*-
 {VSN,
 {VSN,
-  [{"4.3.2",
-    [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
+  [
+   {"4.3.4",
+    [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}
+    ]},
+   {"4.3.3",
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}
+    ]},
+   {"4.3.2",
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
      {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]},
-     {load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]}
+    ]},
    {"4.3.1",
    {"4.3.1",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@@ -18,7 +37,9 @@
      {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
      {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
    {"4.3.0",
    {"4.3.0",
-    [{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_congestion,brutal_purge,soft_purge,[]},
      {load_module,emqx_congestion,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
@@ -34,13 +55,32 @@
      {apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
      {apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
    {<<".*">>,[]}],
    {<<".*">>,[]}],
-  [{"4.3.2",
-    [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
+  [
+   {"4.3.4",
+    [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}
+    ]},
+   {"4.3.3",
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}
+    ]},
+   {"4.3.2",
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
      {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]},
-     {load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]}
+    ]},
    {"4.3.1",
    {"4.3.1",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@@ -52,7 +92,9 @@
      {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
      {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
    {"4.3.0",
    {"4.3.0",
-    [{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_packet,brutal_purge,soft_purge,[]},
+     {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
+     {load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_congestion,brutal_purge,soft_purge,[]},
      {load_module,emqx_congestion,brutal_purge,soft_purge,[]},

+ 3 - 0
apps/emqx/src/emqx_cm.erl

@@ -294,6 +294,9 @@ do_discard_session(ClientId, Pid) ->
         _ : {noproc, _} -> % emqx_connection: gen_server:call
         _ : {noproc, _} -> % emqx_connection: gen_server:call
             ?tp(debug, "session_already_gone", #{pid => Pid}),
             ?tp(debug, "session_already_gone", #{pid => Pid}),
             ok;
             ok;
+        _ : {'EXIT', {noproc, _}} -> % rpc_call/3
+            ?tp(debug, "session_already_gone", #{pid => Pid}),
+            ok;
         _ : {{shutdown, _}, _} ->
         _ : {{shutdown, _}, _} ->
             ?tp(debug, "session_already_shutdown", #{pid => Pid}),
             ?tp(debug, "session_already_shutdown", #{pid => Pid}),
             ok;
             ok;

+ 8 - 5
apps/emqx/src/emqx_shared_sub.erl

@@ -336,9 +336,13 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
     #emqx_shared_subscription{subpid = SubPid} = NewRecord,
     #emqx_shared_subscription{subpid = SubPid} = NewRecord,
     {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
     {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
 
 
-handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
-    #emqx_shared_subscription{subpid = SubPid} = OldRecord,
-    {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
+%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
+%% it `unsubscribed` the last topic.
+%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
+%% be disconnected.
+% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
+%     #emqx_shared_subscription{subpid = SubPid} = OldRecord,
+%     {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
 
 
 handle_info({mnesia_table_event, _Event}, State) ->
 handle_info({mnesia_table_event, _Event}, State) ->
     {noreply, State};
     {noreply, State};
@@ -348,8 +352,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMo
     cleanup_down(SubPid),
     cleanup_down(SubPid),
     {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
     {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
 
 
-handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+handle_info(_Info, State) ->
     {noreply, State}.
     {noreply, State}.
 
 
 terminate(_Reason, _State) ->
 terminate(_Reason, _State) ->

+ 4 - 1
apps/emqx/src/emqx_ws_connection.erl

@@ -403,7 +403,10 @@ websocket_close(Reason, State) ->
 
 
 terminate(Reason, _Req, #state{channel = Channel}) ->
 terminate(Reason, _Req, #state{channel = Channel}) ->
     ?LOG(debug, "Terminated due to ~p", [Reason]),
     ?LOG(debug, "Terminated due to ~p", [Reason]),
-    emqx_channel:terminate(Reason, Channel).
+    emqx_channel:terminate(Reason, Channel);
+
+terminate(_Reason, _Req, _UnExpectedState) ->
+    ok.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle call
 %% Handle call

+ 16 - 0
apps/emqx_dashboard/src/emqx_dashboard.appup.src

@@ -0,0 +1,16 @@
+%% -*- mode: erlang -*-
+{VSN,
+ [ {"4.3.0",
+    %% load all plugins
+    %% NOTE: this depends on the fact that emqx_dashboard is always
+    %% the last application gets upgraded
+    [ {apply, {emqx_plugins, load, []}}
+    ]},
+   {<<".*">>, []}
+ ],
+ [ {"4.3.0",
+    [ {apply, {emqx_plugins, load, []}}
+    ]},
+   {<<".*">>, []}
+ ]
+}.

+ 1 - 1
apps/emqx_exhook/src/emqx_exhook.app.src

@@ -1,6 +1,6 @@
 {application, emqx_exhook,
 {application, emqx_exhook,
  [{description, "EMQ X Extension for Hook"},
  [{description, "EMQ X Extension for Hook"},
-  {vsn, "4.3.1"},
+  {vsn, "4.3.2"},
   {modules, []},
   {modules, []},
   {registered, []},
   {registered, []},
   {mod, {emqx_exhook_app, []}},
   {mod, {emqx_exhook_app, []}},

+ 10 - 2
apps/emqx_exhook/src/emqx_exhook.appup.src

@@ -1,14 +1,22 @@
 %% -*-: erlang -*-
 %% -*-: erlang -*-
 {VSN,
 {VSN,
  [
  [
+    {"4.3.1", [
+      {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
+    ]},
     {"4.3.0", [
     {"4.3.0", [
-      {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
+      {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
+      {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
     ]},
     ]},
     {<<".*">>, []}
     {<<".*">>, []}
  ],
  ],
  [
  [
+    {"4.3.1", [
+      {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
+    ]},
     {"4.3.0", [
     {"4.3.0", [
-      {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
+      {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
+      {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
     ]},
     ]},
     {<<".*">>, []}
     {<<".*">>, []}
  ]
  ]

+ 8 - 1
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -122,7 +122,7 @@ channel_opts(Opts) ->
     Scheme = proplists:get_value(scheme, Opts),
     Scheme = proplists:get_value(scheme, Opts),
     Host = proplists:get_value(host, Opts),
     Host = proplists:get_value(host, Opts),
     Port = proplists:get_value(port, Opts),
     Port = proplists:get_value(port, Opts),
-    SvrAddr = lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])),
+    SvrAddr = format_http_uri(Scheme, Host, Port),
     ClientOpts = case Scheme of
     ClientOpts = case Scheme of
                      https ->
                      https ->
                          SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
                          SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
@@ -133,6 +133,13 @@ channel_opts(Opts) ->
                  end,
                  end,
     {SvrAddr, ClientOpts}.
     {SvrAddr, ClientOpts}.
 
 
+format_http_uri(Scheme, Host0, Port) ->
+    Host = case is_tuple(Host0) of
+               true -> inet:ntoa(Host0);
+               _ -> Host0
+           end,
+    lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
+
 -spec unload(server()) -> ok.
 -spec unload(server()) -> ok.
 unload(#server{name = Name, hookspec = HookSpecs}) ->
 unload(#server{name = Name, hookspec = HookSpecs}) ->
     _ = do_deinit(Name),
     _ = do_deinit(Name),

+ 1 - 1
apps/emqx_lwm2m/src/emqx_lwm2m.app.src

@@ -1,6 +1,6 @@
 {application,emqx_lwm2m,
 {application,emqx_lwm2m,
              [{description,"EMQ X LwM2M Gateway"},
              [{description,"EMQ X LwM2M Gateway"},
-              {vsn, "4.3.1"}, % strict semver, bump manually!
+              {vsn, "4.3.2"}, % strict semver, bump manually!
               {modules,[]},
               {modules,[]},
               {registered,[emqx_lwm2m_sup]},
               {registered,[emqx_lwm2m_sup]},
               {applications,[kernel,stdlib,lwm2m_coap]},
               {applications,[kernel,stdlib,lwm2m_coap]},

+ 6 - 8
apps/emqx_lwm2m/src/emqx_lwm2m.appup.src

@@ -1,15 +1,13 @@
 %% -*-: erlang -*-
 %% -*-: erlang -*-
 {VSN,
 {VSN,
   [
   [
-    {"4.3.0", [
-      {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
-    ]},
-    {<<".*">>, []}
+    {<<"4.3.[0-1]">>, [
+      {restart_application, emqx_lwm2m}
+    ]}
   ],
   ],
   [
   [
-    {"4.3.0", [
-      {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
-    ]},
-   {<<".*">>, []}
+    {<<"4.3.[0-1]">>, [
+      {restart_application, emqx_lwm2m}
+    ]}
   ]
   ]
 }.
 }.

+ 162 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_api.erl

@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_lwm2m_api).
+
+-import(minirest,  [return/1]).
+
+-rest_api(#{name   => list,
+            method => 'GET',
+            path   => "/lwm2m_channels/",
+            func   => list,
+            descr  => "A list of all lwm2m channel"
+           }).
+
+-rest_api(#{name   => list,
+            method => 'GET',
+            path   => "/nodes/:atom:node/lwm2m_channels/",
+            func   => list,
+            descr  => "A list of lwm2m channel of a node"
+           }).
+
+-rest_api(#{name   => lookup_cmd,
+            method => 'GET',
+            path   => "/lookup_cmd/:bin:ep/",
+            func   => lookup_cmd,
+            descr  => "Send a lwm2m downlink command"
+           }).
+
+-rest_api(#{name   => lookup_cmd,
+            method => 'GET',
+            path   => "/nodes/:atom:node/lookup_cmd/:bin:ep/",
+            func   => lookup_cmd,
+            descr  => "Send a lwm2m downlink command of a node"
+           }).
+
+-export([ list/2
+        , lookup_cmd/2
+        ]).
+
+list(#{node := Node }, Params) ->
+    case Node = node() of
+        true -> list(#{}, Params);
+        _ -> rpc_call(Node, list, [#{}, Params])
+    end;
+
+list(#{}, _Params) ->
+    Channels = emqx_lwm2m_cm:all_channels(),
+    return({ok, format(Channels)}).
+
+lookup_cmd(#{ep := Ep, node := Node}, Params) ->
+    case Node = node() of
+        true -> lookup_cmd(#{ep => Ep}, Params);
+        _ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params])
+    end;
+
+lookup_cmd(#{ep := Ep}, Params) ->
+    MsgType = proplists:get_value(<<"msgType">>, Params),
+    Path0 = proplists:get_value(<<"path">>, Params),
+    case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of
+        [] -> return({ok, []});
+        [{_, undefined} | _] -> return({ok, []});
+        [{{IMEI, Path, MsgType}, undefined}] ->
+            return({ok, [{imei, IMEI},
+                         {'msgType', IMEI},
+                         {'code', <<"6.01">>},
+                         {'codeMsg', <<"reply_not_received">>},
+                         {'path', Path}]});
+        [{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] ->
+            Payload1 = format_cmd_content(Content, MsgType),
+            return({ok, [{imei, IMEI},
+                         {'msgType', IMEI},
+                         {'code', Code},
+                         {'codeMsg', CodeMsg},
+                         {'path', Path}] ++ Payload1})
+    end.
+
+rpc_call(Node, Fun, Args) ->
+    case rpc:call(Node, ?MODULE, Fun, Args) of
+        {badrpc, Reason} -> {error, Reason};
+        Res -> Res
+    end.
+
+format(Channels) ->
+    lists:map(fun({IMEI, #{lifetime := LifeTime,
+                           peername := Peername,
+                           version := Version,
+                           reg_info := RegInfo}}) ->
+        ObjectList = lists:map(fun(Path) ->
+            [ObjId | _] = path_list(Path),
+            case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
+                {error, _} ->
+                    {Path, Path};
+                ObjDefinition ->
+                    ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition),
+                    {Path, list_to_binary(ObjectName)}
+            end
+        end, maps:get(<<"objectList">>, RegInfo)),
+        {IpAddr, Port} = Peername,
+        [{imei, IMEI},
+         {lifetime, LifeTime},
+         {ip_address, iolist_to_binary(ntoa(IpAddr))},
+         {port, Port},
+         {version, Version},
+         {'objectList', ObjectList}]
+    end, Channels).
+
+format_cmd_content(undefined, _MsgType) -> [];
+format_cmd_content(Content, <<"discover">>) ->
+    [H | Content1] = Content,
+    {_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H),
+    [ObjId | _]= path_list(HObjId),
+    ObjectList = case Content1 of
+        [Content2 | _] ->
+            {_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2),
+            ObjL;
+        [] -> []
+    end,
+    R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
+        {error, _} ->
+            lists:map(fun(Object) -> {Object, Object} end, ObjectList);
+        ObjDefinition ->
+            lists:map(fun(Object) ->
+                [_, _,  ResId| _] = path_list(Object),
+                Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of
+                    "E" -> [{operations, list_to_binary("E")}];
+                    Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))},
+                             {operations, list_to_binary(Oper)}]
+                end,
+                [{path, Object},
+                 {name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))}
+                ] ++ Operations
+            end, ObjectList)
+    end,
+    [{content, R}];
+format_cmd_content(Content, _) ->
+    [{content, Content}].
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+    inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
+ntoa(IP) ->
+    inet_parse:ntoa(IP).
+
+path_list(Path) ->
+    case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
+        [ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId];
+        [ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId];
+        [ObjId, ObjInsId] -> [ObjId, ObjInsId];
+        [ObjId] -> [ObjId]
+    end.

+ 153 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl

@@ -0,0 +1,153 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_lwm2m_cm).
+
+-export([start_link/0]).
+
+-export([ register_channel/5
+        , update_reg_info/2
+        , unregister_channel/1
+        ]).
+
+-export([ lookup_channel/1
+        , all_channels/0
+        ]).
+
+-export([ register_cmd/3
+        , register_cmd/4
+        , lookup_cmd/3
+        , lookup_cmd_by_imei/1
+        ]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)).
+
+%% Server name
+-define(CM, ?MODULE).
+
+-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel).
+-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd).
+
+%% Batch drain
+-define(BATCH_SIZE, 100000).
+
+%% @doc Start the channel manager.
+start_link() ->
+    gen_server:start_link({local, ?CM}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) ->
+    Info = #{
+        reg_info => RegInfo,
+        lifetime => LifeTime,
+        version => Ver,
+        peername => Peername
+    },
+    true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}),
+    cast({registered, {IMEI, self()}}).
+
+update_reg_info(IMEI, RegInfo) ->
+    case lookup_channel(IMEI) of
+        [{_, RegInfo0}] ->
+            true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}),
+            ok;
+        [] ->
+            ok
+    end.
+
+unregister_channel(IMEI) when is_binary(IMEI) ->
+    true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI),
+    ok.
+
+lookup_channel(IMEI) ->
+    ets:lookup(?LWM2M_CHANNEL_TAB, IMEI).
+
+all_channels() ->
+    ets:tab2list(?LWM2M_CHANNEL_TAB).
+
+register_cmd(IMEI, Path, Type) ->
+    true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}).
+
+register_cmd(_IMEI, undefined, _Type, _Result) ->
+    ok;
+register_cmd(IMEI, Path, Type, Result) ->
+    true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}).
+
+lookup_cmd(IMEI, Path, Type) ->
+    ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}).
+
+lookup_cmd_by_imei(IMEI) ->
+    ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]).
+
+%% @private
+cast(Msg) -> gen_server:cast(?CM, Msg).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}],
+    ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]),
+    ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]),
+    {ok, #{chan_pmon => emqx_pmon:new()}}.
+
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) ->
+    PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon),
+    {noreply, State#{chan_pmon := PMon1}};
+
+handle_cast(Msg, State) ->
+    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
+    ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
+    {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
+    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
+    {noreply, State#{chan_pmon := PMon1}};
+
+handle_info(Info, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    emqx_stats:cancel_update(chan_stats).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+clean_down({_ChanPid, IMEI}) ->
+    unregister_channel(IMEI).

+ 41 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl

@@ -0,0 +1,41 @@
+
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_lwm2m_cm_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    CM = #{id => emqx_lwm2m_cm,
+           start => {emqx_lwm2m_cm, start_link, []},
+           restart => permanent,
+           shutdown => 5000,
+           type => worker,
+           modules => [emqx_lwm2m_cm]},
+    SupFlags = #{strategy => one_for_one,
+                 intensity => 100,
+                 period => 10
+                },
+    {ok, {SupFlags, [CM]}}.
+

+ 1 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl

@@ -23,6 +23,7 @@
 -export([ mqtt2coap/2
 -export([ mqtt2coap/2
         , coap2mqtt/4
         , coap2mqtt/4
         , ack2mqtt/1
         , ack2mqtt/1
+        , extract_path/1
         ]).
         ]).
 
 
 -export([path_list/1]).
 -export([path_list/1]).

+ 2 - 2
apps/emqx_lwm2m/src/emqx_lwm2m_coap_resource.erl

@@ -48,11 +48,11 @@
 -define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
 -define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
 
 
 -dialyzer([{nowarn_function, [coap_discover/2]}]).
 -dialyzer([{nowarn_function, [coap_discover/2]}]).
-% we use {'absolute', string(), [{atom(), binary()}]} as coap_uri()
+% we use {'absolute', list(binary()), [{atom(), binary()}]} as coap_uri()
 % https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
 % https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
 % resource operations
 % resource operations
 coap_discover(_Prefix, _Args) ->
 coap_discover(_Prefix, _Args) ->
-    [{absolute, "mqtt", []}].
+    [{absolute, [<<"mqtt">>], []}].
 
 
 coap_get(ChId, [?PREFIX], Query, Content, Lwm2mState) ->
 coap_get(ChId, [?PREFIX], Query, Content, Lwm2mState) ->
     ?LOG(debug, "~p ~p GET Query=~p, Content=~p", [self(),ChId, Query, Content]),
     ?LOG(debug, "~p ~p GET Query=~p, Content=~p", [self(),ChId, Query, Content]),

+ 4 - 1
apps/emqx_lwm2m/src/emqx_lwm2m_message.erl

@@ -197,7 +197,10 @@ value_ex(K, Value) when K =:= <<"Integer">>; K =:= <<"Float">>; K =:= <<"Time">>
 value_ex(K, Value) when K =:= <<"String">> ->
 value_ex(K, Value) when K =:= <<"String">> ->
     Value;
     Value;
 value_ex(K, Value) when K =:= <<"Opaque">> ->
 value_ex(K, Value) when K =:= <<"Opaque">> ->
-    Value;
+    %% XXX: force to decode it with base64
+    %%      This may not be a good implementation, but it is
+    %%      consistent with the treatment of Opaque in value/3
+    base64:decode(Value);
 value_ex(K, <<"true">>) when K =:= <<"Boolean">> -> <<1>>;
 value_ex(K, <<"true">>) when K =:= <<"Boolean">> -> <<1>>;
 value_ex(K, <<"false">>) when K =:= <<"Boolean">> -> <<0>>;
 value_ex(K, <<"false">>) when K =:= <<"Boolean">> -> <<0>>;
 
 

+ 49 - 25
apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl

@@ -103,6 +103,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
                 emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
                 emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
             end),
             end),
             emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
             emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
+	    emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
 
 
             {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
             {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
         {error, Error} ->
         {error, Error} ->
@@ -120,10 +121,8 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
     _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
     _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
     Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
     Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
 
 
-update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
-        life_timer = LifeTimer, register_info = RegInfo,
-        coap_pid = CoapPid}) ->
-
+update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
+                                                    coap_pid = CoapPid, endpoint_name = Epn}) ->
     UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
     UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
 
 
     _ = case proplists:get_value(update_msg_publish_condition,
     _ = case proplists:get_value(update_msg_publish_condition,
@@ -134,6 +133,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
             %% - report the registration info update, but only when objectList is updated.
             %% - report the registration info update, but only when objectList is updated.
             case NewRegInfo of
             case NewRegInfo of
                 #{<<"objectList">> := _} ->
                 #{<<"objectList">> := _} ->
+		    emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
                     send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
                     send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
                 _ -> ok
                 _ -> ok
             end
             end
@@ -151,7 +151,8 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
                            register_info = UpdatedRegInfo}.
                            register_info = UpdatedRegInfo}.
 
 
 replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
 replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
-                                                     coap_pid = CoapPid}) ->
+                                                     coap_pid = CoapPid,
+                                                     endpoint_name = EndpointName}) ->
     _ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
     _ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
 
 
     %% - flush cached donwlink commands
     %% - flush cached donwlink commands
@@ -161,7 +162,7 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
     UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
     UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
                             maps:get(<<"lt">>, NewRegInfo), LifeTimer),
                             maps:get(<<"lt">>, NewRegInfo), LifeTimer),
 
 
-    _ = send_auto_observe(CoapPid, NewRegInfo),
+    _ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
 
 
     ?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
     ?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
     Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
     Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
@@ -174,15 +175,20 @@ send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
     Lwm2mState.
     Lwm2mState.
 
 
 auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
 auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
-                                       coap_pid = CoapPid}) ->
-    _ = send_auto_observe(CoapPid, RegInfo),
+                                       coap_pid = CoapPid,
+                                       endpoint_name = EndpointName}) ->
+    _ = send_auto_observe(CoapPid, RegInfo, EndpointName),
     Lwm2mState.
     Lwm2mState.
 
 
-deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) ->
+deliver(#message{topic = Topic, payload = Payload},
+        Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
+                                  register_info = RegInfo,
+                                  started_at = StartedAt,
+                                  endpoint_name = EndpointName}) ->
     IsCacheMode = is_cache_mode(RegInfo, StartedAt),
     IsCacheMode = is_cache_mode(RegInfo, StartedAt),
     ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
     ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
     AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
     AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
-    deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode),
+    deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
     Lwm2mState.
     Lwm2mState.
 
 
 get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
 get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
@@ -238,20 +244,21 @@ time_now() -> erlang:system_time(millisecond).
 %% Deliver downlink message to coap
 %% Deliver downlink message to coap
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode) when is_binary(JsonData)->
+deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
     try
     try
         TermData = emqx_json:decode(JsonData, [return_maps]),
         TermData = emqx_json:decode(JsonData, [return_maps]),
-        deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode)
+        deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
     catch
     catch
         C:R:Stack ->
         C:R:Stack ->
             ?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
             ?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
                 [JsonData, {C, R}, Stack])
                 [JsonData, {C, R}, Stack])
     end;
     end;
 
 
-deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermData) ->
+deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
     ?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
     ?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
     {CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
     {CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
-
+    MsgType = maps:get(<<"msgType">>, Ref),
+    emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
     case CacheMode of
     case CacheMode of
         false ->
         false ->
             do_deliver_to_coap(CoapPid, CoapRequest, Ref);
             do_deliver_to_coap(CoapPid, CoapRequest, Ref);
@@ -266,7 +273,12 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermDat
 send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
 send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
     do_send_to_broker(EventType, Payload, Lwm2mState).
     do_send_to_broker(EventType, Payload, Lwm2mState).
 
 
-do_send_to_broker(EventType, Payload, Lwm2mState) ->
+do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
+    ReqPath = maps:get(<<"reqPath">>, Data, undefined),
+    Code = maps:get(<<"code">>, Data, undefined),
+    CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
+    Content = maps:get(<<"content">>, Data, undefined),
+    emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
     NewPayload = maps:put(<<"msgType">>, EventType, Payload),
     NewPayload = maps:put(<<"msgType">>, EventType, Payload),
     Topic = uplink_topic(EventType, Lwm2mState),
     Topic = uplink_topic(EventType, Lwm2mState),
     publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
     publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
@@ -281,7 +293,7 @@ auto_observe_object_list(Expected, Registered) ->
     Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
     Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
     lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
     lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
 
 
-send_auto_observe(CoapPid, RegInfo) ->
+send_auto_observe(CoapPid, RegInfo, EndpointName) ->
     %% - auto observe the objects
     %% - auto observe the objects
     case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
     case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
         false ->
         false ->
@@ -292,25 +304,37 @@ send_auto_observe(CoapPid, RegInfo) ->
                             maps:get(<<"objectList">>, RegInfo, [])
                             maps:get(<<"objectList">>, RegInfo, [])
                            ),
                            ),
             AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
             AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
-            auto_observe(AlternatePath, Objectlists, CoapPid)
+            auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
     end.
     end.
 
 
-auto_observe(AlternatePath, ObjectList, CoapPid) ->
+auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
     ?LOG(info, "Auto Observe on: ~p", [ObjectList]),
     ?LOG(info, "Auto Observe on: ~p", [ObjectList]),
     erlang:spawn(fun() ->
     erlang:spawn(fun() ->
-            observe_object_list(AlternatePath, ObjectList, CoapPid)
+            observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
         end).
         end).
 
 
-observe_object_list(AlternatePath, ObjectList, CoapPid) ->
+observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
     lists:foreach(fun(ObjectPath) ->
     lists:foreach(fun(ObjectPath) ->
-        observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100)
+        [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
+        case ObjId of
+            <<"19">> ->
+                [ObjInsId | _LastPath1] = LastPath,
+                case ObjInsId of
+                    <<"0">> ->
+                        observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
+                    _ ->
+                        observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
+                end;
+            _ ->
+                observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
+        end
     end, ObjectList).
     end, ObjectList).
 
 
-observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval) ->
-    observe_object(AlternatePath, ObjectPath, CoapPid),
+observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
+    observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
     timer:sleep(Interval).
     timer:sleep(Interval).
 
 
-observe_object(AlternatePath, ObjectPath, CoapPid) ->
+observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
     Payload = #{
     Payload = #{
         <<"msgType">> => <<"observe">>,
         <<"msgType">> => <<"observe">>,
         <<"data">> => #{
         <<"data">> => #{
@@ -318,7 +342,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
         }
         }
     },
     },
     ?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
     ?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
-    deliver_to_coap(AlternatePath, Payload, CoapPid, false).
+    deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
 
 
 do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
 do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
     erlang:spawn(fun() ->
     erlang:spawn(fun() ->

+ 8 - 1
apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl

@@ -29,4 +29,11 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
 init(_Args) ->
 init(_Args) ->
-    {ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db)] }}.
+    CmSup = #{id => emqx_lwm2m_cm_sup,
+              start => {emqx_lwm2m_cm_sup, start_link, []},
+              restart => permanent,
+              shutdown => infinity,
+              type => supervisor,
+              modules => [emqx_lwm2m_cm_sup]
+            },
+    {ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db), CmSup] }}.

+ 11 - 0
apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl

@@ -21,9 +21,11 @@
 
 
 -export([ get_obj_def/2
 -export([ get_obj_def/2
         , get_object_id/1
         , get_object_id/1
+        , get_object_name/1
         , get_object_and_resource_id/2
         , get_object_and_resource_id/2
         , get_resource_type/2
         , get_resource_type/2
         , get_resource_name/2
         , get_resource_name/2
+        , get_resource_operations/2
         ]).
         ]).
 
 
 -define(LOG(Level, Format, Args),
 -define(LOG(Level, Format, Args),
@@ -42,6 +44,10 @@ get_object_id(ObjDefinition) ->
     [#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
     [#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
     ObjectId.
     ObjectId.
 
 
+get_object_name(ObjDefinition) ->
+    [#xmlText{value=ObjectName}] = xmerl_xpath:string("Name/text()", ObjDefinition),
+    ObjectName.
+
 
 
 get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
 get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
     ResourceNameString = binary_to_list(ResourceNameBinary),
     ResourceNameString = binary_to_list(ResourceNameBinary),
@@ -60,3 +66,8 @@ get_resource_name(ResourceIdInt, ObjDefinition) ->
     ResourceIdString = integer_to_list(ResourceIdInt),
     ResourceIdString = integer_to_list(ResourceIdInt),
     [#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
     [#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
     Name.
     Name.
+
+get_resource_operations(ResourceIdInt, ObjDefinition) ->
+    ResourceIdString = integer_to_list(ResourceIdInt),
+    [#xmlText{value=Operations}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Operations/text()", ObjDefinition),
+    Operations.

+ 5 - 3
apps/emqx_lwm2m/src/emqx_lwm2m_xml_object_db.erl

@@ -58,7 +58,7 @@ find_objectid(ObjectId) ->
                         false -> ObjectId
                         false -> ObjectId
                     end,
                     end,
     case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
     case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
-        [] -> error(no_xml_definition);
+        [] -> {error, no_xml_definition};
         [{ObjectId, Xml}] -> Xml
         [{ObjectId, Xml}] -> Xml
     end.
     end.
 
 
@@ -121,8 +121,10 @@ load(BaseDir) ->
                true  -> BaseDir++"*.xml";
                true  -> BaseDir++"*.xml";
                false -> BaseDir++"/*.xml"
                false -> BaseDir++"/*.xml"
            end,
            end,
-    AllXmlFiles = filelib:wildcard(Wild),
-    load_loop(AllXmlFiles).
+    case filelib:wildcard(Wild) of
+        [] -> error(no_xml_files_found, BaseDir);
+        AllXmlFiles -> load_loop(AllXmlFiles)
+    end.
 
 
 load_loop([]) ->
 load_loop([]) ->
     ok;
     ok;

+ 7 - 4
apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl

@@ -40,6 +40,7 @@ all() ->
     , {group, test_grp_4_discover}
     , {group, test_grp_4_discover}
     , {group, test_grp_5_write_attr}
     , {group, test_grp_5_write_attr}
     , {group, test_grp_6_observe}
     , {group, test_grp_6_observe}
+    , {group, test_grp_8_object_19}
     ].
     ].
 
 
 suite() -> [{timetrap, {seconds, 90}}].
 suite() -> [{timetrap, {seconds, 90}}].
@@ -98,9 +99,9 @@ groups() ->
         ]},
         ]},
         {test_grp_8_object_19, [RepeatOpt], [
         {test_grp_8_object_19, [RepeatOpt], [
             case80_specail_object_19_1_0_write,
             case80_specail_object_19_1_0_write,
-            case80_specail_object_19_0_0_notify,
-            case80_specail_object_19_0_0_response,
-            case80_normal_object_19_0_0_read
+            case80_specail_object_19_0_0_notify
+            %case80_specail_object_19_0_0_response,
+            %case80_normal_object_19_0_0_read
         ]},
         ]},
         {test_grp_9_psm_queue_mode, [RepeatOpt], [
         {test_grp_9_psm_queue_mode, [RepeatOpt], [
             case90_psm_mode,
             case90_psm_mode,
@@ -1655,6 +1656,7 @@ case80_specail_object_19_1_0_write(Config) ->
                     <<"value">> => base64:encode(<<12345:32>>)
                     <<"value">> => base64:encode(<<12345:32>>)
                 }
                 }
                },
                },
+
     CommandJson = emqx_json:encode(Command),
     CommandJson = emqx_json:encode(Command),
     test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
     test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
     timer:sleep(50),
     timer:sleep(50),
@@ -1663,7 +1665,7 @@ case80_specail_object_19_1_0_write(Config) ->
     Path2 = get_coap_path(Options2),
     Path2 = get_coap_path(Options2),
     ?assertEqual(put, Method2),
     ?assertEqual(put, Method2),
     ?assertEqual(<<"/19/1/0">>, Path2),
     ?assertEqual(<<"/19/1/0">>, Path2),
-    ?assertEqual(<<12345:32>>, Payload2),
+    ?assertEqual(<<3:2, 0:1, 0:2, 4:3, 0, 12345:32>>, Payload2),
     timer:sleep(50),
     timer:sleep(50),
 
 
     test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, changed}, #coap_content{}, Request2, true),
     test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, changed}, #coap_content{}, Request2, true),
@@ -1672,6 +1674,7 @@ case80_specail_object_19_1_0_write(Config) ->
     ReadResult = emqx_json:encode(#{
     ReadResult = emqx_json:encode(#{
                                 <<"requestID">> => CmdId, <<"cacheID">> => CmdId,
                                 <<"requestID">> => CmdId, <<"cacheID">> => CmdId,
                                 <<"data">> => #{
                                 <<"data">> => #{
+                                    <<"reqPath">> => <<"/19/1/0">>,
                                     <<"code">> => <<"2.04">>,
                                     <<"code">> => <<"2.04">>,
                                     <<"codeMsg">> => <<"changed">>
                                     <<"codeMsg">> => <<"changed">>
                                 },
                                 },

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -1,6 +1,6 @@
 {application, emqx_rule_engine,
 {application, emqx_rule_engine,
  [{description, "EMQ X Rule Engine"},
  [{description, "EMQ X Rule Engine"},
-  {vsn, "4.3.2"}, % strict semver, bump manually!
+  {vsn, "4.3.3"}, % strict semver, bump manually!
   {modules, []},
   {modules, []},
   {registered, [emqx_rule_engine_sup, emqx_rule_registry]},
   {registered, [emqx_rule_engine_sup, emqx_rule_registry]},
   {applications, [kernel,stdlib,rulesql,getopt]},
   {applications, [kernel,stdlib,rulesql,getopt]},

+ 21 - 5
apps/emqx_rule_engine/src/emqx_rule_engine.appup.src

@@ -1,21 +1,37 @@
 %% -*-: erlang -*-
 %% -*-: erlang -*-
-{"4.3.2",
+{"4.3.3",
  [ {"4.3.0",
  [ {"4.3.0",
-    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
-      {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
+    , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
+    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
     ]},
     ]},
    {"4.3.1",
    {"4.3.1",
     [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
     [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
+    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
+    ]},
+   {"4.3.2",
+    [ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
+    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
     ]},
     ]},
    {<<".*">>, []}
    {<<".*">>, []}
  ],
  ],
  [
  [
    {"4.3.0",
    {"4.3.0",
-    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
-      {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
+    , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
+    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
     ]},
     ]},
    {"4.3.1",
    {"4.3.1",
     [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
     [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
+    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
+    ]},
+   {"4.3.2",
+    [ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
+    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
     ]},
     ]},
    {<<".*">>, []}
    {<<".*">>, []}
  ]
  ]

+ 6 - 19
apps/emqx_rule_engine/src/emqx_rule_registry.erl

@@ -93,13 +93,6 @@
 
 
 -define(REGISTRY, ?MODULE).
 -define(REGISTRY, ?MODULE).
 
 
-%% Statistics
--define(STATS,
-        [ {?RULE_TAB, 'rules.count', 'rules.max'}
-        , {?ACTION_TAB, 'actions.count', 'actions.max'}
-        , {?RES_TAB, 'resources.count', 'resources.max'}
-        ]).
-
 -define(T_CALL, 10000).
 -define(T_CALL, 10000).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -392,8 +385,11 @@ find_rules_depends_on_resource(ResId) ->
     end, [], get_rules()).
     end, [], get_rules()).
 
 
 search_action_despends_on_resource(ResId, Actions) ->
 search_action_despends_on_resource(ResId, Actions) ->
-    lists:search(fun(#action_instance{args = #{<<"$resource">> := ResId0}}) ->
-        ResId0 =:= ResId
+    lists:search(fun
+        (#action_instance{args = #{<<"$resource">> := ResId0}}) ->
+            ResId0 =:= ResId;
+        (_) ->
+            false
     end, Actions).
     end, Actions).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -439,8 +435,6 @@ delete_resource_type(Type) ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
 init([]) ->
 init([]) ->
-    %% Enable stats timer
-    ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
     _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
     _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
                                  {read_concurrency, true}]),
                                  {read_concurrency, true}]),
     {ok, #{}}.
     {ok, #{}}.
@@ -466,7 +460,7 @@ handle_info(Info, State) ->
     {noreply, State}.
     {noreply, State}.
 
 
 terminate(_Reason, _State) ->
 terminate(_Reason, _State) ->
-    emqx_stats:cancel_update(rule_registery_stats).
+    ok.
 
 
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
@@ -475,13 +469,6 @@ code_change(_OldVsn, State, _Extra) ->
 %% Private functions
 %% Private functions
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
-update_stats() ->
-    lists:foreach(
-      fun({Tab, Stat, MaxStat}) ->
-              Size = mnesia:table_info(Tab, size),
-              emqx_stats:setstat(Stat, MaxStat, Size)
-      end, ?STATS).
-
 get_all_records(Tab) ->
 get_all_records(Tab) ->
     %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
     %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
     ets:tab2list(Tab).
     ets:tab2list(Tab).

+ 6 - 0
apps/emqx_sn/src/emqx_sn.appup.src

@@ -1,11 +1,17 @@
 %% -*-: erlang -*-
 %% -*-: erlang -*-
 {VSN,
 {VSN,
  [
  [
+   {"4.3.2", [
+     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
+   ]},
    {<<"4.3.[0-1]">>, [
    {<<"4.3.[0-1]">>, [
      {restart_application, emqx_sn}
      {restart_application, emqx_sn}
    ]}
    ]}
  ],
  ],
  [
  [
+   {"4.3.2", [
+     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
+   ]},
    {<<"4.3.[0-1]">>, [
    {<<"4.3.[0-1]">>, [
      {restart_application, emqx_sn}
      {restart_application, emqx_sn}
    ]}
    ]}

+ 15 - 31
apps/emqx_sn/src/emqx_sn_gateway.erl

@@ -250,8 +250,9 @@ wait_for_will_topic(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State)
     % ignore
     % ignore
     keep_state_and_data;
     keep_state_and_data;
 
 
-wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
-    do_2nd_connect(Flags, Duration, ClientId, State);
+wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
+    ?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
+    keep_state_and_data;
 
 
 wait_for_will_topic(cast, {outgoing, Packet}, State) ->
 wait_for_will_topic(cast, {outgoing, Packet}, State) ->
     {keep_state, handle_outgoing(Packet, State)};
     {keep_state, handle_outgoing(Packet, State)};
@@ -275,9 +276,9 @@ wait_for_will_msg(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State) -
     % ignore
     % ignore
     keep_state_and_data;
     keep_state_and_data;
 
 
-%% XXX: ?? Why we will handling the 2nd CONNECT packet ??
-wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
-    do_2nd_connect(Flags, Duration, ClientId, State);
+wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
+    ?LOG(warning, "Receive connect packet in wait_for_will_msg state", []),
+    keep_state_and_data;
 
 
 wait_for_will_msg(cast, {outgoing, Packet}, State) ->
 wait_for_will_msg(cast, {outgoing, Packet}, State) ->
     {keep_state, handle_outgoing(Packet, State)};
     {keep_state, handle_outgoing(Packet, State)};
@@ -365,8 +366,9 @@ connected(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) ->
     % ignore
     % ignore
     {keep_state, State};
     {keep_state, State};
 
 
-connected(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
-    do_2nd_connect(Flags, Duration, ClientId, State);
+connected(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
+    ?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
+    keep_state_and_data;
 
 
 connected(cast, {outgoing, Packet}, State) ->
 connected(cast, {outgoing, Packet}, State) ->
     {keep_state, handle_outgoing(Packet, State)};
     {keep_state, handle_outgoing(Packet, State)};
@@ -826,15 +828,17 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
                                    clean_start = CleanStart,
                                    clean_start = CleanStart,
                                    username    = State#state.username,
                                    username    = State#state.username,
                                    password    = State#state.password,
                                    password    = State#state.password,
+                                   proto_name  = <<"MQTT-SN">>,
                                    keepalive   = Duration,
                                    keepalive   = Duration,
-                                   properties  = OnlyOneInflight
+                                   properties  = OnlyOneInflight,
+                                   proto_ver   = 1
                                   },
                                   },
     case WillFlag of
     case WillFlag of
         true -> State0 = send_message(?SN_WILLTOPICREQ_MSG(), State),
         true -> State0 = send_message(?SN_WILLTOPICREQ_MSG(), State),
                 NState = State0#state{connpkt  = ConnPkt,
                 NState = State0#state{connpkt  = ConnPkt,
-                                     clientid = ClientId,
-                                     keepalive_interval = Duration
-                                    },
+                                      clientid = ClientId,
+                                      keepalive_interval = Duration
+                                     },
                 {next_state, wait_for_will_topic, NState};
                 {next_state, wait_for_will_topic, NState};
         false ->
         false ->
             NState = State#state{clientid = ClientId,
             NState = State#state{clientid = ClientId,
@@ -843,26 +847,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
             handle_incoming(?CONNECT_PACKET(ConnPkt), NState)
             handle_incoming(?CONNECT_PACKET(ConnPkt), NState)
     end.
     end.
 
 
-do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
-                                                         peername = Peername,
-                                                         channel  = Channel}) ->
-    emqx_logger:set_metadata_clientid(ClientId),
-    #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
-    NChannel = case CleanStart of
-                   true ->
-                       emqx_channel:terminate(normal, Channel),
-                       emqx_sn_registry:unregister_topic(ClientId),
-                       emqx_channel:init(#{socktype => udp,
-                                           sockname => Sockname,
-                                           peername => Peername,
-                                           peercert => ?NO_PEERCERT,
-                                           conn_mod => ?MODULE
-                                          }, ?DEFAULT_CHAN_OPTIONS);
-                   false -> Channel
-               end,
-    NState = State#state{channel = NChannel},
-    do_connect(ClientId, CleanStart, Will, Duration, NState).
-
 handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
 handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
                  State=#state{channel = Channel}) ->
                  State=#state{channel = Channel}) ->
     ClientId = emqx_channel:info(clientid, Channel),
     ClientId = emqx_channel:info(clientid, Channel),

+ 0 - 13
apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

@@ -98,19 +98,6 @@ t_connect(_) ->
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
     gen_udp:close(Socket).
     gen_udp:close(Socket).
 
 
-t_do_2nd_connect(_) ->
-    {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = ?CLIENTID,
-    send_connect_msg(Socket, ClientId),
-    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
-    timer:sleep(100),
-    send_connect_msg(Socket, <<"client_id_other">>),
-    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
-
-    send_disconnect_msg(Socket, undefined),
-    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
-    gen_udp:close(Socket).
-
 t_subscribe(_) ->
 t_subscribe(_) ->
     Dup = 0,
     Dup = 0,
     QoS = 0,
     QoS = 0,

+ 1 - 1
apps/emqx_web_hook/src/emqx_web_hook.app.src

@@ -1,6 +1,6 @@
 {application, emqx_web_hook,
 {application, emqx_web_hook,
  [{description, "EMQ X WebHook Plugin"},
  [{description, "EMQ X WebHook Plugin"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.3.2"}, % strict semver, bump manually!
   {modules, []},
   {modules, []},
   {registered, [emqx_web_hook_sup]},
   {registered, [emqx_web_hook_sup]},
   {applications, [kernel,stdlib,ehttpc]},
   {applications, [kernel,stdlib,ehttpc]},

+ 7 - 5
apps/emqx_web_hook/src/emqx_web_hook.appup.src

@@ -2,14 +2,16 @@
 
 
 {VSN,
 {VSN,
   [
   [
-    {"4.3.0", [
-     {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
-   ]},
+    {<<"4.3.[0-1]">>, [
+     {restart_application, emqx_web_hook},
+     {apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
+    ]},
     {<<".*">>, []}
     {<<".*">>, []}
   ],
   ],
   [
   [
-    {"4.3.0", [
-     {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
+    {<<"4.3.[0-1]">>, [
+     {restart_application, emqx_web_hook},
+     {apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
     ]},
     ]},
     {<<".*">>, []}
     {<<".*">>, []}
   ]
   ]

+ 11 - 3
apps/emqx_web_hook/src/emqx_web_hook_actions.erl

@@ -292,7 +292,7 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
         Headers = headers(maps:get(<<"headers">>, Params, undefined)),
         Headers = headers(maps:get(<<"headers">>, Params, undefined)),
         NHeaders = ensure_content_type_header(Headers, Method),
         NHeaders = ensure_content_type_header(Headers, Method),
         #{method => Method,
         #{method => Method,
-          path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
+          path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
           headers => NHeaders,
           headers => NHeaders,
           body => maps:get(<<"body">>, Params, <<>>),
           body => maps:get(<<"body">>, Params, <<>>),
           request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
           request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
@@ -306,8 +306,16 @@ ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =
 ensure_content_type_header(Headers, _Method) ->
 ensure_content_type_header(Headers, _Method) ->
     lists:keydelete("content-type", 1, Headers).
     lists:keydelete("content-type", 1, Headers).
 
 
-path(<<>>) -> <<"/">>;
-path(Path) -> Path.
+merge_path(CommonPath, <<>>) ->
+    CommonPath;
+merge_path(CommonPath, Path0) ->
+    case emqx_http_lib:uri_parse(Path0) of
+        {ok, #{path := Path1, 'query' := Query}} ->
+            Path2 = filename:join(CommonPath, Path1),
+            <<Path2/binary, "?", Query/binary>>;
+        {ok, #{path := Path1}} ->
+            filename:join(CommonPath, Path1)
+    end.
 
 
 method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
 method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
 method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
 method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;

+ 8 - 5
apps/emqx_web_hook/src/emqx_web_hook_app.erl

@@ -42,10 +42,9 @@ stop(_State) ->
 translate_env() ->
 translate_env() ->
     {ok, URL} = application:get_env(?APP, url),
     {ok, URL} = application:get_env(?APP, url),
     {ok, #{host := Host,
     {ok, #{host := Host,
-           path := Path0,
            port := Port,
            port := Port,
-           scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
-    Path = path(Path0),
+           scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
+    Path = path(URIMap),
     PoolSize = application:get_env(?APP, pool_size, 32),
     PoolSize = application:get_env(?APP, pool_size, 32),
     MoreOpts = case Scheme of
     MoreOpts = case Scheme of
                    http ->
                    http ->
@@ -89,9 +88,13 @@ translate_env() ->
     NHeaders = set_content_type(Headers),
     NHeaders = set_content_type(Headers),
     application:set_env(?APP, headers, NHeaders).
     application:set_env(?APP, headers, NHeaders).
 
 
-path("") ->
+path(#{path := "", 'query' := Query}) ->
+    "?" ++ Query;
+path(#{path := Path, 'query' := Query}) ->
+    Path ++ "?" ++ Query;
+path(#{path := ""}) ->
     "/";
     "/";
-path(Path) ->
+path(#{path := Path}) ->
     Path.
     Path.
 
 
 set_content_type(Headers) ->
 set_content_type(Headers) ->

+ 3 - 1
deploy/docker/Dockerfile

@@ -26,7 +26,9 @@ COPY . /emqx
 ARG PKG_VSN
 ARG PKG_VSN
 ARG EMQX_NAME=emqx
 ARG EMQX_NAME=emqx
 
 
-RUN cd /emqx && make $EMQX_NAME
+RUN cd /emqx \
+    && rm -rf _build/$EMQX_NAME/lib \
+    && make $EMQX_NAME
 
 
 FROM $RUN_FROM
 FROM $RUN_FROM
 
 

+ 1 - 1
deploy/docker/README.md

@@ -83,7 +83,7 @@ These environment variables will ignore for configuration file.
 
 
 The list is incomplete and may changed with [etc/emqx.conf](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
 The list is incomplete and may changed with [etc/emqx.conf](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
 
 
-If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE__NAME``, ``EMQX_NODE__NAME=$EMQX_NAME@$EMQX_HOST``.
+If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE_NAME``, ``EMQX_NODE_NAME=$EMQX_NAME@$EMQX_HOST``.
 
 
 For example, set mqtt tcp port to 1883
 For example, set mqtt tcp port to 1883
 
 

File diff ditekan karena terlalu besar
+ 2470 - 0
priv/emqx.schema


+ 1 - 1
rebar.config

@@ -42,7 +42,7 @@
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
-    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
+    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.6"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
     , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
     , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}

+ 7 - 0
rebar.config.erl

@@ -127,23 +127,30 @@ prod_compile_opts() ->
 prod_overrides() ->
 prod_overrides() ->
     [{add, [ {erl_opts, [deterministic]}]}].
     [{add, [ {erl_opts, [deterministic]}]}].
 
 
+relup_deps(Profile) ->
+    {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "scripts/inject-deps.escript " ++ atom_to_list(Profile)}]}.
+
 profiles() ->
 profiles() ->
     Vsn = get_vsn(),
     Vsn = get_vsn(),
     [ {'emqx',          [ {erl_opts, prod_compile_opts()}
     [ {'emqx',          [ {erl_opts, prod_compile_opts()}
                         , {relx, relx(Vsn, cloud, bin)}
                         , {relx, relx(Vsn, cloud, bin)}
                         , {overrides, prod_overrides()}
                         , {overrides, prod_overrides()}
+                        , relup_deps('emqx')
                         ]}
                         ]}
     , {'emqx-pkg',      [ {erl_opts, prod_compile_opts()}
     , {'emqx-pkg',      [ {erl_opts, prod_compile_opts()}
                         , {relx, relx(Vsn, cloud, pkg)}
                         , {relx, relx(Vsn, cloud, pkg)}
                         , {overrides, prod_overrides()}
                         , {overrides, prod_overrides()}
+                        , relup_deps('emqx-pkg')
                         ]}
                         ]}
     , {'emqx-edge',     [ {erl_opts, prod_compile_opts()}
     , {'emqx-edge',     [ {erl_opts, prod_compile_opts()}
                         , {relx, relx(Vsn, edge, bin)}
                         , {relx, relx(Vsn, edge, bin)}
                         , {overrides, prod_overrides()}
                         , {overrides, prod_overrides()}
+                        , relup_deps('emqx-edge')
                         ]}
                         ]}
     , {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()}
     , {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()}
                         , {relx, relx(Vsn, edge, pkg)}
                         , {relx, relx(Vsn, edge, pkg)}
                         , {overrides, prod_overrides()}
                         , {overrides, prod_overrides()}
+                        , relup_deps('emqx-edge-pkg')
                         ]}
                         ]}
     , {check,           [ {erl_opts, common_compile_opts()}
     , {check,           [ {erl_opts, common_compile_opts()}
                         ]}
                         ]}

+ 157 - 0
scripts/inject-deps.escript

@@ -0,0 +1,157 @@
+#!/usr/bin/env escript
+
+%% This script injects implicit relup dependencies for emqx applications.
+%%
+%% By 'implicit', it means that it is not feasible to define application
+%% dependencies in .app.src files.
+%%
+%% For instance, during upgrade/downgrade, emqx_dashboard usually requires
+%% a restart after (but not before) all plugins are upgraded (and maybe
+%% restarted), however, the dependencies are not resolvable at build time
+%% when relup is generated.
+%%
+%% This script is to be executed after compile, with the profile given as the
+%% first argument. For each dependency, it modifies the .app file to
+%% have the `relup_deps` list extended to application attributes.
+%%
+%% The `relup_deps` application attribute is then picked up by (EMQ's fork of)
+%% `relx` when top-sorting apps to generate relup instructions
+
+-mode(compile).
+
+usage() ->
+  "Usage: " ++ escript:script_name() ++ " emqx|emqx-edge".
+
+-type app() :: atom().
+-type deps_overlay() :: {re, string()} | app().
+
+%% deps/0 returns the dependency overlays.
+%% {re, Pattern} to match application names using regexp pattern
+-spec deps(string()) -> [{app(), [deps_overlay()]}].
+deps("emqx-edge" ++ _) ->
+  %% special case for edge
+  base_deps() ++ [{{re, ".+"}, [{exclude, App} || App <- edge_excludes()]}];
+deps(_Profile) ->
+  base_deps().
+
+edge_excludes() ->
+    [ emqx_lwm2m
+    , emqx_auth_ldap
+    , emqx_auth_pgsql
+    , emqx_auth_redis
+    , emqx_auth_mongo
+    , emqx_lua_hook
+    , emqx_exhook
+    , emqx_exproto
+    , emqx_prometheus
+    , emqx_psk_file
+    ].
+
+base_deps() ->
+  %% make sure emqx_dashboard depends on all other emqx_xxx apps
+  %% so the appup instructions for emqx_dashboard is always the last
+  %% to be executed
+  [ {emqx_dashboard, [{re, "emqx_.*"}]}
+  , {emqx_management, [{re, "emqx_.*"}, {exclude, emqx_dashboard}]}
+  , {{re, "emqx_.*"}, [emqx]}
+  ].
+
+main([Profile | _]) ->
+  ok = inject(Profile);
+main(_Args) ->
+  io:format(standard_error, "~s", [usage()]),
+  erlang:halt(1).
+
+expand_names({Name, Deps}, AppNames) ->
+  Names = match_pattern(Name, AppNames),
+  [{N, Deps} || N <- Names].
+
+%% merge k-v pairs with v1 ++ v2
+merge([], Acc) -> Acc;
+merge([{K, V0} | Rest], Acc) ->
+  V = case lists:keyfind(K, 1, Acc) of
+        {K, V1} -> V1 ++ V0;
+        false -> V0
+      end,
+  NewAcc = lists:keystore(K, 1, Acc, {K, V}),
+  merge(Rest, NewAcc).
+
+expand_deps([], _AppNames, Acc) -> Acc;
+expand_deps([{exclude, Dep} | Deps], AppNames, Acc) ->
+  Matches = expand_deps([Dep], AppNames, []),
+  expand_deps(Deps, AppNames, Acc -- Matches);
+expand_deps([Dep | Deps], AppNames, Acc) ->
+  NewAcc = add_to_list(Acc, match_pattern(Dep, AppNames)),
+  expand_deps(Deps, AppNames, NewAcc).
+
+inject(Profile) ->
+  LibDir = lib_dir(Profile),
+  AppNames = list_apps(LibDir),
+  Deps0 = lists:flatmap(fun(Dep) -> expand_names(Dep, AppNames) end, deps(Profile)),
+  Deps1 = merge(Deps0, []),
+  Deps2 = lists:map(fun({Name, DepsX}) ->
+                        NewDeps = expand_deps(DepsX, AppNames, []),
+                        {Name, NewDeps}
+                    end, Deps1),
+  lists:foreach(fun({App, Deps}) -> inject(App, Deps, LibDir) end, Deps2).
+
+%% list the profile/lib dir to get all apps
+list_apps(LibDir) ->
+  Apps = filelib:wildcard("*", LibDir),
+  lists:foldl(fun(App, Acc) -> [App || is_app(LibDir, App)] ++ Acc end, [], Apps).
+
+is_app(_LibDir, "." ++ _) -> false; %% ignore hidden dir
+is_app(LibDir, AppName) ->
+  Path = filename:join([ebin_dir(LibDir, AppName), AppName ++ ".app"]),
+  filelib:is_regular(Path) orelse error({unknown_app, AppName, Path}). %% wtf
+
+lib_dir(Profile) ->
+  filename:join(["_build", Profile, lib]).
+
+ebin_dir(LibDir, AppName) -> filename:join([LibDir, AppName, "ebin"]).
+
+inject(App0, DepsToAdd, LibDir) ->
+  App = str(App0),
+  AppEbinDir = ebin_dir(LibDir, App),
+  [AppFile0] = filelib:wildcard("*.app", AppEbinDir),
+  AppFile = filename:join(AppEbinDir, AppFile0),
+  {ok, [{application, AppName, Props}]} = file:consult(AppFile),
+  Deps0 = case lists:keyfind(relup_deps, 1, Props) of
+              {_, X} -> X;
+              false -> []
+          end,
+  %% merge extra deps, but do not self-include
+  Deps = add_to_list(Deps0, DepsToAdd) -- [App0],
+  case Deps =:= [] of
+    true -> ok;
+    _ ->
+      NewProps = lists:keystore(relup_deps, 1, Props, {relup_deps, Deps}),
+      AppSpec = {application, AppName, NewProps},
+      AppSpecIoData = io_lib:format("~p.", [AppSpec]),
+      io:format(user, "updated_relup_deps for ~p~n", [App]),
+      file:write_file(AppFile, AppSpecIoData)
+  end.
+
+str(A) when is_atom(A) -> atom_to_list(A).
+
+match_pattern({re, Re}, AppNames) ->
+  Match = fun(AppName) -> re:run(AppName, Re) =/= nomatch end,
+  AppNamesToAdd = lists:filter(Match, AppNames),
+  AppsToAdd = lists:map(fun(N) -> list_to_atom(N) end, AppNamesToAdd),
+  case AppsToAdd =:= [] of
+    true  -> error({nomatch, Re});
+    false -> AppsToAdd
+  end;
+match_pattern(NameAtom, AppNames) ->
+  case lists:member(str(NameAtom), AppNames) of
+    true  -> [NameAtom];
+    false -> error({notfound, NameAtom})
+  end.
+
+%% Append elements to list without duplication. No reordering.
+add_to_list(List, []) -> List;
+add_to_list(List, [H | T]) ->
+  case lists:member(H, List) of
+    true -> add_to_list(List, T);
+    false -> add_to_list(List ++ [H], T)
+  end.