Feng Lee 9 lat temu
rodzic
commit
9a4c44913e

+ 1 - 1
Makefile

@@ -19,7 +19,7 @@ TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
 EUNIT_OPTS = verbose
 # EUNIT_ERL_OPTS =
 
-CT_SUITES = emqttd emqttd_access emqttd_backend emqttd_lib emqttd_mod emqttd_net \
+CT_SUITES = emqttd emqttd_access emqttd_lib emqttd_mod emqttd_net \
 			emqttd_mqueue emqttd_protocol emqttd_topic emqttd_trie
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1
 

+ 28 - 12
src/emqttd.erl

@@ -14,9 +14,9 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqttd).
+%% Facade Module for The EMQTT Broker
 
--author("Feng Lee <feng@emqtt.io>").
+-module(emqttd).
 
 -include("emqttd.hrl").
 
@@ -29,7 +29,8 @@
          unsubscribe/1, unsubscribe/2]).
 
 %% PubSub Management API
--export([topics/0, subscribers/1, subscriptions/1]).
+-export([setqos/3, topics/0, subscriptions/1, subscribers/1,
+         is_subscribed/2, subscriber_down/1]).
 
 %% Hooks API
 -export([hook/4, hook/3, unhook/2, run_hooks/3]).
@@ -37,7 +38,7 @@
 %% Debug API
 -export([dump/0]).
 
--type(subscriber() :: pid() | binary() | function()).
+-type(subscriber() :: pid() | binary()).
 
 -type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}).
 
@@ -81,7 +82,7 @@ is_running(Node) ->
     end.
 
 %%--------------------------------------------------------------------
-%% PubSub APIs that wrap emqttd_pubsub
+%% PubSub APIs
 %%--------------------------------------------------------------------
 
 %% @doc Subscribe
@@ -95,11 +96,12 @@ subscribe(Topic, Subscriber) ->
 
 -spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()).
 subscribe(Topic, Subscriber, Options) ->
-    emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options).
+    with_pubsub(fun(PS) -> PS:subscribe(iolist_to_binary(Topic), Subscriber, Options) end).
 
 %% @doc Publish MQTT Message
 -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
-publish(Msg) -> emqttd_server:publish(Msg).
+publish(Msg) ->
+    with_pubsub(fun(PS) -> PS:publish(Msg) end).
 
 %% @doc Unsubscribe
 -spec(unsubscribe(iodata()) -> ok | pubsub_error()).
@@ -108,18 +110,32 @@ unsubscribe(Topic) ->
 
 -spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()).
 unsubscribe(Topic, Subscriber) ->
-    emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber).
+    with_pubsub(fun(PS) -> PS:unsubscribe(iolist_to_binary(Topic), Subscriber) end).
+
+-spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok).
+setqos(Topic, Subscriber, Qos) ->
+    with_pubsub(fun(PS) -> PS:setqos(iolist_to_binary(Topic), Subscriber, Qos) end).
 
 -spec(topics() -> [binary()]).
 topics() -> emqttd_router:topics().
 
 -spec(subscribers(iodata()) -> list(subscriber())).
 subscribers(Topic) ->
-    emqttd_pubsub:subscribers(iolist_to_binary(Topic)).
+    with_pubsub(fun(PS) -> PS:subscribers(iolist_to_binary(Topic)) end).
 
 -spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
 subscriptions(Subscriber) ->
-    emqttd_server:get_subscriptions(Subscriber).
+    with_pubsub(fun(PS) -> PS:subscriptions(Subscriber) end).
+
+-spec(is_subscribed(iodata(), subscriber()) -> boolean()).
+is_subscribed(Topic, Subscriber) ->
+    with_pubsub(fun(PS) -> PS:is_subscribed(iolist_to_binary(Topic), Subscriber) end).
+
+-spec(subscriber_down(subscriber()) -> ok).
+subscriber_down(Subscriber) ->
+    with_pubsub(fun(PS) -> PS:subscriber_down(Subscriber) end).
+
+with_pubsub(Fun) -> Fun(env(pubsub_server, emqttd_server)).
 
 %%--------------------------------------------------------------------
 %% Hooks API
