Преглед изворни кода

Fix the EXIT outputs in Travis CI (#2154)

* Fix the Exit in testcases

* Fix Exit in emqx_mod_sup_SUITE

* Update testcases for log_tracer

* Fix Exit in emqx_protocol_SUITE

* Add will_acl_check

* Fix more Exits
Shawn пре 7 година
родитељ
комит
b8929a46c1

+ 9 - 5
src/emqx_client.erl

@@ -703,7 +703,7 @@ waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode,
     case take_call(connect, State) of
         {value, #call{from = From}, _State} ->
             Reply = {error, {Reason, Properties}},
-            {stop_and_reply, Reason, [{reply, From, Reply}]};
+            {stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]};
         false -> {stop, connack_error}
     end;
 
@@ -1004,20 +1004,24 @@ handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State)
 
 handle_event(info, {Error, _Sock, Reason}, _StateName, State)
     when Error =:= tcp_error; Error =:= ssl_error ->
-    {stop, Reason, State};
+    emqx_logger:error("[~p] ~p, Reason: ~p", [?MODULE, Error, Reason]),
+    {stop, {shutdown, Reason}, State};
 
 handle_event(info, {Closed, _Sock}, _StateName, State)
     when Closed =:= tcp_closed; Closed =:= ssl_closed ->
+    emqx_logger:debug("[~p] ~p", [?MODULE, Closed]),
     {stop, {shutdown, Closed}, State};
 
 handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
-    {stop, Reason, State};
+    emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]),
+    {stop, {shutdown, Reason}, State};
 
 handle_event(info, {inet_reply, _Sock, ok}, _, State) ->
     {keep_state, State};
 
 handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
-    {stop, Reason, State};
+    emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]),
+    {stop, {shutdown, Reason}, State};
 
 handle_event(EventType, EventContent, StateName, StateData) ->
     emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)",
@@ -1305,7 +1309,7 @@ hosts(#state{hosts = Hosts}) -> Hosts.
 send_puback(Packet, State) ->
     case send(Packet, State) of
         {ok, NewState}  -> {keep_state, NewState};
-        {error, Reason} -> {stop, Reason}
+        {error, Reason} -> {stop, {shutdown, Reason}}
     end.
 
 send(Msg, State) when is_record(Msg, mqtt_msg) ->

+ 1 - 1
src/emqx_protocol.erl

@@ -820,7 +820,7 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) ->
     case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of
         allow -> ok;
         deny ->
-            ?LOG(warning, "Cannot publish will message to ~p for acl checking failed", [WillTopic]),
+            ?LOG(warning, "Will message (to ~s) validation failed, acl denied", [WillTopic]),
             {error, ?RC_UNSPECIFIED_ERROR}
     end.
 

+ 1 - 1
src/emqx_session.erl

@@ -675,7 +675,7 @@ code_change(_OldVsn, State, _Extra) ->
 maybe_shutdown(undefined, _Reason) ->
     ok;
 maybe_shutdown(Pid, normal) ->
-     Pid ! {shutdown, normal};
+    Pid ! {shutdown, normal};
 maybe_shutdown(Pid, Reason) ->
     exit(Pid, Reason).
 

+ 2 - 3
test/emqx_access_SUITE_data/acl_deny_action.conf

@@ -1,4 +1,3 @@
-
 {deny, {user, "emqx"}, pubsub, ["acl_deny_action"]}.
-
-{allow, all}.
+{deny, {user, "pub_deny"}, publish, ["pub_deny"]}.
+{allow, all}.

+ 25 - 0
test/emqx_ct_broker_helpers.erl

@@ -124,3 +124,28 @@ client_ssl_twoway() ->
 
 client_ssl() ->
     ?CIPHERS ++ [{reuse_sessions, true}].
