Sfoglia il codice sorgente

backend and retained_message

Feng 10 anni fa
parent
commit
e32d85f40a
4 ha cambiato i file con 152 aggiunte e 97 eliminazioni
  1. 62 2
      src/emqttd_backend.erl
  2. 17 68
      src/emqttd_retainer.erl
  3. 52 15
      test/emqttd_SUITE.erl
  4. 21 12
      test/emqttd_backend_SUITE.erl

+ 62 - 2
src/emqttd_backend.erl

@@ -18,13 +18,19 @@
 
 -include("emqttd.hrl").
 
+-include_lib("stdlib/include/ms_transform.hrl").
+
 %% Mnesia Callbacks
 -export([mnesia/1]).
 
 -boot_mnesia({mnesia, [boot]}).
 -copy_mnesia({mnesia, [copy]}).
 
-%% API.
+%% Retained Message API
+-export([retain_message/1, read_messages/1, match_messages/1, delete_message/1,
+         expire_messages/1, retained_count/0]).
+
+%% Static Subscription API
 -export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1,
          del_subscription/2]).
 
@@ -33,6 +39,14 @@
 %%--------------------------------------------------------------------
 
 mnesia(boot) ->
+    ok = emqttd_mnesia:create_table(retained_message, [
+                {type, ordered_set},
+                {disc_copies, [node()]},
+                {record_name, mqtt_message},
+                {index, [#mqtt_message.topic]},
+                {attributes, record_info(fields, mqtt_message)},
+                {storage_properties, [{ets, [compressed]},
+                                      {dets, [{auto_save, 1000}]}]}]),
     ok = emqttd_mnesia:create_table(backend_subscription, [
                 {type, bag},
                 {disc_copies, [node()]},
@@ -42,14 +56,59 @@ mnesia(boot) ->
                                       {dets, [{auto_save, 5000}]}]}]);
 
 mnesia(copy) ->
+    ok = emqttd_mnesia:copy_table(retained_message),
     ok = emqttd_mnesia:copy_table(backend_subscription).
 
