Bladeren bron

Merge pull request #5532 from zhongwencool/master

feat(emqx_cluster_call): ensure the consistency of resources
Zaiming (Stone) Shi 4 jaren geleden
bovenliggende
commit
c64af6a78c
27 gewijzigde bestanden met toevoegingen van 765 en 91 verwijderingen
  1. 2 2
      .github/workflows/run_test_cases.yaml
  2. 4 1
      apps/emqx/etc/emqx.conf
  3. 4 4
      apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
  4. 2 2
      apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl
  5. 3 3
      apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl
  6. 3 3
      apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl
  7. 3 3
      apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl
  8. 1 2
      apps/emqx_authz/src/emqx_authz.erl
  9. 2 2
      apps/emqx_data_bridge/src/emqx_data_bridge_api.erl
  10. 1 1
      apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl
  11. 23 0
      apps/emqx_machine/etc/emqx_machine.conf
  12. 35 0
      apps/emqx_machine/include/emqx_cluster_rpc.hrl
  13. 298 0
      apps/emqx_machine/src/emqx_cluster_rpc.erl
  14. 89 0
      apps/emqx_machine/src/emqx_cluster_rpc_handler.erl
  15. 8 0
      apps/emqx_machine/src/emqx_machine_schema.erl
  16. 3 1
      apps/emqx_machine/src/emqx_machine_sup.erl
  17. 240 0
      apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl
  18. 1 27
      apps/emqx_resource/include/emqx_resource_utils.hrl
  19. 13 7
      apps/emqx_resource/src/emqx_resource.erl
  20. 1 1
      apps/emqx_resource/src/emqx_resource_instance.erl
  21. 2 2
      apps/emqx_retainer/src/emqx_retainer.erl
  22. 0 16
      apps/emqx_rule_engine/include/rule_engine.hrl
  23. 9 9
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  24. 5 4
      apps/emqx_rule_engine/src/emqx_rule_registry.erl
  25. 6 0
      apps/emqx_rule_engine/src/emqx_rule_utils.erl
  26. 5 1
      apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
  27. 2 0
      apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl

+ 2 - 2
.github/workflows/run_test_cases.yaml

@@ -98,8 +98,8 @@ jobs:
         - name: run cover
           run: |
             printenv > .env
-            docker exec -i ${{ matrix.otp_release }} bash -c "make cover"
-            docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "make coveralls"
+            docker exec -i ${{ matrix.otp_release }} bash -c "DIAGNOSTIC=1 make cover"
+            docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "DIAGNOSTIC=1 make coveralls"
         - name: cat rebar.crashdump
           if: failure()
           run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi

+ 4 - 1
apps/emqx/etc/emqx.conf

@@ -222,6 +222,8 @@ listeners.quic.default {
   ## If not set, the global configs are used for this listener.
   ##
   ## See `zones.<name>` for more details.
+  ## NOTE: This is a cluster-wide configuration.
+  ## It requires all nodes to be stopped before changing it.
   ##
   ## @doc listeners.quic.<name>.zone
   ## ValueType: String
@@ -490,6 +492,7 @@ listeners.wss.default {
   ## Websocket options
   ## See ${example_common_websocket_options} for more information
   websocket.idle_timeout = 86400s
+
 }
 
 ## Enable per connection statistics.
@@ -1071,7 +1074,7 @@ broker {
   ## are mostly published to topics with large number of levels.
   ##
   ## NOTE: This is a cluster-wide configuration.
-  ## It rquires all nodes to be stopped before changing it.
+  ## It requires all nodes to be stopped before changing it.
   ##
   ## @doc broker.perf.trie_compaction
   ## ValueType: Boolean

+ 4 - 4
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -89,7 +89,7 @@ headers(_) -> undefined.
 headers_no_content_type(type) -> map();
 headers_no_content_type(converter) ->
     fun(Headers) ->
-       maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) 
+       maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
     end;
 headers_no_content_type(default) -> default_headers_no_content_type();
 headers_no_content_type(_) -> undefined.
@@ -129,9 +129,9 @@ create(#{ method := Method
                                     emqx_connector_http,
                                     Config#{base_url => maps:remove(query, URIMap),
                                             pool_type => random}) of
-        {ok, _} ->
+        {ok, already_created} ->
             {ok, State};
-        {error, already_created} ->
+        {ok, _} ->
             {ok, State};
         {error, Reason} ->
             {error, Reason}
@@ -296,4 +296,4 @@ parse_body(<<"application/json">>, Body) ->
 parse_body(<<"application/x-www-form-urlencoded">>, Body) ->
     {ok, maps:from_list(cow_qs:parse_qs(Body))};
 parse_body(ContentType, _) ->
-    {error, {unsupported_content_type, ContentType}}.
+    {error, {unsupported_content_type, ContentType}}.

+ 2 - 2
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -106,9 +106,9 @@ create(#{ selector := Selector
                       , '_unique'], Config),
     NState = State#{selector => NSelector},
     case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of
-        {ok, _} ->
+        {ok, already_created} ->
             {ok, NState};
-        {error, already_created} ->
+        {ok, _} ->
             {ok, NState};
         {error, Reason} ->
             {error, Reason}

+ 3 - 3
apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl

@@ -83,9 +83,9 @@ create(#{ password_hash_algorithm := Algorithm
               query_timeout => QueryTimeout,
               '_unique' => Unique},
     case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of
-        {ok, _} ->
+        {ok, already_created} ->
             {ok, State};
-        {error, already_created} ->
+        {ok, _} ->
             {ok, State};
         {error, Reason} ->
             {error, Reason}
@@ -131,7 +131,7 @@ authenticate(#{password := Password} = Credential,
 destroy(#{'_unique' := Unique}) ->
     _ = emqx_resource:remove_local(Unique),
     ok.
-    
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------

+ 3 - 3
apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl

@@ -71,9 +71,9 @@ create(#{ query := Query0
               salt_position => SaltPosition,
               '_unique' => Unique},
     case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of
-        {ok, _} ->
+        {ok, already_created} ->
             {ok, State};
-        {error, already_created} ->
+        {ok, _} ->
             {ok, State};
         {error, Reason} ->
             {error, Reason}
@@ -119,7 +119,7 @@ authenticate(#{password := Password} = Credential,
 destroy(#{'_unique' := Unique}) ->
     _ = emqx_resource:remove_local(Unique),
     ok.
-    
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------

+ 3 - 3
apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl

@@ -89,9 +89,9 @@ create(#{ query := Query
                           , '_unique'], Config),
         NState = State#{query => NQuery},
         case emqx_resource:create_local(Unique, emqx_connector_redis, Config) of
-            {ok, _} ->
+            {ok, already_created} ->
                 {ok, NState};
-            {error, already_created} ->
+            {ok, _} ->
                 {ok, NState};
             {error, Reason} ->
                 {error, Reason}
@@ -176,7 +176,7 @@ check_fields(["superuser" | More], HasPassHash) ->
     check_fields(More, HasPassHash);
 check_fields([Field | _], _) ->
     error({unsupported_field, Field}).
-    
+
 parse_key(Key) ->
     Tokens = re:split(Key, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]),
     parse_key(Tokens, []).

+ 1 - 2
apps/emqx_authz/src/emqx_authz.erl

@@ -217,7 +217,6 @@ create_resource(#{type := DB,
             [])
     of
         {ok, _} -> ResourceID;
-        {error, already_created} -> ResourceID;
         {error, Reason} -> {error, Reason}
     end;
 create_resource(#{type := DB,
@@ -228,8 +227,8 @@ create_resource(#{type := DB,
             list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
             Config)
     of
+        {ok, already_created} -> ResourceID;
         {ok, _} -> ResourceID;
-        {error, already_created} -> ResourceID;
         {error, Reason} -> {error, Reason}
     end.
 

+ 2 - 2
apps/emqx_data_bridge/src/emqx_data_bridge_api.erl

@@ -77,10 +77,10 @@ create_bridge(#{name := Name}, Params) ->
     case emqx_resource:check_and_create(
             emqx_data_bridge:name_to_resource_id(Name),
             emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of
+        {ok, already_created} ->
+            {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
         {ok, Data} ->
             update_config_and_reply(Name, BridgeType, Config, Data);
-        {error, already_created} ->
-            {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
         {error, Reason0} ->
             Reason = emqx_resource_api:stringnify(Reason0),
             {500, #{code => 102, message => <<"create bridge ", Name/binary,

+ 1 - 1
apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl

@@ -73,8 +73,8 @@ load_bridge(#{name := Name, type := Type, config := Config}) ->
     case emqx_resource:create_local(
             emqx_data_bridge:name_to_resource_id(Name),
             emqx_data_bridge:resource_type(Type), Config) of
+        {ok, already_created} -> ok;
         {ok, _} -> ok;
-        {error, already_created} -> ok;
         {error, Reason} ->
             error({load_bridge, Reason})
     end.

+ 23 - 0
apps/emqx_machine/etc/emqx_machine.conf

@@ -89,6 +89,29 @@ node {
   ## Default: 23
   backtrace_depth = 23
 
+  cluster_call {
+    ## Time interval to retry after a failed call
+    ##
+    ## @doc node.cluster_call.retry_interval
+    ## ValueType: Duration
+    ## Default: 1s
+    retry_interval = 1s
+    ## Retain the maximum number of completed transactions (for queries)
+    ##
+    ## @doc node.cluster_call.max_history
+    ## ValueType: Integer
+    ## Range: [1, 500]
+    ## Default: 100
+    max_history = 100
+    ## Time interval to clear completed but stale transactions.
+    ## Ensure that the number of completed transactions is less than the max_history
+    ##
+    ## @doc node.cluster_call.cleanup_interval
+    ## ValueType: Duration
+    ## Default: 5m
+    cleanup_interval = 5m
+    }
+
 }
 
 ##==================================================================

+ 35 - 0
apps/emqx_machine/include/emqx_cluster_rpc.hrl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQ_X_CLUSTER_RPC_HRL).
+-define(EMQ_X_CLUSTER_RPC_HRL, true).
+
+-define(CLUSTER_MFA, cluster_rpc_mfa).
+-define(CLUSTER_COMMIT, cluster_rpc_commit).
+
+-record(cluster_rpc_mfa, {
+    tnx_id :: pos_integer(),
+    mfa :: mfa(),
+    created_at :: calendar:datetime(),
+    initiator :: node()
+}).
+
+-record(cluster_rpc_commit, {
+    node :: node(),
+    tnx_id :: pos_integer()
+}).
+
+-endif.

+ 298 - 0
apps/emqx_machine/src/emqx_cluster_rpc.erl

@@ -0,0 +1,298 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_rpc).
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, mnesia/1]).
+-export([multicall/3, multicall/4, query/1, reset/0, status/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    handle_continue/2, code_change/3]).
+
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-export([start_link/3]).
+-endif.
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_cluster_rpc.hrl").
+
+-rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}).
+-rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}).
+
+-define(CATCH_UP, catch_up).
+-define(TIMEOUT, timer:minutes(1)).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+mnesia(boot) ->
+    ok = ekka_mnesia:create_table(?CLUSTER_MFA, [
+        {type, ordered_set},
+        {rlog_shard, ?COMMON_SHARD},
+        {disc_copies, [node()]},
+        {record_name, cluster_rpc_mfa},
+        {attributes, record_info(fields, cluster_rpc_mfa)}]),
+    ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [
+        {type, set},
+        {rlog_shard, ?COMMON_SHARD},
+        {disc_copies, [node()]},
+        {record_name, cluster_rpc_commit},
+        {attributes, record_info(fields, cluster_rpc_commit)}]);
+mnesia(copy) ->
+    ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies),
+    ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies).
+
+start_link() ->
+    RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000),
+    start_link(node(), ?MODULE, RetryMs).
+
+start_link(Node, Name, RetryMs) ->
+    gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
+
+-spec multicall(Module, Function, Args) -> {ok, TnxId, term()} | {error, Reason} when
+    Module :: module(),
+    Function :: atom(),
+    Args :: [term()],
+    TnxId :: pos_integer(),
+    Reason :: string().
+multicall(M, F, A) ->
+    multicall(M, F, A, timer:minutes(2)).
+
+-spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId, term()} |{error, Reason} when
+    Module :: module(),
+    Function :: atom(),
+    Args :: [term()],
+    TnxId :: pos_integer(),
+    Timeout :: timeout(),
+    Reason :: string().
+multicall(M, F, A, Timeout) ->
+    MFA = {initiate, {M, F, A}},
+    case ekka_rlog:role() of
+        core -> gen_server:call(?MODULE, MFA, Timeout);
+        replicant ->
+            %% the initiate transaction must happened on core node
+            %% make sure MFA(in the transaction) and the transaction on the same node
+            %% don't need rpc again inside transaction.
+            case ekka_rlog_status:upstream_node(?COMMON_SHARD) of
+                {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
+                disconnected -> {error, disconnected}
+            end
+    end.
+
+-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
+query(TnxId) ->
+    transaction(fun trans_query/1, [TnxId]).
+
+-spec reset() -> reset.
+reset() -> gen_server:call(?MODULE, reset).
+
+-spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}.
+status() ->
+    transaction(fun trans_status/0, []).
+
+%%%===================================================================
+%%% gen_statem callbacks
+%%%===================================================================
+
+%% @private
+init([Node, RetryMs]) ->
+    {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
+    {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}.
+
+%% @private
+handle_continue(?CATCH_UP, State) ->
+    {noreply, State, catch_up(State)}.
+
+handle_call(reset, _From, State) ->
+    _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT),
+    _ = ekka_mnesia:clear_table(?CLUSTER_MFA),
+    {reply, ok, State, {continue, ?CATCH_UP}};
+
+handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
+    case transaction(fun init_mfa/2, [Node, MFA]) of
+        {atomic, {ok, TnxId, Result}} ->
+            {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
+        {aborted, Reason} ->
+            {reply, {error, Reason}, State, {continue, ?CATCH_UP}}
+    end;
+handle_call(_, _From, State) ->
+    {reply, ok, State, catch_up(State)}.
+
+handle_cast(_, State) ->
+    {noreply, State, catch_up(State)}.
+
+handle_info({mnesia_table_event, _}, State) ->
+    {noreply, State, catch_up(State)};
+handle_info(_, State) ->
+    {noreply, State, catch_up(State)}.
+
+terminate(_Reason, _Data) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+catch_up(#{node := Node, retry_interval := RetryMs} = State) ->
+    case transaction(fun read_next_mfa/1, [Node]) of
+        {atomic, caught_up} -> ?TIMEOUT;
+        {atomic, {still_lagging, NextId, MFA}} ->
+            {Succeed, _} = apply_mfa(NextId, MFA),
+            case Succeed of
+                true ->
+                    case transaction(fun commit/2, [Node, NextId]) of
+                        {atomic, ok} -> catch_up(State);
+                        Error ->
+                            ?SLOG(error, #{
+                                msg => "failed to commit applied call",
+                                applied_id => NextId,
+                                error => Error}),
+                            RetryMs
+                    end;
+                false -> RetryMs
+            end;
+        {aborted, Reason} ->
+            ?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}),
+            RetryMs
+    end.
+
+read_next_mfa(Node) ->
+    NextId =
+        case mnesia:wread({?CLUSTER_COMMIT, Node}) of
+            [] ->
+                LatestId = get_latest_id(),
+                TnxId = max(LatestId - 1, 0),
+                commit(Node, TnxId),
+                ?SLOG(notice, #{
+                    msg => "New node first catch up and start commit.",
+                    node => Node, tnx_id => TnxId}),
+                TnxId;
+            [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1
+        end,
+    case mnesia:read(?CLUSTER_MFA, NextId) of
+        [] -> caught_up;
+        [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA}
+    end.
+
+do_catch_up(ToTnxId, Node) ->
+    case mnesia:wread({?CLUSTER_COMMIT, Node}) of
+        [] ->
+            commit(Node, ToTnxId),
+            caught_up;
+        [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId ->
+            caught_up;
+        [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId ->
+            CurTnxId = LastAppliedId + 1,
+            [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId),
+            case apply_mfa(CurTnxId, MFA) of
+                {true, _Result} -> ok = commit(Node, CurTnxId);
+                {false, Error} -> mnesia:abort(Error)
+            end;
+        [#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
+            Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
+                [Node, LastAppliedId, ToTnxId])),
+            ?SLOG(error, #{
+                msg => "catch up failed!",
+                last_applied_id => LastAppliedId,
+                to_tnx_id => ToTnxId
+            }),
+            mnesia:abort(Reason)
+    end.
+
+commit(Node, TnxId) ->
+    ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write).
+
+get_latest_id() ->
+    case mnesia:last(?CLUSTER_MFA) of
+        '$end_of_table' -> 0;
+        Id -> Id
+    end.
+
+init_mfa(Node, MFA) ->
+    mnesia:write_lock_table(?CLUSTER_MFA),
+    LatestId = get_latest_id(),
+    ok = do_catch_up_in_one_trans(LatestId, Node),
+    TnxId = LatestId + 1,
+    MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, initiator = Node, created_at = erlang:localtime()},
+    ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
+    ok = commit(Node, TnxId),
+    case apply_mfa(TnxId, MFA) of
+        {true, Result} -> {ok, TnxId, Result};
+        {false, Error} -> mnesia:abort(Error)
+    end.
+
+do_catch_up_in_one_trans(LatestId, Node) ->
+    case do_catch_up(LatestId, Node) of
+        caught_up -> ok;
+        ok -> do_catch_up_in_one_trans(LatestId, Node)
+    end.
+
+transaction(Func, Args) ->
+    ekka_mnesia:transaction(?COMMON_SHARD, Func, Args).
+
+trans_status() ->
+    mnesia:foldl(fun(Rec, Acc) ->
+        #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec,
+        case mnesia:read(?CLUSTER_MFA, TnxId) of
+            [MFARec] ->
+                #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec,
+                [#{
+                    node => Node,
+                    tnx_id => TnxId,
+                    initiator => InitNode,
+                    mfa => MFA,
+                    created_at => CreatedAt
+                } | Acc];
+            [] -> Acc
+        end end, [], ?CLUSTER_COMMIT).
+
+trans_query(TnxId) ->
+    case mnesia:read(?CLUSTER_MFA, TnxId) of
+        [] -> mnesia:abort(not_found);
+        [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] ->
+            #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
+    end.
+
+apply_mfa(TnxId, {M, F, A} = MFA) ->
+    try
+        Res = erlang:apply(M, F, A),
+        Succeed =
+        case Res of
+             ok ->
+                 ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
+                 true;
+            {ok, _} ->
+                ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
+                true;
+            _ ->
+                ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
+                false
+        end,
+        {Succeed, Res}
+    catch
+        C : E ->
+            ?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}),
+            {false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))}
+    end.

