Просмотр исходного кода

Merge pull request #10919 from thalesmg/test-flakiness-20230601-a

test(pulsar_producer): attempt to fix flaky test
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
20a7ce64cd

+ 24 - 0
apps/emqx/include/asserts.hrl

@@ -59,3 +59,27 @@
         end
     end)()
 ).
+
+-define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin
+    __TEST_CASE = ?FUNCTION_NAME,
+    (fun
+        __GO(__CONFIG, __N) when __N >= NUM_RETRIES ->
+            TEST_BODY_FN(__CONFIG);
+        __GO(__CONFIG, __N) ->
+            try
+                TEST_BODY_FN(__CONFIG)
+            catch
+                __KIND:__REASON:__STACKTRACE ->
+                    ct:pal("test errored; will retry\n  ~p", [
+                        #{kind => __KIND, reason => __REASON, stacktrace => __STACKTRACE}
+                    ]),
+                    end_per_testcase(__TEST_CASE, __CONFIG),
+                    garbage_collect(),
+                    timer:sleep(1000),
+                    __CONFIG1 = init_per_testcase(__TEST_CASE, __CONFIG),
+                    __GO(__CONFIG1, __N + 1)
+            end
+    end)(
+        CONFIG, 0
+    )
+end).

+ 29 - 4
apps/emqx/test/emqx_common_test_helpers.erl

@@ -725,10 +725,17 @@ start_slave(Name, Opts) when is_map(Opts) ->
     Node = node_name(Name),
     put_peer_mod(Node, SlaveMod),
     Cookie = atom_to_list(erlang:get_cookie()),
+    PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
+    NodeDataDir = filename:join([
+        PrivDataDir,
+        Node,
+        integer_to_list(erlang:unique_integer())
+    ]),
     DoStart =
         fun() ->
             case SlaveMod of
                 ct_slave ->
+                    ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]),
                     ct_slave:start(
                         Node,
                         [
@@ -739,7 +746,8 @@ start_slave(Name, Opts) when is_map(Opts) ->
                             {erl_flags, erl_flags()},
                             {env, [
                                 {"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"},
-                                {"EMQX_NODE__COOKIE", Cookie}
+                                {"EMQX_NODE__COOKIE", Cookie},
+                                {"EMQX_NODE__DATA_DIR", NodeDataDir}
                             ]}
                         ]
                     );
@@ -844,7 +852,14 @@ setup_node(Node, Opts) when is_map(Opts) ->
         integer_to_list(erlang:unique_integer()),
         "mnesia"
     ]),
-    erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]),
+    case erpc:call(Node, application, get_env, [mnesia, dir, undefined]) of
+        undefined ->
+            ct:pal("~p: setting mnesia dir: ~p", [Node, MnesiaDataDir]),
+            erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]);
+        PreviousMnesiaDir ->
+            ct:pal("~p: mnesia dir already set: ~p", [Node, PreviousMnesiaDir]),
+            ok
+    end,
 
     %% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
     %% in emqx_common_test_helpers:start_apps(...)
@@ -877,8 +892,8 @@ setup_node(Node, Opts) when is_map(Opts) ->
                     %% nodes.  these variables might not be in the
                     %% config file (e.g.: emqx_enterprise_schema).
                     Cookie = atom_to_list(erlang:get_cookie()),
-                    os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir),
-                    os:putenv("EMQX_NODE__COOKIE", Cookie),
+                    set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir),
+                    set_env_once("EMQX_NODE__COOKIE", Cookie),
                     emqx_config:init_load(SchemaMod),
                     application:set_env(emqx, init_config_load_done, true)
                 end,
@@ -930,6 +945,15 @@ setup_node(Node, Opts) when is_map(Opts) ->
 
 %% Helpers
 
+set_env_once(Var, Value) ->
+    case os:getenv(Var) of
+        false ->
+            os:putenv(Var, Value);
+        _OldValue ->
+            ok
+    end,
+    ok.
+
 put_peer_mod(Node, SlaveMod) ->
     put({?MODULE, Node}, SlaveMod),
     ok.
