Просмотр исходного кода

Merge pull request #5014 from k32/dev/route_shard

feat(rlog): Introduce routing RLOG shard
k32 4 лет назад
Родитель
Сommit
5b84513e77

+ 2 - 0
apps/emqx/include/emqx.hrl

@@ -84,6 +84,8 @@
 %% Route
 %% Route
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-define(ROUTE_SHARD, route_shard).
+
 -record(route, {
 -record(route, {
           topic :: binary(),
           topic :: binary(),
           dest  :: node() | {binary(), node()}
           dest  :: node() | {binary(), node()}

+ 1 - 1
apps/emqx/rebar.config

@@ -13,7 +13,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.0"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.5.1"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.5.1"}}}

+ 1 - 1
apps/emqx/src/emqx_app.erl

@@ -28,7 +28,7 @@
 
 
 -define(APP, emqx).
 -define(APP, emqx).
 
 
--define(EMQX_SHARDS, []).
+-define(EMQX_SHARDS, [route_shard]).
 
 
 -include("emqx_release.hrl").
 -include("emqx_release.hrl").
 
 

+ 5 - 4
apps/emqx/src/emqx_cm_registry.erl

@@ -48,6 +48,8 @@
 -define(TAB, emqx_channel_registry).
 -define(TAB, emqx_channel_registry).
 -define(LOCK, {?MODULE, cleanup_down}).
 -define(LOCK, {?MODULE, cleanup_down}).
 
 
+-rlog_shard({?ROUTE_SHARD, ?TAB}).
+
 -record(channel, {chid, pid}).
 -record(channel, {chid, pid}).
 
 
 %% @doc Start the global channel registry.
 %% @doc Start the global channel registry.
@@ -72,7 +74,7 @@ register_channel(ClientId) when is_binary(ClientId) ->
 
 
 register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
 register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
     case is_enabled() of
-        true -> mnesia:dirty_write(?TAB, record(ClientId, ChanPid));
+        true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid));
         false -> ok
         false -> ok
     end.
     end.
 
 
@@ -84,7 +86,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
 
 
 unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
 unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
     case is_enabled() of
-        true -> mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid));
+        true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid));
         false -> ok
         false -> ok
     end.
     end.
 
 
