Gilbert Wong 7 лет назад
Родитель
Сommit
ce3f2e4d9e
4 измененных файлов с 63 добавлено и 46 удалено
  1. 1 1
      src/emqx_alarm_mgr.erl
  2. 5 2
      src/emqx_time.erl
  3. 48 42
      test/emqx_broker_SUITE.erl
  4. 9 1
      test/emqx_mock_client.erl

+ 1 - 1
src/emqx_alarm_mgr.erl

@@ -81,7 +81,7 @@ handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)->
 handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) ->
     case encode_alarm(Alarm) of
         {ok, Json} ->
-            ok = emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
+            emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
         {error, Reason} ->
             emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
     end,

+ 5 - 2
src/emqx_time.erl

@@ -14,7 +14,7 @@
 
 -module(emqx_time).
 
--export([seed/0, now_secs/0, now_ms/0, now_ms/1]).
+-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1]).
 
 seed() ->
     rand:seed(exsplus, erlang:timestamp()).
@@ -22,8 +22,11 @@ seed() ->
 now_secs() ->
     erlang:system_time(second).
 
+now_secs({MegaSecs, Secs, _MicroSecs}) ->
+    MegaSecs * 1000000 + Secs.
+
 now_ms() ->
     erlang:system_time(millisecond).
 
 now_ms({MegaSecs, Secs, MicroSecs}) ->
-     (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
+     (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).

+ 48 - 42
test/emqx_broker_SUITE.erl

@@ -64,12 +64,12 @@ end_per_suite(_Config) ->
 %%--------------------------------------------------------------------
 
 subscribe_unsubscribe(_) ->
-    ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
-    ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]),
-    ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]),
-    ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
-    ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
-    ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
+    ok = emqx:subscribe(<<"topic">>, "clientId"),
+    ok = emqx:subscribe(<<"topic/1">>, "clientId", #{ qos => 1 }),
+    ok = emqx:subscribe(<<"topic/2">>, "clientId", #{ qos => 2 }),
+    ok = emqx:unsubscribe(<<"topic">>, "clientId"),
+    ok = emqx:unsubscribe(<<"topic/1">>, "clientId"),
+    ok = emqx:unsubscribe(<<"topic/2">>, "clientId").
 
 publish(_) ->
     Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
@@ -80,13 +80,17 @@ publish(_) ->
 
 pubsub(_) ->
     Self = self(),
-    ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]),
-    ?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])),
+    Subscriber = {Self, <<"clientId">>},
+    ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }),
+    #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }),
+    #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
     timer:sleep(10),
-    [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self),
-    [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>),
+    [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),
+    [{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>),
     emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
+    ?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end),
     spawn(fun() ->
             emqx:subscribe(<<"a/b/c">>),
             emqx:subscribe(<<"c/d/e">>),
@@ -97,32 +101,33 @@ pubsub(_) ->
     emqx:unsubscribe(<<"a/b/c">>).
 
 t_local_subscribe(_) ->
-    ok = emqx:subscribe("$local/topic0"),
-    ok = emqx:subscribe("$local/topic1", <<"x">>),
-    ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]),
+    ok = emqx:subscribe(<<"$local/topic0">>),
+    ok = emqx:subscribe(<<"$local/topic1">>, "clientId"),
+    ok = emqx:subscribe(<<"$local/topic2">>, "clientId", #{ qos => 2 }),
     timer:sleep(10),
-    ?assertEqual([self()], emqx:subscribers("$local/topic0")),
-    ?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")),
-    ?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []},
-                  {{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}],
-                 emqx:subscriptions(<<"x">>)),
+    ?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")),
+    ?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")),
+    ?assertEqual([{<<"$local/topic1">>, #{}},
+                  {<<"$local/topic2">>, #{ qos => 2 }}],
+                 emqx:subscriptions({self(), <<"clientId">>})),
     ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
-    ?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)),
+    ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
+    ?assertEqual(ok, emqx:unsubscribe("$local/topic1", "clientId")),
+    ?assertEqual(ok, emqx:unsubscribe("$local/topic2", "clientId")),
     ?assertEqual([], emqx:subscribers("topic1")),
-    ?assertEqual([], emqx:subscriptions(<<"x">>)).
+    ?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})).
 
 t_shared_subscribe(_) ->
     emqx:subscribe("$local/$share/group1/topic1"),
     emqx:subscribe("$share/group2/topic2"),
     emqx:subscribe("$queue/topic3"),
     timer:sleep(10),
