Просмотр исходного кода

Merge pull request #13386 from lafirest/feat/banned_boot_57

feat(banned): add a bootstrap file for banned
lafirest 1 год назад
Родитель
Сommit
aa84ca5a88

+ 145 - 5
apps/emqx/src/emqx_banned.erl

@@ -16,6 +16,8 @@
 
 
 -module(emqx_banned).
 -module(emqx_banned).
 
 
+-feature(maybe_expr, enable).
+
 -behaviour(gen_server).
 -behaviour(gen_server).
 -behaviour(emqx_db_backup).
 -behaviour(emqx_db_backup).
 
 
@@ -48,6 +50,7 @@
     handle_call/3,
     handle_call/3,
     handle_cast/2,
     handle_cast/2,
     handle_info/2,
     handle_info/2,
+    handle_continue/2,
     terminate/2,
     terminate/2,
     code_change/3
     code_change/3
 ]).
 ]).
@@ -132,7 +135,7 @@ format(#banned{
         until => to_rfc3339(Until)
         until => to_rfc3339(Until)
     }.
     }.
 
 
--spec parse(map()) -> emqx_types:banned() | {error, term()}.
+-spec parse(map()) -> {ok, emqx_types:banned()} | {error, term()}.
 parse(Params) ->
 parse(Params) ->
     case parse_who(Params) of
     case parse_who(Params) of
         {error, Reason} ->
         {error, Reason} ->
@@ -144,13 +147,13 @@ parse(Params) ->
             Until = maps:get(<<"until">>, Params, At + ?EXPIRATION_TIME),
             Until = maps:get(<<"until">>, Params, At + ?EXPIRATION_TIME),
             case Until > erlang:system_time(second) of
             case Until > erlang:system_time(second) of
                 true ->
                 true ->
-                    #banned{
+                    {ok, #banned{
                         who = Who,
                         who = Who,
                         by = By,
                         by = By,
                         reason = Reason,
                         reason = Reason,
                         at = At,
                         at = At,
                         until = Until
                         until = Until
-                    };
+                    }};
                 false ->
                 false ->
                     ErrorReason =
                     ErrorReason =
                         io_lib:format("Cannot create expired banned, ~p to ~p", [At, Until]),
                         io_lib:format("Cannot create expired banned, ~p to ~p", [At, Until]),
@@ -234,12 +237,137 @@ who(peerhost_net, CIDR) when is_tuple(CIDR) -> {peerhost_net, CIDR};
 who(peerhost_net, CIDR) when is_binary(CIDR) ->
 who(peerhost_net, CIDR) when is_binary(CIDR) ->
     {peerhost_net, esockd_cidr:parse(binary_to_list(CIDR), true)}.
     {peerhost_net, esockd_cidr:parse(binary_to_list(CIDR), true)}.
 
 