+ 89 - 0
apps/emqx_machine/src/emqx_cluster_rpc_handler.erl

@@ -0,0 +1,89 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_rpc_handler).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_cluster_rpc.hrl").
+
+-export([start_link/0, start_link/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+start_link() ->
+    MaxHistory = application:get_env(emqx_machine, cluster_call_max_history, 100),
+    CleanupMs = application:get_env(emqx_machine, cluster_call_cleanup_interval, 5*60*1000),
+    start_link(MaxHistory, CleanupMs).
+
+start_link(MaxHistory, CleanupMs) ->
+    State = #{max_history => MaxHistory, cleanup_ms => CleanupMs, timer => undefined},
+    gen_server:start_link(?MODULE, [State], []).
+
+%%%===================================================================
+%%% Spawning and gen_server implementation
+%%%===================================================================
+
+init([State]) ->
+    {ok, ensure_timer(State)}.
+
+handle_call(Req, _From, State) ->
+    ?LOG(error, "unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast(Msg, State) ->
+    ?LOG(error, "unexpected msg: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
+    case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
+        {atomic, ok} -> ok;
+        Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error])
+    end,
+    {noreply, ensure_timer(State), hibernate};
+
+handle_info(Info, State) ->
+    ?LOG(error, "unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+terminate(_Reason, #{timer := TRef}) ->
+    emqx_misc:cancel_timer(TRef).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+ensure_timer(State = #{cleanup_ms := Ms}) ->
+    State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}.
+
+%% @doc Keep the latest completed 100 records for querying and troubleshooting.
+del_stale_mfa(MaxHistory) ->
+    DoneId =
+        mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end,
+            infinity, ?CLUSTER_COMMIT),
+    delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory).
+
+delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok;
+delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId ->
+    delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count);
+delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 ->
+    delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1);
+delete_stale_mfa(CurrId, DoneId, Count) when Count =< 0 ->
+    mnesia:delete(?CLUSTER_MFA, CurrId, write),
+    delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1).

