Przeglądaj źródła

fix(resource): don't crash on resource stopped

Shawn 4 lat temu
rodzic
commit
657ecef67b

+ 4 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -94,7 +94,10 @@ send_message(BridgeId, Message) ->
         not_found ->
             throw({bridge_not_found, BridgeId});
         #{enable := true} ->
-            emqx_resource:query(ResId, {send_message, Message});
+            case emqx_resource:query(ResId, {send_message, Message}) of
+                {error, {emqx_resource, Reason}} -> throw({bridge_not_ready, Reason});
+                Result -> Result
+            end;
         #{enable := false} ->
             throw({bridge_stopped, BridgeId})
     end.

+ 3 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -25,7 +25,7 @@
     mod := module(),
     config := resource_config(),
     state := resource_state(),
-    status := started | stopped,
+    status := started | stopped | starting,
     metrics := emqx_plugin_libs_metrics:metrics()
 }.
 -type resource_group() :: binary().
@@ -41,3 +41,5 @@
 %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
 %% actions upon query failure
 -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
+
+-define(TEST_ID_PREFIX, "_test_:").

+ 11 - 5
apps/emqx_resource/src/emqx_resource.erl

@@ -82,7 +82,6 @@
         ]).
 
 -define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
-
 -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
 
 -optional_callbacks([ on_query/4
@@ -170,7 +169,7 @@ create_dry_run(ResourceType, Config) ->
 -spec create_dry_run_local(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
 create_dry_run_local(ResourceType, Config) ->
-    InstId = iolist_to_binary(emqx_misc:gen_id(16)),
+    InstId = emqx_resource_instance:make_test_id(),
     call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
 
 -spec recreate(instance_id(), resource_type(), resource_config(), term()) ->
@@ -201,14 +200,18 @@ query(InstId, Request) ->
 -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
 query(InstId, Request, AfterQuery) ->
     case get_instance(InstId) of
+        {ok, #{status := starting}} ->
+            query_error(starting, <<"cannot serve query when the resource "
+                "instance is still starting">>);
         {ok, #{status := stopped}} ->
-            error({resource_stopped, InstId});
+            query_error(stopped, <<"cannot serve query when the resource "
+                "instance is stopped">>);
         {ok, #{mod := Mod, state := ResourceState, status := started}} ->
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe
             Mod:on_query(InstId, Request, AfterQuery, ResourceState);
-        {error, Reason} ->
-            error({get_instance, {InstId, Reason}})
+        {error, not_found} ->
+            query_error(not_found, <<"the resource id not exists">>)
     end.
 
 -spec restart(instance_id()) -> ok | {error, Reason :: term()}.
@@ -368,3 +371,6 @@ cluster_call(Func, Args) ->
         {ok, _TxnId, Result} -> Result;
         Failed -> Failed
     end.
+
+query_error(Reason, Msg) ->
+    {error, {?MODULE, #{reason => Reason, msg => Msg}}}.

+ 11 - 3
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -26,6 +26,7 @@
 -export([ lookup/1
         , get_metrics/1
         , list_all/0
+        , make_test_id/0
         ]).
 
 -export([ hash_call/2
@@ -61,7 +62,7 @@ hash_call(InstId, Request) ->
 hash_call(InstId, Request, Timeout) ->
     gen_server:call(pick(InstId), Request, Timeout).
 
--spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
+-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}.
 lookup(InstId) ->
     case ets:lookup(emqx_resource_instance, InstId) of
         [] -> {error, not_found};
@@ -69,6 +70,10 @@ lookup(InstId) ->
             {ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
     end.
 
+make_test_id() ->
+    RandId = iolist_to_binary(emqx_misc:gen_id(16)),
+    <<?TEST_ID_PREFIX, RandId/binary>>.
+
 get_metrics(InstId) ->
     emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
 
@@ -146,7 +151,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
         {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
             Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
                         NewConfig, Params),
-            TestInstId = iolist_to_binary(emqx_misc:gen_id(16)),
+            TestInstId = make_test_id(),
             case do_create_dry_run(TestInstId, ResourceType, Config) of
                 ok ->
                     do_remove(ResourceType, InstId, ResourceState, false),
@@ -166,7 +171,9 @@ do_create(InstId, ResourceType, Config, Opts) ->
         {ok, _} -> {ok, already_created};
         _ ->
             Res0 = #{id => InstId, mod => ResourceType, config => Config,
-                     status => stopped, state => undefined},
+                     status => starting, state => undefined},
+            %% The `emqx_resource:call_start/3` need the instance exist beforehand
+            ets:insert(emqx_resource_instance, {InstId, Res0}),
             case emqx_resource:call_start(InstId, ResourceType, Config) of
                 {ok, ResourceState} ->
                     ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
@@ -181,6 +188,7 @@ do_create(InstId, ResourceType, Config, Opts) ->
                     ets:insert(emqx_resource_instance, {InstId, Res0}),
                     {ok, Res0};
                 {error, Reason} when ForceCreate == false ->
+                    ets:delete(emqx_resource_instance, InstId),
                     {error, Reason}
             end
     end.

+ 1 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -142,7 +142,7 @@ t_stop_start(_) ->
 
     ?assertNot(is_process_alive(Pid0)),
 
-    ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)),
+    ?assertMatch({emqx_resource, #{reason := stopped}}, emqx_resource:query(?ID, get_state)),
 
     ok = emqx_resource:restart(?ID),
 

+ 2 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -240,7 +240,8 @@ handle_output(RuleId, OutId, Selected, Envs) ->
     catch
         Err:Reason:ST ->
             ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId),
-            ?SLOG(error, #{msg => "output_failed",
+            Level = case Err of throw -> debug; _ -> error end,
+            ?SLOG(Level, #{msg => "output_failed",
                            output => OutId,
                            exception => Err,
                            reason => Reason,