Просмотр исходного кода

fix(resource): fast return when starting an unavailable resource

Shawn 3 лет назад
Родитель
Сommit
88ca25c60c

+ 2 - 0
apps/emqx/include/http_api.hrl

@@ -49,6 +49,7 @@
 
 %% Internal error
 -define(INTERNAL_ERROR, 'INTERNAL_ERROR').
+-define(SERVICE_UNAVAILABLE, 'SERVICE_UNAVAILABLE').
 -define(SOURCE_ERROR, 'SOURCE_ERROR').
 -define(UPDATE_FAILED, 'UPDATE_FAILED').
 -define(REST_FAILED, 'REST_FAILED').
@@ -81,6 +82,7 @@
     {'TOPIC_NOT_FOUND', <<"Topic not found">>},
     {'USER_NOT_FOUND', <<"User not found">>},
     {'INTERNAL_ERROR', <<"Server inter error">>},
+    {'SERVICE_UNAVAILABLE', <<"Service unavailable">>},
     {'SOURCE_ERROR', <<"Source error">>},
     {'UPDATE_FAILED', <<"Update failed">>},
     {'REST_FAILED', <<"Reset source or config failed">>},

+ 22 - 5
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -321,7 +321,8 @@ schema("/bridges/:id") ->
             parameters => [param_path_id()],
             responses => #{
                 204 => <<"Bridge deleted">>,
-                400 => error_schema(['INVALID_ID'], "Update bridge failed")
+                400 => error_schema(['INVALID_ID'], "Update bridge failed"),
+                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
             }
         }
     };
@@ -352,6 +353,7 @@ schema("/bridges/:id/operation/:operation") ->
             ],
             responses => #{
                 200 => <<"Operation success">>,
+                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"),
                 400 => error_schema('INVALID_ID', "Bad bridge ID")
             }
         }
@@ -371,7 +373,8 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
             responses => #{
                 200 => <<"Operation success">>,
                 400 => error_schema('INVALID_ID', "Bad bridge ID"),
-                403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation")
+                403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"),
+                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
             }
         }
     }.
@@ -417,6 +420,7 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
         Id,
         case emqx_bridge:remove(BridgeType, BridgeName) of
             {ok, _} -> {204};
