Просмотр исходного кода

Merge pull request #8755 from ieQu1/master

Export transaction functions
ieQu1 3 лет назад
Родитель
Сommit
46a2324afb

+ 6 - 1
apps/emqx/src/bpapi/emqx_bpapi.erl

@@ -23,6 +23,11 @@
     versions_file/1
 ]).
 
+%% Internal exports (RPC)
+-export([
+    announce_fun/1
+]).
+
 -export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]).
 
 -include("emqx.hrl").
@@ -77,7 +82,7 @@ supported_version(API) ->
 -spec announce(atom()) -> ok.
 announce(App) ->
     {ok, Data} = file:consult(?MODULE:versions_file(App)),
-    {atomic, ok} = mria:transaction(?COMMON_SHARD, fun announce_fun/1, [Data]),
+    {atomic, ok} = mria:transaction(?COMMON_SHARD, fun ?MODULE:announce_fun/1, [Data]),
     ok.
 
 -spec versions_file(atom()) -> file:filename_all().

+ 16 - 10
apps/emqx/src/emqx_alarm.erl

@@ -54,6 +54,12 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    create_activate_alarm/3,
+    do_get_alarms/0
+]).
+
 -record(activated_alarm, {
     name :: binary() | atom(),
     details :: map() | list(),
@@ -210,7 +216,7 @@ init([]) ->
 handle_call({activate_alarm, Name, Details, Message}, _From, State) ->
     Res = mria:transaction(
         mria:local_content_shard(),
-        fun create_activate_alarm/3,
+        fun ?MODULE:create_activate_alarm/3,
         [Name, Details, Message]
     ),
     case Res of
@@ -234,15 +240,7 @@ handle_call(delete_all_deactivated_alarms, _From, State) ->
 handle_call({get_alarms, all}, _From, State) ->
     {atomic, Alarms} =
         mria:ro_transaction(
-            mria:local_content_shard(),
-            fun() ->
-                [
-                    normalize(Alarm)
-                 || Alarm <-
-                        ets:tab2list(?ACTIVATED_ALARM) ++
-                            ets:tab2list(?DEACTIVATED_ALARM)
-                ]
-            end
+            mria:local_content_shard(), fun ?MODULE:do_get_alarms/0
         ),
     {reply, Alarms, State, get_validity_period()};
 handle_call({get_alarms, activated}, _From, State) ->
@@ -295,6 +293,14 @@ create_activate_alarm(Name, Details, Message) ->
             Alarm
     end.
 
+do_get_alarms() ->
+    [
+        normalize(Alarm)
+     || Alarm <-
+            ets:tab2list(?ACTIVATED_ALARM) ++
+                ets:tab2list(?DEACTIVATED_ALARM)
+    ].
+
 deactivate_alarm(
     #activated_alarm{
         activate_at = ActivateAt,

+ 8 - 1
apps/emqx/src/emqx_banned.erl

@@ -49,6 +49,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    expire_banned_items/1
+]).
+
 -elvis([{elvis_style, state_record_and_type, disable}]).
 
 -define(BANNED_TAB, ?MODULE).
@@ -224,7 +229,9 @@ handle_cast(Msg, State) ->
     {noreply, State}.
 
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
-    _ = mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]),
+    _ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [
+        erlang:system_time(second)
+    ]),
     {noreply, ensure_expiry_timer(State), hibernate};
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),

+ 7 - 5
apps/emqx/src/emqx_exclusive_subscription.erl

@@ -35,6 +35,11 @@
     unsubscribe/2
 ]).
 
+%% Internal exports (RPC)
+-export([
+    try_subscribe/2
+]).
+
 -record(exclusive_subscription, {
     topic :: emqx_types:topic(),
     clientid :: emqx_types:clientid()
@@ -80,10 +85,7 @@ on_delete_module() ->
 -spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
     allow | deny.
 check_subscribe(#{clientid := ClientId}, Topic) ->
-    Fun = fun() ->
-        try_subscribe(ClientId, Topic)
-    end,
-    case mria:transaction(?EXCLUSIVE_SHARD, Fun) of
+    case mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/2, [ClientId, Topic]) of
         {atomic, Res} ->
             Res;
         {aborted, Reason} ->
@@ -94,7 +96,7 @@ check_subscribe(#{clientid := ClientId}, Topic) ->
     end.
 
 unsubscribe(Topic, #{is_exclusive := true}) ->
-    _ = mria:transaction(?EXCLUSIVE_SHARD, fun() -> mnesia:delete({?TAB, Topic}) end),
+    _ = mria:transaction(?EXCLUSIVE_SHARD, fun mnesia:delete/1, [{?TAB, Topic}]),
     ok;
 unsubscribe(_Topic, _SubOpts) ->
     ok.

+ 6 - 1
apps/emqx/src/emqx_router_helper.erl

@@ -47,6 +47,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    cleanup_routes/1
+]).
+
 -record(routing_node, {name, const = unused}).
 
 -define(ROUTE, emqx_route).
@@ -145,7 +150,7 @@ handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
     global:trans(
         {?LOCK, self()},
         fun() ->
-            mria:transaction(?ROUTE_SHARD, fun cleanup_routes/1, [Node])
+            mria:transaction(?ROUTE_SHARD, fun ?MODULE:cleanup_routes/1, [Node])
         end
     ),
     ok = mria:dirty_delete(?ROUTING_NODE, Node),

+ 6 - 1
apps/emqx/src/emqx_shared_sub.erl

@@ -67,6 +67,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    init_monitors/0
+]).
+
 -export_type([strategy/0]).
 
 -type strategy() ::
