Просмотр исходного кода

Merge branch 'main-v4.3' into main-v4.3

xiangfangyang-tech 4 лет назад
Родитель
Сommit
2ce592040e
41 измененных файлов с 1596 добавлено и 470 удалено
  1. 14 0
      .ci/acl_migration_test/build.sh
  2. 15 0
      .ci/acl_migration_test/prepare.sh
  3. 17 0
      .ci/acl_migration_test/suite.sh
  4. 121 0
      .ci/acl_migration_test/test.sh
  5. 2 7
      .ci/fvt_tests/relup.lux
  6. 22 0
      .github/workflows/run_acl_migration_tests.yaml
  7. 1 1
      .tool-versions
  8. 33 7
      apps/emqx_auth_mnesia/include/emqx_auth_mnesia.hrl
  9. 8 17
      apps/emqx_auth_mnesia/src/emqx_acl_mnesia.erl
  10. 16 18
      apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl
  11. 12 125
      apps/emqx_auth_mnesia/src/emqx_acl_mnesia_cli.erl
  12. 339 0
      apps/emqx_auth_mnesia/src/emqx_acl_mnesia_db.erl
  13. 215 0
      apps/emqx_auth_mnesia/src/emqx_acl_mnesia_migrator.erl
  14. 1 1
      apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src
  15. 21 20
      apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src
  16. 9 18
      apps/emqx_auth_mnesia/src/emqx_auth_mnesia_api.erl
  17. 13 1
      apps/emqx_auth_mnesia/src/emqx_auth_mnesia_sup.erl
  18. 188 42
      apps/emqx_auth_mnesia/test/emqx_acl_mnesia_SUITE.erl
  19. 1 1
      apps/emqx_lwm2m/etc/emqx_lwm2m.conf
  20. 9 8
      apps/emqx_lwm2m/priv/emqx_lwm2m.schema
  21. 1 1
      apps/emqx_lwm2m/src/emqx_lwm2m.app.src
  22. 7 5
      apps/emqx_lwm2m/src/emqx_lwm2m.appup.src
  23. 1 1
      apps/emqx_management/src/emqx_management.app.src
  24. 2 2
      apps/emqx_management/src/emqx_management.appup.src
  25. 9 1
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  26. 1 1
      apps/emqx_management/src/emqx_mgmt_api_pubsub.erl
  27. 10 11
      apps/emqx_management/src/emqx_mgmt_data_backup.erl
  28. 41 15
      apps/emqx_management/test/emqx_auth_mnesia_migration_SUITE.erl
  29. 15 2
      apps/emqx_management/test/emqx_mgmt_api_SUITE.erl
  30. 19 27
      apps/emqx_web_hook/src/emqx_web_hook.appup.src
  31. 16 6
      bin/emqx
  32. 10 0
      etc/emqx.conf
  33. 2 2
      lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src
  34. 1 1
      rebar.config
  35. 1 1
      scripts/apps-version-check.sh
  36. 106 0
      scripts/one-more-emqx-ee.sh
  37. 102 0
      scripts/one-more-emqx.sh
  38. 95 39
      scripts/update_appup.escript
  39. 98 87
      src/emqx.appup.src
  40. 1 1
      src/emqx_ws_connection.erl
  41. 1 1
      test/emqx_ws_connection_SUITE.erl

+ 14 - 0
.ci/acl_migration_test/build.sh

@@ -0,0 +1,14 @@
+#!/bin/bash
+
+set -xe
+
+cd "$EMQX_PATH"
+
+rm -rf _build _upgrade_base
+
+mkdir _upgrade_base
+pushd _upgrade_base
+    wget "https://s3-us-west-2.amazonaws.com/packages.emqx/emqx-ce/v${EMQX_BASE}/emqx-ubuntu20.04-${EMQX_BASE}-amd64.zip"
+popd
+
+make emqx-zip

+ 15 - 0
.ci/acl_migration_test/prepare.sh