+            {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
             {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
         end
     ).
@@ -466,6 +470,10 @@ lookup_from_local_node(BridgeType, BridgeName) ->
                         {200};
                     {error, {pre_config_update, _, bridge_not_found}} ->
                         {404, error_msg('NOT_FOUND', <<"bridge not found">>)};
+                    {error, {_, _, timeout}} ->
+                        {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
+                    {error, timeout} ->
+                        {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
                     {error, Reason} ->
                         {500, error_msg('INTERNAL_ERROR', Reason)}
                 end;
@@ -489,11 +497,18 @@ lookup_from_local_node(BridgeType, BridgeName) ->
                 ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]),
                 case maps:get(enable, ConfMap, false) of
                     false ->
-                        {403, error_msg('FORBIDDEN_REQUEST', <<"forbidden operation">>)};
+                        {403,
+                            error_msg(
+                                'FORBIDDEN_REQUEST', <<"forbidden operation: bridge disabled">>
+                            )};
                     true ->
                         case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of
-                            ok -> {200};
-                            {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
+                            ok ->
+                                {200};
+                            {error, timeout} ->
+                                {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
+                            {error, Reason} ->
+                                {500, error_msg('INTERNAL_ERROR', Reason)}
                         end
                 end
         end
@@ -518,6 +533,8 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
     case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
         {ok, _} ->
             {200};
+        {error, [timeout | _]} ->
+            {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
         {error, ErrL} ->
             {500, error_msg('INTERNAL_ERROR', ErrL)}
     end.

+ 5 - 2
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -133,8 +133,11 @@ update(Type, Name, {OldConf, Conf}) ->
             %% we don't need to recreate the bridge if this config change is only to
             %% toggle the config 'bridge.{type}.{name}.enable'
             case maps:get(enable, Conf, true) of
-                true -> restart(Type, Name);
-                false -> stop(Type, Name)
+                true ->
+                    _ = restart(Type, Name),
+                    ok;
+                false ->
+                    stop(Type, Name)
             end
     end.
 

+ 4 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -378,7 +378,10 @@ t_enable_disable_bridges(_) ->
     {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
 
     {ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>),
-    ?assertEqual(<<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation\"}">>, Res),
+    ?assertEqual(
+        <<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation: bridge disabled\"}">>,
+        Res
+    ),
 
     %% enable a stopped bridge
     {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),

+ 1 - 0
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -573,6 +573,7 @@ obfuscate(Map) ->
     ).
 
 is_sensitive(password) -> true;
+is_sensitive(ssl_opts) -> true;
 is_sensitive(_) -> false.
 
 str(A) when is_atom(A) ->

+ 3 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -260,8 +260,10 @@ query(InstId, Request, AfterQuery) ->
                     emqx_metrics_worker:inc(resource_metrics, InstId, exception),
                     erlang:raise(Err, Reason, ST)
             end;
+        {ok, _Group, _Data} ->
+            query_error(not_found, <<"resource not connected">>);
         {error, not_found} ->
-            query_error(not_found, <<"resource not found or not connected">>)
+            query_error(not_found, <<"resource not found">>)
     end.
 
 -spec restart(instance_id()) -> ok | {error, Reason :: term()}.

+ 109 - 69
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -23,19 +23,24 @@
 % API
 -export([
     ensure_resource/5,
+    create/5,
+    recreate/4,
+    remove/1,
     create_dry_run/2,
-    ets_lookup/1,
-    get_metrics/1,
-    health_check/1,
+    restart/2,
+    start/2,
+    stop/1,
+    health_check/1
+]).
+
+-export([
+    lookup/1,
     list_all/0,
     list_group/1,
-    lookup/1,
-    recreate/4,
-    remove/1,
+    ets_lookup/1,
+    get_metrics/1,
     reset_metrics/1,
-    restart/2,
-    set_resource_status_connecting/1,
-    stop/1
+    set_resource_status_connecting/1
 ]).
 
 % Server
@@ -51,6 +56,8 @@
 -define(HEALTHCHECK_INTERVAL, 15000).
 -define(ETS_TABLE, emqx_resource_manager).
 -define(WAIT_FOR_RESOURCE_DELAY, 100).
+-define(T_OPERATION, 5000).
+-define(T_LOOKUP, 1000).
 
 -define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected).
 
@@ -74,11 +81,24 @@ ensure_resource(InstId, Group, ResourceType, Config, Opts) ->
         {ok, _Group, Data} ->
             {ok, Data};
         {error, not_found} ->
-            do_start(InstId, Group, ResourceType, Config, Opts),
+            create(InstId, Group, ResourceType, Config, Opts),
             {ok, _Group, Data} = lookup(InstId),
             {ok, Data}
     end.
 
+%% @doc Create a resource_manager and wait for it; returns ok even if the wait times out
+create(InstId, Group, ResourceType, Config, Opts) ->
+    % The state machine makes the actual call to the callback/resource module after init
+    ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts),
+    ok = emqx_metrics_worker:create_metrics(
+        resource_metrics,
+        InstId,
+        [matched, success, failed, exception],
+        [matched]
+    ),
+    wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
+    ok.
+
 %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
 %%
 %% Triggers the `emqx_resource_manager_sup` supervisor to actually create
@@ -90,9 +110,9 @@ create_dry_run(ResourceType, Config) ->
     ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}),
     case wait_for_resource_ready(InstId, 5000) of
         ok ->
-            _ = stop(InstId);
+            _ = remove(InstId);
         timeout ->
