소스 검색

add out/2, plen/1

Feng 10 년 전
부모
커밋
4ebdbd75a3
4개의 변경된 파일176개의 추가작업 그리고 88개의 파일을 삭제
  1. 1 0
      src/emqttd.erl
  2. 129 66
      src/emqttd_mqueue.erl
  3. 11 19
      src/emqttd_router.erl
  4. 35 3
      src/priority_queue.erl

+ 1 - 0
src/emqttd.erl

@@ -23,6 +23,7 @@
 %%%
 %%% @author Feng Lee <feng@emqtt.io>
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd).
 
 -export([start/0, env/1, env/2,

+ 129 - 66
src/emqttd_mqueue.erl

@@ -30,20 +30,21 @@
 %%%
 %%% If the broker restarted or crashed, all the messages queued will be gone.
 %%% 
-%%% Desgin of The Queue:
+%%% Concept of Message Queue and Inflight Window:
+%%%
 %%%       |<----------------- Max Len ----------------->|
 %%%       -----------------------------------------------
-%%% IN -> |       Pending Messages   | Inflight Window  | -> Out
+%%% IN -> |      Messages Queue   |  Inflight Window    | -> Out
 %%%       -----------------------------------------------
-%%%                                  |<--- Win Size --->|
+%%%                               |<---   Win Size  --->|
 %%%
 %%%
-%%% 1. Inflight Window to store the messages awaiting for ack.
+%%% 1. Inflight Window to store the messages delivered and awaiting for puback.
 %%%
-%%% 2. Suspend IN messages when the queue is deactive, or inflight windows is full.
+%%% 2. Enqueue messages when the inflight window is full.
 %%%
 %%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true,
-%%%    otherwise dropped the oldest pending one.
+%%%    otherwise dropped the oldest one.
 %%%
 %%% @end
 %%%
@@ -55,96 +56,158 @@
 
 -include("emqttd_protocol.hrl").
 
--export([new/3, name/1,
-         is_empty/1, is_full/1,
-         len/1, max_len/1,
-         in/2, out/1,
-         stats/1]).
+-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]).
 
 -define(LOW_WM, 0.2).
 
 -define(HIGH_WM, 0.6).
 