@@ -123,7 +125,7 @@ handle_cast(Msg, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
     global:trans({?LOCK, self()},
     global:trans({?LOCK, self()},
                  fun() ->
                  fun() ->
-                     mnesia:transaction(fun cleanup_channels/1, [Node])
+                     ekka_mnesia:transaction(?ROUTE_SHARD, fun cleanup_channels/1, [Node])
                  end),
                  end),
     {noreply, State};
     {noreply, State};
 
 
@@ -150,4 +152,3 @@ cleanup_channels(Node) ->
 
 
 delete_channel(Chan) ->
 delete_channel(Chan) ->
     mnesia:delete_object(?TAB, Chan, write).
     mnesia:delete_object(?TAB, Chan, write).
-

+ 6 - 3
apps/emqx/src/emqx_router.erl

@@ -69,6 +69,7 @@
 -type(dest() :: node() | {group(), node()}).
 -type(dest() :: node() | {group(), node()}).
 
 
 -define(ROUTE_TAB, emqx_route).
 -define(ROUTE_TAB, emqx_route).
+-rlog_shard({?ROUTE_SHARD, ?ROUTE_TAB}).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %% Mnesia bootstrap
@@ -225,7 +226,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 insert_direct_route(Route) ->
 insert_direct_route(Route) ->
-    mnesia:async_dirty(fun mnesia:write/3, [?ROUTE_TAB, Route, sticky_write]).
+    ekka_mnesia:dirty_write(?ROUTE_TAB, Route).
 
 
 insert_trie_route(Route = #route{topic = Topic}) ->
 insert_trie_route(Route = #route{topic = Topic}) ->
     case mnesia:wread({?ROUTE_TAB, Topic}) of
     case mnesia:wread({?ROUTE_TAB, Topic}) of
@@ -235,7 +236,7 @@ insert_trie_route(Route = #route{topic = Topic}) ->
     mnesia:write(?ROUTE_TAB, Route, sticky_write).
     mnesia:write(?ROUTE_TAB, Route, sticky_write).
 
 
 delete_direct_route(Route) ->
 delete_direct_route(Route) ->
-    mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE_TAB, Route, sticky_write]).
+    ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route).
 
 
 delete_trie_route(Route = #route{topic = Topic}) ->
 delete_trie_route(Route = #route{topic = Topic}) ->
     case mnesia:wread({?ROUTE_TAB, Topic}) of
     case mnesia:wread({?ROUTE_TAB, Topic}) of
@@ -254,6 +255,8 @@ maybe_trans(Fun, Args) ->
         key ->
         key ->
             trans(Fun, Args);
             trans(Fun, Args);
         global ->
         global ->
+            %% Assert:
+            mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
             lock_router(),
             lock_router(),
             try mnesia:sync_dirty(Fun, Args)
             try mnesia:sync_dirty(Fun, Args)
             after
             after
@@ -278,7 +281,7 @@ trans(Fun, Args) ->
             %% Future changes should keep in mind that this process
             %% Future changes should keep in mind that this process
             %% always exit with database write result.
             %% always exit with database write result.
             fun() ->
             fun() ->
-                    Res = case mnesia:transaction(Fun, Args) of
+                    Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of
                               {atomic, Ok} -> Ok;
                               {atomic, Ok} -> Ok;
                               {aborted, Reason} -> {error, Reason}
                               {aborted, Reason} -> {error, Reason}
                           end,
                           end,

+ 5 - 4
apps/emqx/src/emqx_router_helper.erl

@@ -53,6 +53,8 @@
 -define(ROUTING_NODE, emqx_routing_node).
 -define(ROUTING_NODE, emqx_routing_node).
 -define(LOCK, {?MODULE, cleanup_routes}).
 -define(LOCK, {?MODULE, cleanup_routes}).
 
 
+-rlog_shard({?ROUTE_SHARD, ?ROUTING_NODE}).
+
 -dialyzer({nowarn_function, [cleanup_routes/1]}).
 -dialyzer({nowarn_function, [cleanup_routes/1]}).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -87,7 +89,7 @@ monitor(Node) when is_atom(Node) ->
     case ekka:is_member(Node)
     case ekka:is_member(Node)
          orelse ets:member(?ROUTING_NODE, Node) of
          orelse ets:member(?ROUTING_NODE, Node) of
         true  -> ok;
         true  -> ok;
-        false -> mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
+        false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
     end.
     end.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -136,9 +138,9 @@ handle_info({mnesia_table_event, Event}, State) ->
 handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
 handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
     global:trans({?LOCK, self()},
     global:trans({?LOCK, self()},
                  fun() ->
                  fun() ->
-                     mnesia:transaction(fun cleanup_routes/1, [Node])
+                     ekka_mnesia:transaction(fun cleanup_routes/1, [Node])
                  end),
                  end),
-    ok = mnesia:dirty_delete(?ROUTING_NODE, Node),
+    ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node),
     {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
     {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
 
 
 handle_info({membership, {mnesia, down, Node}}, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
@@ -176,4 +178,3 @@ cleanup_routes(Node) ->
                 #route{_ = '_', dest = {'_', Node}}],
                 #route{_ = '_', dest = {'_', Node}}],
     [mnesia:delete_object(?ROUTE, Route, write)
     [mnesia:delete_object(?ROUTE, Route, write)
      || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)].
      || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)].
-

+ 12 - 5
apps/emqx/src/emqx_schema.erl

@@ -17,6 +17,7 @@
 -type percent() :: float().
 -type percent() :: float().
 -type file() :: string().
 -type file() :: string().
 -type comma_separated_list() :: list().
 -type comma_separated_list() :: list().
+-type comma_separated_atoms() :: [atom()].
 -type bar_separated_list() :: list().
 -type bar_separated_list() :: list().
 -type ip_port() :: tuple().
 -type ip_port() :: tuple().
 
 
@@ -29,17 +30,20 @@
 -typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}).
 -typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}).
 -typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}).
 -typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}).
 -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
 -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