@@ -141,9 +157,9 @@ unhook(Hook, Function) ->
 run_hooks(Hook, Args, Acc) ->
     emqttd_hook:run(Hook, Args, Acc).
 
-
 %%--------------------------------------------------------------------
 %% Debug
 %%--------------------------------------------------------------------
 
-dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]).
+dump() -> with_pubsub(fun(PS) -> lists:append([PS:dump(), emqttd_router:dump()]) end).
+

+ 4 - 10
src/emqttd_cli.erl

@@ -170,7 +170,7 @@ if_client(ClientId, Fun) ->
 %%--------------------------------------------------------------------
 %% @doc Sessions Command
 sessions(["list"]) ->
-    [sessions(["list", Type]) || Type <- ["persistent", "transient"]];
+    dump(mqtt_local_session);
 
 sessions(["list", "persistent"]) ->
     dump(mqtt_persistent_session);
@@ -179,15 +179,9 @@ sessions(["list", "transient"]) ->
     dump(mqtt_transient_session);
 
 sessions(["show", ClientId]) ->
-    MP = {{bin(ClientId), '_'}, '_'},
-    case {ets:match_object(mqtt_transient_session,  MP),
-          ets:match_object(mqtt_persistent_session, MP)} of
-        {[], []} ->
-            ?PRINT_MSG("Not Found.~n");
-        {[SessInfo], _} ->
-            print(SessInfo);
-        {_, [SessInfo]} ->
-            print(SessInfo)
+    case ets:lookup(mqtt_local_session, bin(ClientId)) of
+        []         -> ?PRINT_MSG("Not Found.~n");
+        [SessInfo] -> print(SessInfo)
     end;
 
 sessions(_) ->

+ 15 - 9
src/emqttd_pubsub_sup.erl

@@ -45,24 +45,30 @@ init([Env]) ->
     %% Create ETS Tables
     [create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]],
 
-    %% PubSub Pool
-    PubSubMFA = {emqttd_pubsub, start_link, [Env]},
-    PubSubPool = pool_sup(pubsub, Env, PubSubMFA),
+    {ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }.
 
-    %% Server Pool
-    ServerMFA = {emqttd_server, start_link, [Env]},
-    ServerPool = pool_sup(server, Env, ServerMFA),
-
-    {ok, { {one_for_all, 10, 3600}, [PubSubPool, ServerPool]} }.
+%%--------------------------------------------------------------------
+%% Pool
+%%--------------------------------------------------------------------
 
 pool_size(Env) ->
     Schedulers = erlang:system_info(schedulers),
     proplists:get_value(pool_size, Env, Schedulers).
 
-pool_sup(Name, Env, MFA) ->
+pool_sup(Name, Env) ->
     Pool = list_to_atom(atom_to_list(Name) ++ "_pool"),
+    MFA = {adapter(Name), start_link, [Env]},
     emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]).
 
+%%--------------------------------------------------------------------
+%% Adapter
+%%--------------------------------------------------------------------
+
+adapter(server) ->
+    emqttd:env(pubsub_server, emqttd_server);
+adapter(pubsub) ->
+    emqttd:env(pubsub_adapter, emqttd_pubsub).
+
 %%--------------------------------------------------------------------
 %% Create PubSub Tables
 %%--------------------------------------------------------------------

+ 11 - 7
src/emqttd_server.erl

@@ -37,7 +37,8 @@
          async_unsubscribe/1, async_unsubscribe/2]).
 
 %% Management API.
--export([setqos/3, is_subscribed/2, get_subscriptions/1, subscriber_down/1]).
+-export([setqos/3, subscriptions/1, subscribers/1, is_subscribed/2,
+         subscriber_down/1]).
 
 %% Debug API
 -export([dump/0]).