+ 8 - 0
apps/emqx_machine/src/emqx_machine_schema.erl

@@ -138,6 +138,14 @@ fields("node") ->
     , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
     , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)}
     , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)}
+    , {"cluster_call", ref("cluster_call")}
+    ];
+
+
+fields("cluster_call") ->
+    [ {"retry_interval", t(emqx_schema:duration(), "emqx_machine.retry_interval", "1s")}
+    , {"max_history", t(range(1, 500), "emqx_machine.max_history", 100)}
+    , {"cleanup_interval", t(emqx_schema:duration(), "emqx_machine.cleanup_interval", "5m")}
     ];
 
 fields("rpc") ->

+ 3 - 1
apps/emqx_machine/src/emqx_machine_sup.erl

@@ -31,7 +31,9 @@ start_link() ->
 init([]) ->
     GlobalGC = child_worker(emqx_global_gc, [], permanent),
     Terminator = child_worker(emqx_machine_terminator, [], transient),
-    Children = [GlobalGC, Terminator],
+    ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent),
+    ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent),
+    Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler],
     SupFlags = #{strategy => one_for_one,
                  intensity => 100,
                  period => 10

+ 240 - 0
apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl

@@ -0,0 +1,240 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_rpc_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-define(NODE1, emqx_cluster_rpc).
+-define(NODE2, emqx_cluster_rpc2).
+-define(NODE3, emqx_cluster_rpc3).
+
+all() -> [
+    t_base_test,
+    t_commit_fail_test,
+    t_commit_crash_test,
+    t_commit_ok_but_apply_fail_on_other_node,
+    t_commit_ok_apply_fail_on_other_node_then_recover,
+    t_del_stale_mfa
+].
+suite() -> [{timetrap, {minutes, 3}}].
+groups() -> [].
+
+init_per_suite(Config) ->
+    application:load(emqx),
+    application:load(emqx_machine),
+    ok = ekka:start(),
+    ok = ekka_rlog:wait_for_shards([emqx_common_shard], infinity),
+    application:set_env(emqx_machine, cluster_call_max_history, 100),
+    application:set_env(emqx_machine, cluster_call_clean_interval, 1000),
+    application:set_env(emqx_machine, cluster_call_retry_interval, 900),
+    %%dbg:tracer(),
+    %%dbg:p(all, c),
+    %%dbg:tpl(emqx_cluster_rpc, cx),
+    %%dbg:tpl(gen_statem, loop_receive, cx),
+    %%dbg:tpl(gen_statem, loop_state_callback, cx),
+    %%dbg:tpl(gen_statem, loop_callback_mode_result, cx),
+    Config.
+
+end_per_suite(_Config) ->
+    ekka:stop(),
+    ekka_mnesia:ensure_stopped(),
+    ekka_mnesia:delete_schema(),
+    %%dbg:stop(),
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    start(),
+    Config.
+
+end_per_testcase(_Config) ->
+    stop(),
+    ok.
+
+t_base_test(_Config) ->
+    ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
+    Pid = self(),
+    MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
+    {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
+    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
+    ?assertEqual(MFA, maps:get(mfa, Query)),
+    ?assertEqual(node(), maps:get(initiator, Query)),
+    ?assert(maps:is_key(created_at, Query)),
+    ?assertEqual(ok, receive_msg(3, test)),
+    sleep(400),
+    {atomic, Status} = emqx_cluster_rpc:status(),
+    ?assertEqual(3, length(Status)),
+    ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)),
+    ok.
+
+t_commit_fail_test(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]},
+    {error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A),
+    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
+    ok.
+
+t_commit_crash_test(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    {M, F, A} = {?MODULE, no_exist_function, []},
+    Error = emqx_cluster_rpc:multicall(M, F, A),
+    ?assertEqual({error, "TnxId(1) apply MFA({emqx_cluster_rpc_SUITE,no_exist_function,[]}) crash"}, Error),
+    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
+    ok.
+
+t_commit_ok_but_apply_fail_on_other_node(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
+    {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
+    {atomic, [Status]} = emqx_cluster_rpc:status(),
+    ?assertEqual(MFA, maps:get(mfa, Status)),
+    ?assertEqual(node(), maps:get(node, Status)),
+    erlang:send(?NODE2, test),
+    Res = gen_statem:call(?NODE2,  {initiate, {M, F, A}}),
+    ?assertEqual({error, "MFA return not ok"}, Res),
+    ok.
+
+t_catch_up_status_handle_next_commit(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
+    {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
+    {ok, 2} = gen_statem:call(?NODE2,  {initiate, {M, F, A}}),
+    ok.
+
+t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    Now = erlang:system_time(second),
+    {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]},
+    {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
+    {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]),
+    {atomic, [Status|L]} = emqx_cluster_rpc:status(),
+    ?assertEqual([], L),
+    ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
+    ?assertEqual(node(), maps:get(node, Status)),
+    sleep(3000),
+    {atomic, [Status1]} = emqx_cluster_rpc:status(),
+    ?assertEqual(Status, Status1),
+    sleep(2600),
+    {atomic, NewStatus} = emqx_cluster_rpc:status(),
+    ?assertEqual(3, length(NewStatus)),
+    Pid = self(),
+    MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
+    {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1),
+    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
+    ?assertEqual(MFAEcho, maps:get(mfa, Query)),
+    ?assertEqual(node(), maps:get(initiator, Query)),
+    ?assert(maps:is_key(created_at, Query)),
+    ?assertEqual(ok, receive_msg(3, test)),
+    ok.
+
+t_del_stale_mfa(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    MFA = {M, F, A} = {io, format, ["test"]},
+    Keys = lists:seq(1, 50),
+    Keys2 = lists:seq(51, 150),
+    Ids =
+        [begin
+             {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
+             TnxId end || _ <- Keys],
+    ?assertEqual(Keys, Ids),
+    Ids2 =
+        [begin
+             {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
+             TnxId end || _ <- Keys2],
+    ?assertEqual(Keys2, Ids2),
+    sleep(1200),
+    [begin
+         ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I))
+     end || I <- lists:seq(1, 50)],
+    [begin
+         {atomic, Map} = emqx_cluster_rpc:query(I),
+         ?assertEqual(MFA, maps:get(mfa, Map)),
+         ?assertEqual(node(), maps:get(initiator, Map)),
+         ?assert(maps:is_key(created_at, Map))
+     end || I <- lists:seq(51, 150)],
+    ok.
+
+start() ->
+    {ok, Pid1} = emqx_cluster_rpc:start_link(),
+    {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
+    {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
+    {ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500),
+    {ok, [Pid1, Pid2, Pid3, Pid4]}.
+
+stop() ->
+    [begin
+         case erlang:whereis(N) of
+             undefined -> ok;
+             P ->
+                 erlang:unlink(P),
+                 erlang:exit(P, kill)
+         end end || N <- [?NODE1, ?NODE2, ?NODE3]].
+
+receive_msg(0, _Msg) -> ok;
+receive_msg(Count, Msg) when Count > 0 ->
+    receive Msg ->
+        receive_msg(Count - 1, Msg)
+    after 800 ->
+        timeout
+    end.
+
+echo(Pid, Msg) ->
+    erlang:send(Pid, Msg),
+    ok.
+
+failed_on_node(Pid) ->
+    case Pid =:= self() of
+        true -> ok;
+        false -> "MFA return not ok"
+    end.
+
+failed_on_node_by_odd(Pid) ->
+    case Pid =:= self() of
+        true -> ok;
+        false ->
+            catch ets:new(test, [named_table, set, public]),
+            Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}),
+            case Num rem 2 =:= 0 of
+                false -> "MFA return not ok";
+                true -> ok
+            end
+    end.
+
+failed_on_other_recover_after_5_second(Pid, CreatedAt) ->
+    Now = erlang:system_time(second),
+    case Pid =:= self() of
+        true -> ok;
+        false ->
+            case Now < CreatedAt + 5 of
+                true -> "MFA return not ok";
+                false -> ok
+            end
+    end.
+
+sleep(Second) ->
+    receive _ -> ok
+    after Second -> timeout
+    end.

+ 1 - 27
apps/emqx_resource/include/emqx_resource_utils.hrl

@@ -13,32 +13,6 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
--define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
-
--define(CLUSTER_CALL(Func, Args, ResParttern),
-%% ekka_mnesia:running_nodes()
-    fun() ->
-        case LocalResult = erlang:apply(?MODULE, Func, Args) of
-            ResParttern ->
-                case rpc:multicall(nodes(), ?MODULE, Func, Args, 5000) of
-                {ResL, []} ->
-                    Filter = fun
-                        (ResParttern) -> false;
-                        ({badrpc, {'EXIT', {undef, [{?MODULE, Func0, _, []}]}}})
-                            when Func0 =:= Func -> false;
-                        (_) -> true
-                    end,
-                    case lists:filter(Filter, ResL) of
-                        [] -> LocalResult;
-                        ErrL -> {error, ErrL}
-                    end;
-                {ResL, BadNodes} ->
-                    {error, {failed_on_nodes, BadNodes, ResL}}
-                end;
-            ErrorResult ->
-                {error, ErrorResult}
-        end
-    end()).
 
 -define(SAFE_CALL(_EXP_),
         ?SAFE_CALL(_EXP_, _ = do_nothing)).
@@ -50,4 +24,4 @@
                 _EXP_ON_FAIL_,
                 {error, {_EXCLASS_, _EXCPTION_, _ST_}}
             end
-        end()).
+        end()).

+ 13 - 7
apps/emqx_resource/src/emqx_resource.erl

@@ -155,19 +155,19 @@ query_failed({_, {OnFailed, Args}}) ->
 %% APIs for resource instances
 %% =================================================================================
 -spec create(instance_id(), resource_type(), resource_config()) ->
-    {ok, resource_data()} | {error, Reason :: term()}.
+    {ok, resource_data() |'already_created'} | {error, Reason :: term()}.
 create(InstId, ResourceType, Config) ->
-    ?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}).
+    cluster_call(create_local, [InstId, ResourceType, Config]).
 
 -spec create_local(instance_id(), resource_type(), resource_config()) ->
-    {ok, resource_data()} | {error, Reason :: term()}.
+    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create_local(InstId, ResourceType, Config) ->
     call_instance(InstId, {create, InstId, ResourceType, Config}).
 
 -spec create_dry_run(instance_id(), resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
 create_dry_run(InstId, ResourceType, Config) ->
-    ?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]).
+    cluster_call(create_dry_run_local, [InstId, ResourceType, Config]).
 
 -spec create_dry_run_local(instance_id(), resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
@@ -177,7 +177,7 @@ create_dry_run_local(InstId, ResourceType, Config) ->
 -spec update(instance_id(), resource_type(), resource_config(), term()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
 update(InstId, ResourceType, Config, Params) ->
-    ?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}).
+    cluster_call(update_local, [InstId, ResourceType, Config, Params]).
 
 -spec update_local(instance_id(), resource_type(), resource_config(), term()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
@@ -186,7 +186,7 @@ update_local(InstId, ResourceType, Config, Params) ->
 
 -spec remove(instance_id()) -> ok | {error, Reason :: term()}.
 remove(InstId) ->
-    ?CLUSTER_CALL(remove_local, [InstId]).
+    cluster_call(remove_local, [InstId]).
 
 -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
 remove_local(InstId) ->
@@ -285,7 +285,7 @@ check_config(ResourceType, RawConfigTerm) ->
     end.
 
 -spec check_and_create(instance_id(), resource_type(), raw_resource_config()) ->
-    {ok, resource_data()} | {error, term()}.
+    {ok, resource_data() |'already_created'} | {error, term()}.
 check_and_create(InstId, ResourceType, RawConfig) ->
     check_and_do(ResourceType, RawConfig,
         fun(InstConf) -> create(InstId, ResourceType, InstConf) end).
@@ -335,3 +335,9 @@ safe_apply(Func, Args) ->
 
 str(S) when is_binary(S) -> binary_to_list(S);
 str(S) when is_list(S) -> S.
+
+cluster_call(Func, Args) ->
+    case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of
+        {ok, _TxnId, Result} -> Result;
+        Failed -> Failed
+    end.

+ 1 - 1
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -162,7 +162,7 @@ do_update(InstId, ResourceType, NewConfig, Params) ->
 
 do_create(InstId, ResourceType, Config) ->
     case lookup(InstId) of
-        {ok, _} -> {error, already_created};
+        {ok, _} -> {ok, already_created};
         _ ->
             case emqx_resource:call_start(InstId, ResourceType, Config) of
                 {ok, ResourceState} ->

+ 2 - 2
apps/emqx_retainer/src/emqx_retainer.erl

@@ -443,9 +443,9 @@ create_resource(Context, #{type := DB} = Config) ->
            ResourceID,
            list_to_existing_atom(io_lib:format("~s_~s", [emqx_connector, DB])),
            Config) of
-        {ok, _} ->
+        {ok, already_created} ->
             Context#{resource_id => ResourceID};
-        {error, already_created} ->
+        {ok, _} ->
             Context#{resource_id => ResourceID};
         {error, Reason} ->
             error({load_config_error, Reason})

+ 0 - 16
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -155,22 +155,6 @@
             end
         end()).
 
