فهرست منبع

Merge pull request #4903 from emqx/resolve-master-dev/v5.0-conflict-release-version

Auto-pull-request-on-2021-05-31
Zaiming (Stone) Shi 4 سال پیش
والد
کامیت
65a660aaea
45فایلهای تغییر یافته به همراه620 افزوده شده و 368 حذف شده
  1. 2 0
      .ci/build_packages/Dockerfile
  2. 20 7
      .ci/build_packages/tests.sh
  3. 26 24
      .github/workflows/build_packages.yaml
  4. 0 3
      .github/workflows/run_cts_tests.yaml
  5. 0 3
      .github/workflows/run_fvt_tests.yaml
  6. 0 3
      .github/workflows/run_test_cases.yaml
  7. 1 1
      Makefile
  8. 1 0
      README-CN.md
  9. 1 0
      README-JP.md
  10. 1 0
      README-RU.md
  11. 1 1
      README.md
  12. 1 1
      apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src
  13. 15 0
      apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src
  14. 7 5
      apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl
  15. 19 0
      apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl
  16. 5 16
      apps/emqx_management/src/emqx_management.appup.src
  17. 32 16
      apps/emqx_management/src/emqx_mgmt_api.erl
  18. 19 2
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  19. 25 9
      apps/emqx_management/src/emqx_mgmt_api_data.erl
  20. 6 0
      apps/emqx_management/src/emqx_mgmt_http.erl
  21. 17 0
      apps/emqx_management/test/emqx_mgmt_api_SUITE.erl
  22. 1 1
      apps/emqx_sn/src/emqx_sn.app.src
  23. 6 10
      apps/emqx_sn/src/emqx_sn.appup.src
  24. 3 12
      apps/emqx_sn/src/emqx_sn_app.erl
  25. 40 33
      apps/emqx_sn/src/emqx_sn_gateway.erl
  26. 80 48
      apps/emqx_sn/src/emqx_sn_registry.erl
  27. 11 17
      apps/emqx_sn/src/emqx_sn_sup.erl
  28. 2 1
      apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl
  29. 59 65
      apps/emqx_sn/test/emqx_sn_registry_SUITE.erl
  30. 1 1
      apps/emqx_web_hook/src/emqx_web_hook.app.src
  31. 6 0
      apps/emqx_web_hook/src/emqx_web_hook.appup.src
  32. 1 1
      apps/emqx_web_hook/src/emqx_web_hook_actions.erl
  33. 0 1
      bin/nodetool
  34. 1 0
      deploy/charts/emqx/README.md
  35. 4 0
      deploy/charts/emqx/templates/StatefulSet.yaml
  36. 8 0
      deploy/charts/emqx/templates/rbac.yaml
  37. 3 0
      deploy/charts/emqx/values.yaml
  38. 7 0
      etc/emqx.conf
  39. 1 1
      rebar.config
  40. 75 0
      scripts/update_appup.escript
  41. 67 74
      src/emqx.appup.src
  42. 3 3
      src/emqx_channel.erl
  43. 1 0
      src/emqx_connection.erl
  44. 7 2
      src/emqx_plugins.erl
  45. 34 7
      test/mqtt_protocol_v5_SUITE.erl

+ 2 - 0
.ci/build_packages/Dockerfile

@@ -7,6 +7,8 @@ COPY . /emqx
 
 WORKDIR /emqx
 
+RUN rm -rf _build/${EMQX_NAME}/lib _build/${EMQX_NAME}-pkg/lib
+
 RUN make ${EMQX_NAME}-zip || cat rebar3.crashdump
 
 RUN make ${EMQX_NAME}-pkg || cat rebar3.crashdump

+ 20 - 7
.ci/build_packages/tests.sh

@@ -3,10 +3,23 @@ set -x -e -u
 export CODE_PATH=${CODE_PATH:-"/emqx"}
 export EMQX_NAME=${EMQX_NAME:-"emqx"}
 export PACKAGE_PATH="${CODE_PATH}/_packages/${EMQX_NAME}"
-export RELUP_PACKAGE_PATH="${CODE_PATH}/relup_packages/${EMQX_NAME}"
+export RELUP_PACKAGE_PATH="${CODE_PATH}/_upgrade_base"
 # export EMQX_NODE_NAME="emqx-on-$(uname -m)@127.0.0.1"
 # export EMQX_NODE_COOKIE=$(date +%s%N)
 