+%%--------------------------------------------------------------------
+%% Retained Message
+%%--------------------------------------------------------------------
+
+-spec(retain_message(mqtt_message()) -> ok).
+retain_message(Msg) when is_record(Msg, mqtt_message) ->
+    mnesia:dirty_write(retained_message, Msg).
+
+-spec(read_messages(binary()) -> [mqtt_message()]).
+read_messages(Topic) ->
+    mnesia:dirty_index_read(retained_message, Topic, #mqtt_message.topic).
+
+-spec(match_messages(binary()) -> [mqtt_message()]).
+match_messages(Filter) ->
+    %% TODO: optimize later...
+    Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
+            case emqttd_topic:match(Name, Filter) of
+                true -> [Msg|Acc];
+                false -> Acc
+            end
+          end,
+    mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
+
+-spec(delete_message(binary()) -> ok).
+delete_message(Topic) ->
+    %%TODO: no transaction???
+    [mnesia:dirty_delete_object(retained_message, Msg) || Msg <- read_messages(Topic)].
+
+-spec(expire_messages(pos_integer()) -> any()).
+expire_messages(Time) when is_integer(Time) ->
+    mnesia:transaction(
+        fun() ->
+            Match = ets:fun2ms(
+                        fun(#mqtt_message{msgid = MsgId, timestamp = {MegaSecs, Secs, _}})
+                            when Time > (MegaSecs * 1000000 + Secs) -> MsgId
+                        end),
+            MsgIds = mnesia:select(retained_message, Match, write),
+            lists:foreach(fun(MsgId) -> mnesia:delete({retained_message, MsgId}) end, MsgIds)
+        end).
+
+-spec(retained_count() -> non_neg_integer()).
+retained_count() ->
+    mnesia:table_info(retained_message, size).
+
 %%--------------------------------------------------------------------
 %% Static Subscriptions
 %%--------------------------------------------------------------------
 
 %% @doc Add a static subscription manually.
--spec add_subscription(mqtt_subscription()) -> ok | {error, already_existed}.
+-spec(add_subscription(mqtt_subscription()) -> ok | {error, already_existed}).
 add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) ->
     Pattern = match_pattern(SubId, Topic),
     return(mnesia:transaction(fun() ->
@@ -89,3 +148,4 @@ match_pattern(SubId, Topic) ->
 
 return({atomic, ok})      -> ok;
 return({aborted, Reason}) -> {error, Reason}.
+

+ 17 - 68
src/emqttd_retainer.erl

@@ -23,66 +23,41 @@
 
 -include("emqttd_internal.hrl").
 
--include_lib("stdlib/include/ms_transform.hrl").
-
-%% Mnesia callbacks
--export([mnesia/1]).
-
--boot_mnesia({mnesia, [boot]}).
--copy_mnesia({mnesia, [copy]}).
-
 %% API Function Exports
 -export([retain/1, dispatch/2]).
 
 %% API Function Exports
--export([start_link/0, expire/1]).
+-export([start_link/0]).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(mqtt_retained, {topic, message}).
-
 -record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
 
-%%--------------------------------------------------------------------
-%% Mnesia callbacks
-%%--------------------------------------------------------------------
-
-mnesia(boot) ->
-    ok = emqttd_mnesia:create_table(retained, [
-                {type, ordered_set},
-                {disc_copies, [node()]},
-                {record_name, mqtt_retained},
-                {attributes, record_info(fields, mqtt_retained)}]);
-mnesia(copy) ->
-    ok = emqttd_mnesia:copy_table(retained).
-
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
 
-%% @doc Start a retained server
--spec start_link() -> {ok, pid()} | ignore | {error, any()}.
+%% @doc Start the retainer
+-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-%% @doc Retain message
--spec retain(mqtt_message()) -> ok | ignore.
+%% @doc Retain a message
+-spec(retain(mqtt_message()) -> ok | ignore).
 retain(#mqtt_message{retain = false}) -> ignore;
 
 %% RETAIN flag set to 1 and payload containing zero bytes
 retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
-    mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]);
+    emqttd_backend:delete_message(Topic);
 
 retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
-    TabSize = mnesia:table_info(retained, size),
+    TabSize = emqttd_backend:retained_count(),
     case {TabSize < limit(table), size(Payload) < limit(payload)} of
         {true, true} ->
-            Retained = #mqtt_retained{topic = Topic, message = Msg},
-            lager:debug("RETAIN ~s", [emqttd_message:format(Msg)]),
-            mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
-            emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
+            emqttd_backend:retain_message(Msg),
+            emqttd_metrics:set('messages/retained', emqttd_backend:retained_count());
        {false, _}->
             lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
        {_, false}->
@@ -103,24 +78,13 @@ env(Key) ->
             Val
     end.
 
-%% @doc Deliver retained messages to subscribed client
--spec dispatch(Topic, CPid) -> any() when
-        Topic  :: binary(),
-        CPid   :: pid().
+%% @doc Deliver retained messages to the subscriber
+-spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()).
 dispatch(Topic, CPid) when is_binary(Topic) ->
-    Msgs =
-    case emqttd_topic:wildcard(Topic) of
-        false ->
-            [Msg || #mqtt_retained{message = Msg} <- mnesia:dirty_read(retained, Topic)];
-        true ->
-            Fun = fun(#mqtt_retained{topic = Name, message = Msg}, Acc) ->
-                    case emqttd_topic:match(Name, Topic) of
-                        true -> [Msg|Acc];
-                        false -> Acc
-                    end
-            end,
-            mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained])
-    end,
+    Msgs = case emqttd_topic:wildcard(Topic) of
+             false -> emqttd_backend:read_messages(Topic);
+             true  -> emqttd_backend:match_messages(Topic)
+           end,
     lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
 
 %%--------------------------------------------------------------------
@@ -145,7 +109,7 @@ handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
 
 handle_info(stats, State = #state{stats_fun = StatsFun}) ->
-    StatsFun(mnesia:table_info(retained, size)),
+    StatsFun(emqttd_backend:retained_count()),
     {noreply, State, hibernate};
 
 handle_info(expire, State = #state{expired_after = Never})
@@ -153,7 +117,7 @@ handle_info(expire, State = #state{expired_after = Never})
     {noreply, State, hibernate};
 
 handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
-    expire(emqttd_time:now_to_secs() - ExpiredAfter),
+    emqttd_backend:expire_messages(emqttd_time:now_to_secs() - ExpiredAfter),
     {noreply, State, hibernate};
 
 handle_info(Info, State) ->
@@ -166,18 +130,3 @@ terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-expire(Time) ->
-    mnesia:async_dirty(
-        fun() ->
-            Match = ets:fun2ms(
-                        fun(#mqtt_retained{topic = Topic, message = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
-                            when Time > (MegaSecs * 1000000 + Secs) -> Topic
-                        end),
-            Topics = mnesia:select(retained, Match, write),
-            lists:foreach(fun(Topic) -> mnesia:delete({retained, Topic}) end, Topics)
-        end).
-

+ 52 - 15
test/emqttd_SUITE.erl

@@ -29,6 +29,7 @@ all() ->
      {group, metrics},
      {group, stats},
      {group, hook},
+     {group, backend},
      {group, cli}].
 
 groups() ->
@@ -44,8 +45,6 @@ groups() ->
        router_unused]},
      {session, [sequence],
       [start_session]},
-     {retainer, [sequence],
-      [retain_message]},
      {broker, [sequence],
       [hook_unhook]},
      {metrics, [sequence],
@@ -55,6 +54,12 @@ groups() ->
      {hook, [sequence],
       [add_delete_hook,
        run_hooks]},
+     {retainer, [sequence],
+      [retain_messages,
+       dispatch_retained_messages,
+       expire_retained_messages]},
+     {backend, [sequence],
+      [backend_subscription]},
      {cli, [sequence],
       [ctl_register_cmd,
        cli_status,
@@ -207,19 +212,6 @@ start_session(_) ->
     emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]),
     emqttd_mock_client:stop(ClientPid).
 
-%%--------------------------------------------------------------------
-%% Retainer Group
-%%--------------------------------------------------------------------
-
-retain_message(_) ->
-    Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
-                        payload = <<"payload">>},
-    emqttd_retainer:retain(Msg),
-    emqttd_retainer:dispatch(<<"a/b/+">>, self()),
-    true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
-    emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
-    [] = mnesia:dirty_read({retained, <<"a/b/c">>}).
-
 %%--------------------------------------------------------------------
 %% Broker Group
 %%--------------------------------------------------------------------
@@ -280,6 +272,51 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok.
 hook_fun4(arg1, arg2, Acc, init)  -> {ok, [r2 | Acc]}.
 hook_fun5(arg1, arg2, Acc, init)  -> {stop, [r3 | Acc]}.
 
+%%--------------------------------------------------------------------
+%% Retainer Test
+%%--------------------------------------------------------------------
+
+retain_messages(_) ->
+    Msg = emqttd_message:make(<<"clientId">>, <<"topic">>, <<"payload">>),
+    emqttd_backend:retain_message(Msg),
+    [Msg] = emqttd_backend:read_messages(<<"topic">>),
+    [Msg] = emqttd_backend:match_messages(<<"topic/#">>),
+    emqttd_backend:delete_message(<<"topic">>),
+    0 = emqttd_backend:retained_count().
+
+dispatch_retained_messages(_) ->
+    Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
+                        payload = <<"payload">>},
+    emqttd_retainer:retain(Msg),
+    emqttd_retainer:dispatch(<<"a/b/+">>, self()),
+    true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
+    emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
+    [] = emqttd_backend:read_messages(<<"a/b/c">>).
+
+expire_retained_messages(_) ->
+    Msg1 = emqttd_message:make(<<"clientId1">>, qos1, <<"topic/1">>, <<"payload1">>),
+    Msg2 = emqttd_message:make(<<"clientId2">>, qos2, <<"topic/2">>, <<"payload2">>),
+    emqttd_backend:retain_message(Msg1),
+    emqttd_backend:retain_message(Msg2),
+    timer:sleep(2000),
+    emqttd_backend:expire_messages(emqttd_time:now_to_secs()),
+    0 = emqttd_backend:retained_count().
+
+%%--------------------------------------------------------------------
+%% Backend Test
+%%--------------------------------------------------------------------
+
+backend_subscription(_) ->
+    Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2},
+    Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"#">>, qos = 2},
+    emqttd_backend:add_subscription(Sub1),
+    emqttd_backend:add_subscription(Sub2),
+    [Sub1, Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
+    emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>),
+    [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
+    emqttd_backend:del_subscriptions(<<"clientId">>),
+    [] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
+
 %%--------------------------------------------------------------------
 %% CLI Group
 %%--------------------------------------------------------------------

+ 21 - 12
test/emqttd_backend_SUITE.erl

@@ -16,21 +16,30 @@
 
 -module(emqttd_backend_SUITE).
 
+-include("emqttd.hrl").
+
 -compile(export_all).
 
-all() -> [{group, retainer}].
+all() -> [{group, subscription}].
 
-groups() -> [{retainer, [], [t_retain]}].
+groups() -> [{subscription, [], [add_del_subscription]}].
 
-init_per_group(retainer, _Config) ->
+init_per_suite(Config) ->
     ok = emqttd_mnesia:ensure_started(),
-    emqttd_retainer:mnesia(boot),
-    emqttd_retainer:mnesia(copy).
-
-end_per_group(retainer, _Config) ->
-    ok;
-end_per_group(_Group, _Config) ->
-    ok.
-
-t_retain(_) -> ok.
+    emqttd_backend:mnesia(boot),
+    emqttd_backend:mnesia(copy),
+    Config.
+
+end_per_suite(_Config) ->
+    emqttd_mnesia:ensure_stopped().
+
+add_del_subscription(_) ->
+    Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2},
+    Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 1},
+    ok = emqttd_backend:add_subscription(Sub1),
+    {error, already_existed} = emqttd_backend:add_subscription(Sub1),
+    ok = emqttd_backend:add_subscription(Sub2),
+    [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
+    emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>),
+    [] = emqttd_backend:lookup_subscriptions(<<"clientId">>).