@@ -0,0 +1,15 @@
+#!/bin/bash
+
+set -xe
+
+mkdir -p "$TEST_PATH"
+cd "$TEST_PATH"
+
+cp ../"$EMQX_PATH"/_upgrade_base/*.zip ./
+unzip ./*.zip
+
+cp ../"$EMQX_PATH"/_packages/emqx/*.zip ./emqx/releases/
+
+git clone --depth 1 https://github.com/terry-xiaoyu/one_more_emqx.git
+
+./one_more_emqx/one_more_emqx.sh emqx2

+ 17 - 0
.ci/acl_migration_test/suite.sh

@@ -0,0 +1,17 @@
+#!/bin/bash
+
+set -xe
+
+export EMQX_PATH="$1"
+export EMQX_BASE="$2"
+
+export TEST_PATH="emqx_test"
+
+./build.sh
+
+VERSION=$("$EMQX_PATH"/pkg-vsn.sh)
+export VERSION
+
+./prepare.sh
+
+./test.sh

+ 121 - 0
.ci/acl_migration_test/test.sh

@@ -0,0 +1,121 @@
+#!/bin/bash
+
+set -e
+
+EMQX_ENDPOINT="http://localhost:8081/api/v4/acl"
+EMQX2_ENDPOINT="http://localhost:8917/api/v4/acl"
+
+function run() {
+    emqx="$1"
+    shift
+
+    echo "[$emqx]" "$@"
+
+    pushd "$TEST_PATH/$emqx"
+        "$@"
+    popd
+}
+
+function post_rule() {
+    endpoint="$1"
+    rule="$2"
+    echo -n "->($endpoint) "
+    curl -s -u admin:public -X POST "$endpoint" -d "$rule"
+    echo
+}
+
+function verify_clientid_rule() {
+    endpoint="$1"
+    id="$2"
+    echo -n "<-($endpoint) "
+    curl -s -u admin:public "$endpoint/clientid/$id" | grep "$id" || (echo "verify rule for client $id failed" && return 1)
+}
+
+# Run nodes
+
+run emqx ./bin/emqx start
+run emqx2 ./bin/emqx start
+
+run emqx ./bin/emqx_ctl plugins load emqx_auth_mnesia
+run emqx2 ./bin/emqx_ctl plugins load emqx_auth_mnesia
+
+run emqx2 ./bin/emqx_ctl cluster join 'emqx@127.0.0.1'
+
+# Add ACL rule to unupgraded EMQX nodes
+
+post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT1_A","topic": "t", "action": "pub", "access": "allow"}'
+post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT1_B","topic": "t", "action": "pub", "access": "allow"}'
+
+# Upgrade emqx2 node
+
+run emqx2 ./bin/emqx install "$VERSION"
+sleep 60
+
+# Verify upgrade blocked
+
+run emqx2 ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep false || (echo "emqx2 shouldn't have migrated" && exit 1)
+
+# Verify old rules on both nodes
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_B'
+
+# Add ACL on OLD and NEW node, verify on all nodes
+
+post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT2_A","topic": "t", "action": "pub", "access": "allow"}'
+post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT2_B","topic": "t", "action": "pub", "access": "allow"}'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_B'
+
+# Upgrade emqx node
+
+run emqx ./bin/emqx install "$VERSION"
+
+# Wait for upgrade
+
+sleep 60
+
+# Verify if upgrade occured
+
+run emqx ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep true || (echo "emqx should have migrated" && exit 1)
+run emqx2 ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep true || (echo "emqx2 should have migrated" && exit 1)
+
+# Verify rules are kept
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_B'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_B'
+
+# Add ACL on OLD and NEW node, verify on all nodes
+
+post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT3_A","topic": "t", "action": "pub", "access": "allow"}'
+post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT3_B","topic": "t", "action": "pub", "access": "allow"}'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT3_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT3_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT3_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT3_B'
+
+# Stop nodes
+
+run emqx ./bin/emqx stop
+run emqx2 ./bin/emqx stop
+
+echo "Success!"
+

+ 2 - 7
.ci/fvt_tests/relup.lux

@@ -23,10 +23,7 @@
     ?SH-PROMPT
 
     !cd emqx
-    !export EMQX_LOG__CONSOLE_HANDLER__ENABLE=true
-    !export EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug
-    !export EMQX_LOG__PRIMARY_LEVEL=debug
-    !export EMQX_ZONES__DEFAULT__LISTENERS__MQTT_WSS__BIND="0.0.0.0:8085"
+    !export EMQX_LOG__LEVEL=debug
 
     !./bin/emqx start
     ?EMQ X .* is started successfully!
@@ -39,9 +36,7 @@
     ?SH-PROMPT
     !cd emqx2
 
-    !export EMQX_LOG__CONSOLE_HANDLER__ENABLE=true
-    !export EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug
-    !export EMQX_LOG__PRIMARY_LEVEL=debug
+    !export EMQX_LOG__LEVEL=debug
 
     !./bin/emqx start
     ?EMQ X .* is started successfully!

+ 22 - 0
.github/workflows/run_acl_migration_tests.yaml

@@ -0,0 +1,22 @@
+name: ACL fix & migration integration tests
+
+on: workflow_dispatch
+
+jobs:
+    test:
+        runs-on: ubuntu-20.04
+        container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
+        strategy:
+            fail-fast: true
+        env:
+            BASE_VERSION: "4.3.0"
+        steps:
+        - uses: actions/checkout@v2
+          with:
+            path: emqx
+        - name: Prepare scripts
+          run: |
+            cp ./emqx/.ci/acl_migration_test/*.sh ./
+        - name: Run tests
+          run: |
+            ./suite.sh emqx "$BASE_VERSION"

+ 1 - 1
.tool-versions

@@ -1 +1 @@
-erlang 24.0.1-emqx-1
+erlang 23.2.7.2-emqx-2

+ 33 - 7
apps/emqx_auth_mnesia/include/emqx_auth_mnesia.hrl

@@ -1,21 +1,47 @@
 -define(APP, emqx_auth_mnesia).
 
--type(login():: {clientid, binary()}
+-type(login() :: {clientid, binary()}
               | {username, binary()}).
 
+-type(acl_target() :: login() | all).
+
+-type(acl_target_type() :: clientid | username | all).
+
+-type(access():: allow | deny).
+-type(action():: pub | sub).
+-type(legacy_action():: action() | pubsub).
+-type(created_at():: integer()).
+
 -record(emqx_user, {
           login :: login(),
           password :: binary(),
-          created_at :: integer()
+          created_at :: created_at()
         }).
 
--record(emqx_acl, {
-          filter:: {login() | all, emqx_topic:topic()},
-          action :: pub | sub | pubsub,
-          access :: allow | deny,
-          created_at :: integer()
+-define(ACL_TABLE, emqx_acl).
+
+-define(MIGRATION_MARK_KEY, emqx_acl2_migration_started).
+
+-record(?ACL_TABLE, {
+          filter :: {acl_target(), emqx_topic:topic()} | ?MIGRATION_MARK_KEY,
+          action :: legacy_action(),
+          access :: access(),
+          created_at :: created_at()
          }).
 
+-define(MIGRATION_MARK_RECORD, #?ACL_TABLE{filter = ?MIGRATION_MARK_KEY, action = pub, access = deny, created_at = 0}).
+
+-type(rule() :: {access(), action(), emqx_topic:topic(), created_at()}).
+
+-define(ACL_TABLE2, emqx_acl2).
+
+-record(?ACL_TABLE2, {
+          who :: acl_target(),
+          rules :: [ rule() ]
+         }).
+
+-type(acl_record() :: {acl_target(), emqx_topic:topic(), action(), access(), created_at()}).
+
 -record(auth_metrics, {
         success = 'client.auth.success',
         failure = 'client.auth.failure',

+ 8 - 17
apps/emqx_auth_mnesia/src/emqx_acl_mnesia.erl

@@ -18,24 +18,16 @@
 
 -include("emqx_auth_mnesia.hrl").
 
--include_lib("stdlib/include/ms_transform.hrl").
-
--define(TABLE, emqx_acl).
-
 %% ACL Callbacks
 -export([ init/0
         , register_metrics/0
         , check_acl/5
         , description/0
-        ]).
+       ]).
 
 init() ->
-    ok = ekka_mnesia:create_table(emqx_acl, [
-            {type, bag},
-            {disc_copies, [node()]},
-            {attributes, record_info(fields, emqx_acl)},
-            {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
-    ok = ekka_mnesia:copy_table(emqx_acl, disc_copies).
+    ok = emqx_acl_mnesia_db:create_table(),
+    ok = emqx_acl_mnesia_db:create_table2().
 
 -spec(register_metrics() -> ok).
 register_metrics() ->
@@ -46,12 +38,12 @@ check_acl(ClientInfo = #{ clientid := Clientid }, PubSub, Topic, _NoMatchAction,
 
     Acls = case Username of
                undefined ->
-                   emqx_acl_mnesia_cli:lookup_acl({clientid, Clientid}) ++
-                   emqx_acl_mnesia_cli:lookup_acl(all);
+                   emqx_acl_mnesia_db:lookup_acl({clientid, Clientid}) ++
+                   emqx_acl_mnesia_db:lookup_acl(all);
                _ ->
-                   emqx_acl_mnesia_cli:lookup_acl({clientid, Clientid}) ++
-                   emqx_acl_mnesia_cli:lookup_acl({username, Username}) ++
-                   emqx_acl_mnesia_cli:lookup_acl(all)
+                   emqx_acl_mnesia_db:lookup_acl({clientid, Clientid}) ++
+                   emqx_acl_mnesia_db:lookup_acl({username, Username}) ++
+                   emqx_acl_mnesia_db:lookup_acl(all)
            end,
 
     case match(ClientInfo, PubSub, Topic, Acls) of
@@ -83,7 +75,6 @@ match(ClientInfo, PubSub, Topic, [ {_, ACLTopic, Action, Access, _} | Acls]) ->
 match_topic(ClientInfo, Topic, ACLTopic) when is_binary(Topic) ->
     emqx_topic:match(Topic, feed_var(ClientInfo, ACLTopic)).
 
-match_actions(_, pubsub) -> true;
 match_actions(subscribe, sub) -> true;
 match_actions(publish, pub) -> true;
 match_actions(_, _) -> false.

+ 16 - 18
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl

@@ -16,8 +16,6 @@
 
 -module(emqx_acl_mnesia_api).
 
--include("emqx_auth_mnesia.hrl").
-
 -include_lib("stdlib/include/ms_transform.hrl").
 
 -import(proplists, [ get_value/2
@@ -99,26 +97,22 @@
         ]).
 
 list_clientid(_Bindings, Params) ->
-    MatchSpec = ets:fun2ms(
-                  fun({emqx_acl, {{clientid, Clientid}, Topic}, Action, Access, CreatedAt}) -> {{clientid,Clientid}, Topic, Action,Access, CreatedAt} end),
-    return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
+    Table = emqx_acl_mnesia_db:login_acl_table(clientid),
+    return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).
 
 list_username(_Bindings, Params) ->
-    MatchSpec = ets:fun2ms(
-                  fun({emqx_acl, {{username, Username}, Topic}, Action, Access, CreatedAt}) -> {{username, Username}, Topic, Action,Access, CreatedAt} end),
-    return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
+    Table = emqx_acl_mnesia_db:login_acl_table(username),
+    return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).
 
 list_all(_Bindings, Params) ->
-    MatchSpec = ets:fun2ms(
-                  fun({emqx_acl, {all, Topic}, Action, Access, CreatedAt}) -> {all, Topic, Action,Access, CreatedAt}end
-                 ),
-    return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
+    Table = emqx_acl_mnesia_db:login_acl_table(all),
+    return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).
 
 
 lookup(#{clientid := Clientid}, _Params) ->
-    return({ok, format(emqx_acl_mnesia_cli:lookup_acl({clientid, urldecode(Clientid)}))});
+    return({ok, format(emqx_acl_mnesia_db:lookup_acl({clientid, urldecode(Clientid)}))});
 lookup(#{username := Username}, _Params) ->
-    return({ok, format(emqx_acl_mnesia_cli:lookup_acl({username, urldecode(Username)}))}).
+    return({ok, format(emqx_acl_mnesia_db:lookup_acl({username, urldecode(Username)}))}).
 
 add(_Bindings, Params) ->
     [ P | _] = Params,
@@ -152,7 +146,7 @@ do_add(Params) ->
     Access = get_value(<<"access">>, Params),
     Re = case validate([login, topic, action, access], [Login, Topic, Action, Access]) of
         ok ->
-            emqx_acl_mnesia_cli:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8));
+            emqx_acl_mnesia_db:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8));
         Err -> Err
     end,
     maps:merge(#{topic => Topic,
@@ -165,15 +159,19 @@ do_add(Params) ->
                    end).
 
 delete(#{clientid := Clientid, topic := Topic}, _) ->
-    return(emqx_acl_mnesia_cli:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic)));
+    return(emqx_acl_mnesia_db:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic)));
 delete(#{username := Username, topic := Topic}, _) ->
-    return(emqx_acl_mnesia_cli:remove_acl({username, urldecode(Username)}, urldecode(Topic)));
+    return(emqx_acl_mnesia_db:remove_acl({username, urldecode(Username)}, urldecode(Topic)));
 delete(#{topic := Topic}, _) ->
-    return(emqx_acl_mnesia_cli:remove_acl(all, urldecode(Topic))).
+    return(emqx_acl_mnesia_db:remove_acl(all, urldecode(Topic))).
 
 %%------------------------------------------------------------------------------
 %% Interval Funcs
 %%------------------------------------------------------------------------------
+
+count(QH) ->
+    qlc:fold(fun(_, Count) -> Count + 1 end, 0, QH).
+
 format({{clientid, Clientid}, Topic, Action, Access, _CreatedAt}) ->
     #{clientid => Clientid, topic => Topic, action => Action, access => Access};
 format({{username, Username}, Topic, Action, Access, _CreatedAt}) ->

+ 12 - 125
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_cli.erl

@@ -16,110 +16,28 @@
 
 -module(emqx_acl_mnesia_cli).
 
--include("emqx_auth_mnesia.hrl").
--include_lib("emqx/include/logger.hrl").
--include_lib("stdlib/include/ms_transform.hrl").
--define(TABLE, emqx_acl).
-
-%% Acl APIs
--export([ add_acl/4
-        , lookup_acl/1
-        , all_acls/0
-        , all_acls/1
-        , remove_acl/2
-        ]).
-
 -export([cli/1]).
--export([comparing/2]).
-%%--------------------------------------------------------------------
-%% Acl API
-%%--------------------------------------------------------------------
-
-%% @doc Add Acls
--spec(add_acl(login() | all, emqx_topic:topic(), pub | sub | pubsub, allow | deny) ->
-        ok | {error, any()}).
-add_acl(Login, Topic, Action, Access) ->
-    Filter = {Login, Topic},
-    Acl = #?TABLE{
-             filter = Filter,
-             action = Action,
-             access = Access,
-             created_at = erlang:system_time(millisecond)
-            },
-    ret(mnesia:transaction(
-          fun() ->
-                  OldRecords = mnesia:wread({?TABLE, Filter}),
-                  case Action of
-                      pubsub ->
-                          update_permission(pub, Acl, OldRecords),
-                          update_permission(sub, Acl, OldRecords);
-                      _ ->
-                          update_permission(Action, Acl, OldRecords)
-                  end
-          end)).
-
-%% @doc Lookup acl by login
--spec(lookup_acl(login() | all) -> list()).
-lookup_acl(undefined) -> [];
-lookup_acl(Login) ->
-    MatchSpec = ets:fun2ms(fun({?TABLE, {Filter, ACLTopic}, Action, Access, CreatedAt})
-                                 when Filter =:= Login ->
-                                   {Filter, ACLTopic, Action, Access, CreatedAt}
-                           end),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)).
-
-%% @doc Remove acl
--spec(remove_acl(login() | all, emqx_topic:topic()) -> ok | {error, any()}).
-remove_acl(Login, Topic) ->
-    ret(mnesia:transaction(fun mnesia:delete/1, [{?TABLE, {Login, Topic}}])).
-
-%% @doc All logins
--spec(all_acls() -> list()).
-all_acls() ->
-    all_acls(clientid) ++
-    all_acls(username) ++
-    all_acls(all).
-
-all_acls(clientid) ->
-    MatchSpec = ets:fun2ms(
-                  fun({?TABLE, {{clientid, Clientid}, Topic}, Action, Access, CreatedAt}) ->
-                          {{clientid, Clientid}, Topic, Action, Access, CreatedAt}
-                  end),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec));
-all_acls(username) ->
-    MatchSpec = ets:fun2ms(
-                  fun({?TABLE, {{username, Username}, Topic}, Action, Access, CreatedAt}) ->
-                          {{username, Username}, Topic, Action, Access, CreatedAt}
-                  end),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec));
-all_acls(all) ->
-    MatchSpec = ets:fun2ms(
-                  fun({?TABLE, {all, Topic}, Action, Access, CreatedAt}) ->
-                          {all, Topic, Action, Access, CreatedAt}
-                  end
-                 ),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)).
 
 %%--------------------------------------------------------------------
 %% ACL Cli
 %%--------------------------------------------------------------------
 
 cli(["list"]) ->
-    [print_acl(Acl) || Acl <- all_acls()];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls()];
 
 cli(["list", "clientid"]) ->
-    [print_acl(Acl) || Acl <- all_acls(clientid)];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls(clientid)];
 
 cli(["list", "username"]) ->
-    [print_acl(Acl) || Acl <- all_acls(username)];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls(username)];
 
 cli(["list", "_all"]) ->
-    [print_acl(Acl) || Acl <- all_acls(all)];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls(all)];
 
 cli(["add", "clientid", Clientid, Topic, Action, Access]) ->
     case validate(action, Action) andalso validate(access, Access) of
         true ->
-            case add_acl(
+            case emqx_acl_mnesia_db:add_acl(
                    {clientid, iolist_to_binary(Clientid)},
                    iolist_to_binary(Topic),
                    list_to_existing_atom(Action),
@@ -135,7 +53,7 @@ cli(["add", "clientid", Clientid, Topic, Action, Access]) ->
 cli(["add", "username", Username, Topic, Action, Access]) ->
     case validate(action, Action) andalso validate(access, Access) of
         true ->
-            case add_acl(
+            case emqx_acl_mnesia_db:add_acl(
                    {username, iolist_to_binary(Username)},
                    iolist_to_binary(Topic),
                    list_to_existing_atom(Action),
@@ -151,7 +69,7 @@ cli(["add", "username", Username, Topic, Action, Access]) ->
 cli(["add", "_all", Topic, Action, Access]) ->
     case validate(action, Action) andalso validate(access, Access) of
         true ->
-            case add_acl(
+            case emqx_acl_mnesia_db:add_acl(
                    all,
                    iolist_to_binary(Topic),
                    list_to_existing_atom(Action),
@@ -165,16 +83,16 @@ cli(["add", "_all", Topic, Action, Access]) ->
     end;
 
 cli(["show", "clientid", Clientid]) ->
-    [print_acl(Acl) || Acl <- lookup_acl({clientid, iolist_to_binary(Clientid)})];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:lookup_acl({clientid, iolist_to_binary(Clientid)})];
 
 cli(["show", "username", Username]) ->
-    [print_acl(Acl) || Acl <- lookup_acl({username, iolist_to_binary(Username)})];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:lookup_acl({username, iolist_to_binary(Username)})];
 
 cli(["del", "clientid", Clientid, Topic])->
     cli(["delete", "clientid", Clientid, Topic]);
 
 cli(["delete", "clientid", Clientid, Topic])->
-    case remove_acl({clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Topic)) of
+    case emqx_acl_mnesia_db:remove_acl({clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Topic)) of
          ok -> emqx_ctl:print("ok~n");
         {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
     end;
@@ -183,7 +101,7 @@ cli(["del", "username", Username, Topic])->
     cli(["delete", "username", Username, Topic]);
 
 cli(["delete", "username", Username, Topic])->
-    case remove_acl({username, iolist_to_binary(Username)}, iolist_to_binary(Topic)) of
+    case emqx_acl_mnesia_db:remove_acl({username, iolist_to_binary(Username)}, iolist_to_binary(Topic)) of
          ok -> emqx_ctl:print("ok~n");
         {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
     end;
@@ -192,7 +110,7 @@ cli(["del", "_all", Topic])->
     cli(["delete", "_all", Topic]);
 
 cli(["delete", "_all", Topic])->
-    case remove_acl(all, iolist_to_binary(Topic)) of
+    case emqx_acl_mnesia_db:remove_acl(all, iolist_to_binary(Topic)) of
          ok -> emqx_ctl:print("ok~n");
         {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
     end;
@@ -215,13 +133,6 @@ cli(_) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-comparing({_, _, _, _, CreatedAt1},
-          {_, _, _, _, CreatedAt2}) ->
-    CreatedAt1 >= CreatedAt2.
-
-ret({atomic, ok})     -> ok;
-ret({aborted, Error}) -> {error, Error}.
-
 validate(action, "pub") -> true;
 validate(action, "sub") -> true;
 validate(action, "pubsub") -> true;
@@ -244,27 +155,3 @@ print_acl({all, Topic, Action, Access, _}) ->
         "Acl($all topic = ~p action = ~p access = ~p)~n",
         [Topic, Action, Access]
      ).
-
-update_permission(Action, Acl0, OldRecords) ->
-    Acl = Acl0 #?TABLE{action = Action},
-    maybe_delete_shadowed_records(Action, OldRecords),
-    mnesia:write(Acl).
-
-maybe_delete_shadowed_records(_, []) ->
-    ok;
-maybe_delete_shadowed_records(Action1, [Rec = #emqx_acl{action = Action2} | Rest]) ->
-    if Action1 =:= Action2 ->
-            ok = mnesia:delete_object(Rec);
-       Action2 =:= pubsub ->
-            %% Perform migration from the old data format on the
-            %% fly. This is needed only for the enterprise version,
-            %% delete this branch on 5.0
-            mnesia:delete_object(Rec),
-            mnesia:write(Rec#?TABLE{action = other_action(Action1)});
-       true ->
-            ok
-    end,
-    maybe_delete_shadowed_records(Action1, Rest).
-
-other_action(pub) -> sub;
-other_action(sub) -> pub.

+ 339 - 0
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_db.erl

@@ -0,0 +1,339 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_acl_mnesia_db).
+
+-include("emqx_auth_mnesia.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+
+%% ACL APIs
+-export([ create_table/0
+        , create_table2/0
+        ]).
+
+-export([ add_acl/4
+        , lookup_acl/1
+        , all_acls_export/0
+        , all_acls/0
+        , all_acls/1
+        , remove_acl/2
+        , merge_acl_records/3
+        , login_acl_table/1
+        , is_migration_started/0
+        ]).
+
+-export([comparing/2]).
+
+%%--------------------------------------------------------------------
+%% ACL API
+%%--------------------------------------------------------------------
+
+%% @doc Create table `emqx_acl` of old format rules
+-spec(create_table() -> ok).
+create_table() ->
+    ok = ekka_mnesia:create_table(?ACL_TABLE, [
+        {type, bag},
+        {disc_copies, [node()]},
+        {attributes, record_info(fields, ?ACL_TABLE)},
+        {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
+    ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies).
+
+%% @doc Create table `emqx_acl2` of new format rules
+-spec(create_table2() -> ok).
+create_table2() ->
+    ok = ekka_mnesia:create_table(?ACL_TABLE2, [
+            {type, ordered_set},
+            {disc_copies, [node()]},
+            {attributes, record_info(fields, ?ACL_TABLE2)},
+            {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
+    ok = ekka_mnesia:copy_table(?ACL_TABLE2, disc_copies).
+
+%% @doc Add Acls
+-spec(add_acl(acl_target(), emqx_topic:topic(), legacy_action(), access()) ->
+        ok | {error, any()}).
+add_acl(Login, Topic, Action, Access) ->
+    ret(mnesia:transaction(fun() ->
+                              case is_migration_started() of
+                                  true -> add_acl_new(Login, Topic, Action, Access);
+                                  false -> add_acl_old(Login, Topic, Action, Access)
+                              end
+                           end)).
+
+%% @doc Lookup acl by login
+-spec(lookup_acl(acl_target()) -> list(acl_record())).
+lookup_acl(undefined) -> [];
+lookup_acl(Login) ->
+    % After migration to ?ACL_TABLE2, ?ACL_TABLE never has any rules. This lookup should be removed later.
+    MatchSpec = ets:fun2ms(fun(#?ACL_TABLE{filter = {Filter, _}} = Rec)
+                                 when Filter =:= Login -> Rec
+                           end),
+    OldRecs = ets:select(?ACL_TABLE, MatchSpec),
+
+    NewAcls = ets:lookup(?ACL_TABLE2, Login),
+    MergedAcl = merge_acl_records(Login, OldRecs, NewAcls),
+    lists:sort(fun comparing/2, acl_to_list(MergedAcl)).
+
+%% @doc Remove ACL
+-spec remove_acl(acl_target(), emqx_topic:topic()) -> ok | {error, any()}.
+remove_acl(Login, Topic) ->
+    ret(mnesia:transaction(fun() ->
+                              mnesia:delete({?ACL_TABLE, {Login, Topic}}),
+                              case mnesia:wread({?ACL_TABLE2, Login}) of
+                                  [] -> ok;
+                                  [#?ACL_TABLE2{rules = Rules} = Acl] ->
+                                      case delete_topic_rules(Topic, Rules) of
+                                          [] -> mnesia:delete({?ACL_TABLE2, Login});
+                                          [_ | _] = RemainingRules ->
+                                              mnesia:write(Acl#?ACL_TABLE2{rules = RemainingRules})
+                                      end
+                              end
+                           end)).
+
+%% @doc All ACL rules
+-spec(all_acls() -> list(acl_record())).
+all_acls() ->
+    all_acls(username) ++
+    all_acls(clientid) ++
+    all_acls(all).
+
+%% @doc All ACL rules of specified type
+-spec(all_acls(acl_target_type()) -> list(acl_record())).
+all_acls(AclTargetType) ->
+    lists:sort(fun comparing/2, qlc:eval(login_acl_table(AclTargetType))).
+
+%% @doc All ACL rules fetched transactionally
+-spec(all_acls_export() -> list(acl_record())).
+all_acls_export() ->
+    AclTargetTypes = [username, clientid, all],
+    MatchSpecNew = lists:flatmap(fun login_match_spec_new/1, AclTargetTypes),
+    MatchSpecOld = lists:flatmap(fun login_match_spec_old/1, AclTargetTypes),
+
+    {atomic, Records} = mnesia:transaction(
+        fun() ->
+            QH = acl_table(MatchSpecNew, MatchSpecOld, fun mnesia:table/2, fun lookup_mnesia/2),
+            qlc:eval(QH)
+        end),
+    Records.
+
+%% @doc QLC table of logins matching spec
+-spec(login_acl_table(acl_target_type()) -> qlc:query_handle()).
+login_acl_table(AclTargetType) ->
+    MatchSpecNew = login_match_spec_new(AclTargetType),
+    MatchSpecOld = login_match_spec_old(AclTargetType),
+    acl_table(MatchSpecNew, MatchSpecOld, fun ets:table/2, fun lookup_ets/2).
+
+%% @doc Combine old `emqx_acl` ACL records with a new `emqx_acl2` ACL record for a given login
+-spec(merge_acl_records(acl_target(), [#?ACL_TABLE{}], [#?ACL_TABLE2{}]) -> #?ACL_TABLE2{}).
+merge_acl_records(Login, OldRecs, Acls) ->
+    OldRules = old_recs_to_rules(OldRecs),
+    NewRules = case Acls of
+        [] -> [];
+        [#?ACL_TABLE2{rules = Rules}] -> Rules
+    end,
+    #?ACL_TABLE2{who = Login, rules = merge_rules(NewRules, OldRules)}.
+
+%% @doc Checks if background migration of ACL rules from `emqx_acl` to `emqx_acl2` format started.
+%% Should be run in transaction
+-spec(is_migration_started() -> boolean()).
+is_migration_started() ->
+    case mnesia:read({?ACL_TABLE, ?MIGRATION_MARK_KEY}) of
+        [?MIGRATION_MARK_RECORD | _] -> true;
+        [] -> false
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+add_acl_new(Login, Topic, Action, Access) ->
+    Rule = {Access, Action, Topic, erlang:system_time(millisecond)},
+    Rules = normalize_rule(Rule),
+    OldAcl = mnesia:wread({?ACL_TABLE2, Login}),
+    NewAcl = case OldAcl of
+        [#?ACL_TABLE2{rules = OldRules} = Acl] ->
+            Acl#?ACL_TABLE2{rules = merge_rules(Rules, OldRules)};
+        [] ->
+            #?ACL_TABLE2{who = Login, rules = Rules}
+    end,
+    mnesia:write(NewAcl).
+
+add_acl_old(Login, Topic, Action, Access) ->
+    Filter = {Login, Topic},
+    Acl = #?ACL_TABLE{
+             filter = Filter,
+             action = Action,
+             access = Access,
+             created_at = erlang:system_time(millisecond)
+            },
+    OldRecords = mnesia:wread({?ACL_TABLE, Filter}),
+    case Action of
+        pubsub ->
+            update_permission(pub, Acl, OldRecords),
+            update_permission(sub, Acl, OldRecords);
+        _ ->
+            update_permission(Action, Acl, OldRecords)
+    end.
+
+old_recs_to_rules(OldRecs) ->
+    lists:flatmap(fun old_rec_to_rules/1, OldRecs).
+
+old_rec_to_rules(#?ACL_TABLE{filter = {_, Topic}, action = Action, access = Access, created_at = CreatedAt}) ->
+    normalize_rule({Access, Action, Topic, CreatedAt}).
+
+normalize_rule({Access, pubsub, Topic, CreatedAt}) ->
+    [{Access, pub, Topic, CreatedAt}, {Access, sub, Topic, CreatedAt}];
+normalize_rule({Access, Action, Topic, CreatedAt}) ->
+    [{Access, Action, Topic, CreatedAt}].
+
+merge_rules([], OldRules) -> OldRules;
+merge_rules([NewRule | RestNewRules], OldRules) ->
+    merge_rules(RestNewRules, merge_rule(NewRule, OldRules)).
+
+merge_rule({_, Action, Topic, _ } = NewRule, OldRules) ->
+    [NewRule | lists:filter(
+        fun({_, OldAction, OldTopic, _}) ->
+            {Action, Topic} =/= {OldAction, OldTopic}
+        end, OldRules)].
+
+acl_to_list(#?ACL_TABLE2{who = Login, rules = Rules}) ->
+    [{Login, Topic, Action, Access, CreatedAt} || {Access, Action, Topic, CreatedAt} <- Rules].
+
+delete_topic_rules(Topic, Rules) ->
+    [Rule || {_, _, T, _} = Rule <- Rules, T =/= Topic].
+
+comparing({_, _, _, _, CreatedAt} = Rec1,
+          {_, _, _, _, CreatedAt} = Rec2) ->
+        Rec1 >= Rec2;
+
+comparing({_, _, _, _, CreatedAt1},
+          {_, _, _, _, CreatedAt2}) ->
+    CreatedAt1 >= CreatedAt2.
+
+login_match_spec_old(all) ->
+    ets:fun2ms(fun(#?ACL_TABLE{filter = {all, _}} = Record) ->
+                Record
+               end);
+
+login_match_spec_old(Type) when (Type =:= username) or (Type =:= clientid) ->
+    ets:fun2ms(fun(#?ACL_TABLE{filter = {{RecordType, _}, _}} = Record)
+                    when RecordType =:= Type -> Record
+               end).
+
+login_match_spec_new(all) ->
+    ets:fun2ms(fun(#?ACL_TABLE2{who = all} = Record) ->
+                Record
+               end);
+
+login_match_spec_new(Type) when (Type =:= username) or (Type =:= clientid) ->
+    ets:fun2ms(fun(#?ACL_TABLE2{who = {RecordType, _}} = Record)
+                    when RecordType =:= Type -> Record
+               end).
+
+acl_table(MatchSpecNew, MatchSpecOld, TableFun, LookupFun) ->
+    TraverseFun =
+        fun() ->
+           CursorNew =
+               qlc:cursor(
+                   TableFun(?ACL_TABLE2, [{traverse, {select, MatchSpecNew}}])),
+           CursorOld =
+               qlc:cursor(
+                   TableFun(?ACL_TABLE, [{traverse, {select, MatchSpecOld}}])),
+           traverse_new(CursorNew, CursorOld, #{}, LookupFun)
+        end,
+
+    qlc:table(TraverseFun, []).
+
+
+% These are traverse funs for qlc table created by `acl_table/4`.
+% Traversing consumes memory: it collects logins present in `?ACL_TABLE` and
+% at the same time having rules in `?ACL_TABLE2`.
+% Such records appear if ACLs are inserted before migration started.
+% After migration, number of such logins is zero, so traversing starts working in
+% constant memory.
+
+traverse_new(CursorNew, CursorOld, FoundKeys, LookupFun) ->
+    Acls = qlc:next_answers(CursorNew, 1),
+    case Acls of
+        [] ->
+            qlc:delete_cursor(CursorNew),
+            traverse_old(CursorOld, FoundKeys);
+        [#?ACL_TABLE2{who = Login, rules = Rules} = Acl] ->
+            Keys = lists:usort([{Login, Topic} || {_, _, Topic, _} <- Rules]),
+            OldRecs = lists:flatmap(fun(Key) -> LookupFun(?ACL_TABLE, Key) end, Keys),
+            MergedAcl = merge_acl_records(Login, OldRecs, [Acl]),
+            NewFoundKeys =
+                lists:foldl(fun(#?ACL_TABLE{filter = Key}, Found) -> maps:put(Key, true, Found) end,
+                            FoundKeys,
+                            OldRecs),
+            case acl_to_list(MergedAcl) of
+                [] ->
+                    traverse_new(CursorNew, CursorOld, NewFoundKeys, LookupFun);
+                List ->
+                    List ++ fun() -> traverse_new(CursorNew, CursorOld, NewFoundKeys, LookupFun) end
+            end
+    end.
+
+traverse_old(CursorOld, FoundKeys) ->
+    OldAcls = qlc:next_answers(CursorOld),
+    case OldAcls of
+        [] ->
+            qlc:delete_cursor(CursorOld),
+            [];
+        _ ->
+            Records = [ {Login, Topic, Action, Access, CreatedAt}
+                || #?ACL_TABLE{filter = {Login, Topic}, action = LegacyAction, access = Access, created_at = CreatedAt} <- OldAcls,
+                {_, Action, _, _} <- normalize_rule({Access, LegacyAction, Topic, CreatedAt}),
+                not maps:is_key({Login, Topic}, FoundKeys)
+            ],
+            case Records of
+                [] -> traverse_old(CursorOld, FoundKeys);
+                List -> List ++ fun() -> traverse_old(CursorOld, FoundKeys) end
+            end
+    end.
+
+lookup_mnesia(Tab, Key) ->
+    mnesia:read({Tab, Key}).
+
+lookup_ets(Tab, Key) ->
+    ets:lookup(Tab, Key).
+
+update_permission(Action, Acl0, OldRecords) ->
+    Acl = Acl0 #?ACL_TABLE{action = Action},
+    maybe_delete_shadowed_records(Action, OldRecords),
+    mnesia:write(Acl).
+
+maybe_delete_shadowed_records(_, []) ->
+    ok;
+maybe_delete_shadowed_records(Action1, [Rec = #emqx_acl{action = Action2} | Rest]) ->
+    if Action1 =:= Action2 ->
+            ok = mnesia:delete_object(Rec);
+       Action2 =:= pubsub ->
+            %% Perform migration from the old data format on the
+            %% fly. This is needed only for the enterprise version,
+            %% delete this branch on 5.0
+            mnesia:delete_object(Rec),
+            mnesia:write(Rec#?ACL_TABLE{action = other_action(Action1)});
+       true ->
+            ok
+    end,
+    maybe_delete_shadowed_records(Action1, Rest).
+
+other_action(pub) -> sub;
+other_action(sub) -> pub.
+
+ret({atomic, ok})     -> ok;
+ret({aborted, Error}) -> {error, Error}.

+ 215 - 0
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_migrator.erl

@@ -0,0 +1,215 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_acl_mnesia_migrator).
+
+-include("emqx_auth_mnesia.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-behaviour(gen_statem).
+
+-define(CHECK_ALL_NODES_INTERVAL, 60000).
+
+-type(migration_delay_reason() :: old_nodes | bad_nodes).
+
+-export([
+    callback_mode/0,
+    init/1
+]).
+
+-export([
+    waiting_all_nodes/3,
+    checking_old_table/3,
+    migrating/3
+]).
+
+-export([
+    start_link/0,
+    start_link/1,
+    start_supervised/0,
+    stop_supervised/0,
+    migrate_records/0,
+    is_migrating_on_node/1,
+    is_old_table_migrated/0
+]).
+
+%%--------------------------------------------------------------------
+%% External interface
+%%--------------------------------------------------------------------
+
+start_link() ->
+    start_link(?MODULE).
+
+start_link(Name) when is_atom(Name) ->
+    start_link(#{
+        name => Name
+    });
+
+start_link(#{name := Name} = Opts) ->
+    gen_statem:start_link({local, Name}, ?MODULE, Opts, []).
+
+start_supervised() ->
+    try
+        {ok, _} = supervisor:restart_child(emqx_auth_mnesia_sup, ?MODULE),
+        ok
+    catch
+        exit:{noproc, _} -> ok
+    end.
+
+stop_supervised() ->
+    try
+        ok = supervisor:terminate_child(emqx_auth_mnesia_sup, ?MODULE),
+        ok = supervisor:delete_child(emqx_auth_mnesia_sup, ?MODULE)
+    catch
+        exit:{noproc, _} -> ok
+    end.
+
+%%--------------------------------------------------------------------
+%% gen_statem callbacks
+%%--------------------------------------------------------------------
+
+callback_mode() -> state_functions.
+
+init(Opts) ->
+    ok = emqx_acl_mnesia_db:create_table(),
+    ok = emqx_acl_mnesia_db:create_table2(),
+    Name = maps:get(name, Opts, ?MODULE),
+    CheckNodesInterval = maps:get(check_nodes_interval, Opts, ?CHECK_ALL_NODES_INTERVAL),
+    GetNodes = maps:get(get_nodes, Opts, fun all_nodes/0),
+    Data =
+        #{name => Name,
+          check_nodes_interval => CheckNodesInterval,
+          get_nodes => GetNodes},
+    {ok, waiting_all_nodes, Data, [{state_timeout, 0, check_nodes}]}.
+
+%%--------------------------------------------------------------------
+%% state callbacks
+%%--------------------------------------------------------------------
+
+waiting_all_nodes(state_timeout, check_nodes, Data) ->
+    #{name := Name, check_nodes_interval := CheckNodesInterval, get_nodes := GetNodes} = Data,
+    case is_all_nodes_migrating(Name, GetNodes()) of
+        true ->
+            ?tp(info, emqx_acl_mnesia_migrator_check_old_table, #{}),
+            {next_state, checking_old_table, Data, [{next_event, internal, check_old_table}]};
+        {false, Reason, Nodes} ->
+            ?tp(info,
+                emqx_acl_mnesia_migrator_bad_nodes_delay,
+                #{delay => CheckNodesInterval,
+                  reason => Reason,
+                  name => Name,
+                  nodes => Nodes}),
+            {keep_state_and_data, [{state_timeout, CheckNodesInterval, check_nodes}]}
+    end.
+
+checking_old_table(internal, check_old_table, Data) ->
+    case is_old_table_migrated() of
+        true ->
+            ?tp(info, emqx_acl_mnesia_migrator_finish, #{}),
+            {next_state, finished, Data, [{hibernate, true}]};
+        false ->
+            ?tp(info, emqx_acl_mnesia_migrator_start_migration, #{}),
+            {next_state, migrating, Data, [{next_event, internal, start_migration}]}
+    end.
+
+migrating(internal, start_migration, Data) ->
+    ok = migrate_records(),
+    {next_state, checking_old_table, Data, [{next_event, internal, check_old_table}]}.
+
+%% @doc Returns `true` if migration is started in the local node, otherwise crash.
+-spec(is_migrating_on_node(atom()) -> true).
+is_migrating_on_node(Name) ->
+    true = is_pid(erlang:whereis(Name)).
+
+%% @doc Run migration of records
+-spec(migrate_records() -> ok).
+migrate_records() ->
+    ok = add_migration_mark(),
+    Key = peek_record(),
+    do_migrate_records(Key).
+
+%% @doc Run migration of records
+-spec(is_all_nodes_migrating(atom(), list(node())) -> true | {false, migration_delay_reason(), list(node())}).
+is_all_nodes_migrating(Name, Nodes) ->
+    case rpc:multicall(Nodes, ?MODULE, is_migrating_on_node, [Name]) of
+        {Results, []} ->
+            OldNodes = [ Node || {Node, Result} <- lists:zip(Nodes, Results), Result =/= true ],
+            case OldNodes of
+                [] -> true;
+                _ -> {false, old_nodes, OldNodes}
+            end;
+        {_, [_BadNode | _] = BadNodes} ->
+            {false, bad_nodes, BadNodes}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+all_nodes() ->
+    ekka_mnesia:cluster_nodes(all).
+
+is_old_table_migrated() ->
+    Result =
+        mnesia:transaction(fun() ->
+                              case mnesia:first(?ACL_TABLE) of
+                                  ?MIGRATION_MARK_KEY ->
+                                      case mnesia:next(?ACL_TABLE, ?MIGRATION_MARK_KEY) of
+                                          '$end_of_table' -> true;
+                                          _OtherKey -> false
+                                      end;
+                                  '$end_of_table' -> false;
+                                  _OtherKey -> false
+                              end
+                           end),
+    case Result of
+        {atomic, true} ->
+            true;
+        _ ->
+            false
+    end.
+
+add_migration_mark() ->
+    {atomic, ok} = mnesia:transaction(fun() -> mnesia:write(?MIGRATION_MARK_RECORD) end),
+    ok.
+
+peek_record() ->
+    Key = mnesia:dirty_first(?ACL_TABLE),
+    case Key of
+        ?MIGRATION_MARK_KEY ->
+            mnesia:dirty_next(?ACL_TABLE, Key);
+        _ -> Key
+    end.
+
+do_migrate_records('$end_of_table') -> ok;
+do_migrate_records({_Login, _Topic} = Key) ->
+    ?tp(emqx_acl_mnesia_migrator_record_selected, #{key => Key}),
+    _ = mnesia:transaction(fun migrate_one_record/1, [Key]),
+    do_migrate_records(peek_record()).
+
+migrate_one_record({Login, _Topic} = Key) ->
+    case mnesia:wread({?ACL_TABLE, Key}) of
+        [] ->
+            ?tp(emqx_acl_mnesia_migrator_record_missed, #{key => Key}),
+            record_missing;
+        OldRecs ->
+            Acls = mnesia:wread({?ACL_TABLE2, Login}),
+            UpdatedAcl = emqx_acl_mnesia_db:merge_acl_records(Login, OldRecs, Acls),
+            ok = mnesia:write(UpdatedAcl),
+            ok = mnesia:delete({?ACL_TABLE, Key}),
+            ?tp(emqx_acl_mnesia_migrator_record_migrated, #{key => Key})
+    end.

+ 1 - 1
apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src

@@ -1,6 +1,6 @@
 {application, emqx_auth_mnesia,
  [{description, "EMQ X Authentication with Mnesia"},
-  {vsn, "4.3.3"}, % strict semver, bump manually
+  {vsn, "4.3.4"}, % strict semver, bump manually
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,mnesia]},

+ 21 - 20
apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src

@@ -1,30 +1,31 @@
-%% -*-: erlang -*-
-
+%% -*- mode: erlang -*-
 {VSN,
   [
-    {"4.3.2", [
+    {<<"4.3.[0-3]">>, [
+      {add_module,emqx_acl_mnesia_db},
+      {add_module,emqx_acl_mnesia_migrator, [emqx_acl_mnesia_db]},
+      {update, emqx_auth_mnesia_sup, supervisor},
+      {apply, {emqx_acl_mnesia_migrator, start_supervised, []}},
+      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]},
+      {load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
       {load_module,emqx_acl_mnesia_api, brutal_purge,soft_purge,[]},
-      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]}
+      {load_module,emqx_acl_mnesia_cli, brutal_purge,soft_purge,[]}
     ]},
-    {"4.3.1", [
-      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]}
-    ]},
-    {"4.3.0", [
-      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]}
-    ]},
-    {<<".*">>, []}
+    {<<".*">>, [
+    ]}
   ],
   [
-    {"4.3.2", [
+    {<<"4.3.[0-3]">>, [
+      {apply, {emqx_acl_mnesia_migrator, stop_supervised, []}},
+      {update, emqx_auth_mnesia_sup, supervisor},
+      {load_module,emqx_acl_mnesia_cli, brutal_purge,soft_purge,[]},
       {load_module,emqx_acl_mnesia_api, brutal_purge,soft_purge,[]},
-      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]}
-    ]},
-    {"4.3.1", [
-      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]}
-    ]},
-    {"4.3.0", [
-      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]}
+      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]},
+      {load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
+      {delete_module,emqx_acl_mnesia_migrator},
+      {delete_module,emqx_acl_mnesia_db}
     ]},
-    {<<".*">>, []}
+    {<<".*">>, [
+    ]}
   ]
 }.

+ 9 - 18
apps/emqx_auth_mnesia/src/emqx_auth_mnesia_api.erl

@@ -23,7 +23,7 @@
 
 -import(proplists, [get_value/2]).
 -import(minirest,  [return/1]).
--export([paginate/5]).
+-export([paginate_qh/5]).
 
 -export([ list_clientid/2
         , lookup_clientid/2
@@ -212,9 +212,12 @@ delete_username(#{username := Username}, _) ->
 %% Paging Query
 %%------------------------------------------------------------------------------
 
-paginate(Tables, MatchSpec, Params, ComparingFun, RowFun) ->
-    Qh = query_handle(Tables, MatchSpec),
-    Count = count(Tables, MatchSpec),
+paginate(Table, MatchSpec, Params, ComparingFun, RowFun) ->
+    Qh = query_handle(Table, MatchSpec),
+    Count = count(Table, MatchSpec),
+    paginate_qh(Qh, Count, Params, ComparingFun, RowFun).
+
+paginate_qh(Qh, Count, Params, ComparingFun, RowFun) ->
     Page = page(Params),
     Limit = limit(Params),
     Cursor = qlc:cursor(Qh),
@@ -231,24 +234,12 @@ paginate(Tables, MatchSpec, Params, ComparingFun, RowFun) ->
 
 query_handle(Table, MatchSpec) when is_atom(Table) ->
     Options = {traverse, {select, MatchSpec}},
-    qlc:q([R|| R <- ets:table(Table, Options)]);
-query_handle([Table], MatchSpec) when is_atom(Table) ->
-    Options = {traverse, {select, MatchSpec}},
-    qlc:q([R|| R <- ets:table(Table, Options)]);
-query_handle(Tables, MatchSpec) ->
-    Options = {traverse, {select, MatchSpec}},
-    qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]).
+    qlc:q([R || R <- ets:table(Table, Options)]).
 
 count(Table, MatchSpec) when is_atom(Table) ->
     [{MatchPattern, Where, _Re}] = MatchSpec,
     NMatchSpec = [{MatchPattern, Where, [true]}],
-    ets:select_count(Table, NMatchSpec);
-count([Table], MatchSpec) when is_atom(Table) ->
-    [{MatchPattern, Where, _Re}] = MatchSpec,
-    NMatchSpec = [{MatchPattern, Where, [true]}],
-    ets:select_count(Table, NMatchSpec);
-count(Tables, MatchSpec) ->
-    lists:sum([count(T, MatchSpec) || T <- Tables]).
+    ets:select_count(Table, NMatchSpec).
 
 page(Params) ->
     binary_to_integer(proplists:get_value(<<"_page">>, Params, <<"1">>)).

+ 13 - 1
apps/emqx_auth_mnesia/src/emqx_auth_mnesia_sup.erl

@@ -33,4 +33,16 @@ start_link() ->
 %%--------------------------------------------------------------------
 
 init([]) ->
-    {ok, {{one_for_one, 10, 100}, []}}.
+    {ok, {{one_for_one, 10, 100}, [
+        child_spec(emqx_acl_mnesia_migrator, worker, [])
+    ]}}.
+
+child_spec(M, worker, Args) ->
+    #{id       => M,
+      start    => {M, start_link, Args},
+      restart  => permanent,
+      shutdown => 5000,
+      type     => worker,
+      modules  => [M]
+     }.
+

+ 188 - 42
apps/emqx_auth_mnesia/test/emqx_acl_mnesia_SUITE.erl

@@ -22,6 +22,7 @@
 -include("emqx_auth_mnesia.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -import(emqx_ct_http, [ request_api/3
                       , request_api/5
@@ -39,10 +40,15 @@ all() ->
     emqx_ct:all(?MODULE).
 
 groups() ->
-    [].
+    [{async_migration_tests, [sequence], [
+        t_old_and_new_acl_migration_by_migrator,
+        t_old_and_new_acl_migration_repeated_by_migrator,
+        t_migration_concurrency
+    ]}].
 
 init_per_suite(Config) ->
     emqx_ct_helpers:start_apps([emqx_modules, emqx_management, emqx_auth_mnesia], fun set_special_configs/1),
+    supervisor:terminate_child(emqx_auth_mnesia_sup, emqx_acl_mnesia_migrator),
     create_default_app(),
     Config.
 
@@ -50,14 +56,32 @@ end_per_suite(_Config) ->
     delete_default_app(),
     emqx_ct_helpers:stop_apps([emqx_modules, emqx_management, emqx_auth_mnesia]).
 
-init_per_testcase(t_check_acl_as_clientid, Config) ->
+init_per_testcase_clean(_, Config) ->
+    mnesia:clear_table(?ACL_TABLE),
+    mnesia:clear_table(?ACL_TABLE2),
+    Config.
+
+init_per_testcase_emqx_hook(t_check_acl_as_clientid, Config) ->
     emqx:hook('client.check_acl', fun emqx_acl_mnesia:check_acl/5, [#{key_as => clientid}]),
     Config;
-
-init_per_testcase(_, Config) ->
+init_per_testcase_emqx_hook(_, Config) ->
     emqx:hook('client.check_acl', fun emqx_acl_mnesia:check_acl/5, [#{key_as => username}]),
     Config.
 
+init_per_testcase_migration(t_management_before_migration, Config) ->
+    Config;
+init_per_testcase_migration(_, Config) ->
+    emqx_acl_mnesia_migrator:migrate_records(),
+    Config.
+
+init_per_testcase(Case, Config) ->
+    PerTestInitializers = [
+        fun init_per_testcase_clean/2,
+        fun init_per_testcase_migration/2,
+        fun init_per_testcase_emqx_hook/2
+    ],
+    lists:foldl(fun(Init, Conf) -> Init(Case, Conf) end, Config, PerTestInitializers).
+
 end_per_testcase(_, Config) ->
     emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
     Config.
@@ -76,25 +100,34 @@ set_special_configs(_App) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_management(_Config) ->
-    clean_all_acls(),
+t_management_before_migration(_Config) ->
+    {atomic, IsStarted} = mnesia:transaction(fun emqx_acl_mnesia_db:is_migration_started/0),
+    ?assertNot(IsStarted),
+    run_acl_tests().
+
+t_management_after_migration(_Config) ->
+    {atomic, IsStarted} = mnesia:transaction(fun emqx_acl_mnesia_db:is_migration_started/0),
+    ?assert(IsStarted),
+    run_acl_tests().
+
+run_acl_tests() ->
     ?assertEqual("Acl with Mnesia", emqx_acl_mnesia:description()),
-    ?assertEqual([], emqx_acl_mnesia_cli:all_acls()),
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()),
 
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>, sub, allow),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/+">>, pub, deny),
-    ok = emqx_acl_mnesia_cli:add_acl({username, <<"test_username">>}, <<"topic/%u">>, sub, deny),
-    ok = emqx_acl_mnesia_cli:add_acl({username, <<"test_username">>}, <<"topic/+">>, pub, allow),
-    ok = emqx_acl_mnesia_cli:add_acl(all, <<"#">>, pubsub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>, sub, allow),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/+">>, pub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({username, <<"test_username">>}, <<"topic/%u">>, sub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({username, <<"test_username">>}, <<"topic/+">>, pub, allow),
+    ok = emqx_acl_mnesia_db:add_acl(all, <<"#">>, pubsub, deny),
     %% Sleeps below are needed to hide the race condition between
     %% mnesia and ets dirty select in check_acl, that make this test
     %% flaky
     timer:sleep(100),
 
-    ?assertEqual(2, length(emqx_acl_mnesia_cli:lookup_acl({clientid, <<"test_clientid">>}))),
-    ?assertEqual(2, length(emqx_acl_mnesia_cli:lookup_acl({username, <<"test_username">>}))),
-    ?assertEqual(2, length(emqx_acl_mnesia_cli:lookup_acl(all))),
-    ?assertEqual(6, length(emqx_acl_mnesia_cli:all_acls())),
+    ?assertEqual(2, length(emqx_acl_mnesia_db:lookup_acl({clientid, <<"test_clientid">>}))),
+    ?assertEqual(2, length(emqx_acl_mnesia_db:lookup_acl({username, <<"test_username">>}))),
+    ?assertEqual(2, length(emqx_acl_mnesia_db:lookup_acl(all))),
+    ?assertEqual(6, length(emqx_acl_mnesia_db:all_acls())),
 
     User1 = #{zone => external, clientid => <<"test_clientid">>},
     User2 = #{zone => external, clientid => <<"no_exist">>, username => <<"test_username">>},
@@ -110,30 +143,30 @@ t_management(_Config) ->
     deny  = emqx_access_control:check_acl(User3, publish,   <<"topic/A/B">>),
 
     %% Test merging of pubsub capability:
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, allow),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, allow),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, allow),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, allow),
     timer:sleep(100),
     allow = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
 
     %% Test implicit migration of pubsub to pub and sub:
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
-    ok = mnesia:dirty_write(#emqx_acl{
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
+    ok = mnesia:dirty_write(#?ACL_TABLE{
                                filter = {{clientid, <<"test_clientid">>}, <<"topic/mix">>},
                                action = pubsub,
                                access = allow,
@@ -142,24 +175,130 @@ t_management(_Config) ->
     timer:sleep(100),
     allow = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
     timer:sleep(100),
     allow = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
 
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/+">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({username, <<"test_username">>}, <<"topic/%u">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({username, <<"test_username">>}, <<"topic/+">>),
-    ok = emqx_acl_mnesia_cli:remove_acl(all, <<"#">>),
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>),
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/+">>),
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
+    ok = emqx_acl_mnesia_db:remove_acl({username, <<"test_username">>}, <<"topic/%u">>),
+    ok = emqx_acl_mnesia_db:remove_acl({username, <<"test_username">>}, <<"topic/+">>),
+    ok = emqx_acl_mnesia_db:remove_acl(all, <<"#">>),
     timer:sleep(100),
 
-    ?assertEqual([], emqx_acl_mnesia_cli:all_acls()).
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()).
+
+t_old_and_new_acl_combination(_Config) ->
+    create_conflicting_records(),
+
+    ?assertEqual(combined_conflicting_records(), emqx_acl_mnesia_db:all_acls()),
+    ?assertEqual(
+        lists:usort(combined_conflicting_records()),
+        lists:usort(emqx_acl_mnesia_db:all_acls_export())).
+
+t_old_and_new_acl_migration(_Config) ->
+    create_conflicting_records(),
+    emqx_acl_mnesia_migrator:migrate_records(),
+
+    ?assertEqual(combined_conflicting_records(), emqx_acl_mnesia_db:all_acls()),
+    ?assertEqual(
+        lists:usort(combined_conflicting_records()),
+        lists:usort(emqx_acl_mnesia_db:all_acls_export())),
+
+    % check that old table is not popoulated anymore
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>, sub, allow),
+    ?assert(emqx_acl_mnesia_migrator:is_old_table_migrated()).
+
+
+t_migration_concurrency(_Config) ->
+    Key = {{clientid,<<"client6">>}, <<"t">>},
+    Record = #?ACL_TABLE{filter = Key, action = pubsub, access = deny, created_at = 0},
+    {atomic, ok} = mnesia:transaction(fun mnesia:write/1, [Record]),
+
+    LockWaitAndDelete =
+        fun() ->
+           [_Rec] = mnesia:wread({?ACL_TABLE, Key}),
+           {{Pid, Ref}, _} =
+               ?wait_async_action(spawn_monitor(fun emqx_acl_mnesia_migrator:migrate_records/0),
+                                  #{?snk_kind := emqx_acl_mnesia_migrator_record_selected},
+                                  1000),
+           mnesia:delete({?ACL_TABLE, Key}),
+           {Pid, Ref}
+        end,
+
+    ?check_trace(
+        begin
+            {atomic, {Pid, Ref}} = mnesia:transaction(LockWaitAndDelete),
+            receive {'DOWN', Ref, process, Pid, _} -> ok end
+        end,
+        fun(_, Trace) ->
+            ?assertMatch([_], ?of_kind(emqx_acl_mnesia_migrator_record_missed, Trace))
+        end),
+
+    ?assert(emqx_acl_mnesia_migrator:is_old_table_migrated()),
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()).
+
+
+t_old_and_new_acl_migration_by_migrator(_Config) ->
+    create_conflicting_records(),
+
+    meck:new(fake_nodes, [non_strict]),
+    meck:expect(fake_nodes, all, fun() -> [node(), 'somebadnode@127.0.0.1'] end),
+
+    ?check_trace(
+        begin
+            % check all nodes every 30 ms
+            {ok, _} = emqx_acl_mnesia_migrator:start_link(#{
+                name => ct_migrator,
+                check_nodes_interval => 30,
+                get_nodes => fun fake_nodes:all/0
+            }),
+            timer:sleep(100)
+        end,
+        fun(_, Trace) ->
+            ?assertEqual([], ?of_kind(emqx_acl_mnesia_migrator_start_migration, Trace))
+        end),
+
+    ?check_trace(
+        begin
+            meck:expect(fake_nodes, all, fun() -> [node()] end),
+            timer:sleep(100)
+        end,
+        fun(_, Trace) ->
+            ?assertMatch([_], ?of_kind(emqx_acl_mnesia_migrator_finish, Trace))
+        end),
+
+    meck:unload(fake_nodes),
+
+    ?assertEqual(combined_conflicting_records(), emqx_acl_mnesia_db:all_acls()),
+    ?assert(emqx_acl_mnesia_migrator:is_old_table_migrated()).
+
+t_old_and_new_acl_migration_repeated_by_migrator(_Config) ->
+    create_conflicting_records(),
+    emqx_acl_mnesia_migrator:migrate_records(),
+
+    ?check_trace(
+        begin
+            {ok, _} = emqx_acl_mnesia_migrator:start_link(ct_migrator),
+            timer:sleep(100)
+        end,
+        fun(_, Trace) ->
+            ?assertEqual([], ?of_kind(emqx_acl_mnesia_migrator_start_migration, Trace)),
+            ?assertMatch([_], ?of_kind(emqx_acl_mnesia_migrator_finish, Trace))
+        end).
+
+t_start_stop_supervised(_Config) ->
+    ?assertEqual(undefined, whereis(emqx_acl_mnesia_migrator)),
+    ok = emqx_acl_mnesia_migrator:start_supervised(),
+    ?assert(is_pid(whereis(emqx_acl_mnesia_migrator))),
+    ok = emqx_acl_mnesia_migrator:stop_supervised(),
+    ?assertEqual(undefined, whereis(emqx_acl_mnesia_migrator)).
 
 t_acl_cli(_Config) ->
     meck:new(emqx_ctl, [non_strict, passthrough]),
@@ -168,8 +307,6 @@ t_acl_cli(_Config) ->
     meck:expect(emqx_ctl, usage, fun(Usages) -> emqx_ctl:format_usage(Usages) end),
     meck:expect(emqx_ctl, usage, fun(Cmd, Descr) -> emqx_ctl:format_usage(Cmd, Descr) end),
 
-    clean_all_acls(),
-
     ?assertEqual(0, length(emqx_acl_mnesia_cli:cli(["list"]))),
 
     emqx_acl_mnesia_cli:cli(["add", "clientid", "test_clientid", "topic/A", "pub", "deny"]),
@@ -202,8 +339,6 @@ t_acl_cli(_Config) ->
     meck:unload(emqx_ctl).
 
 t_rest_api(_Config) ->
-    clean_all_acls(),
-
     Params1 = [#{<<"clientid">> => <<"test_clientid">>,
                  <<"topic">> => <<"topic/A">>,
                  <<"action">> => <<"pub">>,
@@ -273,13 +408,24 @@ t_rest_api(_Config) ->
     {ok, Res3} = request_http_rest_list(["$all"]),
     ?assertMatch([], get_http_data(Res3)).
 
-%%------------------------------------------------------------------------------
-%% Helpers
-%%------------------------------------------------------------------------------
 
-clean_all_acls() ->
-    [ mnesia:dirty_delete({emqx_acl, Login})
-      || Login <- mnesia:dirty_all_keys(emqx_acl)].
+create_conflicting_records() ->
+    Records = [
+        #?ACL_TABLE{filter = {{clientid,<<"client6">>}, <<"t">>}, action = pubsub, access = deny, created_at = 0},
+        #?ACL_TABLE{filter = {{clientid,<<"client5">>}, <<"t">>}, action = pubsub, access = deny, created_at = 1},
+        #?ACL_TABLE2{who = {clientid,<<"client5">>}, rules = [{allow, sub, <<"t">>, 2}]}
+    ],
+    mnesia:transaction(fun() -> lists:foreach(fun mnesia:write/1, Records) end).
+
+
+combined_conflicting_records() ->
+    % pubsub's are split, ACL_TABLE2 rules shadow ACL_TABLE rules
+    [
+        {{clientid,<<"client5">>},<<"t">>,sub,allow,2},
+        {{clientid,<<"client5">>},<<"t">>,pub,deny,1},
+        {{clientid,<<"client6">>},<<"t">>,sub,deny,0},
+        {{clientid,<<"client6">>},<<"t">>,pub,deny,0}
+    ].
 
 %%--------------------------------------------------------------------
 %% HTTP Request

+ 1 - 1
apps/emqx_lwm2m/etc/emqx_lwm2m.conf

@@ -146,4 +146,4 @@ lwm2m.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,E
 ## Note that 'lwm2m.dtls.ciphers' and 'lwm2m.dtls.psk_ciphers' cannot
 ## be configured at the same time.
 ## See 'https://tools.ietf.org/html/rfc4279#section-2'.
-#lwm2m.dtls.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
+#lwm2m.dtls.psk_ciphers = RSA-PSK-AES256-GCM-SHA384,RSA-PSK-AES256-CBC-SHA384,RSA-PSK-AES128-GCM-SHA256,RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA

+ 9 - 8
apps/emqx_lwm2m/priv/emqx_lwm2m.schema

@@ -185,7 +185,7 @@ end}.
   OldCert = cuttlefish:conf_get("lwm2m.certfile", Conf, undefined),
 
   %% Ciphers
-  SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
+  SplitFun = fun(undefined) -> []; (S) -> string:tokens(S, ",") end,
   Ciphers =
       case cuttlefish:conf_get("lwm2m.dtls.ciphers", Conf, undefined) of
           undefined ->
@@ -198,16 +198,17 @@ end}.
           undefined ->
               [];
           C2 ->
-              Psk = lists:map(fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
-                                     ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
-                                     ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
-                                     ("PSK-RC4-SHA") -> {psk, rc4_128, sha}
-                                  end, SplitFun(C2)),
+              Psk = lists:map(fun("PSK-AES128-CBC-SHA") -> "RSA-PSK-AES128-CBC-SHA";
+                                 ("PSK-AES256-CBC-SHA") -> "RSA-PSK-AES256-CBC-SHA";
+                                 ("PSK-3DES-EDE-CBC-SHA") -> "RSA-PSK-3DES-EDE-CBC-SHA";
+                                 ("PSK-RC4-SHA") -> "RSA-PSK-RC4-SHA";
+                                 (Suite) -> Suite
+                              end, SplitFun(C2)),
               [{ciphers, Psk}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}]
       end,
   Ciphers /= []
-      andalso PskCiphers /= []
-         andalso cuttlefish:invalid("The 'lwm2m.dtls.ciphers' and 'lwm2m.dtls.psk_ciphers' cannot exist simultaneously."),
+  andalso PskCiphers /= []
+  andalso cuttlefish:invalid("The 'lwm2m.dtls.ciphers' and 'lwm2m.dtls.psk_ciphers' cannot coexist"),
 
   NCiphers = Ciphers ++ PskCiphers,
 

+ 1 - 1
apps/emqx_lwm2m/src/emqx_lwm2m.app.src

@@ -1,6 +1,6 @@
 {application,emqx_lwm2m,
              [{description,"EMQ X LwM2M Gateway"},
-              {vsn, "4.3.3"}, % strict semver, bump manually!
+              {vsn, "4.3.4"}, % strict semver, bump manually!
               {modules,[]},
               {registered,[emqx_lwm2m_sup]},
               {applications,[kernel,stdlib,lwm2m_coap]},

+ 7 - 5
apps/emqx_lwm2m/src/emqx_lwm2m.appup.src

@@ -1,19 +1,21 @@
 %% -*-: erlang -*-
-{"4.3.3",
+{"4.3.4",
   [
-    {<<"4.3.[0-1]">>, [
+    {<<"4\\.3\\.[0-1]">>, [
       {restart_application, emqx_lwm2m}
     ]},
     {"4.3.2", [
       {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
-    ]}
+    ]},
+    {"4.3.3", []} %% only config change
   ],
   [
-    {<<"4.3.[0-1]">>, [
+    {<<"4\\.3\\.[0-1]">>, [
       {restart_application, emqx_lwm2m}
     ]},
     {"4.3.2", [
       {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
-    ]}
+    ]},
+    {"4.3.3", []} %% only config change
   ]
 }.

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -1,6 +1,6 @@
 {application, emqx_management,
  [{description, "EMQ X Management API and CLI"},
-  {vsn, "4.3.7"}, % strict semver, bump manually!
+  {vsn, "4.3.8"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_management_sup]},
   {applications, [kernel,stdlib,minirest]},

+ 2 - 2
apps/emqx_management/src/emqx_management.appup.src

@@ -1,13 +1,13 @@
 %% -*- mode: erlang -*-
 {VSN,
- [ {<<"4.3.[0-6]">>,
+ [ {<<"4\\.3\\.[0-9]+">>,
     [ {apply,{minirest,stop_http,['http:management']}},
       {apply,{minirest,stop_http,['https:management']}},
       {restart_application, emqx_management}
     ]},
    {<<".*">>, []}
  ],
- [ {<<"4.3.[0-6]">>,
+ [ {<<"4\\.3\\.[0-9]+">>,
     [ {apply,{minirest,stop_http,['http:management']}},
       {apply,{minirest,stop_http,['https:management']}},
       {restart_application, emqx_management}

+ 9 - 1
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -334,7 +334,7 @@ query({Qs, Fuzzy}, Start, Limit) ->
 match_fun(Ms, Fuzzy) ->
     MsC = ets:match_spec_compile(Ms),
     REFuzzy = lists:map(fun({K, like, S}) ->
-                  {ok, RE} = re:compile(S),
+                  {ok, RE} = re:compile(escape(S)),
                   {K, like, RE}
               end, Fuzzy),
     fun(Rows) ->
@@ -347,6 +347,9 @@ match_fun(Ms, Fuzzy) ->
          end
     end.
 
+escape(B) when is_binary(B) ->
+    re:replace(B, <<"\\\\">>, <<"\\\\\\\\">>, [{return, binary}, global]).
+
 run_fuzzy_match(_, []) ->
     true;
 run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) ->
@@ -450,4 +453,9 @@ params2qs_test() ->
 
     [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]).
 
+escape_test() ->
+    Str = <<"\\n">>,
+    {ok, Re} = re:compile(escape(Str)),
+    {match, _} = re:run(<<"\\name">>, Re).
+
 -endif.

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_pubsub.erl

@@ -158,7 +158,7 @@ do_subscribe(ClientId, Topics, QoS) ->
         _ -> ok
     end.
 
-do_publish(ClientId, _Topics, _Qos, _Retain, _Payload) when not is_binary(ClientId) ->
+do_publish(ClientId, _Topics, _Qos, _Retain, _Payload) when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
     {ok, ?ERROR8, <<"bad clientid: must be string">>};
 do_publish(_ClientId, [], _Qos, _Retain, _Payload) ->
     {ok, ?ERROR15, bad_topic};

+ 10 - 11
apps/emqx_management/src/emqx_mgmt_data_backup.erl

@@ -118,18 +118,18 @@ export_auth_mnesia() ->
     end.
 
 export_acl_mnesia() ->
-    case ets:info(emqx_acl) of
+    case ets:info(emqx_acl2) of
         undefined -> [];
         _ ->
-            lists:map(fun({_, Filter, Action, Access, CreatedAt}) ->
-                          Filter1 = case Filter of
-                              {{Type, TypeValue}, Topic} ->
+            lists:map(fun({Login, Topic, Action, Access, CreatedAt}) ->
+                          Filter1 = case Login of
+                              {Type, TypeValue} ->
                                   [{type, Type}, {type_value, TypeValue}, {topic, Topic}];
-                              {Type, Topic} ->
+                              Type ->
                                   [{type, Type}, {topic, Topic}]
                           end,
                           Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}]
-                      end, ets:tab2list(emqx_acl))
+                      end, emqx_acl_mnesia_db:all_acls_export())
     end.
 
 -ifdef(EMQX_ENTERPRISE).
@@ -473,10 +473,9 @@ do_import_auth_mnesia(Auths) ->
     end.
 
 do_import_acl_mnesia_by_old_data(Acls) ->
-    case ets:info(emqx_acl) of
+    case ets:info(emqx_acl2) of
         undefined -> ok;
         _ ->
-            CreatedAt = erlang:system_time(millisecond),
             lists:foreach(fun(#{<<"login">> := Login,
                                 <<"topic">> := Topic,
                                 <<"allow">> := Allow,
@@ -485,11 +484,11 @@ do_import_acl_mnesia_by_old_data(Acls) ->
                                          true -> allow;
                                          false -> deny
                                      end,
-                            mnesia:dirty_write({emqx_acl, {{get_old_type(), Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt})
+                            emqx_acl_mnesia_db:add_acl({get_old_type(), Login}, Topic, any_to_atom(Action), Allow1)
                           end, Acls)
     end.
 do_import_acl_mnesia(Acls) ->
-    case ets:info(emqx_acl) of
+    case ets:info(emqx_acl2) of
         undefined -> ok;
         _ ->
             lists:foreach(fun(Map = #{<<"action">> := Action,
@@ -501,7 +500,7 @@ do_import_acl_mnesia(Acls) ->
                                 Value ->
                                     {any_to_atom(maps:get(<<"type">>, Map)), Value}
                             end,
-                            emqx_acl_mnesia_cli:add_acl(Login, Topic, any_to_atom(Action), any_to_atom(Access))
+                            emqx_acl_mnesia_db:add_acl(Login, Topic, any_to_atom(Action), any_to_atom(Access))
                           end, Acls)
     end.
 

+ 41 - 15
apps/emqx_management/test/emqx_auth_mnesia_migration_SUITE.erl

@@ -30,7 +30,7 @@ matrix() ->
                           , Version <- ["v4.2.10", "v4.1.5"]].
 
 all() ->
-    [t_import_4_0, t_import_4_1, t_import_4_2].
+    [t_import_4_0, t_import_4_1, t_import_4_2, t_export_import].
 
 groups() ->
     [{username, [], cases()}, {clientid, [], cases()}].
@@ -52,7 +52,8 @@ init_per_testcase(_, Config) ->
     Config.
 
 end_per_testcase(_, _Config) ->
-    {atomic,ok} = mnesia:clear_table(emqx_acl),
+    {atomic,ok} = mnesia:clear_table(?ACL_TABLE),
+    {atomic,ok} = mnesia:clear_table(?ACL_TABLE2),
     {atomic,ok} = mnesia:clear_table(emqx_user),
     ok.
 -ifdef(EMQX_ENTERPRISE).
@@ -138,25 +139,50 @@ t_import_4_2(Config) ->
     test_import(clientid, {<<"client_for_test">>, <<"public">>}),
     test_import(username, {<<"user_for_test">>, <<"public">>}),
 
-    ?assertMatch([#emqx_acl{
-                     filter = {{Type,<<"emqx_c">>}, <<"Topic/A">>},
-                     action = pub,
-                     access = allow
-                    },
-                  #emqx_acl{
-                     filter = {{Type,<<"emqx_c">>}, <<"Topic/A">>},
-                     action = sub,
-                     access = allow
-                    }],
-                 lists:sort(ets:tab2list(emqx_acl))).
+    ?assertMatch([
+            {{username, <<"emqx_c">>}, <<"Topic/A">>, pub, allow, _},
+            {{username, <<"emqx_c">>}, <<"Topic/A">>, sub, allow, _}
+        ],
+        lists:sort(emqx_acl_mnesia_db:all_acls())).
 -endif.
 
+t_export_import(_Config) ->
+    emqx_acl_mnesia_migrator:migrate_records(),
+
+    Records = [
+        #?ACL_TABLE2{who = {clientid,<<"client1">>}, rules = [{allow, sub, <<"t1">>, 1}]},
+        #?ACL_TABLE2{who = {clientid,<<"client2">>}, rules = [{allow, pub, <<"t2">>, 2}]}
+    ],
+    mnesia:transaction(fun() -> lists:foreach(fun mnesia:write/1, Records) end),
+    timer:sleep(100),
+
+    AclData = emqx_json:encode(emqx_mgmt_data_backup:export_acl_mnesia()),
+
+    mnesia:transaction(fun() ->
+                          lists:foreach(fun(#?ACL_TABLE2{who = Who}) ->
+                                           mnesia:delete({?ACL_TABLE2, Who})
+                                        end,
+                                        Records)
+                       end),
+
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()),
+
+    emqx_mgmt_data_backup:import_acl_mnesia(emqx_json:decode(AclData, [return_maps]), "4.3"),
+    timer:sleep(100),
+
+    ?assertMatch([
+        {{clientid, <<"client1">>}, <<"t1">>, sub, allow, _},
+        {{clientid, <<"client2">>}, <<"t2">>, pub, allow, _}
+    ], lists:sort(emqx_acl_mnesia_db:all_acls())).
+
 do_import(File, Config) ->
     do_import(File, Config, "{}").
 
 do_import(File, Config, Overrides) ->
-    mnesia:clear_table(emqx_acl),
+    mnesia:clear_table(?ACL_TABLE),
+    mnesia:clear_table(?ACL_TABLE2),
     mnesia:clear_table(emqx_user),
+    emqx_acl_mnesia_migrator:migrate_records(),
     Filename = filename:join(proplists:get_value(data_dir, Config), File),
     emqx_mgmt_data_backup:import(Filename, Overrides).
 
@@ -172,4 +198,4 @@ test_import(clientid, {ClientID, Password}) ->
     Req = #{clientid => ClientID,
             password => Password},
     ?assertMatch({stop, #{auth_result := success}},
-                 emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).
+                 emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).

+ 15 - 2
apps/emqx_management/test/emqx_mgmt_api_SUITE.erl

@@ -447,6 +447,19 @@ t_pubsub(_) ->
             after 100 ->
                     false
             end),
+
+    % no clientid
+    {ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
+                             #{<<"topic">> => <<"mytopic">>,
+                               <<"qos">> => 1,
+                               <<"payload">> => <<"hello">>}),
+    ?assert(receive
+                {publish, #{payload := <<"hello">>}} ->
+                    true
+            after 100 ->
+                    false
+            end),
+
     %% json payload
     {ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
                              #{<<"clientid">> => ClientId,
@@ -491,9 +504,9 @@ t_pubsub(_) ->
 
     ok = emqtt:disconnect(C1),
 
-    ?assertEqual(2, emqx_metrics:val('messages.qos1.received') - Qos1Received),
+    ?assertEqual(3, emqx_metrics:val('messages.qos1.received') - Qos1Received),
     ?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
-    ?assertEqual(4, emqx_metrics:val('messages.received') - Received).
+    ?assertEqual(5, emqx_metrics:val('messages.received') - Received).
 
 loop([]) -> [];
 

+ 19 - 27
apps/emqx_web_hook/src/emqx_web_hook.appup.src

@@ -1,28 +1,20 @@
-%% -*-: erlang -*-
-
+%% -*- mode: erlang -*-
 {VSN,
-  [
-    {<<"4.3.[0-2]">>, [
-     {apply, {application, stop,[emqx_web_hook]}},
-     {load_module, emqx_web_hook_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_web_hook, brutal_purge, soft_purge, []},
-     {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
-    ]},
-    {<<"4.3.[3-5]">>, [
-      {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
-    ]},
-    {<<".*">>, []}
-  ],
-  [
-    {<<"4.3.[0-2]">>, [
-     {apply, {application, stop, [emqx_web_hook]}},
-     {load_module, emqx_web_hook_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_web_hook, brutal_purge, soft_purge, []},
-     {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
-    ]},
-    {<<"4.3.[3-5]">>, [
-      {load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
-    ]},
-    {<<".*">>, []}
-  ]
-}.
+  [{"4.3.5",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
+   {<<"4.3.[0-2]">>,
+    [{apply,{application,stop,[emqx_web_hook]}},
+     {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
+     {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
+   {<<"4.3.[3-4]">>,
+    [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}],
+  [{"4.3.5",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
+   {<<"4.3.[0-2]">>,
+    [{apply,{application,stop,[emqx_web_hook]}},
+     {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
+     {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
+     {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
+   {<<"4.3.[3-4]">>,
+    [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}]}.

+ 16 - 6
bin/emqx

@@ -238,12 +238,22 @@ generate_config() {
         sed '/^#/d' "$CUTTLE_GEN_ARG_FILE" | sed '/^$/d' | while IFS='' read -r ARG_LINE || [ -n "$ARG_LINE" ]; do
             ARG_KEY=$(echo "$ARG_LINE" | awk '{$NF="";print}')
             ARG_VALUE=$(echo "$ARG_LINE" | awk '{print $NF}')
-            TMP_ARG_VALUE=$(grep "^$ARG_KEY" "$TMP_ARG_FILE" | awk '{print $NF}')
-            if [ "$ARG_VALUE" != "$TMP_ARG_VALUE" ] ; then
-                if [ -n "$TMP_ARG_VALUE" ]; then
-                    sh -c "$SED_REPLACE 's/^$ARG_KEY.*$/$ARG_LINE/' $TMP_ARG_FILE"
-                else
-                    echo "$ARG_LINE" >> "$TMP_ARG_FILE"
+            if [ "$ARG_KEY" =  '' ]; then
+                ## for the flags, e.g. -heart -emu_args etc
+                ARG_KEY=$(echo "$ARG_LINE" | awk '{print $1}')
+                ARG_VALUE=''
+                TMP_ARG_KEY=$(grep "^$ARG_KEY" "$TMP_ARG_FILE" | awk '{print $1}')
+                if [ "$TMP_ARG_KEY" = '' ]; then
+                    echo "$ARG_KEY" >> "$TMP_ARG_FILE"
+                fi
+            else
+                TMP_ARG_VALUE=$(grep "^$ARG_KEY" "$TMP_ARG_FILE" | awk '{print $NF}')
+                if [ "$ARG_VALUE" != "$TMP_ARG_VALUE" ] ; then
+                    if [ -n "$TMP_ARG_VALUE" ]; then
+                        sh -c "$SED_REPLACE 's/^$ARG_KEY.*$/$ARG_LINE/' $TMP_ARG_FILE"
+                    else
+                        echo "$ARG_LINE" >> "$TMP_ARG_FILE"
+                    fi
                 fi
             fi
         done

+ 10 - 0
etc/emqx.conf

@@ -199,6 +199,16 @@ node.data_dir = {{ platform_data_dir }}
 ## Heartbeat monitoring of an Erlang runtime system. Comment the line to disable
 ## heartbeat, or set the value as 'on'
 ##
+## Turning this on may cause the node to restart if it becomes unresponsive to
+## the heartbeat pings.
+##
+## NOTE: When managed by systemd (or other supervision tools like systemd),
+##       heart will probably only cause EMQ X to stop, but restart or not will
+##       depend on systemd's restart strategy.
+## NOTE: When running in docker, the container will die as soon as the the
+##       heart process kills EMQ X, but restart or not will depend on container
+##       supervision strategy, such as k8s restartPolicy.
+##
 ## Value: on
 ##
 ## vm.args: -heart

+ 2 - 2
lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src

@@ -1,6 +1,6 @@
 %% -*- mode: erlang -*-
 {VSN,
- [ {<<"4.3.[0-4]">>,
+ [ {<<"4.3.[0-9]">>,
     %% load all plugins
     %% NOTE: this depends on the fact that emqx_dashboard is always
     %% the last application gets upgraded
@@ -10,7 +10,7 @@
     ]},
    {<<".*">>, []}
  ],
- [ {<<"4.3.[0-4]">>,
+ [ {<<"4.3.[0-9]">>,
     [ {apply, {emqx_rule_engine, load_providers, []}}
     , {restart_application, emqx_dashboard}
     , {apply, {emqx_plugins, load, []}}

+ 1 - 1
rebar.config

@@ -55,7 +55,7 @@
     , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
     , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
     , {getopt, "1.0.1"}
-    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
+    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
     ]}.
 
 {xref_ignores,

+ 1 - 1
scripts/apps-version-check.sh

@@ -18,7 +18,7 @@ while read -r app; do
         changed="$(git diff --name-only "$latest_release"...HEAD \
                     -- "$app_path/src" \
                     -- "$app_path/priv" \
-                    -- "$app_path/c_src" | wc -l)"
+                    -- "$app_path/c_src" | { grep -v -E 'appup\.src' || true; } | wc -l)"
         if [ "$changed" -gt 0 ]; then
             echo "$src_file needs a vsn bump"
             bad_app_count=$(( bad_app_count + 1))

+ 106 - 0
scripts/one-more-emqx-ee.sh

@@ -0,0 +1,106 @@
+#!/bin/bash
+# shellcheck disable=2090
+###############
+## args and env validation
+###############
+
+if ! [ -d "emqx" ]; then
+  echo "[error] this script must be run at the same dir as the emqx"
+  exit 1
+fi
+
+if [ $# -eq 0 ]
+  then
+    echo "[error] a new emqx name should be provided!"
+    echo "Usage: ./one_more_emqx <new_name>"
+    echo "  e.g. ./one_more_emqx emqx2"
+    exit 1
+fi
+
+NEW_EMQX=$1
+if [ -d "$NEW_EMQX" ]; then
+  echo "[error] a dir named ${NEW_EMQX} already exists!"
+  exit 2
+fi
+echo creating "$NEW_EMQX" ...
+
+SED_REPLACE="sed -i "
+# shellcheck disable=2089
+case $(sed --help 2>&1) in
+    *GNU*) SED_REPLACE="sed -i ";;
+    *) SED_REPLACE="sed -i ''";;
+esac
+
+PORT_INC_=$(cksum <<< "$NEW_EMQX" | cut -f 1 -d ' ')
+PORT_INC=$((PORT_INC_ % 1000))
+echo using increment factor: $PORT_INC
+
+###############
+## helpers
+###############
+process_emqx_conf() {
+  echo "processing config file: $1"
+  $SED_REPLACE '/^#/d' "$1"
+  $SED_REPLACE '/^$/d' "$1"
+
+  for entry_ in "${entries_to_be_inc[@]}"
+  do
+    echo inc port for "$entry_"
+    ip_port_=$(grep -E "$entry_"'[ \t]*=' "$1" 2> /dev/null | tail -1 | cut -d = -f 2- | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')
+    echo -- from: "$ip_port_"
+    ip_=$(echo "$ip_port_" | cut -sd : -f 1)
+    port_=$(echo "$ip_port_" | cut -sd : -f 2)
+    if [ -z "$ip_" ]
+      then
+        new_ip_port=$(( ip_port_ + PORT_INC ))
+      else
+        new_ip_port="${ip_}:$(( port_ + PORT_INC ))"
+    fi
+    echo -- to: "$new_ip_port"
+    $SED_REPLACE 's|'"$entry_"'[ \t]*=.*|'"$entry_"' = '"$new_ip_port"'|g' "$1"
+  done
+}
+
+###############
+## main
+###############
+
+cp -r emqx "$NEW_EMQX"
+
+## change the rpc ports
+$SED_REPLACE 's|tcp_server_port[ \t]*=.*|tcp_server_port = 5369|g' emqx/etc/rpc.conf
+$SED_REPLACE 's|tcp_client_port[ \t]*=.*|tcp_client_port = 5370|g' emqx/etc/rpc.conf
+$SED_REPLACE 's|tcp_client_port[ \t]*=.*|tcp_client_port = 5369|g' "$NEW_EMQX/etc/rpc.conf"
+$SED_REPLACE 's|tcp_server_port[ \t]*=.*|tcp_server_port = 5370|g' "$NEW_EMQX/etc/rpc.conf"
+$SED_REPLACE 's|.*node\.name.*|node.name='"$NEW_EMQX"'@127.0.0.1|g' "$NEW_EMQX/etc/emqx.conf"
+
+conf_ext="*.conf"
+
+find "$NEW_EMQX" -name "${conf_ext}" | while read -r conf; do
+    if [ "${conf##*/}" = 'emqx.conf' ]
+      then
+        declare -a entries_to_be_inc=("node.dist_listen_min"
+                                      "node.dist_listen_max")
+        process_emqx_conf "$conf" "${entries_to_be_inc[@]}"
+    elif [ "${conf##*/}" = 'listeners.conf' ]
+      then
+        declare -a entries_to_be_inc=("listener.tcp.external"
+                                      "listener.tcp.internal"
+                                      "listener.ssl.external"
+                                      "listener.ws.external"
+                                      "listener.wss.external")
+        process_emqx_conf "$conf" "${entries_to_be_inc[@]}"
+    elif [ "${conf##*/}" = 'emqx_management.conf' ]
+      then
+        declare -a entries_to_be_inc=("management.listener.http"
+                                      "management.listener.https")
+        process_emqx_conf "$conf" "${entries_to_be_inc[@]}"
+    elif [ "${conf##*/}" = 'emqx_dashboard.conf' ]
+      then
+        declare -a entries_to_be_inc=("dashboard.listener.http"
+                                      "dashboard.listener.https")
+        process_emqx_conf "$conf" "${entries_to_be_inc[@]}"
+    else
+        echo "."
+    fi
+done