-            _ = stop(InstId),
+            _ = remove(InstId),
             {error, timeout}
     end.
 
@@ -118,27 +138,36 @@ remove(InstId) when is_binary(InstId) ->
 %% @doc Stops a running resource_manager and optionally clears the metrics for the resource
 -spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}.
 remove(InstId, ClearMetrics) when is_binary(InstId) ->
-    safe_call(InstId, {remove, ClearMetrics}).
+    safe_call(InstId, {remove, ClearMetrics}, ?T_OPERATION).
 
 %% @doc Stops and then starts an instance that was already running
 -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
 restart(InstId, Opts) when is_binary(InstId) ->
-    case lookup(InstId) of
-        {ok, Group, #{mod := ResourceType, config := Config} = _Data} ->
-            _ = remove(InstId, false),
-            do_start(InstId, Group, ResourceType, Config, Opts);
-        Error ->
+    case safe_call(InstId, restart, ?T_OPERATION) of
+        ok ->
+            wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
+            ok;
+        {error, _Reason} = Error ->
+            Error
+    end.
+
+%% @doc Start the resource, then wait for it to become ready
+-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
+start(InstId, Opts) ->
+    case safe_call(InstId, start, ?T_OPERATION) of
+        ok ->
+            wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
+            ok;
+        {error, _Reason} = Error ->
+            Error
+    end.
 
-%% @doc Stop the resource manager process
+%% @doc Stop the resource
 -spec stop(instance_id()) -> ok | {error, Reason :: term()}.
 stop(InstId) ->
-    case safe_call(InstId, stop) of
+    case safe_call(InstId, stop, ?T_OPERATION) of
         ok ->
             ok;
-        {error, not_found} ->
-            ok;
         {error, _Reason} = Error ->
             Error
     end.
@@ -146,12 +175,15 @@ stop(InstId) ->
 %% @doc Test helper
 -spec set_resource_status_connecting(instance_id()) -> ok.
 set_resource_status_connecting(InstId) ->
-    safe_call(InstId, set_resource_status_connecting).
+    safe_call(InstId, set_resource_status_connecting, infinity).
 
 %% @doc Lookup the group and data of a resource; reads the ETS cache if the call times out
 -spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
 lookup(InstId) ->
-    safe_call(InstId, lookup).
+    case safe_call(InstId, lookup, ?T_LOOKUP) of
+        {error, timeout} -> ets_lookup(InstId);
+        Result -> Result
+    end.
 
 %% @doc Lookup the group and data of a resource
 -spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
@@ -192,7 +224,7 @@ list_group(Group) ->
 
 -spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}.
 health_check(InstId) ->
-    safe_call(InstId, health_check).
+    safe_call(InstId, health_check, ?T_OPERATION).
 
 %% Server start/stop callbacks
 
@@ -204,13 +236,16 @@ start_link(InstId, Group, ResourceType, Config, Opts) ->
         mod = ResourceType,
         config = Config,
         opts = Opts,
-        status = undefined,
+        status = connecting,
         state = undefined,
         error = undefined
     },
     gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
 
 init(Data) ->
+    process_flag(trap_exit, true),
+    %% init the cache so that lookup/1 will always return something
+    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
     {ok, connecting, Data, {next_event, internal, try_connect}}.
 
 terminate(_Reason, _State, Data) ->
@@ -227,11 +262,20 @@ callback_mode() -> [handle_event_function, state_enter].
 % Called during testing to force a specific state
 handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
     {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]};
+% Called when the resource is to be restarted
+handle_event({call, From}, restart, _State, Data) ->
+    _ = stop_resource(Data),
+    start_resource(Data, From);
+% Called when the resource is to be started
+handle_event({call, From}, start, _State, #data{status = disconnected} = Data) ->
+    start_resource(Data, From);
+handle_event({call, From}, start, _State, _Data) ->
+    {keep_state_and_data, [{reply, From, ok}]};
 % Called when the resource is to be stopped
 handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) ->
     {next_state, stopped, Data, [{reply, From, ok}]};
 handle_event({call, From}, stop, _State, Data) ->
