Просмотр исходного кода

Merge pull request #12426 from zmstone/0130-syn-release-55-to-master

0130 sync release 55 to master
Zaiming (Stone) Shi 2 лет назад
Родитель
Сommit
e12e4b5b97
24 измененных файлов с 817 добавлено и 284 удалено
  1. 74 40
      apps/emqx/src/emqx_broker.erl
  2. 14 12
      apps/emqx/src/emqx_router_syncer.erl
  3. 5 1
      apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl
  4. 5 2
      apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl
  5. 115 24
      apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl
  6. 3 0
      apps/emqx_auth/test/data/user-credentials-plain-dup.csv
  7. 12 0
      apps/emqx_auth/test/data/user-credentials-plain-dup.json
  8. 3 0
      apps/emqx_auth/test/data/user-credentials-plain.csv
  9. 12 0
      apps/emqx_auth/test/data/user-credentials-plain.json
  10. 1 1
      apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src
  11. 153 109
      apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl
  12. 7 1
      apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl
  13. 142 3
      apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl
  14. 2 2
      apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl
  15. 6 8
      apps/emqx_management/src/emqx_mgmt_api_cluster.erl
  16. 24 21
      apps/emqx_prometheus/src/emqx_prometheus.erl
  17. 52 10
      apps/emqx_resource/src/emqx_resource_metrics.erl
  18. 53 0
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  19. 1 0
      apps/emqx_retainer/src/emqx_retainer_schema.erl
  20. 57 50
      apps/emqx_retainer/test/emqx_retainer_SUITE.erl
  21. 40 0
      apps/emqx_utils/src/emqx_utils_stream.erl
  22. 31 0
      apps/emqx_utils/test/emqx_utils_stream_tests.erl
  23. 4 0
      changes/ce/feat-12396.en.md
  24. 1 0
      changes/ce/fix-12404.en.md

+ 74 - 40
apps/emqx/src/emqx_broker.erl

@@ -85,6 +85,16 @@
 %% Guards
 -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).
 
+-define(cast_or_eval(Pid, Msg, Expr),
+    case Pid =:= self() of
+        true ->
+            _ = Expr,
+            ok;
+        false ->
+            cast(Pid, Msg)
+    end
+).
+
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
 start_link(Pool, Id) ->
     ok = create_tabs(),
@@ -159,15 +169,7 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
     %% https://emqx.atlassian.net/browse/EMQX-10214
     I = emqx_broker_helper:get_sub_shard(SubPid, Topic),
     true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}),
-    %% NOTE
-    %% We are relying on the local state to minimize global routing state changes,
-    %% thus it's important that some operations on ETS tables on the same topic
-    %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
-    %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
-    %% `unsubscribe` codepath. So we have to pick a worker according to the topic,
-    %% but not shard. If there are topics with high number of shards, then the
-    %% load across the pool will be unbalanced.
-    Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}),
+    Sync = call(pick({Topic, I}), {subscribe, Topic, SubPid, I}),
     case Sync of
         ok ->
             ok;
@@ -218,15 +220,7 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when
         0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts);
         _ -> ok
     end,
-    %% NOTE
-    %% We are relying on the local state to minimize global routing state changes,
-    %% thus it's important that some operations on ETS tables on the same topic
-    %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
-    %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
-    %% `unsubscribe` codepath. So we have to pick a worker according to the topic,
-    %% but not shard. If there are topics with high number of shards, then the
-    %% load across the pool will be unbalanced.
-    cast(pick(Topic), {unsubscribed, Topic, SubPid, I});
+    cast(pick({Topic, I}), {unsubscribed, Topic, SubPid, I});
 do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
     is_binary(Group), is_binary(Topic), is_pid(SubPid)
 ->
@@ -503,8 +497,8 @@ cast(Broker, Req) ->
     gen_server:cast(Broker, Req).
 
 %% Pick a broker
-pick(Topic) ->
-    gproc_pool:pick_worker(broker_pool, Topic).
+pick(TopicShard) ->
+    gproc_pool:pick_worker(broker_pool, TopicShard).
 
 %%--------------------------------------------------------------------
 %% gen_server callbacks
@@ -516,36 +510,72 @@ init([Pool, Id]) ->
 
 handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) ->
     Existed = ets:member(?SUBSCRIBER, Topic),
-    true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
-    Result = maybe_add_route(Existed, Topic, From),
-    {reply, Result, State};
-handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) ->
-    Existed = ets:member(?SUBSCRIBER, Topic),
-    true = ets:insert(?SUBSCRIBER, [
-        {Topic, {shard, I}},
-        {{shard, Topic, I}, SubPid}
-    ]),
     Result = maybe_add_route(Existed, Topic, From),
+    assert_ok_result(Result),
+    true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
     {reply, Result, State};
+handle_call({subscribe, Topic, SubPid, I}, _From, State) ->
+    Existed = ets:member(?SUBSCRIBER, {shard, Topic, I}),
+    Recs = [{{shard, Topic, I}, SubPid}],
+    Recs1 =
+        case Existed of
+            false ->
+                %% This will attempt to add a route per each new shard.
+                %% The overhead must be negligible, but the consistency in general
+                %% and race conditions safety is expected to be stronger.
+                %% The main purpose is to solve the race when
+                %% `{shard, Topic, N}` (where N > 0)
+                %% is the first ever processed subscribe request per `Topic`.
+                %% It inserts `{Topic, {shard, I}}` to `?SUBSCRIBER` tab.
+                %% After that, another broker worker starts processing
+                %% `{shard, Topic, 0}` sub and already observers `{shard, Topic, N}`,
+                %% i.e. `ets:member(?SUBSCRIBER, Topic)` returns false,
+                %% so it doesn't add the route.
+                %% Even if this happens, this cast is expected to be processed eventually
+                %% and the route should be added (unless the worker restarts...)
+                ?cast_or_eval(
+                    pick({Topic, 0}),
+                    {subscribed, Topic, shard, I},
+                    sync_route(add, Topic, #{})
+                ),
+                [{Topic, {shard, I}} | Recs];
+            true ->
+                Recs
+        end,
+    true = ets:insert(?SUBSCRIBER, Recs1),
+    {reply, ok, State};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
 
+handle_cast({subscribed, Topic, shard, _I}, State) ->
+    %% Do not need to 'maybe add' (i.e. to check if the route exists).
+    %% It was already checked that this shard is newely added.
+    _ = sync_route(add, Topic, #{}),
+    {noreply, State};
+handle_cast({unsubscribed, Topic, shard, _I}, State) ->
+    _ = maybe_delete_route(Topic),
+    {noreply, State};
 handle_cast({unsubscribed, Topic, SubPid, 0}, State) ->
     true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
-    Exists = ets:member(?SUBSCRIBER, Topic),
-    _Result = maybe_delete_route(Exists, Topic),
+    _ = maybe_delete_route(Topic),
     {noreply, State};
 handle_cast({unsubscribed, Topic, SubPid, I}, State) ->
     true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
     case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
         false ->
-            ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}});
+            ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
+            %% Do not attempt to delete any routes here,
+            %% let it be handled only by the same pool worker per topic (0 shard),
+            %% so that all route deletes are serialized.
+            ?cast_or_eval(
+                pick({Topic, 0}),
+                {unsubscribed, Topic, shard, I},
+                maybe_delete_route(Topic)
+            );
         true ->