+
+wait_mqtt_payload(Payload) ->
+    receive
+        {publish, #{payload := Payload}} ->
+            ct:pal("OK - received msg: ~p~n", [Payload])
+    after 1000 ->
+        ct:fail({timeout, Payload, {msg_box, flush()}})
+    end.
+
+not_wait_mqtt_payload(Payload) ->
+    receive
+        {publish, #{payload := Payload}} ->
+            ct:fail({received, Payload})
+    after 1000 ->
+        ct:pal("OK - msg ~p is not received", [Payload])
+    end.
+
+flush() ->
+    flush([]).
+flush(Msgs) ->
+    receive
+        M -> flush([M|Msgs])
+    after
+        0 -> lists:reverse(Msgs)
+    end.

+ 18 - 3
test/emqx_mod_sup_SUITE.erl

@@ -21,8 +21,23 @@
 
 all() -> [t_child_all].
 
+start_link() ->
+    Pid = spawn_link(?MODULE, echo, [0]),
+    {ok, Pid}.
+
+echo(State) ->
+    receive
+        {From, Req} ->
+            ct:pal("======from:~p, req:~p", [From, Req]),
+            From ! Req,
+            echo(State)
+    end.
+
 t_child_all(_) ->
-    {ok, _Pid} = emqx_mod_sup:start_link(),
-    {ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker),
+    {ok, Pid} = emqx_mod_sup:start_link(),
+    {ok, Child} = emqx_mod_sup:start_child(?MODULE, worker),
     timer:sleep(10),
-    ok = emqx_mod_sup:stop_child(emqx_banned).
+    Child ! {self(), hi},
+    receive hi -> ok after 100 -> ct:fail({timeout, wait_echo}) end,
+    ok = emqx_mod_sup:stop_child(?MODULE),
+    exit(Pid, normal).

+ 3 - 6
test/emqx_pool_SUITE.erl

@@ -30,7 +30,7 @@ all() ->
 groups() ->
     [
      {submit_case, [sequence], [submit_mfa, submit_fa]},
-     {async_submit_case, [sequence], [async_submit_mfa, async_submit_ex]}
+     {async_submit_case, [sequence], [async_submit_mfa, async_submit_crash]}
     ].
 
 init_per_suite(Config) ->
@@ -61,8 +61,8 @@ async_submit_mfa(_Config) ->
     emqx_pool:async_submit({?MODULE, test_mfa, []}),
     emqx_pool:async_submit(fun ?MODULE:test_mfa/0, []).
 
-async_submit_ex(_) ->
-    emqx_pool:async_submit(fun error_fun/0).
+async_submit_crash(_) ->
+    emqx_pool:async_submit(fun() -> A = 1, A = 0 end).
 
 t_unexpected(_) ->
     Pid = emqx_pool:worker(),
@@ -73,6 +73,3 @@ t_unexpected(_) ->
 
 test_mfa() ->
     lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
-
-error_fun() -> error(test_error).
-

+ 56 - 57
test/emqx_protocol_SUITE.erl

@@ -101,19 +101,17 @@ all() ->
     ].
 
 groups() ->
-    [{mqtt_common,
-      [sequence],
-      [will_check]},
-     {mqttv4,
-      [sequence],
+    [{mqtt_common, [sequence],
+      [will_topic_check,
+       will_acl_check
+       ]},
+     {mqttv4, [sequence],
       [connect_v4,
        subscribe_v4]},
-     {mqttv5,
-      [sequence],
+     {mqttv5, [sequence],
       [connect_v5,
        subscribe_v5]},
-     {acl,
-      [sequence],
+     {acl, [sequence],
       [acl_deny_action_ct,
        acl_deny_action_eunit]}].
 
@@ -266,11 +264,10 @@ connect_v5(_) ->
                                 raw_recv_parse(Data, ?MQTT_PROTO_V5),
 
                             emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh  => 1,
-                                                                                                                  qos => ?QOS_2,
-                                                                                                                  rap => 0,
-                                                                                                                  nl  => 0,
-                                                                                                                  rc  => 0}}]),
-                                                                            #{version => ?MQTT_PROTO_V5})),
+                                             qos => ?QOS_2,
+                                             rap => 0,
+                                             nl  => 0,
+                                             rc  => 0}}]), #{version => ?MQTT_PROTO_V5})),
 
                             {ok, Data2} = gen_tcp:recv(Sock, 0),
                             {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
@@ -365,19 +362,16 @@ connect_v5(_) ->
                             do_connect(Sock2, ?MQTT_PROTO_V5),
 
                             emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh  => 1,
-                                                                                                                   qos => ?QOS_2,
-                                                                                                                   rap => 0,
-                                                                                                                   nl  => 0,
-                                                                                                                   rc  => 0}}]),
-                                                                            #{version => ?MQTT_PROTO_V5})),
+                                             qos => ?QOS_2,
+                                             rap => 0,
+                                             nl  => 0,
+                                             rc  => 0}}]), #{version => ?MQTT_PROTO_V5})),
 
                             {ok, SubData} = gen_tcp:recv(Sock2, 0),
                             {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
 
                             emqx_client_sock:send(Sock, raw_send_serialize(
-                                                            ?DISCONNECT_PACKET(?RC_SUCCESS)
-                                                        )
-                            ),
+                                                            ?DISCONNECT_PACKET(?RC_SUCCESS))),
 
                             {error, timeout} = gen_tcp:recv(Sock2, 0, 2000),
 
