|
|
@@ -52,9 +52,7 @@
|
|
|
do_destroy/1,
|
|
|
do_add_user/1,
|
|
|
do_delete_user/2,
|
|
|
- do_update_user/3,
|
|
|
- import/2,
|
|
|
- import_csv/3
|
|
|
+ do_update_user/3
|
|
|
]).
|
|
|
|
|
|
-export([mnesia/1, init_tables/0]).
|
|
|
@@ -173,20 +171,67 @@ do_destroy(UserGroup) ->
|
|
|
mnesia:select(?TAB, group_match_spec(UserGroup), write)
|
|
|
).
|
|
|
|
|
|
-import_users({Filename0, FileData}, State) ->
|
|
|
- Filename = to_binary(Filename0),
|
|
|
- case filename:extension(Filename) of
|
|
|
- <<".json">> ->
|
|
|
- import_users_from_json(FileData, State);
|
|
|
- <<".csv">> ->
|
|
|
- CSV = csv_data(FileData),
|
|
|
- import_users_from_csv(CSV, State);
|
|
|
- <<>> ->
|
|
|
- {error, unknown_file_format};
|
|
|
- Extension ->
|
|
|
- {error, {unsupported_file_format, Extension}}
|
|
|
+import_users({PasswordType, Filename, FileData}, State) ->
|
|
|
+ Convertor = convertor(PasswordType, State),
|
|
|
+ try
|
|
|
+ {_NewUsersCnt, Users} = parse_import_users(Filename, FileData, Convertor),
|
|
|
+ case length(Users) > 0 andalso do_import_users(Users) of
|
|
|
+ false ->
|
|
|
+ error(empty_users);
|
|
|
+ ok ->
|
|
|
+ ok;
|
|
|
+ {error, Reason} ->
|
|
|
+ _ = do_clean_imported_users(Users),
|
|
|
+ error(Reason)
|
|
|
+ end
|
|
|
+ catch
|
|
|
+ error:Reason1:Stk ->
|
|
|
+ ?SLOG(
|
|
|
+ warning,
|
|
|
+ #{
|
|
|
+ msg => "import_users_failed",
|
|
|
+ reason => Reason1,
|
|
|
+ type => PasswordType,
|
|
|
+ filename => Filename,
|
|
|
+ stacktrace => Stk
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ {error, Reason1}
|
|
|
end.
|
|
|
|
|
|
+do_import_users(Users) ->
|
|
|
+ trans(
|
|
|
+ fun() ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(
|
|
|
+ #{
|
|
|
+ <<"user_group">> := UserGroup,
|
|
|
+ <<"user_id">> := UserID,
|
|
|
+ <<"password_hash">> := PasswordHash,
|
|
|
+ <<"salt">> := Salt,
|
|
|
+ <<"is_superuser">> := IsSuperuser
|
|
|
+ }
|
|
|
+ ) ->
|
|
|
+ insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser)
|
|
|
+ end,
|
|
|
+ Users
|
|
|
+ )
|
|
|
+ end
|
|
|
+ ).
|
|
|
+
|
|
|
+do_clean_imported_users(Users) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(
|
|
|
+ #{
|
|
|
+ <<"user_group">> := UserGroup,
|
|
|
+ <<"user_id">> := UserID
|
|
|
+ }
|
|
|
+ ) ->
|
|
|
+ mria:dirty_delete(?TAB, {UserGroup, UserID})
|
|
|
+ end,
|
|
|
+ Users
|
|
|
+ ).
|
|
|
+
|
|
|
add_user(
|
|
|
UserInfo,
|
|
|
State
|
|
|
@@ -293,93 +338,6 @@ run_fuzzy_filter(
|
|
|
%% Internal functions
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-%% Example: data/user-credentials.json
|
|
|
-import_users_from_json(Bin, #{user_group := UserGroup}) ->
|
|
|
- case emqx_utils_json:safe_decode(Bin, [return_maps]) of
|
|
|
- {ok, List} ->
|
|
|
- trans(fun ?MODULE:import/2, [UserGroup, List]);
|
|
|
- {error, Reason} ->
|
|
|
- {error, Reason}
|
|
|
- end.
|
|
|
-
|
|
|
-%% Example: data/user-credentials.csv
|
|
|
-import_users_from_csv(CSV, #{user_group := UserGroup}) ->
|
|
|
- case get_csv_header(CSV) of
|
|
|
- {ok, Seq, NewCSV} ->
|
|
|
- trans(fun ?MODULE:import_csv/3, [UserGroup, NewCSV, Seq]);
|
|
|
- {error, Reason} ->
|
|
|
- {error, Reason}
|
|
|
- end.
|
|
|
-
|
|
|
-import(_UserGroup, []) ->
|
|
|
- ok;
|
|
|
-import(UserGroup, [
|
|
|
- #{
|
|
|
- <<"user_id">> := UserID,
|
|
|
- <<"password_hash">> := PasswordHash
|
|
|
- } = UserInfo
|
|
|
- | More
|
|
|
-]) when
|
|
|
- is_binary(UserID) andalso is_binary(PasswordHash)
|
|
|
-->
|
|
|
- Salt = maps:get(<<"salt">>, UserInfo, <<>>),
|
|
|
- IsSuperuser = maps:get(<<"is_superuser">>, UserInfo, false),
|
|
|
- insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
|
|
|
- import(UserGroup, More);
|
|
|
-import(_UserGroup, [_ | _More]) ->
|
|
|
- {error, bad_format}.
|
|
|
-
|
|
|
-%% Importing 5w users needs 1.7 seconds
|
|
|
-import_csv(UserGroup, CSV, Seq) ->
|
|
|
- case csv_read_line(CSV) of
|
|
|
- {ok, Line, NewCSV} ->
|
|
|
- Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
|
|
|
- case get_user_info_by_seq(Fields, Seq) of
|
|
|
- {ok,
|
|
|
- #{
|
|
|
- user_id := UserID,
|
|
|
- password_hash := PasswordHash
|
|
|
- } = UserInfo} ->
|
|
|
- Salt = maps:get(salt, UserInfo, <<>>),
|
|
|
- IsSuperuser = maps:get(is_superuser, UserInfo, false),
|
|
|
- insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
|
|
|
- import_csv(UserGroup, NewCSV, Seq);
|
|
|
- {error, Reason} ->
|
|
|
- {error, Reason}
|
|
|
- end;
|
|
|
- eof ->
|
|
|
- ok
|
|
|
- end.
|
|
|
-
|
|
|
-get_csv_header(CSV) ->
|
|
|
- case csv_read_line(CSV) of
|
|
|
- {ok, Line, NewCSV} ->
|
|
|
- Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
|
|
|
- {ok, Seq, NewCSV};
|
|
|
- eof ->
|
|
|
- {error, empty_file}
|
|
|
- end.
|
|
|
-
|
|
|
-get_user_info_by_seq(Fields, Seq) ->
|
|
|
- get_user_info_by_seq(Fields, Seq, #{}).
|
|
|
-
|
|
|
-get_user_info_by_seq([], [], #{user_id := _, password_hash := _} = Acc) ->
|
|
|
- {ok, Acc};
|
|
|
-get_user_info_by_seq(_, [], _) ->
|
|
|
- {error, bad_format};
|
|
|
-get_user_info_by_seq([UserID | More1], [<<"user_id">> | More2], Acc) ->
|
|
|
- get_user_info_by_seq(More1, More2, Acc#{user_id => UserID});
|
|
|
-get_user_info_by_seq([PasswordHash | More1], [<<"password_hash">> | More2], Acc) ->
|
|
|
- get_user_info_by_seq(More1, More2, Acc#{password_hash => PasswordHash});
|
|
|
-get_user_info_by_seq([Salt | More1], [<<"salt">> | More2], Acc) ->
|
|
|
- get_user_info_by_seq(More1, More2, Acc#{salt => Salt});
|
|
|
-get_user_info_by_seq([<<"true">> | More1], [<<"is_superuser">> | More2], Acc) ->
|
|
|
- get_user_info_by_seq(More1, More2, Acc#{is_superuser => true});
|
|
|
-get_user_info_by_seq([<<"false">> | More1], [<<"is_superuser">> | More2], Acc) ->
|
|
|
- get_user_info_by_seq(More1, More2, Acc#{is_superuser => false});
|
|
|
-get_user_info_by_seq(_, _, _) ->
|
|
|
- {error, bad_format}.
|
|
|
-
|
|
|
insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) ->
|
|
|
UserInfoRecord = user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
|
|
|
insert_user(UserInfoRecord).
|
|
|
@@ -449,6 +407,12 @@ trans(Fun, Args) ->
|
|
|
{aborted, Reason} -> {error, Reason}
|
|
|
end.
|
|
|
|
|
|
+trans(Fun) ->
|
|
|
+ case mria:transaction(?AUTHN_SHARD, Fun) of
|
|
|
+ {atomic, Res} -> Res;
|
|
|
+ {aborted, Reason} -> {error, Reason}
|
|
|
+ end.
|
|
|
+
|
|
|
to_binary(B) when is_binary(B) ->
|
|
|
B;
|
|
|
to_binary(L) when is_list(L) ->
|
|
|
@@ -482,11 +446,91 @@ group_match_spec(UserGroup, QString) ->
|
|
|
end)
|
|
|
end.
|
|
|
|
|
|
-csv_data(Data) ->
|
|
|
- Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]),
|
|
|
- {csv_data, Lines}.
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% parse import file/data
|
|
|
+
|
|
|
+parse_import_users(Filename, FileData, Convertor) ->
|
|
|
+ Eval = fun _Eval(F) ->
|
|
|
+ case F() of
|
|
|
+ [] -> [];
|
|
|
+ [User | F1] -> [Convertor(User) | _Eval(F1)]
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ ReaderFn = reader_fn(Filename, FileData),
|
|
|
+ Users = lists:reverse(Eval(ReaderFn)),
|
|
|
+ NewUsersCount =
|
|
|
+ lists:foldl(
|
|
|
+ fun(
|
|
|
+ #{
|
|
|
+ <<"user_group">> := UserGroup,
|
|
|
+ <<"user_id">> := UserID
|
|
|
+ },
|
|
|
+ Acc
|
|
|
+ ) ->
|
|
|
+ case ets:member(?TAB, {UserGroup, UserID}) of
|
|
|
+ true ->
|
|
|
+ Acc;
|
|
|
+ false ->
|
|
|
+ Acc + 1
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ 0,
|
|
|
+ Users
|
|
|
+ ),
|
|
|
+ {NewUsersCount, Users}.
|
|
|
+
|
|
|
+reader_fn(prepared_user_list, List) when is_list(List) ->
|
|
|
+ %% Example: [#{<<"user_id">> => <<>>, ...}]
|
|
|
+ emqx_utils_stream:list(List);
|
|
|
+reader_fn(Filename0, Data) ->
|
|
|
+ case filename:extension(to_binary(Filename0)) of
|
|
|
+ <<".json">> ->
|
|
|
+ %% Example: data/user-credentials.json
|
|
|
+ case emqx_utils_json:safe_decode(Data, [return_maps]) of
|
|
|
+ {ok, List} when is_list(List) ->
|
|
|
+ emqx_utils_stream:list(List);
|
|
|
+ {ok, _} ->
|
|
|
+ error(unknown_file_format);
|
|
|
+ {error, Reason} ->
|
|
|
+ error(Reason)
|
|
|
+ end;
|
|
|
+ <<".csv">> ->
|
|
|
+ %% Example: data/user-credentials.csv
|
|
|
+ emqx_utils_stream:csv(Data);
|
|
|
+ <<>> ->
|
|
|
+ error(unknown_file_format);
|
|
|
+ Extension ->
|
|
|
+ error({unsupported_file_format, Extension})
|
|
|
+ end.
|
|
|
|
|
|
-csv_read_line({csv_data, [Line | Lines]}) ->
|
|
|
- {ok, Line, {csv_data, Lines}};
|
|
|
-csv_read_line({csv_data, []}) ->
|
|
|
- eof.
|
|
|
+convertor(PasswordType, State) ->
|
|
|
+ fun(User) ->
|
|
|
+ convert_user(User, PasswordType, State)
|
|
|
+ end.
|
|
|
+
|
|
|
+convert_user(
|
|
|
+ User = #{<<"user_id">> := UserId},
|
|
|
+ PasswordType,
|
|
|
+ #{user_group := UserGroup, password_hash_algorithm := Algorithm}
|
|
|
+) ->
|
|
|
+ {PasswordHash, Salt} = find_password_hash(PasswordType, User, Algorithm),
|
|
|
+ #{
|
|
|
+ <<"user_id">> => UserId,
|
|
|
+ <<"password_hash">> => PasswordHash,
|
|
|
+ <<"salt">> => Salt,
|
|
|
+ <<"is_superuser">> => is_superuser(User),
|
|
|
+ <<"user_group">> => UserGroup
|
|
|
+ };
|
|
|
+convert_user(_, _, _) ->
|
|
|
+ error(bad_format).
|
|
|
+
|
|
|
+find_password_hash(hash, User = #{<<"password_hash">> := PasswordHash}, _) ->
|
|
|
+ {PasswordHash, maps:get(<<"salt">>, User, <<>>)};
|
|
|
+find_password_hash(plain, #{<<"password">> := Password}, Algorithm) ->
|
|
|
+ emqx_authn_password_hashing:hash(Algorithm, Password);
|
|
|
+find_password_hash(_, _, _) ->
|
|
|
+ error(bad_format).
|
|
|
+
|
|
|
+is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true;
|
|
|
+is_superuser(#{<<"is_superuser">> := true}) -> true;
|
|
|
+is_superuser(_) -> false.
|