Parcourir la source

refactor the router and pubsub

Feng il y a 10 ans
Parent
commit
dfdad8bcb6

+ 6 - 3
src/emqttd_pubsub.erl

@@ -228,9 +228,9 @@ publish(To, Msg) ->
 %% @doc Match Topic Name with Topic Filters
 %% @doc Match Topic Name with Topic Filters
 -spec match(emqttd_topic:topic()) -> [mqtt_topic()].
 -spec match(emqttd_topic:topic()) -> [mqtt_topic()].
 match(To) ->
 match(To) ->
-    MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
+    Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
     %% ets:lookup for topic table will be replicated to all nodes.
     %% ets:lookup for topic table will be replicated to all nodes.
-    lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]).
+    lists:append([ets:lookup(topic, Topic) || Topic <- [To | Matched]]).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %% gen_server callbacks
@@ -333,7 +333,10 @@ add_topics(Records) ->
 add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
 add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
     case mnesia:wread({topic, Topic}) of
     case mnesia:wread({topic, Topic}) of
         [] ->
         [] ->
-            ok = emqttd_trie:insert(Topic),
+            case emqttd_topic:wildcard(Topic) of
+                true  -> emqttd_trie:insert(Topic);
+                false -> ok
+            end,
             mnesia:write(topic, TopicR, write);
             mnesia:write(topic, TopicR, write);
         Records ->
         Records ->
             case lists:member(TopicR, Records) of
             case lists:member(TopicR, Records) of

+ 58 - 46
src/emqttd_router.erl

@@ -14,7 +14,9 @@
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-%% @doc MQTT Message Router
+%% @doc
+%% The Message Router on Local Node.
+%% @end
 -module(emqttd_router).
 -module(emqttd_router).
 
 
 -behaviour(gen_server2).
 -behaviour(gen_server2).
@@ -43,22 +45,26 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
          terminate/2, code_change/3]).
 
 
+-ifdef(TEST).
+-compile(export_all).
+-endif.
+
 -record(aging, {topics, time, tref}).
 -record(aging, {topics, time, tref}).
 
 
--record(state, {pool, id, statsfun, aging :: #aging{}}).
+-record(state, {pool, id, aging :: #aging{}, statsfun}).
 
 
-%% @doc Start a local router.
+%% @doc Start a router.
 -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}.
 -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}.
 start_link(Pool, Id, StatsFun, Env) ->
 start_link(Pool, Id, StatsFun, Env) ->
     gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)},
     gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)},
                            ?MODULE, [Pool, Id, StatsFun, Env], []).
                            ?MODULE, [Pool, Id, StatsFun, Env], []).
 
 
-%% @doc Route Message on the local node.
+%% @doc Route Message on this node.
 -spec route(emqttd_topic:topic(), mqtt_message()) -> any().
 -spec route(emqttd_topic:topic(), mqtt_message()) -> any().
 route(Queue = <<"$Q/", _Q>>, Msg) ->
 route(Queue = <<"$Q/", _Q>>, Msg) ->
     case lookup_routes(Queue) of
     case lookup_routes(Queue) of
         [] ->
         [] ->
-            emqttd_metrics:inc('messages/dropped');
+            dropped(Queue);
         [SubPid] ->
         [SubPid] ->
             SubPid ! {dispatch, Queue, Msg};
             SubPid ! {dispatch, Queue, Msg};
         Routes ->
         Routes ->
@@ -70,8 +76,8 @@ route(Queue = <<"$Q/", _Q>>, Msg) ->
 route(Topic, Msg) ->
 route(Topic, Msg) ->
     case lookup_routes(Topic) of
     case lookup_routes(Topic) of
         [] ->
         [] ->
-            emqttd_metrics:inc('messages/dropped');
-        [SubPid] -> %% optimize
+            dropped(Topic);
+        [SubPid] ->
             SubPid ! {dispatch, Topic, Msg};
             SubPid ! {dispatch, Topic, Msg};
         Routes ->
         Routes ->
             lists:foreach(fun(SubPid) ->
             lists:foreach(fun(SubPid) ->
@@ -79,9 +85,16 @@ route(Topic, Msg) ->
             end, Routes)
             end, Routes)
     end.
     end.
 
 
+%% @private
+%% @doc Ingore $SYS Messages.
+dropped(<<"$SYS/", _/binary>>) ->
+    ok;
+dropped(_Topic) ->
+    emqttd_metrics:inc('messages/dropped').
+
 %% @doc Has Route?
 %% @doc Has Route?
 -spec has_route(emqttd_topic:topic()) -> boolean().
 -spec has_route(emqttd_topic:topic()) -> boolean().
-has_route(Topic) ->
+has_route(Topic) when is_binary(Topic) ->
     ets:member(route, Topic).
     ets:member(route, Topic).
 
 
 %% @doc Lookup Routes
 %% @doc Lookup Routes
