فهرست منبع

Merge pull request #12523 from keynslug/ft/bump-ekka-0.19.0

refactor: bump ekka to 0.19.0 w/o mnesia boot phase
Andrew Mayorov 2 سال پیش
والد
کامیت
ef4ae92da4

+ 1 - 1
apps/emqx/rebar.config

@@ -28,7 +28,7 @@
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
-    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.18.4"}}},
+    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.0"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.41.0"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},

+ 12 - 6
apps/emqx/test/emqx_cth_cluster.erl

@@ -332,11 +332,12 @@ allocate_listener_ports(Types, Spec) ->
 
 start_nodes_init(Specs, Timeout) ->
     Names = lists:map(fun(#{name := Name}) -> Name end, Specs),
-    Nodes = start_bare_nodes(Names, Timeout),
-    lists:foreach(fun node_init/1, Nodes).
+    _Nodes = start_bare_nodes(Names, Timeout),
+    lists:foreach(fun node_init/1, Specs).
 
 start_bare_nodes(Names) ->
     start_bare_nodes(Names, ?TIMEOUT_NODE_START_MS).
+
 start_bare_nodes(Names, Timeout) ->
     Args = erl_flags(),
     Envs = [],
@@ -355,7 +356,7 @@ start_bare_nodes(Names, Timeout) ->
     Nodes.
 
 deadline(Timeout) ->
-    erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, nanosecond).
+    erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, native).
 
 is_overdue(Deadline) ->
     erlang:monotonic_time() > Deadline.
@@ -379,10 +380,15 @@ wait_boot_complete(Waits, Deadline) ->
         wait_boot_complete(Waits, Deadline)
     end.
 
-node_init(Node) ->
-    % Make it possible to call `ct:pal` and friends (if running under rebar3)
+node_init(#{name := Node, work_dir := WorkDir}) ->
+    %% Create exclusive current directory for the node.  Some configurations, like plugin
+    %% installation directory, are the same for the whole cluster, and nodes on the same
+    %% machine will step on each other's toes...
+    ok = filelib:ensure_path(WorkDir),
+    ok = erpc:call(Node, file, set_cwd, [WorkDir]),
+    %% Make it possible to call `ct:pal` and friends (if running under rebar3)
     _ = share_load_module(Node, cthr),
-    % Enable snabbkaffe trace forwarding
+    %% Enable snabbkaffe trace forwarding
     ok = snabbkaffe:forward_trace(Node),
     when_cover_enabled(fun() -> {ok, _} = cover:start([Node]) end),
     ok.

+ 11 - 22
apps/emqx/test/emqx_cth_peer.erl

@@ -43,28 +43,17 @@ start_link(Name, Args, Envs, Timeout) when is_atom(Name) ->
 
 do_start(Name0, Args, Envs, Timeout, Func) when is_atom(Name0) ->
     {Name, Host} = parse_node_name(Name0),
-    %% Create exclusive current directory for the node.  Some configurations, like plugin
-    %% installation directory, are the same for the whole cluster, and nodes on the same
-    %% machine will step on each other's toes...
-    {ok, Cwd} = file:get_cwd(),
-    NodeCwd = filename:join([Cwd, Name]),
-    ok = filelib:ensure_dir(filename:join([NodeCwd, "dummy"])),
-    try
-        file:set_cwd(NodeCwd),
-        {ok, Pid, Node} = peer:Func(#{
-            name => Name,
-            host => Host,
-            args => Args,
-            env => Envs,
-            wait_boot => Timeout,
-            longnames => true,
-            shutdown => {halt, 1000}
-        }),
-        true = register(Node, Pid),
-        {ok, Node}
-    after
-        file:set_cwd(Cwd)
-    end.
+    {ok, Pid, Node} = peer:Func(#{
+        name => Name,
+        host => Host,
+        args => Args,
+        env => Envs,
+        wait_boot => Timeout,
+        longnames => true,
+        shutdown => {halt, 1000}
+    }),
+    true = register(Node, Pid),
+    {ok, Node}.
 
 stop(Node) when is_atom(Node) ->
     Pid = whereis(Node),