@@ -572,8 +566,8 @@ raw_recv_parse(P, ProtoVersion) ->
 acl_deny_action_ct(_) ->
     emqx_zone:set_env(external, acl_deny_action, disconnect),
     process_flag(trap_exit, true),
-    [acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
     [acl_deny_do_disconnect(subscribe, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
+    [acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
     emqx_zone:set_env(external, acl_deny_action, ignore),
     ok.
 
@@ -585,57 +579,62 @@ acl_deny_action_eunit(_) ->
     {error, CodeName, NEWPSTATE2} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_2, <<"acl_deny_action">>, 2, <<"payload">>), PState),
     ?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE2#pstate.send_stats).
 
-will_check(_) ->
-    process_flag(trap_exit, true),
-    will_topic_check(0),
-    will_acl_check(0).
-
-will_topic_check(QoS) ->
+will_topic_check(_) ->
     {ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
                                            {will_flag, true},
-                                           {will_topic, <<"">>},
+                                           {will_topic, <<"aaa">>},
                                            {will_payload, <<"I have died">>},
-                                           {will_qos, QoS}]),
-    try emqx_client:connect(Client) of
-        _ ->
-            ok
-    catch
-        exit : _Reason ->
-            false = is_process_alive(Client)
-    end.
+                                           {will_qos, 0}]),
+    {ok, _} = emqx_client:connect(Client),
 
-will_acl_check(QoS) ->
-    {ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
+    {ok, T} = emqx_client:start_link([{client_id, <<"client">>}]),
+    emqx_client:connect(T),
+    emqx_client:subscribe(T, <<"aaa">>),
+    ct:sleep(200),
+
+    emqx_client:stop(Client),
+    ct:sleep(100),
+    false = is_process_alive(Client),
+    emqx_ct_broker_helpers:wait_mqtt_payload(<<"I have died">>),
+    emqx_client:stop(T).
+
+will_acl_check(_) ->
+    %% The connection will be rejected if publishing of the will message is not allowed by
+    %% ACL rules
+    process_flag(trap_exit, true),
+    {ok, Client} = emqx_client:start_link([{username, <<"pub_deny">>},
                                            {will_flag, true},
-                                           {will_topic, <<"acl_deny_action">>},
+                                           {will_topic, <<"pub_deny">>},
                                            {will_payload, <<"I have died">>},
-                                           {will_qos, QoS}]),
-    try emqx_client:connect(Client) of
-        _ ->
-            ok
-    catch
-        exit : _Reason ->
-            false = is_process_alive(Client)
-    end.
+                                           {will_qos, 0}]),
+    ?assertMatch({error,{_,_}}, emqx_client:connect(Client)).
 
 acl_deny_do_disconnect(publish, QoS, Topic) ->