+%%--------------------------------------------------------------------
+%% Import From CSV
+%%--------------------------------------------------------------------
+init_from_csv(<<>>) ->
+    ok;
+init_from_csv(File) ->
+    maybe
+        core ?= mria_rlog:role(),
+        '$end_of_table' ?= mnesia:dirty_first(?BANNED_RULE_TAB),
+        '$end_of_table' ?= mnesia:dirty_first(?BANNED_INDIVIDUAL_TAB),
+        {ok, Bin} ?= file:read_file(File),
+        Stream = emqx_utils_stream:csv(Bin, #{nullable => true, filter_null => true}),
+        {ok, List} ?= parse_stream(Stream),
+        import_from_stream(List),
+        ?SLOG(info, #{
+            msg => "load_banned_bootstrap_file_succeeded",
+            file => File
+        })
+    else
+        replicant ->
+            ok;
+        {Name, _} when
+            Name == peerhost;
+            Name == peerhost_net;
+            Name == clientid_re;
+            Name == username_re;
+            Name == clientid;
+            Name == username
+        ->
+            ok;
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "load_banned_bootstrap_file_failed",
+                reason => Reason,
+                file => File
+            }),
+            Error
+    end.
+
+import_from_stream(Stream) ->
+    Groups = maps:groups_from_list(
+        fun(#banned{who = Who}) -> table(Who) end, Stream
+    ),
+    maps:foreach(
+        fun(Tab, Items) ->
+            Trans = fun() ->
+                lists:foreach(
+                    fun(Item) ->
+                        mnesia:write(Tab, Item, write)
+                    end,
+                    Items
+                )
+            end,
+
+            case trans(Trans) of
+                {ok, _} ->
+                    ?SLOG(info, #{
+                        msg => "import_banned_from_stream_succeeded",
+                        items => Items
+                    });
+                {error, Reason} ->
+                    ?SLOG(error, #{
+                        msg => "import_banned_from_stream_failed",
+                        reason => Reason,
+                        items => Items
+                    })
+            end
+        end,
+        Groups
+    ).
+
+parse_stream(Stream) ->
+    try
+        List = emqx_utils_stream:consume(Stream),
+        parse_stream(List, [], [])
+    catch
+        error:Reason ->
+            {error, Reason}
+    end.
+
+parse_stream([Item | List], Ok, Error) ->
+    maybe
+        {ok, Item1} ?= normalize_parse_item(Item),
+        {ok, Banned} ?= parse(Item1),
+        parse_stream(List, [Banned | Ok], Error)
+    else
+        {error, _} ->
+            parse_stream(List, Ok, [Item | Error])
+    end;
+parse_stream([], Ok, []) ->
+    {ok, Ok};
+parse_stream([], Ok, Error) ->
+    ?SLOG(warning, #{
+        msg => "invalid_banned_items",
+        items => Error
+    }),
+    {ok, Ok}.
+
+normalize_parse_item(#{<<"as">> := As} = Item) ->
+    ParseTime = fun(Name, Input) ->
+        maybe
+            #{Name := Time} ?= Input,
+            {ok, Epoch} ?= emqx_utils_calendar:to_epoch_second(emqx_utils_conv:str(Time)),
+            {ok, Input#{Name := Epoch}}
+        else
+            {error, _} = Error ->
+                Error;
+            NoTime when is_map(NoTime) ->
+                {ok, NoTime}
+        end
+    end,
+
+    maybe
+        {ok, Type} ?= emqx_utils:safe_to_existing_atom(As),
+        {ok, Item1} ?= ParseTime(<<"at">>, Item#{<<"as">> := Type}),
+        ParseTime(<<"until">>, Item1)
+    end;
+normalize_parse_item(_Item) ->
+    {error, invalid_item}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 init([]) ->
 init([]) ->
-    {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
+    {ok, ensure_expiry_timer(#{expiry_timer => undefined}), {continue, init_from_csv}}.
+
+handle_continue(init_from_csv, State) ->
+    File = emqx_schema:naive_env_interpolation(emqx:get_config([banned, bootstrap_file], <<>>)),
+    _ = init_from_csv(File),
+    {noreply, State}.
 
 
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
@@ -250,7 +378,7 @@ handle_cast(Msg, State) ->
     {noreply, State}.
     {noreply, State}.
 
 
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
-    _ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [
+    _ = trans(fun ?MODULE:expire_banned_items/1, [
         erlang:system_time(second)
         erlang:system_time(second)
     ]),
     ]),
     {noreply, ensure_expiry_timer(State), hibernate};
     {noreply, ensure_expiry_timer(State), hibernate};
@@ -391,3 +519,15 @@ on_banned(_) ->
 
 
 all_rules() ->
 all_rules() ->
     ets:tab2list(?BANNED_RULE_TAB).
     ets:tab2list(?BANNED_RULE_TAB).
+
+trans(Fun) ->
+    case mria:transaction(?COMMON_SHARD, Fun) of
+        {atomic, Res} -> {ok, Res};
+        {aborted, Reason} -> {error, Reason}
+    end.
+
+trans(Fun, Args) ->
+    case mria:transaction(?COMMON_SHARD, Fun, Args) of
+        {atomic, Res} -> {ok, Res};
+        {aborted, Reason} -> {error, Reason}
+    end.

+ 18 - 0
apps/emqx/src/emqx_schema.erl

@@ -319,6 +319,11 @@ roots(low) ->
             sc(
             sc(
                 ref("crl_cache"),
                 ref("crl_cache"),
                 #{importance => ?IMPORTANCE_HIDDEN}
                 #{importance => ?IMPORTANCE_HIDDEN}
+            )},
+        {banned,
+            sc(
+                ref("banned"),
+                #{importance => ?IMPORTANCE_HIDDEN}
             )}
             )}
     ].
     ].
 
 
@@ -1762,6 +1767,17 @@ fields("client_attrs_init") ->
                 desc => ?DESC("client_attrs_init_set_as_attr"),
                 desc => ?DESC("client_attrs_init_set_as_attr"),
                 validator => fun restricted_string/1
                 validator => fun restricted_string/1
             })}
             })}
+    ];
+fields("banned") ->
+    [
+        {bootstrap_file,
+            sc(
+                binary(),
+                #{
+                    desc => ?DESC("banned_bootstrap_file"),
+                    default => <<>>
+                }
+            )}
     ].
     ].
 
 
 compile_variform(undefined, _Opts) ->
 compile_variform(undefined, _Opts) ->
@@ -2105,6 +2121,8 @@ desc(durable_storage) ->
     ?DESC(durable_storage);
     ?DESC(durable_storage);
 desc("client_attrs_init") ->
 desc("client_attrs_init") ->
     ?DESC(client_attrs_init);
     ?DESC(client_attrs_init);
