Przeglądaj źródła

refactor: change '{recoverable_error,Reason}' to '{error,{recoverable_error,Reason}}'

Shawn 3 lat temu
rodzic
commit
0ef0b68de4

+ 19 - 6
apps/emqx_connector/src/emqx_connector_http.erl

@@ -46,7 +46,7 @@
     namespace/0
 ]).
 
--export([check_ssl_opts/2]).
+-export([check_ssl_opts/2, validate_method/1]).
 
 -type connect_timeout() :: emqx_schema:duration() | infinity.
 -type pool_type() :: random | hash.
@@ -137,8 +137,10 @@ fields(config) ->
 fields("request") ->
     [
         {method,
-            hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{
-                required => false, desc => ?DESC("method")
+            hoconsc:mk(binary(), #{
+                required => false,
+                desc => ?DESC("method"),
+                validator => fun ?MODULE:validate_method/1
             })},
         {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
         {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
@@ -171,6 +173,17 @@ desc(_) ->
 validations() ->
     [{check_ssl_opts, fun check_ssl_opts/1}].
 
+validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> ->
+    ok;
+validate_method(M) ->
+    case string:find(M, "${") of
+        nomatch ->
+            {error,
+                <<"Invalid method, should be one of 'post', 'put', 'get', 'delete' or variables in ${field} format.">>};
+        _ ->
+            ok
+    end.
+
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 ref(Field) -> hoconsc:ref(?MODULE, Field).
 
@@ -286,13 +299,13 @@ on_query(
             Retry
         )
     of
-        {error, econnrefused} ->
+        {error, Reason} when Reason =:= econnrefused; Reason =:= timeout ->
             ?SLOG(warning, #{
                 msg => "http_connector_do_request_failed",
-                reason => econnrefused,
+                reason => Reason,
                 connector => InstId
             }),
-            {recoverable_error, econnrefused};
+            {error, {recoverable_error, Reason}};
         {error, Reason} = Result ->
             ?SLOG(error, #{
                 msg => "http_connector_do_request_failed",

+ 1 - 1
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -420,7 +420,7 @@ on_sql_query(
                 error,
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
             ),
-            {recoverable_error, Reason};
+            {error, {recoverable_error, Reason}};
         {error, Reason} ->
             ?SLOG(
                 error,

+ 2 - 2
apps/emqx_resource/include/emqx_resource.hrl

@@ -76,8 +76,8 @@
 -type query_result() ::
     ok
     | {ok, term()}
-    | {error, term()}
-    | {recoverable_error, term()}.
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
 
 -define(WORKER_POOL_SIZE, 16).
 

+ 4 - 4
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -396,7 +396,7 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
 handle_query_result(Id, {error, _}, BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
     BlockWorker;
-handle_query_result(Id, {recoverable_error, _}, _BlockWorker) ->
+handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
     true;
 handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
@@ -433,7 +433,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
     try
         %% if the callback module (connector) wants to return an error that
         %% makes the current resource goes into the `blocked` state, it should
-        %% return `{recoverable_error, Reason}`
+        %% return `{error, {recoverable_error, Reason}}`
         EXPR
     catch
         ERR:REASON:STACKTRACE ->
@@ -457,7 +457,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
     ?APPLY_RESOURCE(
         case inflight_is_full(Name, WinSize) of
             true ->
-                ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
+                ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
                 ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
@@ -483,7 +483,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
     ?APPLY_RESOURCE(
         case inflight_is_full(Name, WinSize) of
             true ->
-                ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
+                ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
                 ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -510,7 +510,7 @@ nested_put(Alias, Val, Columns0) ->
 -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
 inc_action_metrics(ok, RuleId) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
-inc_action_metrics({recoverable_error, _}, RuleId) ->
+inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
 inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');