+    process_flag(trap_exit, true),
     {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
     {ok, _} = emqx_client:connect(Client),
     emqx_client:publish(Client, Topic, <<"test">>, QoS),
     receive
-        {'EXIT', Client, _Reason} ->
-            false = is_process_alive(Client)
+        {'EXIT', Client, {shutdown,tcp_closed}} ->
+            ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"),
+            false = is_process_alive(Client);
+        {'EXIT', Client, Reason} ->
+            ct:pal(info, "[OK] after publish, client got disconnected: ~p", [Reason])
+    after 1000 -> ct:fail({timeout, wait_tcp_closed})
     end;
 
 acl_deny_do_disconnect(subscribe, QoS, Topic) ->
+    process_flag(trap_exit, true),
     {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
     {ok, _} = emqx_client:connect(Client),
-    try emqx_client:subscribe(Client, Topic, QoS) of
-        _ ->
-            ok
-    catch
-        exit : _Reason ->
-            false = is_process_alive(Client)
+    {ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS),
+    receive
+        {'EXIT', Client, {shutdown,tcp_closed}} ->
+            ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"),
+            false = is_process_alive(Client);
+        {'EXIT', Client, Reason} ->
+            ct:pal(info, "[OK] after subscribe, client got disconnected: ~p", [Reason])
+    after 1000 -> ct:fail({timeout, wait_tcp_closed})
     end.
 
 start_apps(App, SchemaFile, ConfigFile) ->

+ 29 - 26
test/emqx_sm_SUITE.erl

@@ -16,6 +16,7 @@
 
 -include("emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -33,7 +34,7 @@ all() -> [{group, sm}].
 
 groups() ->
     [{sm, [non_parallel_tests],
-      [t_open_close_session,
+      [
        t_resume_session,
        t_discard_session,
        t_register_unregister_session,
@@ -48,45 +49,47 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_broker_helpers:run_teardown_steps().
 
-t_open_close_session(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
-    {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
-    ?assertEqual(ok, emqx_sm:close_session(SPid)).
+init_per_testcase(_All, Config) ->
+    {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => self()}),
+    [{session_pid, SPid}|Config].
 
-t_resume_session(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
-    {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
-    ?assertEqual({ok, SPid}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => ClientPid})).
+end_per_testcase(_All, Config) ->
+    emqx_sm:close_session(?config(session_pid, Config)),
+    receive
+        {shutdown, normal} -> ok
+    after 500 -> ct:fail({timeout, wait_session_shutdown})
+    end.
+
+t_resume_session(Config) ->
+    ?assertEqual({ok, ?config(session_pid, Config)}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => self()})).
 
 t_discard_session(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"client1">>),
-    {ok, _SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
     ?assertEqual(ok, emqx_sm:discard_session(<<"client1">>)).
 
 t_register_unregister_session(_) ->
     Pid = self(),
-    {ok, _ClientPid} = emqx_mock_client:start_link(<<"client">>),
     ?assertEqual(ok, emqx_sm:register_session(<<"client">>)),
     ?assertEqual(ok, emqx_sm:register_session(<<"client">>, Pid)),
     ?assertEqual(ok, emqx_sm:unregister_session(<<"client">>)),
     ?assertEqual(ok, emqx_sm:unregister_session(<<"client">>), Pid).
 
-t_get_set_session_attrs(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
-    {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
-    ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, [?ATTRS#{conn_pid => ClientPid}])),
-    ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => ClientPid}])),
-    [SAttr] = emqx_sm:get_session_attrs(<<"client">>, SPid),
-    ?assertEqual(<<"client">>, maps:get(client_id, SAttr)).
-
-t_get_set_session_stats(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
-    {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
+t_get_set_session_attrs(Config) ->
+    SPid = ?config(session_pid, Config),
+    ClientPid0 = spawn(fun() -> receive _ -> ok end end),
+    ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, [?ATTRS#{conn_pid => ClientPid0}])),
+    ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => ClientPid0}])),
+    [SAttr0] = emqx_sm:get_session_attrs(<<"client">>, SPid),
+    ?assertEqual(ClientPid0, maps:get(conn_pid, SAttr0)),
+    ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => self()}])),
+    [SAttr1] = emqx_sm:get_session_attrs(<<"client">>, SPid),
+    ?assertEqual(self(), maps:get(conn_pid, SAttr1)).
+
+t_get_set_session_stats(Config) ->
+    SPid = ?config(session_pid, Config),
     ?assertEqual(true, emqx_sm:set_session_stats(<<"client">>, [{inflight, 10}])),
     ?assertEqual(true, emqx_sm:set_session_stats(<<"client">>, SPid, [{inflight, 10}])),
     ?assertEqual([{inflight, 10}], emqx_sm:get_session_stats(<<"client">>, SPid)).
 
-t_lookup_session_pids(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
-    {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
+t_lookup_session_pids(Config) ->
+    SPid = ?config(session_pid, Config),
     ?assertEqual([SPid], emqx_sm:lookup_session_pids(<<"client">>)).

+ 2 - 1
test/emqx_sys_mon_SUITE.erl

@@ -58,7 +58,8 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
     after
         1000 ->
             ct:fail("flase")
-    end.
+    end,
+    emqx_client:stop(C).
 
 concat_str(ValidateInfo, InfoOrPort, Info) ->
     WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]),

+ 35 - 8
test/emqx_tracer_SUITE.erl

@@ -36,12 +36,39 @@ start_traces(_Config) ->
                                       {username, <<"testuser">>},
                                       {password, <<"pass">>}]),
     emqx_client:connect(T),
