Explorar o código

feat(emqx_management): mqtt_app shard

k32 %!s(int64=4) %!d(string=hai) anos
pai
achega
198e3c03e0

+ 1 - 1
apps/emqx/rebar.config

@@ -13,7 +13,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}

+ 3 - 1
apps/emqx_management/include/emqx_mgmt.hrl

@@ -32,4 +32,6 @@
 -define(ERROR14, 114). %% OldPassword error
 -define(ERROR15, 115). %% bad topic
 
--define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]).
+-define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]).
+
+-define(MANAGEMENT_SHARD, emqx_management_shard).

+ 3 - 0
apps/emqx_management/src/emqx_mgmt_app.erl

@@ -24,8 +24,11 @@
         , stop/1
         ]).
 
+-include("emqx_mgmt.hrl").
+
 start(_Type, _Args) ->
     {ok, Sup} = emqx_mgmt_sup:start_link(),
+    ok = ekka_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity),
     _ = emqx_mgmt_auth:add_default_app(),
     emqx_mgmt_http:start_listeners(),
     emqx_mgmt_cli:load(),

+ 15 - 9
apps/emqx_management/src/emqx_mgmt_auth.erl

@@ -46,6 +46,10 @@
 
 -type(appsecret() :: binary()).
 
+-include("emqx_mgmt.hrl").
+
+-rlog_shard({?MANAGEMENT_SHARD, mqtt_app}).
+
 %%--------------------------------------------------------------------
 %% Mnesia Bootstrap
 %%--------------------------------------------------------------------
@@ -102,7 +106,7 @@ add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) ->
                      _  -> mnesia:abort(alread_existed)
                  end
              end,
-    case mnesia:transaction(AddFun) of
+    case ekka_mnesia:transaction(?MANAGEMENT_SHARD, AddFun) of
         {atomic, ok} -> {ok, Secret1};
         {aborted, Reason} -> {error, Reason}
     end.
@@ -116,7 +120,7 @@ force_add_app(AppId, Name, Secret, Desc, Status, Expired) ->
                                         status = Status,
                                         expired = Expired})
              end,
-    case mnesia:transaction(AddFun) of
+    case ekka_mnesia:transaction(?MANAGEMENT_SHARD, AddFun) of
         {atomic, ok} -> ok;
         {aborted, Reason} -> {error, Reason}
     end.
@@ -154,7 +158,8 @@ lookup_app(AppId) when is_binary(AppId) ->
 update_app(AppId, Status) ->
     case mnesia:dirty_read(mqtt_app, AppId) of
         [App = #mqtt_app{}] ->
-            case mnesia:transaction(fun() -> mnesia:write(App#mqtt_app{status = Status}) end) of
+            Fun = fun() -> mnesia:write(App#mqtt_app{status = Status}) end,
+            case ekka_mnesia:transaction(?MANAGEMENT_SHARD, Fun) of
                 {atomic, ok} -> ok;
                 {aborted, Reason} -> {error, Reason}
             end;
@@ -166,10 +171,12 @@ update_app(AppId, Status) ->
 update_app(AppId, Name, Desc, Status, Expired) ->
     case mnesia:dirty_read(mqtt_app, AppId) of
         [App = #mqtt_app{}] ->
-            case mnesia:transaction(fun() -> mnesia:write(App#mqtt_app{name = Name,
-                                                                       desc = Desc,
-                                                                       status = Status,
-                                                                       expired = Expired}) end) of
+            case ekka_mnesia:transaction(
+                   ?MANAGEMENT_SHARD,
+                   fun() -> mnesia:write(App#mqtt_app{name = Name,
+                                                      desc = Desc,
+                                                      status = Status,
+                                                      expired = Expired}) end) of
                 {atomic, ok} -> ok;
                 {aborted, Reason} -> {error, Reason}
             end;
@@ -179,7 +186,7 @@ update_app(AppId, Name, Desc, Status, Expired) ->
 
 -spec(del_app(appid()) -> ok | {error, term()}).
 del_app(AppId) when is_binary(AppId) ->
-    case mnesia:transaction(fun mnesia:delete/1, [{mqtt_app, AppId}]) of
+    case ekka_mnesia:transaction(?MANAGEMENT_SHARD, fun mnesia:delete/1, [{mqtt_app, AppId}]) of
         {atomic, Ok} -> Ok;
         {aborted, Reason} -> {error, Reason}
     end.
@@ -207,4 +214,3 @@ is_authorized(AppId, AppSecret) ->
 
 is_expired(undefined) -> true;
 is_expired(Expired)   -> Expired >= erlang:system_time(second).
-

+ 2 - 1
apps/emqx_management/test/emqx_auth_mnesia_migration_SUITE.erl

@@ -40,6 +40,7 @@ cases() ->
 
 init_per_suite(Config) ->
     emqx_ct_helpers:start_apps([emqx_management, emqx_dashboard, emqx_auth_mnesia]),
+    application:set_env(ekka, strict_mode, true),
     ekka_mnesia:start(),
     emqx_mgmt_auth:mnesia(boot),
     Config.
@@ -172,4 +173,4 @@ test_import(clientid, {ClientID, Password}) ->
     Req = #{clientid => ClientID,
             password => Password},
     ?assertMatch({stop, #{auth_result := success}},
-                 emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).
+                 emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).

+ 1 - 0
apps/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -56,6 +56,7 @@ apps() ->
     [emqx_management, emqx_auth_mnesia, emqx_modules].
 
 init_per_suite(Config) ->
+    application:set_env(ekka, strict_mode, true),
     ekka_mnesia:start(),
     emqx_mgmt_auth:mnesia(boot),
     emqx_ct_helpers:start_apps(apps()),

+ 2 - 1
rebar.config

@@ -1,3 +1,4 @@
+%% -*- mode:erlang -*-
 %% This config file is the very basic config to compile emqx
 %% This allows emqx to be used as a dependency for other applications
 %% such as emqx module/plugin develpments and tests.
@@ -39,7 +40,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.6"}}}