+ 102 - 0
scripts/one-more-emqx.sh

@@ -0,0 +1,102 @@
+#!/bin/bash
+# shellcheck disable=2090
+###############
+## args and env validation
+###############
+
+if ! [ -d "emqx" ]; then
+  echo "[error] this script must be run at the same dir as the emqx"
+  exit 1
+fi
+
+if [ $# -eq 0 ]
+  then
+    echo "[error] a new emqx name should be provided!"
+    echo "Usage: ./one_more_emqx <new_name>"
+    echo "  e.g. ./one_more_emqx emqx2"
+    exit 1
+fi
+
+NEW_EMQX=$1
+if [ -d "$NEW_EMQX" ]; then
+  echo "[error] a dir named ${NEW_EMQX} already exists!"
+  exit 2
+fi
+echo creating "$NEW_EMQX" ...
+
+SED_REPLACE="sed -i "
+# shellcheck disable=2089
+case $(sed --help 2>&1) in
+    *GNU*) SED_REPLACE="sed -i ";;
+    *) SED_REPLACE="sed -i ''";;
+esac
+
+PORT_INC_=$(cksum <<< "$NEW_EMQX" | cut -f 1 -d ' ')
+PORT_INC=$((PORT_INC_ % 1000))
+echo using increment factor: "$PORT_INC"
+
+###############
+## helpers
+###############
+process_emqx_conf() {
+  echo "processing config file: $1"
+  $SED_REPLACE '/^#/d' "$1"
+  $SED_REPLACE '/^$/d' "$1"
+  $SED_REPLACE 's|.*node\.name.*|node.name='"$NEW_EMQX"'@127.0.0.1|g' "$1"
+
+  for entry_ in "${entries_to_be_inc[@]}"
+  do
+    echo inc port for "$entry_"
+    ip_port_=$(grep -E "$entry_"'[ \t]*=' "$1" 2> /dev/null | tail -1 | cut -d = -f 2- | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')
+    echo -- from: "$ip_port_"
+    ip_=$(echo "$ip_port_" | cut -sd : -f 1)
+    port_=$(echo "$ip_port_" | cut -sd : -f 2)
+    if [ -z "$ip_" ]
+      then
+        new_ip_port=$(( ip_port_ + PORT_INC ))
+      else
+        new_ip_port="${ip_}:$(( port_ + PORT_INC ))"
+    fi
+    echo -- to: "$new_ip_port"
+    $SED_REPLACE 's|'"$entry_"'[ \t]*=.*|'"$entry_"' = '"$new_ip_port"'|g' "$1"
+  done
+}
+
+###############
+## main
+###############
+
+cp -r emqx "$NEW_EMQX"
+
+## change the rpc ports
+$SED_REPLACE 's|tcp_server_port[ \t]*=.*|tcp_server_port = 5369|g' emqx/etc/emqx.conf
+$SED_REPLACE 's|tcp_client_port[ \t]*=.*|tcp_client_port = 5370|g' emqx/etc/emqx.conf
+$SED_REPLACE 's|tcp_client_port[ \t]*=.*|tcp_client_port = 5369|g' "$NEW_EMQX/etc/emqx.conf"
+$SED_REPLACE 's|tcp_server_port[ \t]*=.*|tcp_server_port = 5370|g' "$NEW_EMQX/etc/emqx.conf"
+
+conf_ext="*.conf"
+find "$NEW_EMQX" -name "${conf_ext}" | while read -r conf; do
+    if [ "${conf##*/}" = 'emqx.conf' ]
+      then
+        declare -a entries_to_be_inc=("node.dist_listen_min"
+                                      "dist_listen_max"
+                                      "listener.tcp.external"
+                                      "listener.tcp.internal"
+                                      "listener.ssl.external"
+                                      "listener.ws.external"
+                                      "listener.wss.external")
+        process_emqx_conf "$conf" "${entries_to_be_inc[@]}"
+    elif [ "${conf##*/}" = 'emqx_management.conf' ]
+      then
+        declare -a entries_to_be_inc=("management.listener.http"
+                                      "management.listener.https")
+        process_emqx_conf "$conf" "${entries_to_be_inc[@]}"
+    elif [ "${conf##*/}" = 'emqx_dashboard.conf' ]
+      then
+        declare -a entries_to_be_inc=("dashboard.listener.http"
+                                      "dashboard.listener.https")
+        process_emqx_conf "$conf" "${entries_to_be_inc[@]}"
+    else
+        echo "."
+    fi
+done

+ 95 - 39
scripts/update_appup.escript

@@ -25,14 +25,16 @@ Usage:
 
 Options:
 
-  --check         Don't update the appfile, just check that they are complete
-  --prev-tag      Specify the previous release tag. Otherwise the previous patch version is used
-  --repo          Upsteam git repo URL
-  --remote        Get upstream repo URL from the specified git remote
-  --skip-build    Don't rebuild the releases. May produce wrong results
-  --make-command  A command used to assemble the release
-  --release-dir   Release directory
-  --src-dirs      Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
+  --check           Don't update the appfile, just check that they are complete
+  --prev-tag        Specify the previous release tag. Otherwise the previous patch version is used
+  --repo            Upsteam git repo URL
+  --remote          Get upstream repo URL from the specified git remote
+  --skip-build      Don't rebuild the releases. May produce wrong results
+  --make-command    A command used to assemble the release
+  --release-dir     Release directory
+  --src-dirs        Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
+  --binary-rel-url  Binary release URL pattern. %TAG% variable is substituted with the release tag.
+                    E.g. \"https://github.com/emqx/emqx/releases/download/v4.3.8/emqx-centos7-%TAG%-amd64.zip\"
 ".
 
 -record(app,
@@ -41,18 +43,26 @@ Options:
         }).
 
 default_options() ->
