Przeglądaj źródła

Merge branch 'master' of github.com:emqx/emqx into merge-master

z8674558 4 lat temu
rodzic
commit
31cbb7aa97
52 zmienionych plików z 732 dodań i 363 usunięć
  1. 3 3
      .ci/build_packages/tests.sh
  2. 13 12
      .ci/fvt_tests/relup.lux
  3. 11 2
      .github/workflows/run_fvt_tests.yaml
  4. 3 0
      .github/workflows/run_test_cases.yaml
  5. 1 1
      .tool-versions
  6. 2 0
      apps/emqx_auth_http/etc/emqx_auth_http.conf
  7. 8 3
      apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl
  8. 3 3
      apps/emqx_coap/test/emqx_coap_SUITE.erl
  9. 3 3
      apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl
  10. 3 3
      apps/emqx_exproto/test/emqx_exproto_SUITE.erl
  11. 1 1
      apps/emqx_management/src/emqx_management.app.src
  12. 6 4
      apps/emqx_management/src/emqx_management.appup.src
  13. 1 7
      apps/emqx_management/src/emqx_mgmt_cli.erl
  14. 1 0
      apps/emqx_management/src/emqx_mgmt_data_backup.erl
  15. 4 2
      apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl
  16. 2 1
      apps/emqx_management/test/emqx_mgmt_SUITE.erl
  17. 32 58
      apps/emqx_management/test/emqx_mgmt_api_SUITE.erl
  18. 4 13
      apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl
  19. 1 1
      apps/emqx_rule_engine/src/emqx_rule_engine.app.src
  20. 11 3
      apps/emqx_rule_engine/src/emqx_rule_engine.appup.src
  21. 18 19
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  22. 2 0
      apps/emqx_rule_engine/src/emqx_rule_runtime.erl
  23. 1 1
      apps/emqx_sn/src/emqx_sn.app.src
  24. 17 0
      apps/emqx_sn/src/emqx_sn.appup.src
  25. 10 5
      apps/emqx_sn/src/emqx_sn_asleep_timer.erl
  26. 10 8
      apps/emqx_sn/src/emqx_sn_gateway.erl
  27. 5 5
      apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl
  28. 2 0
      bin/emqx.cmd
  29. 3 17
      bin/nodetool
  30. 2 2
      lib-ce/emqx_modules/test/emqx_modules_SUITE.erl
  31. 3 1
      priv/emqx.schema
  32. 1 1
      rebar.config
  33. 1 1
      rebar.config.erl
  34. 0 1
      scripts/apps-version-check.sh
  35. 1 1
      src/emqx.app.src
  36. 39 1
      src/emqx.appup.src
  37. 109 38
      src/emqx_broker_bench.erl
  38. 12 1
      src/emqx_channel.erl
  39. 21 12
      src/emqx_cm.erl
  40. 4 1
      src/emqx_congestion.erl
  41. 63 3
      src/emqx_connection.erl
  42. 54 19
      src/emqx_frame.erl
  43. 9 10
      src/emqx_metrics.erl
  44. 20 8
      src/emqx_node_dump.erl
  45. 68 40
      src/emqx_plugins.erl
  46. 37 22
      src/emqx_trie.erl
  47. 13 0
      test/emqx_cm_SUITE.erl
  48. 52 0
      test/emqx_mqtt_SUITE.erl
  49. 7 11
      test/emqx_plugins_SUITE.erl
  50. 2 11
      test/emqx_plugins_SUITE_data/emqx_mini_plugin/Makefile
  51. 8 2
      test/emqx_plugins_SUITE_data/emqx_mini_plugin/rebar.config
  52. 25 2
      test/emqx_trie_SUITE.erl

+ 3 - 3
.ci/build_packages/tests.sh