-    emqx_client:subscribe(T, <<"a/b/c">>),
-    ok = emqx_tracer:start_trace({client_id, <<"client">>}, all, "test/emqx_SUITE_data/clientid_trace.log"),
-    ok = emqx_tracer:start_trace({topic, <<"topic">>}, all, "test/emqx_SUITE_data/topic_trace.log"),
-    {ok, _} = file:read_file("test/emqx_SUITE_data/clientid_trace.log"),
-    {ok, _} = file:read_file("test/emqx_SUITE_data/topic_trace.log"),
-    Result = emqx_tracer:lookup_traces(),
-    ?assertEqual([{{client_id,<<"client">>},{all,"test/emqx_SUITE_data/clientid_trace.log"}},{{topic,<<"topic">>},{all,"test/emqx_SUITE_data/topic_trace.log"}}], Result),
+
+    %% Start tracing
+    ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
+    ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
+    ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
+    ct:sleep(100),
+
+    %% Verify the tracing file exits
+    ?assert(filelib:is_regular("tmp/client.log")),
+    ?assert(filelib:is_regular("tmp/client2.log")),
+    ?assert(filelib:is_regular("tmp/topic_trace.log")),
+
+    %% Get current traces
+    ?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}},
+                  {{client_id,<<"client2">>},{all,"tmp/client2.log"}},
+                  {{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
+
+    %% set the overall log level to debug
+    emqx_logger:set_log_level(debug),
+
+    %% Client with clientid = "client" publishes a "hi" message to "a/b/c".
+    emqx_client:publish(T, <<"a/b/c">>, <<"hi">>),
+    ct:sleep(200),
+
+    %% Verify messages are logged to "tmp/client.log" and "tmp/topic_trace.log", but not "tmp/client2.log".
+    ?assert(filelib:file_size("tmp/client.log") > 0),
+    ?assert(filelib:file_size("tmp/topic_trace.log") > 0),
+    ?assert(filelib:file_size("tmp/client2.log") == 0),
+
+    %% Stop tracing
     ok = emqx_tracer:stop_trace({client_id, <<"client">>}),
-    ok = emqx_tracer:stop_trace({topic, <<"topic">>}).
+    ok = emqx_tracer:stop_trace({client_id, <<"client2">>}),
+    ok = emqx_tracer:stop_trace({topic, <<"a/#">>}),
+    emqx_client:disconnect(T),
+
+    emqx_logger:set_log_level(error).