+desc("banned") ->
+    "Banned .";
 desc(_) ->
 desc(_) ->
     undefined.
     undefined.
 
 

+ 4 - 0
apps/emqx/test/data/banned/error.csv

@@ -0,0 +1,4 @@
+as,who,reason,at,until,by
+clientid,c1,right,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
+username,u1,reason 1,abc,2025-10-25T21:53:47+08:00,boot
+usernamx,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot

+ 3 - 0
apps/emqx/test/data/banned/full.csv

@@ -0,0 +1,3 @@
+as,who,reason,at,until,by
+clientid,c1,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
+username,u1,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot

+ 3 - 0
apps/emqx/test/data/banned/full2.csv

@@ -0,0 +1,3 @@
+as,who,reason,at,until,by
+clientid,c2,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
+username,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot

+ 3 - 0
apps/emqx/test/data/banned/omitted.csv

@@ -0,0 +1,3 @@
+as,who,reason,at,until,by
+clientid,c1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,
+username,u1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,

+ 3 - 0
apps/emqx/test/data/banned/optional.csv

@@ -0,0 +1,3 @@
+as,who
+clientid,c1
+username,u1

+ 53 - 0
apps/emqx/test/emqx_banned_SUITE.erl

@@ -246,6 +246,45 @@ t_session_taken(_) ->
     {ok, #{}, [0]} = emqtt:unsubscribe(C3, Topic),
     {ok, #{}, [0]} = emqtt:unsubscribe(C3, Topic),
     ok = emqtt:disconnect(C3).
     ok = emqtt:disconnect(C3).
 
 
+t_full_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full.csv">>))),
+    FullDatas = lists:sort([
+        {banned, {username, <<"u1">>}, <<"boot">>, <<"reason 2">>, 1635170027, 1761400427},
+        {banned, {clientid, <<"c1">>}, <<"boot">>, <<"reason 1">>, 1635170027, 1761400427}
+    ]),
+    ?assertMatch(FullDatas, lists:sort(get_banned_list())),
+
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full2.csv">>))),
+    ?assertMatch(FullDatas, lists:sort(get_banned_list())),
+    ok.
+
+t_optional_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"optional.csv">>))),
+    Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
+    ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
+    ok.
+
+t_omitted_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"omitted.csv">>))),
+    Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
+    ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
+    ok.
+
+t_error_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(
+        {error, enoent}, emqx_banned:init_from_csv(mk_bootstrap_file(<<"not_exists.csv">>))
+    ),
+    ?assertEqual(
+        ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"error.csv">>))
+    ),
+    Keys = [{clientid, <<"c1">>}],
+    ?assertMatch(Keys, [element(2, Data) || Data <- get_banned_list()]),
+    ok.
+
 receive_messages(Count) ->
 receive_messages(Count) ->
     receive_messages(Count, []).
     receive_messages(Count, []).
 receive_messages(0, Msgs) ->
 receive_messages(0, Msgs) ->
@@ -261,3 +300,17 @@ receive_messages(Count, Msgs) ->
     after 1200 ->
     after 1200 ->
         Msgs
         Msgs
     end.
     end.
+
+mk_bootstrap_file(File) ->
+    Dir = code:lib_dir(emqx, test),
+    filename:join([Dir, <<"data/banned">>, File]).
+
+get_banned_list() ->
+    Tabs = emqx_banned:tables(),
+    lists:foldl(
+        fun(Tab, Acc) ->
+            Acc ++ ets:tab2list(Tab)
+        end,
+        [],
+        Tabs
+    ).

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_banned.erl