--define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
-
--define(CLUSTER_CALL(Func, Args, ResParttern),
-    fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of
-        {ResL, []} ->
-            case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
-                [] -> ResL;
-                ErrL ->
-                    ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
-                    throw({Func, ErrL})
-            end;
-        {ResL, BadNodes} ->
-            ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
-            throw({Func, {failed_on_nodes, BadNodes}})
-   end end()).
-
 %% Tables
 -define(RULE_TAB, emqx_rule).
 -define(ACTION_TAB, emqx_rule_action).

+ 9 - 9
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -216,7 +216,7 @@ delete_rule(RuleId) ->
     case emqx_rule_registry:get_rule(RuleId) of
         {ok, Rule = #rule{actions = Actions}} ->
             try
-                _ = ?CLUSTER_CALL(clear_rule, [Rule]),
+                _ = emqx_rule_utils:cluster_call(?MODULE, clear_rule, [Rule]),
                 ok = emqx_rule_registry:remove_rule(Rule)
             catch
                 Error:Reason:ST ->
@@ -242,7 +242,7 @@ create_resource(#{type := Type, config := Config0} = Params) ->
             ok = emqx_rule_registry:add_resource(Resource),
             %% Note that we will return OK in case of resource creation failure,
             %% A timer is started to re-start the resource later.
-            catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
+            catch _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]),
             {ok, Resource};
         not_found ->
             {error, {resource_type_not_found, Type}}
