Преглед на файлове

Merge pull request #5885 from savonarola/fix-acl-schema

fix(mnesia_acl): introduce optimized schema and migration process
Ilya Averyanov преди 4 години
родител
ревизия
49c7eae211

+ 14 - 0
.ci/acl_migration_test/build.sh

@@ -0,0 +1,14 @@
+#!/bin/bash
+
+set -xe
+
+cd "$EMQX_PATH"
+
+rm -rf _build _upgrade_base
+
+mkdir _upgrade_base
+pushd _upgrade_base
+    wget "https://s3-us-west-2.amazonaws.com/packages.emqx/emqx-ce/v${EMQX_BASE}/emqx-ubuntu20.04-${EMQX_BASE}-amd64.zip"
+popd
+
+make emqx-zip

+ 15 - 0
.ci/acl_migration_test/prepare.sh

@@ -0,0 +1,15 @@
+#!/bin/bash
+
+set -xe
+
+mkdir -p "$TEST_PATH"
+cd "$TEST_PATH"
+
+cp ../"$EMQX_PATH"/_upgrade_base/*.zip ./
+unzip ./*.zip
+
+cp ../"$EMQX_PATH"/_packages/emqx/*.zip ./emqx/releases/
+
+git clone --depth 1 https://github.com/terry-xiaoyu/one_more_emqx.git
+
+./one_more_emqx/one_more_emqx.sh emqx2

+ 17 - 0
.ci/acl_migration_test/suite.sh

@@ -0,0 +1,17 @@
+#!/bin/bash
+
+set -xe
+
+export EMQX_PATH="$1"
+export EMQX_BASE="$2"
+
+export TEST_PATH="emqx_test"
+
+./build.sh
+
+VERSION=$("$EMQX_PATH"/pkg-vsn.sh)
+export VERSION
+
+./prepare.sh
+
+./test.sh

+ 121 - 0
.ci/acl_migration_test/test.sh

@@ -0,0 +1,121 @@
+#!/bin/bash
+
+set -e
+
+EMQX_ENDPOINT="http://localhost:8081/api/v4/acl"
+EMQX2_ENDPOINT="http://localhost:8917/api/v4/acl"
+
+function run() {
+    emqx="$1"
+    shift
+
+    echo "[$emqx]" "$@"
+
+    pushd "$TEST_PATH/$emqx"
+        "$@"
+    popd
+}
+
+function post_rule() {
+    endpoint="$1"
+    rule="$2"
+    echo -n "->($endpoint) "
+    curl -s -u admin:public -X POST "$endpoint" -d "$rule"
+    echo
+}
+
+function verify_clientid_rule() {
+    endpoint="$1"
+    id="$2"
+    echo -n "<-($endpoint) "
+    curl -s -u admin:public "$endpoint/clientid/$id" | grep "$id" || (echo "verify rule for client $id failed" && return 1)
+}
+
+# Run nodes
+
+run emqx ./bin/emqx start
+run emqx2 ./bin/emqx start
+
+run emqx ./bin/emqx_ctl plugins load emqx_auth_mnesia
+run emqx2 ./bin/emqx_ctl plugins load emqx_auth_mnesia
+
+run emqx2 ./bin/emqx_ctl cluster join 'emqx@127.0.0.1'
+
+# Add ACL rule to unupgraded EMQX nodes
+
+post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT1_A","topic": "t", "action": "pub", "access": "allow"}'
+post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT1_B","topic": "t", "action": "pub", "access": "allow"}'
+
+# Upgrade emqx2 node
+
+run emqx2 ./bin/emqx install "$VERSION"
+sleep 60
+
+# Verify upgrade blocked
+
+run emqx2 ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep false || (echo "emqx2 shouldn't have migrated" && exit 1)
+
+# Verify old rules on both nodes
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_B'
+
+# Add ACL on OLD and NEW node, verify on all nodes
+
+post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT2_A","topic": "t", "action": "pub", "access": "allow"}'
+post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT2_B","topic": "t", "action": "pub", "access": "allow"}'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_B'
+
+# Upgrade emqx node
+
+run emqx ./bin/emqx install "$VERSION"
+
+# Wait for upgrade
+
+sleep 60
+
+# Verify if upgrade occured
+
+run emqx ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep true || (echo "emqx should have migrated" && exit 1)
+run emqx2 ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep true || (echo "emqx2 should have migrated" && exit 1)
+
+# Verify rules are kept
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_B'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_B'
+
+# Add ACL on OLD and NEW node, verify on all nodes
+
+post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT3_A","topic": "t", "action": "pub", "access": "allow"}'
+post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT3_B","topic": "t", "action": "pub", "access": "allow"}'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT3_A'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT3_A'
+
+verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT3_B'
+verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT3_B'
+
+# Stop nodes
+
+run emqx ./bin/emqx stop
+run emqx2 ./bin/emqx stop
+
+echo "Success!"
+

+ 22 - 0
.github/workflows/run_acl_migration_tests.yaml

@@ -0,0 +1,22 @@
+name: ACL fix & migration integration tests
+
+on: workflow_dispatch
+
+jobs:
+    test:
+        runs-on: ubuntu-20.04
+        container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
+        strategy:
+            fail-fast: true
+        env:
+            BASE_VERSION: "4.3.0"
+        steps:
+        - uses: actions/checkout@v2
+          with:
+            path: emqx
+        - name: Prepare scripts
+          run: |
+            cp ./emqx/.ci/acl_migration_test/*.sh ./
+        - name: Run tests
+          run: |
+            ./suite.sh emqx "$BASE_VERSION"

+ 33 - 7
apps/emqx_auth_mnesia/include/emqx_auth_mnesia.hrl

@@ -1,21 +1,47 @@
 -define(APP, emqx_auth_mnesia).
 
--type(login():: {clientid, binary()}
+-type(login() :: {clientid, binary()}
               | {username, binary()}).
 
+-type(acl_target() :: login() | all).
+
+-type(acl_target_type() :: clientid | username | all).
+
+-type(access():: allow | deny).
+-type(action():: pub | sub).
+-type(legacy_action():: action() | pubsub).
+-type(created_at():: integer()).
+
 -record(emqx_user, {
           login :: login(),
           password :: binary(),
-          created_at :: integer()
+          created_at :: created_at()
         }).
 
--record(emqx_acl, {
-          filter:: {login() | all, emqx_topic:topic()},
-          action :: pub | sub | pubsub,
-          access :: allow | deny,
-          created_at :: integer()
+-define(ACL_TABLE, emqx_acl).
+
+-define(MIGRATION_MARK_KEY, emqx_acl2_migration_started).
+
+-record(?ACL_TABLE, {
+          filter :: {acl_target(), emqx_topic:topic()} | ?MIGRATION_MARK_KEY,
+          action :: legacy_action(),
+          access :: access(),
+          created_at :: created_at()
          }).
 
+-define(MIGRATION_MARK_RECORD, #?ACL_TABLE{filter = ?MIGRATION_MARK_KEY, action = pub, access = deny, created_at = 0}).
+
+-type(rule() :: {access(), action(), emqx_topic:topic(), created_at()}).
+
+-define(ACL_TABLE2, emqx_acl2).
+
+-record(?ACL_TABLE2, {
+          who :: acl_target(),
+          rules :: [ rule() ]
+         }).
+
+-type(acl_record() :: {acl_target(), emqx_topic:topic(), action(), access(), created_at()}).
+
 -record(auth_metrics, {
         success = 'client.auth.success',
         failure = 'client.auth.failure',

+ 8 - 17
apps/emqx_auth_mnesia/src/emqx_acl_mnesia.erl

@@ -18,24 +18,16 @@
 
 -include("emqx_auth_mnesia.hrl").
 
--include_lib("stdlib/include/ms_transform.hrl").
-
--define(TABLE, emqx_acl).
-
 %% ACL Callbacks
 -export([ init/0
         , register_metrics/0
         , check_acl/5
         , description/0
-        ]).
+       ]).
 
 init() ->
-    ok = ekka_mnesia:create_table(emqx_acl, [
-            {type, bag},
-            {disc_copies, [node()]},
-            {attributes, record_info(fields, emqx_acl)},
-            {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
-    ok = ekka_mnesia:copy_table(emqx_acl, disc_copies).
+    ok = emqx_acl_mnesia_db:create_table(),
+    ok = emqx_acl_mnesia_db:create_table2().
 
 -spec(register_metrics() -> ok).
 register_metrics() ->
@@ -46,12 +38,12 @@ check_acl(ClientInfo = #{ clientid := Clientid }, PubSub, Topic, _NoMatchAction,
 
     Acls = case Username of
                undefined ->
-                   emqx_acl_mnesia_cli:lookup_acl({clientid, Clientid}) ++
-                   emqx_acl_mnesia_cli:lookup_acl(all);
+                   emqx_acl_mnesia_db:lookup_acl({clientid, Clientid}) ++
+                   emqx_acl_mnesia_db:lookup_acl(all);
                _ ->
-                   emqx_acl_mnesia_cli:lookup_acl({clientid, Clientid}) ++
-                   emqx_acl_mnesia_cli:lookup_acl({username, Username}) ++
-                   emqx_acl_mnesia_cli:lookup_acl(all)
+                   emqx_acl_mnesia_db:lookup_acl({clientid, Clientid}) ++
+                   emqx_acl_mnesia_db:lookup_acl({username, Username}) ++
+                   emqx_acl_mnesia_db:lookup_acl(all)
            end,
 
     case match(ClientInfo, PubSub, Topic, Acls) of
@@ -83,7 +75,6 @@ match(ClientInfo, PubSub, Topic, [ {_, ACLTopic, Action, Access, _} | Acls]) ->
 match_topic(ClientInfo, Topic, ACLTopic) when is_binary(Topic) ->
     emqx_topic:match(Topic, feed_var(ClientInfo, ACLTopic)).
 
-match_actions(_, pubsub) -> true;
 match_actions(subscribe, sub) -> true;
 match_actions(publish, pub) -> true;
 match_actions(_, _) -> false.

+ 16 - 18
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl

@@ -16,8 +16,6 @@
 
 -module(emqx_acl_mnesia_api).
 
--include("emqx_auth_mnesia.hrl").
-
 -include_lib("stdlib/include/ms_transform.hrl").
 
 -import(proplists, [ get_value/2
@@ -99,26 +97,22 @@
         ]).
 
 list_clientid(_Bindings, Params) ->
-    MatchSpec = ets:fun2ms(
-                  fun({emqx_acl, {{clientid, Clientid}, Topic}, Action, Access, CreatedAt}) -> {{clientid,Clientid}, Topic, Action,Access, CreatedAt} end),
-    return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
+    Table = emqx_acl_mnesia_db:login_acl_table(clientid),
+    return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).
 
 list_username(_Bindings, Params) ->
-    MatchSpec = ets:fun2ms(
-                  fun({emqx_acl, {{username, Username}, Topic}, Action, Access, CreatedAt}) -> {{username, Username}, Topic, Action,Access, CreatedAt} end),
-    return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
+    Table = emqx_acl_mnesia_db:login_acl_table(username),
+    return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).
 
 list_all(_Bindings, Params) ->
-    MatchSpec = ets:fun2ms(
-                  fun({emqx_acl, {all, Topic}, Action, Access, CreatedAt}) -> {all, Topic, Action,Access, CreatedAt}end
-                 ),
-    return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
+    Table = emqx_acl_mnesia_db:login_acl_table(all),
+    return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).
 
 
 lookup(#{clientid := Clientid}, _Params) ->
-    return({ok, format(emqx_acl_mnesia_cli:lookup_acl({clientid, urldecode(Clientid)}))});
+    return({ok, format(emqx_acl_mnesia_db:lookup_acl({clientid, urldecode(Clientid)}))});
 lookup(#{username := Username}, _Params) ->
-    return({ok, format(emqx_acl_mnesia_cli:lookup_acl({username, urldecode(Username)}))}).
+    return({ok, format(emqx_acl_mnesia_db:lookup_acl({username, urldecode(Username)}))}).
 
 add(_Bindings, Params) ->
     [ P | _] = Params,
@@ -152,7 +146,7 @@ do_add(Params) ->
     Access = get_value(<<"access">>, Params),
     Re = case validate([login, topic, action, access], [Login, Topic, Action, Access]) of
         ok ->
-            emqx_acl_mnesia_cli:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8));
+            emqx_acl_mnesia_db:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8));
         Err -> Err
     end,
     maps:merge(#{topic => Topic,
@@ -165,15 +159,19 @@ do_add(Params) ->
                    end).
 
 delete(#{clientid := Clientid, topic := Topic}, _) ->
-    return(emqx_acl_mnesia_cli:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic)));
+    return(emqx_acl_mnesia_db:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic)));
 delete(#{username := Username, topic := Topic}, _) ->
-    return(emqx_acl_mnesia_cli:remove_acl({username, urldecode(Username)}, urldecode(Topic)));
+    return(emqx_acl_mnesia_db:remove_acl({username, urldecode(Username)}, urldecode(Topic)));
 delete(#{topic := Topic}, _) ->
-    return(emqx_acl_mnesia_cli:remove_acl(all, urldecode(Topic))).
+    return(emqx_acl_mnesia_db:remove_acl(all, urldecode(Topic))).
 
 %%------------------------------------------------------------------------------
 %% Interval Funcs
 %%------------------------------------------------------------------------------
+
+count(QH) ->
+    qlc:fold(fun(_, Count) -> Count + 1 end, 0, QH).
+
 format({{clientid, Clientid}, Topic, Action, Access, _CreatedAt}) ->
     #{clientid => Clientid, topic => Topic, action => Action, access => Access};
 format({{username, Username}, Topic, Action, Access, _CreatedAt}) ->

+ 12 - 125
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_cli.erl

@@ -16,110 +16,28 @@
 
 -module(emqx_acl_mnesia_cli).
 
--include("emqx_auth_mnesia.hrl").
--include_lib("emqx/include/logger.hrl").
--include_lib("stdlib/include/ms_transform.hrl").
--define(TABLE, emqx_acl).
-
-%% Acl APIs
--export([ add_acl/4
-        , lookup_acl/1
-        , all_acls/0
-        , all_acls/1
-        , remove_acl/2
-        ]).
-
 -export([cli/1]).
--export([comparing/2]).
-%%--------------------------------------------------------------------
-%% Acl API
-%%--------------------------------------------------------------------
-
-%% @doc Add Acls
--spec(add_acl(login() | all, emqx_topic:topic(), pub | sub | pubsub, allow | deny) ->
-        ok | {error, any()}).
-add_acl(Login, Topic, Action, Access) ->
-    Filter = {Login, Topic},
-    Acl = #?TABLE{
-             filter = Filter,
-             action = Action,
-             access = Access,
-             created_at = erlang:system_time(millisecond)
-            },
-    ret(mnesia:transaction(
-          fun() ->
-                  OldRecords = mnesia:wread({?TABLE, Filter}),
-                  case Action of
-                      pubsub ->
-                          update_permission(pub, Acl, OldRecords),
-                          update_permission(sub, Acl, OldRecords);
-                      _ ->
-                          update_permission(Action, Acl, OldRecords)
-                  end
-          end)).
-
-%% @doc Lookup acl by login
--spec(lookup_acl(login() | all) -> list()).
-lookup_acl(undefined) -> [];
-lookup_acl(Login) ->
-    MatchSpec = ets:fun2ms(fun({?TABLE, {Filter, ACLTopic}, Action, Access, CreatedAt})
-                                 when Filter =:= Login ->
-                                   {Filter, ACLTopic, Action, Access, CreatedAt}
-                           end),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)).
-
-%% @doc Remove acl
--spec(remove_acl(login() | all, emqx_topic:topic()) -> ok | {error, any()}).
-remove_acl(Login, Topic) ->
-    ret(mnesia:transaction(fun mnesia:delete/1, [{?TABLE, {Login, Topic}}])).
-
-%% @doc All logins
--spec(all_acls() -> list()).
-all_acls() ->
-    all_acls(clientid) ++
-    all_acls(username) ++
-    all_acls(all).
-
-all_acls(clientid) ->
-    MatchSpec = ets:fun2ms(
-                  fun({?TABLE, {{clientid, Clientid}, Topic}, Action, Access, CreatedAt}) ->
-                          {{clientid, Clientid}, Topic, Action, Access, CreatedAt}
-                  end),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec));
-all_acls(username) ->
-    MatchSpec = ets:fun2ms(
-                  fun({?TABLE, {{username, Username}, Topic}, Action, Access, CreatedAt}) ->
-                          {{username, Username}, Topic, Action, Access, CreatedAt}
-                  end),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec));
-all_acls(all) ->
-    MatchSpec = ets:fun2ms(
-                  fun({?TABLE, {all, Topic}, Action, Access, CreatedAt}) ->
-                          {all, Topic, Action, Access, CreatedAt}
-                  end
-                 ),
-    lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)).
 
 %%--------------------------------------------------------------------
 %% ACL Cli
 %%--------------------------------------------------------------------
 
 cli(["list"]) ->
-    [print_acl(Acl) || Acl <- all_acls()];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls()];
 
 cli(["list", "clientid"]) ->
-    [print_acl(Acl) || Acl <- all_acls(clientid)];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls(clientid)];
 
 cli(["list", "username"]) ->
-    [print_acl(Acl) || Acl <- all_acls(username)];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls(username)];
 
 cli(["list", "_all"]) ->
-    [print_acl(Acl) || Acl <- all_acls(all)];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:all_acls(all)];
 
 cli(["add", "clientid", Clientid, Topic, Action, Access]) ->
     case validate(action, Action) andalso validate(access, Access) of
         true ->
-            case add_acl(
+            case emqx_acl_mnesia_db:add_acl(
                    {clientid, iolist_to_binary(Clientid)},
                    iolist_to_binary(Topic),
                    list_to_existing_atom(Action),
@@ -135,7 +53,7 @@ cli(["add", "clientid", Clientid, Topic, Action, Access]) ->
 cli(["add", "username", Username, Topic, Action, Access]) ->
     case validate(action, Action) andalso validate(access, Access) of
         true ->
-            case add_acl(
+            case emqx_acl_mnesia_db:add_acl(
                    {username, iolist_to_binary(Username)},
                    iolist_to_binary(Topic),
                    list_to_existing_atom(Action),
@@ -151,7 +69,7 @@ cli(["add", "username", Username, Topic, Action, Access]) ->
 cli(["add", "_all", Topic, Action, Access]) ->
     case validate(action, Action) andalso validate(access, Access) of
         true ->
-            case add_acl(
+            case emqx_acl_mnesia_db:add_acl(
                    all,
                    iolist_to_binary(Topic),
                    list_to_existing_atom(Action),
@@ -165,16 +83,16 @@ cli(["add", "_all", Topic, Action, Access]) ->
     end;
 
 cli(["show", "clientid", Clientid]) ->
-    [print_acl(Acl) || Acl <- lookup_acl({clientid, iolist_to_binary(Clientid)})];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:lookup_acl({clientid, iolist_to_binary(Clientid)})];
 
 cli(["show", "username", Username]) ->
-    [print_acl(Acl) || Acl <- lookup_acl({username, iolist_to_binary(Username)})];
+    [print_acl(Acl) || Acl <- emqx_acl_mnesia_db:lookup_acl({username, iolist_to_binary(Username)})];
 
 cli(["del", "clientid", Clientid, Topic])->
     cli(["delete", "clientid", Clientid, Topic]);
 
 cli(["delete", "clientid", Clientid, Topic])->
-    case remove_acl({clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Topic)) of
+    case emqx_acl_mnesia_db:remove_acl({clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Topic)) of
          ok -> emqx_ctl:print("ok~n");
         {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
     end;
@@ -183,7 +101,7 @@ cli(["del", "username", Username, Topic])->
     cli(["delete", "username", Username, Topic]);
 
 cli(["delete", "username", Username, Topic])->
-    case remove_acl({username, iolist_to_binary(Username)}, iolist_to_binary(Topic)) of
+    case emqx_acl_mnesia_db:remove_acl({username, iolist_to_binary(Username)}, iolist_to_binary(Topic)) of
          ok -> emqx_ctl:print("ok~n");
         {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
     end;
@@ -192,7 +110,7 @@ cli(["del", "_all", Topic])->
     cli(["delete", "_all", Topic]);
 
 cli(["delete", "_all", Topic])->
-    case remove_acl(all, iolist_to_binary(Topic)) of
+    case emqx_acl_mnesia_db:remove_acl(all, iolist_to_binary(Topic)) of
          ok -> emqx_ctl:print("ok~n");
         {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
     end;
@@ -215,13 +133,6 @@ cli(_) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-comparing({_, _, _, _, CreatedAt1},
-          {_, _, _, _, CreatedAt2}) ->
-    CreatedAt1 >= CreatedAt2.
-
-ret({atomic, ok})     -> ok;
-ret({aborted, Error}) -> {error, Error}.
-
 validate(action, "pub") -> true;
 validate(action, "sub") -> true;
 validate(action, "pubsub") -> true;
@@ -244,27 +155,3 @@ print_acl({all, Topic, Action, Access, _}) ->
         "Acl($all topic = ~p action = ~p access = ~p)~n",
         [Topic, Action, Access]
      ).
-
-update_permission(Action, Acl0, OldRecords) ->
-    Acl = Acl0 #?TABLE{action = Action},
-    maybe_delete_shadowed_records(Action, OldRecords),
-    mnesia:write(Acl).
-
-maybe_delete_shadowed_records(_, []) ->
-    ok;
-maybe_delete_shadowed_records(Action1, [Rec = #emqx_acl{action = Action2} | Rest]) ->
-    if Action1 =:= Action2 ->
-            ok = mnesia:delete_object(Rec);
-       Action2 =:= pubsub ->
-            %% Perform migration from the old data format on the
-            %% fly. This is needed only for the enterprise version,
-            %% delete this branch on 5.0
-            mnesia:delete_object(Rec),
-            mnesia:write(Rec#?TABLE{action = other_action(Action1)});
-       true ->
-            ok
-    end,
-    maybe_delete_shadowed_records(Action1, Rest).
-
-other_action(pub) -> sub;
-other_action(sub) -> pub.

+ 339 - 0
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_db.erl

@@ -0,0 +1,339 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_acl_mnesia_db).
+
+-include("emqx_auth_mnesia.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+
+%% ACL APIs
+-export([ create_table/0
+        , create_table2/0
+        ]).
+
+-export([ add_acl/4
+        , lookup_acl/1
+        , all_acls_export/0
+        , all_acls/0
+        , all_acls/1
+        , remove_acl/2
+        , merge_acl_records/3
+        , login_acl_table/1
+        , is_migration_started/0
+        ]).
+
+-export([comparing/2]).
+
+%%--------------------------------------------------------------------
+%% ACL API
+%%--------------------------------------------------------------------
+
+%% @doc Create table `emqx_acl` of old format rules
+-spec(create_table() -> ok).
+create_table() ->
+    ok = ekka_mnesia:create_table(?ACL_TABLE, [
+        {type, bag},
+        {disc_copies, [node()]},
+        {attributes, record_info(fields, ?ACL_TABLE)},
+        {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
+    ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies).
+
+%% @doc Create table `emqx_acl2` of new format rules
+-spec(create_table2() -> ok).
+create_table2() ->
+    ok = ekka_mnesia:create_table(?ACL_TABLE2, [
+            {type, ordered_set},
+            {disc_copies, [node()]},
+            {attributes, record_info(fields, ?ACL_TABLE2)},
+            {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
+    ok = ekka_mnesia:copy_table(?ACL_TABLE2, disc_copies).
+
+%% @doc Add Acls
+-spec(add_acl(acl_target(), emqx_topic:topic(), legacy_action(), access()) ->
+        ok | {error, any()}).
+add_acl(Login, Topic, Action, Access) ->
+    ret(mnesia:transaction(fun() ->
+                              case is_migration_started() of
+                                  true -> add_acl_new(Login, Topic, Action, Access);
+                                  false -> add_acl_old(Login, Topic, Action, Access)
+                              end
+                           end)).
+
+%% @doc Lookup acl by login
+-spec(lookup_acl(acl_target()) -> list(acl_record())).
+lookup_acl(undefined) -> [];
+lookup_acl(Login) ->
+    % After migration to ?ACL_TABLE2, ?ACL_TABLE never has any rules. This lookup should be removed later.
+    MatchSpec = ets:fun2ms(fun(#?ACL_TABLE{filter = {Filter, _}} = Rec)
+                                 when Filter =:= Login -> Rec
+                           end),
+    OldRecs = ets:select(?ACL_TABLE, MatchSpec),
+
+    NewAcls = ets:lookup(?ACL_TABLE2, Login),
+    MergedAcl = merge_acl_records(Login, OldRecs, NewAcls),
+    lists:sort(fun comparing/2, acl_to_list(MergedAcl)).
+
+%% @doc Remove ACL
+-spec remove_acl(acl_target(), emqx_topic:topic()) -> ok | {error, any()}.
+remove_acl(Login, Topic) ->
+    ret(mnesia:transaction(fun() ->
+                              mnesia:delete({?ACL_TABLE, {Login, Topic}}),
+                              case mnesia:wread({?ACL_TABLE2, Login}) of
+                                  [] -> ok;
+                                  [#?ACL_TABLE2{rules = Rules} = Acl] ->
+                                      case delete_topic_rules(Topic, Rules) of
+                                          [] -> mnesia:delete({?ACL_TABLE2, Login});
+                                          [_ | _] = RemainingRules ->
+                                              mnesia:write(Acl#?ACL_TABLE2{rules = RemainingRules})
+                                      end
+                              end
+                           end)).
+
+%% @doc All ACL rules
+-spec(all_acls() -> list(acl_record())).
+all_acls() ->
+    all_acls(username) ++
+    all_acls(clientid) ++
+    all_acls(all).
+
+%% @doc All ACL rules of specified type
+-spec(all_acls(acl_target_type()) -> list(acl_record())).
+all_acls(AclTargetType) ->
+    lists:sort(fun comparing/2, qlc:eval(login_acl_table(AclTargetType))).
+
+%% @doc All ACL rules fetched transactionally
+-spec(all_acls_export() -> list(acl_record())).
+all_acls_export() ->
+    AclTargetTypes = [username, clientid, all],
+    MatchSpecNew = lists:flatmap(fun login_match_spec_new/1, AclTargetTypes),
+    MatchSpecOld = lists:flatmap(fun login_match_spec_old/1, AclTargetTypes),
+
+    {atomic, Records} = mnesia:transaction(
+        fun() ->
+            QH = acl_table(MatchSpecNew, MatchSpecOld, fun mnesia:table/2, fun lookup_mnesia/2),
+            qlc:eval(QH)
+        end),
+    Records.
+
+%% @doc QLC table of logins matching spec
+-spec(login_acl_table(acl_target_type()) -> qlc:query_handle()).
+login_acl_table(AclTargetType) ->
+    MatchSpecNew = login_match_spec_new(AclTargetType),
+    MatchSpecOld = login_match_spec_old(AclTargetType),
+    acl_table(MatchSpecNew, MatchSpecOld, fun ets:table/2, fun lookup_ets/2).
+
+%% @doc Combine old `emqx_acl` ACL records with a new `emqx_acl2` ACL record for a given login
+-spec(merge_acl_records(acl_target(), [#?ACL_TABLE{}], [#?ACL_TABLE2{}]) -> #?ACL_TABLE2{}).
+merge_acl_records(Login, OldRecs, Acls) ->
+    OldRules = old_recs_to_rules(OldRecs),
+    NewRules = case Acls of
+        [] -> [];
+        [#?ACL_TABLE2{rules = Rules}] -> Rules
+    end,
+    #?ACL_TABLE2{who = Login, rules = merge_rules(NewRules, OldRules)}.
+
+%% @doc Checks if background migration of ACL rules from `emqx_acl` to `emqx_acl2` format started.
+%% Should be run in transaction
+-spec(is_migration_started() -> boolean()).
+is_migration_started() ->
+    case mnesia:read({?ACL_TABLE, ?MIGRATION_MARK_KEY}) of
+        [?MIGRATION_MARK_RECORD | _] -> true;
+        [] -> false
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+add_acl_new(Login, Topic, Action, Access) ->
+    Rule = {Access, Action, Topic, erlang:system_time(millisecond)},
+    Rules = normalize_rule(Rule),
+    OldAcl = mnesia:wread({?ACL_TABLE2, Login}),
+    NewAcl = case OldAcl of
+        [#?ACL_TABLE2{rules = OldRules} = Acl] ->
+            Acl#?ACL_TABLE2{rules = merge_rules(Rules, OldRules)};
+        [] ->
+            #?ACL_TABLE2{who = Login, rules = Rules}
+    end,
+    mnesia:write(NewAcl).
+
+add_acl_old(Login, Topic, Action, Access) ->
+    Filter = {Login, Topic},
+    Acl = #?ACL_TABLE{
+             filter = Filter,
+             action = Action,
+             access = Access,
+             created_at = erlang:system_time(millisecond)
+            },
+    OldRecords = mnesia:wread({?ACL_TABLE, Filter}),
+    case Action of
+        pubsub ->
+            update_permission(pub, Acl, OldRecords),
+            update_permission(sub, Acl, OldRecords);
+        _ ->
+            update_permission(Action, Acl, OldRecords)
+    end.
+
+old_recs_to_rules(OldRecs) ->
+    lists:flatmap(fun old_rec_to_rules/1, OldRecs).
+
+old_rec_to_rules(#?ACL_TABLE{filter = {_, Topic}, action = Action, access = Access, created_at = CreatedAt}) ->
+    normalize_rule({Access, Action, Topic, CreatedAt}).
+
+normalize_rule({Access, pubsub, Topic, CreatedAt}) ->
+    [{Access, pub, Topic, CreatedAt}, {Access, sub, Topic, CreatedAt}];
+normalize_rule({Access, Action, Topic, CreatedAt}) ->
+    [{Access, Action, Topic, CreatedAt}].
+
+merge_rules([], OldRules) -> OldRules;
+merge_rules([NewRule | RestNewRules], OldRules) ->
+    merge_rules(RestNewRules, merge_rule(NewRule, OldRules)).
+
+merge_rule({_, Action, Topic, _ } = NewRule, OldRules) ->
+    [NewRule | lists:filter(
+        fun({_, OldAction, OldTopic, _}) ->
+            {Action, Topic} =/= {OldAction, OldTopic}
+        end, OldRules)].
+
+acl_to_list(#?ACL_TABLE2{who = Login, rules = Rules}) ->
+    [{Login, Topic, Action, Access, CreatedAt} || {Access, Action, Topic, CreatedAt} <- Rules].
+
+delete_topic_rules(Topic, Rules) ->
+    [Rule || {_, _, T, _} = Rule <- Rules, T =/= Topic].
+
+comparing({_, _, _, _, CreatedAt} = Rec1,
+          {_, _, _, _, CreatedAt} = Rec2) ->
+        Rec1 >= Rec2;
+
+comparing({_, _, _, _, CreatedAt1},
+          {_, _, _, _, CreatedAt2}) ->
+    CreatedAt1 >= CreatedAt2.
+
+login_match_spec_old(all) ->
+    ets:fun2ms(fun(#?ACL_TABLE{filter = {all, _}} = Record) ->
+                Record
+               end);
+
+login_match_spec_old(Type) when (Type =:= username) or (Type =:= clientid) ->
+    ets:fun2ms(fun(#?ACL_TABLE{filter = {{RecordType, _}, _}} = Record)
+                    when RecordType =:= Type -> Record
+               end).
+
+login_match_spec_new(all) ->
+    ets:fun2ms(fun(#?ACL_TABLE2{who = all} = Record) ->
+                Record
+               end);
+
+login_match_spec_new(Type) when (Type =:= username) or (Type =:= clientid) ->
+    ets:fun2ms(fun(#?ACL_TABLE2{who = {RecordType, _}} = Record)
+                    when RecordType =:= Type -> Record
+               end).
+
+acl_table(MatchSpecNew, MatchSpecOld, TableFun, LookupFun) ->
+    TraverseFun =
+        fun() ->
+           CursorNew =
+               qlc:cursor(
+                   TableFun(?ACL_TABLE2, [{traverse, {select, MatchSpecNew}}])),
+           CursorOld =
+               qlc:cursor(
+                   TableFun(?ACL_TABLE, [{traverse, {select, MatchSpecOld}}])),
+           traverse_new(CursorNew, CursorOld, #{}, LookupFun)
+        end,
+
+    qlc:table(TraverseFun, []).
+
+
+% These are traverse funs for qlc table created by `acl_table/4`.
+% Traversing consumes memory: it collects logins present in `?ACL_TABLE` and
+% at the same time having rules in `?ACL_TABLE2`.
+% Such records appear if ACLs are inserted before migration started.
+% After migration, number of such logins is zero, so traversing starts working in
+% constant memory.
+
+traverse_new(CursorNew, CursorOld, FoundKeys, LookupFun) ->
+    Acls = qlc:next_answers(CursorNew, 1),
+    case Acls of
+        [] ->
+            qlc:delete_cursor(CursorNew),
+            traverse_old(CursorOld, FoundKeys);
+        [#?ACL_TABLE2{who = Login, rules = Rules} = Acl] ->
+            Keys = lists:usort([{Login, Topic} || {_, _, Topic, _} <- Rules]),
+            OldRecs = lists:flatmap(fun(Key) -> LookupFun(?ACL_TABLE, Key) end, Keys),
+            MergedAcl = merge_acl_records(Login, OldRecs, [Acl]),
+            NewFoundKeys =
+                lists:foldl(fun(#?ACL_TABLE{filter = Key}, Found) -> maps:put(Key, true, Found) end,
+                            FoundKeys,
+                            OldRecs),
+            case acl_to_list(MergedAcl) of
+                [] ->
+                    traverse_new(CursorNew, CursorOld, NewFoundKeys, LookupFun);
+                List ->
+                    List ++ fun() -> traverse_new(CursorNew, CursorOld, NewFoundKeys, LookupFun) end
+            end
+    end.
+
+traverse_old(CursorOld, FoundKeys) ->
+    OldAcls = qlc:next_answers(CursorOld),
+    case OldAcls of
+        [] ->
+            qlc:delete_cursor(CursorOld),
+            [];
+        _ ->
+            Records = [ {Login, Topic, Action, Access, CreatedAt}
+                || #?ACL_TABLE{filter = {Login, Topic}, action = LegacyAction, access = Access, created_at = CreatedAt} <- OldAcls,
+                {_, Action, _, _} <- normalize_rule({Access, LegacyAction, Topic, CreatedAt}),
+                not maps:is_key({Login, Topic}, FoundKeys)
+            ],
+            case Records of
+                [] -> traverse_old(CursorOld, FoundKeys);
+                List -> List ++ fun() -> traverse_old(CursorOld, FoundKeys) end
+            end
+    end.
+
+lookup_mnesia(Tab, Key) ->
+    mnesia:read({Tab, Key}).
+
+lookup_ets(Tab, Key) ->
+    ets:lookup(Tab, Key).
+
+update_permission(Action, Acl0, OldRecords) ->
+    Acl = Acl0 #?ACL_TABLE{action = Action},
+    maybe_delete_shadowed_records(Action, OldRecords),
+    mnesia:write(Acl).
+
+maybe_delete_shadowed_records(_, []) ->
+    ok;
+maybe_delete_shadowed_records(Action1, [Rec = #emqx_acl{action = Action2} | Rest]) ->
+    if Action1 =:= Action2 ->
+            ok = mnesia:delete_object(Rec);
+       Action2 =:= pubsub ->
+            %% Perform migration from the old data format on the
+            %% fly. This is needed only for the enterprise version,
+            %% delete this branch on 5.0
+            mnesia:delete_object(Rec),
+            mnesia:write(Rec#?ACL_TABLE{action = other_action(Action1)});
+       true ->
+            ok
+    end,
+    maybe_delete_shadowed_records(Action1, Rest).
+
+other_action(pub) -> sub;
+other_action(sub) -> pub.
+
+ret({atomic, ok})     -> ok;
+ret({aborted, Error}) -> {error, Error}.

+ 215 - 0
apps/emqx_auth_mnesia/src/emqx_acl_mnesia_migrator.erl

@@ -0,0 +1,215 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_acl_mnesia_migrator).
+
+-include("emqx_auth_mnesia.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-behaviour(gen_statem).
+
+-define(CHECK_ALL_NODES_INTERVAL, 60000).
+
+-type(migration_delay_reason() :: old_nodes | bad_nodes).
+
+-export([
+    callback_mode/0,
+    init/1
+]).
+
+-export([
+    waiting_all_nodes/3,
+    checking_old_table/3,
+    migrating/3
+]).
+
+-export([
+    start_link/0,
+    start_link/1,
+    start_supervised/0,
+    stop_supervised/0,
+    migrate_records/0,
+    is_migrating_on_node/1,
+    is_old_table_migrated/0
+]).
+
+%%--------------------------------------------------------------------
+%% External interface
+%%--------------------------------------------------------------------
+
+start_link() ->
+    start_link(?MODULE).
+
+start_link(Name) when is_atom(Name) ->
+    start_link(#{
+        name => Name
+    });
+
+start_link(#{name := Name} = Opts) ->
+    gen_statem:start_link({local, Name}, ?MODULE, Opts, []).
+
+start_supervised() ->
+    try
+        {ok, _} = supervisor:restart_child(emqx_auth_mnesia_sup, ?MODULE),
+        ok
+    catch
+        exit:{noproc, _} -> ok
+    end.
+
+stop_supervised() ->
+    try
+        ok = supervisor:terminate_child(emqx_auth_mnesia_sup, ?MODULE),
+        ok = supervisor:delete_child(emqx_auth_mnesia_sup, ?MODULE)
+    catch
+        exit:{noproc, _} -> ok
+    end.
+
+%%--------------------------------------------------------------------
+%% gen_statem callbacks
+%%--------------------------------------------------------------------
+
+callback_mode() -> state_functions.
+
+init(Opts) ->
+    ok = emqx_acl_mnesia_db:create_table(),
+    ok = emqx_acl_mnesia_db:create_table2(),
+    Name = maps:get(name, Opts, ?MODULE),
+    CheckNodesInterval = maps:get(check_nodes_interval, Opts, ?CHECK_ALL_NODES_INTERVAL),
+    GetNodes = maps:get(get_nodes, Opts, fun all_nodes/0),
+    Data =
+        #{name => Name,
+          check_nodes_interval => CheckNodesInterval,
+          get_nodes => GetNodes},
+    {ok, waiting_all_nodes, Data, [{state_timeout, 0, check_nodes}]}.
+
+%%--------------------------------------------------------------------
+%% state callbacks
+%%--------------------------------------------------------------------
+
+waiting_all_nodes(state_timeout, check_nodes, Data) ->
+    #{name := Name, check_nodes_interval := CheckNodesInterval, get_nodes := GetNodes} = Data,
+    case is_all_nodes_migrating(Name, GetNodes()) of
+        true ->
+            ?tp(info, emqx_acl_mnesia_migrator_check_old_table, #{}),
+            {next_state, checking_old_table, Data, [{next_event, internal, check_old_table}]};
+        {false, Reason, Nodes} ->
+            ?tp(info,
+                emqx_acl_mnesia_migrator_bad_nodes_delay,
+                #{delay => CheckNodesInterval,
+                  reason => Reason,
+                  name => Name,
+                  nodes => Nodes}),
+            {keep_state_and_data, [{state_timeout, CheckNodesInterval, check_nodes}]}
+    end.
+
+checking_old_table(internal, check_old_table, Data) ->
+    case is_old_table_migrated() of
+        true ->
+            ?tp(info, emqx_acl_mnesia_migrator_finish, #{}),
+            {next_state, finished, Data, [{hibernate, true}]};
+        false ->
+            ?tp(info, emqx_acl_mnesia_migrator_start_migration, #{}),
+            {next_state, migrating, Data, [{next_event, internal, start_migration}]}
+    end.
+
+migrating(internal, start_migration, Data) ->
+    ok = migrate_records(),
+    {next_state, checking_old_table, Data, [{next_event, internal, check_old_table}]}.
+
+%% @doc Returns `true` if migration is started in the local node, otherwise crash.
+-spec(is_migrating_on_node(atom()) -> true).
+is_migrating_on_node(Name) ->
+    true = is_pid(erlang:whereis(Name)).
+
+%% @doc Run migration of records
+-spec(migrate_records() -> ok).
+migrate_records() ->
+    ok = add_migration_mark(),
+    Key = peek_record(),
+    do_migrate_records(Key).
+
+%% @doc Run migration of records
+-spec(is_all_nodes_migrating(atom(), list(node())) -> true | {false, migration_delay_reason(), list(node())}).
+is_all_nodes_migrating(Name, Nodes) ->
+    case rpc:multicall(Nodes, ?MODULE, is_migrating_on_node, [Name]) of
+        {Results, []} ->
+            OldNodes = [ Node || {Node, Result} <- lists:zip(Nodes, Results), Result =/= true ],
+            case OldNodes of
+                [] -> true;
+                _ -> {false, old_nodes, OldNodes}
+            end;
+        {_, [_BadNode | _] = BadNodes} ->
+            {false, bad_nodes, BadNodes}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+all_nodes() ->
+    ekka_mnesia:cluster_nodes(all).
+
+is_old_table_migrated() ->
+    Result =
+        mnesia:transaction(fun() ->
+                              case mnesia:first(?ACL_TABLE) of
+                                  ?MIGRATION_MARK_KEY ->
+                                      case mnesia:next(?ACL_TABLE, ?MIGRATION_MARK_KEY) of
+                                          '$end_of_table' -> true;
+                                          _OtherKey -> false
+                                      end;
+                                  '$end_of_table' -> false;
+                                  _OtherKey -> false
+                              end
+                           end),
+    case Result of
+        {atomic, true} ->
+            true;
+        _ ->
+            false
+    end.
+
+add_migration_mark() ->
+    {atomic, ok} = mnesia:transaction(fun() -> mnesia:write(?MIGRATION_MARK_RECORD) end),
+    ok.
+
+peek_record() ->
+    Key = mnesia:dirty_first(?ACL_TABLE),
+    case Key of
+        ?MIGRATION_MARK_KEY ->
+            mnesia:dirty_next(?ACL_TABLE, Key);
+        _ -> Key
+    end.
+
+do_migrate_records('$end_of_table') -> ok;
+do_migrate_records({_Login, _Topic} = Key) ->
+    ?tp(emqx_acl_mnesia_migrator_record_selected, #{key => Key}),
+    _ = mnesia:transaction(fun migrate_one_record/1, [Key]),
+    do_migrate_records(peek_record()).
+
+migrate_one_record({Login, _Topic} = Key) ->
+    case mnesia:wread({?ACL_TABLE, Key}) of
+        [] ->
+            ?tp(emqx_acl_mnesia_migrator_record_missed, #{key => Key}),
+            record_missing;
+        OldRecs ->
+            Acls = mnesia:wread({?ACL_TABLE2, Login}),
+            UpdatedAcl = emqx_acl_mnesia_db:merge_acl_records(Login, OldRecs, Acls),
+            ok = mnesia:write(UpdatedAcl),
+            ok = mnesia:delete({?ACL_TABLE, Key}),
+            ?tp(emqx_acl_mnesia_migrator_record_migrated, #{key => Key})
+    end.

+ 1 - 1
apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src

@@ -1,6 +1,6 @@
 {application, emqx_auth_mnesia,
  [{description, "EMQ X Authentication with Mnesia"},
-  {vsn, "4.3.3"}, % strict semver, bump manually
+  {vsn, "4.3.4"}, % strict semver, bump manually
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,mnesia]},

+ 29 - 20
apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src

@@ -1,22 +1,31 @@
 %% -*- mode: erlang -*-
 {VSN,
-  [{"4.3.2",
-    [{load_module,emqx_acl_mnesia_api,brutal_purge,soft_purge,[]},
-     {load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
-   {"4.3.1",
-    [{load_module,emqx_acl_mnesia_api,brutal_purge,soft_purge,[]},
-     {load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
-   {"4.3.0",
-    [{load_module,emqx_acl_mnesia_api,brutal_purge,soft_purge,[]},
-     {load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
-   {<<".*">>,[]}],
-  [{"4.3.2",
-    [{load_module,emqx_acl_mnesia_api,brutal_purge,soft_purge,[]},
-     {load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
-   {"4.3.1",
-    [{load_module,emqx_acl_mnesia_api,brutal_purge,soft_purge,[]},
-     {load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
-   {"4.3.0",
-    [{load_module,emqx_acl_mnesia_api,brutal_purge,soft_purge,[]},
-     {load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
-   {<<".*">>,[]}]}.
+  [
+    {<<"4.3.[0-3]">>, [
+      {add_module,emqx_acl_mnesia_db},
+      {add_module,emqx_acl_mnesia_migrator, [emqx_acl_mnesia_db]},
+      {update, emqx_auth_mnesia_sup, supervisor},
+      {apply, {emqx_acl_mnesia_migrator, start_supervised, []}},
+      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]},
+      {load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
+      {load_module,emqx_acl_mnesia_api, brutal_purge,soft_purge,[]},
+      {load_module,emqx_acl_mnesia_cli, brutal_purge,soft_purge,[]}
+    ]},
+    {<<".*">>, [
+    ]}
+  ],
+  [
+    {<<"4.3.[0-3]">>, [
+      {apply, {emqx_acl_mnesia_migrator, stop_supervised, []}},
+      {update, emqx_auth_mnesia_sup, supervisor},
+      {load_module,emqx_acl_mnesia_cli, brutal_purge,soft_purge,[]},
+      {load_module,emqx_acl_mnesia_api, brutal_purge,soft_purge,[]},
+      {load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]},
+      {load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
+      {delete_module,emqx_acl_mnesia_migrator},
+      {delete_module,emqx_acl_mnesia_db}
+    ]},
+    {<<".*">>, [
+    ]}
+  ]
+}.

+ 9 - 18
apps/emqx_auth_mnesia/src/emqx_auth_mnesia_api.erl

@@ -23,7 +23,7 @@
 
 -import(proplists, [get_value/2]).
 -import(minirest,  [return/1]).
--export([paginate/5]).
+-export([paginate_qh/5]).
 
 -export([ list_clientid/2
         , lookup_clientid/2
@@ -212,9 +212,12 @@ delete_username(#{username := Username}, _) ->
 %% Paging Query
 %%------------------------------------------------------------------------------
 
-paginate(Tables, MatchSpec, Params, ComparingFun, RowFun) ->
-    Qh = query_handle(Tables, MatchSpec),
-    Count = count(Tables, MatchSpec),
+paginate(Table, MatchSpec, Params, ComparingFun, RowFun) ->
+    Qh = query_handle(Table, MatchSpec),
+    Count = count(Table, MatchSpec),
+    paginate_qh(Qh, Count, Params, ComparingFun, RowFun).
+
+paginate_qh(Qh, Count, Params, ComparingFun, RowFun) ->
     Page = page(Params),
     Limit = limit(Params),
     Cursor = qlc:cursor(Qh),
@@ -231,24 +234,12 @@ paginate(Tables, MatchSpec, Params, ComparingFun, RowFun) ->
 
 query_handle(Table, MatchSpec) when is_atom(Table) ->
     Options = {traverse, {select, MatchSpec}},
-    qlc:q([R|| R <- ets:table(Table, Options)]);
-query_handle([Table], MatchSpec) when is_atom(Table) ->
-    Options = {traverse, {select, MatchSpec}},
-    qlc:q([R|| R <- ets:table(Table, Options)]);
-query_handle(Tables, MatchSpec) ->
-    Options = {traverse, {select, MatchSpec}},
-    qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]).
+    qlc:q([R || R <- ets:table(Table, Options)]).
 
 count(Table, MatchSpec) when is_atom(Table) ->
     [{MatchPattern, Where, _Re}] = MatchSpec,
     NMatchSpec = [{MatchPattern, Where, [true]}],
-    ets:select_count(Table, NMatchSpec);
-count([Table], MatchSpec) when is_atom(Table) ->
-    [{MatchPattern, Where, _Re}] = MatchSpec,
-    NMatchSpec = [{MatchPattern, Where, [true]}],
-    ets:select_count(Table, NMatchSpec);
-count(Tables, MatchSpec) ->
-    lists:sum([count(T, MatchSpec) || T <- Tables]).
+    ets:select_count(Table, NMatchSpec).
 
 page(Params) ->
     binary_to_integer(proplists:get_value(<<"_page">>, Params, <<"1">>)).

+ 13 - 1
apps/emqx_auth_mnesia/src/emqx_auth_mnesia_sup.erl

@@ -33,4 +33,16 @@ start_link() ->
 %%--------------------------------------------------------------------
 
 init([]) ->
-    {ok, {{one_for_one, 10, 100}, []}}.
+    {ok, {{one_for_one, 10, 100}, [
+        child_spec(emqx_acl_mnesia_migrator, worker, [])
+    ]}}.
+
+child_spec(M, worker, Args) ->
+    #{id       => M,
+      start    => {M, start_link, Args},
+      restart  => permanent,
+      shutdown => 5000,
+      type     => worker,
+      modules  => [M]
+     }.
+

+ 188 - 42
apps/emqx_auth_mnesia/test/emqx_acl_mnesia_SUITE.erl

@@ -22,6 +22,7 @@
 -include("emqx_auth_mnesia.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -import(emqx_ct_http, [ request_api/3
                       , request_api/5
@@ -39,10 +40,15 @@ all() ->
     emqx_ct:all(?MODULE).
 
 groups() ->
-    [].
+    [{async_migration_tests, [sequence], [
+        t_old_and_new_acl_migration_by_migrator,
+        t_old_and_new_acl_migration_repeated_by_migrator,
+        t_migration_concurrency
+    ]}].
 
 init_per_suite(Config) ->
     emqx_ct_helpers:start_apps([emqx_modules, emqx_management, emqx_auth_mnesia], fun set_special_configs/1),
+    supervisor:terminate_child(emqx_auth_mnesia_sup, emqx_acl_mnesia_migrator),
     create_default_app(),
     Config.
 
@@ -50,14 +56,32 @@ end_per_suite(_Config) ->
     delete_default_app(),
     emqx_ct_helpers:stop_apps([emqx_modules, emqx_management, emqx_auth_mnesia]).
 
-init_per_testcase(t_check_acl_as_clientid, Config) ->
+init_per_testcase_clean(_, Config) ->
+    mnesia:clear_table(?ACL_TABLE),
+    mnesia:clear_table(?ACL_TABLE2),
+    Config.
+
+init_per_testcase_emqx_hook(t_check_acl_as_clientid, Config) ->
     emqx:hook('client.check_acl', fun emqx_acl_mnesia:check_acl/5, [#{key_as => clientid}]),
     Config;
-
-init_per_testcase(_, Config) ->
+init_per_testcase_emqx_hook(_, Config) ->
     emqx:hook('client.check_acl', fun emqx_acl_mnesia:check_acl/5, [#{key_as => username}]),
     Config.
 
+init_per_testcase_migration(t_management_before_migration, Config) ->
+    Config;
+init_per_testcase_migration(_, Config) ->
+    emqx_acl_mnesia_migrator:migrate_records(),
+    Config.
+
+init_per_testcase(Case, Config) ->
+    PerTestInitializers = [
+        fun init_per_testcase_clean/2,
+        fun init_per_testcase_migration/2,
+        fun init_per_testcase_emqx_hook/2
+    ],
+    lists:foldl(fun(Init, Conf) -> Init(Case, Conf) end, Config, PerTestInitializers).
+
 end_per_testcase(_, Config) ->
     emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
     Config.
@@ -76,25 +100,34 @@ set_special_configs(_App) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_management(_Config) ->
-    clean_all_acls(),
+t_management_before_migration(_Config) ->
+    {atomic, IsStarted} = mnesia:transaction(fun emqx_acl_mnesia_db:is_migration_started/0),
+    ?assertNot(IsStarted),
+    run_acl_tests().
+
+t_management_after_migration(_Config) ->
+    {atomic, IsStarted} = mnesia:transaction(fun emqx_acl_mnesia_db:is_migration_started/0),
+    ?assert(IsStarted),
+    run_acl_tests().
+
+run_acl_tests() ->
     ?assertEqual("Acl with Mnesia", emqx_acl_mnesia:description()),
-    ?assertEqual([], emqx_acl_mnesia_cli:all_acls()),
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()),
 
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>, sub, allow),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/+">>, pub, deny),
-    ok = emqx_acl_mnesia_cli:add_acl({username, <<"test_username">>}, <<"topic/%u">>, sub, deny),
-    ok = emqx_acl_mnesia_cli:add_acl({username, <<"test_username">>}, <<"topic/+">>, pub, allow),
-    ok = emqx_acl_mnesia_cli:add_acl(all, <<"#">>, pubsub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>, sub, allow),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/+">>, pub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({username, <<"test_username">>}, <<"topic/%u">>, sub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({username, <<"test_username">>}, <<"topic/+">>, pub, allow),
+    ok = emqx_acl_mnesia_db:add_acl(all, <<"#">>, pubsub, deny),
     %% Sleeps below are needed to hide the race condition between
     %% mnesia and ets dirty select in check_acl, that make this test
     %% flaky
     timer:sleep(100),
 
-    ?assertEqual(2, length(emqx_acl_mnesia_cli:lookup_acl({clientid, <<"test_clientid">>}))),
-    ?assertEqual(2, length(emqx_acl_mnesia_cli:lookup_acl({username, <<"test_username">>}))),
-    ?assertEqual(2, length(emqx_acl_mnesia_cli:lookup_acl(all))),
-    ?assertEqual(6, length(emqx_acl_mnesia_cli:all_acls())),
+    ?assertEqual(2, length(emqx_acl_mnesia_db:lookup_acl({clientid, <<"test_clientid">>}))),
+    ?assertEqual(2, length(emqx_acl_mnesia_db:lookup_acl({username, <<"test_username">>}))),
+    ?assertEqual(2, length(emqx_acl_mnesia_db:lookup_acl(all))),
+    ?assertEqual(6, length(emqx_acl_mnesia_db:all_acls())),
 
     User1 = #{zone => external, clientid => <<"test_clientid">>},
     User2 = #{zone => external, clientid => <<"no_exist">>, username => <<"test_username">>},
@@ -110,30 +143,30 @@ t_management(_Config) ->
     deny  = emqx_access_control:check_acl(User3, publish,   <<"topic/A/B">>),
 
     %% Test merging of pubsub capability:
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, allow),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, allow),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, allow),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pubsub, allow),
     timer:sleep(100),
     allow = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
 
     %% Test implicit migration of pubsub to pub and sub:
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
-    ok = mnesia:dirty_write(#emqx_acl{
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
+    ok = mnesia:dirty_write(#?ACL_TABLE{
                                filter = {{clientid, <<"test_clientid">>}, <<"topic/mix">>},
                                action = pubsub,
                                access = allow,
@@ -142,24 +175,130 @@ t_management(_Config) ->
     timer:sleep(100),
     allow = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     allow = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, pub, deny),
     timer:sleep(100),
     allow = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>, sub, deny),
     timer:sleep(100),
     deny  = emqx_access_control:check_acl(User1, subscribe,   <<"topic/mix">>),
     deny  = emqx_access_control:check_acl(User1, publish,     <<"topic/mix">>),
 
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/+">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({username, <<"test_username">>}, <<"topic/%u">>),
-    ok = emqx_acl_mnesia_cli:remove_acl({username, <<"test_username">>}, <<"topic/+">>),
-    ok = emqx_acl_mnesia_cli:remove_acl(all, <<"#">>),
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>),
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/+">>),
+    ok = emqx_acl_mnesia_db:remove_acl({clientid, <<"test_clientid">>}, <<"topic/mix">>),
+    ok = emqx_acl_mnesia_db:remove_acl({username, <<"test_username">>}, <<"topic/%u">>),
+    ok = emqx_acl_mnesia_db:remove_acl({username, <<"test_username">>}, <<"topic/+">>),
+    ok = emqx_acl_mnesia_db:remove_acl(all, <<"#">>),
     timer:sleep(100),
 
-    ?assertEqual([], emqx_acl_mnesia_cli:all_acls()).
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()).
+
+t_old_and_new_acl_combination(_Config) ->
+    create_conflicting_records(),
+
+    ?assertEqual(combined_conflicting_records(), emqx_acl_mnesia_db:all_acls()),
+    ?assertEqual(
+        lists:usort(combined_conflicting_records()),
+        lists:usort(emqx_acl_mnesia_db:all_acls_export())).
+
+t_old_and_new_acl_migration(_Config) ->
+    create_conflicting_records(),
+    emqx_acl_mnesia_migrator:migrate_records(),
+
+    ?assertEqual(combined_conflicting_records(), emqx_acl_mnesia_db:all_acls()),
+    ?assertEqual(
+        lists:usort(combined_conflicting_records()),
+        lists:usort(emqx_acl_mnesia_db:all_acls_export())),
+
+    % check that old table is not popoulated anymore
+    ok = emqx_acl_mnesia_db:add_acl({clientid, <<"test_clientid">>}, <<"topic/%c">>, sub, allow),
+    ?assert(emqx_acl_mnesia_migrator:is_old_table_migrated()).
+
+
+t_migration_concurrency(_Config) ->
+    Key = {{clientid,<<"client6">>}, <<"t">>},
+    Record = #?ACL_TABLE{filter = Key, action = pubsub, access = deny, created_at = 0},
+    {atomic, ok} = mnesia:transaction(fun mnesia:write/1, [Record]),
+
+    LockWaitAndDelete =
+        fun() ->
+           [_Rec] = mnesia:wread({?ACL_TABLE, Key}),
+           {{Pid, Ref}, _} =
+               ?wait_async_action(spawn_monitor(fun emqx_acl_mnesia_migrator:migrate_records/0),
+                                  #{?snk_kind := emqx_acl_mnesia_migrator_record_selected},
+                                  1000),
+           mnesia:delete({?ACL_TABLE, Key}),
+           {Pid, Ref}
+        end,
+
+    ?check_trace(
+        begin
+            {atomic, {Pid, Ref}} = mnesia:transaction(LockWaitAndDelete),
+            receive {'DOWN', Ref, process, Pid, _} -> ok end
+        end,
+        fun(_, Trace) ->
+            ?assertMatch([_], ?of_kind(emqx_acl_mnesia_migrator_record_missed, Trace))
+        end),
+
+    ?assert(emqx_acl_mnesia_migrator:is_old_table_migrated()),
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()).
+
+
+t_old_and_new_acl_migration_by_migrator(_Config) ->
+    create_conflicting_records(),
+
+    meck:new(fake_nodes, [non_strict]),
+    meck:expect(fake_nodes, all, fun() -> [node(), 'somebadnode@127.0.0.1'] end),
+
+    ?check_trace(
+        begin
+            % check all nodes every 30 ms
+            {ok, _} = emqx_acl_mnesia_migrator:start_link(#{
+                name => ct_migrator,
+                check_nodes_interval => 30,
+                get_nodes => fun fake_nodes:all/0
+            }),
+            timer:sleep(100)
+        end,
+        fun(_, Trace) ->
+            ?assertEqual([], ?of_kind(emqx_acl_mnesia_migrator_start_migration, Trace))
+        end),
+
+    ?check_trace(
+        begin
+            meck:expect(fake_nodes, all, fun() -> [node()] end),
+            timer:sleep(100)
+        end,
+        fun(_, Trace) ->
+            ?assertMatch([_], ?of_kind(emqx_acl_mnesia_migrator_finish, Trace))
+        end),
+
+    meck:unload(fake_nodes),
+
+    ?assertEqual(combined_conflicting_records(), emqx_acl_mnesia_db:all_acls()),
+    ?assert(emqx_acl_mnesia_migrator:is_old_table_migrated()).
+
+t_old_and_new_acl_migration_repeated_by_migrator(_Config) ->
+    create_conflicting_records(),
+    emqx_acl_mnesia_migrator:migrate_records(),
+
+    ?check_trace(
+        begin
+            {ok, _} = emqx_acl_mnesia_migrator:start_link(ct_migrator),
+            timer:sleep(100)
+        end,
+        fun(_, Trace) ->
+            ?assertEqual([], ?of_kind(emqx_acl_mnesia_migrator_start_migration, Trace)),
+            ?assertMatch([_], ?of_kind(emqx_acl_mnesia_migrator_finish, Trace))
+        end).
+
+t_start_stop_supervised(_Config) ->
+    ?assertEqual(undefined, whereis(emqx_acl_mnesia_migrator)),
+    ok = emqx_acl_mnesia_migrator:start_supervised(),
+    ?assert(is_pid(whereis(emqx_acl_mnesia_migrator))),
+    ok = emqx_acl_mnesia_migrator:stop_supervised(),
+    ?assertEqual(undefined, whereis(emqx_acl_mnesia_migrator)).
 
 t_acl_cli(_Config) ->
     meck:new(emqx_ctl, [non_strict, passthrough]),
@@ -168,8 +307,6 @@ t_acl_cli(_Config) ->
     meck:expect(emqx_ctl, usage, fun(Usages) -> emqx_ctl:format_usage(Usages) end),
     meck:expect(emqx_ctl, usage, fun(Cmd, Descr) -> emqx_ctl:format_usage(Cmd, Descr) end),
 
-    clean_all_acls(),
-
     ?assertEqual(0, length(emqx_acl_mnesia_cli:cli(["list"]))),
 
     emqx_acl_mnesia_cli:cli(["add", "clientid", "test_clientid", "topic/A", "pub", "deny"]),
@@ -202,8 +339,6 @@ t_acl_cli(_Config) ->
     meck:unload(emqx_ctl).
 
 t_rest_api(_Config) ->
-    clean_all_acls(),
-
     Params1 = [#{<<"clientid">> => <<"test_clientid">>,
                  <<"topic">> => <<"topic/A">>,
                  <<"action">> => <<"pub">>,
@@ -273,13 +408,24 @@ t_rest_api(_Config) ->
     {ok, Res3} = request_http_rest_list(["$all"]),
     ?assertMatch([], get_http_data(Res3)).
 
-%%------------------------------------------------------------------------------
-%% Helpers
-%%------------------------------------------------------------------------------
 
-clean_all_acls() ->
-    [ mnesia:dirty_delete({emqx_acl, Login})
-      || Login <- mnesia:dirty_all_keys(emqx_acl)].
+create_conflicting_records() ->
+    Records = [
+        #?ACL_TABLE{filter = {{clientid,<<"client6">>}, <<"t">>}, action = pubsub, access = deny, created_at = 0},
+        #?ACL_TABLE{filter = {{clientid,<<"client5">>}, <<"t">>}, action = pubsub, access = deny, created_at = 1},
+        #?ACL_TABLE2{who = {clientid,<<"client5">>}, rules = [{allow, sub, <<"t">>, 2}]}
+    ],
+    mnesia:transaction(fun() -> lists:foreach(fun mnesia:write/1, Records) end).
+
+
+combined_conflicting_records() ->
+    % pubsub's are split, ACL_TABLE2 rules shadow ACL_TABLE rules
+    [
+        {{clientid,<<"client5">>},<<"t">>,sub,allow,2},
+        {{clientid,<<"client5">>},<<"t">>,pub,deny,1},
+        {{clientid,<<"client6">>},<<"t">>,sub,deny,0},
+        {{clientid,<<"client6">>},<<"t">>,pub,deny,0}
+    ].
 
 %%--------------------------------------------------------------------
 %% HTTP Request

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -1,6 +1,6 @@
 {application, emqx_management,
  [{description, "EMQ X Management API and CLI"},
-  {vsn, "4.3.7"}, % strict semver, bump manually!
+  {vsn, "4.3.8"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_management_sup]},
   {applications, [kernel,stdlib,minirest]},

+ 10 - 11
apps/emqx_management/src/emqx_mgmt_data_backup.erl

@@ -118,18 +118,18 @@ export_auth_mnesia() ->
     end.
 
 export_acl_mnesia() ->
-    case ets:info(emqx_acl) of
+    case ets:info(emqx_acl2) of
         undefined -> [];
         _ ->
-            lists:map(fun({_, Filter, Action, Access, CreatedAt}) ->
-                          Filter1 = case Filter of
-                              {{Type, TypeValue}, Topic} ->
+            lists:map(fun({Login, Topic, Action, Access, CreatedAt}) ->
+                          Filter1 = case Login of
+                              {Type, TypeValue} ->
                                   [{type, Type}, {type_value, TypeValue}, {topic, Topic}];
-                              {Type, Topic} ->
+                              Type ->
                                   [{type, Type}, {topic, Topic}]
                           end,
                           Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}]
-                      end, ets:tab2list(emqx_acl))
+                      end, emqx_acl_mnesia_db:all_acls_export())
     end.
 
 -ifdef(EMQX_ENTERPRISE).
@@ -473,10 +473,9 @@ do_import_auth_mnesia(Auths) ->
     end.
 
 do_import_acl_mnesia_by_old_data(Acls) ->
-    case ets:info(emqx_acl) of
+    case ets:info(emqx_acl2) of
         undefined -> ok;
         _ ->
-            CreatedAt = erlang:system_time(millisecond),
             lists:foreach(fun(#{<<"login">> := Login,
                                 <<"topic">> := Topic,
                                 <<"allow">> := Allow,
@@ -485,11 +484,11 @@ do_import_acl_mnesia_by_old_data(Acls) ->
                                          true -> allow;
                                          false -> deny
                                      end,
-                            mnesia:dirty_write({emqx_acl, {{get_old_type(), Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt})
+                            emqx_acl_mnesia_db:add_acl({get_old_type(), Login}, Topic, any_to_atom(Action), Allow1)
                           end, Acls)
     end.
 do_import_acl_mnesia(Acls) ->
-    case ets:info(emqx_acl) of
+    case ets:info(emqx_acl2) of
         undefined -> ok;
         _ ->
             lists:foreach(fun(Map = #{<<"action">> := Action,
@@ -501,7 +500,7 @@ do_import_acl_mnesia(Acls) ->
                                 Value ->
                                     {any_to_atom(maps:get(<<"type">>, Map)), Value}
                             end,
-                            emqx_acl_mnesia_cli:add_acl(Login, Topic, any_to_atom(Action), any_to_atom(Access))
+                            emqx_acl_mnesia_db:add_acl(Login, Topic, any_to_atom(Action), any_to_atom(Access))
                           end, Acls)
     end.
 

+ 41 - 15
apps/emqx_management/test/emqx_auth_mnesia_migration_SUITE.erl

@@ -30,7 +30,7 @@ matrix() ->
                           , Version <- ["v4.2.10", "v4.1.5"]].
 
 all() ->
-    [t_import_4_0, t_import_4_1, t_import_4_2].
+    [t_import_4_0, t_import_4_1, t_import_4_2, t_export_import].
 
 groups() ->
     [{username, [], cases()}, {clientid, [], cases()}].
@@ -52,7 +52,8 @@ init_per_testcase(_, Config) ->
     Config.
 
 end_per_testcase(_, _Config) ->
-    {atomic,ok} = mnesia:clear_table(emqx_acl),
+    {atomic,ok} = mnesia:clear_table(?ACL_TABLE),
+    {atomic,ok} = mnesia:clear_table(?ACL_TABLE2),
     {atomic,ok} = mnesia:clear_table(emqx_user),
     ok.
 -ifdef(EMQX_ENTERPRISE).
@@ -138,25 +139,50 @@ t_import_4_2(Config) ->
     test_import(clientid, {<<"client_for_test">>, <<"public">>}),
     test_import(username, {<<"user_for_test">>, <<"public">>}),
 
-    ?assertMatch([#emqx_acl{
-                     filter = {{Type,<<"emqx_c">>}, <<"Topic/A">>},
-                     action = pub,
-                     access = allow
-                    },
-                  #emqx_acl{
-                     filter = {{Type,<<"emqx_c">>}, <<"Topic/A">>},
-                     action = sub,
-                     access = allow
-                    }],
-                 lists:sort(ets:tab2list(emqx_acl))).
+    ?assertMatch([
+            {{username, <<"emqx_c">>}, <<"Topic/A">>, pub, allow, _},
+            {{username, <<"emqx_c">>}, <<"Topic/A">>, sub, allow, _}
+        ],
+        lists:sort(emqx_acl_mnesia_db:all_acls())).
 -endif.
 
+t_export_import(_Config) ->
+    emqx_acl_mnesia_migrator:migrate_records(),
+
+    Records = [
+        #?ACL_TABLE2{who = {clientid,<<"client1">>}, rules = [{allow, sub, <<"t1">>, 1}]},
+        #?ACL_TABLE2{who = {clientid,<<"client2">>}, rules = [{allow, pub, <<"t2">>, 2}]}
+    ],
+    mnesia:transaction(fun() -> lists:foreach(fun mnesia:write/1, Records) end),
+    timer:sleep(100),
+
+    AclData = emqx_json:encode(emqx_mgmt_data_backup:export_acl_mnesia()),
+
+    mnesia:transaction(fun() ->
+                          lists:foreach(fun(#?ACL_TABLE2{who = Who}) ->
+                                           mnesia:delete({?ACL_TABLE2, Who})
+                                        end,
+                                        Records)
+                       end),
+
+    ?assertEqual([], emqx_acl_mnesia_db:all_acls()),
+
+    emqx_mgmt_data_backup:import_acl_mnesia(emqx_json:decode(AclData, [return_maps]), "4.3"),
+    timer:sleep(100),
+
+    ?assertMatch([
+        {{clientid, <<"client1">>}, <<"t1">>, sub, allow, _},
+        {{clientid, <<"client2">>}, <<"t2">>, pub, allow, _}
+    ], lists:sort(emqx_acl_mnesia_db:all_acls())).
+
 do_import(File, Config) ->
     do_import(File, Config, "{}").
 
 do_import(File, Config, Overrides) ->
-    mnesia:clear_table(emqx_acl),
+    mnesia:clear_table(?ACL_TABLE),
+    mnesia:clear_table(?ACL_TABLE2),
     mnesia:clear_table(emqx_user),
+    emqx_acl_mnesia_migrator:migrate_records(),
     Filename = filename:join(proplists:get_value(data_dir, Config), File),
     emqx_mgmt_data_backup:import(Filename, Overrides).
 
@@ -172,4 +198,4 @@ test_import(clientid, {ClientID, Password}) ->
     Req = #{clientid => ClientID,
             password => Password},
     ?assertMatch({stop, #{auth_result := success}},
-                 emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).
+                 emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).

+ 1 - 1
rebar.config

@@ -55,7 +55,7 @@
     , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
     , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
     , {getopt, "1.0.1"}
-    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
+    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
     ]}.
 
 {xref_ignores,