-            true
+            ok
     end,
-    Exists = ets:member(?SUBSCRIBER, Topic),
-    _Result = maybe_delete_route(Exists, Topic),
     {noreply, State};
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
@@ -584,7 +614,7 @@ do_dispatch(Topic, #delivery{message = Msg}) ->
             {ok, DispN}
     end.
 
-%% Donot dispatch to share subscriber here.
+%% Don't dispatch to share subscriber here.
 %% we do it in `emqx_shared_sub.erl` with configured strategy
 do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
     case erlang:is_process_alive(SubPid) of
@@ -605,15 +635,19 @@ do_dispatch({shard, I}, Topic, Msg) ->
 
 %%
 
+assert_ok_result(ok) -> ok;
+assert_ok_result(Ref) when is_reference(Ref) -> ok.
+
 maybe_add_route(_Existed = false, Topic, ReplyTo) ->
     sync_route(add, Topic, #{reply => ReplyTo});
 maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
     ok.
 
-maybe_delete_route(_Exists = false, Topic) ->
-    sync_route(delete, Topic, #{});
-maybe_delete_route(_Exists = true, _Topic) ->
-    ok.
+maybe_delete_route(Topic) ->
+    case ets:member(?SUBSCRIBER, Topic) of
+        true -> ok;
+        false -> sync_route(delete, Topic, #{})
+    end.
 
 sync_route(Action, Topic, ReplyTo) ->
     EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),

+ 14 - 12
apps/emqx/src/emqx_router_syncer.erl

@@ -97,7 +97,7 @@ push(Action, Topic, Dest, Opts) ->
     Context = mk_push_context(Opts),
     _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})),
     case Context of
-        {MRef, _} ->
+        [{MRef, _}] ->
             MRef;
         [] ->
             ok
@@ -128,7 +128,7 @@ designate_prio(delete, #{}) ->
 
 mk_push_context(#{reply := To}) ->
     MRef = erlang:make_ref(),
-    {MRef, To};
+    [{MRef, To}];
 mk_push_context(_) ->
     [].
 
@@ -272,8 +272,8 @@ send_replies(Errors, Batch) ->
 
 replyctx_send(_Result, []) ->
     noreply;
-replyctx_send(Result, {MRef, Pid}) ->
-    _ = erlang:send(Pid, {MRef, Result}),
+replyctx_send(Result, RefsPids) ->
+    _ = lists:foreach(fun({MRef, Pid}) -> erlang:send(Pid, {MRef, Result}) end, RefsPids),
     ok.
 
 %%
@@ -316,10 +316,11 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
             Stash#{Route := RouteOpMerged}
     end.
 
-merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
-    %% NOTE: This should not happen anyway.
-    _ = replyctx_send(ignored, Ctx1),
-    DestOp;
+merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), ?ROUTEOP(Action, Prio2, Ctx2)) ->
+    %% NOTE: This can happen as topic shard can be processed concurrently
+    %% by different broker worker, see emqx_broker for more details.
+    MergedCtx = Ctx1 ++ Ctx2,
+    ?ROUTEOP(Action, Prio2, MergedCtx);
 merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) ->
     %% NOTE: Latter cancel the former.
     %% Strictly speaking, in ideal conditions we could just cancel both, because they
@@ -352,7 +353,7 @@ stash_stats(Stash) ->
 
 batch_test() ->
     Dest = node(),
-    Ctx = fun(N) -> {N, self()} end,
+    Ctx = fun(N) -> [{N, self()}] end,
     Stash = stash_add(
         [
             ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))),
@@ -375,6 +376,7 @@ batch_test() ->
         stash_new()
     ),
     {Batch, StashLeft} = mk_batch(Stash, 5),
+
     ?assertMatch(
         #{
             {<<"t/1">>, Dest} := {add, ?PRIO_LO, _},
@@ -392,16 +394,16 @@ batch_test() ->
         },
         StashLeft
     ),
+
+    %% Replies are only sent to superseded ops:
     ?assertEqual(
         [
-            {2, ignored},
             {1, ok},
             {5, ok},
-            {7, ignored},
             {4, ok},
             {9, ok},
+            {7, ok},
             {8, ok},
-            {13, ignored},
             {11, ok}
         ],
         emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))

+ 5 - 1
apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl

@@ -310,7 +310,11 @@ reorder_authenticator(_ChainName, []) ->
 reorder_authenticator(ChainName, AuthenticatorIDs) ->
     call({reorder_authenticator, ChainName, AuthenticatorIDs}).
 
--spec import_users(chain_name(), authenticator_id(), {binary(), binary()}) ->
+-spec import_users(
+    chain_name(),
+    authenticator_id(),
+    {plain | hash, prepared_user_list | binary(), binary()}
+) ->
     ok | {error, term()}.
 import_users(ChainName, AuthenticatorID, Filename) ->
     call({import_users, ChainName, AuthenticatorID, Filename}).

+ 5 - 2
apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl

@@ -53,11 +53,14 @@ when
 when
     State :: state().
 
--callback import_users({Filename, FileData}, State) ->
+-callback import_users({PasswordType, Filename, FileData}, State) ->
     ok
     | {error, term()}
 when
-    Filename :: binary(), FileData :: binary(), State :: state().
+    PasswordType :: plain | hash,
+    Filename :: prepared_user_list | binary(),
+    FileData :: binary(),
+    State :: state().
 
 -callback add_user(UserInfo, State) ->
     {ok, User}

+ 115 - 24
apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl

@@ -58,8 +58,8 @@ schema("/authentication/:id/import_users") ->
         post => #{
             tags => ?API_TAGS_GLOBAL,
             description => ?DESC(authentication_id_import_users_post),
-            parameters => [emqx_authn_api:param_auth_id()],
-            'requestBody' => emqx_dashboard_swagger:file_schema(filename),
+            parameters => [emqx_authn_api:param_auth_id(), param_password_type()],
+            'requestBody' => request_body_schema(),
             responses => #{
                 204 => <<"Users imported">>,
                 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