@@ -280,7 +280,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip
             Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
             case test_resource(#{type => Type, config => NewConfig}) of
                 ok ->
-                    _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]),
+                    _ =  emqx_rule_utils:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]),
                     emqx_rule_registry:add_resource(#resource{
                         id = Id,
                         type = Type,
@@ -319,8 +319,8 @@ test_resource(#{type := Type, config := Config0}) ->
             Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
             ResId = resource_id(),
             try
-                _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]),
-                _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
+                _ =  emqx_rule_utils:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]),
+                _ =  emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
                 ok
             catch
                 throw:Reason -> {error, Reason}
@@ -359,7 +359,7 @@ delete_resource(ResId) ->
             try
                 case emqx_rule_registry:remove_resource(ResId) of
                     ok ->
-                        _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
+                        _ =  emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
                         ok;
                     {error, _} = R -> R
                 end
@@ -426,7 +426,7 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) ->
             ActionInstId = maps:get(id, Action, action_instance_id(Name)),
             case NeedInit of
                 true ->
-                    _ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId,
+                    _ =  emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId,
                             with_resource_params(Args)]),
                     ok;
                 false -> ok
@@ -485,7 +485,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
 may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) ->
     %% prepare new actions before removing old ones
     NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)),
-    _ = ?CLUSTER_CALL(clear_actions, [OldActions]),
+    _ =  emqx_rule_utils:cluster_call(?MODULE, clear_actions, [OldActions]),
     may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
 may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
     Rule.