--record(mqueue, {name,
-                 q          = queue:new(), %% pending queue
-                 len        = 0,           %% current queue len
-                 low_wm     = ?LOW_WM,
-                 high_wm    = ?HIGH_WM,
-                 max_len    = ?MAX_LEN,
-                 qos0       = false,
-                 dropped    = 0,
+-type priority() :: {iolist(), pos_integer()}.
+
+-type option() :: {type, simple | priority}
+                | {max_length, pos_integer() | infinity}
+                | {priority, list(priority())}
+                | {low_watermark, float()}     %% Low watermark
+                | {high_watermark, float()}    %% High watermark
+                | {queue_qos0, boolean()}.     %% Queue Qos0?
+
+-type mqueue_option() :: {max_length, pos_integer()} %% Max queue length
+                       | {low_watermark, float()}    %% Low watermark
+                       | {high_watermark, float()}   %% High watermark
+                       | {queue_qos0, boolean()}.    %% Queue Qos0
+
+-type stat() :: {max_len, infinity | pos_integer()}
+              | {len, non_neg_integer()}
+              | {dropped, non_neg_integer()}.
+
+-record(mqueue, {type :: simple | priority,
+                 name, q :: queue:queue() | priority_queue:q(),
+                 %% priority table
+                 pseq = 0, priorities = [],
+                 %% len of simple queue
+                 len = 0, max_len = ?MAX_LEN,
+                 low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
+                 qos0 = false, dropped = 0,
                  alarm_fun}).
 
 -type mqueue() :: #mqueue{}.
 
--type mqueue_option() :: {max_length, pos_integer()}      %% Max queue length
-                       | {low_watermark, float()}         %% Low watermark
-                       | {high_watermark, float()}        %% High watermark
-                       | {queue_qos0, boolean()}.         %% Queue Qos0
-
--export_type([mqueue/0]).
+-export_type([mqueue/0, priority/0, option/0]).
 
-%%------------------------------------------------------------------------------
 %% @doc New Queue.
-%% @end
-%%------------------------------------------------------------------------------
--spec new(binary(), list(mqueue_option()), fun()) -> mqueue().
+-spec new(iolist(), list(mqueue_option()), fun()) -> mqueue().
 new(Name, Opts, AlarmFun) ->
-    MaxLen = emqttd_opts:g(max_length, Opts, 1000),
-    #mqueue{name     = Name,
-            max_len  = MaxLen,
-            low_wm   = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)),
-            high_wm  = round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)),
-            qos0     = emqttd_opts:g(queue_qos0, Opts, true),
-            alarm_fun = AlarmFun}.
+    Type = emqttd_opts:g(type, Opts, simple),
+    MaxLen = emqttd_opts:g(max_length, Opts, infinity),
+    init_q(#mqueue{type = Type, name = Name, max_len = MaxLen,
+                   low_wm = low_wm(MaxLen, Opts),
+                   high_wm = high_wm(MaxLen, Opts),
+                   qos0 = emqttd_opts:g(queue_qos0, Opts, false),
+                   alarm_fun = AlarmFun}, Opts).
+
+init_q(MQ = #mqueue{type = simple}, _Opts) ->
+    MQ#mqueue{q = queue:new()};
+init_q(MQ = #mqueue{type = priority}, Opts) ->
+    Priorities = emqttd_opts:g(priority, Opts, []),
+    init_p(Priorities, MQ#mqueue{q = priority_queue:new()}).
+
+init_p([], MQ) ->
+    MQ;
+init_p([{Topic, P} | L], MQ) ->
+    {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
+    init_p(L, MQ1).
+
+insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
+    <<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
+    {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
+
+low_wm(infinity, _Opts) ->
+    infinity;
+low_wm(MaxLen, Opts) ->
+    round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)).
+
+high_wm(infinity, _Opts) ->
+    infinity;
+high_wm(MaxLen, Opts) ->
+    round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)).
 
+-spec name(mqueue()) -> iolist().
 name(#mqueue{name = Name}) ->
     Name.
 
-is_empty(#mqueue{len = 0}) -> true;
-is_empty(_MQ)              -> false.
+-spec type(mqueue()) -> atom().
+type(#mqueue{type = Type}) ->
+    Type.
 
-is_full(#mqueue{len = Len, max_len = MaxLen})
-    when Len =:= MaxLen -> true;
-is_full(_MQ) -> false.
+is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
+is_empty(#mqueue{type = priority, q = Q})   -> priority_queue:is_empty(Q).
 
-len(#mqueue{len = Len}) -> Len.
+len(#mqueue{type = simple, len = Len}) -> Len;
+len(#mqueue{type = priority, q = Q})   -> priority_queue:len(Q).
 
 max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
 
-stats(#mqueue{max_len = MaxLen, len = Len, dropped = Dropped}) ->
-    [{max_len, MaxLen}, {len, Len}, {dropped, Dropped}].
-
-%%------------------------------------------------------------------------------
-%% @doc Queue one message.
-%% @end
-%%------------------------------------------------------------------------------
+stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
+    [{len, case Type of
+                simple   -> Len;
+                priority -> priority_queue:len(Q)
+            end} | [{max_len, MaxLen}, {dropped, Dropped}]].
 
+%% @doc Enqueue a message.
 -spec in(mqtt_message(), mqueue()) -> mqueue().
-%% drop qos0
 in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
     MQ;
-
-%% simply drop the oldest one if queue is full, improve later
-in(Msg, MQ = #mqueue{q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
-    when Len =:= MaxLen ->
-    {{value, _OldMsg}, Q2} = queue:out(Q),
-    %lager:error("MQueue(~s) drop ~s", [Name, emqttd_message:format(OldMsg)]),
+in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
+    MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
+in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
+    when Len >= MaxLen ->
+    {{value, _Old}, Q2} = queue:out(Q),
     MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
-
-in(Msg, MQ = #mqueue{q = Q, len = Len}) ->
-    maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}).
-
-out(MQ = #mqueue{len = 0}) ->
+in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
+    maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
+
+in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
+                                                    priorities = Priorities,
+                                                    max_len = infinity}) ->
+    case lists:keysearch(Topic, 1, Priorities) of
+        {value, {_, Pri}} ->
+            MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)};
+        false ->
+            {Pri, MQ1} = insert_p(Topic, 0, MQ),
+            MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
+    end;
+in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
+                                                    priorities = Priorities,
+                                                    max_len = MaxLen}) ->
+    case lists:keysearch(Topic, 1, Priorities) of
+        {value, {_, Pri}} ->
+            case priority_queue:plen(Pri, Q) >= MaxLen of
+                true ->
+                    {_, Q1} = priority_queue:out(Pri, Q),
+                    MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)};
+                false ->
+                    MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}
+            end;
+        false ->
+            {Pri, MQ1} = insert_p(Topic, 0, MQ),
+            MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
+    end.
+
+out(MQ = #mqueue{type = simple, len = 0}) ->
     {empty, MQ};
-
-out(MQ = #mqueue{q = Q, len = Len}) ->
-    {Result, Q2} = queue:out(Q),
-    {Result, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}.
+out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
+    {R, Q2} = queue:out(Q),
+    {R, MQ#mqueue{q = Q2, len = Len - 1}};
+out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
+    {R, Q2} = queue:out(Q),
+    {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
+out(MQ = #mqueue{type = priority, q = Q}) ->
+    {R, Q2} = priority_queue:out(Q),
+    {R, MQ#mqueue{q = Q2}}.
 
 maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
     when Len > HighWM ->
-    Alarm = #mqtt_alarm{id = list_to_binary(["queue_high_watermark.", Name]),
+    Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
                         severity = warning,
                         title = io_lib:format("Queue ~s high-water mark", [Name]),
                         summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},

+ 11 - 19
src/emqttd_router.erl

@@ -19,7 +19,7 @@
 %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
-%%% @doc Message Router on Local Node.
+%%% @doc Message Router on local node.
 %%%
 %%% @author Feng Lee <feng@emqtt.io>
 %%%-----------------------------------------------------------------------------
@@ -52,8 +52,6 @@
 
 -record(state, {pool, id, statsfun, aging :: #aging{}}).
 
--type topic() :: binary().
-
 %% @doc Start a local router.
 -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}.
 start_link(Pool, Id, StatsFun, Env) ->
@@ -61,7 +59,7 @@ start_link(Pool, Id, StatsFun, Env) ->
                            ?MODULE, [Pool, Id, StatsFun, Env], []).
 
 %% @doc Route Message on the local node.
--spec route(topic(), mqtt_message()) -> any().
+-spec route(emqttd_topic:topic(), mqtt_message()) -> any().
 route(Queue = <<"$Q/", _Q>>, Msg) ->
     case lookup_routes(Queue) of
         [] ->
@@ -87,12 +85,12 @@ route(Topic, Msg) ->
     end.
 
 %% @doc Has Route?
--spec has_route(topic()) -> boolean().
+-spec has_route(emqttd_topic:topic()) -> boolean().
 has_route(Topic) ->
     ets:member(route, Topic).
 
 %% @doc Lookup Routes
--spec lookup_routes(topic()) -> list(pid()).
+-spec lookup_routes(emqttd_topic:topic()) -> list(pid()).
 lookup_routes(Topic) when is_binary(Topic) ->
     case ets:member(route, Topic) of
         true  ->
@@ -102,12 +100,12 @@ lookup_routes(Topic) when is_binary(Topic) ->
     end.
 
 %% @doc Add Route.
--spec add_route(topic(), pid()) -> ok.
+-spec add_route(emqttd_topic:topic(), pid()) -> ok.
 add_route(Topic, Pid) when is_pid(Pid) ->
     call(pick(Topic), {add_route, Topic, Pid}).
 
 %% @doc Add Routes.
--spec add_routes(list(topic()), pid()) -> ok.
+-spec add_routes(list(emqttd_topic:topic()), pid()) -> ok.
 add_routes([], _Pid) ->
     ok;
 add_routes([Topic], Pid) ->
@@ -119,12 +117,12 @@ add_routes(Topics, Pid) ->
         end, slice(Topics)).
 
 %% @doc Delete Route.
--spec delete_route(topic(), pid()) -> ok.
+-spec delete_route(emqttd_topic:topic(), pid()) -> ok.
 delete_route(Topic, Pid) ->
     cast(pick(Topic), {delete_route, Topic, Pid}).
 
 %% @doc Delete Routes.
--spec delete_routes(list(topic()), pid()) -> ok.
+-spec delete_routes(list(emqttd_topic:topic()), pid()) -> ok.
 delete_routes([Topic], Pid) ->
     delete_route(Topic, Pid);
 
@@ -136,13 +134,7 @@ delete_routes(Topics, Pid) ->
 %% @private Slice topics.
 slice(Topics) ->
     dict:to_list(lists:foldl(fun(Topic, Dict) ->
-                    Router = pick(Topic),
-                    case dict:find(Router, Dict) of
-                        {ok, L} ->
-                            dict:store(Router, [Topic | L], Dict);
-                        error   ->
-                            dict:store(Router, [Topic], Dict)
-                    end
+                    dict:append(pick(Topic), Topic, Dict)
             end, dict:new(), Topics)).
 
 %% @private Pick a router.
@@ -162,7 +154,7 @@ init([Pool, Id, StatsFun, Opts]) ->
 
     ?GPROC_POOL(join, Pool, Id),
 
-    random:seed(os:timestamp()),
+    random:seed(erlang:now()),
 
     AgingSecs = proplists:get_value(route_aging, Opts, 5),
 
@@ -261,7 +253,7 @@ try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
     %% Lock topic first
     case mnesia:wread({topic, Topic}) of
         [] ->
-            mnesia:abort(not_found);
+            ok; %% mnesia:abort(not_found);
         [TopicR] ->
             %% Remove topic and trie
             delete_topic(TopicR),

+ 35 - 3
src/priority_queue.erl

@@ -37,11 +37,10 @@
 %% calls into the same function knowing that ordinary queues represent
 %% a base case.
 
-
 -module(priority_queue).
 
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1,
-         in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]).
+-export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1,
+         in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]).
 
 %%----------------------------------------------------------------------------
 
@@ -58,6 +57,7 @@
 -spec(is_queue/1 :: (any()) -> boolean()).
 -spec(is_empty/1 :: (pqueue()) -> boolean()).
 -spec(len/1 :: (pqueue()) -> non_neg_integer()).
+-spec(len_p/2 :: (priority(), pqueue()) -> non_neg_integer()).
 -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
 -spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
 -spec(in/2 :: (any(), pqueue()) -> pqueue()).
@@ -96,6 +96,16 @@ len({queue, _R, _F, L}) ->
 len({pqueue, Queues}) ->
     lists:sum([len(Q) || {_, Q} <- Queues]).
 
+plen(0, {queue, _R, _F, L}) ->
+    L;
+plen(P, {queue, _R, _F, _}) ->
+    erlang:error(badarg, [P]);
+plen(P, {pqueue, Queues}) ->
+    case lists:keysearch(maybe_negate_priority(P), 1, Queues) of
+        {value, {_, Q}} -> len(Q);
+        false           -> 0
+    end.
+
 to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) ->
     [{0, V} || V <- Out ++ lists:reverse(In, [])];
 to_list({pqueue, Queues}) ->
@@ -159,6 +169,28 @@ out({pqueue, [{P, Q} | Queues]}) ->
 out_p({queue, _, _, _}       = Q) -> add_p(out(Q), 0);
 out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
 
+out(0, {queue, _, _, _} = Q) ->
+    out(Q);
+out(Priority, {queue, _, _, _}) ->
+    erlang:error(badarg, [Priority]);
+out(Priority, {pqueue, Queues}) ->
+    P = maybe_negate_priority(Priority),
+    case lists:keysearch(P, 1, Queues) of
+        {value, {_, Q}} ->
+            {R, Q1} = out(Q),
+            Queues1 = case is_empty(Q1) of
+                true  -> lists:keydelete(P, 1, Queues);
+                false -> lists:keyreplace(P, 1, Queues, {P, Q1})
+            end,
+            {R, case Queues1 of
+                    []           -> {queue, [], [], 0};
+                    [{0, OnlyQ}] -> OnlyQ;
+                    [_|_]        -> {pqueue, Queues1}
+                end};
+        false ->
+            {empty, {pqueue, Queues}}
+    end.
+
 add_p(R, P) -> case R of
                    {empty, Q}      -> {empty, Q};
                    {{value, V}, Q} -> {{value, V, P}, Q}