Feng 10 роки тому
батько
коміт
db8b7c9d82
6 змінених файлів з 458 додано та 0 видалено
  1. 7 0
      TODO
  2. 7 0
      doc/pool.md
  3. 62 0
      src/emqttd_pool_sup.erl
  4. 87 0
      src/emqttd_pubsub_helper.erl
  5. 249 0
      src/emqttd_router.erl
  6. 46 0
      src/emqttd_router_sup.erl

+ 7 - 0
TODO

@@ -0,0 +1,7 @@
+
+TODO 1. refactor gproc_pool usage
+
+TODO 2. emqttd_router, emqttd_pubsub to route message
+
+TODO 3. sup, pool_sup, manager......
+

+ 7 - 0
doc/pool.md

@@ -0,0 +1,7 @@
+sup(one_for_all)
+    manager
+    pool_sup(one_for_one)
+        worker1
+        worker2
+        ...
+        workerN

+ 62 - 0
src/emqttd_pool_sup.erl

@@ -0,0 +1,62 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc Common Pool Supervisor
+%%%
+%%% @author Feng Lee <feng@emqtt.io>
+%%%
+%%%-----------------------------------------------------------------------------
+-module(emqttd_pool_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([spec/2, start_link/3, start_link/4]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-spec spec(any(), list()) -> supervisor:child_spec().
+spec(Id, Args) ->
+    {Id, {?MODULE, start_link, Args},
+        transient, infinity, supervisor, [?MODULE]}.
+
+-spec start_link(atom(), atom(), mfa()) -> {ok, pid()} | {error, any()}.
+start_link(Pool, Type, MFA) ->
+    Schedulers = erlang:system_info(schedulers),
+    start_link(Pool, Type, Schedulers, MFA).
+
+-spec start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}.
+start_link(Pool, Type, Size, MFA) ->
+    supervisor:start_link({local, sup_name(Pool)}, ?MODULE, [Pool, Type, Size, MFA]).
+
+sup_name(Pool) ->
+    list_to_atom(atom_to_list(Pool) ++ "_pool_sup").
+
+init([Pool, Type, Size, {M, F, Args}]) ->
+    emqttd:ensure_pool(Pool, Type, [{size, Size}]),
+    {ok, {{one_for_one, 10, 3600}, [
+        begin
+            gproc_pool:add_worker(Pool, {Pool, I}, I),
+            {{M, I}, {M, F, [Pool, I | Args]},
+                transient, 5000, worker, [M]}
+        end || I <- lists:seq(1, Size)]}}.
+

+ 87 - 0
src/emqttd_pubsub_helper.erl

@@ -0,0 +1,87 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc PubSub Helper
+%%%
+%%% @author Feng Lee <feng@emqtt.io>
+%%%
+%%%-----------------------------------------------------------------------------
+-module(emqttd_pubsub_helper).
+
+-behaviour(gen_server).
+
+-include("emqttd.hrl").
+
+-define(SERVER, ?MODULE).
+
+%% API Function Exports
+-export([start_link/0]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(aging, {topics, timer}).
+
+-record(state, {aging :: #aging{}}).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+
+init(Args) ->
+    {ok, Args}.
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    TopicR = #mqtt_topic{_ = '_', node = node()},
+    F = fun() ->
+            [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)]
+            %%TODO: remove trie??
+        end,
+    mnesia:transaction(F),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+

+ 249 - 0
src/emqttd_router.erl

@@ -0,0 +1,249 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc MQTT Message Router on Local Node
+%%%
+%%% Route Table:
+%%%
+%%%   Topic -> {Pid1, Qos}, {Pid2, Qos}, ... 
+%%%
+%%% Reverse Route Table:
+%%%
+%%%   Pid -> {Topic1, Qos}, {Topic2, Qos}, ...
+%%%
+%%% @end
+%%%
+%%% @author Feng Lee <feng@emqtt.io>
+%%%
+%%%-----------------------------------------------------------------------------
+-module(emqttd_router).
+
+-behaviour(gen_server2).
+
+-include("emqttd.hrl").
+
+-include("emqttd_protocol.hrl").
+
+-export([start_link/2, add_routes/1, add_routes/2, route/2,
+         delete_routes/1, delete_routes/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%%TODO: test...
+-compile(export_all).
+
+%%%=============================================================================
+%%% API Function Definitions
+%%%=============================================================================
+
+%%------------------------------------------------------------------------------
+%% @doc Start router.
+%% @end
+%%------------------------------------------------------------------------------
+start_link(Id, Opts) ->
+    gen_server2:start_link(?MODULE, [Id, Opts], []).
+
+%%------------------------------------------------------------------------------
+%% @doc Add Routes.
+%% @end
+%%------------------------------------------------------------------------------
+-spec add_routes(list({binary(), mqtt_qos()})) -> ok.
+add_routes(TopicTable) ->
+    add_routes(TopicTable, self()).
+    
+-spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok.
+add_routes(TopicTable, Pid) ->
+    Router = gproc_pool:pick_worker(router, Pid),
+    gen_server2:cast(Router, {add_routes, TopicTable, Pid}).
+
+%%------------------------------------------------------------------------------
+%% @doc Lookup topics that a pid subscribed.
+%% @end
+%%------------------------------------------------------------------------------
+-spec lookup(pid()) -> list({binary(), mqtt_qos()}).
+lookup(Pid) when is_pid(Pid) ->
+    [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)].
+
+%%------------------------------------------------------------------------------
+%% @doc Route Message on Local Node.
+%% @end
+%%------------------------------------------------------------------------------
+-spec route(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
+route(Queue = <<"$Q/", _Q>>, Msg) ->
+    case ets:lookup(route, Queue) of
+        [] ->
+            setstats(dropped, true);
+        Routes ->
+            Idx = random:uniform(length(Routes)),
+            {_, SubPid, SubQos} = lists:nth(Idx, Routes),
+            SubPid ! {dispatch, tune_qos(SubQos, Msg)}
+    end;
+
+route(Topic, Msg) ->
+    Routes = ets:lookup(route, Topic),
+    setstats(dropped, Routes =:= []),
+    lists:foreach(
+        fun({_Topic, SubPid, SubQos}) ->
+            SubPid ! {dispatch, tune_qos(SubQos, Msg)}
+        end, Routes).
+
+tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos ->
+    Msg#mqtt_message{qos = SubQos};
+tune_qos(_SubQos, Msg) ->
+    Msg.
+
+%%------------------------------------------------------------------------------
+%% @doc Delete Routes.
+%% @end
+%%------------------------------------------------------------------------------
+-spec delete_routes(list(binary())) -> ok.
+delete_routes(Topics) ->
+    delete_routes(Topics, self()). 
+
+-spec delete_routes(list(binary()), pid()) -> ok.
+delete_routes(Topics, Pid) ->
+    Router = gproc_pool:pick_worker(router, Pid),
+    gen_server2:cast(Router, {delete_routes, Topics, Pid}).
+
+%%%=============================================================================
+%%% gen_server Function Definitions
+%%%=============================================================================
+
+init([Id, Opts]) ->
+    %% Only ETS Operations
+    process_flag(priority, high),
+
+    %% Aging Timer
+    AgingSecs = proplists:get_value(aging, Opts, 5),
+
+    {ok, TRef} = timer:send_interval(timer:seconds(AgingSecs), aging),
+
+    gproc_pool:connect_worker(router, {?MODULE, Id}),
+
+    {ok, #state{aging = #aging{topics = [], timer = TRef}}}.
+
+handle_call(Req, _From, State) ->
+    lager:error("Unexpected Request: ~p", [Req]),
+    {reply, {error, unsupported_req}, State}.
+
+handle_cast({add_routes, TopicTable, Pid}, State) ->
+    case lookup(Pid) of
+        [] ->
+            erlang:monitor(process, Pid),
+            ets_add_routes(TopicTable, Pid);
+        TopicInEts ->
+            {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts),
+            ets_update_routes(UpdatedTopics, Pid),
+            ets_add_routes(NewTopics, Pid)
+    end,
+    {noreply, State};
+
+handle_cast({delete, Topics, Pid}, State) ->
+    Routes = [{Topic, Pid} || Topic <- Topics],
+    lists:foreach(fun ets_delete_route/1, Routes),
+    %% TODO: aging route......
+    {noreply, State};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
+    Topics = [Topic || {Topic, _Qos} <- lookup(DownPid)],
+    ets:delete(reverse_route, DownPid),
+    lists:foreach(fun(Topic) ->
+            ets:match_delete(route, {Topic, DownPid, '_'})
+        end, Topics),
+    %% TODO: aging route......
+    {noreply, State};
+
+handle_info(aging, State = #state{aging = #aging{topics = Topics}}) ->
+    %%TODO.. aging
+    %%io:format("Aging Topics: ~p~n", [Topics]),
+    {noreply, State};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #state{id = Id, aging = #aging{timer = TRef}}) ->
+    timer:cancel(TRef),
+    gproc_pool:connect_worker(route, {?MODULE, Id}),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%=============================================================================
+%%% Internal Functions
+%%%=============================================================================
+
+diff(TopicTable, TopicInEts) ->
+    diff(TopicTable, TopicInEts, [], []).
+
+diff([], _TopicInEts, NewAcc, UpAcc) ->
+    {NewAcc, UpAcc};
+
+diff([{Topic, Qos}|TopicTable], TopicInEts, NewAcc, UpAcc) ->
+    case lists:keyfind(Topic, 1, TopicInEts) of
+        {Topic, Qos}  ->
+            diff(TopicTable, TopicInEts, NewAcc, UpAcc);
+        {Topic, _Qos} ->
+            diff(TopicTable, TopicInEts, NewAcc, [{Topic, Qos}|UpAcc]);
+        false ->
+            diff(TopicTable, TopicInEts, [{Topic, Qos}|NewAcc], UpAcc)
+    end.
+
+ets_add_routes([], _Pid) ->
+    ok;
+ets_add_routes(TopicTable, Pid) ->
+    {Routes, ReverseRoutes} = routes(TopicTable, Pid),
+    ets:insert(route, Routes),
+    ets:insert(reverse_route, ReverseRoutes).
+
+ets_update_routes([], _Pid) ->
+    ok;
+ets_update_routes(TopicTable, Pid) ->
+    {Routes, ReverseRoutes} = routes(TopicTable, Pid),
+    lists:foreach(fun ets_update_route/1, Routes),
+    lists:foreach(fun ets_update_reverse_route/1, ReverseRoutes).
+
+ets_update_route(Route = {Topic, Pid, _Qos}) ->
+    ets:match_delete(route, {Topic, Pid, '_'}),
+    ets:insert(route, Route).
+
+ets_update_reverse_route(RevRoute = {Pid, Topic, _Qos}) ->
+    ets:match_delete(reverse_route, {Pid, Topic, '_'}),
+    ets:insert(reverse_route, RevRoute).
+
+ets_delete_route({Topic, Pid}) ->
+    ets:match_delete(reverse_route, {Pid, Topic, '_'}),
+    ets:match_delete(route, {Topic, Pid, '_'}).
+
+routes(TopicTable, Pid) ->
+    F = fun(Topic, Qos) -> {{Topic, Pid, Qos}, {Pid, Topic, Qos}} end,
+    lists:unzip([F(Topic, Qos) || {Topic, Qos} <- TopicTable]).
+
+setstats(dropped, false) ->
+    ignore;
+
+setstats(dropped, true) ->
+    emqttd_metrics:inc('messages/dropped').
+

+ 46 - 0
src/emqttd_router_sup.erl

@@ -0,0 +1,46 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc Router Supervisor
+%%%
+%%% @author Feng Lee <feng@emqtt.io>
+%%%
+%%%-----------------------------------------------------------------------------
+-module(emqttd_router_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+start_link() ->
+    Opts = emqttd_broker:env(router),
+    supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]).
+
+init([Opts]) ->
+    create_route_tabs(Opts),
+    MFA = {emqttd_router, start_link, [Opts]},
+    PoolSup = emqttd_pool_sup:spec(pool_sup, [router, hash, MFA]),
+    {ok, {{one_for_all, 10, 3600}, [PoolSup]}}.
+