-    Result = do_stop(Data),
+    Result = stop_resource(Data),
     UpdatedData = Data#data{status = disconnected},
     {next_state, stopped, UpdatedData, [{reply, From, Result}]};
 % Called when a resource is to be stopped and removed.
@@ -243,9 +287,9 @@ handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
     {keep_state_and_data, [{reply, From, Reply}]};
 % Connecting state enter
 handle_event(internal, try_connect, connecting, Data) ->
-    handle_connection_attempt(Data);
+    start_resource(Data, undefined);
 handle_event(enter, _OldState, connecting, Data) ->
-    ets:delete(?ETS_TABLE, Data#data.id),
+    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
     Actions = [{state_timeout, 0, health_check}],
     {next_state, connecting, Data, Actions};
 % Connecting state health_check timeouts.
@@ -261,15 +305,16 @@ handle_event(enter, _OldState, connected, Data) ->
 handle_event(state_timeout, health_check, connected, Data) ->
     perform_connected_health_check(Data);
 handle_event(enter, _OldState, disconnected, Data) ->
+    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
     handle_disconnected_state_enter(Data);
 handle_event(state_timeout, auto_retry, disconnected, Data) ->
-    handle_connection_attempt(Data);
+    start_resource(Data, undefined);
 handle_event(enter, _OldState, stopped, Data) ->
     UpdatedData = Data#data{status = disconnected},
-    ets:delete(?ETS_TABLE, Data#data.id),
+    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}),
     {next_state, stopped, UpdatedData};
 % Resource has been explicitly stopped, so return that as the error reason.
-handle_event({call, From}, _, stopped, _Data) ->
+handle_event({call, From}, health_check, stopped, _Data) ->
     Actions = [{reply, From, {error, resource_is_stopped}}],
     {keep_state_and_data, Actions};
 handle_event({call, From}, health_check, _State, Data) ->
@@ -293,56 +338,43 @@ handle_event(EventType, EventData, State, Data) ->
 %%------------------------------------------------------------------------------
 
 handle_disconnected_state_enter(Data) ->
-    UpdatedData = Data#data{status = disconnected},
-    ets:delete(?ETS_TABLE, Data#data.id),
     case maps:get(auto_retry_interval, Data#data.opts, undefined) of
         undefined ->
-            {next_state, disconnected, UpdatedData};
+            {next_state, disconnected, Data};
         RetryInterval ->
             Actions = [{state_timeout, RetryInterval, auto_retry}],
-            {next_state, disconnected, UpdatedData, Actions}
+            {next_state, disconnected, Data, Actions}
     end.
 
-handle_connection_attempt(Data) ->
+handle_remove_event(From, ClearMetrics, Data) ->
+    stop_resource(Data),
+    case ClearMetrics of
+        true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id);
+        false -> ok
+    end,
+    {stop_and_reply, normal, [{reply, From, ok}]}.
+
+start_resource(Data, From) ->
+    %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
+    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
     case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
         {ok, ResourceState} ->
             UpdatedData = Data#data{state = ResourceState, status = connecting},
             %% Perform an initial health_check immediately before transitioning into a connected state
-            Actions = [{state_timeout, 0, health_check}],
+            Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
             {next_state, connecting, UpdatedData, Actions};
-        {error, Reason} ->
+        {error, Reason} = Err ->
             %% Keep track of the error reason why the connection did not work
             %% so that the Reason can be returned when the verification call is made.
             UpdatedData = Data#data{status = disconnected, error = Reason},
-            {next_state, disconnected, UpdatedData}
+            Actions = maybe_reply([], From, Err),
+            {next_state, disconnected, UpdatedData, Actions}
     end.
 
-handle_remove_event(From, ClearMetrics, Data) ->
-    do_stop(Data),
-    ets:delete(?ETS_TABLE, Data#data.id),
-    case ClearMetrics of
-        true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id);
-        false -> ok
-    end,
-    {stop_and_reply, normal, [{reply, From, ok}]}.
-
-do_start(InstId, Group, ResourceType, Config, Opts) ->
-    % The state machine will make the actual call to the callback/resource module after init
-    ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts),
-    ok = emqx_metrics_worker:create_metrics(
-        resource_metrics,
-        InstId,
-        [matched, success, failed, exception],
-        [matched]
-    ),
-    wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
-    ok.
-
-do_stop(#data{state = undefined} = _Data) ->
+stop_resource(#data{state = undefined} = _Data) ->
     ok;
-do_stop(Data) ->
+stop_resource(Data) ->
     Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state),
-    ets:delete(?ETS_TABLE, Data#data.id),
     _ = maybe_clear_alarm(Data#data.id),
     Result.
 
@@ -388,10 +420,11 @@ with_health_check(Data, Func) ->
     ResId = Data#data.id,
     HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state),
     {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state),
-    _ = maybe_alarm_resource_down(connected, ResId),
+    _ = maybe_alarm_resource_down(Status, ResId),
     UpdatedData = Data#data{
         state = NewState, status = Status, error = Err
     },
+    ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}),
     Func(Status, UpdatedData).
 
 maybe_alarm_resource_down(connected, _ResId) ->