@@ -94,12 +107,12 @@ lookup_routes(Topic) when is_binary(Topic) ->
             []
             []
     end.
     end.
 
 
-%% @doc Add Route.
+%% @doc Add Route
 -spec add_route(emqttd_topic:topic(), pid()) -> ok.
 -spec add_route(emqttd_topic:topic(), pid()) -> ok.
 add_route(Topic, Pid) when is_pid(Pid) ->
 add_route(Topic, Pid) when is_pid(Pid) ->
     call(pick(Topic), {add_route, Topic, Pid}).
     call(pick(Topic), {add_route, Topic, Pid}).
 
 
-%% @doc Add Routes.
+%% @doc Add Routes
 -spec add_routes(list(emqttd_topic:topic()), pid()) -> ok.
 -spec add_routes(list(emqttd_topic:topic()), pid()) -> ok.
 add_routes([], _Pid) ->
 add_routes([], _Pid) ->
     ok;
     ok;
@@ -111,12 +124,12 @@ add_routes(Topics, Pid) ->
                 call(Router, {add_routes, Slice, Pid})
                 call(Router, {add_routes, Slice, Pid})
         end, slice(Topics)).
         end, slice(Topics)).
 
 
-%% @doc Delete Route.
+%% @doc Delete Route
 -spec delete_route(emqttd_topic:topic(), pid()) -> ok.
 -spec delete_route(emqttd_topic:topic(), pid()) -> ok.
 delete_route(Topic, Pid) ->
 delete_route(Topic, Pid) ->
     cast(pick(Topic), {delete_route, Topic, Pid}).
     cast(pick(Topic), {delete_route, Topic, Pid}).
 
 
-%% @doc Delete Routes.
+%% @doc Delete Routes
 -spec delete_routes(list(emqttd_topic:topic()), pid()) -> ok.
 -spec delete_routes(list(emqttd_topic:topic()), pid()) -> ok.
 delete_routes([Topic], Pid) ->
 delete_routes([Topic], Pid) ->
     delete_route(Topic, Pid);
     delete_route(Topic, Pid);
@@ -136,8 +149,11 @@ slice(Topics) ->
 pick(Topic) ->
 pick(Topic) ->
     gproc_pool:pick_worker(router, Topic).
     gproc_pool:pick_worker(router, Topic).
 
 
+%% @doc For unit test.
 stop(Id) when is_integer(Id) ->
 stop(Id) when is_integer(Id) ->
-    gen_server2:call(?PROC_NAME(?MODULE, Id), stop).
+    gen_server2:call(?PROC_NAME(?MODULE, Id), stop);
+stop(Pid) when is_pid(Pid) ->
+    gen_server2:call(Pid, stop).
 
 
 call(Router, Request) ->
 call(Router, Request) ->
     gen_server2:call(Router, Request, infinity).
     gen_server2:call(Router, Request, infinity).
@@ -147,21 +163,22 @@ cast(Router, Msg) ->
 
 
 init([Pool, Id, StatsFun, Opts]) ->
 init([Pool, Id, StatsFun, Opts]) ->
 
 
-    emqttd_time:seed(),
-
     %% Calls from pubsub should be scheduled first?
     %% Calls from pubsub should be scheduled first?
     process_flag(priority, high),
     process_flag(priority, high),
 
 
-    ?GPROC_POOL(join, Pool, Id),
+    emqttd_time:seed(),
 
 
-    AgingSecs = proplists:get_value(route_aging, Opts, 5),
+    ?GPROC_POOL(join, Pool, Id),
 
 
-    %% Aging Timer
-    {ok, AgingTref} = start_tick(AgingSecs + random:uniform(AgingSecs)),
+    Aging = init_aging(Opts),
 
 
-    Aging = #aging{topics = dict:new(), time = AgingSecs, tref = AgingTref},
+    {ok, #state{pool = Pool, id = Id, aging = Aging, statsfun = StatsFun}}.
 
 
-    {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, aging = Aging}}.
+%% Init Aging
+init_aging(Opts) ->
+    AgingSecs = proplists:get_value(route_aging, Opts, 5),
+    {ok, AgingTref} = start_tick(AgingSecs + random:uniform(AgingSecs)),
+    #aging{topics = dict:new(), time = AgingSecs, tref = AgingTref}.
 
 
 start_tick(Secs) ->
 start_tick(Secs) ->
     timer:send_interval(timer:seconds(Secs), {clean, aged}).
     timer:send_interval(timer:seconds(Secs), {clean, aged}).
@@ -181,24 +198,15 @@ handle_call(Req, _From, State) ->
    ?UNEXPECTED_REQ(Req, State).
    ?UNEXPECTED_REQ(Req, State).
 
 
 handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) ->
 handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) ->