@@ -31,7 +31,7 @@ emqx_test(){
                 echo "running ${packagename} start"
                 "${PACKAGE_PATH}"/emqx/bin/emqx start || ( tail "${PACKAGE_PATH}"/emqx/log/emqx.log.1 && exit 1 )
                 IDLE_TIME=0
-                while [ -z "$("${PACKAGE_PATH}"/emqx/bin/emqx_ctl status |grep 'is running'|awk '{print $1}')" ]
+                while ! "${PACKAGE_PATH}"/emqx/bin/emqx_ctl status | grep -qE 'Node\s.*@.*\sis\sstarted'
                 do
                     if [ $IDLE_TIME -gt 10 ]
                     then
@@ -103,7 +103,7 @@ running_test(){
 
     emqx start || ( tail /var/log/emqx/emqx.log.1 && exit 1 )
     IDLE_TIME=0
-    while [ -z "$(emqx_ctl status |grep 'is running'|awk '{print $1}')" ]
+    while ! emqx_ctl status | grep -qE 'Node\s.*@.*\sis\sstarted'
     do
         if [ $IDLE_TIME -gt 10 ]
         then
@@ -121,7 +121,7 @@ running_test(){
     || [ "$(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g')" = debian ] ;then
         service emqx start || ( tail /var/log/emqx/emqx.log.1 && exit 1 )
         IDLE_TIME=0
-        while [ -z "$(emqx_ctl status |grep 'is running'|awk '{print $1}')" ]
+        while ! emqx_ctl status | grep -E 'Node\s.*@.*\sis\sstarted'
         do
             if [ $IDLE_TIME -gt 10 ]
             then

+ 13 - 12
.ci/fvt_tests/relup.lux

@@ -1,3 +1,4 @@
+[config var=PROFILE]
 [config var=PACKAGE_PATH]
 [config var=BENCH_PATH]
 [config var=ONE_MORE_EMQX_PATH]
@@ -21,7 +22,7 @@
 
 [shell emqx]
     !cd $PACKAGE_PATH
-    !unzip -q -o emqx-ubuntu20.04-$(echo $old_vsn | sed  -r 's/[v|e]//g')-amd64.zip
+    !unzip -q -o $PROFILE-ubuntu20.04-$(echo $old_vsn | sed  -r 's/[v|e]//g')-amd64.zip
     ?SH-PROMPT
 
     !cd emqx
@@ -33,8 +34,8 @@
 
 [shell emqx2]
     !cd $PACKAGE_PATH
-    !cp -f $ONE_MORE_EMQX_PATH/one_more_emqx.sh .
-    !./one_more_emqx.sh emqx2
+    !cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh .
+    !./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2
     ?SH-PROMPT
     !cd emqx2
 
@@ -75,7 +76,7 @@
     ???sent
 
 [shell emqx]
-    !cp -f ../emqx-ubuntu20.04-$VSN-amd64.zip releases/
+    !cp -f ../$PROFILE-ubuntu20.04-$VSN-amd64.zip releases/
     !./bin/emqx install $VSN
     ?SH-PROMPT
     !./bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]"
@@ -90,7 +91,7 @@
     ?SH-PROMPT
 
 [shell emqx2]
-    !cp -f ../emqx-ubuntu20.04-$VSN-amd64.zip releases/
+    !cp -f ../$PROFILE-ubuntu20.04-$VSN-amd64.zip releases/
     !./bin/emqx install $VSN
     ?SH-PROMPT
     !./bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]"
@@ -111,13 +112,6 @@
     ???{"data":600,"code":0}
     ?SH-PROMPT
 
-[shell http_server]
-    !http_server:stop().
-    ?ok
-    ?>
-    !halt(3).
-    ?SH-PROMPT:
-
 [shell emqx2]
     !cat log/emqx.log.1 |grep -v 691c29ba |tail -n 100
     -error
@@ -142,6 +136,13 @@
     !rm -rf $PACKAGE_PATH/emqx
     ?SH-PROMPT:
 
+[shell http_server]
+    !http_server:stop().
+    ?ok
+    ?>
+    !halt(3).
+    ?SH-PROMPT:
+
 [endloop]
 
 [cleanup]

+ 11 - 2
.github/workflows/run_fvt_tests.yaml

@@ -16,6 +16,10 @@ jobs:
 
         steps:
         - uses: actions/checkout@v1
+        - uses: gleam-lang/setup-erlang@v1.1.2
+          id: install_erlang
+          with:
+            otp-version: 23.2
         - name: prepare
           run: |
             if make emqx-ee --dry-run > /dev/null 2>&1; then
@@ -52,7 +56,7 @@ jobs:
             output=$(docker exec -i node1.emqx.io bash -c "cat data/loaded_plugins" | tail -n1)
             if [ "$expected" != "$output" ]; then
                 exit 1
-            fi    
+            fi
         - name: make paho tests
           run: |
             if ! docker exec -i python /scripts/pytest.sh; then
@@ -66,6 +70,10 @@ jobs:
 
         steps:
         - uses: actions/checkout@v1
+        - uses: gleam-lang/setup-erlang@v1.1.2
+          id: install_erlang
+          with:
+            otp-version: 23.2
         - name: prepare
           run: |
             if make emqx-ee --dry-run > /dev/null 2>&1; then
@@ -252,10 +260,11 @@ jobs:
             set -e -x -u
             if [ -n "$OLD_VSNS" ]; then
                 mkdir -p packages
-                cp emqx/_packages/emqx/*.zip packages
+                cp emqx/_packages/${PROFILE}/*.zip packages
                 cp emqx/_upgrade_base/*.zip packages
                 lux -v \
                 --timeout 600000 \
+                --var PROFILE=$PROFILE \
                 --var PACKAGE_PATH=$(pwd)/packages \
                 --var BENCH_PATH=$(pwd)/emqtt-bench \
                 --var ONE_MORE_EMQX_PATH=$(pwd)/one_more_emqx \

+ 3 - 0
.github/workflows/run_test_cases.yaml

@@ -128,6 +128,9 @@ jobs:
             printenv > .env
             docker exec -i erlang bash -c "make cover"
             docker exec --env-file .env -i erlang bash -c "make coveralls"
+        - name: cat rebar.crashdump
+          if: failure()
+          run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump' fi
         - uses: actions/upload-artifact@v1
           if: failure()
           with:

+ 1 - 1
.tool-versions

@@ -1 +1 @@
-erlang 23.2.7.2-emqx-1
+erlang 24.0.1-emqx-1

+ 2 - 0
apps/emqx_auth_http/etc/emqx_auth_http.conf

@@ -97,6 +97,7 @@ auth.http.acl_req.headers.content-type = "application/x-www-form-urlencoded"
 ## When the request method is POST, the final format is determined by content-type
 ##
 ## Available Variables:
+##  - %A: access (1 - subscribe, 2 - publish)
 ##  - %u: username
 ##  - %c: clientid
 ##  - %a: ipaddress
@@ -105,6 +106,7 @@ auth.http.acl_req.headers.content-type = "application/x-www-form-urlencoded"
 ##  - %p: sockport of server accepted
 ##  - %C: common name of client TLS cert
 ##  - %d: subject of client TLS cert
+##  - %t: topic
 ##
 ## Value: <K1>=<V1>,<K2>=<V2>,...
 auth.http.acl_req.params = "access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m"

+ 8 - 3
apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl

@@ -37,11 +37,11 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
-    emqx_ct_helpers:start_apps([emqx, emqx_auth_jwt], fun set_special_configs/1),
+    emqx_ct_helpers:start_apps([emqx_auth_jwt], fun set_special_configs/1),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([emqx_auth_jwt, emqx]).
+    emqx_ct_helpers:stop_apps([emqx_auth_jwt]).
 
 set_special_configs(emqx) ->
     application:set_env(emqx, allow_anonymous, false),
@@ -97,6 +97,8 @@ t_check_auth(_) ->
 
 t_check_claims(_) ->
     application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]),
+    application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt),
+
     Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external},
     Jwt = sign([{client_id, <<"client1">>},
                 {username, <<"plain">>},
@@ -113,8 +115,9 @@ t_check_claims(_) ->
 
 t_check_claims_clientid(_) ->
     application:set_env(emqx_auth_jwt, verify_claims, [{clientid, <<"%c">>}]),
+    application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt),
     Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external},
-    Jwt = sign([{client_id, <<"client23">>},
+    Jwt = sign([{clientid, <<"client23">>},
                 {username, <<"plain">>},
                 {exp, os:system_time(seconds) + 3}], <<"HS256">>, <<"emqxsecret">>),
     Result0 = emqx_access_control:authenticate(Plain#{password => Jwt}),
@@ -128,6 +131,8 @@ t_check_claims_clientid(_) ->
 
 t_check_claims_username(_) ->
     application:set_env(emqx_auth_jwt, verify_claims, [{username, <<"%u">>}]),
+    application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt),
+
     Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external},
     Jwt = sign([{client_id, <<"client23">>},
                 {username, <<"plain">>},

+ 3 - 3
apps/emqx_coap/test/emqx_coap_SUITE.erl

@@ -28,16 +28,16 @@
 all() -> emqx_ct:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_ct_helpers:start_apps([emqx_coap], fun set_sepecial_cfg/1),
+    emqx_ct_helpers:start_apps([emqx_coap], fun set_special_cfg/1),
     Config.
 
-set_sepecial_cfg(emqx_coap) ->
+set_special_cfg(emqx_coap) ->
     Opts = application:get_env(emqx_coap, dtls_opts,[]),
     Opts2 = [{keyfile, emqx_ct_helpers:deps_path(emqx, "etc/certs/key.pem")},
              {certfile, emqx_ct_helpers:deps_path(emqx, "etc/certs/cert.pem")}],
     application:set_env(emqx_coap, dtls_opts, emqx_misc:merge_opts(Opts, Opts2)),
     application:set_env(emqx_coap, enable_stats, true);
-set_sepecial_cfg(_) ->
+set_special_cfg(_) ->
     ok.
 
 end_per_suite(Config) ->

+ 3 - 3
apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl

@@ -28,12 +28,12 @@
 all() -> emqx_ct:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_ct_helpers:start_apps([emqx_coap], fun set_sepecial_cfg/1),
+    emqx_ct_helpers:start_apps([emqx_coap], fun set_special_cfg/1),
     Config.
 
-set_sepecial_cfg(emqx_coap) ->
+set_special_cfg(emqx_coap) ->
     application:set_env(emqx_coap, enable_stats, true);
-set_sepecial_cfg(_) ->
+set_special_cfg(_) ->
     ok.
 
 end_per_suite(Config) ->

+ 3 - 3
apps/emqx_exproto/test/emqx_exproto_SUITE.erl

@@ -55,7 +55,7 @@ metrics() ->
 init_per_group(GrpName, Cfg) ->
     put(grpname, GrpName),
     Svrs = emqx_exproto_echo_svr:start(),
-    emqx_ct_helpers:start_apps([emqx_exproto], fun set_sepecial_cfg/1),
+    emqx_ct_helpers:start_apps([emqx_exproto], fun set_special_cfg/1),
     emqx_logger:set_log_level(debug),
     [{servers, Svrs}, {listener_type, GrpName} | Cfg].
 
@@ -63,7 +63,7 @@ end_per_group(_, Cfg) ->
     emqx_ct_helpers:stop_apps([emqx_exproto]),
     emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
 
-set_sepecial_cfg(emqx_exproto) ->
+set_special_cfg(emqx_exproto) ->
     LisType = get(grpname),
     Listeners = application:get_env(emqx_exproto, listeners, []),
     SockOpts = socketopts(LisType),
@@ -77,7 +77,7 @@ set_sepecial_cfg(emqx_exproto) ->
     NListeners = [{Proto, LisType, LisOn, UpgradeOpts(Opts)}
                   || {Proto, _Type, LisOn, Opts} <- Listeners],
     application:set_env(emqx_exproto, listeners, NListeners);
-set_sepecial_cfg(emqx) ->
+set_special_cfg(emqx) ->
     application:set_env(emqx, allow_anonymous, true),
     application:set_env(emqx, enable_acl_cache, false),
     ok.

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -1,6 +1,6 @@
 {application, emqx_management,
  [{description, "EMQ X Management API and CLI"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.3.2"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_management_sup]},
   {applications, [kernel,stdlib,minirest]},

+ 6 - 4
apps/emqx_management/src/emqx_management.appup.src

@@ -1,12 +1,14 @@
 %% -*-: erlang -*-
-{"4.3.1",
- [ {"4.3.0",
+{"4.3.2",
+ [ {<<"4.3.[0-1]">>,
     [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
+    , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []}
     ]}
  ],
  [
-   {"4.3.0",
+   {<<"4.3.[0-1]">>,
     [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
+    , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []}
     ]}
  ]
-}.
+}.

+ 1 - 7
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -116,13 +116,7 @@ mgmt(_) ->
 
 status([]) ->
     {InternalStatus, _ProvidedStatus} = init:get_status(),
-    emqx_ctl:print("Node ~p ~s is ~p~n", [node(), emqx_app:get_release(), InternalStatus]),
-    case lists:keysearch(?APP, 1, application:which_applications()) of
-        false ->
-            emqx_ctl:print("Application ~s is not running~n", [?APP]);
-        {value, {?APP, _Desc, Vsn}} ->
-            emqx_ctl:print("Application ~s ~s is running~n", [?APP, Vsn])
-    end;
+    emqx_ctl:print("Node ~p ~s is ~p~n", [node(), emqx_app:get_release(), InternalStatus]);
 status(_) ->
      emqx_ctl:usage("status", "Show broker status").
 

+ 1 - 0
apps/emqx_management/src/emqx_mgmt_data_backup.erl

@@ -503,6 +503,7 @@ do_import_acl_mnesia(Acls) ->
     end.
 
 -ifdef(EMQX_ENTERPRISE).
+-dialyzer({nowarn_function, [import_modules/1]}).
 import_modules(Modules) ->
     case ets:info(emqx_modules) of
         undefined ->

+ 4 - 2
apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl

@@ -28,7 +28,9 @@ all() ->
     emqx_ct:all(?MODULE).
 
 init_per_suite(Cfg) ->
-    emqx_ct_helpers:start_apps([emqx_bridge_mqtt, emqx_rule_engine]),
+    application:load(emqx_modules),
+    application:load(emqx_bridge_mqtt),
+    emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
     Cfg.
 
 end_per_suite(Cfg) ->
@@ -179,4 +181,4 @@ remove_resources() ->
     lists:foreach(fun(#resource{id = Id}) ->
         emqx_rule_engine:delete_resource(Id)
     end, emqx_rule_registry:get_resources()),
-    timer:sleep(500).
+    timer:sleep(500).

+ 2 - 1
apps/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -133,7 +133,8 @@ t_mgmt_cmd(_) ->
 t_status_cmd(_) ->
     % ct:pal("start testing status command"),
     mock_print(),
-    ?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "is running")),
+    %% init internal status seem to be always 'starting' when running ct tests
+    ?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "Node\s.*@.*\sis\sstart(ed|ing)")),
     meck:unload().
 
 t_broker_cmd(_) ->

+ 32 - 58
apps/emqx_management/test/emqx_mgmt_api_SUITE.erl

@@ -34,55 +34,22 @@
 -define(BASE_PATH, "api").
 
 all() ->
-    [{group, rest_api}].
-
-groups() ->
-    [{rest_api,
-      [sequence],
-      [ alarms
-      , apps
-      , banned
-      , brokers
-      , clients
-      , listeners
-      , metrics
-      , nodes
-      , plugins
-      , acl_cache
-      , pubsub
-      , routes_and_subscriptions
-      , stats
-      , data
-      ]
-    }].
+    emqx_ct:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_ct_helpers:start_apps([emqx_management, emqx_auth_mnesia, emqx_modules]),
-    ekka_mnesia:start(),
-    emqx_mgmt_auth:mnesia(boot),
+    application:load(emqx_modules),
+    emqx_ct_helpers:start_apps([emqx_management]),
     Config.
 
-end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([emqx_auth_mnesia, emqx_management, emqx_modules]),
-    ekka_mnesia:ensure_stopped().
-
-init_per_testcase(data, Config) ->
-    ok = emqx_dashboard_admin:mnesia(boot),
-    application:ensure_all_started(emqx_dashboard),
-    ok = emqx_rule_registry:mnesia(boot),
-    application:ensure_all_started(emqx_rule_engine),
-    Config;
+end_per_suite(Config) ->
+    emqx_ct_helpers:stop_apps([emqx_management]),
+    Config.
 
 init_per_testcase(_, Config) ->
     Config.
 
-end_per_testcase(data, _Config) ->
-    application:stop(emqx_dahboard),
-    application:stop(emqx_rule_engine),
-    ok;
-
-end_per_testcase(_, _Config) ->
-    ok.
+end_per_testcase(_, Config) ->
+    Config.
 
 get(Key, ResponseBody) ->
    maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])).
@@ -101,7 +68,7 @@ is_existing(Name, [_Alarm | More]) ->
 is_existing(_Name, []) ->
     false.
 
-alarms(_) ->
+t_alarms(_) ->
     emqx_alarm:activate(alarm1),
     emqx_alarm:activate(alarm2),
 
@@ -134,7 +101,7 @@ alarms(_) ->
     ?assertNot(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return5))))),
     ?assertNot(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return5))))).
 
-apps(_) ->
+t_apps(_) ->
     AppId = <<"123456">>,
     meck:new(emqx_mgmt_auth, [passthrough, no_history]),
     meck:expect(emqx_mgmt_auth, add_app, 6, fun(_, _, _, _, _, _) -> {error, undefined} end),
@@ -172,7 +139,7 @@ apps(_) ->
     [App] = get(<<"data">>, Result),
     ?assertEqual(<<"admin">>, maps:get(<<"app_id">>, App)).
 
-banned(_) ->
+t_banned(_) ->
     Who = <<"myclient">>,
     {ok, _} = request_api(post, api_path(["banned"]), [],
                           auth_header_(), #{<<"who">> => Who,
@@ -190,7 +157,7 @@ banned(_) ->
     {ok, Result2} = request_api(get, api_path(["banned"]), auth_header_()),
     ?assertEqual([], get(<<"data">>, Result2)).
 
-brokers(_) ->
+t_brokers(_) ->
     {ok, _} = request_api(get, api_path(["brokers"]), auth_header_()),
     {ok, _} = request_api(get, api_path(["brokers", atom_to_list(node())]), auth_header_()),
     meck:new(emqx_mgmt, [passthrough, no_history]),
@@ -199,7 +166,7 @@ brokers(_) ->
     ?assertEqual(<<"undefined">>, get(<<"message">>, Error)),
     meck:unload(emqx_mgmt).
 
-clients(_) ->
+t_clients(_) ->
     process_flag(trap_exit, true),
     Username1 = <<"user1">>,
     Username2 = <<"user2">>,
@@ -288,7 +255,7 @@ receive_exit(Count) ->
             ct:log("timeout")
     end.
 
-listeners(_) ->
+t_listeners(_) ->
     {ok, _} = request_api(get, api_path(["listeners"]), auth_header_()),
     {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "listeners"]), auth_header_()),
     meck:new(emqx_mgmt, [passthrough, no_history]),
@@ -299,7 +266,7 @@ listeners(_) ->
                  maps:get(<<"error">>, maps:get(<<"listeners">>, Error))),
     meck:unload(emqx_mgmt).
 
-metrics(_) ->
+t_metrics(_) ->
     {ok, _} = request_api(get, api_path(["metrics"]), auth_header_()),
     {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "metrics"]), auth_header_()),
     meck:new(emqx_mgmt, [passthrough, no_history]),
@@ -307,7 +274,7 @@ metrics(_) ->
     {ok, "{\"message\":\"undefined\"}"} = request_api(get, api_path(["nodes", atom_to_list(node()), "metrics"]), auth_header_()),
     meck:unload(emqx_mgmt).
 
-nodes(_) ->
+t_nodes(_) ->
     {ok, _} = request_api(get, api_path(["nodes"]), auth_header_()),
     {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node())]), auth_header_()),
     meck:new(emqx_mgmt, [passthrough, no_history]),
@@ -317,7 +284,8 @@ nodes(_) ->
     ?assertEqual(<<"undefined">>, maps:get(<<"error">>, Error)),
     meck:unload(emqx_mgmt).
 
-plugins(_) ->
+t_plugins(_) ->
+    application:ensure_all_started(emqx_auth_mnesia),
     {ok, Plugins1} = request_api(get, api_path(["plugins"]), auth_header_()),
     [Plugins11] = filter(get(<<"data">>, Plugins1), <<"node">>, atom_to_binary(node(), utf8)),
     [Plugin1] = filter(maps:get(<<"plugins">>, Plugins11), <<"name">>, <<"emqx_auth_mnesia">>),
@@ -354,7 +322,7 @@ plugins(_) ->
                                  auth_header_()),
     [Plugin3] = filter(get(<<"data">>, Plugins3), <<"name">>, <<"emqx_auth_mnesia">>),
     ?assertEqual(<<"emqx_auth_mnesia">>, maps:get(<<"name">>, Plugin3)),
-    ?assertEqual(false, maps:get(<<"active">>, Plugin3)),
+    ?assertEqual(true, maps:get(<<"active">>, Plugin3)),
 
     {ok, _} = request_api(put,
                           api_path(["nodes",
@@ -370,9 +338,10 @@ plugins(_) ->
                                          atom_to_list(emqx_auth_mnesia),
                                          "unload"]),
                                auth_header_()),
-    ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)).
+    ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
+    application:stop(emqx_auth_mnesia).
 
-acl_cache(_) ->
+t_acl_cache(_) ->
     ClientId = <<"client1">>,
     Topic = <<"mytopic">>,
     {ok, C1} = emqtt:start_link(#{clientid => ClientId}),
@@ -395,7 +364,7 @@ acl_cache(_) ->
     ?assertEqual(0, length(Caches3)),
     ok = emqtt:disconnect(C1).
 
-pubsub(_) ->
+t_pubsub(_) ->
     Qos1Received = emqx_metrics:val('messages.qos1.received'),
     Qos2Received = emqx_metrics:val('messages.qos2.received'),
     Received = emqx_metrics:val('messages.received'),
@@ -514,7 +483,7 @@ loop(Data) ->
     ?assertEqual(0, maps:get(<<"code">>, H)),
     loop(T).
 
-routes_and_subscriptions(_) ->
+t_routes_and_subscriptions(_) ->
     ClientId = <<"myclient">>,
     Topic = <<"mytopic">>,
     {ok, NonRoute} = request_api(get, api_path(["routes"]), auth_header_()),
@@ -559,7 +528,7 @@ routes_and_subscriptions(_) ->
 
     ok = emqtt:disconnect(C1).
 
-stats(_) ->
+t_stats(_) ->
     {ok, _} = request_api(get, api_path(["stats"]), auth_header_()),
     {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()),
     meck:new(emqx_mgmt, [passthrough, no_history]),
@@ -568,7 +537,11 @@ stats(_) ->
     ?assertEqual(<<"undefined">>, get(<<"message">>, Return)),
     meck:unload(emqx_mgmt).
 
-data(_) ->
+t_data(_) ->
+    ok = emqx_rule_registry:mnesia(boot),
+    ok = emqx_dashboard_admin:mnesia(boot),
+    application:ensure_all_started(emqx_rule_engine),
+    application:ensure_all_started(emqx_dashboard),
     {ok, Data} = request_api(post, api_path(["data","export"]), [], auth_header_(), [#{}]),
     #{<<"filename">> := Filename, <<"node">> := Node} = emqx_ct_http:get_http_data(Data),
     {ok, DataList} = request_api(get, api_path(["data","export"]), auth_header_()),
@@ -576,7 +549,8 @@ data(_) ->
 
     ?assertMatch({ok, _}, request_api(post, api_path(["data","import"]), [], auth_header_(), #{<<"filename">> => Filename, <<"node">> => Node})),
     ?assertMatch({ok, _}, request_api(post, api_path(["data","import"]), [], auth_header_(), #{<<"filename">> => Filename})),
-
+    application:stop(emqx_rule_engine),
+    application:stop(emqx_dahboard),
     ok.
 
 request_api(Method, Url, Auth) ->

+ 4 - 13
apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl

@@ -28,24 +28,15 @@ all() ->
     emqx_ct:all(?MODULE).
 
 init_per_suite(Cfg) ->
-    emqx_ct_helpers:start_apps([emqx_web_hook,
-                                emqx_bridge_mqtt,
-                                emqx_rule_engine,
-                                emqx_modules,
-                                emqx_management,
-                                emqx_dashboard]),
-    ok = ekka_mnesia:start(),
+    application:load(emqx_modules),
+    application:load(emqx_web_hook),
+    emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
     ok = emqx_rule_registry:mnesia(boot),
     ok = emqx_rule_engine:load_providers(),
     Cfg.
 
 end_per_suite(Cfg) ->
-    emqx_ct_helpers:stop_apps([emqx_dashboard,
-                               emqx_management,
-                               emqx_modules,
-                               emqx_rule_engine,
-                               emqx_bridge_mqtt,
-                               emqx_web_hook]),
+    emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]),
     Cfg.
 
 get_data_path() ->

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -1,6 +1,6 @@
 {application, emqx_rule_engine,
  [{description, "EMQ X Rule Engine"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.3.2"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_rule_engine_sup, emqx_rule_registry]},
   {applications, [kernel,stdlib,rulesql,getopt]},

+ 11 - 3
apps/emqx_rule_engine/src/emqx_rule_engine.appup.src

@@ -1,13 +1,21 @@
 %% -*-: erlang -*-
-{"4.3.1",
+{"4.3.2",
  [ {"4.3.0",
-    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
+    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
+      {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    ]},
+   {"4.3.1",
+    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
     ]},
    {<<".*">>, []}
  ],
  [
    {"4.3.0",
-    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
+    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
+      {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
+    ]},
+   {"4.3.1",
+    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
     ]},
    {<<".*">>, []}
  ]

+ 18 - 19
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -408,7 +408,7 @@ refresh_resource_status() ->
         fun(#resource{id = ResId, type = ResType}) ->
             case emqx_rule_registry:find_resource_type(ResType) of
                 {ok, #resource_type{on_status = {Mod, OnStatus}}} ->
-                    fetch_resource_status(Mod, OnStatus, ResId);
+                    _ = fetch_resource_status(Mod, OnStatus, ResId);
                 _ -> ok
             end
         end, emqx_rule_registry:get_resources()).
@@ -588,27 +588,26 @@ clear_action(Module, Destroy, ActionInstId) ->
 fetch_resource_status(Module, OnStatus, ResId) ->
     case emqx_rule_registry:find_resource_params(ResId) of
         {ok, ResParams = #resource_params{params = Params, status = #{is_alive := LastIsAlive}}} ->
-            try
-                NewStatus =
-                    case Module:OnStatus(ResId, Params) of
-                        #{is_alive := LastIsAlive} = Status -> Status;
-                        #{is_alive := true} = Status ->
-                            {ok, Type} = find_type(ResId),
-                            Name = alarm_name_of_resource_down(Type, ResId),
-                            emqx_alarm:deactivate(Name),
-                            Status;
-                        #{is_alive := false} = Status ->
-                            {ok, Type} = find_type(ResId),
-                            Name = alarm_name_of_resource_down(Type, ResId),
-                            emqx_alarm:activate(Name, #{id => ResId, type => Type}),
-                            Status
-                    end,
-                emqx_rule_registry:add_resource_params(ResParams#resource_params{status = NewStatus}),
-                NewStatus
+            NewStatus = try
+                case Module:OnStatus(ResId, Params) of
+                    #{is_alive := LastIsAlive} = Status -> Status;
+                    #{is_alive := true} = Status ->
+                        {ok, Type} = find_type(ResId),
+                        Name = alarm_name_of_resource_down(Type, ResId),
+                        emqx_alarm:deactivate(Name),
+                        Status;
+                    #{is_alive := false} = Status ->
+                        {ok, Type} = find_type(ResId),
+                        Name = alarm_name_of_resource_down(Type, ResId),
+                        emqx_alarm:activate(Name, #{id => ResId, type => Type}),
+                        Status
+                end
             catch _Error:Reason:STrace ->
                 ?LOG(error, "get resource status for ~p failed: ~0p", [ResId, {Reason, STrace}]),
                 #{is_alive => false}
-            end;
+            end,
+            emqx_rule_registry:add_resource_params(ResParams#resource_params{status = NewStatus}),
+            NewStatus;
         not_found ->
             #{is_alive => false}
     end.

+ 2 - 0
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -31,6 +31,8 @@
         , range_get/3
         ]).
 
+-compile({no_auto_import,[alias/1]}).
+
 -type(input() :: map()).
 -type(alias() :: atom()).
 -type(collection() :: {alias(), [term()]}).

+ 1 - 1
apps/emqx_sn/src/emqx_sn.app.src

@@ -1,6 +1,6 @@
 {application, emqx_sn,
  [{description, "EMQ X MQTT-SN Plugin"},
-  {vsn, "4.3.0"}, % strict semver, bump manually!
+  {vsn, "4.3.1"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,esockd]},

+ 17 - 0
apps/emqx_sn/src/emqx_sn.appup.src

@@ -0,0 +1,17 @@
+%% -*-: erlang -*-
+{VSN,
+ [
+   {"4.3.0", [
+     {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
+     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
+   ]},
+   {<<".*">>, []}
+ ],
+ [
+   {"4.3.0", [
+     {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
+     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
+   ]},
+   {<<".*">>, []}
+ ]
+}.

+ 10 - 5
apps/emqx_sn/src/emqx_sn_asleep_timer.erl

@@ -18,6 +18,7 @@
 
 -export([ init/0
         , ensure/2
+        , cancel/1
         ]).
 
 -record(asleep_state, {
@@ -42,8 +43,8 @@ init() ->
 -spec(ensure(undefined | integer(), asleep_state()) -> asleep_state()).
 ensure(undefined, State = #asleep_state{duration = Duration}) ->
     ensure(Duration, State);
-ensure(Duration, State = #asleep_state{tref = TRef}) ->
-    _ = cancel(TRef),
+ensure(Duration, State) ->
+    cancel(State),
     State#asleep_state{duration = Duration, tref = start(Duration)}.
 
 %%--------------------------------------------------------------------
@@ -55,6 +56,10 @@ ensure(Duration, State = #asleep_state{tref = TRef}) ->
 start(Duration) ->
     erlang:send_after(timer:seconds(Duration), self(), asleep_timeout).
 
-cancel(undefined) -> ok;
-cancel(TRef) when is_reference(TRef) ->
-    erlang:cancel_timer(TRef).
+cancel(#asleep_state{tref = Timer}) when is_reference(Timer) ->
+    case erlang:cancel_timer(Timer) of
+        false ->
+            receive {timeout, Timer, _} -> ok after 0 -> ok end;
+        _ -> ok
+    end;
+cancel(_) -> ok.

+ 10 - 8
apps/emqx_sn/src/emqx_sn_gateway.erl

@@ -439,12 +439,11 @@ asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
 %    4) emq-sn regard this CONNECT as a signal to connected state, not a bootup CONNECT. For this reason, will procedure is lost
 % this should be a bug in mqtt-sn channel.
 asleep(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)},
-       State = #state{keepalive_interval = _Interval}) ->
-    % device wakeup and goto connected state
-    % keepalive timer may timeout in asleep state and delete itself, need to restart keepalive
-    % TODO: Fixme later.
-    %% self() ! {keepalive, start, Interval},
-    {next_state, connected, send_connack(State)};
+       State = #state{channel = Channel, asleep_timer = Timer}) ->
+    NChannel = emqx_channel:ensure_keepalive(#{}, Channel),
+    emqx_sn_asleep_timer:cancel(Timer),
+    {next_state, connected, send_connack(State#state{channel = NChannel,
+                                                     asleep_timer = emqx_sn_asleep_timer:init()})};
 
 asleep(EventType, EventContent, State) ->
     handle_event(EventType, EventContent, asleep, State).
@@ -771,10 +770,13 @@ send_message(Msg = #mqtt_sn_message{type = Type},
 
 goto_asleep_state(State) ->
     goto_asleep_state(undefined, State).
-goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) ->
+goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer,
+                                         channel = Channel}) ->
     ?LOG(debug, "goto_asleep_state Duration=~p", [Duration]),
     NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer),
-    {next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}.
+    NChannel = emqx_channel:clear_keepalive(Channel),
+    {next_state, asleep, State#state{asleep_timer = NewTimer,
+                                     channel = NChannel}, hibernate}.
 
 %%--------------------------------------------------------------------
 %% Helper funcs

+ 5 - 5
apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

@@ -856,7 +856,7 @@ t_will_test2(_) ->
     send_pingreq_msg(Socket, undefined),
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
-    timer:sleep(10000),
+    timer:sleep(4000),
 
     receive_response(Socket), % ignore PUBACK
     receive_response(Socket), % ignore PUBCOMP
@@ -878,7 +878,7 @@ t_will_test3(_) ->
     send_pingreq_msg(Socket, undefined),
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
-    timer:sleep(10000),
+    timer:sleep(4000),
 
     ?assertEqual(udp_receive_timeout, receive_response(Socket)),
 
@@ -906,7 +906,7 @@ t_will_test4(_) ->
     send_willmsgupd_msg(Socket, <<"1A2B3C">>),
     ?assertEqual(<<3, ?SN_WILLMSGRESP, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
 
-    timer:sleep(10000),
+    timer:sleep(4000),
 
     receive_response(Socket), % ignore PUBACK
 
@@ -1359,7 +1359,7 @@ t_asleep_test07_to_connected(_) ->
     timer:sleep(1500),
     % asleep timer should get timeout, without any effect
 
-    timer:sleep(9000),
+    timer:sleep(4000),
     % keepalive timer should get timeout
 
     gen_udp:close(Socket).
@@ -1517,7 +1517,7 @@ t_awake_test01_to_connected(_) ->
     timer:sleep(1500),
     % asleep timer should get timeout
 
-    timer:sleep(9000),
+    timer:sleep(4000),
     % keepalive timer should get timeout
     gen_udp:close(Socket).
 

+ 2 - 0
bin/emqx.cmd

@@ -16,6 +16,7 @@
 :: Set variables that describe the release
 @set rel_name=emqx
 @set rel_vsn={{ release_version }}
+@set REL_VSN=%rel_vsn%
 @set erts_vsn={{ erts_vsn }}
 @set erl_opts={{ erl_opts }}
 
@@ -30,6 +31,7 @@
   set rel_root_dir=%%~fA
 )
 @set rel_dir=%rel_root_dir%\releases\%rel_vsn%
+@set RUNNER_ROOT_DIR=%rel_root_dir%
 
 @set etc_dir=%rel_root_dir%\etc
 @set lib_dir=%rel_root_dir%\lib

+ 3 - 17
bin/nodetool

@@ -292,25 +292,11 @@ join([H|T], Sep) ->
 
 add_libs_dir() ->
     [_ | _] = RootDir = os:getenv("RUNNER_ROOT_DIR"),
-    RelFile = filename:join([RootDir, "releases",
-                             os:getenv("REL_VSN"),
-                             "emqx.rel"
-                            ]),
-    case file:consult(RelFile) of
-        {ok, [{release, {_, _RelVsn}, {erts, _ErtsVsn}, Libs}]} ->
-            lists:foreach(
-              fun({Name, Vsn}) -> add_lib_dir(RootDir, Name, Vsn);
-                 ({Name, Vsn, _}) -> add_lib_dir(RootDir, Name, Vsn)
-              end, Libs);
-        {error, enoent} ->
-            %% rel file is deleted by release handler
-            add_libs_dir2(RootDir)
-    end.
-
-add_libs_dir2(RootDir) ->
+    CurrentVsn = os:getenv("REL_VSN"),
     RelFile = filename:join([RootDir, "releases", "RELEASES"]),
     case file:consult(RelFile) of
-        {ok, [[Release]]} ->
+        {ok, [Releases]} ->
+            Release = lists:keyfind(CurrentVsn, 3, Releases),
             {release, _Name, _AppVsn, _ErtsVsn, Libs, _State} = Release,
             lists:foreach(
               fun({Name, Vsn, _}) ->

+ 2 - 2
lib-ce/emqx_modules/test/emqx_modules_SUITE.erl

@@ -32,11 +32,11 @@
 all() -> emqx_ct:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_ct_helpers:start_apps([emqx_management, emqx_modules], fun set_sepecial_cfg/1),
+    emqx_ct_helpers:start_apps([emqx_management, emqx_modules], fun set_special_cfg/1),
     emqx_ct_http:create_default_app(),
     Config.
 
-set_sepecial_cfg(_) ->
+set_special_cfg(_) ->
     application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")),
     ok.
 

+ 3 - 1
priv/emqx.schema

@@ -2307,7 +2307,9 @@ end}.
     [random, %% randomly pick a subscriber
      round_robin, %% round robin alive subscribers one message after another
      sticky, %% pick a random subscriber and stick to it
-     hash %% hash client ID to a group member
+     hash, %% hash client ID to a group member
+     hash_clientid,
+     hash_topic
     ]}}
 ]}.
 

+ 1 - 1
rebar.config

@@ -35,7 +35,7 @@
 {erl_first_files, ["src/emqx_logger.erl", "src/emqx_rule_actions_trans.erl"]}.
 
 {deps,
-    [ {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.3"}}}
+    [ {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.5"}}}
     , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}}
     , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}

+ 1 - 1
rebar.config.erl

@@ -194,7 +194,7 @@ overlay_vars_pkg(bin) ->
     , {platform_etc_dir, "etc"}
     , {platform_lib_dir, "lib"}
     , {platform_log_dir, "log"}
-    , {platform_plugins_dir,  "plugins"}
+    , {platform_plugins_dir,  "etc/plugins"}
     , {runner_root_dir, "$(cd $(dirname $(readlink $0 || echo $0))/..; pwd -P)"}
     , {runner_bin_dir, "$RUNNER_ROOT_DIR/bin"}
     , {runner_etc_dir, "$RUNNER_ROOT_DIR/etc"}

+ 0 - 1
scripts/apps-version-check.sh

@@ -16,7 +16,6 @@ while read -r app; do
     now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
     if [ "$old_app_version" = "$now_app_version" ]; then
         changed="$(git diff --name-only "$latest_release"...HEAD \
-                    -- "$app_path/etc" \
                     -- "$app_path/src" \
                     -- "$app_path/priv" \
                     -- "$app_path/c_src" | wc -l)"

+ 1 - 1
src/emqx.app.src

@@ -1,7 +1,7 @@
 {application, emqx,
  [{id, "emqx"},
   {description, "EMQ X"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.3.2"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},

+ 39 - 1
src/emqx.appup.src

@@ -1,19 +1,57 @@
 %% -*-: erlang -*-
 {VSN,
  [
+   {"4.3.1", [
+     {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
+     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []},
+     {load_module, emqx_app, brutal_purge, soft_purge, []},
+     {load_module, emqx_plugins, brutal_purge, soft_purge, []}
+   ]},
    {"4.3.0", [
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
+     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
+     {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_frame, brutal_purge, soft_purge, []},
+     {load_module, emqx_trie, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []},
+     {load_module, emqx_app, brutal_purge, soft_purge, []},
+     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
+     %%
      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
      {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
    ]},
    {<<".*">>, []}
  ],
  [
+   {"4.3.1", [
+     {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
+     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []},
+     {load_module, emqx_app, brutal_purge, soft_purge, []},
+     {load_module, emqx_plugins, brutal_purge, soft_purge, []}
+   ]},
    {"4.3.0", [
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
+     {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
+     {load_module, emqx_frame, brutal_purge, soft_purge, []},
+     {load_module, emqx_trie, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []},
+     {load_module, emqx_app, brutal_purge, soft_purge, []},
+     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
      %% Just load the module. We don't need to change the 'messages.retained'
      %% and 'messages.retained' counter type.
-     {load_module, emqx_metrics, brutal_purge, soft_purge, []}
+     {load_module, emqx_metrics, brutal_purge, soft_purge, []},
+     {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
    ]},
    {<<".*">>, []}
  ]

+ 109 - 38
src/emqx_broker_bench.erl

@@ -18,32 +18,82 @@
 
 -ifdef(EMQX_BENCHMARK).
 
--export([start/1, run1/0, run1/2]).
+-export([run/1, run1/0, run1/4]).
 
-run1() -> run1(4, 1000).
+-define(T(Expr), timer:tc(fun() -> Expr end)).
 
-run1(Factor, Limit) ->
-    start(#{factor => Factor,
-            limit => Limit,
-            sub_ptn => <<"device/{{id}}/+/{{num}}/#">>,
-            pub_ptn => <<"device/{{id}}/xays/{{num}}/foo/bar/baz">>}).
+run1() -> run1(80, 1000, 80, 10000).
+
+run1(Subs, SubOps, Pubs, PubOps) ->
+    run(#{subscribers => Subs,
+          publishers => Pubs,
+          sub_ops => SubOps,
+          pub_ops => PubOps,
+          sub_ptn => <<"device/{{id}}/+/{{num}}/#">>,
+          pub_ptn => <<"device/{{id}}/foo/{{num}}/bar/1/2/3/4/5">>
+         }).
 
 %% setting fields:
-%% - factor: spawn broker-pool-size * factor number of callers
-%% - limit: limit the total number of topics for each caller
+%% - subscribers: spawn this number of subscriber workers
+%% - publishers: spawn this number of publisher workers
+%% - sub_ops: the number of subscribes (route insert) each subscriber runs
+%% - pub_ops: the number of publish (route lookups) each publisher runs
 %% - sub_ptn: subscribe topic pattern like a/+/b/+/c/#
 %%            or a/+/{{id}}/{{num}}/# to generate pattern with {{id}}
 %%            replaced by worker id and {{num}} replaced by topic number.
 %% - pub_ptn: topic pattern used to benchmark publish (match) performance
 %%            e.g. a/x/{{id}}/{{num}}/foo/bar
-start(#{factor := Factor} = Settings) ->
-    BrokerPoolSize = emqx_vm:schedulers() * 2,
-    Pids = start_callers(BrokerPoolSize * Factor, Settings),
-    R = collect_results(Pids, #{subscribe => 0, match => 0}),
+run(#{subscribers := Subs,
+      publishers := Pubs,
+      sub_ops := SubOps,
+      pub_ops := PubOps
+     } = Settings) ->
+    SubsPids = start_callers(Subs, fun  start_subscriber/1, Settings),
+    PubsPids = start_callers(Pubs, fun start_publisher/1, Settings),
+    _ = collect_results(SubsPids, subscriber_ready),
+    io:format(user, "subscribe ...~n", []),
+    {T1, SubsTime} =
+        ?T(begin
+               lists:foreach(fun(Pid) -> Pid ! start_subscribe end, SubsPids),
+               collect_results(SubsPids, subscribe_time)
+           end),
+    io:format(user, "InsertTotalTime: ~s~n", [ns(T1)]),
+    io:format(user, "InsertTimeAverage: ~s~n", [ns(SubsTime / Subs)]),
+    io:format(user, "InsertRps: ~p~n", [rps(Subs * SubOps, T1)]),
+
+    io:format(user, "lookup ...~n", []),
+    {T2, PubsTime} =
+        ?T(begin
+               lists:foreach(fun(Pid) -> Pid ! start_lookup end, PubsPids),
+               collect_results(PubsPids, lookup_time)
+           end),
+    io:format(user, "LookupTotalTime: ~s~n", [ns(T2)]),
+    io:format(user, "LookupTimeAverage: ~s~n", [ns(PubsTime / Pubs)]),
+    io:format(user, "LookupRps: ~p~n", [rps(Pubs * PubOps, T2)]),
+
     io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]),
-    io:format(user, "~p~n", [erlang:memory()]),
-    io:format(user, "~p~n", [R]),
-    lists:foreach(fun(Pid) -> Pid ! stop end, Pids).
+
+    io:format(user, "unsubscribe ...~n", []),
+    {T3, ok} =
+        ?T(begin
+               lists:foreach(fun(Pid) -> Pid ! stop end, SubsPids),
+               wait_until_empty()
+           end),
+    io:format(user, "TimeToUnsubscribeAll: ~s~n", [ns(T3)]).
+
+wait_until_empty() ->
+    case emqx_trie:empty() of
+        true -> ok;
+        false ->
+            timer:sleep(5),
+            wait_until_empty()
+    end.
+
+rps(N, NanoSec) -> N * 1_000_000 / NanoSec.
+
+ns(T) when T > 1_000_000 -> io_lib:format("~p(s)", [T / 1_000_000]);
+ns(T) when T > 1_000 -> io_lib:format("~p(ms)", [T / 1_000]);
+ns(T) -> io_lib:format("~p(ns)", [T]).
 
 ram_bytes() ->
     Wordsize = erlang:system_info(wordsize),
@@ -56,48 +106,69 @@ ram_bytes() ->
             0
     end.
 
-start_callers(0, _) -> [];
-start_callers(N, Settings) ->
-    [start_caller(Settings#{id => N}) | start_callers(N - 1, Settings)].
+start_callers(N, F, Settings) ->
+    start_callers(N, F, Settings, []).
 
-collect_results([], R) -> R;
-collect_results([Pid | Pids], Acc = #{subscribe := Sr, match := Mr}) ->
+start_callers(0, _F, _Settings, Acc) ->
+    lists:reverse(Acc);
+start_callers(N, F, Settings, Acc) ->
+    start_callers(N - 1, F, Settings, [F(Settings#{id => N}) | Acc]).
+
+collect_results(Pids, Tag) ->
+    collect_results(Pids, Tag, 0).
+
+collect_results([], _Tag, R) -> R;
+collect_results([Pid | Pids], Tag, R) ->
     receive
-        {Pid, #{subscribe := Srd, match := Mrd}} ->
-            collect_results(Pids, Acc#{subscribe := Sr + Srd, match := Mr + Mrd})
+        {Pid, Tag, N} ->
+            collect_results(Pids, Tag, N + R)
     end.
 
-%% ops per second
-rps(T, N) -> round(N / (T / 1000000)).
-
-start_caller(#{id := Id, limit := N, sub_ptn := SubPtn, pub_ptn := PubPtn}) ->
+start_subscriber(#{id := Id, sub_ops := N, sub_ptn := SubPtn}) ->
     Parent = self(),
     proc_lib:spawn_link(
         fun() ->
                 SubTopics = make_topics(SubPtn, Id, N),
-                {Ts, _} = timer:tc(fun() -> subscribe(SubTopics) end),
-                PubTopics = make_topics(PubPtn, Id, N),
-                {Tm, _} = timer:tc(fun() -> match(PubTopics) end),
-                _ = erlang:send(Parent, {self(), #{subscribe => rps(Ts, N), match => rps(Tm, N)}}),
+                Parent ! {self(), subscriber_ready, 0},
+                receive
+                    start_subscribe ->
+                        ok
+                end,
+                {Ts, _} = ?T(subscribe(SubTopics)),
+                _ = erlang:send(Parent, {self(), subscribe_time, Ts/ N}),
+                %% subscribers should not exit before publish test is done
                 receive
                     stop ->
                         ok
                 end
         end).
 
-match([]) -> ok;
-match([Topic | Topics]) ->
-    _ = emqx_router:lookup_routes(Topic),
-    match(Topics).
+start_publisher(#{id := Id, pub_ops := N, pub_ptn := PubPtn, subscribers := Subs}) ->
+    Parent = self(),
+    proc_lib:spawn_link(
+      fun() ->
+              L = lists:seq(1, N),
+              [Topic] = make_topics(PubPtn, (Id rem Subs) + 1, 1),
+              receive
+                  start_lookup ->
+                      ok
+              end,
+              {Tm, ok} = ?T(lists:foreach(fun(_) -> match(Topic) end, L)),
+              _ = erlang:send(Parent, {self(), lookup_time, Tm / N}),
+              ok
+      end).
+
+match(Topic) ->
+    [_] = emqx_router:match_routes(Topic).
 
 subscribe([]) -> ok;
 subscribe([Topic | Rest]) ->
     ok = emqx_broker:subscribe(Topic),
     subscribe(Rest).
 
-make_topics(SubPtn0, Id, Limit) ->
-    SubPtn = emqx_topic:words(SubPtn0),
-    F = fun(N) -> render(Id, N, SubPtn) end,
+make_topics(Ptn0, Id, Limit) ->
+    Ptn = emqx_topic:words(Ptn0),
+    F = fun(N) -> render(Id, N, Ptn) end,
     lists:map(F, lists:seq(1, Limit)).
 
 render(ID, N, Ptn) ->

+ 12 - 1
src/emqx_channel.erl

@@ -49,7 +49,10 @@
         ]).
 
 %% Export for emqx_sn
--export([do_deliver/2]).
+-export([ do_deliver/2
+        , ensure_keepalive/2
+        , clear_keepalive/1
+        ]).
 
 %% Exports for CT
 -export([set_field/3]).
@@ -1562,6 +1565,14 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
     Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
     ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
 
+clear_keepalive(Channel = #channel{timers = Timers}) ->
+    case maps:get(alive_timer, Timers, undefined) of
+        undefined ->
+            Channel;
+        TRef ->
+            emqx_misc:cancel_timer(TRef),
+            Channel#channel{timers = maps:without([alive_timer], Timers)}
+    end.
 %%--------------------------------------------------------------------
 %% Maybe Resume Session
 

+ 21 - 12
src/emqx_cm.erl

@@ -22,6 +22,7 @@
 -include("emqx.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -logger_header("[CM]").
 
@@ -110,6 +111,7 @@ start_link() ->
 insert_channel_info(ClientId, Info, Stats) ->
     Chan = {ClientId, self()},
     true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
+    ?tp(debug, insert_channel_info, #{client_id => ClientId}),
     ok.
 
 %% @private
@@ -279,18 +281,25 @@ takeover_session(ClientId, ChanPid) ->
 discard_session(ClientId) when is_binary(ClientId) ->
     case lookup_channels(ClientId) of
         [] -> ok;
-        ChanPids ->
-            lists:foreach(
-              fun(ChanPid) ->
-                      try
-                          discard_session(ClientId, ChanPid)
-                      catch
-                          _:{noproc,_}:_Stk -> ok;
-                          _:{{shutdown,_},_}:_Stk -> ok;
-                          _:Error:_Stk ->
-                              ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
-                      end
-              end, ChanPids)
+        ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
+    end.
+
+do_discard_session(ClientId, Pid) ->
+    try
+        discard_session(ClientId, Pid)
+    catch
+        _ : noproc -> % emqx_ws_connection: call
+            ?tp(debug, "session_already_gone", #{pid => Pid}),
+            ok;
+        _ : {noproc, _} -> % emqx_connection: gen_server:call
+            ?tp(debug, "session_already_gone", #{pid => Pid}),
+            ok;
+        _ : {{shutdown, _}, _} ->
+            ?tp(debug, "session_already_shutdown", #{pid => Pid}),
+            ok;
+        _ : Error : St ->
+            ?tp(error, "failed_to_discard_session",
+                #{pid => Pid, reason => Error, stacktrace=>St})
     end.
 
 discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->

+ 4 - 1
src/emqx_congestion.erl

@@ -48,7 +48,10 @@ maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
 
 cancel_alarms(Socket, Transport, Channel) ->
     lists:foreach(fun(Reason) ->
-        do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
+        case has_alarm_sent(Reason) of
+            true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
+            false -> ok
+        end
     end, ?ALL_ALARM_REASONS).
 
 is_alarm_enabled(Channel) ->

+ 63 - 3
src/emqx_connection.erl

@@ -41,8 +41,13 @@
         , stats/1
         ]).
 
+-export([ async_set_keepalive/4
+        , async_set_socket_options/2
+        ]).
+
 -export([ call/2
         , call/3
+        , cast/2
         ]).
 
 %% Callback
@@ -56,7 +61,7 @@
         ]).
 
 %% Internal callback
--export([wakeup_from_hib/2, recvloop/2]).
+-export([wakeup_from_hib/2, recvloop/2, get_state/1]).
 
 %% Export for CT
 -export([set_field/3]).
@@ -184,6 +189,35 @@ stats(#state{transport = Transport,
     ProcStats = emqx_misc:proc_stats(),
     lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
 
+%% @doc Set TCP keepalive socket options to override system defaults.
+%% Idle: The number of seconds a connection needs to be idle before
+%%       TCP begins sending out keep-alive probes (Linux default 7200).
+%% Interval: The number of seconds between TCP keep-alive probes
+%%           (Linux default 75).
+%% Probes: The maximum number of TCP keep-alive probes to send before
+%%         giving up and killing the connection if no response is
+%%         obtained from the other end (Linux default 9).
+%%
+%% NOTE: This API sets TCP socket options, which has nothing to do with
+%%       the MQTT layer's keepalive (PINGREQ and PINGRESP).
+async_set_keepalive(Pid, Idle, Interval, Probes) ->
+    Options = [ {keepalive, true}
+              , {raw, 6, 4, <<Idle:32/native>>}
+              , {raw, 6, 5, <<Interval:32/native>>}
+              , {raw, 6, 6, <<Probes:32/native>>}
+              ],
+    async_set_socket_options(Pid, Options).
+
+%% @doc Set custom socket options.
+%% This API is made async because the call might be originated from
+%% a hookpoint callback (otherwise deadlock).
+%% If failed to set, the error message is logged.
+async_set_socket_options(Pid, Options) ->
+    cast(Pid, {async_set_socket_options, Options}).
+
+cast(Pid, Req) ->
+    gen_server:cast(Pid, Req).
+
 call(Pid, Req) ->
     call(Pid, Req, infinity).
 call(Pid, Req, Timeout) ->
@@ -366,6 +400,9 @@ handle_msg({'$gen_call', From, Req}, State) ->
             gen_server:reply(From, Reply),
             stop(Reason, NState)
     end;
+handle_msg({'$gen_cast', Req}, State) ->
+    NewState = handle_cast(Req, State),
+    {ok, NewState};
 
 handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
     ?LOG(debug, "RECV ~0p", [Data]),
@@ -475,7 +512,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
         E : C : S ->
             ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
     end,
-    ?tp(debug, terminate, #{}),
+    ?tp(info, terminate, #{reason => Reason}),
     maybe_raise_excption(Reason).
 
 %% close socket, discard new state, always return ok.
@@ -683,12 +720,31 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
     end;
 
 handle_info({sock_error, Reason}, State) ->
-    Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]),
+    case Reason =/= closed andalso Reason =/= einval of
+        true -> ?LOG(warning, "socket_error: ~p", [Reason]);
+        false -> ok
+    end,
     handle_info({sock_closed, Reason}, close_socket(State));
 
 handle_info(Info, State) ->
     with_channel(handle_info, [Info], State).
 
+%%--------------------------------------------------------------------
+%% Handle Info
+
+handle_cast({async_set_socket_options, Opts},
+            State = #state{transport = Transport,
+                           socket    = Socket
+                          }) ->
+    case Transport:setopts(Socket, Opts) of
+        ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts});
+        Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
+    end,
+    State;
+handle_cast(Req, State) ->
+    ?tp(error, "received_unknown_cast", #{cast => Req}),
+    State.
+
 %%--------------------------------------------------------------------
 %% Ensure rate limit
 
@@ -817,3 +873,7 @@ set_field(Name, Value, State) ->
     Pos = emqx_misc:index_of(Name, record_info(fields, state)),
     setelement(Pos+1, State, Value).
 
+get_state(Pid) ->
+    State = sys:get_state(Pid),
+    maps:from_list(lists:zip(record_info(fields, state),
+                             tl(tuple_to_list(State)))).

+ 54 - 19
src/emqx_frame.erl

@@ -40,6 +40,8 @@
              , serialize_opts/0
              ]).
 
+-define(Q(BYTES, Q), {BYTES, Q}).
+
 -type(options() :: #{strict_mode => boolean(),
                      max_size => 1..?MAX_PACKET_SIZE,
                      version => emqx_types:version()
@@ -50,12 +52,12 @@
 -type(parse_result() :: {more, parse_state()}
                       | {ok, emqx_types:packet(), binary(), parse_state()}).
 
--type(cont_state() :: {Stage :: len | body,
-                       State ::  #{hdr := #mqtt_packet_header{},
-                                   len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
-                                   rest => binary()
-                                  }
-                      }).
+-type(cont_state() ::
+      {Stage :: len | body,
+       State ::  #{hdr := #mqtt_packet_header{},
+                   len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
+                   rest => binary() | ?Q(non_neg_integer(), queue:queue(binary()))
+                  }}).
 
 -type(serialize_opts() :: options()).
 
@@ -117,9 +119,19 @@ parse(Bin, {{len, #{hdr := Header,
     parse_remaining_len(Bin, Header, Multiplier, Length, Options);
 parse(Bin, {{body, #{hdr := Header,
                      len := Length,
-                     rest := Rest}
+                     rest := Body}
              }, Options}) when is_binary(Bin) ->
-    parse_frame(<<Rest/binary, Bin/binary>>, Header, Length, Options).
+    BodyBytes = body_bytes(Body),
+    {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin),
+    NewBody = append_body(Body, NewBodyPart),
+    parse_frame(NewBody, Tail, Header, Length, Options).
+
+%% split given binary with the first N bytes
+split(N, Bin) when N =< 0 ->
+    {Bin, <<>>};
+split(N, Bin) when N =< size(Bin) ->
+    <<H:N/binary, T/binary>> = Bin,
+    {H, T}.
 
 parse_remaining_len(<<>>, Header, Options) ->
     {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
@@ -132,7 +144,8 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
 parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
     {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
 %% Match DISCONNECT without payload
-parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
+parse_remaining_len(<<0:8, Rest/binary>>,
+                    Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
     Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
     {ok, Packet, Rest, ?none(Options)};
 %% Match PINGREQ.
@@ -149,16 +162,35 @@ parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Opti
 parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
                     Options = #{max_size := MaxSize}) ->
     FrameLen = Value + Len * Multiplier,
-    if
-        FrameLen > MaxSize -> error(frame_too_large);
-        true -> parse_frame(Rest, Header, FrameLen, Options)
+    case FrameLen > MaxSize of
+        true -> error(frame_too_large);
+        false -> parse_frame(Rest, Header, FrameLen, Options)
     end.
 
-parse_frame(Bin, Header, 0, Options) ->
-    {ok, packet(Header), Bin, ?none(Options)};
-parse_frame(Bin, Header, Length, Options) ->
-    case Bin of
-        <<FrameBin:Length/binary, Rest/binary>> ->
+body_bytes(B) when is_binary(B) -> size(B);
+body_bytes(?Q(Bytes, _)) -> Bytes.
+
+append_body(H, T) when is_binary(H) andalso size(H) < 1024 ->
+    <<H/binary, T/binary>>;
+append_body(H, T) when is_binary(H) ->
+    Bytes = size(H) + size(T),
+    ?Q(Bytes, queue:from_list([H, T]));
+append_body(?Q(Bytes, Q), T) ->
+    ?Q(Bytes + iolist_size(T), queue:in(T, Q)).
+
+flatten_body(Body, Tail) when is_binary(Body) -> <<Body/binary, Tail/binary>>;
+flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]).
+
+parse_frame(Body, Header, Length, Options) ->
+    %% already appended
+    parse_frame(Body, _SplitTail = <<>>, Header, Length, Options).
+
+parse_frame(Body, Tail, Header, 0, Options) ->
+    {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)};
+parse_frame(Body, Tail, Header, Length, Options) ->
+    case body_bytes(Body) >= Length of
+        true ->
+            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body, Tail),
             case parse_packet(Header, FrameBin, Options) of
                 {Variable, Payload} ->
                     {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
@@ -167,8 +199,11 @@ parse_frame(Bin, Header, Length, Options) ->
                 Variable ->
                     {ok, packet(Header, Variable), Rest, ?none(Options)}
             end;
-        TooShortBin ->
-            {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
+        false ->
+            {more, {{body, #{hdr => Header,
+                             len => Length,
+                             rest => append_body(Body, Tail)
+                            }}, Options}}
     end.
 
 -compile({inline, [packet/1, packet/2, packet/3]}).

+ 9 - 10
src/emqx_metrics.erl

@@ -201,16 +201,8 @@ stop() -> gen_server:stop(?SERVER).
 
 %% BACKW: v4.3.0
 upgrade_retained_delayed_counter_type() ->
-    case ets:info(?TAB, name) of
-        ?TAB ->
-            [M1] = ets:lookup(?TAB, 'messages.retained'),
-            [M2] = ets:lookup(?TAB, 'messages.delayed'),
-            true = ets:insert(?TAB, M1#metric{type = counter}),
-            true = ets:insert(?TAB, M2#metric{type = counter}),
-            ok;
-        _ ->
-            ok
-    end.
+    Ks = ['messages.retained', 'messages.delayed'],
+    gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity).
 
 %%--------------------------------------------------------------------
 %% Metrics API
@@ -467,6 +459,13 @@ handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) ->
             {reply, {ok, NextIdx}, State#state{next_idx = NextIdx + 1}}
     end;
 
+handle_call({set_type_to_counter, Keys}, _From, State) ->
+    lists:foreach(
+      fun(K) ->
+        ets:update_element(?TAB, K, {#metric.type, counter})
+      end, Keys),
+    {reply, ok, State};
+
 handle_call(Req, _From, State) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     {reply, ignored, State}.

+ 20 - 8
src/emqx_node_dump.erl

@@ -45,16 +45,28 @@ censor(Path, M) when is_map(M) ->
     maps:map(Fun, M);
 censor(Path, L = [Fst|_]) when is_tuple(Fst) ->
     [censor(Path, I) || I <- L];
-censor(Path, Val) ->
-    case Path of
-        [password|_] ->
-            obfuscate_value(Val);
-        [secret|_]  ->
-            obfuscate_value(Val);
-        _ ->
-            Val
+censor([Key | _], Val) ->
+    case is_sensitive(Key) of
+        true -> obfuscate_value(Val);
+        false -> Val
     end.
 
+is_sensitive(Key) when is_atom(Key) ->
+    is_sensitive(atom_to_binary(Key));
+is_sensitive(Key) when is_list(Key) ->
+    try iolist_to_binary(Key) of
+        Bin ->
+            is_sensitive(Bin)
+    catch
+        _ : _ ->
+            false
+    end;
+is_sensitive(Key) when is_binary(Key) ->
+    lists:any(fun(Pattern) -> re:run(Key, Pattern) =/= nomatch end,
+              ["passwd", "password", "secret"]);
+is_sensitive(Key) when is_tuple(Key) ->
+    false.
+
 obfuscate_value(Val) when is_binary(Val) ->
     <<"********">>;
 obfuscate_value(_Val) ->

+ 68 - 40
src/emqx_plugins.erl

@@ -61,7 +61,7 @@ init() ->
 %% @doc Load all plugins when the broker started.
 -spec(load() -> ok | ignore | {error, term()}).
 load() ->
-    load_expand_plugins(),
+    ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
     case emqx:get_env(plugins_loaded_file) of
         undefined -> ignore; %% No plugins available
         File ->
@@ -148,46 +148,61 @@ init_config(CfgFile) ->
                       [application:set_env(App, Par, Val) || {Par, Val} <- Envs]
                   end, AppsEnv).
 
-load_expand_plugins() ->
-    case emqx:get_env(expand_plugins_dir) of
-        undefined -> ok;
-        ExpandPluginsDir ->
-            Plugins = filelib:wildcard("*", ExpandPluginsDir),
-            lists:foreach(fun(Plugin) ->
-                PluginDir = filename:join(ExpandPluginsDir, Plugin),
+%% load external plugins which are placed in etc/plugins dir
+load_ext_plugins(undefined) -> ok;
+load_ext_plugins(Dir) ->
+    lists:foreach(
+        fun(Plugin) ->
+                PluginDir = filename:join(Dir, Plugin),
                 case filelib:is_dir(PluginDir) of
-                    true  -> load_expand_plugin(PluginDir);
+                    true  -> load_ext_plugin(PluginDir);
                     false -> ok
                 end
-            end, Plugins)
-    end.
+        end, filelib:wildcard("*", Dir)).
 
-load_expand_plugin(PluginDir) ->
-    init_expand_plugin_config(PluginDir),
+load_ext_plugin(PluginDir) ->
+    ?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]),
     Ebin = filename:join([PluginDir, "ebin"]),
+    AppFile = filename:join([Ebin, "*.app"]),
+    AppName = case filelib:wildcard(AppFile) of
+                  [App] ->
+                      list_to_atom(filename:basename(App, ".app"));
+                  [] ->
+                      ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
+                      error({plugin_app_file_not_found, AppFile})
+              end,
+    ok = load_plugin_app(AppName, Ebin),
+    ok = load_plugin_conf(AppName, PluginDir).
+
+load_plugin_app(AppName, Ebin) ->
     _ = code:add_patha(Ebin),
     Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
-    lists:foreach(fun(Mod) ->
-        Module = list_to_atom(filename:basename(Mod, ".beam")),
-        code:load_file(Module)
-    end, Modules),
-    case filelib:wildcard(Ebin ++ "/*.app") of
-        [App|_] -> application:load(list_to_atom(filename:basename(App, ".app")));
-        _ -> ?LOG(alert, "Plugin not found."),
-             {error, load_app_fail}
+    lists:foreach(
+        fun(BeamFile) ->
+                Module = list_to_atom(filename:basename(BeamFile, ".beam")),
+                case code:ensure_loaded(Module) of
+                    {module, Module} -> ok;
+                    {error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason})
+                end
+        end, Modules),
+    case application:load(AppName) of
+        ok -> ok;
+        {error, {already_loaded, _}} -> ok
     end.
 
-init_expand_plugin_config(PluginDir) ->
-    Priv = PluginDir ++ "/priv",
-    Etc  = PluginDir ++ "/etc",
-    Schema = filelib:wildcard(Priv ++ "/*.schema"),
-    Conf = case filelib:wildcard(Etc ++ "/*.conf") of
-        [] -> [];
-        [Conf1] -> cuttlefish_conf:file(Conf1)
-    end,
+load_plugin_conf(AppName, PluginDir) ->
+    Priv = filename:join([PluginDir, "priv"]),
+    Etc  = filename:join([PluginDir, "etc"]),
+    Schema = filelib:wildcard(filename:join([Priv, "*.schema"])),
+    ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]),
+    Conf = case filelib:is_file(ConfFile) of
+               true -> cuttlefish_conf:file(ConfFile);
+               false -> error({conf_file_not_found, ConfFile})
+           end,
+    ?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]),
     AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf),
-    lists:foreach(fun({AppName, Envs}) ->
-        [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
+    lists:foreach(fun({AppName1, Envs}) ->
+        [application:set_env(AppName1, Par, Val) || {Par, Val} <- Envs]
     end, AppsEnv).
 
 ensure_file(File) ->
@@ -210,10 +225,11 @@ filter_plugins(Names) ->
                     end, Names).
 
 load_plugins(Names, Persistent) ->
-    Plugins = list(), NotFound = Names -- names(Plugins),
+    Plugins = list(),
+    NotFound = Names -- names(Plugins),
     case NotFound of
         []       -> ok;
-        NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound])
+        NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
     end,
     NeedToLoad = Names -- NotFound -- names(started_app),
     lists:foreach(fun(Name) ->
@@ -223,19 +239,31 @@ load_plugins(Names, Persistent) ->
 
 generate_configs(App) ->
     ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",
-    ConfFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
-    SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
-    case {filelib:is_file(ConfigFile), filelib:is_file(ConfFile) andalso filelib:is_file(SchemaFile)} of
-        {true, _} ->
+    case filelib:is_file(ConfigFile) of
+        true ->
             {ok, [Configs]} = file:consult(ConfigFile),
             Configs;
-        {_, true} ->
+        false ->
+            do_generate_configs(App)
+    end.
+
+do_generate_configs(App) ->
+    Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
+    Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf",
+    ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of
+                   {true, _} -> Name1;
+                   {false, true} -> Name2;
+                   {false, false} -> error({config_not_found, [Name1, Name2]})
+               end,
+    SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
+    case filelib:is_file(SchemaFile) of
+        true ->
             Schema = cuttlefish_schema:files([SchemaFile]),
             Conf = cuttlefish_conf:file(ConfFile),
             LogFun = fun(Key, Value) -> ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]) end,
             cuttlefish_generator:map(Schema, Conf, undefined, LogFun);
-        {false, false} ->
-            error({config_not_found, {ConfigFile, ConfFile, SchemaFile}})
+        false ->
+            error({schema_not_found, SchemaFile})
     end.
 
 apply_configs([]) ->

+ 37 - 22
src/emqx_trie.erl

@@ -194,6 +194,11 @@ delete_key(Key) ->
             ok
     end.
 
+%% micro-optimization: no need to lookup when topic is not wildcard
+%% because we only insert wildcards to emqx_trie
+lookup_topic(_Topic, false) -> [];
+lookup_topic(Topic, true) -> lookup_topic(Topic).
+
 lookup_topic(Topic) when is_binary(Topic) ->
     case ets:lookup(?TRIE, ?TOPIC(Topic)) of
         [#?TRIE{count = C}] -> [Topic || C > 0];
@@ -219,15 +224,22 @@ do_match(Words) ->
     do_match(Words, empty).
 
 do_match(Words, Prefix) ->
-    match(is_compact(), Words, Prefix, []).
+    case is_compact() of
+        true -> match_compact(Words, Prefix, false, []);
+        false -> match_no_compact(Words, Prefix, false, [])
+    end.
 
-match(_IsCompact, [], Topic, Acc) ->
-    'match_#'(Topic) ++ %% try match foo/bar/#
-    lookup_topic(Topic) ++ %% try match foo/bar
+match_no_compact([], Topic, IsWildcard, Acc) ->
+    'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/#
+    lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+
     Acc;
-match(IsCompact, [Word | Words], Prefix, Acc0) ->
-    case {has_prefix(Prefix), IsCompact} of
-        {false, false} ->
+match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
+    case has_prefix(Prefix) of
+        true ->
+            Acc1 = 'match_#'(Prefix) ++ Acc0,
+            Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1),
+            match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc);
+        false ->
             %% non-compact paths in database
             %% if there is no prefix matches the current topic prefix
             %% we can simpliy return from here
@@ -240,21 +252,24 @@ match(IsCompact, [Word | Words], Prefix, Acc0) ->
             %% then at the second level, we lookup prefix a/x,
             %% no such prefix to be found, meaning there is no point
             %% searching for 'a/x/y', 'a/x/+' or 'a/x/#'
-            Acc0;
-        _ ->
-            %% compact paths in database
-            %% we have to enumerate all possible prefixes
-            %% e.g. a/+/b/# results with below entries in database
-            %%   - a/+
-            %%   - a/+/b/#
-            %% when matching a/x/y, we need to enumerate
-            %%   - a
-            %%   - a/x
-            %%   - a/x/y
-            %% *with '+', '#' replaced at each level
-            Acc1 = 'match_#'(Prefix) ++ Acc0,
-            Acc = match(IsCompact, Words, join(Prefix, '+'), Acc1),
-            match(IsCompact, Words, join(Prefix, Word), Acc)
+            Acc0
+    end.
+
+match_compact([], Topic, IsWildcard, Acc) ->
+    'match_#'(Topic) ++ %% try match foo/bar/#
+    lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar
+    Acc;
+match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
+    Acc1 = 'match_#'(Prefix) ++ Acc0,
+    Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1),
+    WildcardPrefix = join(Prefix, '+'),
+    %% go deeper to match current_prefix/+ only when:
+    %% 1. current word is the last
+    %% OR
+    %% 2. there is a prefix = 'current_prefix/+'
+    case Words =:= [] orelse has_prefix(WildcardPrefix) of
+        true -> match_compact(Words, WildcardPrefix, true, Acc);
+        false -> Acc
     end.
 
 'match_#'(Prefix) ->

+ 13 - 0
test/emqx_cm_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CM, emqx_cm).
 -define(ChanInfo,#{conninfo =>
@@ -179,6 +180,18 @@ t_discard_session(_) ->
     ok = emqx_cm:unregister_channel(<<"clientid">>),
     ok = meck:unload(emqx_connection).
 
+t_discard_session_race(_) ->
+    ok = snabbkaffe:start_trace(),
+    #{conninfo := ConnInfo0} = ?ChanInfo,
+    ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
+    {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
+    ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo),
+    Pid ! stop,
+    receive {'DOWN', Ref, process, Pid, normal} -> ok end,
+    ok = emqx_cm:discard_session(<<"clientid">>),
+    {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000),
+    snabbkaffe:stop().
+
 t_takeover_session(_) ->
     #{conninfo := ConnInfo} = ?ChanInfo,
     {error, not_found} = emqx_cm:takeover_session(<<"clientid">>),

+ 52 - 0
test/emqx_mqtt_SUITE.erl

@@ -22,6 +22,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg,
                      recv_oct, recv_cnt, send_oct, send_cnt,
@@ -38,6 +39,19 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+init_per_testcase(TestCase, Config) ->
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase(init, Config);
+        false -> Config
+    end.
+
+end_per_testcase(TestCase, Config) ->
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase('end', Config);
+        false -> ok
+    end,
+    Config.
+
 t_conn_stats(_) ->
     with_client(fun(CPid) ->
                             Stats = emqx_connection:stats(CPid),
@@ -134,3 +148,41 @@ with_client(TestFun, _Options) ->
             emqtt:stop(C)
     end.
 
+t_async_set_keepalive(init, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config;
+t_async_set_keepalive('end', _Config) ->
+    snabbkaffe:stop(),
+    ok.
+
+t_async_set_keepalive(_) ->
+    ClientID = <<"client-tcp-keepalive">>,
+    {ok, Client} = emqtt:start_link([{host, "localhost"},
+                                     {proto_ver,v5},
+                                     {clientid, ClientID},
+                                     {clean_start, false}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _} = ?block_until(#{?snk_kind := insert_channel_info,
+                             client_id := ClientID}, 2000, 100),
+    [Pid] = emqx_cm:lookup_channels(ClientID),
+    State = emqx_connection:get_state(Pid),
+    Transport = maps:get(transport, State),
+    Socket = maps:get(socket, State),
+    ?assert(is_port(Socket)),
+    Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}],
+    {ok, [ {raw, 6, 4, <<Idle:32/native>>}
+         , {raw, 6, 5, <<Interval:32/native>>}
+         , {raw, 6, 6, <<Probes:32/native>>}
+         ]} = Transport:getopts(Socket, Opts),
+    ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]),
+    emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1),
+    {ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000),
+    {ok, [ {raw, 6, 4, <<NewIdle:32/native>>}
+         , {raw, 6, 5, <<NewInterval:32/native>>}
+         , {raw, 6, 6, <<NewProbes:32/native>>}
+         ]} = Transport:getopts(Socket, Opts),
+    ?assertEqual(NewIdle, Idle + 1),
+    ?assertEqual(NewInterval, Interval + 1),
+    ?assertEqual(NewProbes, Probes + 1),
+    emqtt:stop(Client),
+    ok.

+ 7 - 11
test/emqx_plugins_SUITE.erl

@@ -30,24 +30,20 @@ init_per_suite(Config) ->
 
     DataPath = proplists:get_value(data_dir, Config),
     AppPath = filename:join([DataPath, "emqx_mini_plugin"]),
-    Cmd = lists:flatten(io_lib:format("cd ~s && make && cp -r etc _build/default/lib/emqx_mini_plugin/", [AppPath])),
+    Cmd = lists:flatten(io_lib:format("cd ~s && make", [AppPath])),
 
     ct:pal("Executing ~s~n", [Cmd]),
     ct:pal("~n ~s~n", [os:cmd(Cmd)]),
 
-    code:add_path(filename:join([AppPath, "_build", "default", "lib", "emqx_mini_plugin", "ebin"])),
-
     put(loaded_file, filename:join([DataPath, "loaded_plugins"])),
     emqx_ct_helpers:boot_modules([]),
-    emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1),
+    emqx_ct_helpers:start_apps([], fun(_) -> set_special_cfg(DataPath) end),
 
     Config.
-    
-set_sepecial_cfg(_) ->
-    ExpandPath = filename:dirname(code:lib_dir(emqx_mini_plugin)),
 
+set_special_cfg(PluginsDir) ->
     application:set_env(emqx, plugins_loaded_file, get(loaded_file)),
-    application:set_env(emqx, expand_plugins_dir, ExpandPath),
+    application:set_env(emqx, expand_plugins_dir, PluginsDir),
     ok.
 
 end_per_suite(_Config) ->
@@ -58,7 +54,6 @@ t_load(_) ->
     ?assertEqual(ok, emqx_plugins:unload()),
 
     ?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)),
-    ?assertEqual({error, parse_config_file_failed}, emqx_plugins:load(emqx_mini_plugin)),
     ?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)),
 
     application:set_env(emqx, expand_plugins_dir, undefined),
@@ -75,8 +70,9 @@ t_init_config(_) ->
     file:delete(ConfFile),
     ?assertEqual({ok,test}, application:get_env(emqx_mini_plugin, mininame)).
 
-t_load_expand_plugin(_) ->
-    ?assertEqual({error, load_app_fail}, emqx_plugins:load_expand_plugin("./not_existed_path/")).
+t_load_ext_plugin(_) ->
+    ?assertError({plugin_app_file_not_found, _},
+                 emqx_plugins:load_ext_plugin("./not_existed_path/")).
 
 t_list(_) ->
     ?assertMatch([{plugin, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()).

+ 2 - 11
test/emqx_plugins_SUITE_data/emqx_mini_plugin/Makefile

@@ -8,6 +8,7 @@ all: compile
 
 compile:
 	$(REBAR) compile
+	cp -r _build/default/lib/emqx_mini_plugin/ebin ./
 
 clean: distclean
 
@@ -22,14 +23,4 @@ xref:
 
 distclean:
 	@rm -rf _build
-	@rm -f data/app.*.config data/vm.*.args rebar.lock
-
-CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish
-
-$(CUTTLEFISH_SCRIPT):
-	@${REBAR} get-deps
-	@if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi
-
-app.config: $(CUTTLEFISH_SCRIPT) etc/emqx_mini_plugin.conf
-	$(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/emqx_mini_plugin.conf -i priv/emqx_mini_plugin.schema -d data
-
+	@rm -f ebin/ data/app.*.config data/vm.*.args rebar.lock

+ 8 - 2
test/emqx_plugins_SUITE_data/emqx_mini_plugin/rebar.config

@@ -1,5 +1,4 @@
-{deps,
-    []}.
+{deps, []}.
 
 {edoc_opts, [{preprocess, true}]}.
 {erl_opts, [warn_unused_vars,
@@ -15,3 +14,10 @@
 {cover_enabled, true}.
 {cover_opts, [verbose]}.
 {cover_export_enabled, true}.
+
+{profiles,
+    [{test, [
+        {deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.4"}}}
+               ]}
+    ]}
+]}.

+ 25 - 2
test/emqx_trie_SUITE.erl

@@ -102,10 +102,13 @@ t_match2(_) ->
     ?assertEqual([], ?TRIE:match(<<"$SYS/broker/zenmq">>)).
 
 t_match3(_) ->
-    Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
+    Topics = [<<"d/#">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
     trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end),
     Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]),
-    ?assertEqual(4, length(Matched)),
+    case length(Matched) of
+        3 -> ok;
+        _ -> error({unexpected, Matched})
+    end,
     SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>),
     ?assertEqual([<<"$SYS/#">>], SysMatched).
 
@@ -114,6 +117,26 @@ t_match4(_) ->
     trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
     ?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))).
 
+t_match5(_) ->
+    T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
+    Topics = [<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>],
+    trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
+    ?assertEqual([<<"#">>, <<T/binary, "/#">>], lists:sort(emqx_trie:match(T))),
+    ?assertEqual([<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>],
+                 lists:sort(emqx_trie:match(<<T/binary, "/1">>))).
+
+t_match6(_) ->
+    T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
+    W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>,
+    trans(fun() -> emqx_trie:insert(W) end),
+    ?assertEqual([W], emqx_trie:match(T)).
+
+t_match7(_) ->
+    T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
+    W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>,
+    trans(fun() -> emqx_trie:insert(W) end),
+    ?assertEqual([W], emqx_trie:match(T)).
+
 t_empty(_) ->
     ?assert(?TRIE:empty()),
     trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),