@@ -74,8 +74,12 @@ schema("/listeners/:listener_id/authentication/:id/import_users") ->
             tags => ?API_TAGS_SINGLE,
             deprecated => true,
             description => ?DESC(listeners_listener_id_authentication_id_import_users_post),
-            parameters => [emqx_authn_api:param_listener_id(), emqx_authn_api:param_auth_id()],
-            'requestBody' => emqx_dashboard_swagger:file_schema(filename),
+            parameters => [
+                emqx_authn_api:param_listener_id(),
+                emqx_authn_api:param_auth_id(),
+                param_password_type()
+            ],
+            'requestBody' => request_body_schema(),
             responses => #{
                 204 => <<"Users imported">>,
                 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
@@ -84,37 +88,124 @@ schema("/listeners/:listener_id/authentication/:id/import_users") ->
         }
     }.
 
+request_body_schema() ->
+    #{content := Content} = emqx_dashboard_swagger:file_schema(filename),
+    Content1 =
+        Content#{
+            <<"application/json">> => #{
+                schema => #{
+                    type => object,
+                    example => [
+                        #{
+                            <<"user_id">> => <<"user1">>,
+                            <<"password">> => <<"password1">>,
+                            <<"is_superuser">> => true
+                        },
+                        #{
+                            <<"user_id">> => <<"user2">>,
+                            <<"password">> => <<"password2">>,
+                            <<"is_superuser">> => false
+                        }
+                    ]
+                }
+            }
+        },
+    #{
+        content => Content1,
+        description => <<"Import body">>
+    }.
+
 authenticator_import_users(
     post,
-    #{
+    Req = #{
         bindings := #{id := AuthenticatorID},
-        body := #{<<"filename">> := #{type := _} = File}
+        headers := Headers,
+        body := Body
     }
 ) ->
-    [{FileName, FileData}] = maps:to_list(maps:without([type], File)),
-    case emqx_authn_chains:import_users(?GLOBAL, AuthenticatorID, {FileName, FileData}) of
+    PasswordType = password_type(Req),
+    Result =
+        case maps:get(<<"content-type">>, Headers, undefined) of
+            <<"application/json">> ->
+                emqx_authn_chains:import_users(
+                    ?GLOBAL, AuthenticatorID, {PasswordType, prepared_user_list, Body}
+                );
+            _ ->
+                case Body of
+                    #{<<"filename">> := #{type := _} = File} ->
+                        [{Name, Data}] = maps:to_list(maps:without([type], File)),
+                        emqx_authn_chains:import_users(
+                            ?GLOBAL, AuthenticatorID, {PasswordType, Name, Data}
+                        );
+                    _ ->
+                        {error, {missing_parameter, filename}}
+                end
+        end,
+    case Result of
         ok -> {204};
         {error, Reason} -> emqx_authn_api:serialize_error(Reason)
-    end;
-authenticator_import_users(post, #{bindings := #{id := _}, body := _}) ->
-    emqx_authn_api:serialize_error({missing_parameter, filename}).
+    end.
 
 listener_authenticator_import_users(
     post,
-    #{
+    Req = #{
         bindings := #{listener_id := ListenerID, id := AuthenticatorID},
-        body := #{<<"filename">> := #{type := _} = File}
+        headers := Headers,
+        body := Body
     }
 ) ->