-    ets:delete_object(route, {Topic, Pid}),
-    NewState =
-    case has_route(Topic) of
-        false -> State#state{aging = store_aged(Topic, Aging)};
-        true  -> State
-    end,
-    {noreply, setstats(NewState)};
+    Aging1 = delete_route_(Topic, Pid, Aging),
+    {noreply, setstats(State#state{aging = Aging1})};
 
 
 handle_cast({delete_routes, Topics, Pid}, State) ->
 handle_cast({delete_routes, Topics, Pid}, State) ->
-    NewAging =
+    Aging1 =
     lists:foldl(fun(Topic, Aging) ->
     lists:foldl(fun(Topic, Aging) ->
-                    ets:delete_object(route, {Topic, Pid}),
-                    case has_route(Topic) of
-                        false -> store_aged(Topic, Aging);
-                        true  -> Aging
-                    end
+                    delete_route_(Topic, Pid, Aging)
             end, State#state.aging, Topics),
             end, State#state.aging, Topics),
-    {noreply, setstats(State#state{aging = NewAging})};
+    {noreply, setstats(State#state{aging = Aging1})};
 
 
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
     ?UNEXPECTED_MSG(Msg, State).
@@ -211,9 +219,9 @@ handle_info({clean, aged}, State = #state{aging = Aging}) ->
 
 
     Dict1 = try_clean(ByTime, dict:to_list(Dict)),
     Dict1 = try_clean(ByTime, dict:to_list(Dict)),
 
 
-    NewAging = Aging#aging{topics = dict:from_list(Dict1)},
+    Aging1 = Aging#aging{topics = dict:from_list(Dict1)},
 
 
-    {noreply, State#state{aging = NewAging}, hibernate};
+    {noreply, State#state{aging = Aging1}, hibernate};
 
 
 handle_info(Info, State) ->
 handle_info(Info, State) ->
     ?UNEXPECTED_INFO(Info, State).
     ?UNEXPECTED_INFO(Info, State).
@@ -225,6 +233,13 @@ terminate(_Reason, #state{pool = Pool, id = Id, aging = #aging{tref = TRef}}) ->
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
 
 
+delete_route_(Topic, Pid, Aging) ->
+    ets:delete_object(route, {Topic, Pid}),
+    case has_route(Topic) of
+        false -> store_aged(Topic, Aging);
+        true  -> Aging
+    end.
+
 try_clean(ByTime, List) ->
 try_clean(ByTime, List) ->
     try_clean(ByTime, List, []).
     try_clean(ByTime, List, []).
 
 
@@ -253,15 +268,12 @@ try_clean2(ByTime, {Topic, _TS}, Left, Acc) ->
 try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
 try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
     %% Lock topic first
     %% Lock topic first
     case mnesia:wread({topic, Topic}) of
     case mnesia:wread({topic, Topic}) of
-        [] ->
-            ok; %% mnesia:abort(not_found);
-        [TopicR] ->
-            %% Remove topic and trie
-            delete_topic(TopicR),
-            emqttd_trie:delete(Topic);
-        _More ->
-            %% Remove topic only
-            delete_topic(TopicR)
+        []       -> ok;
+        [TopicR] -> %% Remove topic and trie
+                    delete_topic(TopicR),
+                    emqttd_trie:delete(Topic);
+        _More    -> %% Remove topic only
+                    delete_topic(TopicR)
     end.
     end.
 
 
 delete_topic(TopicR) ->
 delete_topic(TopicR) ->

+ 1 - 1
test/emqttd_access_control_tests.erl

@@ -39,7 +39,7 @@ setup() ->
         {auth, [{anonymous, []}]},
         {auth, [{anonymous, []}]},
         {acl, [ %% ACL config
         {acl, [ %% ACL config
             %% Internal ACL module
             %% Internal ACL module
-            {internal,  [{file, "./testdata/test_acl.config"}, {nomatch, allow}]}
+            {internal,  [{file, "../testdata/test_acl.config"}, {nomatch, allow}]}
         ]}
         ]}
     ],
     ],
     ?AC:start_link(AclOpts).
     ?AC:start_link(AclOpts).

+ 0 - 1
test/emqttd_mqueue_tests.erl

@@ -149,7 +149,6 @@ priority_mqueue2_test() ->
     Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
     Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
     ?assertEqual(4, ?Q:len(Q4)),
     ?assertEqual(4, ?Q:len(Q4)),
     {{value, Val}, Q5} = ?Q:out(Q4),
     {{value, Val}, Q5} = ?Q:out(Q4),
-    ?debugFmt("Val: ~p~n", [Val]),
     ?assertEqual(3, ?Q:len(Q5)).
     ?assertEqual(3, ?Q:len(Q5)).
  
  
 alarm_fun() -> fun(_, _) -> alarm_fun() end.
 alarm_fun() -> fun(_, _) -> alarm_fun() end.

+ 0 - 31
test/emqttd_tests.erl

@@ -1,31 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_tests).
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-start_stop_test() ->
-    application:start(lager),
-    application:ensure_all_started(emqttd),
-    application:stop(emqttd),
-    application:stop(esockd),
-    application:stop(gproc).
-
--endif.
-