@@ -131,12 +132,8 @@ async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
 setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
     call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
 
--spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()).
-is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
-    ets:member(mqtt_subproperty, {Topic, Subscriber}).
-
--spec(get_subscriptions(emqttd:subscriber()) -> [{binary(), list()}]).
-get_subscriptions(Subscriber) ->
+-spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]).
+subscriptions(Subscriber) ->
     lists:map(fun({_, Topic}) ->
                 subscription(Topic, Subscriber)
         end, ets:lookup(mqtt_subscription, Subscriber)).
@@ -144,6 +141,13 @@ get_subscriptions(Subscriber) ->
 subscription(Topic, Subscriber) ->
     {Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}.
 
+subscribers(Topic) -> emqttd_pubsub:subscribers(Topic).
+
+-spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()).
+is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
+    ets:member(mqtt_subproperty, {Topic, Subscriber}).
+
+-spec(subscriber_down(emqttd:subscriber()) -> ok).
 subscriber_down(Subscriber) ->
     cast(pick(Subscriber), {subscriber_down, Subscriber}).
 

+ 4 - 5
src/emqttd_session.erl

@@ -297,7 +297,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     =
                             ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
                             SubDict;
                         {ok, OldQos} ->
-                            emqttd_server:setqos(Topic, ClientId, Qos),
+                            emqttd:setqos(Topic, ClientId, Qos),
                             ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
                             dict:store(Topic, Qos, SubDict);
                         error ->
@@ -328,8 +328,8 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
             Subscriptions1 = lists:foldl(
                 fun(Topic, SubDict) ->
                     case dict:find(Topic, SubDict) of
-                        {ok, Qos} ->
-                            emqttd:unsubscribe(ClientId, Topic, Qos),
+                        {ok, _Qos} ->
+                            emqttd:unsubscribe(ClientId, Topic),
                             dict:erase(Topic, SubDict);
                         error ->
                             SubDict
@@ -532,8 +532,7 @@ handle_info(Info, Session) ->
     ?UNEXPECTED_INFO(Info, Session).
 
 terminate(_Reason, #session{client_id = ClientId}) ->
-    %%TODO: ...
-    emqttd_server:subscriber_down(ClientId),
+    emqttd:subscriber_down(ClientId),
     emqttd_sm:unregister_session(ClientId).
 
 code_change(_OldVsn, Session, _Extra) ->

+ 35 - 14
src/emqttd_topic.erl

@@ -16,17 +16,20 @@
 
 -module(emqttd_topic).
 
+-import(lists, [reverse/1]).
 -export([match/2, validate/1, triples/1, words/1, wildcard/1]).
 
--export([join/1, feed_var/3, is_queue/1, systop/1]).
+-export([join/1, feed_var/3, systop/1]).
 
--type topic() :: binary().
+-export([strip/1, strip/2]).
 
--type word()   :: '' | '+' | '#' | binary().
+-type(topic() :: binary()).
 
--type words()  :: list(word()).
+-type(word()   :: '' | '+' | '#' | binary()).
 
--type triple() :: {root | binary(), word(), binary()}.
+-type(words()  :: list(word())).
+
+-type(triple() :: {root | binary(), word(), binary()}).
 
 -export_type([topic/0, word/0, triple/0]).
 
@@ -111,7 +114,7 @@ triples(Topic) when is_binary(Topic) ->
     triples(words(Topic), root, []).
 
 triples([], _Parent, Acc) ->
-    lists:reverse(Acc);
+    reverse(Acc);
 
 triples([W|Words], Parent, Acc) ->
     Node = join(Parent, W),
@@ -137,13 +140,6 @@ word(<<"+">>) -> '+';
 word(<<"#">>) -> '#';
 word(Bin)     -> Bin.
 
-%% @doc Queue is a special topic name that starts with "$queue/"
--spec(is_queue(topic()) -> boolean()).
-is_queue(<<"$queue/", _Queue/binary>>) ->
-    true;
-is_queue(_) ->
-    false.
-
 %% @doc '$SYS' Topic.
 systop(Name) when is_atom(Name) ->
     list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name]));
