Pārlūkot izejas kodu

Merge pull request #13104 from keynslug/fix/EMQX-12415/last-resort-poll

fix(dsrepl): trigger "last-resort" pending transitions handler when idle
Andrew Mayorov 1 gadu atpakaļ
vecāks
revīzija
b4c6968f8c

+ 1 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -54,7 +54,7 @@ init() ->
 
 -spec is_persistence_enabled() -> boolean().
 is_persistence_enabled() ->
-    persistent_term:get(?PERSISTENCE_ENABLED).
+    persistent_term:get(?PERSISTENCE_ENABLED, false).
 
 -spec is_persistence_enabled(emqx_types:zone()) -> boolean().
 is_persistence_enabled(Zone) ->

+ 7 - 3
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -79,11 +79,15 @@ start_shard({DB, Shard}) ->
 start_egress({DB, Shard}) ->
     supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
 
--spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok.
+-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, not_found}.
 stop_shard({DB, Shard}) ->
     Sup = ?via(#?shards_sup{db = DB}),
-    ok = supervisor:terminate_child(Sup, Shard),
-    ok = supervisor:delete_child(Sup, Shard).
+    case supervisor:terminate_child(Sup, Shard) of
+        ok ->
+            supervisor:delete_child(Sup, Shard);
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
 -spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
 terminate_storage({DB, Shard}) ->

+ 8 - 5
apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl

@@ -41,6 +41,7 @@
 -define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
 
 -define(ALLOCATE_RETRY_TIMEOUT, 1_000).
+-define(TRIGGER_PENDING_TIMEOUT, 60_000).
 
 -define(TRANS_RETRY_TIMEOUT, 5_000).
 -define(CRASH_RETRY_DELAY, 20_000).
@@ -106,7 +107,7 @@ handle_call(_Call, _From, State) ->
 
 -spec handle_cast(_Cast, state()) -> {noreply, state()}.
 handle_cast(#trigger_transitions{}, State) ->
-    {noreply, handle_pending_transitions(State)};
+    {noreply, handle_pending_transitions(State), ?TRIGGER_PENDING_TIMEOUT};
 handle_cast(_Cast, State) ->
     {noreply, State}.
 
@@ -118,13 +119,15 @@ handle_cast(_Cast, State) ->
 handle_info({timeout, _TRef, allocate}, State) ->
     {noreply, handle_allocate_shards(State)};
 handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) ->
-    {noreply, handle_shard_changed(Shard, State)};
+    {noreply, handle_shard_changed(Shard, State), ?TRIGGER_PENDING_TIMEOUT};
 handle_info({changed, _}, State) ->
-    {noreply, State};
+    {noreply, State, ?TRIGGER_PENDING_TIMEOUT};
 handle_info({'EXIT', Pid, Reason}, State) ->
-    {noreply, handle_exit(Pid, Reason, State)};
+    {noreply, handle_exit(Pid, Reason, State), ?TRIGGER_PENDING_TIMEOUT};
+handle_info(timeout, State) ->
+    {noreply, handle_pending_transitions(State), ?TRIGGER_PENDING_TIMEOUT};
 handle_info(_Info, State) ->
-    {noreply, State}.
+    {noreply, State, ?TRIGGER_PENDING_TIMEOUT}.
 
 -spec terminate(_Reason, state()) -> _Ok.
 terminate(_Reason, State = #{db := DB, shards := Shards}) ->

+ 11 - 4
apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl

@@ -29,11 +29,18 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite(),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_suite(_) ->
-    emqx_mgmt_api_test_util:end_suite().
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
 
 init_per_testcase(Case, Config) ->
     ?MODULE:Case({init, Config}).

+ 0 - 1
apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl

@@ -33,7 +33,6 @@ init_per_suite(Config) ->
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
-    {ok, _Api} = emqx_common_test_http:create_default_app(),
     [{apps, Apps} | Config].
 
 end_per_suite(Config) ->

+ 11 - 4
apps/emqx_management/test/emqx_mgmt_api_status_SUITE.erl

@@ -51,11 +51,18 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite(),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_suite(_) ->
-    emqx_mgmt_api_test_util:end_suite().
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
 
 init_per_group(api_status_endpoint, Config) ->
     [{get_status_path, ["api", "v5", "status"]} | Config];

+ 11 - 4
apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl

@@ -25,12 +25,19 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
     ok = emqx_mgmt_cli:load(),
-    Config.
+    [{apps, Apps} | Config].
 
-end_per_suite(_) ->
-    emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
 
 init_per_testcase(t_autocluster_leave = TC, Config) ->
     [Core1, Core2, Repl1, Repl2] =