Jelajahi Sumber

Merge pull request #7463 from thalesmg/mria-shard-transport

feat: allow customizing per-shard transaction log transport
Thales Macedo Garitezi 3 tahun lalu
induk
melakukan
59d5478d4e

+ 2 - 2
apps/emqx_conf/etc/emqx_conf.conf

@@ -319,8 +319,8 @@ db {
   ##
   ## @doc db.backend
   ## ValueType: mnesia | rlog
-  ## Default: mnesia
-  backend = mnesia
+  ## Default: rlog
+  backend = rlog
 
   ## RLOG role
   ##

+ 22 - 0
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -475,6 +475,28 @@ In sync mode the core node waits for an ack from the replicant nodes before send
 transaction log entry.
 """
            })}
+    , {"default_shard_transport",
+       sc(hoconsc:enum([gen_rpc, distr]),
+          #{ mapping => "mria.shard_transport"
+           , default => gen_rpc
+           , desc =>
+               "Defines the default transport for pushing transaction logs.<br/>"
+               "This may be overridden on a per-shard basis in <code>db.shard_transports</code>."
+               "<code>gen_rpc</code> uses the <code>gen_rpc</code> library, "
+               "<code>distr</code> uses the Erlang distribution.<br/>"
+           })}
+    , {"shard_transports",
+       sc(map(shard, hoconsc:enum([gen_rpc, distr])),
+          #{ desc =>
+               "Allows to tune the transport method used for transaction log replication, "
+               "on a per-shard basis.<br/>"
+               "<code>gen_rpc</code> uses the <code>gen_rpc</code> library, "
+               "<code>distr</code> uses the Erlang distribution.<br/>"
+               "If not specified, the default is to use the value "
+               "set in <code>db.default_shard_transport</code>."
+           , mapping => "emqx_machine.custom_shard_transports"
+           , default => #{}
+           })}
     ];
 
 fields("cluster_call") ->

+ 12 - 2
apps/emqx_machine/src/emqx_machine.erl

@@ -30,12 +30,13 @@
 start() ->
     case os:type() of
         {win32, nt} -> ok;
-        _nix ->
+        _Nix ->
             os:set_signal(sighup, ignore),
             os:set_signal(sigterm, handle) %% default is handle
     end,
     ok = set_backtrace_depth(),
     start_sysmon(),
+    configure_shard_transports(),
     ekka:start(),
     ok = print_otp_version_warning().
 
@@ -64,7 +65,7 @@ start_sysmon() ->
     application:set_env(system_monitor, node_status_fun, {?MODULE, node_status}),
     application:set_env(system_monitor, status_checks, [{?MODULE, update_vips, false, 10}]),
     case application:get_env(system_monitor, db_hostname) of
-        {ok, [_|_]}  ->
+        {ok, [_ | _]}  ->
             application:set_env(system_monitor, callback_mod, system_monitor_pg),
             _ = application:ensure_all_started(system_monitor, temporary),
             ok;
@@ -81,3 +82,12 @@ node_status() ->
 
 update_vips() ->
     system_monitor:add_vip(mria_status:shards_up()).
+
+configure_shard_transports() ->
+    ShardTransports = application:get_env(emqx_machine, custom_shard_transports, #{}),
+    maps:foreach(
+      fun(ShardBin, Transport) ->
+              ShardName = binary_to_existing_atom(ShardBin),
+              mria_config:set_shard_transport(ShardName, Transport)
+      end,
+      ShardTransports).

+ 32 - 1
apps/emqx_machine/test/emqx_machine_SUITE.erl

@@ -34,7 +34,7 @@ init_per_suite(Config) ->
     %%   emqx_machine_SUITE.erl
     %%
     %% Reason:
-    %%   the `emqx_machine_boot:ensure_apps_started()` will crashed
+    %%   the `emqx_machine_boot:ensure_apps_started()` will crash
     %%   on starting `emqx_authz` with dirty confs, which caused the file
     %%   `.._build/test/lib/emqx_conf/etc/acl.conf` could not be found
     %%
@@ -65,6 +65,25 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([]).
 
+init_per_testcase(t_custom_shard_transports, Config) ->
+    OldConfig = application:get_env(emqx_machine, custom_shard_transports),
+    [{old_config, OldConfig} | Config];
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(t_custom_shard_transports, Config) ->
+    OldConfig0 = ?config(old_config, Config),
+    application:stop(ekka),
+    case OldConfig0 of
+        {ok, OldConfig} ->
+            application:set_env(emqx_machine, custom_shard_transports, OldConfig);
+        undefined ->
+            application:unset_env(emqx_machine, custom_shard_transports)
+    end,
+    ok;
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
 t_shutdown_reboot(_Config) ->
     emqx_machine_boot:stop_apps(),
     false = emqx:is_running(node()),
@@ -72,3 +91,15 @@ t_shutdown_reboot(_Config) ->
     true = emqx:is_running(node()),
     ok = emqx_machine_boot:stop_apps(),
     false = emqx:is_running(node()).
+
+t_custom_shard_transports(_Config) ->
+    %% used to ensure the atom exists
+    Shard = test_shard,
+    %% the config keys are binaries
+    ShardBin = atom_to_binary(Shard),
+    DefaultTransport = gen_rpc,
+    ?assertEqual(DefaultTransport, mria_config:shard_transport(Shard)),
+    application:set_env(emqx_machine, custom_shard_transports, #{ShardBin => distr}),
+    emqx_machine:start(),
+    ?assertEqual(distr, mria_config:shard_transport(Shard)),
+    ok.