@@ -631,7 +631,7 @@ refresh_actions(Actions, Pred) ->
                 true ->
                     {ok, #action{module = Mod, on_create = Create}}
                         = emqx_rule_registry:find_action(ActName),
-                    _ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]),
+                    _ =  emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]),
                     refresh_actions(Fallbacks, Pred);
                 false -> ok
             end

+ 5 - 4
apps/emqx_rule_engine/src/emqx_rule_registry.erl

@@ -221,7 +221,7 @@ remove_rules(Rules) ->
 
 %% @private
 insert_rule(Rule) ->
-    _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
+    _ =  emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]),
     mnesia:write(?RULE_TAB, Rule, write).
 
 %% @private
@@ -231,7 +231,7 @@ delete_rule(RuleId) when is_binary(RuleId) ->
         not_found -> ok
     end;
 delete_rule(Rule) ->
-    _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
+    _ =  emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]),
     mnesia:delete_object(?RULE_TAB, Rule, write).
 
 load_hooks_for_rule(#rule{for = Topics}) ->
@@ -476,10 +476,11 @@ code_change(_OldVsn, State, _Extra) ->
 
 get_all_records(Tab) ->
     %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
-    %% Wrapping ets to a r/o transaction to avoid reading inconsistent
+    %% Wrapping ets to a transaction to avoid reading inconsistent
+    %% ( nest cluster_call transaction, no a r/o transaction)
     %% data during shard bootstrap
     {atomic, Ret} =
-        ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD,
+        ekka_mnesia:transaction(?RULE_ENGINE_SHARD,
                                    fun() ->
                                            ets:tab2list(Tab)
                                    end),

+ 6 - 0
apps/emqx_rule_engine/src/emqx_rule_utils.erl

@@ -55,6 +55,8 @@
         , can_topic_match_oneof/2
         ]).
 
