|
|
@@ -28,11 +28,14 @@
|
|
|
-boot_mnesia({mnesia, [boot]}).
|
|
|
-copy_mnesia({mnesia, [copy]}).
|
|
|
|
|
|
--export([start_link/0, topics/0, local_topics/0]).
|
|
|
+-export([start_link/1]).
|
|
|
|
|
|
%% For eunit tests
|
|
|
-export([start/0, stop/0]).
|
|
|
|
|
|
+%% Topics
|
|
|
+-export([topics/0, local_topics/0]).
|
|
|
+
|
|
|
%% Route APIs
|
|
|
-export([add_route/1, get_routes/1, del_route/1, has_route/1]).
|
|
|
|
|
|
@@ -49,7 +52,7 @@
|
|
|
|
|
|
-export([dump/0]).
|
|
|
|
|
|
--record(state, {stats_timer}).
|
|
|
+-record(state, {stats_fun, stats_timer}).
|
|
|
|
|
|
-define(ROUTER, ?MODULE).
|
|
|
|
|
|
@@ -60,21 +63,21 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
mnesia(boot) ->
|
|
|
- ok = ekka_mnesia:create_table(mqtt_route, [
|
|
|
+ ok = ekka_mnesia:create_table(route, [
|
|
|
{type, bag},
|
|
|
{ram_copies, [node()]},
|
|
|
- {record_name, mqtt_route},
|
|
|
- {attributes, record_info(fields, mqtt_route)}]);
|
|
|
+ {record_name, route},
|
|
|
+ {attributes, record_info(fields, route)}]);
|
|
|
|
|
|
mnesia(copy) ->
|
|
|
- ok = ekka_mnesia:copy_table(mqtt_route, ram_copies).
|
|
|
+ ok = ekka_mnesia:copy_table(route, ram_copies).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Start the Router
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-start_link() ->
|
|
|
- gen_server:start_link({local, ?ROUTER}, ?MODULE, [], []).
|
|
|
+start_link(StatsFun) ->
|
|
|
+ gen_server:start_link({local, ?ROUTER}, ?MODULE, [StatsFun], []).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Topics
|
|
|
@@ -82,39 +85,39 @@ start_link() ->
|
|
|
|
|
|
-spec(topics() -> list(binary())).
|
|
|
topics() ->
|
|
|
- mnesia:dirty_all_keys(mqtt_route).
|
|
|
+ mnesia:dirty_all_keys(route).
|
|
|
|
|
|
-spec(local_topics() -> list(binary())).
|
|
|
local_topics() ->
|
|
|
- ets:select(mqtt_local_route, [{{'$1', '_'}, [], ['$1']}]).
|
|
|
+ ets:select(local_route, [{{'$1', '_'}, [], ['$1']}]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Match API
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
%% @doc Match Routes.
|
|
|
--spec(match(Topic:: binary()) -> [mqtt_route()]).
|
|
|
+-spec(match(Topic:: binary()) -> [route()]).
|
|
|
match(Topic) when is_binary(Topic) ->
|
|
|
%% Optimize: ets???
|
|
|
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
|
|
|
%% Optimize: route table will be replicated to all nodes.
|
|
|
- lists:append([ets:lookup(mqtt_route, To) || To <- [Topic | Matched]]).
|
|
|
+ lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]).
|
|
|
|
|
|
%% @doc Print Routes.
|
|
|
-spec(print(Topic :: binary()) -> [ok]).
|
|
|
print(Topic) ->
|
|
|
[io:format("~s -> ~s~n", [To, Node]) ||
|
|
|
- #mqtt_route{topic = To, node = Node} <- match(Topic)].
|
|
|
+ #route{topic = To, node = Node} <- match(Topic)].
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Route Management API
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
%% @doc Add Route.
|
|
|
--spec(add_route(binary() | mqtt_route()) -> ok | {error, Reason :: term()}).
|
|
|
+-spec(add_route(binary() | route()) -> ok | {error, Reason :: term()}).
|
|
|
add_route(Topic) when is_binary(Topic) ->
|
|
|
- add_route(#mqtt_route{topic = Topic, node = node()});
|
|
|
-add_route(Route = #mqtt_route{topic = Topic}) ->
|
|
|
+ add_route(#route{topic = Topic, node = node()});
|
|
|
+add_route(Route = #route{topic = Topic}) ->
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
true -> case mnesia:is_transaction() of
|
|
|
true -> add_trie_route(Route);
|
|
|
@@ -126,23 +129,23 @@ add_route(Route = #mqtt_route{topic = Topic}) ->
|
|
|
add_direct_route(Route) ->
|
|
|
mnesia:async_dirty(fun mnesia:write/1, [Route]).
|
|
|
|
|
|
-add_trie_route(Route = #mqtt_route{topic = Topic}) ->
|
|
|
- case mnesia:wread({mqtt_route, Topic}) of
|
|
|
+add_trie_route(Route = #route{topic = Topic}) ->
|
|
|
+ case mnesia:wread({route, Topic}) of
|
|
|
[] -> emqx_trie:insert(Topic);
|
|
|
_ -> ok
|
|
|
end,
|
|
|
mnesia:write(Route).
|
|
|
|
|
|
%% @doc Lookup Routes
|
|
|
--spec(get_routes(binary()) -> [mqtt_route()]).
|
|
|
+-spec(get_routes(binary()) -> [route()]).
|
|
|
get_routes(Topic) ->
|
|
|
- ets:lookup(mqtt_route, Topic).
|
|
|
+ ets:lookup(route, Topic).
|
|
|
|
|
|
%% @doc Delete Route
|
|
|
--spec(del_route(binary() | mqtt_route()) -> ok | {error, Reason :: term()}).
|
|
|
+-spec(del_route(binary() | route()) -> ok | {error, Reason :: term()}).
|
|
|
del_route(Topic) when is_binary(Topic) ->
|
|
|
- del_route(#mqtt_route{topic = Topic, node = node()});
|
|
|
-del_route(Route = #mqtt_route{topic = Topic}) ->
|
|
|
+ del_route(#route{topic = Topic, node = node()});
|
|
|
+del_route(Route = #route{topic = Topic}) ->
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
true -> case mnesia:is_transaction() of
|
|
|
true -> del_trie_route(Route);
|
|
|
@@ -154,8 +157,8 @@ del_route(Route = #mqtt_route{topic = Topic}) ->
|
|
|
del_direct_route(Route) ->
|
|
|
mnesia:async_dirty(fun mnesia:delete_object/1, [Route]).
|
|
|
|
|
|
-del_trie_route(Route = #mqtt_route{topic = Topic}) ->
|
|
|
- case mnesia:wread({mqtt_route, Topic}) of
|
|
|
+del_trie_route(Route = #route{topic = Topic}) ->
|
|
|
+ case mnesia:wread({route, Topic}) of
|
|
|
[Route] -> %% Remove route and trie
|
|
|
mnesia:delete_object(Route),
|
|
|
emqx_trie:delete(Topic);
|
|
|
@@ -167,7 +170,7 @@ del_trie_route(Route = #mqtt_route{topic = Topic}) ->
|
|
|
%% @doc Has route?
|
|
|
-spec(has_route(binary()) -> boolean()).
|
|
|
has_route(Topic) when is_binary(Topic) ->
|
|
|
- ets:member(mqtt_route, Topic).
|
|
|
+ ets:member(route, Topic).
|
|
|
|
|
|
%% @private
|
|
|
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
|
|
@@ -183,7 +186,7 @@ trans(Fun, Args) ->
|
|
|
|
|
|
-spec(get_local_routes() -> list({binary(), node()})).
|
|
|
get_local_routes() ->
|
|
|
- ets:tab2list(mqtt_local_route).
|
|
|
+ ets:tab2list(local_route).
|
|
|
|
|
|
-spec(add_local_route(binary()) -> ok).
|
|
|
add_local_route(Topic) ->
|
|
|
@@ -195,15 +198,15 @@ del_local_route(Topic) ->
|
|
|
|
|
|
-spec(match_local(binary()) -> [mqtt_route()]).
|
|
|
match_local(Name) ->
|
|
|
- case ets:info(mqtt_local_route, size) of
|
|
|
+ case ets:info(local_route, size) of
|
|
|
0 -> [];
|
|
|
_ -> ets:foldl(
|
|
|
fun({Filter, Node}, Matched) ->
|
|
|
case emqx_topic:match(Name, Filter) of
|
|
|
- true -> [#mqtt_route{topic = {local, Filter}, node = Node} | Matched];
|
|
|
+ true -> [#route{topic = {local, Filter}, node = Node} | Matched];
|
|
|
false -> Matched
|
|
|
end
|
|
|
- end, [], mqtt_local_route)
|
|
|
+ end, [], local_route)
|
|
|
end.
|
|
|
|
|
|
-spec(clean_local_routes() -> ok).
|
|
|
@@ -211,7 +214,7 @@ clean_local_routes() ->
|
|
|
gen_server:call(?ROUTER, clean_local_routes).
|
|
|
|
|
|
dump() ->
|
|
|
- [{route, ets:tab2list(mqtt_route)}, {local_route, ets:tab2list(mqtt_local_route)}].
|
|
|
+ [{route, ets:tab2list(route)}, {local_route, ets:tab2list(local_route)}].
|
|
|
|
|
|
%% For unit test.
|
|
|
start() ->
|
|
|
@@ -224,23 +227,23 @@ stop() ->
|
|
|
%% gen_server Callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-init([]) ->
|
|
|
+init([StatsFun]) ->
|
|
|
ekka:monitor(membership),
|
|
|
- ets:new(mqtt_local_route, [set, named_table, protected]),
|
|
|
+ ets:new(local_route, [set, named_table, protected]),
|
|
|
{ok, TRef} = timer:send_interval(timer:seconds(1), stats),
|
|
|
- {ok, #state{stats_timer = TRef}}.
|
|
|
+ {ok, #state{stats_fun = StatsFun, stats_timer = TRef}}.
|
|
|
|
|
|
handle_call({add_local_route, Topic}, _From, State) ->
|
|
|
%% why node()...?
|
|
|
- ets:insert(mqtt_local_route, {Topic, node()}),
|
|
|
+ ets:insert(local_route, {Topic, node()}),
|
|
|
{reply, ok, State};
|
|
|
|
|
|
handle_call({del_local_route, Topic}, _From, State) ->
|
|
|
- ets:delete(mqtt_local_route, Topic),
|
|
|
+ ets:delete(local_route, Topic),
|
|
|
{reply, ok, State};
|
|
|
|
|
|
handle_call(clean_local_routes, _From, State) ->
|
|
|
- ets:delete_all_objects(mqtt_local_route),
|
|
|
+ ets:delete_all_objects(local_route),
|
|
|
{reply, ok, State};
|
|
|
|
|
|
handle_call(stop, _From, State) ->
|
|
|
@@ -253,19 +256,15 @@ handle_cast(_Msg, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
handle_info({membership, {mnesia, down, Node}}, State) ->
|
|
|
- global:trans({?LOCK, self()},
|
|
|
- fun() ->
|
|
|
- clean_routes_(Node),
|
|
|
- update_stats_()
|
|
|
- end),
|
|
|
- {noreply, State, hibernate};
|
|
|
+ global:trans({?LOCK, self()}, fun() -> clean_routes_(Node) end),
|
|
|
+ handle_info(stats, State);
|
|
|
|
|
|
handle_info({membership, _Event}, State) ->
|
|
|
%% ignore
|
|
|
{noreply, State};
|
|
|
|
|
|
-handle_info(stats, State) ->
|
|
|
- update_stats_(),
|
|
|
+handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
|
|
+ StatsFun(mnesia:table_info(route, size)),
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
handle_info(_Info, State) ->
|
|
|
@@ -282,15 +281,12 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%% Internal Functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-%% Clean Routes on Node
|
|
|
+%% Clean routes on the down node.
|
|
|
clean_routes_(Node) ->
|
|
|
- Pattern = #mqtt_route{_ = '_', node = Node},
|
|
|
+ Pattern = #route{_ = '_', node = Node},
|
|
|
Clean = fun() ->
|
|
|
- [mnesia:delete_object(mqtt_route, R, write) ||
|
|
|
- R <- mnesia:match_object(mqtt_route, Pattern, write)]
|
|
|
+ [mnesia:delete_object(route, R, write) ||
|
|
|
+ R <- mnesia:match_object(route, Pattern, write)]
|
|
|
end,
|
|
|
mnesia:transaction(Clean).
|
|
|
|
|
|
-update_stats_() ->
|
|
|
- emqx_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)).
|
|
|
-
|