+-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
 
 
 % workaround: prevent being recognized as unused functions
 % workaround: prevent being recognized as unused functions
 -export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1,
 -export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1,
          to_flag/1, to_percent/1, to_comma_separated_list/1,
          to_flag/1, to_percent/1, to_comma_separated_list/1,
-         to_bar_separated_list/1, to_ip_port/1]).
+         to_bar_separated_list/1, to_ip_port/1,
+         to_comma_separated_atoms/1]).
 
 
 -behaviour(hocon_schema).
 -behaviour(hocon_schema).
 
 
 -reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0,
 -reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0,
                 bytesize/0, percent/0, file/0,
                 bytesize/0, percent/0, file/0,
-                comma_separated_list/0, bar_separated_list/0, ip_port/0]).
+                comma_separated_list/0, bar_separated_list/0, ip_port/0,
+                comma_separated_atoms/0]).
 
 
 -export([structs/0, fields/1, translations/0, translation/1]).
 -export([structs/0, fields/1, translations/0, translation/1]).
 -export([t/1, t/3, t/4, ref/1]).
 -export([t/1, t/3, t/4, ref/1]).
@@ -61,6 +65,7 @@ fields("cluster") ->
     , {"dns", ref("dns")}
     , {"dns", ref("dns")}
     , {"etcd", ref("etcd")}
     , {"etcd", ref("etcd")}
     , {"k8s", ref("k8s")}
     , {"k8s", ref("k8s")}
+    , {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)}
     , {"rlog", ref("rlog")}
     , {"rlog", ref("rlog")}
     ];
     ];
 
 
@@ -101,9 +106,8 @@ fields("k8s") ->
     ];
     ];
 
 
 fields("rlog") ->
 fields("rlog") ->
-    [ {"backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)}
-    , {"role", t(union([core, replicant]), "ekka.node_role", core)}
-    , {"core_nodes", t(comma_separated_list(), "ekka.core_nodes", [])}
+    [ {"role", t(union([core, replicant]), "ekka.node_role", core)}
+    , {"core_nodes", t(comma_separated_atoms(), "ekka.core_nodes", [])}
     ];
     ];
 
 
 fields("node") ->
 fields("node") ->
@@ -1228,6 +1232,9 @@ to_percent(Str) ->
 to_comma_separated_list(Str) ->
 to_comma_separated_list(Str) ->
     {ok, string:tokens(Str, ", ")}.
     {ok, string:tokens(Str, ", ")}.
 
 
+to_comma_separated_atoms(Str) ->
+    {ok, lists:map(fun list_to_atom/1, string:tokens(Str, ", "))}.
+
 to_bar_separated_list(Str) ->
 to_bar_separated_list(Str) ->
     {ok, string:tokens(Str, "| ")}.
     {ok, string:tokens(Str, "| ")}.
 
 

+ 3 - 1
apps/emqx/src/emqx_trie.erl

@@ -52,6 +52,8 @@
 
 
 -define(IS_COMPACT, true).
 -define(IS_COMPACT, true).
 
 
+-rlog_shard({?ROUTE_SHARD, ?TRIE}).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -342,6 +344,6 @@ do_compact_test() ->
                  do_compact(words(<<"a/+/+/+/+/b">>))),
                  do_compact(words(<<"a/+/+/+/+/b">>))),
     ok.
     ok.
 
 
-clear_tables() -> mnesia:clear_table(?TRIE).
+clear_tables() -> ekka_mnesia:clear_table(?TRIE).
 
 
 -endif. % TEST
 -endif. % TEST

+ 1 - 1
rebar.config

@@ -39,7 +39,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.0"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}