-    #{ clone_url    => find_upstream_repo("origin")
-     , make_command => "make emqx-rel"
-     , beams_dir    => "_build/emqx/rel/emqx/lib/"
-     , check        => false
-     , prev_tag     => undefined
-     , src_dirs     => "{src,apps,lib-*}/**/"
+    #{ clone_url      => find_upstream_repo("origin")
+     , make_command   => "make emqx-rel"
+     , beams_dir      => "_build/emqx/rel/emqx/lib/"
+     , check          => false
+     , prev_tag       => undefined
+     , src_dirs       => "{src,apps,lib-*}/**/"
+     , binary_rel_url => undefined
      }.
 
+%% App-specific actions that should be added unconditionally to any update/downgrade:
+app_specific_actions(_) ->
+    [].
+
+ignored_apps() ->
+    [emqx_dashboard, emqx_management] ++ otp_standard_apps().
+
 main(Args) ->
     #{current_release := CurrentRelease} = Options = parse_args(Args, default_options()),
     init_globals(Options),
-    case find_pred_tag(CurrentRelease) of
+    case find_prev_tag(CurrentRelease) of
         {ok, Baseline} ->
             main(Options, Baseline);
         undefined ->
@@ -78,6 +88,8 @@ parse_args(["--src-dirs", Pattern|Rest], State) ->
     parse_args(Rest, State#{src_dirs => Pattern});
 parse_args(["--prev-tag", Tag|Rest], State) ->
     parse_args(Rest, State#{prev_tag => Tag});
+parse_args(["--binary-rel-url", URL|Rest], State) ->
+    parse_args(Rest, State#{binary_rel_url => {ok, URL}});
 parse_args(_, _) ->
     fail(usage()).
 
@@ -88,7 +100,7 @@ main(Options, Baseline) ->
         "~n===================================~n"),
     CurrAppsIdx = index_apps(CurrRelDir),
     PrevAppsIdx = index_apps(PrevRelDir),
-    %% log("Curr: ~p~nPrev: ~p~n", [CurrApps, PrevApps]),
+    %% log("Curr: ~p~nPrev: ~p~n", [CurrAppsIdx, PrevAppsIdx]),
     AppupChanges = find_appup_actions(CurrAppsIdx, PrevAppsIdx),
     case getopt(check) of
         true ->
@@ -115,17 +127,25 @@ warn_and_exit(false) ->
     log("~nERROR: Incomplete appups found. Please inspect the output for more details.~n"),
     halt(1).
 
-prepare(Baseline, Options = #{make_command := MakeCommand, beams_dir := BeamDir}) ->
+prepare(Baseline, Options = #{make_command := MakeCommand, beams_dir := BeamDir, binary_rel_url := BinRel}) ->
     log("~n===================================~n"
         "Baseline: ~s"
         "~n===================================~n", [Baseline]),
     log("Building the current version...~n"),
     bash(MakeCommand),
     log("Downloading and building the previous release...~n"),
-    {ok, PrevRootDir} = build_pred_release(Baseline, Options),
-    {BeamDir, filename:join(PrevRootDir, BeamDir)}.
+    PrevRelDir =
+        case BinRel of
+            undefined ->
+                {ok, PrevRootDir} = build_prev_release(Baseline, Options),
+                filename:join(PrevRootDir, BeamDir);
+            {ok, _URL} ->
+                {ok, PrevRootDir} = download_prev_release(Baseline, Options),
+                PrevRootDir
+        end,
+    {BeamDir, PrevRelDir}.
 
-build_pred_release(Baseline, #{clone_url := Repo, make_command := MakeCommand}) ->
+build_prev_release(Baseline, #{clone_url := Repo, make_command := MakeCommand}) ->
     BaseDir = "/tmp/emqx-baseline/",
     Dir = filename:basename(Repo, ".git") ++ [$-|Baseline],
     %% TODO: shallow clone
@@ -137,10 +157,22 @@ build_pred_release(Baseline, #{clone_url := Repo, make_command := MakeCommand})
     bash(Script, Env),
     {ok, filename:join(BaseDir, Dir)}.
 
+download_prev_release(Tag, #{binary_rel_url := {ok, URL0}, clone_url := Repo}) ->
+    URL = string:replace(URL0, "%TAG%", Tag, all),
+    BaseDir = "/tmp/emqx-baseline-bin/",
+    Dir = filename:basename(Repo, ".git") ++ [$-|Tag],
+    Filename = filename:join(BaseDir, Dir),
+    Script = "mkdir -p ${OUTFILE} &&
+              { [ -f ${OUTFILE}.zip ] || wget -O ${OUTFILE}.zip ${URL}; } &&
+              unzip -n -d ${OUTFILE} ${OUTFILE}.zip",
+    Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}],
+    bash(Script, Env),
+    {ok, Filename}.
+
 find_upstream_repo(Remote) ->
     string:trim(os:cmd("git remote get-url " ++ Remote)).
 
-find_pred_tag(CurrentRelease) ->
+find_prev_tag(CurrentRelease) ->
     case getopt(prev_tag) of
         undefined ->
             {Maj, Min, Patch} = parse_semver(CurrentRelease),
@@ -172,8 +204,8 @@ find_appup_actions(_App, AppIdx, AppIdx) ->
     [];
 find_appup_actions(App, CurrAppIdx, PrevAppIdx = #app{version = PrevVersion}) ->
     {OldUpgrade, OldDowngrade} = find_old_appup_actions(App, PrevVersion),
-    Upgrade = merge_update_actions(diff_app(App, CurrAppIdx, PrevAppIdx), OldUpgrade),
-    Downgrade = merge_update_actions(diff_app(App, PrevAppIdx, CurrAppIdx), OldDowngrade),
+    Upgrade = merge_update_actions(App, diff_app(App, CurrAppIdx, PrevAppIdx), OldUpgrade),
+    Downgrade = merge_update_actions(App, diff_app(App, PrevAppIdx, CurrAppIdx), OldDowngrade),
     if OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade ->
             %% The appup file has been already updated:
             [];
@@ -183,7 +215,7 @@ find_appup_actions(App, CurrAppIdx, PrevAppIdx = #app{version = PrevVersion}) ->
 
 find_old_appup_actions(App, PrevVersion) ->
     {Upgrade0, Downgrade0} =
-        case locate(App, ".appup.src") of
+        case locate(ebin_current, App, ".appup") of
             {ok, AppupFile} ->
                 {_, U, D} = read_appup(AppupFile),
                 {U, D};
@@ -192,22 +224,24 @@ find_old_appup_actions(App, PrevVersion) ->
         end,
     {ensure_version(PrevVersion, Upgrade0), ensure_version(PrevVersion, Downgrade0)}.
 
-merge_update_actions(Changes, Vsns) ->
+merge_update_actions(App, Changes, Vsns) ->
     lists:map(fun(Ret = {<<".*">>, _}) ->
                       Ret;
                  ({Vsn, Actions}) ->
-                      {Vsn, do_merge_update_actions(Changes, Actions)}
+                      {Vsn, do_merge_update_actions(App, Changes, Actions)}
               end,
               Vsns).
 
-do_merge_update_actions({New0, Changed0, Deleted0}, OldActions) ->
+do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
+    AppSpecific = app_specific_actions(App) -- OldActions,
     AlreadyHandled = lists:flatten(lists:map(fun process_old_action/1, OldActions)),
     New = New0 -- AlreadyHandled,
     Changed = Changed0 -- AlreadyHandled,
     Deleted = Deleted0 -- AlreadyHandled,
     [{load_module, M, brutal_purge, soft_purge, []} || M <- Changed ++ New] ++
         OldActions ++
-        [{delete_module, M} || M <- Deleted].
+        [{delete_module, M} || M <- Deleted] ++
+        AppSpecific.
 
 
 %% @doc Process the existing actions to exclude modules that are
@@ -222,12 +256,13 @@ process_old_action(LoadModule) when is_tuple(LoadModule) andalso
 process_old_action(_) ->
     [].
 
-ensure_version(Version, Versions) ->
-    case lists:keyfind(Version, 1, Versions) of
+ensure_version(Version, OldInstructions) ->
+    OldVersions = [ensure_string(element(1, I)) || I <- OldInstructions],
+    case lists:member(Version, OldVersions) of
         false ->
-            [{Version, []}|Versions];
+            [{Version, []}|OldInstructions];
         _ ->
-            Versions
+            OldInstructions
     end.
 
 read_appup(File) ->
@@ -251,7 +286,7 @@ update_appups(Changes) ->
       Changes).
 
 do_update_appup(App, Upgrade, Downgrade) ->
-    case locate(App, ".appup.src") of
+    case locate(src, App, ".appup.src") of
         {ok, AppupFile} ->
             render_appfile(AppupFile, Upgrade, Downgrade);
         undefined ->
@@ -260,7 +295,7 @@ do_update_appup(App, Upgrade, Downgrade) ->
                     render_appfile(AppupFile, Upgrade, Downgrade);
                 false ->
                     set_invalid(),
-                    log("ERROR: Appup file for the external dependency '~p' is not complete.~n       Missing changes: ~p", [App, Upgrade])
+                    log("ERROR: Appup file for the external dependency '~p' is not complete.~n       Missing changes: ~p~n", [App, Upgrade])
             end
     end.
 
@@ -273,7 +308,7 @@ render_appfile(File, Upgrade, Downgrade) ->
     ok = file:write_file(File, IOList).
 
 create_stub(App) ->
-    case locate(App, ".app.src") of
+    case locate(src, App, ".app.src") of
         {ok, AppSrc} ->
             AppupFile = filename:basename(AppSrc) ++ ".appup.src",
             Default = {<<".*">>, []},
@@ -288,8 +323,9 @@ create_stub(App) ->
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 index_apps(ReleaseDir) ->
-    maps:from_list([index_app(filename:join(ReleaseDir, AppFile)) ||
-                       AppFile <- filelib:wildcard("**/ebin/*.app", ReleaseDir)]).
+    Apps0 = maps:from_list([index_app(filename:join(ReleaseDir, AppFile)) ||
+                               AppFile <- filelib:wildcard("**/ebin/*.app", ReleaseDir)]),
+    maps:without(ignored_apps(), Apps0).
 
 index_app(AppFile) ->
     {ok, [{application, App, Properties}]} = file:consult(AppFile),
@@ -320,7 +356,10 @@ diff_app(App, #app{version = NewVersion, modules = NewModules}, #app{version = O
     NChanges = length(New) + length(Changed) + length(Deleted),
     if NewVersion =:= OldVersion andalso NChanges > 0 ->
             set_invalid(),
-            log("ERROR: Application '~p' contains changes, but its version is not updated", [App]);
+            log("ERROR: Application '~p' contains changes, but its version is not updated~n", [App]);
+       NewVersion > OldVersion ->
+            log("INFO: Application '~p' has been updated: ~p -> ~p~n", [App, OldVersion, NewVersion]),
+            ok;
        true ->
             ok
     end,
@@ -372,7 +411,16 @@ semver(Maj, Min, Patch) ->
     lists:flatten(io_lib:format("~p.~p.~p", [Maj, Min, Patch])).
 
 %% Locate a file in a specified application
-locate(App, Suffix) ->
+locate(ebin_current, App, Suffix) ->
+    ReleaseDir = getopt(beams_dir),
+    AppStr = atom_to_list(App),
+    case filelib:wildcard(ReleaseDir ++ "/**/ebin/" ++ AppStr ++ Suffix) of
+        [File] ->
+            {ok, File};
+        [] ->
+            undefined
+    end;
+locate(src, App, Suffix) ->
     AppStr = atom_to_list(App),
     SrcDirs = getopt(src_dirs),
     case filelib:wildcard(SrcDirs ++ AppStr ++ Suffix) of
@@ -425,3 +473,11 @@ log(Msg) ->
 
 log(Msg, Args) ->
     io:format(standard_error, Msg, Args).
+
+ensure_string(Str) when is_binary(Str) ->
+    binary_to_list(Str);
+ensure_string(Str) when is_list(Str) ->
+    Str.
+
+otp_standard_apps() ->
+    [ssl, mnesia, kernel, asn1, stdlib].

+ 98 - 87
src/emqx.appup.src

@@ -1,36 +1,40 @@
 %% -*- mode: erlang -*-
-Instructions =
-{"4.3.10",
-  [
-   %% app 4.3.9 was released in e4.3.4(enterprise) but not v4.3.9(opensource)
-   {"4.3.9", [
+{VSN,
+  [{"4.3.9",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.8", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.8",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.7", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.7",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.6", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.6",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.5", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.5",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@@ -39,9 +43,10 @@ Instructions =
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.4", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.4",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@@ -51,10 +56,10 @@ Instructions =
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.3", [
-     {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.3",
+    [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
      {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
@@ -65,10 +70,10 @@ Instructions =
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.2", [
-     {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.2",
+    [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
      {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
@@ -82,10 +87,10 @@ Instructions =
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.1", [
-     {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.1",
+    [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
      {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
@@ -103,9 +108,11 @@ Instructions =
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-   ]},
-   {"4.3.0", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.0",
+    [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
@@ -126,43 +133,58 @@ Instructions =
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
      {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-   ]},
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {<<".*">>,[]}],
-  [
-   {"4.3.9", [
+  [{"4.3.9",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.8", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.8",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.7", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.7",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.6", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.6",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.5", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.5",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.4", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.4",
+    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@@ -170,9 +192,11 @@ Instructions =
      {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.3", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.3",
+    [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
@@ -182,9 +206,11 @@ Instructions =
      {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.2", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.2",
+    [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
@@ -197,9 +223,11 @@ Instructions =
      {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-    ]},
-   {"4.3.1", [
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.1",
+    [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+     {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
@@ -216,10 +244,10 @@ Instructions =
      {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
      {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-   ]},
-   {"4.3.0", [
-     {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {"4.3.0",
+    [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
      {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
@@ -240,23 +268,6 @@ Instructions =
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
-     {load_module,emqx_rpc,brutal_purge,soft_purge,[]}
-   ]},
-   {<<".*">>,[]}]},
-
-%% Always reload emqx_app for emqx_app:get_release/0 to return the correct version
-Mandatory = [{load_module,emqx_app,brutal_purge,soft_purge,[]}],
-
-Append = fun
-  ({<<".*">>, Instrs}) ->
-      {<<".*">>, Instrs};
-  ({Vsn, Instrs}) ->
-      {Vsn, Instrs ++ Mandatory}
-end,
-
-PostProcess = fun({Vsn, UpList, DownList}) ->
-  {Vsn, [Append(Up) || Up <- UpList],
-        [Append(Dn) || Dn <- DownList]}
-end,
-
-PostProcess(Instructions).
+     {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
+     {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}]}.

+ 1 - 1
src/emqx_ws_connection.erl

@@ -242,7 +242,7 @@ parse_header_fun_origin(Req, Opts) ->
             Origins = proplists:get_value(check_origins, Opts, []),
             case lists:member(Value, Origins) of
                 true -> ok;
-                false -> {origin_not_allowed, Value}
+                false -> {error, {origin_not_allowed, Value}}
             end
     end.
 

+ 1 - 1
test/emqx_ws_connection_SUITE.erl

@@ -247,7 +247,7 @@ t_ws_check_origin(_) ->
     ?assertMatch({gun_upgrade, _},
         start_ws_client(#{protocols => [<<"mqtt">>],
                           headers => [{<<"origin">>, <<"http://localhost:18083">>}]})),
-    ?assertMatch({gun_response, {_, 500, _}},
+    ?assertMatch({gun_response, {_, 403, _}},
         start_ws_client(#{protocols => [<<"mqtt">>],
                           headers => [{<<"origin">>, <<"http://localhost:18080">>}]})),
     emqx_ct_helpers:stop_apps([]).