+ 34 - 74
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -14,7 +14,7 @@
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>).
--define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]).
+-define(APPS, [emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]).
 -define(RULE_TOPIC, "mqtt/rule").
 -define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
 
@@ -52,14 +52,27 @@ only_once_tests() ->
     ].
 
 init_per_suite(Config) ->
-    Config.
+    %% Ensure enterprise bridge module is loaded
+    _ = emqx_bridge_enterprise:module_info(),
+    %% TODO
+    %% This is needed to ensure that filenames generated deep inside pulsar/replayq
+    %% will not exceed 256 characters, because replayq eventually turns them into atoms.
+    %% The downside is increased risk of accidental name clashes / testsuite interference.
+    {ok, Cwd} = file:get_cwd(),
+    PrivDir = ?config(priv_dir, Config),
+    WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd),
+    Apps = emqx_cth_suite:start(
+        lists:flatten([
+            ?APPS,
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ]),
+        #{work_dir => WorkDir}
+    ),
+    [{suite_apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
-    ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
-    _ = application:stop(emqx_connector),
-    ok.
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
 
 init_per_group(plain = Type, Config) ->
     PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"),
@@ -123,13 +136,6 @@ common_init_per_group() ->
     ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
     ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    %% Ensure enterprise bridge module is loaded
-    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
-    ok = emqx_common_test_helpers:start_apps(?APPS),
-    {ok, _} = application:ensure_all_started(pulsar),
-    _ = emqx_bridge_enterprise:module_info(),
-    {ok, _} = application:ensure_all_started(emqx_connector),
-    emqx_mgmt_api_test_util:init_suite(),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
     MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
     [
@@ -210,9 +216,7 @@ pulsar_config(TestCase, _PulsarType, Config) ->
     PulsarTopic = ?config(pulsar_topic, Config),
     AuthType = proplists:get_value(sasl_auth_mechanism, Config, none),
     UseTLS = proplists:get_value(use_tls, Config, false),
-    Name = <<
-        (atom_to_binary(TestCase))/binary, UniqueNum/binary
-    >>,
+    Name = atom_to_binary(TestCase),
     MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
     Prefix =
         case UseTLS of
@@ -508,51 +512,18 @@ try_decode_json(Payload) ->
     end.
 
 cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
-        [
-            {apps, [emqx_conf] ++ ?APPS ++ [pulsar]},
-            {listener_ports, []},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {start_autocluster, true},
-            {schema_mod, emqx_enterprise_schema},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, [broker]),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
-    ),
-    ct:pal("cluster: ~p", [Cluster]),
-    Cluster.
-
-start_cluster(Cluster) ->
-    Nodes =
+    Apps = [
+        {emqx, #{override_env => [{boot_modules, [broker]}]}}
+        | ?APPS
+    ],
+    Nodes = emqx_cth_cluster:start(
         [
-            emqx_common_test_helpers:start_peer(Name, Opts)
-         || {Name, Opts} <- Cluster
+            {emqx_bridge_pulsar_impl_producer1, #{apps => Apps}},
+            {emqx_bridge_pulsar_impl_producer2, #{apps => Apps}}
         ],
-    NumNodes = length(Nodes),
-    on_exit(fun() ->
-        emqx_utils:pmap(
-            fun(N) ->
-                ct:pal("stopping ~p", [N]),
-                ok = emqx_common_test_helpers:stop_peer(N)
-            end,
-            Nodes
-        )
-    end),
-    {ok, _} = snabbkaffe:block_until(
-        %% -1 because only those that join the first node will emit the event.
-        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
-        30_000
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
+    ok = on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
     Nodes.
 
 kill_resource_managers() ->
@@ -1105,24 +1076,13 @@ do_t_cluster(Config) ->
         begin
             MQTTTopic = ?config(mqtt_topic, Config),
             ResourceId = resource_id(Config),
-            Cluster = cluster(Config),
+            Nodes = [N1, N2 | _] = cluster(Config),
             ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
             QoS = 0,
             Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
-            NumNodes = length(Cluster),
-            {ok, SRef0} = snabbkaffe:subscribe(
-                ?match_event(#{?snk_kind := emqx_bridge_app_started}),
-                NumNodes,
-                25_000
-            ),
-            Nodes = [N1, N2 | _] = start_cluster(Cluster),
-            %% wait until bridge app supervisor is up; by that point,
-            %% `emqx_config_handler:add_handler' has been called and the node should be
-            %% ready to create bridges.
-            {ok, _} = snabbkaffe:receive_events(SRef0),
             {ok, SRef1} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := pulsar_producer_bridge_started}),
-                NumNodes,
+                length(Nodes),
                 25_000
             ),
             {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
@@ -1130,7 +1090,7 @@ do_t_cluster(Config) ->
             erpc:multicall(Nodes, fun wait_until_producer_connected/0),
             {ok, _} = snabbkaffe:block_until(
                 ?match_n_events(
-                    NumNodes,
+                    length(Nodes),
                     #{?snk_kind := bridge_post_config_update_done}
                 ),
                 25_000

+ 22 - 28
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -19,9 +19,9 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include("emqx_conf.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+
 -define(NODE1, emqx_cluster_rpc).
 -define(NODE2, emqx_cluster_rpc2).
 -define(NODE3, emqx_cluster_rpc3).
@@ -42,20 +42,25 @@ suite() -> [{timetrap, {minutes, 5}}].
 groups() -> [].
 
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([]),
-    ok = mria:wait_for_tables(emqx_cluster_rpc:create_tables()),
-    ok = emqx_config:put([node, cluster_call, retry_interval], 1000),
-    meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
-    meck:expect(emqx_alarm, activate, 3, ok),
-    meck:expect(emqx_alarm, deactivate, 3, ok),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            {emqx_conf,
+                "node.cluster_call {"
+                "\n  retry_interval = 1s"
+                "\n  max_history = 100"
+                "\n  cleanup_interval = 500ms"
+                "\n}"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
     meck:new(mria, [non_strict, passthrough, no_link]),
     meck:expect(mria, running_nodes, 0, [?NODE1, {node(), ?NODE2}, {node(), ?NODE3}]),
-    Config.
+    [{suite_apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([]),
-    meck:unload(emqx_alarm),
-    ok.
+end_per_suite(Config) ->
+    _ = meck:unload(),
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
 
 init_per_testcase(_TestCase, Config) ->
     stop(),
@@ -67,7 +72,6 @@ end_per_testcase(_Config) ->
     ok.
 
 t_base_test(_Config) ->
-    emqx_cluster_rpc:reset(),
     ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
     Pid = self(),
     MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
@@ -94,7 +98,6 @@ t_base_test(_Config) ->
     ok.
 
 t_commit_fail_test(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]},
     {init_failure, "MFA return not ok"} = multicall(M, F, A),
@@ -102,7 +105,6 @@ t_commit_fail_test(_Config) ->
     ok.
 
 t_commit_crash_test(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     {M, F, A} = {?MODULE, no_exist_function, []},
     {init_failure, {error, Meta}} = multicall(M, F, A),
@@ -150,7 +152,6 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
     ok.
 
 t_commit_concurrency(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     Pid = self(),
     {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
@@ -211,7 +212,6 @@ receive_seq_msg(Acc) ->
     end.
 
 t_catch_up_status_handle_next_commit(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
     {ok, 1, ok} = multicall(M, F, A, 1, 1000),
@@ -220,7 +220,6 @@ t_catch_up_status_handle_next_commit(_Config) ->
     ok.
 
 t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     ets:new(test, [named_table, public]),
     ets:insert(test, {other_mfa_result, failed}),
@@ -247,7 +246,6 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
     ok.
 
 t_del_stale_mfa(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     MFA = {M, F, A} = {io, format, ["test"]},
     Keys = lists:seq(1, 50),
@@ -289,7 +287,6 @@ t_del_stale_mfa(_Config) ->
     ok.
 
 t_skip_failed_commit(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
     ct:sleep(180),
@@ -310,7 +307,6 @@ t_skip_failed_commit(_Config) ->
     ok.
 
 t_fast_forward_commit(_Config) ->
-    emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
     ct:sleep(180),
@@ -358,12 +354,10 @@ tnx_ids(Status) ->
     ).
 
 start() ->
-    {ok, Pid1} = emqx_cluster_rpc:start_link(),
-    {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
-    {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
-    {ok, Pid4} = emqx_cluster_rpc_cleaner:start_link(100, 500),
-    true = erlang:register(emqx_cluster_rpc_cleaner, Pid4),
-    {ok, [Pid1, Pid2, Pid3, Pid4]}.
+    {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
+    {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
+    ok = emqx_cluster_rpc:reset(),
+    ok.
 
 stop() ->
     [
@@ -376,7 +370,7 @@ stop() ->
                     erlang:exit(P, kill)
             end
         end
-     || N <- [?NODE1, ?NODE2, ?NODE3, emqx_cluster_rpc_cleaner]
+     || N <- [?NODE2, ?NODE3]
     ].
 
 receive_msg(0, _Msg) ->

+ 20 - 24
apps/emqx_conf/test/emqx_conf_app_SUITE.erl

@@ -74,10 +74,7 @@ t_copy_new_data_dir(Config) ->
         {[ok, ok, ok], []} = rpc:multicall(Nodes, ?MODULE, set_data_dir_env, []),
         ok = rpc:call(First, application, start, [emqx_conf]),
         {[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]),
-
-        assert_data_copy_done(Nodes, File),
-        stop_cluster(Nodes),
-        ok
+        ok = assert_data_copy_done(Nodes, File)
     after
         stop_cluster(Nodes)
     end.
@@ -101,10 +98,7 @@ t_copy_deprecated_data_dir(Config) ->
         {[ok, ok, ok], []} = rpc:multicall(Nodes, ?MODULE, set_data_dir_env, []),
         ok = rpc:call(First, application, start, [emqx_conf]),
         {[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]),
-
-        assert_data_copy_done(Nodes, File),
-        stop_cluster(Nodes),
-        ok
+        ok = assert_data_copy_done(Nodes, File)
     after
         stop_cluster(Nodes)
     end.
@@ -133,9 +127,7 @@ t_no_copy_from_newer_version_node(Config) ->
         ]),
         ok = rpc:call(First, application, start, [emqx_conf]),
         {[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]),
-        ok = assert_no_cluster_conf_copied(Rest, File),
-        stop_cluster(Nodes),
-        ok
+        ok = assert_no_cluster_conf_copied(Rest, File)
     after
         stop_cluster(Nodes)
     end.
@@ -155,26 +147,30 @@ create_data_dir(File) ->
 
 set_data_dir_env() ->
     NodeDataDir = emqx:data_dir(),
-    NodeStr = atom_to_list(node()),
+    NodeConfigDir = filename:join(NodeDataDir, "configs"),
     %% will create certs and authz dir
-    ok = filelib:ensure_dir(NodeDataDir ++ "/configs/"),
-    {ok, [ConfigFile]} = application:get_env(emqx, config_files),
-    NewConfigFile = ConfigFile ++ "." ++ NodeStr,
-    ok = filelib:ensure_dir(NewConfigFile),
-    {ok, _} = file:copy(ConfigFile, NewConfigFile),
-    Bin = iolist_to_binary(io_lib:format("node.config_files = [~p]~n", [NewConfigFile])),
-    ok = file:write_file(NewConfigFile, Bin, [append]),
-    DataDir = iolist_to_binary(io_lib:format("node.data_dir = ~p~n", [NodeDataDir])),
-    ok = file:write_file(NewConfigFile, DataDir, [append]),
-    application:set_env(emqx, config_files, [NewConfigFile]),
+    ok = filelib:ensure_path(NodeConfigDir),
+    ConfigFile = filename:join(NodeConfigDir, "emqx.conf"),
+    ok = append_format(ConfigFile, "node.config_files = [~p]~n", [ConfigFile]),
+    ok = append_format(ConfigFile, "node.data_dir = ~p~n", [NodeDataDir]),
+    application:set_env(emqx, config_files, [ConfigFile]),
     %% application:set_env(emqx, data_dir, Node),
     %% We set env both cluster.hocon and cluster-override.conf, but only one will be used
-    application:set_env(emqx, cluster_hocon_file, NodeDataDir ++ "/configs/cluster.hocon"),
     application:set_env(
-        emqx, cluster_override_conf_file, NodeDataDir ++ "/configs/cluster-override.conf"
+        emqx,
+        cluster_hocon_file,
+        filename:join([NodeDataDir, "configs", "cluster.hocon"])
+    ),
+    application:set_env(
+        emqx,
+        cluster_override_conf_file,
+        filename:join([NodeDataDir, "configs", "cluster-override.conf"])
     ),
     ok.
 
+append_format(Filename, Fmt, Args) ->
+    ok = file:write_file(Filename, io_lib:format(Fmt, Args), [append]).
+
 assert_data_copy_done([_First | Rest], File) ->
     FirstDataDir = filename:dirname(filename:dirname(File)),
     {ok, FakeCertFile} = file:read_file(FirstDataDir ++ "/certs/fake-cert"),

+ 21 - 32
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -604,12 +604,9 @@ t_load_config_from_cli(Config) when is_list(Config) ->
     ok.
 
 group_t_copy_plugin_to_a_new_node({init, Config}) ->
-    WorkDir = proplists:get_value(install_dir, Config),
-    FromInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_from)),
-    file:del_dir_r(FromInstallDir),
+    FromInstallDir = filename:join(emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), from),
     ok = filelib:ensure_path(FromInstallDir),
-    ToInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_to)),
-    file:del_dir_r(ToInstallDir),
+    ToInstallDir = filename:join(emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), to),
     ok = filelib:ensure_path(ToInstallDir),
     #{package := Package, release_name := PluginName} = get_demo_plugin_package(FromInstallDir),
     Apps = [
@@ -697,8 +694,7 @@ group_t_copy_plugin_to_a_new_node(Config) ->
 
 %% checks that we can start a cluster with a lone node.
 group_t_copy_plugin_to_a_new_node_single_node({init, Config}) ->
-    WorkDir = ?config(install_dir, Config),
-    ToInstallDir = filename:join(WorkDir, "plugins_copy_to"),
+    ToInstallDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
     file:del_dir_r(ToInstallDir),
     ok = filelib:ensure_path(ToInstallDir),
     #{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir),
@@ -718,9 +714,7 @@ group_t_copy_plugin_to_a_new_node_single_node({init, Config}) ->
     ],
     [CopyToNode] = emqx_cth_cluster:start(
         [{plugins_copy_to, #{role => core, apps => Apps}}],
-        #{
-            work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
-        }
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
     ),
     [
         {to_install_dir, ToInstallDir},
@@ -752,36 +746,31 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
     ok.
 
 group_t_cluster_leave({init, Config}) ->
-    WorkDir = ?config(install_dir, Config),
-    ToInstallDir = filename:join(WorkDir, "plugins_copy_to"),
-    file:del_dir_r(ToInstallDir),
-    ok = filelib:ensure_path(ToInstallDir),
-    #{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir),
+    Specs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {group_t_cluster_leave1, #{role => core, apps => [emqx, emqx_conf, emqx_ctl]}},
+            {group_t_cluster_leave2, #{role => core, apps => [emqx, emqx_conf, emqx_ctl]}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
+    Nodes = emqx_cth_cluster:start(Specs),
+    InstallRelDir = "plugins_copy_to",
+    InstallDirs = [filename:join(WD, InstallRelDir) || #{work_dir := WD} <- Specs],
+    ok = lists:foreach(fun filelib:ensure_path/1, InstallDirs),
+    #{package := Package, release_name := PluginName} = get_demo_plugin_package(hd(InstallDirs)),
     NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
-    Apps = [
-        emqx,
-        emqx_conf,
-        emqx_ctl,
-        {emqx_plugins, #{
+    [{ok, _}, {ok, _}] = erpc:multicall(Nodes, emqx_cth_suite, start_app, [
+        emqx_plugins,
+        #{
             config => #{
                 plugins => #{
-                    install_dir => ToInstallDir,
+                    install_dir => InstallRelDir,
                     states => [#{name_vsn => NameVsn, enable => true}]
                 }
             }
-        }}
-    ],
-    Nodes = emqx_cth_cluster:start(
-        [
-            {group_t_cluster_leave1, #{role => core, apps => Apps}},
-            {group_t_cluster_leave2, #{role => core, apps => Apps}}
-        ],
-        #{
-            work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
         }
-    ),
+    ]),
     [
-        {to_install_dir, ToInstallDir},
         {nodes, Nodes},
         {name_vsn, NameVsn},
         {plugin_name, PluginName}

+ 23 - 0
apps/emqx_utils/src/emqx_utils_fs.erl

@@ -20,6 +20,7 @@
 
 -export([traverse_dir/3]).
 -export([read_info/1]).
+-export([find_relpath/2]).
 -export([canonicalize/1]).
 
 -type fileinfo() :: #file_info{}.
@@ -62,6 +63,28 @@ traverse_dir(FoldFun, Acc, AbsPath, {error, Reason}) ->
 read_info(AbsPath) ->
     file:read_link_info(AbsPath, [{time, posix}, raw]).
 
+-spec find_relpath(file:name(), file:name()) ->
+    file:name().
+find_relpath(Path, RelativeTo) ->
+    case
+        filename:pathtype(Path) =:= filename:pathtype(RelativeTo) andalso
+            drop_path_prefix(filename:split(Path), filename:split(RelativeTo))
+    of
+        false ->
+            Path;
+        [] ->
+            ".";
+        RelativePath ->
+            filename:join(RelativePath)
+    end.
+
+drop_path_prefix([Name | T1], [Name | T2]) ->
+    drop_path_prefix(T1, T2);
+drop_path_prefix(Path, []) ->
+    Path;
+drop_path_prefix(_Path, _To) ->
+    false.
+
 %% @doc Canonicalize a file path.
 %% Removes stray slashes and converts to a string.
 -spec canonicalize(file:name()) ->

+ 38 - 0
apps/emqx_utils/test/emqx_utils_fs_SUITE.erl

@@ -143,6 +143,44 @@ t_canonicalize_non_utf8(_) ->
         emqx_utils_fs:canonicalize(<<128, 128, 128>>)
     ).
 
+%%
+
+t_find_relpath(_) ->
+    ?assertEqual(
+        "d1/1",
+        emqx_utils_fs:find_relpath("/usr/local/nonempty/d1/1", "/usr/local/nonempty")
+    ).
+
+t_find_relpath_same(_) ->
+    ?assertEqual(
+        ".",
+        emqx_utils_fs:find_relpath("/usr/local/bin", "/usr/local/bin/")
+    ),
+    ?assertEqual(
+        ".",
+        emqx_utils_fs:find_relpath("/usr/local/bin/.", "/usr/local/bin")
+    ).
+
+t_find_relpath_no_prefix(_) ->
+    ?assertEqual(
+        "/usr/lib/erlang/lib",
+        emqx_utils_fs:find_relpath("/usr/lib/erlang/lib", "/usr/local/bin")
+    ).
+
+t_find_relpath_both_relative(_) ->
+    ?assertEqual(
+        "1/2/3",
+        emqx_utils_fs:find_relpath("local/nonempty/1/2/3", "local/nonempty")
+    ).
+
+t_find_relpath_different_types(_) ->
+    ?assertEqual(
+        "local/nonempty/1/2/3",
+        emqx_utils_fs:find_relpath("local/nonempty/1/2/3", "/usr/local/nonempty")
+    ).
+
+%%
+
 chmod_file(File, Mode) ->
     {ok, FileInfo} = file:read_file_info(File),
     ok = file:write_file_info(File, FileInfo#file_info{mode = Mode}).

+ 1 - 1
mix.exs

@@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
       {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true},
       {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true},
-      {:ekka, github: "emqx/ekka", tag: "0.18.4", override: true},
+      {:ekka, github: "emqx/ekka", tag: "0.19.0", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.15", override: true},

+ 1 - 1
rebar.config

@@ -83,7 +83,7 @@
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
     {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}},
-    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.18.4"}}},
+    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.0"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
     {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.15"}}},