-    [{FileName, FileData}] = maps:to_list(maps:without([type], File)),
-    emqx_authn_api:with_chain(
-        ListenerID,
-        fun(ChainName) ->
-            case emqx_authn_chains:import_users(ChainName, AuthenticatorID, {FileName, FileData}) of
-                ok -> {204};
-                {error, Reason} -> emqx_authn_api:serialize_error(Reason)
+    PasswordType = password_type(Req),
+
+    DoImport = fun(FileName, FileData) ->
+        emqx_authn_api:with_chain(
+            ListenerID,
+            fun(ChainName) ->
+                case
+                    emqx_authn_chains:import_users(
+                        ChainName, AuthenticatorID, {PasswordType, FileName, FileData}
+                    )
+                of
+                    ok -> {204};
+                    {error, Reason} -> emqx_authn_api:serialize_error(Reason)
+                end
             end
-        end
-    );
-listener_authenticator_import_users(post, #{bindings := #{listener_id := _, id := _}, body := _}) ->
-    emqx_authn_api:serialize_error({missing_parameter, filename}).
+        )
+    end,
+    case maps:get(<<"content-type">>, Headers, undefined) of
+        <<"application/json">> ->
+            DoImport(prepared_user_list, Body);
+        _ ->
+            case Body of
+                #{<<"filename">> := #{type := _} = File} ->
+                    [{Name, Data}] = maps:to_list(maps:without([type], File)),
+                    DoImport(Name, Data);
+                _ ->
+                    emqx_authn_api:serialize_error({missing_parameter, filename})
+            end
+    end.
+
+%%--------------------------------------------------------------------
+%% helpers
+
+param_password_type() ->
+    {type,
+        hoconsc:mk(
+            binary(),
+            #{
+                in => query,
+                enum => [<<"plain">>, <<"hash">>],
+                required => true,
+                desc => <<
+                    "The import file template type, enum with `plain`,"
+                    "`hash`"
+                >>,
+                example => <<"hash">>
+            }
+        )}.
+
+password_type(_Req = #{query_string := #{<<"type">> := <<"plain">>}}) ->
+    plain;
+password_type(_Req = #{query_string := #{<<"type">> := <<"hash">>}}) ->
+    hash;
+password_type(_) ->
+    hash.

+ 3 - 0
apps/emqx_auth/test/data/user-credentials-plain-dup.csv

@@ -0,0 +1,3 @@
+user_id,password,is_superuser
+myuser3,password3,true
+myuser3,password4,false

+ 12 - 0
apps/emqx_auth/test/data/user-credentials-plain-dup.json

@@ -0,0 +1,12 @@
+[
+    {
+        "user_id":"myuser1",
+        "password":"password1",
+        "is_superuser": true
+    },
+    {
+        "user_id":"myuser1",
+        "password":"password2",
+        "is_superuser": false
+    }
+]

+ 3 - 0
apps/emqx_auth/test/data/user-credentials-plain.csv

@@ -0,0 +1,3 @@
+user_id,password,is_superuser
+myuser3,password3,true
+myuser4,password4,false

+ 12 - 0
apps/emqx_auth/test/data/user-credentials-plain.json

@@ -0,0 +1,12 @@
+[
+    {
+        "user_id":"myuser1",
+        "password":"password1",
+        "is_superuser": true
+    },
+    {
+        "user_id":"myuser2",
+        "password":"password2",
+        "is_superuser": false
+    }
+]

+ 1 - 1
apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_mnesia, [
     {description, "EMQX Buitl-in Database Authentication and Authorization"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {mod, {emqx_auth_mnesia_app, []}},
     {applications, [

+ 153 - 109
apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl

@@ -52,9 +52,7 @@
     do_destroy/1,
     do_add_user/1,
     do_delete_user/2,
-    do_update_user/3,
-    import/2,
-    import_csv/3
+    do_update_user/3
 ]).
 
 -export([mnesia/1, init_tables/0]).
@@ -173,20 +171,67 @@ do_destroy(UserGroup) ->
         mnesia:select(?TAB, group_match_spec(UserGroup), write)
     ).
 
-import_users({Filename0, FileData}, State) ->
-    Filename = to_binary(Filename0),
-    case filename:extension(Filename) of
-        <<".json">> ->
-            import_users_from_json(FileData, State);
-        <<".csv">> ->
-            CSV = csv_data(FileData),
-            import_users_from_csv(CSV, State);
-        <<>> ->
-            {error, unknown_file_format};
-        Extension ->
-            {error, {unsupported_file_format, Extension}}
+import_users({PasswordType, Filename, FileData}, State) ->
+    Convertor = convertor(PasswordType, State),
+    try
+        {_NewUsersCnt, Users} = parse_import_users(Filename, FileData, Convertor),
+        case length(Users) > 0 andalso do_import_users(Users) of
+            false ->
+                error(empty_users);
+            ok ->
+                ok;
+            {error, Reason} ->
+                _ = do_clean_imported_users(Users),
+                error(Reason)
+        end
+    catch
+        error:Reason1:Stk ->
+            ?SLOG(
+                warning,
+                #{
+                    msg => "import_users_failed",
+                    reason => Reason1,
+                    type => PasswordType,
+                    filename => Filename,
+                    stacktrace => Stk
+                }
+            ),
+            {error, Reason1}
     end.
 
+do_import_users(Users) ->
+    trans(
+        fun() ->
+            lists:foreach(
+                fun(
+                    #{
+                        <<"user_group">> := UserGroup,
+                        <<"user_id">> := UserID,
+                        <<"password_hash">> := PasswordHash,
+                        <<"salt">> := Salt,
+                        <<"is_superuser">> := IsSuperuser
+                    }
+                ) ->
+                    insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser)
+                end,
+                Users
+            )
+        end
+    ).
+
+do_clean_imported_users(Users) ->
+    lists:foreach(
+        fun(
+            #{
+                <<"user_group">> := UserGroup,
+                <<"user_id">> := UserID
+            }
+        ) ->
+            mria:dirty_delete(?TAB, {UserGroup, UserID})
+        end,
+        Users
+    ).
+
 add_user(
     UserInfo,
     State
@@ -293,93 +338,6 @@ run_fuzzy_filter(
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-%% Example: data/user-credentials.json
-import_users_from_json(Bin, #{user_group := UserGroup}) ->
-    case emqx_utils_json:safe_decode(Bin, [return_maps]) of
-        {ok, List} ->
-            trans(fun ?MODULE:import/2, [UserGroup, List]);
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-%% Example: data/user-credentials.csv
-import_users_from_csv(CSV, #{user_group := UserGroup}) ->
-    case get_csv_header(CSV) of
-        {ok, Seq, NewCSV} ->
-            trans(fun ?MODULE:import_csv/3, [UserGroup, NewCSV, Seq]);
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-import(_UserGroup, []) ->
-    ok;
-import(UserGroup, [
-    #{
-        <<"user_id">> := UserID,
-        <<"password_hash">> := PasswordHash
-    } = UserInfo
-    | More
-]) when
-    is_binary(UserID) andalso is_binary(PasswordHash)
-->
-    Salt = maps:get(<<"salt">>, UserInfo, <<>>),
-    IsSuperuser = maps:get(<<"is_superuser">>, UserInfo, false),
-    insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
-    import(UserGroup, More);
-import(_UserGroup, [_ | _More]) ->
-    {error, bad_format}.
-
-%% Importing 5w users needs 1.7 seconds
-import_csv(UserGroup, CSV, Seq) ->
-    case csv_read_line(CSV) of
-        {ok, Line, NewCSV} ->
-            Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
-            case get_user_info_by_seq(Fields, Seq) of
-                {ok,
-                    #{
-                        user_id := UserID,
-                        password_hash := PasswordHash
-                    } = UserInfo} ->
-                    Salt = maps:get(salt, UserInfo, <<>>),
-                    IsSuperuser = maps:get(is_superuser, UserInfo, false),
-                    insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
-                    import_csv(UserGroup, NewCSV, Seq);
-                {error, Reason} ->
-                    {error, Reason}
-            end;
-        eof ->
-            ok
-    end.
-
-get_csv_header(CSV) ->
-    case csv_read_line(CSV) of
-        {ok, Line, NewCSV} ->
-            Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
-            {ok, Seq, NewCSV};
-        eof ->
-            {error, empty_file}
-    end.
-
-get_user_info_by_seq(Fields, Seq) ->
-    get_user_info_by_seq(Fields, Seq, #{}).
-
-get_user_info_by_seq([], [], #{user_id := _, password_hash := _} = Acc) ->
-    {ok, Acc};
-get_user_info_by_seq(_, [], _) ->
-    {error, bad_format};
-get_user_info_by_seq([UserID | More1], [<<"user_id">> | More2], Acc) ->
-    get_user_info_by_seq(More1, More2, Acc#{user_id => UserID});
-get_user_info_by_seq([PasswordHash | More1], [<<"password_hash">> | More2], Acc) ->
-    get_user_info_by_seq(More1, More2, Acc#{password_hash => PasswordHash});
-get_user_info_by_seq([Salt | More1], [<<"salt">> | More2], Acc) ->
-    get_user_info_by_seq(More1, More2, Acc#{salt => Salt});
-get_user_info_by_seq([<<"true">> | More1], [<<"is_superuser">> | More2], Acc) ->
-    get_user_info_by_seq(More1, More2, Acc#{is_superuser => true});
-get_user_info_by_seq([<<"false">> | More1], [<<"is_superuser">> | More2], Acc) ->
-    get_user_info_by_seq(More1, More2, Acc#{is_superuser => false});
-get_user_info_by_seq(_, _, _) ->
-    {error, bad_format}.
-
 insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) ->
     UserInfoRecord = user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
     insert_user(UserInfoRecord).
@@ -449,6 +407,12 @@ trans(Fun, Args) ->
         {aborted, Reason} -> {error, Reason}
     end.
 
+trans(Fun) ->
+    case mria:transaction(?AUTHN_SHARD, Fun) of
+        {atomic, Res} -> Res;
+        {aborted, Reason} -> {error, Reason}
+    end.
+
 to_binary(B) when is_binary(B) ->
     B;
 to_binary(L) when is_list(L) ->
@@ -482,11 +446,91 @@ group_match_spec(UserGroup, QString) ->
             end)
     end.
 
-csv_data(Data) ->
-    Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]),
-    {csv_data, Lines}.
+%%--------------------------------------------------------------------
+%% parse import file/data
+
+parse_import_users(Filename, FileData, Convertor) ->
+    Eval = fun _Eval(F) ->
+        case F() of
+            [] -> [];
+            [User | F1] -> [Convertor(User) | _Eval(F1)]
+        end
+    end,
+    ReaderFn = reader_fn(Filename, FileData),
+    Users = Eval(ReaderFn),
+    NewUsersCount =
+        lists:foldl(
+            fun(
+                #{
+                    <<"user_group">> := UserGroup,
+                    <<"user_id">> := UserID
+                },
+                Acc
+            ) ->
+                case ets:member(?TAB, {UserGroup, UserID}) of
+                    true ->
+                        Acc;
+                    false ->
+                        Acc + 1
+                end
+            end,
+            0,
+            Users
+        ),
+    {NewUsersCount, Users}.
+
+reader_fn(prepared_user_list, List) when is_list(List) ->
+    %% Example: [#{<<"user_id">> => <<>>, ...}]
+    emqx_utils_stream:list(List);
+reader_fn(Filename0, Data) ->
+    case filename:extension(to_binary(Filename0)) of
+        <<".json">> ->
+            %% Example: data/user-credentials.json
+            case emqx_utils_json:safe_decode(Data, [return_maps]) of
+                {ok, List} when is_list(List) ->
+                    emqx_utils_stream:list(List);
+                {ok, _} ->
+                    error(unknown_file_format);
+                {error, Reason} ->
+                    error(Reason)
+            end;
+        <<".csv">> ->
+            %% Example: data/user-credentials.csv
+            emqx_utils_stream:csv(Data);
+        <<>> ->
+            error(unknown_file_format);
+        Extension ->
+            error({unsupported_file_format, Extension})
+    end.
 
-csv_read_line({csv_data, [Line | Lines]}) ->
-    {ok, Line, {csv_data, Lines}};
-csv_read_line({csv_data, []}) ->
-    eof.
+convertor(PasswordType, State) ->
+    fun(User) ->
+        convert_user(User, PasswordType, State)
+    end.
+
+convert_user(
+    User = #{<<"user_id">> := UserId},
+    PasswordType,
+    #{user_group := UserGroup, password_hash_algorithm := Algorithm}
+) ->
+    {PasswordHash, Salt} = find_password_hash(PasswordType, User, Algorithm),
+    #{
+        <<"user_id">> => UserId,
+        <<"password_hash">> => PasswordHash,
+        <<"salt">> => Salt,
+        <<"is_superuser">> => is_superuser(User),
+        <<"user_group">> => UserGroup
+    };
+convert_user(_, _, _) ->
+    error(bad_format).
+
+find_password_hash(hash, User = #{<<"password_hash">> := PasswordHash}, _) ->
+    {PasswordHash, maps:get(<<"salt">>, User, <<>>)};
+find_password_hash(plain, #{<<"password">> := Password}, Algorithm) ->
+    emqx_authn_password_hashing:hash(Algorithm, Password);
+find_password_hash(_, _, _) ->
+    error(bad_format).
+
+is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true;
+is_superuser(#{<<"is_superuser">> := true}) -> true;
+is_superuser(_) -> false.

+ 7 - 1
apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl

@@ -336,7 +336,13 @@ test_authenticator_import_users(PathPrefix) ->
     {ok, CSVData} = file:read_file(CSVFileName),
     {ok, 204, _} = multipart_formdata_request(ImportUri, [], [
         {filename, "user-credentials.csv", CSVData}
-    ]).
+    ]),
+
+    %% test application/json
+    {ok, 204, _} = request(post, ImportUri ++ "?type=hash", emqx_utils_json:decode(JSONData)),
+    {ok, JSONData1} = file:read_file(filename:join([Dir, <<"data/user-credentials-plain.json">>])),
+    {ok, 204, _} = request(post, ImportUri ++ "?type=plain", emqx_utils_json:decode(JSONData1)),
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Helpers

+ 142 - 3
apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl

@@ -216,7 +216,7 @@ t_import_users(_) ->
     ?assertMatch(
         {error, {unsupported_file_format, _}},
         emqx_authn_mnesia:import_users(
-            {<<"/file/with/unknown.extension">>, <<>>},
+            {hash, <<"/file/with/unknown.extension">>, <<>>},
             State
         )
     ),
@@ -224,7 +224,7 @@ t_import_users(_) ->
     ?assertEqual(
         {error, unknown_file_format},
         emqx_authn_mnesia:import_users(
-            {<<"/file/with/no/extension">>, <<>>},
+            {hash, <<"/file/with/no/extension">>, <<>>},
             State
         )
     ),
@@ -251,6 +251,142 @@ t_import_users(_) ->
             sample_filename_and_data(<<"user-credentials-malformed.csv">>),
             State
         )
+    ),
+
+    ?assertEqual(
+        {error, empty_users},
+        emqx_authn_mnesia:import_users(
+            {hash, <<"empty_users.json">>, <<"[]">>},
+            State
+        )
+    ),
+
+    ?assertEqual(
+        {error, empty_users},
+        emqx_authn_mnesia:import_users(
+            {hash, <<"empty_users.csv">>, <<>>},
+            State
+        )
+    ),
+
+    ?assertEqual(
+        {error, empty_users},
+        emqx_authn_mnesia:import_users(
+            {hash, prepared_user_list, []},
+            State
+        )
+    ).
+
+t_import_users_plain(_) ->
+    Config0 = config(),
+    Config = Config0#{password_hash_algorithm => #{name => sha256, salt_position => suffix}},
+    {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
+
+    ?assertEqual(
+        ok,
+        emqx_authn_mnesia:import_users(
+            sample_filename_and_data(plain, <<"user-credentials-plain.json">>),
+            State
+        )
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_authn_mnesia:import_users(
+            sample_filename_and_data(plain, <<"user-credentials-plain.csv">>),
+            State
+        )
+    ).
+
+t_import_users_prepared_list(_) ->
+    Config0 = config(),
+    Config = Config0#{password_hash_algorithm => #{name => sha256, salt_position => suffix}},
+    {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
+
+    Users1 = [
+        #{<<"user_id">> => <<"u1">>, <<"password">> => <<"p1">>, <<"is_superuser">> => true},
+        #{<<"user_id">> => <<"u2">>, <<"password">> => <<"p2">>, <<"is_superuser">> => true}
+    ],
+    Users2 = [
+        #{
+            <<"user_id">> => <<"u3">>,
+            <<"password_hash">> =>
+                <<"c5e46903df45e5dc096dc74657610dbee8deaacae656df88a1788f1847390242">>,
+            <<"salt">> => <<"e378187547bf2d6f0545a3f441aa4d8a">>,
+            <<"is_superuser">> => true
+        },
+        #{
+            <<"user_id">> => <<"u4">>,
+            <<"password_hash">> =>
+                <<"f4d17f300b11e522fd33f497c11b126ef1ea5149c74d2220f9a16dc876d4567b">>,
+            <<"salt">> => <<"6d3f9bd5b54d94b98adbcfe10b6d181f">>,
+            <<"is_superuser">> => true
+        }
+    ],
+
+    ?assertEqual(
+        ok,
+        emqx_authn_mnesia:import_users(
+            {plain, prepared_user_list, Users1},
+            State
+        )
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_authn_mnesia:import_users(
+            {hash, prepared_user_list, Users2},
+            State
+        )
+    ).
+
+t_import_users_duplicated_records(_) ->
+    Config0 = config(),
+    Config = Config0#{password_hash_algorithm => #{name => plain, salt_position => disable}},
+    {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
+
+    ?assertEqual(
+        ok,
+        emqx_authn_mnesia:import_users(
+            sample_filename_and_data(plain, <<"user-credentials-plain-dup.json">>),
+            State
+        )
+    ),
+    ?assertEqual(
+        ok,
+        emqx_authn_mnesia:import_users(
+            sample_filename_and_data(plain, <<"user-credentials-plain-dup.csv">>),
+            State
+        )
+    ),
+    Users1 = [
+        #{
+            <<"user_id">> => <<"myuser5">>,
+            <<"password">> => <<"password5">>,
+            <<"is_superuser">> => true
+        },
+        #{
+            <<"user_id">> => <<"myuser5">>,
+            <<"password">> => <<"password6">>,
+            <<"is_superuser">> => false
+        }
+    ],
+    ?assertEqual(
+        ok,
+        emqx_authn_mnesia:import_users(
+            {plain, prepared_user_list, Users1},
+            State
+        )
+    ),
+
+    %% assert: the last record overwrites the previous one
+    ?assertMatch(
+        [
+            {user_info, {_, <<"myuser1">>}, <<"password2">>, _, false},
+            {user_info, {_, <<"myuser3">>}, <<"password4">>, _, false},
+            {user_info, {_, <<"myuser5">>}, <<"password6">>, _, false}
+        ],
+        ets:tab2list(emqx_authn_mnesia)
     ).
 
 %%------------------------------------------------------------------------------
@@ -262,9 +398,12 @@ sample_filename(Name) ->
     filename:join([Dir, <<"data">>, Name]).
 
 sample_filename_and_data(Name) ->
+    sample_filename_and_data(hash, Name).
+
+sample_filename_and_data(Type, Name) ->
     Filename = sample_filename(Name),
     {ok, Data} = file:read_file(Filename),
-    {Filename, Data}.
+    {Type, Filename, Data}.
 
 config() ->
     #{

+ 2 - 2
apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl

@@ -81,7 +81,7 @@ import_users(post, #{
                 [{FileName, FileData}] = maps:to_list(maps:without([type], File)),
                 case
                     emqx_authn_chains:import_users(
-                        ChainName, AuthId, {FileName, FileData}
+                        ChainName, AuthId, {hash, FileName, FileData}
                     )
                 of
                     ok -> {204};
@@ -105,7 +105,7 @@ import_listener_users(post, #{
                     [{FileName, FileData}] = maps:to_list(maps:without([type], File)),
                     case
                         emqx_authn_chains:import_users(
-                            ChainName, AuthId, {FileName, FileData}
+                            ChainName, AuthId, {hash, FileName, FileData}
                         )
                     of
                         ok -> {204};

+ 6 - 8
apps/emqx_management/src/emqx_mgmt_api_cluster.erl

@@ -206,11 +206,10 @@ fields(node_invitation_succeed) ->
         [
             {finished_at,
                 ?HOCON(
-                    emqx_utils_calendar:epoch_millisecond(),
+                    binary(),
                     #{
-                        desc =>
-                            <<"The time of the async invitation result is received, millisecond precision epoch">>,
-                        example => <<"1705044829915">>
+                        desc => <<"The time of the async invitation result is received">>,
+                        example => <<"2024-01-30T15:24:39.355+08:00">>
                     }
                 )}
         ];
@@ -223,11 +222,10 @@ fields(node_invitation_in_progress) ->
             )},
         {started_at,
             ?HOCON(
-                emqx_utils_calendar:epoch_millisecond(),
+                binary(),
                 #{
-                    desc =>
-                        <<"The start timestamp of the invitation, millisecond precision epoch">>,
-                    example => <<"1705044829915">>
+                    desc => <<"The time of the async invitation is started">>,
+                    example => <<"2024-01-30T15:24:39.355+08:00">>
                 }
             )}
     ].