@@ -336,7 +341,7 @@ subscribers(Group, Topic) ->
 init([]) ->
     ok = mria:wait_for_tables([?TAB]),
     {ok, _} = mnesia:subscribe({table, ?TAB, simple}),
-    {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0),
+    {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0),
     ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
     ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
     ok = emqx_tables:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [public, set, {write_concurrency, true}]),

+ 9 - 74
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -51,10 +51,7 @@
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
--define(TRACE, ?MODULE).
--define(SHARD, ?COMMON_SHARD).
--define(MAX_SIZE, 30).
--define(OWN_KEYS, [level, filters, filter_default, handlers]).
+-include("emqx_trace.hrl").
 
 -ifdef(TEST).
 -export([
@@ -66,15 +63,6 @@
 -export_type([ip_address/0]).
 -type ip_address() :: string().
 
--record(?TRACE, {
-    name :: binary() | undefined | '_',
-    type :: clientid | topic | ip_address | undefined | '_',
-    filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_',
-    enable = true :: boolean() | '_',
-    start_at :: integer() | undefined | '_',
-    end_at :: integer() | undefined | '_'
-}).
-
 publish(#message{topic = <<"$SYS/", _/binary>>}) ->
     ignore;
 publish(#message{from = From, topic = Topic, payload = Payload}) when
@@ -172,13 +160,7 @@ create(Trace) ->
 
 -spec delete(Name :: binary()) -> ok | {error, not_found}.
 delete(Name) ->
-    Tran = fun() ->
-        case mnesia:read(?TRACE, Name) of
-            [_] -> mnesia:delete(?TRACE, Name, write);
-            [] -> mnesia:abort(not_found)
-        end
-    end,
-    transaction(Tran).
+    transaction(fun emqx_trace_dl:delete/1, [Name]).
 
 -spec clear() -> ok | {error, Reason :: term()}.
 clear() ->
@@ -190,20 +172,7 @@ clear() ->
 -spec update(Name :: binary(), Enable :: boolean()) ->
     ok | {error, not_found | finished}.
 update(Name, Enable) ->
-    Tran = fun() ->
-        case mnesia:read(?TRACE, Name) of
-            [] ->
-                mnesia:abort(not_found);
-            [#?TRACE{enable = Enable}] ->
-                ok;
-            [Rec] ->
-                case erlang:system_time(second) >= Rec#?TRACE.end_at of
-                    false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
-                    true -> mnesia:abort(finished)
-                end
-        end
-    end,
-    transaction(Tran).
+    transaction(fun emqx_trace_dl:update/2, [Name, Enable]).
 
 check() ->
     gen_server:call(?MODULE, check).
@@ -211,13 +180,7 @@ check() ->
 -spec get_trace_filename(Name :: binary()) ->
     {ok, FileName :: string()} | {error, not_found}.
 get_trace_filename(Name) ->
-    Tran = fun() ->
-        case mnesia:read(?TRACE, Name, read) of
-            [] -> mnesia:abort(not_found);
-            [#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)}
-        end
-    end,
-    transaction(Tran).
+    transaction(fun emqx_trace_dl:get_trace_filename/1, [Name]).
 
 -spec trace_file(File :: file:filename_all()) ->
     {ok, Node :: list(), Binary :: binary()}
@@ -309,23 +272,7 @@ code_change(_, State, _Extra) ->
     {ok, State}.
 
 insert_new_trace(Trace) ->
-    Tran = fun() ->
-        case mnesia:read(?TRACE, Trace#?TRACE.name) of
-            [] ->
-                #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
-                Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
-                case mnesia:match_object(?TRACE, Match, read) of
-                    [] ->
-                        ok = mnesia:write(?TRACE, Trace, write),
-                        {ok, Trace};
-                    [#?TRACE{name = Name}] ->
-                        mnesia:abort({duplicate_condition, Name})
-                end;
-            [#?TRACE{name = Name}] ->
-                mnesia:abort({already_existed, Name})
-        end
-    end,
-    transaction(Tran).
+    transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]).
 
 update_trace(Traces) ->
     Now = erlang:system_time(second),
@@ -347,9 +294,7 @@ stop_all_trace_handler() ->
 
 get_enabled_trace() ->
     {atomic, Traces} =
-        mria:ro_transaction(?SHARD, fun() ->
-            mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
-        end),
+        mria:ro_transaction(?SHARD, fun emqx_trace_dl:get_enabled_trace/0),
     Traces.
 
 find_closest_time(Traces, Now) ->
@@ -372,17 +317,7 @@ closest(Time, Now, Closest) -> min(Time - Now, Closest).
 disable_finished([]) ->
     ok;
 disable_finished(Traces) ->
-    transaction(fun() ->
-        lists:map(
-            fun(#?TRACE{name = Name}) ->
-                case mnesia:read(?TRACE, Name, write) of
-                    [] -> ok;
-                    [Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
-                end
-            end,
-            Traces
-        )
-    end).
+    transaction(fun emqx_trace_dl:delete_finished/1, [Traces]).
 
 start_trace(Traces, Started0) ->
     Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
@@ -586,8 +521,8 @@ filename(Name, Start) ->
     [Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading),
     lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]).
 
-transaction(Tran) ->
-    case mria:transaction(?COMMON_SHARD, Tran) of
+transaction(Fun, Args) ->
+    case mria:transaction(?COMMON_SHARD, Fun, Args) of
         {atomic, Res} -> Res;
         {aborted, Reason} -> {error, Reason}
     end.

+ 35 - 0
apps/emqx/src/emqx_trace/emqx_trace.hrl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_TRACE_HRL).
+-define(EMQX_TRACE_HRL, true).
+
+-define(TRACE, emqx_trace).
+
+-record(?TRACE, {
+    name :: binary() | undefined | '_',
+    type :: clientid | topic | ip_address | undefined | '_',
+    filter ::
+        emqx_types:topic() | emqx_types:clientid() | emqx_trace:ip_address() | undefined | '_',
+    enable = true :: boolean() | '_',
+    start_at :: integer() | undefined | '_',
+    end_at :: integer() | undefined | '_'
+}).
+
+-define(SHARD, ?COMMON_SHARD).
+-define(MAX_SIZE, 30).
+-define(OWN_KEYS, [level, filters, filter_default, handlers]).
+
+-endif.

+ 103 - 0
apps/emqx/src/emqx_trace/emqx_trace_dl.erl

@@ -0,0 +1,103 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% Data layer for emqx_trace
+-module(emqx_trace_dl).
+
+%% API:
+-export([
+    update/2,
+    insert_new_trace/1,
+    delete/1,
+    get_trace_filename/1,
+    delete_finished/1,
+    get_enabled_trace/0
+]).
+
+-include("emqx_trace.hrl").
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+%% Introduced in 5.0
+-spec update(Name :: binary(), Enable :: boolean()) ->
+    ok.
+update(Name, Enable) ->
+    case mnesia:read(?TRACE, Name) of
+        [] ->
+            mnesia:abort(not_found);
+        [#?TRACE{enable = Enable}] ->
+            ok;
+        [Rec] ->
+            case erlang:system_time(second) >= Rec#?TRACE.end_at of
+                false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
+                true -> mnesia:abort(finished)
+            end
+    end.
+
+%% Introduced in 5.0
+insert_new_trace(Trace) ->
+    case mnesia:read(?TRACE, Trace#?TRACE.name) of
+        [] ->
+            #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
+            Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
+            case mnesia:match_object(?TRACE, Match, read) of
+                [] ->
+                    ok = mnesia:write(?TRACE, Trace, write),
+                    {ok, Trace};
+                [#?TRACE{name = Name}] ->
+                    mnesia:abort({duplicate_condition, Name})
+            end;
+        [#?TRACE{name = Name}] ->
+            mnesia:abort({already_existed, Name})
+    end.
+
+%% Introduced in 5.0
+-spec delete(Name :: binary()) -> ok.
+delete(Name) ->
+    case mnesia:read(?TRACE, Name) of
+        [_] -> mnesia:delete(?TRACE, Name, write);
+        [] -> mnesia:abort(not_found)
+    end.
+
+%% Introduced in 5.0
+-spec get_trace_filename(Name :: binary()) -> {ok, string()}.
+get_trace_filename(Name) ->
+    case mnesia:read(?TRACE, Name, read) of
+        [] -> mnesia:abort(not_found);
+        [#?TRACE{start_at = Start}] -> {ok, emqx_trace:filename(Name, Start)}
+    end.
+
+%% Introduced in 5.0
+delete_finished(Traces) ->
+    lists:map(
+        fun(#?TRACE{name = Name}) ->
+            case mnesia:read(?TRACE, Name, write) of
+                [] -> ok;
+                [Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
+            end
+        end,
+        Traces
+    ).
+
+%% Introduced in 5.0
+get_enabled_trace() ->
+    mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================

+ 1 - 1
apps/emqx/test/emqx_router_helper_SUITE.erl

@@ -72,7 +72,7 @@ end_per_testcase(TestCase, Config) when
 ->
     Slave = ?config(slave, Config),
     emqx_common_test_helpers:stop_slave(Slave),
-    mria:transaction(?ROUTE_SHARD, fun() -> mnesia:clear_table(?ROUTE_TAB) end),
+    mria:clear_table(?ROUTE_TAB),
     snabbkaffe:stop(),
     ok;
 end_per_testcase(_TestCase, _Config) ->