+case "$(uname -m)" in
+    x86_64)
+        ARCH='amd64'
+        ;;
+    aarch64)
+        ARCH='arm64'
+        ;;
+    arm*)
+        ARCH=arm
+        ;;
+esac
+export ARCH
+
 emqx_prepare(){
     mkdir -p "${PACKAGE_PATH}"
 
@@ -136,19 +149,19 @@ running_test(){
 }
 
 relup_test(){
-    TARGET_VERSION="$1"
+    TARGET_VERSION="$("$CODE_PATH"/pkg-vsn.sh)"
     if [ -d "${RELUP_PACKAGE_PATH}" ];then
-        cd "${RELUP_PACKAGE_PATH }"
+        cd "${RELUP_PACKAGE_PATH}"
 
-        for var in "${EMQX_NAME}"-*-"$(uname -m)".zip;do
+        for var in "${EMQX_NAME}"-*-"${ARCH}".zip;do
             packagename=$(basename "${var}")
             unzip "$packagename"
             ./emqx/bin/emqx start || ( tail emqx/log/emqx.log.1 && exit 1 )
             ./emqx/bin/emqx_ctl status
             ./emqx/bin/emqx versions
-            cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${TARGET_VERSION}-$(uname -m)".zip ./emqx/releases
+            cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${TARGET_VERSION}-${ARCH}".zip ./emqx/releases
             ./emqx/bin/emqx install "${TARGET_VERSION}"
-            [ "$(./emqx/bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]")" = "${TARGET_VERSION}" ] || exit 1
+            [ "$(./emqx/bin/emqx versions |grep permanent | awk '{print $2}')" = "${TARGET_VERSION}" ] || exit 1
             ./emqx/bin/emqx_ctl status
             ./emqx/bin/emqx stop
             rm -rf emqx
@@ -158,4 +171,4 @@ relup_test(){
 
 emqx_prepare
 emqx_test
-# relup_test <TODO: parameterise relup target version>
+relup_test

+ 26 - 24
.github/workflows/build_packages.yaml

@@ -3,10 +3,6 @@ name: Cross build packages
 on:
   schedule:
     - cron:  '0 */6 * * *'
-  push:
-    tags:
-      - v*
-      - e*
   release:
     types:
       - published
@@ -19,25 +15,34 @@ jobs:
 
     outputs:
       profiles: ${{ steps.set_profile.outputs.profiles}}
+      old_vsns: ${{ steps.set_profile.outputs.old_vsns}}
 
     steps:
       - uses: actions/checkout@v2
         with:
           path: source
+          fetch-depth: 0
       - name: set profile
         id: set_profile
         shell: bash
         run: |
-          if make -C source emqx-ee --dry-run > /dev/null 2>&1; then
+          cd source
+          vsn="$(./pkg-vsn.sh)"
+          pre_vsn="$(echo $vsn | grep -oE '^[0-9]+.[0-9]')"
+          if make emqx-ee --dry-run > /dev/null 2>&1; then
+            old_vsns="$(git tag -l "e$pre_vsn.[0-9]" | xargs echo -n | sed "s/e$vsn//")"
+            echo "::set-output name=old_vsns::$old_vsns"
             echo "::set-output name=profiles::[\"emqx-ee\"]"
           else
+            old_vsns="$(git tag -l "v$pre_vsn.[0-9]" | xargs echo -n | sed "s/v$vsn//")"
+            echo "::set-output name=old_vsns::$old_vsns"
             echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
           fi
       - name: get_all_deps
         if: endsWith(github.repository, 'emqx')
         run: |
           make -C source deps-all
-          zip -ryq source.zip source
+          zip -ryq source.zip source/* source/.[^.]*
       - name: get_all_deps
         if: endsWith(github.repository, 'enterprise')
         run: |
@@ -45,7 +50,7 @@ jobs:
           git config --global credential.helper store
           echo "${{ secrets.CI_GIT_TOKEN }}" >> source/scripts/git-token
           make -C source deps-all
-          zip -ryq source.zip source
+          zip -ryq source.zip source/* source/.[^.]*
       - uses: actions/upload-artifact@v2
         with:
           name: source
@@ -251,35 +256,32 @@ jobs:
         path: .
     - name: unzip source code
       run: unzip -q source.zip
-    - name: downloads emqx zip packages
+    - name: downloads old emqx zip packages
       env:
         PROFILE: ${{ matrix.profile }}
         ARCH: ${{ matrix.arch }}
         SYSTEM: ${{ matrix.os }}
+        OLD_VSNS: ${{ needs.prepare.outputs.old_vsns }}
       run: |
-        set -e -u -x
-        cd source
-        if [ $PROFILE = "emqx" ];then broker="emqx-ce"; else broker="$PROFILE"; fi
-        if [ $PROFILE = "emqx-ee" ];then edition='enterprise'; else edition='opensource'; fi
-
-        vsn="$(./pkg-vsn.sh)"
-        pre_vsn="$(echo $vsn | grep -oE '^[0-9]+.[0-9]')"
-        if [ $PROFILE = "emqx-ee" ]; then
-            old_vsns=($(git tag -l "e$pre_vsn.[0-9]" | sed "s/e$vsn//"))
-        else
-            old_vsns=($(git tag -l "v$pre_vsn.[0-9]" | sed "s/v$vsn//"))
+        set -e -x -u
+        broker=$PROFILE
+        if [ $PROFILE = "emqx" ];then
+            broker="emqx-ce"
+        fi
+        if [[ "$SYSTEM" =~ "raspbian*" ]];then
+            export ARCH="arm"
         fi
 
-        mkdir -p _upgrade_base
-        cd _upgrade_base
-        for tag in ${old_vsns[@]};do
-          if [ ! -z "$(echo  $(curl -I -m 10 -o /dev/null -s -w %{http_code} https://s3-${{ secrets.AWS_DEFAULT_REGION }}.amazonaws.com/${{ secrets.AWS_S3_BUCKET }}/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip) | grep -oE "^[23]+")" ];then
+        mkdir -p source/_upgrade_base
+        cd source/_upgrade_base
+        old_vsns=($(echo $OLD_VSNS | tr ' ' ' '))
+        for tag in ${old_vsns[@]}; do
+          if [ ! -z "$(echo $(curl -I -m 10 -o /dev/null -s -w %{http_code} https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip) | grep -oE "^[23]+")" ];then
             wget --no-verbose https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip
             wget --no-verbose https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip.sha256
             echo "$(cat $PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip.sha256) $PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip" | sha256sum -c || exit 1
           fi
         done
-        cd -
     - name: build emqx packages
       env:
         ERL_OTP: erl23.2.7.2-emqx-2

+ 0 - 3
.github/workflows/run_cts_tests.yaml

@@ -5,9 +5,6 @@ on:
     tags:
       - v*
       - e*
-  release:
-    types:
-      - published
   pull_request:
 
 jobs:

+ 0 - 3
.github/workflows/run_fvt_tests.yaml

@@ -5,9 +5,6 @@ on:
     tags:
       - v*
       - e*
-  release:
-    types:
-      - published
   pull_request:
 
 jobs:

+ 0 - 3
.github/workflows/run_test_cases.yaml

@@ -5,9 +5,6 @@ on:
     tags:
       - v*
       - e*
-  release:
-    types:
-      - published
   pull_request:
 
 jobs:

+ 1 - 1
Makefile

@@ -1,5 +1,5 @@
 $(shell $(CURDIR)/scripts/git-hooks-init.sh)
-REBAR_VERSION = 3.14.3-emqx-7
+REBAR_VERSION = 3.14.3-emqx-8
 REBAR = $(CURDIR)/rebar3
 BUILD = $(CURDIR)/build
 SCRIPTS = $(CURDIR)/scripts

+ 1 - 0
README-CN.md

@@ -7,6 +7,7 @@
 [![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
 [![Twitter](https://img.shields.io/badge/Twitter-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech)
 [![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow)](https://askemq.com)
+[![YouTube](https://img.shields.io/badge/Subscribe-EMQ%20中文-FF0000?logo=youtube)](https://www.youtube.com/channel/UCir_r04HIsLjf2qqyZ4A8Cg)
 
 [![最棒的物联网 MQTT 开源团队期待您的加入](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://careers.emqx.cn/)
 

+ 1 - 0
README-JP.md

@@ -6,6 +6,7 @@
 [![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx)
 [![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
 [![Twitter](https://img.shields.io/badge/Twitter-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech)
+[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
 
 [![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
 

+ 1 - 0
README-RU.md

@@ -7,6 +7,7 @@
 [![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
 [![Twitter](https://img.shields.io/badge/Follow-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech)
 [![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow?logo=github)](https://github.com/emqx/emqx/discussions)
+[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
 
 [![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
 

+ 1 - 1
README.md

@@ -6,7 +6,7 @@
 [![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx)
 [![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
 [![Twitter](https://img.shields.io/badge/Follow-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech)
-[![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow?logo=github)](https://github.com/emqx/emqx/discussions)
+[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
 
 [![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
 

+ 1 - 1
apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src

@@ -1,6 +1,6 @@
 {application, emqx_auth_jwt,
  [{description, "EMQ X Authentication with JWT"},
-  {vsn, "4.3.0"}, % strict semver, bump manually!
+  {vsn, "4.4.0"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_auth_jwt_sup]},
   {applications, [kernel,stdlib,jose]},

+ 15 - 0
apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src

@@ -0,0 +1,15 @@
+%% -*-: erlang -*-
+{VSN,
+ [
+    {"4.3.0", [
+      {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []}
+    ]},
+    {<<".*">>, []}
+ ],
+ [
+    {"4.3.0", [
+      {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []}
+    ]},
+    {<<".*">>, []}
+ ]
+}.

+ 7 - 5
apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl

@@ -140,7 +140,7 @@ handle_verify(JwsCompacted,
               State = #state{static = Static, remote = Remote}) ->
     try
         Jwks = case emqx_json:decode(jose_jws:peek_protected(JwsCompacted), [return_maps]) of
-                   #{<<"kid">> := Kid} ->
+                   #{<<"kid">> := Kid} when Remote /= undefined ->
                        [J || J <- Remote, maps:get(<<"kid">>, J#jose_jwk.fields, undefined) =:= Kid];
                    _ -> Static
                end,
@@ -150,7 +150,9 @@ handle_verify(JwsCompacted,
                 {reply, do_verify(JwsCompacted, Jwks), State}
         end
     catch
-        _:_ ->
+        Class : Reason : Stk ->
+            ?LOG(error, "Handle JWK crashed: ~p, ~p, stacktrace: ~p~n",
+                        [Class, Reason, Stk]),
             {reply, {error, invalid_signature}, State}
     end.
 
@@ -186,8 +188,8 @@ do_verify(JwsCompacted, [Jwk|More]) ->
         {true, Payload, _Jws} ->
             Claims = emqx_json:decode(Payload, [return_maps]),
             case check_claims(Claims) of
-                false ->
-                    {error, invalid_signature};
+                {false, <<"exp">>} ->
+                    {error, {invalid_signature, expired}};
                 NClaims ->
                     {ok, NClaims}
             end;
@@ -217,6 +219,6 @@ do_check_claim([{K, F}|More], Claims) ->
         {V, NClaims} ->
             case F(V) of
                 true -> do_check_claim(More, NClaims);
-                _ -> false
+                _ -> {false, K}
             end
     end.

+ 19 - 0
apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl

@@ -33,6 +33,7 @@ groups() ->
                                  , t_check_claims
                                  , t_check_claims_clientid
                                  , t_check_claims_username
+                                 , t_check_claims_kid_in_header
                                  ]}
     ].
 
@@ -61,6 +62,12 @@ set_special_configs(emqx_auth_jwt) ->
 set_special_configs(_) ->
     ok.
 
+sign(Payload, Header, Key) when is_map(Header) ->
+    Jwk = jose_jwk:from_oct(Key),
+    Jwt = emqx_json:encode(Payload),
+    {_, Token} = jose_jws:compact(jose_jwt:sign(Jwk, Header, Jwt)),
+    Token;
+
 sign(Payload, Alg, Key) ->
     Jwk = jose_jwk:from_oct(Key),
     Jwt = emqx_json:encode(Payload),
@@ -145,3 +152,15 @@ t_check_claims_username(_) ->
     Result3 = emqx_access_control:authenticate(Plain#{password => Jwt_Error}),
     ct:pal("Auth result for the invalid jwt: ~p~n", [Result3]),
     ?assertEqual({error, invalid_signature}, Result3).
+
+t_check_claims_kid_in_header(_) ->
+    application:set_env(emqx_auth_jwt, verify_claims, []),
+    Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external},
+    Jwt = sign([{clientid, <<"client23">>},
+                {username, <<"plain">>},
+                {exp, os:system_time(seconds) + 3}],
+               #{<<"alg">> => <<"HS256">>,
+                 <<"kid">> => <<"a_kid_str">>}, <<"emqxsecret">>),
+    Result0 = emqx_access_control:authenticate(Plain#{password => Jwt}),
+    ct:pal("Auth result: ~p~n", [Result0]),
+    ?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result0).

+ 5 - 16
apps/emqx_management/src/emqx_management.appup.src

@@ -1,23 +1,12 @@
-%% -*-: erlang -*-
+%% -*- mode: erlang -*-
 {VSN,
- [ {"4.3.2",
-    [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
-    ]},
-   {<<"4.3.[0-1]">>,
-    [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
-    , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []}
-    , {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
+ [ {<<"4.3.[0-2]">>,
+    [ {restart_application, emqx_management}
     ]},
    {<<".*">>, []}
  ],
- [
-   {"4.3.2",
-    [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
-    ]},
-   {<<"4.3.[0-1]">>,
-    [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
-    , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []}
-    , {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
+ [ {<<"4.3.[0-2]">>,
+    [ {restart_application, emqx_management}
     ]},
    {<<".*">>, []}
  ]

+ 32 - 16
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -239,22 +239,30 @@ pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) ->
             end
     end.
 
-qs(<<"_gte_", Key/binary>>, Value, Type) ->
-    {binary_to_existing_atom(Key, utf8), '>=', to_type(Value, Type)};
-qs(<<"_lte_", Key/binary>>, Value, Type) ->
-    {binary_to_existing_atom(Key, utf8), '=<', to_type(Value, Type)};
-qs(<<"_like_", Key/binary>>, Value, Type) ->
-    {binary_to_existing_atom(Key, utf8), like, to_type(Value, Type)};
-qs(<<"_match_", Key/binary>>, Value, Type) ->
-    {binary_to_existing_atom(Key, utf8), match, to_type(Value, Type)};
-qs(Key, Value, Type) ->
-    {binary_to_existing_atom(Key, utf8), '=:=', to_type(Value, Type)}.
-
 qs(K1, V1, K2, V2, Type) ->
     {Key, Op1, NV1} = qs(K1, V1, Type),
     {Key, Op2, NV2} = qs(K2, V2, Type),
     {Key, Op1, NV1, Op2, NV2}.
 
+qs(K, Value0, Type) ->
+    try
+        qs(K, to_type(Value0, Type))
+    catch
+        throw : bad_value_type ->
+            throw({bad_value_type, {K, Type, Value0}})
+    end.
+
+qs(<<"_gte_", Key/binary>>, Value) ->
+    {binary_to_existing_atom(Key, utf8), '>=', Value};
+qs(<<"_lte_", Key/binary>>, Value) ->
+    {binary_to_existing_atom(Key, utf8), '=<', Value};
+qs(<<"_like_", Key/binary>>, Value) ->
+    {binary_to_existing_atom(Key, utf8), like, Value};
+qs(<<"_match_", Key/binary>>, Value) ->
+    {binary_to_existing_atom(Key, utf8), match, Value};
+qs(Key, Value) ->
+    {binary_to_existing_atom(Key, utf8), '=:=', Value}.
+
 is_fuzzy_key(<<"_like_", _/binary>>) ->
     true;
 is_fuzzy_key(<<"_match_", _/binary>>) ->
@@ -265,11 +273,19 @@ is_fuzzy_key(_) ->
 %%--------------------------------------------------------------------
 %% Types
 
-to_type(V, atom) -> to_atom(V);
-to_type(V, integer) -> to_integer(V);
-to_type(V, timestamp) -> to_timestamp(V);
-to_type(V, ip) -> aton(V);
-to_type(V, _) -> V.
+to_type(V, TargetType) ->
+    try
+        to_type_(V, TargetType)
+    catch
+        _ : _ ->
+            throw(bad_value_type)
+    end.
+
+to_type_(V, atom) -> to_atom(V);
+to_type_(V, integer) -> to_integer(V);
+to_type_(V, timestamp) -> to_timestamp(V);
+to_type_(V, ip) -> aton(V);
+to_type_(V, _) -> V.
 
 to_atom(A) when is_atom(A) ->
     A;

+ 19 - 2
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -140,10 +140,14 @@
 -define(format_fun, {?MODULE, format_channel_info}).
 
 list(Bindings, Params) when map_size(Bindings) == 0 ->
-    minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)});
+    fence(fun() ->
+        emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)
+    end);
 
 list(#{node := Node}, Params) when Node =:= node() ->
-    minirest:return({ok, emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun)});
+    fence(fun() ->
+        emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun)
+    end);
 
 list(Bindings = #{node := Node}, Params) ->
     case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
@@ -151,6 +155,19 @@ list(Bindings = #{node := Node}, Params) ->
         Res -> Res
     end.
 
+%% @private
+fence(Func) ->
+    try
+        minirest:return({ok, Func()})
+    catch
+        throw : {bad_value_type, {_Key, Type, Value}} ->
+            Reason = iolist_to_binary(
+                       io_lib:format("Can't convert ~p to ~p type",
+                                     [Value, Type])
+                      ),
+            minirest:return({error, ?ERROR8, Reason})
+    end.
+
 lookup(#{node := Node, clientid := ClientId}, _Params) ->
     minirest:return({ok, emqx_mgmt:lookup_client(Node, {clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
 

+ 25 - 9
apps/emqx_management/src/emqx_mgmt_api_data.erl

@@ -110,7 +110,8 @@ get_list_exported() ->
 import(_Bindings, Params) ->
     case proplists:get_value(<<"filename">>, Params) of
         undefined ->
-            minirest:return({error, missing_required_params});
+            Result = import_content(Params),
+            minirest:return(Result);
         Filename ->
             case proplists:get_value(<<"node">>, Params) of
                 undefined ->
@@ -127,11 +128,11 @@ import(_Bindings, Params) ->
     end.
 
 do_import(Filename) ->
-    FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
+    FullFilename = fullname(Filename),
     emqx_mgmt_data_backup:import(FullFilename, "{}").
 
 download(#{filename := Filename}, _Params) ->
-    FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
+    FullFilename = fullname(Filename),
     case file:read_file(FullFilename) of
         {ok, Bin} ->
             {ok, #{filename => list_to_binary(Filename),
@@ -145,7 +146,7 @@ upload(Bindings, Params) ->
 
 do_upload(_Bindings, #{<<"filename">> := Filename,
                        <<"file">> := Bin}) ->
-    FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
+    FullFilename = fullname(Filename),
     case file:write_file(FullFilename, Bin) of
         ok ->
             minirest:return({ok, [{node, node()}]});
@@ -153,18 +154,33 @@ do_upload(_Bindings, #{<<"filename">> := Filename,
             minirest:return({error, Reason})
     end;
 do_upload(Bindings, Params = #{<<"file">> := _}) ->
-    Seconds = erlang:system_time(second),
-    {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
-    Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
-    do_upload(Bindings, Params#{<<"filename">> => Filename});
+    do_upload(Bindings, Params#{<<"filename">> => tmp_filename()});
 do_upload(_Bindings, _Params) ->
     minirest:return({error, missing_required_params}).
 
 delete(#{filename := Filename}, _Params) ->
-    FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
+    FullFilename = fullname(Filename),
     case file:delete(FullFilename) of
         ok ->
             minirest:return();
         {error, Reason} ->
             minirest:return({error, Reason})
     end.
+
+import_content(Content) ->
+    File = dump_to_tmp_file(Content),
+    do_import(File).
+
+dump_to_tmp_file(Content) ->
+    Bin = emqx_json:encode(Content),
+    Filename = tmp_filename(),
+    ok = file:write_file(fullname(Filename), Bin),
+    Filename.
+
+fullname(Name) ->
+    filename:join(emqx:get_env(data_dir), Name).
+
+tmp_filename() ->
+    Seconds = erlang:system_time(second),
+    {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
+    io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]).

+ 6 - 0
apps/emqx_management/src/emqx_mgmt_http.erl

@@ -119,12 +119,18 @@ authorize_appid(Req) ->
          _  -> false
     end.
 
+-ifdef(EMQX_ENTERPRISE).
+filter(_) ->
+    true.
+-else.
 filter(#{app := emqx_modules}) -> true;
 filter(#{app := App}) ->
     case emqx_plugins:find_plugin(App) of
         false -> false;
         Plugin -> Plugin#plugin.active
     end.
+-endif.
+
 
 format(Port) when is_integer(Port) ->
     io_lib:format("0.0.0.0:~w", [Port]);

+ 17 - 0
apps/emqx_management/test/emqx_mgmt_api_SUITE.erl

@@ -553,6 +553,23 @@ t_data(_) ->
     application:stop(emqx_dahboard),
     ok.
 
+t_data_import_content(_) ->
+    ok = emqx_rule_registry:mnesia(boot),
+    ok = emqx_dashboard_admin:mnesia(boot),
+    application:ensure_all_started(emqx_rule_engine),
+    application:ensure_all_started(emqx_dashboard),
+    {ok, Data} = request_api(post, api_path(["data","export"]), [], auth_header_(), [#{}]),
+    #{<<"filename">> := Filename} = emqx_ct_http:get_http_data(Data),
+    Dir = emqx:get_env(data_dir),
+    {ok, Bin} = file:read_file(filename:join(Dir, Filename)),
+    Content = emqx_json:decode(Bin),
+    %% TODO: enable when 5.0 if we are still using data export/import
+    %?assertMatch({ok, "{\"code\":0}"}, request_api(post, api_path(["data","import"]), [], auth_header_(), Content)),
+    ?assertMatch({ok, "{\"message\":\"5.0\",\"code\":\"unsupported_version\"}"},
+                 request_api(post, api_path(["data","import"]), [], auth_header_(), Content)),
+    application:stop(emqx_rule_engine),
+    application:stop(emqx_dahboard).
+
 request_api(Method, Url, Auth) ->
     request_api(Method, Url, [], Auth, []).
 

+ 1 - 1
apps/emqx_sn/src/emqx_sn.app.src

@@ -1,6 +1,6 @@
 {application, emqx_sn,
  [{description, "EMQ X MQTT-SN Plugin"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.4.0"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,esockd]},

+ 6 - 10
apps/emqx_sn/src/emqx_sn.appup.src

@@ -1,17 +1,13 @@
 %% -*-: erlang -*-
 {VSN,
  [
-   {"4.3.0", [
-     {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
-     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
-   ]},
-   {<<".*">>, []}
+   {<<"4.3.[0-1]">>, [
+     {restart_application, emqx_sn}
+   ]}
  ],
  [
-   {"4.3.0", [
-     {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
-     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
-   ]},
-   {<<".*">>, []}
+   {<<"4.3.[0-1]">>, [
+     {restart_application, emqx_sn}
+   ]}
  ]
 }.

+ 3 - 12
apps/emqx_sn/src/emqx_sn_app.erl

@@ -43,7 +43,8 @@
 start(_Type, _Args) ->
     Addr = application:get_env(emqx_sn, port, 1884),
     GwId = application:get_env(emqx_sn, gateway_id, 1),
-    {ok, Sup} = emqx_sn_sup:start_link(Addr, GwId),
+    PredefTopics = application:get_env(emqx_sn, predefined, []),
+    {ok, Sup} = emqx_sn_sup:start_link(Addr, GwId, PredefTopics),
     start_listeners(),
     {ok, Sup}.
 
@@ -57,13 +58,7 @@ stop(_State) ->
 
 -spec start_listeners() -> ok.
 start_listeners() ->
-    PredefTopics = application:get_env(emqx_sn, predefined, []),
-    ListenCfs = [begin
-                    TabName = tabname(Proto, ListenOn),
-                    {ok, RegistryPid} = emqx_sn_sup:start_registry_proc(emqx_sn_sup, TabName, PredefTopics),
-                    {Proto, ListenOn, [{registry, {TabName, RegistryPid}} | Options]}
-                 end || {Proto, ListenOn, Options} <- listeners_confs()],
-    lists:foreach(fun start_listener/1, ListenCfs).
+    lists:foreach(fun start_listener/1, listeners_confs()).
 
 -spec start_listener(listener()) -> ok.
 start_listener({Proto, ListenOn, Options}) ->
@@ -151,7 +146,3 @@ format({Addr, Port}) when is_list(Addr) ->
     io_lib:format("~s:~w", [Addr, Port]);
 format({Addr, Port}) when is_tuple(Addr) ->
     io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
-
-tabname(Proto, ListenOn) ->
-    list_to_atom(lists:flatten(["emqx_sn_registry__", atom_to_list(Proto), "_", format(ListenOn)])).
-

+ 40 - 33
apps/emqx_sn/src/emqx_sn_gateway.erl

@@ -82,7 +82,6 @@
                 sockname             :: {inet:ip_address(), inet:port()},
                 peername             :: {inet:ip_address(), inet:port()},
                 channel              :: maybe(emqx_channel:channel()),
-                registry             :: emqx_sn_registry:registry(),
                 clientid             :: maybe(binary()),
                 username             :: maybe(binary()),
                 password             :: maybe(binary()),
@@ -147,7 +146,6 @@ kick(GwPid) ->
 
 init([{_, SockPid, Sock}, Peername, Options]) ->
     GwId = proplists:get_value(gateway_id, Options),
-    Registry = proplists:get_value(registry, Options),
     Username = proplists:get_value(username, Options, undefined),
     Password = proplists:get_value(password, Options, undefined),
     EnableQos3 = proplists:get_value(enable_qos3, Options, false),
@@ -165,7 +163,6 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
                            sockname         = Sockname,
                            peername         = Peername,
                            channel          = Channel,
-                           registry         = Registry,
                            asleep_timer     = emqx_sn_asleep_timer:init(),
                            enable_stats     = EnableStats,
                            enable_qos3      = EnableQos3,
@@ -205,9 +202,9 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State =
 idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
                                                      topic_id_type = TopicIdType
                                                     }, TopicId, _MsgId, Data)},
-    State = #state{clientid = ClientId, registry = Registry}) ->
+    State = #state{clientid = ClientId}) ->
     TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
-                    false -> emqx_sn_registry:lookup_topic(Registry, self(), TopicId);
+                    false -> emqx_sn_registry:lookup_topic(ClientId, TopicId);
                     true  -> <<TopicId:16>>
                 end,
     _ = case TopicName =/= undefined of
@@ -292,9 +289,9 @@ wait_for_will_msg(EventType, EventContent, State) ->
     handle_event(EventType, EventContent, wait_for_will_msg, State).
 
 connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
-          State = #state{clientid = ClientId, registry = Registry}) ->
+          State = #state{clientid = ClientId}) ->
     State0 =
-    case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
+    case emqx_sn_registry:register_topic(ClientId, TopicName) of
         TopicId when is_integer(TopicId) ->
             ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
             send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
@@ -580,13 +577,16 @@ handle_event(EventType, EventContent, StateName, State) ->
          [StateName, {EventType, EventContent}]),
     {keep_state, State}.
 
-terminate(Reason, _StateName, #state{channel  = Channel,
-                                     registry = Registry}) ->
-    emqx_sn_registry:unregister_topic(Registry, self()),
-    case Channel =:= undefined of
-        true -> ok;
-        false -> emqx_channel:terminate(Reason, Channel)
-    end.
+terminate(Reason, _StateName, #state{channel  = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    case Reason of
+        {shutdown, takeovered} ->
+            ok;
+        _ ->
+            emqx_sn_registry:unregister_topic(ClientId)
+        end,
+    emqx_channel:terminate(Reason, Channel),
+    ok.
 
 code_change(_Vsn, StateName, State, _Extra) ->
     {ok, StateName, State}.
@@ -719,11 +719,12 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
 mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
     ?SN_UNSUBACK_MSG(MsgId);
 
-mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) ->
+mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel  = Channel}) ->
     NewPacketId = if QoS =:= ?QOS_0 -> 0;
                      true -> PacketId
                   end,
-    {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, self(), Topic) of
+    ClientId = emqx_channel:info(clientid, Channel),
+    {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
                                       {predef, PredefTopicId} ->
                                           {?SN_PREDEFINED_TOPIC, PredefTopicId};
                                       TopicId when is_integer(TopicId) ->
@@ -844,14 +845,13 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
 
 do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
                                                          peername = Peername,
-                                                         registry = Registry,
                                                          channel  = Channel}) ->
     emqx_logger:set_metadata_clientid(ClientId),
     #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
     NChannel = case CleanStart of
                    true ->
                        emqx_channel:terminate(normal, Channel),
-                       emqx_sn_registry:unregister_topic(Registry, self()),
+                       emqx_sn_registry:unregister_topic(ClientId),
                        emqx_channel:init(#{socktype => udp,
                                            sockname => Sockname,
                                            peername => Peername,
@@ -864,8 +864,9 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
     do_connect(ClientId, CleanStart, Will, Duration, NState).
 
 handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
-                 State=#state{registry = Registry}) ->
-    case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
+                 State=#state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    case emqx_sn_registry:register_topic(ClientId, TopicName) of
         {error, too_large} ->
             State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
                                              ?SN_INVALID_TOPIC_ID,
@@ -879,8 +880,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
     end;
 
 handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId,
-                 State = #state{registry = Registry}) ->
-    case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
+                 State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
         undefined ->
             State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
                                              TopicId,
@@ -909,8 +911,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) ->
     proto_unsubscribe(TopicId, MsgId, State);
 
 handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId,
-                   State = #state{registry = Registry}) ->
-    case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
+                   State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
         undefined ->
             {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)};
         PredefinedTopic ->
@@ -932,10 +935,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
     <<TopicId:16>> = TopicName,
     do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State);
 do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
-           State=#state{registry = Registry}) ->
+           State=#state{channel = Channel}) ->
     #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
     NewQoS = get_corrected_qos(QoS),
-    case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
+    ClientId = emqx_channel:info(clientid, Channel),
+    case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
         undefined ->
             {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID,
                 State)};
@@ -946,7 +950,7 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
 do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) ->
     #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
     NewQoS = get_corrected_qos(QoS),
-    <<TopicId:16>> = STopicName ,
+    <<TopicId:16>> = STopicName,
     case emqx_topic:wildcard(STopicName) of
         true ->
             {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_NOT_SUPPORTED,
@@ -974,12 +978,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
     ok.
 
 do_puback(TopicId, MsgId, ReturnCode, StateName,
-          State=#state{registry = Registry}) ->
+          State=#state{channel = Channel}) ->
     case ReturnCode of
         ?SN_RC_ACCEPTED ->
             handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
         ?SN_RC_INVALID_TOPIC_ID ->
-            case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
+            ClientId = emqx_channel:info(clientid, Channel),
+            case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
                 undefined -> {keep_state, State};
                 TopicName ->
                     %%notice that this TopicName maybe normal or predefined,
@@ -1068,9 +1073,10 @@ handle_outgoing(Packets, State) when is_list(Packets) ->
     end, State, Packets);
 
 handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
-                State = #state{registry = Registry}) ->
+                State = #state{channel = Channel}) ->
     ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]),
-    TopicId = emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName),
+    ClientId = emqx_channel:info(clientid, Channel),
+    TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
     case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
         true -> register_and_notify_client(PubPkt, State);
         false -> send_message(mqtt2sn(PubPkt, State), State)
@@ -1094,10 +1100,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} =
     State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
 
 register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
-        State = #state{registry = Registry, pending_topic_ids = Pendings}) ->
+        State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
     MsgId = message_id(PacketId),
     #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
-    TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName),
+    ClientId = emqx_channel:info(clientid, Channel),
+    TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
     ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
                 "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
     NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),

+ 80 - 48
apps/emqx_sn/src/emqx_sn_registry.erl

@@ -23,16 +23,16 @@
 -define(LOG(Level, Format, Args),
         emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)).
 
--export([ start_link/2
-        , stop/1
+-export([ start_link/1
+        , stop/0
         ]).
 
--export([ register_topic/3
-        , unregister_topic/2
+-export([ register_topic/2
+        , unregister_topic/1
         ]).
 
--export([ lookup_topic/3
-        , lookup_topic_id/3
+-export([ lookup_topic/2
+        , lookup_topic_id/2
         ]).
 
 %% gen_server callbacks
@@ -46,25 +46,45 @@
 
 -define(TAB, ?MODULE).
 
--record(state, {tab, max_predef_topic_id = 0}).
+-record(state, {max_predef_topic_id = 0}).
 
--type(registry() :: {ets:tab(), pid()}).
+-record(emqx_sn_registry, {key, value}).
+
+%% Mnesia bootstrap
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+
+%% @doc Create or replicate tables.
+-spec(mnesia(boot | copy) -> ok).
+mnesia(boot) ->
+    %% Optimize storage
+    StoreProps = [{ets, [{read_concurrency, true}]}],
+    ok = ekka_mnesia:create_table(?MODULE, [
+            {attributes, record_info(fields, emqx_sn_registry)},
+            {ram_copies, [node()]},
+            {storage_properties, StoreProps}]);
+
+mnesia(copy) ->
+    ok = ekka_mnesia:copy_table(?MODULE, ram_copies).
 
 %%-----------------------------------------------------------------------------
 
--spec(start_link(atom(), list()) -> {ok, pid()} | ignore | {error, Reason :: term()}).
-start_link(Tab, PredefTopics) ->
-    gen_server:start_link(?MODULE, [Tab, PredefTopics], []).
+-spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}).
+start_link(PredefTopics) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []).
 
--spec(stop(registry()) -> ok).
-stop({_Tab, Pid}) ->
-    gen_server:stop(Pid, normal, infinity).
+-spec(stop() -> ok).
+stop() ->
+    gen_server:stop(?MODULE, normal, infinity).
 
--spec(register_topic(registry(), pid(), binary()) -> integer() | {error, term()}).
-register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
+-spec(register_topic(binary(), binary()) -> integer() | {error, term()}).
+register_topic(ClientId, TopicName) when is_binary(TopicName) ->
     case emqx_topic:wildcard(TopicName) of
         false ->
-            gen_server:call(Pid, {register, ClientPid, TopicName});
+            gen_server:call(?MODULE, {register, ClientId, TopicName});
         %% TopicId: in case of “accepted” the value that will be used as topic
         %% id by the gateway when sending PUBLISH messages to the client (not
         %% relevant in case of subscriptions to a short topic name or to a topic
@@ -72,22 +92,22 @@ register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
         true  -> {error, wildcard_topic}
     end.
 
--spec(lookup_topic(registry(), pid(), pos_integer()) -> undefined | binary()).
-lookup_topic({Tab, _Pid}, ClientPid, TopicId) when is_integer(TopicId) ->
-    case lookup_element(Tab, {predef, TopicId}, 2) of
+-spec(lookup_topic(binary(), pos_integer()) -> undefined | binary()).
+lookup_topic(ClientId, TopicId) when is_integer(TopicId) ->
+    case lookup_element(?TAB, {predef, TopicId}, 3) of
         undefined ->
-            lookup_element(Tab, {ClientPid, TopicId}, 2);
+            lookup_element(?TAB, {ClientId, TopicId}, 3);
         Topic -> Topic
     end.
 
--spec(lookup_topic_id(registry(), pid(), binary())
+-spec(lookup_topic_id(binary(), binary())
       -> undefined
        | pos_integer()
        | {predef, integer()}).
-lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
-    case lookup_element(Tab, {predef, TopicName}, 2) of
+lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) ->
+    case lookup_element(?TAB, {predef, TopicName}, 3) of
         undefined ->
-            lookup_element(Tab, {ClientPid, TopicName}, 2);
+            lookup_element(?TAB, {ClientId, TopicName}, 3);
         TopicId ->
             {predef, TopicId}
     end.
@@ -96,47 +116,59 @@ lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
 lookup_element(Tab, Key, Pos) ->
     try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end.
 
--spec(unregister_topic(registry(), pid()) -> ok).
-unregister_topic({_Tab, Pid}, ClientPid) ->
-    gen_server:call(Pid, {unregister, ClientPid}).
+-spec(unregister_topic(binary()) -> ok).
+unregister_topic(ClientId) ->
+    gen_server:call(?MODULE, {unregister, ClientId}).
 
 %%-----------------------------------------------------------------------------
 
-init([Tab, PredefTopics]) ->
+init([PredefTopics]) ->
     %% {predef, TopicId}     -> TopicName
     %% {predef, TopicName}   -> TopicId
-    %% {ClientPid, TopicId}   -> TopicName
-    %% {ClientPid, TopicName} -> TopicId
-    _ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]),
+    %% {ClientId, TopicId}   -> TopicName
+    %% {ClientId, TopicName} -> TopicId
     MaxPredefId = lists:foldl(
                     fun({TopicId, TopicName}, AccId) ->
-                        _ = ets:insert(Tab, {{predef, TopicId}, TopicName}),
-                        _ = ets:insert(Tab, {{predef, TopicName}, TopicId}),
+                        mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId},
+                                                             value = TopicName}),
+                        mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName},
+                                                             value = TopicId}),
                         if TopicId > AccId -> TopicId; true -> AccId end
                     end, 0, PredefTopics),
-    {ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}.
+    {ok, #state{max_predef_topic_id = MaxPredefId}}.
 
-handle_call({register, ClientPid, TopicName}, _From,
-            State = #state{tab = Tab, max_predef_topic_id = PredefId}) ->
-    case lookup_topic_id({Tab, self()}, ClientPid, TopicName) of
+handle_call({register, ClientId, TopicName}, _From,
+            State = #state{max_predef_topic_id = PredefId}) ->
+    case lookup_topic_id(ClientId, TopicName) of
         {predef, PredefTopicId}  when is_integer(PredefTopicId) ->
             {reply, PredefTopicId, State};
         TopicId when is_integer(TopicId) ->
             {reply, TopicId, State};
         undefined ->
-            case next_topic_id(Tab, PredefId, ClientPid) of
+            case next_topic_id(?TAB, PredefId, ClientId) of
                 TopicId when TopicId >= 16#FFFF ->
                     {reply, {error, too_large}, State};
                 TopicId ->
-                    _ = ets:insert(Tab, {{ClientPid, next_topic_id}, TopicId + 1}),
-                    _ = ets:insert(Tab, {{ClientPid, TopicName}, TopicId}),
-                    _ = ets:insert(Tab, {{ClientPid, TopicId}, TopicName}),
-                    {reply, TopicId, State}
+                    Fun = fun() ->
+                        mnesia:write(#emqx_sn_registry{key = {ClientId, next_topic_id},
+                                                            value = TopicId + 1}),
+                        mnesia:write(#emqx_sn_registry{key = {ClientId, TopicName},
+                                                            value = TopicId}),
+                        mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId},
+                                                            value = TopicName})
+                    end,
+                    case mnesia:transaction(Fun) of
+                        {atomic, ok} ->
+                            {reply, TopicId, State};
+                        {aborted, Error} ->
+                            {reply, {error, Error}, State}
+                    end
             end
     end;
 
-handle_call({unregister, ClientPid}, _From, State = #state{tab = Tab}) ->
-    ets:match_delete(Tab, {{ClientPid, '_'}, '_'}),
+handle_call({unregister, ClientId}, _From, State) ->
+    Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}),
+    lists:foreach(fun(R) -> mnesia:dirty_delete_object(R) end, Registry),
     {reply, ok, State};
 
 handle_call(Req, _From, State) ->
@@ -159,8 +191,8 @@ code_change(_OldVsn, State, _Extra) ->
 
 %%-----------------------------------------------------------------------------
 
-next_topic_id(Tab, PredefId, ClientPid) ->
-    case ets:lookup(Tab, {ClientPid, next_topic_id}) of
-        [{_, Id}] -> Id;
+next_topic_id(Tab, PredefId, ClientId) ->
+    case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of
+        [#emqx_sn_registry{value = Id}] -> Id;
         []        -> PredefId + 1
     end.

+ 11 - 17
apps/emqx_sn/src/emqx_sn_sup.erl

@@ -18,32 +18,26 @@
 
 -behaviour(supervisor).
 
--export([ start_link/2
-        , start_registry_proc/3
+-export([ start_link/3
         , init/1
         ]).
 
-start_registry_proc(Sup, TabName, PredefTopics) ->
-    Registry = #{id       => TabName,
-                 start    => {emqx_sn_registry, start_link, [TabName, PredefTopics]},
-                 restart  => permanent,
-                 shutdown => 5000,
-                 type     => worker,
-                 modules  => [emqx_sn_registry]},
-    handle_ret(supervisor:start_child(Sup, Registry)).
+start_link(Addr, GwId, PredefTopics) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, [Addr, GwId, PredefTopics]).
 
-start_link(Addr, GwId) ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, [Addr, GwId]).
-
-init([{_Ip, Port}, GwId]) ->
+init([{_Ip, Port}, GwId, PredefTopics]) ->
     Broadcast = #{id       => emqx_sn_broadcast,
                   start    => {emqx_sn_broadcast, start_link, [GwId, Port]},
                   restart  => permanent,
                   shutdown => brutal_kill,
                   type     => worker,
                   modules  => [emqx_sn_broadcast]},
-    {ok, {{one_for_one, 10, 3600}, [Broadcast]}}.
+    Registry = #{id       => emqx_sn_registry,
+                  start    => {emqx_sn_registry, start_link, [PredefTopics]},
+                  restart  => permanent,
+                  shutdown => brutal_kill,
+                  type     => worker,
+                  modules  => [emqx_sn_registry]},
+    {ok, {{one_for_one, 10, 3600}, [Broadcast, Registry]}}.
 
-handle_ret({ok, Pid, _Info}) -> {ok, Pid};
-handle_ret(Ret) -> Ret.
 

+ 2 - 1
apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

@@ -1084,7 +1084,7 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
     {ok, C} = emqtt:start_link(),
     {ok, _} = emqtt:connect(C),
     {ok, _} = emqtt:publish(C, TopicName1, Payload1, QoS),
-    timer:sleep(500),
+    timer:sleep(100),
     ok = emqtt:disconnect(C),
 
     timer:sleep(50),
@@ -1278,6 +1278,7 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
     CleanSession = 0,
     ReturnCode = 0,
     send_register_msg(Socket, TopicName_tom, MsgId1),
+    timer:sleep(50),
     TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)),
     send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1),
     ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1,

+ 59 - 65
apps/emqx_sn/test/emqx_sn_registry_SUITE.erl

@@ -16,12 +16,9 @@
 
 -module(emqx_sn_registry_SUITE).
 
--import(proplists, [get_value/2]).
-
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include_lib("emqx_sn/include/emqx_sn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 -define(REGISTRY, emqx_sn_registry).
@@ -44,84 +41,81 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(_TestCase, Config) ->
+    ekka_mnesia:start(),
+    emqx_sn_registry:mnesia(boot),
+    mnesia:clear_table(emqx_sn_registry),
     PredefTopics = application:get_env(emqx_sn, predefined, []),
-    TabName = emqx_sn_registry,
-    {ok, Pid} = ?REGISTRY:start_link(TabName, PredefTopics),
-    [{registray, {TabName, Pid}} | Config].
+    {ok, _Pid} = ?REGISTRY:start_link(PredefTopics),
+    Config.
 
 end_per_testcase(_TestCase, Config) ->
-    ?REGISTRY:stop(get_value(registray, Config)),
+    ?REGISTRY:stop(),
     Config.
 
 %%--------------------------------------------------------------------
 %% Test cases
 %%--------------------------------------------------------------------
 
-t_register(Config) ->
-    Registry = get_value(registray, Config),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)),
-    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic2">>)),
-    ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
-    ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)),
-    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)),
-    emqx_sn_registry:unregister_topic(Registry, <<"ClientId">>),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)).
-
-t_register_case2(Config) ->
-    Registry = get_value(registray, Config),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)),
-    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic2">>)),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)),
-    ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
-    ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)),
-    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic3">>)),
-    ?REGISTRY:unregister_topic(Registry, <<"ClientId">>),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)).
-
-t_reach_maximum(Config) ->
-    Registry = get_value(registray, Config),
-    register_a_lot(Registry, ?MAX_PREDEF_ID+1, 16#ffff),
-    ?assertEqual({error, too_large}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicABC">>)),
+t_register(_Config) ->
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)),
+    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)),
+    ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
+    ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
+    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)),
+    emqx_sn_registry:unregister_topic(<<"ClientId">>),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)).
+
+t_register_case2(_Config) ->
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)),
+    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)),
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)),
+    ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
+    ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
+    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic3">>)),
+    ?REGISTRY:unregister_topic(<<"ClientId">>),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)).
+
+t_reach_maximum(_Config) ->
+    register_a_lot(?MAX_PREDEF_ID+1, 16#ffff),
+    ?assertEqual({error, too_large}, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicABC">>)),
     Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+1])),
     Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+2])),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic1)),
-    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic2)),
-    ?REGISTRY:unregister_topic(Registry, <<"ClientId">>),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic1)),
-    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic2)).
-
-t_register_case4(Config) ->
-    Registry = get_value(registray, Config),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicA">>)),
-    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicB">>)),
-    ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicC">>)),
-    ?REGISTRY:unregister_topic(Registry, <<"ClientId">>),
-    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicD">>)).
-
-t_deny_wildcard_topic(Config) ->
-    Registry = get_value(registray, Config),
-    ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"/TopicA/#">>)),
-    ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"/+/TopicB">>)).
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)),
+    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)),
+    ?REGISTRY:unregister_topic(<<"ClientId">>),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)),
+    ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)).
+
+t_register_case4(_Config) ->
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicA">>)),
+    ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicB">>)),
+    ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicC">>)),
+    ?REGISTRY:unregister_topic(<<"ClientId">>),
+    ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicD">>)).
+
+t_deny_wildcard_topic(_Config) ->
+    ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)),
+    ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)).
 
 %%--------------------------------------------------------------------
 %% Helper funcs
 %%--------------------------------------------------------------------
 
-register_a_lot(_, Max, Max) ->
+register_a_lot(Max, Max) ->
     ok;
-register_a_lot(Registry, N, Max) when N < Max ->
+register_a_lot(N, Max) when N < Max ->
     Topic = iolist_to_binary(["Topic", integer_to_list(N)]),
-    ?assertEqual(N, ?REGISTRY:register_topic(Registry, <<"ClientId">>, Topic)),
-    register_a_lot(Registry, N+1, Max).
+    ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)),
+    register_a_lot(N+1, Max).
 

+ 1 - 1
apps/emqx_web_hook/src/emqx_web_hook.app.src

@@ -1,6 +1,6 @@
 {application, emqx_web_hook,
  [{description, "EMQ X WebHook Plugin"},
-  {vsn, "4.3.0"}, % strict semver, bump manually!
+  {vsn, "4.3.1"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_web_hook_sup]},
   {applications, [kernel,stdlib,ehttpc]},

+ 6 - 0
apps/emqx_web_hook/src/emqx_web_hook.appup.src

@@ -2,9 +2,15 @@
 
 {VSN,
   [
+    {"4.3.0", [
+     {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
+   ]},
     {<<".*">>, []}
   ],
   [
+    {"4.3.0", [
+     {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
+    ]},
     {<<".*">>, []}
   ]
 }.

+ 1 - 1
apps/emqx_web_hook/src/emqx_web_hook_actions.erl

@@ -353,7 +353,7 @@ pool_name(ResId) ->
     list_to_atom("webhook:" ++ str(ResId)).
 
 get_ssl_opts(Opts, ResId) ->
-    [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Opts, "rules", ResId)}].
+    emqx_plugin_libs_ssl:save_files_return_opts(Opts, "rules", ResId).
 
 test_http_connect(Conf) ->
     Url = fun() -> maps:get(<<"url">>, Conf) end,

+ 0 - 1
bin/nodetool

@@ -63,7 +63,6 @@ main(Args) ->
             %% a "pong"
             io:format("pong\n");
         ["stop"] ->
-            rpc:call(TargetNode, emqx_plugins, unload, [], 60000),
             io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]);
         ["restart", "-config", ConfigFile | _RestArgs1] ->
             io:format("~p\n", [rpc:call(TargetNode, emqx, restart, [ConfigFile], 60000)]);

+ 1 - 0
deploy/charts/emqx/README.md

@@ -37,6 +37,7 @@ The following table lists the configurable parameters of the emqx chart and thei
 | `image.repository` | EMQ X Image name |emqx/emqx|
 | `image.pullPolicy`  | The image pull policy  |IfNotPresent|
 | `image.pullSecrets `  | The image pull secrets  |`[]` (does not add image pull secrets to deployed pods)|
+| `recreatePods` | Forces the recreation of pods during upgrades, which can be useful to always apply the most recent configuration. | false |
 | `persistence.enabled` | Enable EMQX persistence using PVC |false|
 | `persistence.storageClass` | Storage class of backing PVC |`nil` (uses alpha storage class annotation)|
 | `persistence.existingClaim` | EMQ X data Persistent Volume existing claim name, evaluated as a template |""|

+ 4 - 0
deploy/charts/emqx/templates/StatefulSet.yaml

@@ -47,6 +47,10 @@ spec:
         version: {{ .Chart.AppVersion }}
         app.kubernetes.io/name: {{ include "emqx.name" . }}
         app.kubernetes.io/instance: {{ .Release.Name }}
+      {{- if .Values.recreatePods }}
+      annotations:
+        checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum | quote }}
+      {{- end }}
     spec:
       volumes:
       - name: emqx-loaded-plugins

+ 8 - 0
deploy/charts/emqx/templates/rbac.yaml

@@ -5,7 +5,11 @@ metadata:
   name: {{ include "emqx.fullname" . }}
 ---
 kind: Role
+{{- if semverCompare ">=1.17-0" .Capabilities.KubeVersion.GitVersion }}
+apiVersion: rbac.authorization.k8s.io/v1
+{{- else }}
 apiVersion: rbac.authorization.k8s.io/v1beta1
+{{- end }}
 metadata:
   namespace: {{ .Release.Namespace }}
   name: {{ include "emqx.fullname" . }}
@@ -20,7 +24,11 @@ rules:
   - list
 ---
 kind: RoleBinding
+{{- if semverCompare ">=1.17-0" .Capabilities.KubeVersion.GitVersion }}
+apiVersion: rbac.authorization.k8s.io/v1
+{{- else }}
 apiVersion: rbac.authorization.k8s.io/v1beta1
+{{- end }}
 metadata:
   namespace: {{ .Release.Namespace }}
   name: {{ include "emqx.fullname" . }}

+ 3 - 0
deploy/charts/emqx/values.yaml

@@ -14,6 +14,9 @@ image:
   # pullSecrets:
   # - myRegistryKeySecretName
 
+## Forces the recreation of pods during helm upgrades. This can be useful to update configuration values even if the container image did not change.
+recreatePods: false
+
 persistence:
   enabled: false
   size: 20Mi

+ 7 - 0
etc/emqx.conf

@@ -453,6 +453,13 @@ log.file = emqx.log
 ## Default: No Limit
 #log.chars_limit = 8192
 
+## Maximum depth for Erlang term log formatting
+## and Erlang process message queue inspection.
+##
+## Value: Integer or 'unlimited' (without quotes)
+## Default: 20
+#log.max_depth = 20
+
 ## Log formatter
 ## Value: text | json
 #log.formatter = text

+ 1 - 1
rebar.config

@@ -50,7 +50,7 @@
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
     , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
-    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
+    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
     , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
     , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1

+ 75 - 0
scripts/update_appup.escript

@@ -0,0 +1,75 @@
+#!/usr/bin/env -S escript -c
+%% A script that adds changed modules to the corresponding appup files
+
+main(_Args) ->
+    ChangedFiles = string:lexemes(os:cmd("git diff --name-only origin/master..HEAD"), "\n"),
+    AppModules0 = lists:filtermap(fun filter_erlang_modules/1, ChangedFiles),
+    %% emqx_app must always be included as we bump version number in emqx_release.hrl for each release
+    AppModules1 = [{emqx, emqx_app} | AppModules0],
+    AppModules = group_modules(AppModules1),
+    io:format("Changed modules: ~p~n", [AppModules]),
+    _ = maps:map(fun process_app/2, AppModules),
+    ok.
+
+process_app(App, Modules) ->
+    AppupFiles = filelib:wildcard(lists:concat(["{src,apps,lib-*}/**/", App, ".appup.src"])),
+    case AppupFiles of
+        [AppupFile] ->
+          update_appup(AppupFile, Modules);
+        []          ->
+          io:format("~nWARNING: Please create an stub appup src file for ~p~n", [App])
+    end.
+
+filter_erlang_modules(Filename) ->
+    case lists:reverse(filename:split(Filename)) of
+        [Module, "src"] ->
+            erl_basename("emqx", Module);
+        [Module, "src", App|_] ->
+            erl_basename(App, Module);
+        [Module, _, "src", App|_] ->
+            erl_basename(App, Module);
+        _ ->
+            false
+    end.
+
+erl_basename(App, Name) ->
+    case filename:basename(Name, ".erl") of
+        Name   -> false;
+        Module -> {true, {list_to_atom(App), list_to_atom(Module)}}
+    end.
+
+group_modules(L) ->
+    lists:foldl(fun({App, Mod}, Acc) ->
+                        maps:update_with(App, fun(Tl) -> [Mod|Tl] end, [Mod], Acc)
+                end, #{}, L).
+
+update_appup(File, Modules) ->
+    io:format("~nUpdating appup: ~p~n", [File]),
+    {_, Upgrade0, Downgrade0} = read_appup(File),
+    Upgrade = update_actions(Modules, Upgrade0),
+    Downgrade = update_actions(Modules, Downgrade0),
+    IOList = io_lib:format("%% -*- mode: erlang -*-
+{VSN,~n  ~p,~n  ~p}.~n", [Upgrade, Downgrade]),
+    ok = file:write_file(File, IOList).
+
+update_actions(Modules, Versions) ->
+    lists:map(fun(L) -> do_update_actions(Modules, L) end, Versions).
+
+do_update_actions(_, Ret = {<<".*">>, _}) ->
+    Ret;
+do_update_actions(Modules, {Vsn, Actions}) ->
+    {Vsn, add_modules(Modules, Actions)}.
+
+add_modules(NewModules, OldActions) ->
+    OldModules = lists:map(fun(It) -> element(2, It) end, OldActions),
+    Modules = NewModules -- OldModules,
+    OldActions ++ [{load_module, M, brutal_purge, soft_purge, []} || M <- Modules].
+
+read_appup(File) ->
+    {ok, Bin0} = file:read_file(File),
+    %% Hack:
+    Bin1 = re:replace(Bin0, "VSN", "\"VSN\""),
+    TmpFile = filename:join("/tmp", filename:basename(File)),
+    ok = file:write_file(TmpFile, Bin1),
+    {ok, [Terms]} = file:consult(TmpFile),
+    Terms.

+ 67 - 74
src/emqx.appup.src

@@ -1,76 +1,69 @@
 %% -*- mode: erlang -*-
 {VSN,
- [
-   {"4.3.2", [
-     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
-   ]},
-   {"4.3.1", [
-     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_frame, brutal_purge, soft_purge, []},
-     {load_module, emqx_cm, brutal_purge, soft_purge, []},
-     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
-     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
-     {load_module, emqx_channel, brutal_purge, soft_purge, []},
-     {load_module, emqx_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
-     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
-     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
-   ]},
-   {"4.3.0", [
-     {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
-     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
-     {load_module, emqx_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_frame, brutal_purge, soft_purge, []},
-     {load_module, emqx_trie, brutal_purge, soft_purge, []},
-     {load_module, emqx_cm, brutal_purge, soft_purge, []},
-     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
-     {load_module, emqx_channel, brutal_purge, soft_purge, []},
-     {load_module, emqx_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
-     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
-     {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-     {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}},
-     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
-   ]},
-   {<<".*">>, []}
- ],
- [
-   {"4.3.2", [
-     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
-   ]},
-   {"4.3.1", [
-     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_frame, brutal_purge, soft_purge, []},
-     {load_module, emqx_cm, brutal_purge, soft_purge, []},
-     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
-     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
-     {load_module, emqx_channel, brutal_purge, soft_purge, []},
-     {load_module, emqx_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
-     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
-     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
-   ]},
-   {"4.3.0", [
-     {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
-     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_connection, brutal_purge, soft_purge, []},
-     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
-     {load_module, emqx_frame, brutal_purge, soft_purge, []},
-     {load_module, emqx_trie, brutal_purge, soft_purge, []},
-     {load_module, emqx_cm, brutal_purge, soft_purge, []},
-     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
-     {load_module, emqx_channel, brutal_purge, soft_purge, []},
-     {load_module, emqx_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
-     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
-     %% Just load the module. We don't need to change the 'messages.retained'
-     %% and 'messages.retained' counter type.
-     {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
-   ]},
-   {<<".*">>, []}
- ]
-}.
+  [{"4.3.2",
+    [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
+   {"4.3.1",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_frame,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_congestion,brutal_purge,soft_purge,[]},
+     {load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
+     {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
+     {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
+   {"4.3.0",
+    [{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_congestion,brutal_purge,soft_purge,[]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_frame,brutal_purge,soft_purge,[]},
+     {load_module,emqx_trie,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
+     {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
+     {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
+     {apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
+     {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}],
+  [{"4.3.2",
+    [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
+   {"4.3.1",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_frame,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_congestion,brutal_purge,soft_purge,[]},
+     {load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
+     {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
+     {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
+   {"4.3.0",
+    [{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_congestion,brutal_purge,soft_purge,[]},
+     {load_module,emqx_frame,brutal_purge,soft_purge,[]},
+     {load_module,emqx_trie,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
+     {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
+     {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
+     {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}]}.

+ 3 - 3
src/emqx_channel.erl

@@ -407,7 +407,8 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
     case emqx_packet:check(Packet) of
         ok ->
             TopicFilters0 = parse_topic_filters(TopicFilters),
-            TupleTopicFilters0 = check_sub_acls(TopicFilters0, Channel),
+            TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0),
+            TupleTopicFilters0 = check_sub_acls(TopicFilters1, Channel),
             case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
                  lists:any(fun({_TopicFilter, ReasonCode}) ->
                                     ReasonCode =:= ?RC_NOT_AUTHORIZED
@@ -419,8 +420,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
                                       _Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More);
                                 _Fun(TupleList, []) -> TupleList
                               end,
-                    TopicFilters1 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
-                    TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1),
+                    TopicFilters2 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
                     TopicFilters3 = run_hooks('client.subscribe',
                                               [ClientInfo, Properties],
                                               TopicFilters2),

+ 1 - 0
src/emqx_connection.erl

@@ -349,6 +349,7 @@ ensure_stats_timer(_Timeout, State) -> State.
 
 -compile({inline, [cancel_stats_timer/1]}).
 cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) ->
+    ?tp(debug, cancel_stats_timer, #{}),
     ok = emqx_misc:cancel_timer(TRef),
     State#state{stats_timer = undefined};
 cancel_stats_timer(State) -> State.

+ 7 - 2
src/emqx_plugins.erl

@@ -34,6 +34,8 @@
         , apply_configs/1
         ]).
 
+-export([funlog/2]).
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -267,8 +269,7 @@ do_generate_configs(App) ->
         true ->
             Schema = cuttlefish_schema:files([SchemaFile]),
             Conf = cuttlefish_conf:file(ConfFile),
-            LogFun = fun(Key, Value) -> ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]) end,
-            cuttlefish_generator:map(Schema, Conf, undefined, LogFun);
+            cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2);
         false ->
             error({schema_not_found, SchemaFile})
     end.
@@ -411,3 +412,7 @@ plugin_type(protocol) -> protocol;
 plugin_type(backend) -> backend;
 plugin_type(bridge) -> bridge;
 plugin_type(_) -> feature.
+
+
+funlog(Key, Value) ->
+    ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]).

+ 34 - 7
test/mqtt_protocol_v5_SUITE.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(mqtt_protocol_v5_SUITE).
+-module(emqx_mqtt_protocol_v5_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -22,6 +22,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -import(lists, [nth/2]).
 
@@ -37,6 +38,7 @@ init_per_suite(Config) ->
     %% Meck emqtt
     ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
     %% Start Apps
+    emqx_ct_helpers:boot_modules(all),
     emqx_ct_helpers:start_apps([]),
     Config.
 
@@ -44,6 +46,19 @@ end_per_suite(_Config) ->
     ok = meck:unload(emqtt),
     emqx_ct_helpers:stop_apps([]).
 
+init_per_testcase(TestCase, Config) ->
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase(init, Config);
+        _ -> Config
+    end.
+
+end_per_testcase(TestCase, Config) ->
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase('end', Config);
+        false -> ok
+    end,
+    Config.
+
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
@@ -273,21 +288,33 @@ t_connect_limit_timeout(_) ->
     emqx_zone:set_env(external, publish_limit, undefined),
     meck:unload(proplists).
 
-t_connect_emit_stats_timeout(_) ->
-    IdleTimeout = 2000,
-    emqx_zone:set_env(external, idle_timeout, IdleTimeout),
-
+t_connect_emit_stats_timeout(init, Config) ->
+    NewIdleTimeout = 1000,
+    OldIdleTimeout = emqx_zone:get_env(external, idle_timeout),
+    emqx_zone:set_env(external, idle_timeout, NewIdleTimeout),
+    ok = snabbkaffe:start_trace(),
+    [{idle_timeout, NewIdleTimeout}, {old_idle_timeout, OldIdleTimeout} | Config];
+t_connect_emit_stats_timeout('end', Config) ->
+    snabbkaffe:stop(),
+    {_, OldIdleTimeout} = lists:keyfind(old_idle_timeout, 1, Config),
+    emqx_zone:set_env(external, idle_timeout, OldIdleTimeout),
+    ok.
+
+t_connect_emit_stats_timeout(Config) ->
+    {_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config),
     {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
     {ok, _} = emqtt:connect(Client),
     [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
-
     ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
-    timer:sleep(IdleTimeout),
+    ?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0),
     ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
     ok = emqtt:disconnect(Client).
 
 %% [MQTT-3.1.2-22]
 t_connect_keepalive_timeout(_) ->
+    %% Prevent the emqtt client bringing us down on the disconnect.
+    process_flag(trap_exit, true),
+
     Keepalive = 2,
 
     {ok, Client} = emqtt:start_link([{proto_ver, v5},