+ 24 - 21
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -991,12 +991,13 @@ catch_all(DataFun) ->
 collect_stats_json_data(StatsData, StatsClData) ->
     StatsDatas = collect_json_data_(StatsData),
     CLData = hd(collect_json_data_(StatsClData)),
-    lists:map(
+    Res = lists:map(
         fun(NodeData) ->
             maps:merge(NodeData, CLData)
         end,
         StatsDatas
-    ).
+    ),
+    json_obj_or_array(Res).
 
 %% always return json array
 collect_cert_json_data(Data) ->
@@ -1004,32 +1005,34 @@ collect_cert_json_data(Data) ->
 
 collect_vm_json_data(Data) ->
     DataListPerNode = collect_json_data_(Data),
-    case {?GET_PROM_DATA_MODE(), DataListPerNode} of
-        {?PROM_DATA_MODE__NODE, [NData | _]} ->
-            NData;
-        {_, _} ->
+    case ?GET_PROM_DATA_MODE() of
+        ?PROM_DATA_MODE__NODE ->
+            hd(DataListPerNode);
+        _ ->
             DataListPerNode
     end.
 
 collect_json_data(Data0) ->
     DataListPerNode = collect_json_data_(Data0),
-    case {?GET_PROM_DATA_MODE(), DataListPerNode} of
-        %% all nodes results unaggregated, should be a list
-        {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, _} ->
-            DataListPerNode;
-        %% only local node result [#{...}]
-        %% To guaranteed compatibility, return a json object, not array
-        {?PROM_DATA_MODE__NODE, [NData | _]} ->
-            NData;
-        %% All nodes results aggregated
-        %% return a json object, not array
-        {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, [NData | _]} ->
-            NData;
-        %% olp maybe not enabled, with empty list to empty object
-        {_, []} ->
-            #{}
+    json_obj_or_array(DataListPerNode).
+
+%% compatibility with previous api format in json mode
+json_obj_or_array(DataL) ->
+    case ?GET_PROM_DATA_MODE() of
+        ?PROM_DATA_MODE__NODE ->
+            data_list_to_json_obj(DataL);
+        ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED ->
+            DataL;
+        ?PROM_DATA_MODE__ALL_NODES_AGGREGATED ->
+            data_list_to_json_obj(DataL)
     end.
 
+data_list_to_json_obj([]) ->
+    %% olp maybe not enabled, with empty list to empty object
+    #{};
+data_list_to_json_obj(DataL) ->
+    hd(DataL).
+
 collect_json_data_(Data) ->
     emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_prom_stats_metrics/3).
 

+ 52 - 10
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_resource_metrics).
 
+-include_lib("emqx/include/logger.hrl").
+
 -export([
     events/0,
     install_telemetry_handler/1,
@@ -118,6 +120,52 @@ handle_telemetry_event(
     _Metadata = #{resource_id := ID},
     _HandlerConfig
 ) ->
+    try
+        handle_counter_telemetry_event(Event, ID, Val)
+    catch
+        Kind:Reason:Stacktrace ->
+            %% We catch errors to avoid detaching the telemetry handler function.
+            %% When restarting a resource while it's under load, there might be transient
+            %% failures while the metrics are not yet created.
+            ?SLOG(warning, #{
+                msg => "handle_resource_metrics_failed",
+                hint => "transient failures may occur when restarting a resource",
+                kind => Kind,
+                reason => Reason,
+                stacktrace => Stacktrace,
+                resource_id => ID,
+                event => Event
+            }),
+            ok
+    end;
+handle_telemetry_event(
+    [?TELEMETRY_PREFIX, Event],
+    _Measurements = #{gauge_set := Val},
+    _Metadata = #{resource_id := ID, worker_id := WorkerID},
+    _HandlerConfig
+) ->
+    try
+        handle_gauge_telemetry_event(Event, ID, WorkerID, Val)
+    catch
+        Kind:Reason:Stacktrace ->
+            %% We catch errors to avoid detaching the telemetry handler function.
+            %% When restarting a resource while it's under load, there might be transient
+            %% failures while the metrics are not yet created.
+            ?SLOG(warning, #{
+                msg => "handle_resource_metrics_failed",
+                hint => "transient failures may occur when restarting a resource",
+                kind => Kind,
+                reason => Reason,
+                stacktrace => Stacktrace,
+                resource_id => ID,
+                event => Event
+            }),
+            ok
+    end;
+handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
+    ok.
+
+handle_counter_telemetry_event(Event, ID, Val) ->
     case Event of
         dropped_other ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