@@ -171,7 +171,7 @@ banned(post, #{body := Body}) ->
         {error, Reason} ->
         {error, Reason} ->
             ErrorReason = io_lib:format("~p", [Reason]),
             ErrorReason = io_lib:format("~p", [Reason]),
             {400, 'BAD_REQUEST', list_to_binary(ErrorReason)};
             {400, 'BAD_REQUEST', list_to_binary(ErrorReason)};
-        Ban ->
+        {ok, Ban} ->
             case emqx_banned:create(Ban) of
             case emqx_banned:create(Ban) of
                 {ok, Banned} ->
                 {ok, Banned} ->
                     {200, format(Banned)};
                     {200, format(Banned)};

+ 54 - 5
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -45,15 +45,18 @@
 
 
 %% Streams from .csv data
 %% Streams from .csv data
 -export([
 -export([
-    csv/1
+    csv/1,
+    csv/2
 ]).
 ]).
 
 
--export_type([stream/1]).
+-export_type([stream/1, csv_parse_opts/0]).
 
 
 %% @doc A stream is essentially a lazy list.
 %% @doc A stream is essentially a lazy list.
 -type stream(T) :: fun(() -> next(T) | []).
 -type stream(T) :: fun(() -> next(T) | []).
 -type next(T) :: nonempty_improper_list(T, stream(T)).
 -type next(T) :: nonempty_improper_list(T, stream(T)).
 
 
+-type csv_parse_opts() :: #{nullable => boolean(), filter_null => boolean()}.
+
 -dialyzer(no_improper_lists).
 -dialyzer(no_improper_lists).
 
 
 -elvis([{elvis_style, nesting_level, disable}]).
 -elvis([{elvis_style, nesting_level, disable}]).
@@ -261,13 +264,42 @@ ets(Cont, ContF) ->
 %% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once.
 %% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once.
 %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row.
 %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row.
 -spec csv(binary()) -> stream(map()).
 -spec csv(binary()) -> stream(map()).
-csv(Bin) when is_binary(Bin) ->
+csv(Bin) ->
+    csv(Bin, #{}).
+
+-spec csv(binary(), csv_parse_opts()) -> stream(map()).
+csv(Bin, Opts) when is_binary(Bin) ->
+    Liner =
+        case Opts of
+            #{nullable := true} ->
+                fun csv_read_nullable_line/1;
+            _ ->
+                fun csv_read_line/1
+        end,
+    Maper =
+        case Opts of
+            #{filter_null := true} ->
+                fun(Headers, Fields) ->
+                    maps:from_list(
+                        lists:filter(
+                            fun({_, Value}) ->
+                                Value =/= undefined
+                            end,
+                            lists:zip(Headers, Fields)
+                        )
+                    )
+                end;
+            _ ->
+                fun(Headers, Fields) ->
+                    maps:from_list(lists:zip(Headers, Fields))
+                end
+        end,
     Reader = fun _Iter(Headers, Lines) ->
     Reader = fun _Iter(Headers, Lines) ->
-        case csv_read_line(Lines) of
+        case Liner(Lines) of
             {Fields, Rest} ->
             {Fields, Rest} ->
                 case length(Fields) == length(Headers) of
                 case length(Fields) == length(Headers) of
                     true ->
                     true ->
-                        User = maps:from_list(lists:zip(Headers, Fields)),
+                        User = Maper(Headers, Fields),
                         [User | fun() -> _Iter(Headers, Rest) end];
                         [User | fun() -> _Iter(Headers, Rest) end];
                     false ->
                     false ->
                         error(bad_format)
                         error(bad_format)
@@ -291,6 +323,23 @@ csv_read_line([Line | Lines]) ->
 csv_read_line([]) ->
 csv_read_line([]) ->
     eof.
     eof.
 
 
+csv_read_nullable_line([Line | Lines]) ->
+    %% XXX: not support ' ' for the field value
+    Fields = lists:map(
+        fun(Bin) ->
+            case string:trim(Bin, both) of
+                <<>> ->
+                    undefined;
+                Any ->
+                    Any
+            end
+        end,
+        binary:split(Line, [<<",">>], [global])
+    ),
+    {Fields, Lines};
+csv_read_nullable_line([]) ->
+    eof.
+
 do_interleave(_Cont, _, [], []) ->
 do_interleave(_Cont, _, [], []) ->
     [];
     [];
 do_interleave(Cont, N, [{N, S} | Rest], Rev) ->
 do_interleave(Cont, N, [{N, S} | Rest], Rev) ->

+ 17 - 0
changes/ee/feat-13386.en.md

@@ -0,0 +1,17 @@
+Added a bootstrap file to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database.
+
+
+This file is a CSV file with `,` as its delimiter.
+
+The first line of this file must be a header line. All valid headers are listed here:
+- as :: required
+- who :: required
+- by  :: optional
+- reason :: optional
+- at :: optional
+- until :: optional
+
+See the documentation for details on each field.
+
+Each row in the rest of this file must contain the same number of columns as the header line,
+and column can be omitted then its value will be `undefined`.

+ 18 - 0
rel/i18n/emqx_schema.hocon

@@ -1630,4 +1630,22 @@ client_attrs_init_set_as_attr {
     The extracted attribute will be stored in the `client_attrs` property with this name."""
     The extracted attribute will be stored in the `client_attrs` property with this name."""
 }
 }
 
 
+banned_bootstrap_file.desc:
+"""The bootstrap file is a CSV file used to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database.
+
+The delimiter for this file is `,`.
+
+The first line of this file must be a header line. All valid headers are listed here:
+- as :: required
+- who :: required
+- by  :: optional
+- reason :: optional
+- at :: optional
+- until :: optional
+
+See the documentation for details on each field.
+
+Each row in the rest of this file must contain the same number of columns as the header line,
+and column can be omitted then its value will be `undefined`."""
+
 }
 }