@@ -417,6 +450,11 @@ parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status)
 parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) ->
     {Status, NewState, Error}.
 
+maybe_reply(Actions, undefined, _Reply) ->
+    Actions;
+maybe_reply(Actions, From, Reply) ->
+    [{reply, From, Reply} | Actions].
+
 data_record_to_external_map_with_metrics(Data) ->
     #{
         id => Data#data.id,
@@ -446,10 +484,12 @@ do_wait_for_resource_ready(InstId, Retry) ->
             do_wait_for_resource_ready(InstId, Retry - 1)
     end.
 
-safe_call(InstId, Message) ->
+safe_call(InstId, Message, Timeout) ->
     try
-        gen_statem:call(proc_name(InstId), Message)
+        gen_statem:call(proc_name(InstId), Message, {clean_timeout, Timeout})
     catch
-        exit:_ ->
-            {error, not_found}
+        exit:{R, _} when R == noproc; R == normal; R == shutdown ->
+            {error, not_found};
+        exit:{timeout, _} ->
+            {error, timeout}
     end.

+ 4 - 5
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -21,6 +21,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include("emqx_resource.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
 
 -define(TEST_RESOURCE, emqx_test_resource).
 -define(ID, <<"id">>).
@@ -183,7 +184,6 @@ t_healthy(_) ->
     emqx_resource:set_resource_status_connecting(?ID),
 
     {ok, connected} = emqx_resource:health_check(?ID),
-
     ?assertMatch(
         [#{status := connected}],
         emqx_resource:list_instances_verbose()
@@ -194,7 +194,7 @@ t_healthy(_) ->
     ?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)),
 
     ?assertMatch(
-        [],
+        [#{status := connecting}],
         emqx_resource:list_instances_verbose()
     ),
 
@@ -236,7 +236,6 @@ t_stop_start(_) ->
     ),
 
     ok = emqx_resource:restart(?ID),
-
     timer:sleep(300),
 
     #{pid := Pid1} = emqx_resource:query(?ID, get_state),
@@ -334,11 +333,11 @@ t_create_dry_run_local_failed(_) ->
     ),
     ?assertEqual(error, Res2),
 
-    {Res3, _} = emqx_resource:create_dry_run_local(
+    Res3 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{name => test_resource, stop_error => true}
     ),
-    ?assertEqual(error, Res3).
+    ?assertEqual(ok, Res3).
 
 t_test_func(_) ->
     ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),