@@ -154,13 +202,9 @@ handle_telemetry_event(
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val);
         _ ->
             ok
-    end;
-handle_telemetry_event(
-    [?TELEMETRY_PREFIX, Event],
-    _Measurements = #{gauge_set := Val},
-    _Metadata = #{resource_id := ID, worker_id := WorkerID},
-    _HandlerConfig
-) ->
+    end.
+
+handle_gauge_telemetry_event(Event, ID, WorkerID, Val) ->
     case Event of
         inflight ->
             emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
@@ -168,9 +212,7 @@ handle_telemetry_event(
             emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
         _ ->
             ok
-    end;
-handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
-    ok.
+    end.
 
 %% Gauges (value can go both up and down):
 %% --------------------------------------

+ 53 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -28,6 +28,7 @@
 -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
 -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
 -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
+-define(TELEMETRY_PREFIX, emqx, resource).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -3006,6 +3007,36 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
         end
     ).
 
+t_telemetry_handler_crash(_Config) ->
+    %% Check that a crash while handling a telemetry event, such as when a busy resource
+    %% is restarted and its metrics are not recreated while handling an increment, does
+    %% not lead to the handler being uninstalled.
+    ?check_trace(
+        begin
+            NonExistentId = <<"I-dont-exist">>,
+            WorkerId = 1,
+            HandlersBefore = telemetry:list_handlers([?TELEMETRY_PREFIX]),
+            ?assertMatch([_ | _], HandlersBefore),
+            lists:foreach(fun(Fn) -> Fn(NonExistentId) end, counter_metric_inc_fns()),
+            emqx_common_test_helpers:with_mock(
+                emqx_metrics_worker,
+                set_gauge,
+                fun(_Name, _Id, _WorkerId, _Metric, _Val) ->
+                    error(random_crash)
+                end,
+                fun() ->
+                    lists:foreach(
+                        fun(Fn) -> Fn(NonExistentId, WorkerId, 1) end, gauge_metric_set_fns()
+                    )
+                end
+            ),
+            ?assertEqual(HandlersBefore, telemetry:list_handlers([?TELEMETRY_PREFIX])),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -3235,3 +3266,25 @@ do_wait_until_all_marked_as_retriable(NumExpected, Seen) ->
                     })
             end
     end.
+
+counter_metric_inc_fns() ->
+    Mod = emqx_resource_metrics,
+    [
+        fun Mod:Fn/1
+     || {Fn, 1} <- Mod:module_info(functions),
+        case string:find(atom_to_list(Fn), "_inc", trailing) of
+            "_inc" -> true;
+            _ -> false
+        end
+    ].
+
+gauge_metric_set_fns() ->
+    Mod = emqx_resource_metrics,
+    [
+        fun Mod:Fn/3
+     || {Fn, 3} <- Mod:module_info(functions),
+        case string:find(atom_to_list(Fn), "_set", trailing) of
+            "_set" -> true;
+            _ -> false
+        end
+    ].

+ 1 - 0
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -81,6 +81,7 @@ fields("retainer") ->
                 #{
                     required => false,
                     desc => ?DESC(delivery_rate),
+                    default => <<"1000/s">>,
                     example => <<"1000/s">>,
                     aliases => [deliver_rate]
                 }