@@ -155,7 +151,7 @@ systop(Name) when is_binary(Name) ->
 feed_var(Var, Val, Topic) ->
     feed_var(Var, Val, words(Topic), []).
 feed_var(_Var, _Val, [], Acc) ->
-    join(lists:reverse(Acc));
+    join(reverse(Acc));
 feed_var(Var, Val, [Var|Words], Acc) ->
     feed_var(Var, Val, Words, [Val|Acc]);
 feed_var(Var, Val, [W|Words], Acc) ->
@@ -175,3 +171,28 @@ join(Words) ->
                 end, {true, <<>>}, [bin(W) || W <- Words]),
     Bin.
 
+-spec(strip(topic()) -> {topic(), [local | {share, binary()}]}).
+strip(Topic) when is_binary(Topic) ->
+    strip(Topic, []).
+
+strip(Topic = <<"$local/", Topic1/binary>>, Options) ->
+    case lists:member(local, Options) of
+        true  -> error({invalid_topic, Topic});
+        false -> strip(Topic1, [local | Options])
+    end;
+
+strip(Topic = <<"$queue/", Topic1/binary>>, Options) ->
+    case lists:keyfind(share, 1, Options) of
+        {share, _} -> error({invalid_topic, Topic});
+        false      -> strip(Topic1, [{share, '$queue'} | Options])
+    end;
+
+strip(Topic = <<"$share/", Topic1/binary>>, Options) ->
+    case lists:keyfind(share, 1, Options) of
+        {share, _} -> error({invalid_topic, Topic});
+        false      -> [Share, Topic2] = binary:split(Topic1, <<"/">>),
+                      {Topic2, [{share, Share} | Options]}
+    end;
+
+strip(Topic, Options) -> {Topic, Options}.
+

+ 18 - 64
test/emqttd_SUITE.erl

@@ -32,19 +32,16 @@ all() ->
      {group, metrics},
      {group, stats},
      {group, hook},
-     {group, backend},
+     %%{group, backend},
      {group, cli}].
 
 groups() ->
     [{protocol, [sequence],
       [mqtt_connect]},
      {pubsub, [sequence],
-      [create_topic,
-       create_subscription,
-       subscribe_unsubscribe,
+      [subscribe_unsubscribe,
        publish, pubsub,
-       'pubsub#', 'pubsub+',
-       pubsub_queue]},
+       'pubsub#', 'pubsub+']},
      {router, [sequence],
       [router_add_del,
        router_print,
@@ -65,7 +62,7 @@ groups() ->
        dispatch_retained_messages,
        expire_retained_messages]},
      {backend, [sequence],
-      [backend_subscription]},
+      []},
      {cli, [sequence],
       [ctl_register_cmd,
        cli_status,
@@ -115,26 +112,13 @@ connect_broker_(Packet, RecvSize) ->
 %% PubSub Test
 %%--------------------------------------------------------------------
 
-create_topic(_) ->
-    ok = emqttd:create(topic, <<"topic/create">>),
-    ok = emqttd:create(topic, <<"topic/create2">>),
-    [#mqtt_topic{topic = <<"topic/create">>, flags = [static]}]
-        = emqttd:lookup(topic, <<"topic/create">>).
-
-create_subscription(_) ->
-    ok = emqttd:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}),
-    [#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}]
-        = emqttd_backend:lookup_subscriptions(<<"clientId">>),
-    ok = emqttd_backend:del_subscriptions(<<"clientId">>),
-    ?assertEqual([], emqttd_backend:lookup_subscriptions(<<"clientId">>)).
-
 subscribe_unsubscribe(_) ->
-    ok = emqttd:subscribe(<<"topic/subunsub">>),
-    ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub1">>, 1),
-    ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub2">>, 2),
-    ok = emqttd:unsubscribe(<<"topic/subunsub">>),
-    ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub1">>, 1),
-    ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub2">>, 2).
+    ok = emqttd:subscribe(<<"topic">>, <<"clientId">>),
+    ok = emqttd:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]),
+    ok = emqttd:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]),
+    ok = emqttd:unsubscribe(<<"topic">>, <<"clientId">>),
+    ok = emqttd:unsubscribe(<<"topic/1">>, <<"clientId">>),
+    ok = emqttd:unsubscribe(<<"topic/2">>, <<"clientId">>).
 
 publish(_) ->
     Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>),