-    ?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
-    ?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []},
-                  {self(), <<"$queue/topic3">>, []},
-                  {self(), <<"$share/group2/topic2">>, []}],
-                 lists:sort(emqx:subscriptions(self()))),
+    ct:log("share subscriptions: ~p~n", [emqx:subscriptions({self(), undefined})]),
+    ?assertEqual([{self(), undefined}], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
+    ?assertEqual([{<<"$local/$share/group1/topic1">>, #{}},
+                  {<<"$queue/topic3">>, #{}},
+                  {<<"$share/group2/topic2">>, #{}}],
+                 lists:sort(emqx:subscriptions({self(), undefined}))),
     emqx:unsubscribe("$local/$share/group1/topic1"),
     emqx:unsubscribe("$share/group2/topic2"),
     emqx:unsubscribe("$queue/topic3"),
@@ -146,17 +151,18 @@ t_shared_subscribe(_) ->
 %% Session Group
 %%--------------------------------------------------------------------
 start_session(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>),
-    {ok, SessPid} = emqx_mock_client:start_session(ClientPid),
-    Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
-    Message1 = Message#message{id = 1},
-    emqx_session:publish(SessPid, Message1),
-    emqx_session:pubrel(SessPid, 1),
-    emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]),
+    ClientId = <<"clientId">>,
+    {ok, ClientPid} = emqx_mock_client:start_link(ClientId),
+    {ok, SessPid} = emqx_mock_client:open_session(ClientPid, ClientId, internal),
+    Message1 = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
+    emqx_session:publish(SessPid, 1, Message1),
+    emqx_session:pubrel(SessPid, 2, reasoncode),
+    emqx_session:subscribe(SessPid, [{<<"topic/session">>, #{qos => 2}}]),
     Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
-    emqx_session:publish(SessPid, Message2),
+    emqx_session:publish(SessPid, 3, Message2),
     emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]),
-    emqx_mock_client:stop(ClientPid).
+    %% emqx_mock_client:stop(ClientPid).
+    emqx_mock_client:close_session(ClientPid, SessPid).
 
 %%--------------------------------------------------------------------
 %% Broker Group
@@ -231,10 +237,10 @@ hook_fun8(arg, initArg) -> stop.
 
 set_alarms(_) ->
     AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
-    emqx_alarm:set_alarm(AlarmTest),
-    Alarms = emqx_alarm:get_alarms(),
+    emqx_alarm_mgr:set_alarm(AlarmTest),
+    Alarms = emqx_alarm_mgr:get_alarms(),
+    ct:log("Alarms Length: ~p ~n", [length(Alarms)]),
     ?assertEqual(1, length(Alarms)),
-    emqx_alarm:clear_alarm(<<"1">>),
-    [] = emqx_alarm:get_alarms().
-
+    emqx_alarm_mgr:clear_alarm(<<"1">>),
+    [] = emqx_alarm_mgr:get_alarms().
 

+ 9 - 1
test/emqx_mock_client.erl

@@ -18,7 +18,7 @@
 
 -behaviour(gen_server).
 
--export([start_link/1, open_session/3, stop/1]).
+-export([start_link/1, open_session/3, close_session/2, stop/1]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
@@ -31,6 +31,9 @@ start_link(ClientId) ->
 open_session(ClientPid, ClientId, Zone) ->
     gen_server:call(ClientPid, {start_session, ClientPid, ClientId, Zone}).
 
+close_session(ClientPid, SessPid) ->
+    gen_server:call(ClientPid, {stop_session, SessPid}).
+
 stop(CPid) ->
     gen_server:call(CPid, stop).
 
@@ -55,6 +58,11 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
                              client_pid = ClientPid
                             }};
 
+handle_call({stop_session, SessPid}, _From, State) ->
+    unlink(SessPid),
+    emqx_sm:close_session(SessPid),
+    {stop, normal, ok, State};
+
 handle_call(stop, _From, State) ->
     {stop, normal, ok, State};