@@ -1289,6 +1313,7 @@ call_janitor() ->
 call_janitor(Timeout) ->
     Janitor = get_or_spawn_janitor(),
     ok = emqx_test_janitor:stop(Janitor, Timeout),
+    erase({?MODULE, janitor_proc}),
     ok.
 
 get_or_spawn_janitor() ->

+ 11 - 9
apps/emqx/test/emqx_test_janitor.erl

@@ -60,13 +60,14 @@ init(Parent) ->
     {ok, #{callbacks => [], owner => Parent}}.
 
 terminate(_Reason, #{callbacks := Callbacks}) ->
-    do_terminate(Callbacks).
+    _ = do_terminate(Callbacks),
+    ok.
 
 handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
     {reply, ok, State#{callbacks := [Callback | Callbacks]}};
 handle_call(terminate, _From, State = #{callbacks := Callbacks}) ->
-    do_terminate(Callbacks),
-    {stop, normal, ok, State};
+    FailedCallbacks = do_terminate(Callbacks),
+    {stop, normal, ok, State#{callbacks := FailedCallbacks}};
 handle_call(_Req, _From, State) ->
     {reply, error, State}.
 
@@ -83,17 +84,18 @@ handle_info(_Msg, State) ->
 %%----------------------------------------------------------------------------------
 
 do_terminate(Callbacks) ->
-    lists:foreach(
-        fun(Fun) ->
+    lists:foldl(
+        fun(Fun, Failed) ->
             try
-                Fun()
+                Fun(),
+                Failed
             catch
                 K:E:S ->
                     ct:pal("error executing callback ~p: ~p", [Fun, {K, E}]),
                     ct:pal("stacktrace: ~p", [S]),
-                    ok
+                    [Fun | Failed]
             end
         end,
+        [],
         Callbacks
-    ),
-    ok.
+    ).

+ 51 - 24
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -9,6 +9,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -148,6 +149,7 @@ end_per_testcase(_Testcase, Config) ->
         true ->
             ok;
         false ->
+            ok = emqx_config:delete_override_conf_files(),
             ProxyHost = ?config(proxy_host, Config),
             ProxyPort = ?config(proxy_port, Config),
             emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
@@ -157,6 +159,7 @@ end_per_testcase(_Testcase, Config) ->
             %% machines struggle with all the containers running...
             emqx_common_test_helpers:call_janitor(60_000),
             ok = snabbkaffe:stop(),
+            flush_consumed(),
             ok
     end.
 
@@ -373,7 +376,9 @@ start_consumer(TestCase, Config) ->
                 (integer_to_binary(PulsarPort))/binary>>
         ),
     ConnOpts = #{},
-    ConsumerClientId = TestCase,
+    ConsumerClientId = list_to_atom(
+        atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer())
+    ),
     CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
     SSLOpts = #{
         enable => UseTLS,
@@ -393,12 +398,12 @@ start_consumer(TestCase, Config) ->
         cb_init_args => #{send_to => self()},
         cb_module => pulsar_echo_consumer,
         sub_type => 'Shared',
-        subscription => atom_to_list(TestCase),
+        subscription => atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()),
         max_consumer_num => 1,
         %% Note!  This must not coincide with the client
         %% id, or else weird bugs will happen, like the
         %% consumer never starts...
-        name => test_consumer,
+        name => list_to_atom("test_consumer" ++ integer_to_list(erlang:unique_integer())),
         consumer_id => 1,
         conn_opts => ConnOpts
     },
@@ -440,7 +445,10 @@ wait_until_connected(SupMod, Mod) ->
     ?retry(
         _Sleep = 300,
         _Attempts0 = 20,
-        lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
+        begin
+            true = length(Pids) > 0,
+            lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
+        end
     ),
     ok.
 
@@ -483,6 +491,12 @@ receive_consumed(Timeout) ->
         ct:fail("no message consumed")
     end.
 
+flush_consumed() ->
+    receive
+        {pulsar_message, _} -> flush_consumed()
+    after 0 -> ok
+    end.
+
 try_decode_json(Payload) ->
     case emqx_utils_json:safe_decode(Payload, [return_maps]) of
         {error, _} ->
@@ -1054,31 +1068,44 @@ t_resource_manager_crash_before_producers_started(Config) ->
     ),
     ok.
 