+-export([cluster_call/3]).
+
 -compile({no_auto_import,
           [ float/1
           ]}).
@@ -356,3 +358,7 @@ can_topic_match_oneof(Topic, Filters) ->
     lists:any(fun(Fltr) ->
         emqx_topic:match(Topic, Fltr)
     end, Filters).
+
+cluster_call(Module, Func, Args) ->
+    {ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args),
+    Result.

+ 5 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -30,7 +30,7 @@
 all() ->
     [ {group, engine}
     , {group, actions}
-    , {group, api}
+%%    , {group, api}
     , {group, cli}
     , {group, funcs}
     , {group, registry}
@@ -148,6 +148,7 @@ groups() ->
 %%------------------------------------------------------------------------------
 
 init_per_suite(Config) ->
+    application:load(emqx_machine),
     ok = ekka_mnesia:start(),
     ok = emqx_rule_registry:mnesia(boot),
     ok = emqx_ct_helpers:start_apps([emqx_rule_engine], fun set_special_configs/1),
@@ -181,6 +182,7 @@ end_per_group(_Groupname, _Config) ->
 %%------------------------------------------------------------------------------
 
 init_per_testcase(t_events, Config) ->
+    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     ok = emqx_rule_engine:load_providers(),
     init_events_counters(),
     ok = emqx_rule_registry:register_resource_types([make_simple_resource_type(simple_resource_type)]),
@@ -214,6 +216,7 @@ init_per_testcase(Test, Config)
             ;Test =:= t_sqlselect_multi_actoins_3_1
             ;Test =:= t_sqlselect_multi_actoins_4
         ->
+    emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     ok = emqx_rule_engine:load_providers(),
     ok = emqx_rule_registry:add_action(
             #action{name = 'crash_action', app = ?APP,
@@ -252,6 +255,7 @@ init_per_testcase(Test, Config)
      {connsql, SQL}
     | Config];
 init_per_testcase(_TestCase, Config) ->
+    emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     ok = emqx_rule_registry:register_resource_types(
             [#resource_type{
                 name = built_in,

+ 2 - 0
apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl

@@ -39,6 +39,7 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
+    application:load(emqx_machine),
     ok = ekka_mnesia:start(),
     ok = emqx_rule_registry:mnesia(boot),
     Config.
@@ -65,6 +66,7 @@ end_per_testcase(_, Config) ->
 
 t_restart_resource(_) ->
     {ok, _} = emqx_rule_monitor:start_link(),
+    emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc,1000),
     ok = emqx_rule_registry:register_resource_types(
             [#resource_type{
                 name = test_res_1,