Просмотр исходного кода

Merge pull request #7916 from emqx/EMQX-4204-auto-timer-based-retry-when-in-disconnected-state

feat: add auto_retry for disconnected state in resource manager
Xinyu Liu 3 лет назад
Родитель
Сommit
c4fd31ae25

+ 27 - 17
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -230,9 +230,6 @@ handle_event({call, From}, stop, _State, Data) ->
     Result = do_stop(Data),
     UpdatedData = Data#data{status = disconnected},
     {next_state, stopped, UpdatedData, [{reply, From, Result}]};
-% Nothing happens once the stopped state is entered. It is a 'holding' state waiting for external actions.
-handle_event(enter, _OldState, stopped, Data) ->
-    {next_state, stopped, Data};
 % Called when a resource is to be stopped and removed.
 handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
     handle_remove_event(From, ClearMetrics, Data);
@@ -244,15 +241,9 @@ handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
 handle_event({call, From}, health_check, disconnected, Data) ->
     Actions = [{reply, From, {error, Data#data.error}}],
     {keep_state_and_data, Actions};
-% Resource has been explicitly stopped, so return that as the error reason.
-handle_event({call, From}, health_check, stopped, _Data) ->
-    Actions = [{reply, From, {error, stopped}}],
-    {keep_state_and_data, Actions};
-handle_event({call, From}, health_check, _State, Data) ->
-    handle_health_check_event(From, Data);
 % Connecting state enter
 handle_event(enter, connecting, connecting, Data) ->
-    handle_connecting_state_enter_event(Data);
+    handle_connection_attempt(Data);
 handle_event(enter, _OldState, connecting, Data) ->
     Actions = [{state_timeout, 0, health_check}],
     {next_state, connecting, Data, Actions};
@@ -269,19 +260,38 @@ handle_event(enter, _OldState, connected, Data) ->
     {next_state, connected, Data, Actions};
 handle_event(state_timeout, health_check, connected, Data) ->
     perform_connected_health_check(Data);
-% Disconnected state entered when a healtcheck has failed.
-handle_event(enter, _OldState, disconnected, #data{id = InstId} = Data) ->
+handle_event(enter, _OldState, disconnected, Data) ->
+    handle_disconnected_state_enter(Data);
+handle_event(state_timeout, auto_retry, disconnected, Data) ->
+    handle_connection_attempt(Data);
+handle_event(enter, _OldState, stopped, Data) ->
     UpdatedData = Data#data{status = disconnected},
-    ets:delete(?ETS_TABLE, InstId),
-    {next_state, disconnected, UpdatedData}.
+    ets:delete(?ETS_TABLE, Data#data.id),
+    {next_state, stopped, UpdatedData};
+% Resource has been explicitly stopped, so return that as the error reason.
+handle_event({call, From}, health_check, stopped, _Data) ->
+    Actions = [{reply, From, {error, stopped}}],
+    {keep_state_and_data, Actions};
+handle_event({call, From}, health_check, _State, Data) ->
+    handle_health_check_event(From, Data).
 
 %%------------------------------------------------------------------------------
 %% internal functions
 %%------------------------------------------------------------------------------
 
-handle_connecting_state_enter_event(Data) ->
-    Result = emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config),
-    case Result of
+handle_disconnected_state_enter(Data) ->
+    UpdatedData = Data#data{status = disconnected},
+    ets:delete(?ETS_TABLE, Data#data.id),
+    case maps:get(auto_retry_interval, Data#data.config, undefined) of
+        undefined ->
+            {next_state, disconnected, UpdatedData};
+        RetryInterval ->
+            Actions = [{state_timeout, RetryInterval, auto_retry}],
+            {next_state, disconnected, UpdatedData, Actions}
+    end.
+
+handle_connection_attempt(Data) ->
+    case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
         {ok, ResourceState} ->
             UpdatedData = Data#data{state = ResourceState, status = connecting},
             %% Perform an initial health_check immediately before transitioning into a connected state

+ 7 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -360,6 +360,13 @@ t_reset_metrics(_) ->
     ok = emqx_resource:remove(?ID),
     ?assertNot(is_process_alive(Pid)).
 
+t_auto_retry(_) ->
+    {Res, _} = emqx_resource:create_dry_run_local(
+        ?TEST_RESOURCE,
+        #{name => test_resource, create_error => true, auto_retry_interval => 1000}
+    ),
+    ?assertEqual(error, Res).
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------