@@ -145,11 +129,11 @@ publish(_) ->
 
 pubsub(_) ->
     Self = self(),
-    emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}),
-    emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}),
+    ok = emqttd:subscribe(<<"a/b/c">>, Self, [{qos, 1}]),
+    ?assertMatch({error, _}, emqttd:subscribe(<<"a/b/c">>, Self, [{qos, 2}])),
     timer:sleep(10),
-    [{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self),
-    [{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>),
+    [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self),
+    [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>),
     emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
     ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
     spawn(fun() ->
@@ -175,22 +159,6 @@ pubsub(_) ->
     ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
     emqttd:unsubscribe(<<"a/+/+">>).
 
-pubsub_queue(_) ->
-    Self = self(), Q = <<"$queue/abc">>,
-    SubFun = fun() ->
-               emqttd:subscribe(Q),
-               timer:sleep(10),
-               {ok, Msgs} = loop_recv(Q, 10),
-               Self ! {recv, self(), Msgs}
-             end,
-    Sub1 = spawn(SubFun), Sub2 = spawn(SubFun),
-    timer:sleep(5),
-    emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)),
-    emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)),
-    emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)),
-    ?assert(receive {recv, Sub1, Msgs1} -> length(Msgs1) < 3 end),
-    ?assert(receive {recv, Sub2, Msgs2} -> length(Msgs2) < 3 end).
-
 loop_recv(Topic, Timeout) ->
     loop_recv(Topic, Timeout, []).
 
@@ -215,15 +183,15 @@ router_add_del(_) ->
             #mqtt_route{topic = <<"#">>,     node = node()},
             #mqtt_route{topic = <<"+/#">>,   node = node()},
             #mqtt_route{topic = <<"a/b/c">>, node = node()}],
-    Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
+    Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)),
 
     %% Batch Add
     emqttd_router:add_routes(Routes),
-    Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
+    Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)),
 
     %% Del
     emqttd_router:del_route(<<"a/b/c">>),
-    [R1, R2] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
+    [R1, R2] = lists:sort(emqttd_router:match(<<"a/b/c">>)),
     {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]),
 
     %% Batch Del
@@ -231,7 +199,7 @@ router_add_del(_) ->
     emqttd_router:add_route(R3),
     emqttd_router:del_routes([R1, R2]),
     emqttd_router:del_route(R3),
-    [] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)).
+    [] = lists:sort(emqttd_router:match(<<"a/b/c">>)).
 
 router_print(_) ->
     Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()},
@@ -360,20 +328,6 @@ expire_retained_messages(_) ->
     emqttd_backend:expire_messages(emqttd_time:now_to_secs()),
     0 = emqttd_backend:retained_count().
 
