فهرست منبع

Merge pull request #5100 from k32/dev/authn-shard

Create authN shard
k32 4 سال پیش
والد
کامیت
ca1b789ef6

+ 5 - 2
apps/emqx/src/emqx_cm_registry.erl

@@ -48,7 +48,9 @@
 -define(TAB, emqx_channel_registry).
 -define(TAB, emqx_channel_registry).
 -define(LOCK, {?MODULE, cleanup_down}).
 -define(LOCK, {?MODULE, cleanup_down}).
 
 
--rlog_shard({?ROUTE_SHARD, ?TAB}).
+-define(CM_SHARD, emqx_cm_shard).
+
+-rlog_shard({?CM_SHARD, ?TAB}).
 
 
 -record(channel, {chid, pid}).
 -record(channel, {chid, pid}).
 
 
@@ -111,6 +113,7 @@ init([]) ->
                 {storage_properties, [{ets, [{read_concurrency, true},
                 {storage_properties, [{ets, [{read_concurrency, true},
                                              {write_concurrency, true}]}]}]),
                                              {write_concurrency, true}]}]}]),
     ok = ekka_mnesia:copy_table(?TAB, ram_copies),
     ok = ekka_mnesia:copy_table(?TAB, ram_copies),
+    ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
     ok = ekka:monitor(membership),
     ok = ekka:monitor(membership),
     {ok, #{}}.
     {ok, #{}}.
 
 
@@ -125,7 +128,7 @@ handle_cast(Msg, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
     global:trans({?LOCK, self()},
     global:trans({?LOCK, self()},
                  fun() ->
                  fun() ->
-                     ekka_mnesia:transaction(?ROUTE_SHARD, fun cleanup_channels/1, [Node])
+                     ekka_mnesia:transaction(?CM_SHARD, fun cleanup_channels/1, [Node])
                  end),
                  end),
     {noreply, State};
     {noreply, State};
 
 

+ 2 - 0
apps/emqx_authentication/include/emqx_authentication.hrl

@@ -39,3 +39,5 @@
         , services :: [{service_name(), #service{}}]
         , services :: [{service_name(), #service{}}]
         , created_at :: integer()
         , created_at :: integer()
         }).
         }).
+
+-define(AUTH_SHARD, emqx_authentication_shard).

+ 6 - 3
apps/emqx_authentication/src/emqx_authentication.erl

@@ -56,6 +56,9 @@
 -define(CHAIN_TAB, emqx_authentication_chain).
 -define(CHAIN_TAB, emqx_authentication_chain).
 -define(SERVICE_TYPE_TAB, emqx_authentication_service_type).
 -define(SERVICE_TYPE_TAB, emqx_authentication_service_type).
 
 
+-rlog_shard({?AUTH_SHARD, ?CHAIN_TAB}).
+-rlog_shard({?AUTH_SHARD, ?SERVICE_TYPE_TAB}).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Mnesia bootstrap
 %% Mnesia bootstrap
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -370,7 +373,7 @@ validate_other_service_params([#{type := Type, params := Params} = ServiceParams
         {error, not_found} ->
         {error, not_found} ->
             {error, {not_found, {service_type, Type}}}
             {error, {not_found, {service_type, Type}}}
     end.
     end.
-    
+
 no_duplicate_names(Names) ->
 no_duplicate_names(Names) ->
     no_duplicate_names(Names, #{}).
     no_duplicate_names(Names, #{}).
 
 
@@ -423,7 +426,7 @@ extract_services([ServiceName | More], Services, Acc) ->
         false ->
         false ->
             {error, {not_found, {service, ServiceName}}}
             {error, {not_found, {service, ServiceName}}}
     end.
     end.
-    
+
 move_service_to_the_front_(ServiceName, Services) ->
 move_service_to_the_front_(ServiceName, Services) ->
     move_service_to_the_front_(ServiceName, Services, []).
     move_service_to_the_front_(ServiceName, Services, []).
 
 
@@ -513,7 +516,7 @@ trans(Fun) ->
     trans(Fun, []).
     trans(Fun, []).
 
 
 trans(Fun, Args) ->
 trans(Fun, Args) ->
-    case mnesia:transaction(Fun, Args) of
+    case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of
         {atomic, Res} -> Res;
         {atomic, Res} -> Res;
         {aborted, Reason} -> {error, Reason}
         {aborted, Reason} -> {error, Reason}
     end.
     end.

+ 3 - 0
apps/emqx_authentication/src/emqx_authentication_app.erl

@@ -20,6 +20,8 @@
 
 
 -emqx_plugin(?MODULE).
 -emqx_plugin(?MODULE).
 
 
+-include("emqx_authentication.hrl").
+
 %% Application callbacks
 %% Application callbacks
 -export([ start/2
 -export([ start/2
         , stop/1
         , stop/1
@@ -27,6 +29,7 @@
 
 
 start(_StartType, _StartArgs) ->
 start(_StartType, _StartArgs) ->
     {ok, Sup} = emqx_authentication_sup:start_link(),
     {ok, Sup} = emqx_authentication_sup:start_link(),
+    ok = ekka_rlog:wait_for_shards([?AUTH_SHARD], infinity),
     ok = emqx_authentication:register_service_types(),
     ok = emqx_authentication:register_service_types(),
     {ok, Sup}.
     {ok, Sup}.
 
 

+ 5 - 3
apps/emqx_authentication/src/emqx_authentication_mnesia.erl

@@ -51,7 +51,7 @@
         salt_rounds => #{
         salt_rounds => #{
             order => 3,
             order => 3,
             type => number,
             type => number,
-            default => 10 
+            default => 10
         }
         }
     }
     }
 }).
 }).
@@ -72,6 +72,8 @@
 
 
 -define(TAB, mnesia_basic_auth).
 -define(TAB, mnesia_basic_auth).
 
 
+-rlog_shard({?AUTH_SHARD, ?TAB}).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Mnesia bootstrap
 %% Mnesia bootstrap
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -231,7 +233,7 @@ import(UserGroup, [#{<<"user_id">> := UserID,
 import(_UserGroup, [_ | _More]) ->
 import(_UserGroup, [_ | _More]) ->
     {error, bad_format}.
     {error, bad_format}.
 
 
-%% Importing 5w users needs 1.7 seconds 
+%% Importing 5w users needs 1.7 seconds
 import(UserGroup, File, Seq) ->
 import(UserGroup, File, Seq) ->
     case file:read_line(File) of
     case file:read_line(File) of
         {ok, Line} ->
         {ok, Line} ->
@@ -330,7 +332,7 @@ trans(Fun) ->
     trans(Fun, []).
     trans(Fun, []).
 
 
 trans(Fun, Args) ->
 trans(Fun, Args) ->
-    case mnesia:transaction(Fun, Args) of
+    case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of
         {atomic, Res} -> Res;
         {atomic, Res} -> Res;
         {aborted, Reason} -> {error, Reason}
         {aborted, Reason} -> {error, Reason}
     end.
     end.

+ 2 - 4
apps/emqx_authentication/test/emqx_authentication_SUITE.erl

@@ -28,6 +28,7 @@ all() ->
     emqx_ct:all(?MODULE).
     emqx_ct:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
+    application:set_env(ekka, strict_mode, true),
     emqx_ct_helpers:start_apps([emqx_authentication]),
     emqx_ct_helpers:start_apps([emqx_authentication]),
     Config.
     Config.
 
 
@@ -40,7 +41,7 @@ t_chain(_) ->
     ?assertMatch({ok, #{id := ChainID, services := []}}, ?AUTH:create_chain(#{id => ChainID})),
     ?assertMatch({ok, #{id := ChainID, services := []}}, ?AUTH:create_chain(#{id => ChainID})),
     ?assertEqual({error, {already_exists, {chain, ChainID}}}, ?AUTH:create_chain(#{id => ChainID})),
     ?assertEqual({error, {already_exists, {chain, ChainID}}}, ?AUTH:create_chain(#{id => ChainID})),
     ?assertMatch({ok, #{id := ChainID, services := []}}, ?AUTH:lookup_chain(ChainID)),
     ?assertMatch({ok, #{id := ChainID, services := []}}, ?AUTH:lookup_chain(ChainID)),
-    ?assertEqual(ok, ?AUTH:delete_chain(ChainID)),
+    ?assertEqual(ok, ?AUTH:delete_chain(ChainID)),
     ?assertMatch({error, {not_found, {chain, ChainID}}}, ?AUTH:lookup_chain(ChainID)),
     ?assertMatch({error, {not_found, {chain, ChainID}}}, ?AUTH:lookup_chain(ChainID)),
     ok.
     ok.
 
 
@@ -186,6 +187,3 @@ t_multi_mnesia_service(_) ->
     ?assertEqual(ok, ?AUTH:authenticate(ClientInfo2)),
     ?assertEqual(ok, ?AUTH:authenticate(ClientInfo2)),
     ?assertEqual(ok, ?AUTH:delete_chain(ChainID)),
     ?assertEqual(ok, ?AUTH:delete_chain(ChainID)),
     ok.
     ok.
-
-
-