Przeglądaj źródła

feat(syncer): allow to turn syncer pool on/off through config

Andrew Mayorov 2 lat temu
rodzic
commit
0b3f5f7c37

+ 12 - 2
apps/emqx/src/emqx_broker.erl

@@ -604,11 +604,21 @@ do_dispatch({shard, I}, Topic, Msg) ->
 %%
 
 maybe_add_route(_Existed = false, Topic, ReplyTo) ->
-    emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo});
+    add_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic, ReplyTo);
 maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
     ok.
 
+add_route(_BatchSync = true, Topic, ReplyTo) ->
+    emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo});
+add_route(_BatchSync = false, Topic, _ReplyTo) ->
+    emqx_router:do_add_route(Topic, node()).
+
 maybe_delete_route(_Exists = false, Topic) ->
-    emqx_router_syncer:push(delete, Topic, node(), #{});
+    delete_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic);
 maybe_delete_route(_Exists = true, _Topic) ->
     ok.
+
+delete_route(_BatchSync = true, Topic) ->
+    emqx_router_syncer:push(delete, Topic, node(), #{});
+delete_route(_BatchSync = false, Topic) ->
+    emqx_router:do_delete_route(Topic, node()).

+ 16 - 0
apps/emqx/src/emqx_schema.erl

@@ -1404,6 +1404,22 @@ fields("broker_routing") ->
                     'readOnly' => true,
                     desc => ?DESC(broker_routing_storage_schema)
                 }
+            )},
+        {"batch_sync",
+            sc(
+                ref("broker_routing_batch_sync"),
+                #{importance => ?IMPORTANCE_HIDDEN}
+            )}
+    ];
+fields("broker_routing_batch_sync") ->
+    [
+        {"enable",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    desc => ?DESC(broker_routing_batch_sync_enable)
+                }
             )}
     ];
 fields("shared_subscription_group") ->

+ 1 - 0
apps/emqx/test/emqx_cth_suite.erl

@@ -72,6 +72,7 @@
 -export([stop_apps/1]).
 
 -export([merge_appspec/2]).
+-export([merge_config/2]).
 
 %% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs
 -export([schema_module/0, upgrade_raw_conf/1]).

+ 59 - 32
apps/emqx/test/emqx_routing_SUITE.erl

@@ -30,32 +30,52 @@ all() ->
         {group, routing_schema_v1},
         {group, routing_schema_v2},
         t_routing_schema_switch_v1,
-        t_routing_schema_switch_v2,
-        t_concurrent_routing_updates
+        t_routing_schema_switch_v2
     ].
 
 groups() ->
-    TCs = [
+    GroupVsn = [
+        {group, batch_sync_on},
+        {group, batch_sync_off}
+    ],
+    GroupBase = [
+        {group, cluster},
+        t_concurrent_routing_updates
+    ],
+    ClusterTCs = [
         t_cluster_routing,
         t_slow_rlog_routing_consistency
     ],
     [
-        {routing_schema_v1, [], TCs},
-        {routing_schema_v2, [], TCs}
+        {routing_schema_v1, [], GroupVsn},
+        {routing_schema_v2, [], GroupVsn},
+        {batch_sync_on, [], GroupBase},
+        {batch_sync_off, [], GroupBase},
+        {cluster, [], ClusterTCs}
     ].
 
-init_per_group(GroupName, Config) ->
-    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
+init_per_group(routing_schema_v1, Config) ->
+    [{emqx_config, "broker.routing.storage_schema = v1"} | Config];
+init_per_group(routing_schema_v2, Config) ->
+    [{emqx_config, "broker.routing.storage_schema = v2"} | Config];
+init_per_group(batch_sync_on, Config) ->
+    [{emqx_config, "broker.routing.batch_sync.enable = true"} | Config];
+init_per_group(batch_sync_off, Config) ->
+    [{emqx_config, "broker.routing.batch_sync.enable = false"} | Config];
+init_per_group(cluster, Config) ->
+    WorkDir = emqx_cth_suite:work_dir(Config),
     NodeSpecs = [
-        {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}},
-        {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}},
-        {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}}
+        {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
+        {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}},
+        {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
     ],
     Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
     [{cluster, Nodes} | Config].
 