-t_cluster(Config) ->
-    MQTTTopic = ?config(mqtt_topic, Config),
-    ResourceId = resource_id(Config),
-    Cluster = cluster(Config),
-    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
-    QoS = 0,
-    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+t_cluster(Config0) ->
+    ct:timetrap({seconds, 120}),
+    ?retrying(Config0, 3, fun do_t_cluster/1).
+
+do_t_cluster(Config) ->
     ?check_trace(
         begin
+            MQTTTopic = ?config(mqtt_topic, Config),
+            ResourceId = resource_id(Config),
+            Cluster = cluster(Config),
+            ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+            QoS = 0,
+            Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+            NumNodes = length(Cluster),
+            {ok, SRef0} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := emqx_bridge_app_started}),
+                NumNodes,
+                25_000
+            ),
             Nodes = [N1, N2 | _] = start_cluster(Cluster),
             %% wait until bridge app supervisor is up; by that point,
             %% `emqx_config_handler:add_handler' has been called and the node should be
             %% ready to create bridges.
-            NumNodes = length(Nodes),
-            {ok, _} = snabbkaffe:block_until(
-                ?match_n_events(NumNodes, #{?snk_kind := emqx_bridge_app_started}),
-                15_000
-            ),
-            {ok, SRef0} = snabbkaffe:subscribe(
+            {ok, _} = snabbkaffe:receive_events(SRef0),
+            {ok, SRef1} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := pulsar_producer_bridge_started}),
                 NumNodes,
-                15_000
+                25_000
             ),
             {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
-            {ok, _} = snabbkaffe:receive_events(SRef0),
+            {ok, _} = snabbkaffe:receive_events(SRef1),
+            {ok, _} = snabbkaffe:block_until(
+                ?match_n_events(
+                    NumNodes,
+                    #{?snk_kind := bridge_post_config_update_done}
+                ),
+                25_000
+            ),
             lists:foreach(
                 fun(N) ->
                     ?retry(
@@ -1095,6 +1122,7 @@ t_cluster(Config) ->
             ),
             erpc:multicall(Nodes, fun wait_until_producer_connected/0),
             Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
+            ?tp(publishing_message, #{}),
             erpc:call(N2, emqx, publish, [Message0]),
 
             lists:foreach(
@@ -1108,10 +1136,7 @@ t_cluster(Config) ->
                 Nodes
             ),
 
-            ok
-        end,
-        fun(_Trace) ->
-            Data0 = receive_consumed(10_000),
+            Data0 = receive_consumed(30_000),
             ?assertMatch(
                 [
                     #{
@@ -1123,7 +1148,9 @@ t_cluster(Config) ->
                 ],
                 Data0
             ),
+
             ok
-        end
+        end,
+        []
     ),
     ok.

+ 51 - 36
apps/emqx_conf/test/emqx_conf_app_SUITE.erl

@@ -20,14 +20,17 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
-t_copy_conf_override_on_restarts(_Config) ->
+t_copy_conf_override_on_restarts(Config) ->
     ct:timetrap({seconds, 120}),
     snabbkaffe:fix_ct_logging(),
-    Cluster = cluster([cluster_spec({core, 1}), cluster_spec({core, 2}), cluster_spec({core, 3})]),
+    Cluster = cluster(
+        [cluster_spec({core, 1}), cluster_spec({core, 2}), cluster_spec({core, 3})], Config
+    ),
 
     %% 1. Start all nodes
     Nodes = start_cluster(Cluster),
@@ -50,16 +53,19 @@ t_copy_conf_override_on_restarts(_Config) ->
         stop_cluster(Nodes)
     end.
 
-t_copy_new_data_dir(_Config) ->
+t_copy_new_data_dir(Config) ->
     net_kernel:start(['master1@127.0.0.1', longnames]),
     ct:timetrap({seconds, 120}),
     snabbkaffe:fix_ct_logging(),
-    Cluster = cluster([cluster_spec({core, 4}), cluster_spec({core, 5}), cluster_spec({core, 6})]),
+    Cluster = cluster(
+        [cluster_spec({core, 4}), cluster_spec({core, 5}), cluster_spec({core, 6})], Config
+    ),
 
     %% 1. Start all nodes
     [First | Rest] = Nodes = start_cluster(Cluster),
     try
-        File = "/configs/cluster.hocon",
+        NodeDataDir = erpc:call(First, emqx, data_dir, []),
+        File = NodeDataDir ++ "/configs/cluster.hocon",
         assert_config_load_done(Nodes),
         rpc:call(First, ?MODULE, create_data_dir, [File]),
         {[ok, ok, ok], []} = rpc:multicall(Nodes, application, stop, [emqx_conf]),
@@ -74,16 +80,19 @@ t_copy_new_data_dir(_Config) ->
         stop_cluster(Nodes)
     end.
 
-t_copy_deprecated_data_dir(_Config) ->
+t_copy_deprecated_data_dir(Config) ->
     net_kernel:start(['master2@127.0.0.1', longnames]),
     ct:timetrap({seconds, 120}),
     snabbkaffe:fix_ct_logging(),
-    Cluster = cluster([cluster_spec({core, 7}), cluster_spec({core, 8}), cluster_spec({core, 9})]),
+    Cluster = cluster(
+        [cluster_spec({core, 7}), cluster_spec({core, 8}), cluster_spec({core, 9})], Config
+    ),
 
     %% 1. Start all nodes
     [First | Rest] = Nodes = start_cluster(Cluster),
     try
-        File = "/configs/cluster-override.conf",
+        NodeDataDir = erpc:call(First, emqx, data_dir, []),
+        File = NodeDataDir ++ "/configs/cluster-override.conf",
         assert_config_load_done(Nodes),
         rpc:call(First, ?MODULE, create_data_dir, [File]),
         {[ok, ok, ok], []} = rpc:multicall(Nodes, application, stop, [emqx_conf]),
@@ -131,56 +140,60 @@ t_no_copy_from_newer_version_node(_Config) ->
 %%------------------------------------------------------------------------------
 
 create_data_dir(File) ->
-    Node = atom_to_list(node()),
-    ok = filelib:ensure_dir(Node ++ "/certs/"),
-    ok = filelib:ensure_dir(Node ++ "/authz/"),
-    ok = filelib:ensure_dir(Node ++ "/configs/"),
-    ok = file:write_file(Node ++ "/certs/fake-cert", list_to_binary(Node)),
-    ok = file:write_file(Node ++ "/authz/fake-authz", list_to_binary(Node)),
+    NodeDataDir = emqx:data_dir(),
+    ok = filelib:ensure_dir(NodeDataDir ++ "/certs/"),
+    ok = filelib:ensure_dir(NodeDataDir ++ "/authz/"),
+    ok = filelib:ensure_dir(NodeDataDir ++ "/configs/"),
+    ok = file:write_file(NodeDataDir ++ "/certs/fake-cert", list_to_binary(NodeDataDir)),
+    ok = file:write_file(NodeDataDir ++ "/authz/fake-authz", list_to_binary(NodeDataDir)),
     Telemetry = <<"telemetry.enable = false">>,
-    ok = file:write_file(Node ++ File, Telemetry).
+    ok = file:write_file(File, Telemetry).
 
 set_data_dir_env() ->
-    Node = atom_to_list(node()),
+    NodeDataDir = emqx:data_dir(),
+    NodeStr = atom_to_list(node()),
     %% will create certs and authz dir
-    ok = filelib:ensure_dir(Node ++ "/configs/"),
+    ok = filelib:ensure_dir(NodeDataDir ++ "/configs/"),
     {ok, [ConfigFile]} = application:get_env(emqx, config_files),
-    NewConfigFile = ConfigFile ++ "." ++ Node,
+    NewConfigFile = ConfigFile ++ "." ++ NodeStr,
+    ok = filelib:ensure_dir(NewConfigFile),
     {ok, _} = file:copy(ConfigFile, NewConfigFile),
     Bin = iolist_to_binary(io_lib:format("node.config_files = [~p]~n", [NewConfigFile])),
     ok = file:write_file(NewConfigFile, Bin, [append]),
-    DataDir = iolist_to_binary(io_lib:format("node.data_dir = ~p~n", [Node])),
+    DataDir = iolist_to_binary(io_lib:format("node.data_dir = ~p~n", [NodeDataDir])),
     ok = file:write_file(NewConfigFile, DataDir, [append]),
     application:set_env(emqx, config_files, [NewConfigFile]),
-    application:set_env(emqx, data_dir, Node),
+    %% application:set_env(emqx, data_dir, Node),
     %% We set env both cluster.hocon and cluster-override.conf, but only one will be used
-    application:set_env(emqx, cluster_hocon_file, Node ++ "/configs/cluster.hocon"),
-    application:set_env(emqx, cluster_override_conf_file, Node ++ "/configs/cluster-override.conf"),
+    application:set_env(emqx, cluster_hocon_file, NodeDataDir ++ "/configs/cluster.hocon"),
+    application:set_env(
+        emqx, cluster_override_conf_file, NodeDataDir ++ "/configs/cluster-override.conf"
+    ),
     ok.
 
-assert_data_copy_done([First0 | Rest], File) ->
-    First = atom_to_list(First0),
-    {ok, FakeCertFile} = file:read_file(First ++ "/certs/fake-cert"),
-    {ok, FakeAuthzFile} = file:read_file(First ++ "/authz/fake-authz"),
-    {ok, FakeOverrideFile} = file:read_file(First ++ File),
+assert_data_copy_done([_First | Rest], File) ->
+    FirstDataDir = filename:dirname(filename:dirname(File)),
+    {ok, FakeCertFile} = file:read_file(FirstDataDir ++ "/certs/fake-cert"),
+    {ok, FakeAuthzFile} = file:read_file(FirstDataDir ++ "/authz/fake-authz"),
+    {ok, FakeOverrideFile} = file:read_file(File),
     {ok, ExpectFake} = hocon:binary(FakeOverrideFile),
     lists:foreach(
         fun(Node0) ->
-            Node = atom_to_list(Node0),
+            NodeDataDir = erpc:call(Node0, emqx, data_dir, []),
             ?assertEqual(
                 {ok, FakeCertFile},
-                file:read_file(Node ++ "/certs/fake-cert"),
-                #{node => Node}
+                file:read_file(NodeDataDir ++ "/certs/fake-cert"),
+                #{node => Node0}
             ),
             ?assertEqual(
                 {ok, ExpectFake},
-                hocon:files([Node ++ File]),
-                #{node => Node}
+                hocon:files([File]),
+                #{node => Node0}
             ),
             ?assertEqual(
                 {ok, FakeAuthzFile},
-                file:read_file(Node ++ "/authz/fake-authz"),
-                #{node => Node}
+                file:read_file(NodeDataDir ++ "/authz/fake-authz"),
+                #{node => Node0}
             )
         end,
         Rest
@@ -207,7 +220,7 @@ assert_config_load_done(Nodes) ->
     ).
 
 stop_cluster(Nodes) ->
-    [emqx_common_test_helpers:stop_slave(Node) || Node <- Nodes].
+    emqx_utils:pmap(fun emqx_common_test_helpers:stop_slave/1, Nodes).
 
 start_cluster(Specs) ->
     [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Specs].
@@ -222,7 +235,8 @@ start_cluster_async(Specs) ->
      || {Name, Opts} <- Specs
     ].
 
-cluster(Specs) ->
+cluster(Specs, Config) ->
+    PrivDataDir = ?config(priv_dir, Config),
     Env = [
         {emqx, init_config_load_done, false},
         {emqx, boot_modules, []}
@@ -232,6 +246,7 @@ cluster(Specs) ->
         {apps, [emqx_conf]},
         {load_schema, false},
         {join_to, true},
+        {priv_data_dir, PrivDataDir},
         {env_handler, fun
             (emqx) ->
                 application:set_env(emqx, boot_modules, []),