turtled 7 лет назад
Родитель
Сommit
7a1ec580b0
4 измененных файлов с 27 добавлено и 44 удалено
  1. 1 1
      src/emqx_broker.erl
  2. 1 1
      src/emqx_session.erl
  3. 23 40
      test/emqx_broker_SUITE.erl
  4. 2 2
      test/emqx_session_SUITE.erl

+ 1 - 1
src/emqx_broker.erl

@@ -87,7 +87,7 @@ subscribe(Topic) when is_binary(Topic) ->
 
 -spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
 subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
-    subscribe(Topic, SubId, #{});
+    subscribe(Topic, SubId, #{qos => 0});
 subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
     subscribe(Topic, undefined, SubOpts).
 

+ 1 - 1
src/emqx_session.erl

@@ -482,7 +482,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
         lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
                             case maps:find(Topic, SubMap) of
                                 {ok, SubOpts} ->
-                                    ok = emqx_broker:unsubscribe(Topic, ClientId),
+                                    ok = emqx_broker:unsubscribe(Topic),
                                     emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]),
                                     {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
                                 error ->

+ 23 - 40
test/emqx_broker_SUITE.erl

@@ -36,7 +36,6 @@ groups() ->
     [
      {pubsub, [sequence], [subscribe_unsubscribe,
                            publish, pubsub,
-                           t_local_subscribe,
                            t_shared_subscribe,
                            dispatch_with_no_sub,
                            'pubsub#', 'pubsub+']},
@@ -61,14 +60,14 @@ subscribe_unsubscribe(_) ->
     ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
     ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }),
     ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
-    true = emqx:subscribed(<<"topic">>, <<"clientId">>),
+    true = emqx:subscribed(<<"clientId">>, <<"topic">>),
     Topics = emqx:topics(),
     lists:foreach(fun(Topic) ->
                       ?assert(lists:member(Topic, Topics))
                   end, Topics),
-    ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
-    ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
-    ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
+    ok = emqx:unsubscribe(<<"topic">>),
+    ok = emqx:unsubscribe(<<"topic/1">>),
+    ok = emqx:unsubscribe(<<"topic/2">>).
 
 publish(_) ->
     Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
@@ -85,18 +84,25 @@ dispatch_with_no_sub(_) ->
 pubsub(_) ->
     true = emqx:is_running(node()),
     Self = self(),
-    Subscriber = {Self, <<"clientId">>},
-    ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }),
-    #{qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
-    #{qos := 1} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
-    true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}),
-    #{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
-    ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
+    Subscriber = <<"clientId">>,
+    ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }),
+    #{qos := 1} = ets:lookup_element(emqx_suboption, {Self, <<"a/b/c">>}, 2),
+    #{qos := 1} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
+    true = emqx_broker:set_subopts(<<"a/b/c">>, #{qos => 0}),
+    #{qos := 0} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
+    ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }),
     %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
     timer:sleep(10),
-    [{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>),
+    [Self] = emqx_broker:subscribers(<<"a/b/c">>),
     emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end),
+    ?assert(
+        receive {dispatch, <<"a/b/c">>, _ } ->
+                true;
+            P ->
+                ct:log("Receive Message: ~p~n",[P])
+        after 2 ->
+            false
+        end),
     spawn(fun() ->
             emqx:subscribe(<<"a/b/c">>),
             emqx:subscribe(<<"c/d/e">>),
@@ -106,38 +112,15 @@ pubsub(_) ->
     timer:sleep(20),
     emqx:unsubscribe(<<"a/b/c">>).
 
-t_local_subscribe(_) ->
-    ok = emqx:subscribe(<<"$local/topic0">>),
-    ok = emqx:subscribe(<<"$local/topic1">>, <<"clientId">>),
-    ok = emqx:subscribe(<<"$local/topic2">>, <<"clientId">>, #{ qos => 2 }),
-    timer:sleep(10),
-    ?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")),
-    ?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")),
-    ?assertEqual([{<<"$local/topic1">>, #{ qos => 0 }},
-                  {<<"$local/topic2">>, #{ qos => 2 }}],
-                 emqx:subscriptions({self(), <<"clientId">>})),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"clientId">>)),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"clientId">>)),
-    ?assertEqual([], emqx:subscribers("topic1")),
-    ?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})).
-
 t_shared_subscribe(_) ->
-    emqx:subscribe("$local/$share/group1/topic1"),
     emqx:subscribe("$share/group2/topic2"),
     emqx:subscribe("$queue/topic3"),
     timer:sleep(10),
-    ct:log("share subscriptions: ~p~n", [emqx:subscriptions({self(), undefined})]),
-    ?assertEqual([{self(), undefined}], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
-    ?assertEqual([{<<"$local/$share/group1/topic1">>, #{qos => 0}},
-                  {<<"$queue/topic3">>, #{qos => 0}},
-                  {<<"$share/group2/topic2">>, #{qos => 0}}],
-                 lists:sort(emqx:subscriptions({self(), undefined}))),
-    emqx:unsubscribe("$local/$share/group1/topic1"),
+    ct:log("share subscriptions: ~p~n", [emqx:subscriptions(self())]),
+    ?assertEqual(2, length(emqx:subscriptions(self()))),
     emqx:unsubscribe("$share/group2/topic2"),
     emqx:unsubscribe("$queue/topic3"),
-    ?assertEqual([], lists:sort(emqx:subscriptions(self()))).
+    ?assertEqual(0, length(emqx:subscriptions(self()))).
 
 'pubsub#'(_) ->
     emqx:subscribe(<<"a/#">>),

+ 2 - 2
test/emqx_session_SUITE.erl

@@ -53,7 +53,7 @@ t_session_all(_) ->
     emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 2}}]),
     emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 1}}]),
     timer:sleep(200),
-    [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}),
+    [{<<"topic">>, _}] = emqx:subscriptions(SPid),
     emqx_session:publish(SPid, 1, Message1),
     timer:sleep(200),
     {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid),
@@ -76,5 +76,5 @@ t_session_all(_) ->
     1 = proplists:get_value(subscriptions_count, Stats),
     emqx_session:unsubscribe(SPid, [<<"topic">>]),
     timer:sleep(200),
-    [] = emqx:subscriptions({SPid, <<"clientId">>}),
+    [] = emqx:subscriptions(SPid),
     emqx_mock_client:close_session(ConnPid).