-%%--------------------------------------------------------------------
-%% Backend Test
-%%--------------------------------------------------------------------
-
-backend_subscription(_) ->
-    Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2},
-    Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"#">>, qos = 2},
-    emqttd_backend:add_subscription(Sub1),
-    emqttd_backend:add_subscription(Sub2),
-    [Sub1, Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
-    emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>),
-    [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
-    emqttd_backend:del_subscriptions(<<"clientId">>),
-    [] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
 
 %%--------------------------------------------------------------------
 %% CLI Group

+ 0 - 45
test/emqttd_backend_SUITE.erl

@@ -1,45 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_backend_SUITE).
-
--include("emqttd.hrl").
-
--compile(export_all).
-
-all() -> [{group, subscription}].
-
-groups() -> [{subscription, [], [add_del_subscription]}].
-
-init_per_suite(Config) ->
-    ok = emqttd_mnesia:ensure_started(),
-    emqttd_backend:mnesia(boot),
-    emqttd_backend:mnesia(copy),
-    Config.
-
-end_per_suite(_Config) ->
-    emqttd_mnesia:ensure_stopped().
-
-add_del_subscription(_) ->
-    Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2},
-    Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 1},
-    ok = emqttd_backend:add_subscription(Sub1),
-    {error, already_existed} = emqttd_backend:add_subscription(Sub1),
-    ok = emqttd_backend:add_subscription(Sub2),
-    [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
-    emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>),
-    [] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
-

+ 18 - 12
test/emqttd_topic_SUITE.erl

@@ -16,18 +16,20 @@
 
 -module(emqttd_topic_SUITE).
 
+-include_lib("eunit/include/eunit.hrl").
+
 %% CT
 -compile(export_all).
 
 -import(emqttd_topic, [wildcard/1, match/2, validate/1, triples/1, join/1,
-                       words/1, systop/1, is_queue/1, feed_var/3]).
+                       words/1, systop/1, feed_var/3, strip/1, strip/2]).
 
 -define(N, 10000).
 
 all() -> [t_wildcard, t_match, t_match2, t_validate, t_triples, t_join,
-          t_words, t_systop, t_is_queue, t_feed_var, t_sys_match, 't_#_match',
+          t_words, t_systop, t_feed_var, t_sys_match, 't_#_match',
           t_sigle_level_validate, t_sigle_level_match, t_match_perf,
-          t_triples_perf].
+          t_triples_perf, t_strip].
 
 t_wildcard(_) ->
     true  = wildcard(<<"a/b/#">>),
@@ -155,21 +157,25 @@ t_join(_) ->
     <<"/ab/cd/ef/">> = join(words(<<"/ab/cd/ef/">>)),
     <<"ab/+/#">> = join(words(<<"ab/+/#">>)).
 
-t_is_queue(_) ->
-    true  = is_queue(<<"$queue/queue">>),
-    false = is_queue(<<"xyz/queue">>).
-
 t_systop(_) ->
     SysTop1 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/xyz"]),
-    SysTop1 = systop('xyz'),
+    ?assertEqual(SysTop1, systop('xyz')),
     SysTop2 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/abc"]),
-    SysTop2 = systop(<<"abc">>).
+    ?assertEqual(SysTop2,systop(<<"abc">>)).
 
 t_feed_var(_) ->
-    <<"$queue/client/clientId">> = feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>),
-    <<"username/test/client/x">> = feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>),
-    <<"username/test/client/clientId">> = feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>).
+    ?assertEqual(<<"$queue/client/clientId">>, feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)),
+    ?assertEqual(<<"username/test/client/x">>, feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)),
+    ?assertEqual(<<"username/test/client/clientId">>, feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)).
 
 long_topic() ->
     iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]).
 
+t_strip(_) ->
+    ?assertEqual({<<"a/b/+/#">>, []}, strip(<<"a/b/+/#">>)),
+    ?assertEqual({<<"topic">>, [{share, '$queue'}]}, strip(<<"$queue/topic">>)),
+    ?assertEqual({<<"topic">>, [{share, <<"group">>}]}, strip(<<"$share/group/topic">>)),
+    ?assertEqual({<<"topic">>, [local]}, strip(<<"$local/topic">>)),
+    ?assertEqual({<<"topic">>, [{share, '$queue'}, local]}, strip(<<"$local/$queue/topic">>)),
+    ?assertEqual({<<"/a/b/c">>, [{share, <<"group">>}, local]}, strip(<<"$local/$share/group//a/b/c">>)).
+