+ 57 - 50
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -256,61 +256,67 @@ t_wildcard_subscription(_) ->
     ok = emqtt:disconnect(C1).
 
 t_message_expiry(_) ->
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
+    ConfMod = fun(Conf) ->
+        Conf#{<<"delivery_rate">> := <<"infinity">>}
+    end,
+    Case = fun() ->
+        {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+        {ok, _} = emqtt:connect(C1),
 
-    emqtt:publish(
-        C1,
-        <<"retained/0">>,
-        #{'Message-Expiry-Interval' => 0},
-        <<"don't expire">>,
-        [{qos, 0}, {retain, true}]
-    ),
-    emqtt:publish(
-        C1,
-        <<"retained/1">>,
-        #{'Message-Expiry-Interval' => 2},
-        <<"expire">>,
-        [{qos, 0}, {retain, true}]
-    ),
-    emqtt:publish(
-        C1,
-        <<"retained/2">>,
-        #{'Message-Expiry-Interval' => 5},
-        <<"don't expire">>,
-        [{qos, 0}, {retain, true}]
-    ),
-    emqtt:publish(
-        C1,
-        <<"retained/3">>,
-        <<"don't expire">>,
-        [{qos, 0}, {retain, true}]
-    ),
-    emqtt:publish(
-        C1,
-        <<"$SYS/retained/4">>,
-        <<"don't expire">>,
-        [{qos, 0}, {retain, true}]
-    ),
+        emqtt:publish(
+            C1,
+            <<"retained/0">>,
+            #{'Message-Expiry-Interval' => 0},
+            <<"don't expire">>,
+            [{qos, 0}, {retain, true}]
+        ),
+        emqtt:publish(
+            C1,
+            <<"retained/1">>,
+            #{'Message-Expiry-Interval' => 2},
+            <<"expire">>,
+            [{qos, 0}, {retain, true}]
+        ),
+        emqtt:publish(
+            C1,
+            <<"retained/2">>,
+            #{'Message-Expiry-Interval' => 5},
+            <<"don't expire">>,
+            [{qos, 0}, {retain, true}]
+        ),
+        emqtt:publish(
+            C1,
+            <<"retained/3">>,
+            <<"don't expire">>,
+            [{qos, 0}, {retain, true}]
+        ),
+        emqtt:publish(
+            C1,
+            <<"$SYS/retained/4">>,
+            <<"don't expire">>,
+            [{qos, 0}, {retain, true}]
+        ),
 
-    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
-    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
-    ?assertEqual(5, length(receive_messages(5))),
-    {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
-    {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>),
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
+        ?assertEqual(5, length(receive_messages(5))),
+        {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
+        {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>),
 
-    timer:sleep(3000),
-    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
-    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
-    ?assertEqual(4, length(receive_messages(5))),
+        timer:sleep(3000),
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
+        ?assertEqual(4, length(receive_messages(5))),
 
-    emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
-    emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),
-    emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]),
-    emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]),
-    emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]),
+        emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
+        emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),
+        emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]),
+        emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]),
+        emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]),
 
-    ok = emqtt:disconnect(C1).
+        ok = emqtt:disconnect(C1)
+    end,
+    with_conf(ConfMod, Case).
 
 t_message_expiry_2(_) ->
     ConfMod = fun(Conf) ->
@@ -410,6 +416,7 @@ t_flow_control(_) ->
     JsonCfg = make_limiter_json(<<"1/1s">>),
     emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
     emqx_retainer:update_config(#{
+        <<"delivery_rate">> => <<"1/1s">>,
         <<"flow_control">> =>
             #{
                 <<"batch_read_number">> => 1,

+ 40 - 0
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -37,6 +37,11 @@
     ets/1
 ]).
 
+%% Streams from .csv data
+-export([
+    csv/1
+]).
+
 -export_type([stream/1]).
 
 %% @doc A stream is essentially a lazy list.
@@ -45,6 +50,8 @@
 
 -dialyzer(no_improper_lists).
 
+-elvis([{elvis_style, nesting_level, disable}]).
+
 %%
 
 %% @doc Make a stream that produces no values.
@@ -157,3 +164,36 @@ ets(Cont, ContF) ->
                 []
         end
     end.
+
+%% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once.
+%% The .csv binary is assumed to be in UTF-8 encoding and to have a header row.
+-spec csv(binary()) -> stream(map()).
+csv(Bin) when is_binary(Bin) ->
+    Reader = fun _Iter(Headers, Lines) ->
+        case csv_read_line(Lines) of
+            {Fields, Rest} ->
+                case length(Fields) == length(Headers) of
+                    true ->
+                        User = maps:from_list(lists:zip(Headers, Fields)),
+                        [User | fun() -> _Iter(Headers, Rest) end];
+                    false ->
+                        error(bad_format)
+                end;
+            eof ->
+                []
+        end
+    end,
+    HeadersAndLines = binary:split(Bin, [<<"\r">>, <<"\n">>], [global, trim_all]),
+    case csv_read_line(HeadersAndLines) of
+        {CSVHeaders, CSVLines} ->
+            fun() -> Reader(CSVHeaders, CSVLines) end;
+        eof ->
+            empty()
+    end.
+
+csv_read_line([Line | Lines]) ->
+    %% XXX: not support ' ' for the field value
+    Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
+    {Fields, Lines};
+csv_read_line([]) ->
+    eof.

+ 31 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -82,3 +82,34 @@ mqueue_test() ->
         [1, 42, 2],
         emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
     ).
+
+csv_test() ->
+    Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
+    ?assertEqual(
+        [
+            #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
+            #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
+        ],
+        emqx_utils_stream:consume(emqx_utils_stream:csv(Data1))
+    ),
+
+    Data2 = <<"h1, h2, h3\nvv1, vv2, vv3\nvv4,vv5,vv6\n">>,
+    ?assertEqual(
+        [
+            #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
+            #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
+        ],
+        emqx_utils_stream:consume(emqx_utils_stream:csv(Data2))
+    ),
+
+    ?assertEqual(
+        [],
+        emqx_utils_stream:consume(emqx_utils_stream:csv(<<"">>))
+    ),
+
+    BadData = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5">>,
+    ?assertException(
+        error,
+        bad_format,
+        emqx_utils_stream:consume(emqx_utils_stream:csv(BadData))
+    ).

+ 4 - 0
changes/ce/feat-12396.en.md

@@ -0,0 +1,4 @@
+Enhanced the `authentication/:id/import_users` interface for a more user-friendly user import feature:
+
+- Add new parameter `?type=plain` to support importing users with plaintext passwords in addition to the current solution which only supports password hash.
+- Support `content-type: application/json` to accept HTTP Body in JSON format in extension to the current solution which only supports `multipart/form-data` for csv format.

+ 1 - 0
changes/ce/fix-12404.en.md

@@ -0,0 +1 @@
+Fixed an issue where restarting a busy data integration could lead to data integration metrics to stop being collected.