-end_per_group(_GroupName, Config) ->
-    emqx_cth_cluster:stop(?config(cluster, Config)).
+end_per_group(cluster, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config));
+end_per_group(_, _Config) ->
+    ok.
 
 init_per_testcase(TC, Config) ->
     emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
@@ -63,9 +83,9 @@ init_per_testcase(TC, Config) ->
 end_per_testcase(TC, Config) ->
     emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
 
-mk_emqx_appspec(GroupName, N) ->
+mk_emqx_appspec(N, Config) ->
     {emqx, #{
-        config => mk_config(GroupName, N),
+        config => mk_config(N, Config),
         after_start => fun() ->
             % NOTE
             % This one is actually defined on `emqx_conf_schema` level, but used
@@ -79,24 +99,28 @@ mk_genrpc_appspec() ->
         override_env => [{port_discovery, stateless}]
     }}.
 
-mk_config(GroupName, N) ->
-    #{
-        broker => mk_config_broker(GroupName),
-        listeners => mk_config_listeners(N)
-    }.
+mk_config(N, ConfigOrVsn) ->
+    emqx_cth_suite:merge_config(
+        mk_config_broker(ConfigOrVsn),
+        mk_config_listeners(N)
+    ).
 
-mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 ->
-    #{routing => #{storage_schema => v1}};
-mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 ->
-    #{routing => #{storage_schema => v2}}.
+mk_config_broker(v1) ->
+    "broker.routing.storage_schema = v1";
+mk_config_broker(v2) ->
+    "broker.routing.storage_schema = v2";
+mk_config_broker(CTConfig) ->
+    string:join(proplists:get_all_values(emqx_config, CTConfig), "\n").
 
 mk_config_listeners(N) ->
     Port = 1883 + N,
     #{
-        tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}},
-        ssl => #{default => #{enable => false}},
-        ws => #{default => #{enable => false}},
-        wss => #{default => #{enable => false}}
+        listeners => #{
+            tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}},
+            ssl => #{default => #{enable => false}},
+            ws => #{default => #{enable => false}},
+            wss => #{default => #{enable => false}}
+        }
     }.
 
 %%
@@ -202,12 +226,15 @@ t_concurrent_routing_updates(init, Config) ->
     Apps = emqx_cth_suite:start(
         [
             {emqx, #{
-                config => #{broker => #{routing => #{storage_schema => v2}}},
+                config => mk_config_broker(Config),
+                %% NOTE
+                %% Artificially increasing pool workers contention by forcing small pool size.
                 before_start => fun() ->
                     % NOTE
                     % This one is actually defined on `emqx_conf_schema` level, but used
                     % in `emqx_broker`. Thus we have to resort to this ugly hack.
-                    emqx_config:force_put([node, broker_pool_size], 2)
+                    emqx_config:force_put([node, broker_pool_size], 2),
+                    emqx_app:set_config_loader(?MODULE)
                 end
             }}
         ],
@@ -331,7 +358,7 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
     [Node1] = emqx_cth_cluster:start(
         [
             {routing_schema_switch1, #{
-                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)]
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, VTo)]
             }}
         ],
         #{work_dir => WorkDir}
@@ -344,12 +371,12 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
     [Node2, Node3] = emqx_cth_cluster:start(
         [
             {routing_schema_switch2, #{
-                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)],
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, VFrom)],
                 base_port => 20000,
                 join_to => Node1
             }},
             {routing_schema_switch3, #{
-                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)],
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, VFrom)],
                 base_port => 20100,
                 join_to => Node1
             }}

+ 4 - 0
rel/i18n/emqx_schema.hocon

@@ -1541,6 +1541,10 @@ Set <code>v1</code> to use the former schema.
 NOTE: Schema <code>v2</code> is still experimental.
 NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
 
+broker_routing_batch_sync_enable.desc:
+"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner.
+Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance of being overloaded."""
+
 broker_perf_trie_compaction.desc:
 """Enable trie path compaction.
 Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber.