Просмотр исходного кода

fix: mark the async msg 'queuing' not 'sent.inflight' on recoverable_error

Shawn 3 лет назад
Родитель
Коммит
26234d38b9

+ 9 - 9
apps/emqx_connector/src/emqx_connector_http.erl

@@ -382,15 +382,6 @@ on_query_async(
         {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
         {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
     ).
     ).
 
 
-reply_delegator(ReplyFunAndArgs, Result) ->
-    case Result of
-        {error, Reason} when Reason =:= econnrefused; Reason =:= timeout ->
-            Result1 = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
-        _ ->
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
-    end.
-
 on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
 on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
     case do_get_status(PoolName, Timeout) of
     case do_get_status(PoolName, Timeout) of
         true ->
         true ->
@@ -544,3 +535,12 @@ bin(Str) when is_list(Str) ->
     list_to_binary(Str);
     list_to_binary(Str);
 bin(Atom) when is_atom(Atom) ->
 bin(Atom) when is_atom(Atom) ->
     atom_to_binary(Atom, utf8).
     atom_to_binary(Atom, utf8).
+
+reply_delegator(ReplyFunAndArgs, Result) ->
+    case Result of
+        {error, Reason} when Reason =:= econnrefused; Reason =:= timeout ->
+            Result1 = {error, {recoverable_error, Reason}},
+            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
+        _ ->
+            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
+    end.

+ 30 - 26
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -154,12 +154,12 @@ running(enter, _, _St) ->
 running(cast, resume, _St) ->
 running(cast, resume, _St) ->
     keep_state_and_data;
     keep_state_and_data;
 running(cast, block, St) ->
 running(cast, block, St) ->
-    {next_state, block, St};
+    {next_state, blocked, St};
 running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
 running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
     is_list(Batch)
     is_list(Batch)
 ->
 ->
     Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
     Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
-    {next_state, block, St#{queue := Q1}};
+    {next_state, blocked, St#{queue := Q1}};
 running({call, From}, {query, Request, _Opts}, St) ->
 running({call, From}, {query, Request, _Opts}, St) ->
     query_or_acc(From, Request, St);
     query_or_acc(From, Request, St);
 running(cast, {query, Request, Opts}, St) ->
 running(cast, {query, Request, Opts}, St) ->
@@ -366,7 +366,7 @@ reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) ->
 reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) ->
 reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) ->
     _ =
     _ =
         case Result of
         case Result of
-            {async_return, _} -> ok;
+            {async_return, _} -> no_reply_for_now;
             _ -> apply(ReplyFun, Args ++ [Result])
             _ -> apply(ReplyFun, Args ++ [Result])
         end,
         end,
     handle_query_result(Id, Result, BlockWorker);
     handle_query_result(Id, Result, BlockWorker);
@@ -395,6 +395,9 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
     BlockWorker;
     BlockWorker;
 handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) ->
 handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) ->
+    %% the message will be queued in replayq or inflight window,
+    %% i.e. the counter 'queuing' will increase, so we pretend that we have not
+    %% sent this message.
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
     true;
     true;
 handle_query_result(Id, {error, _}, BlockWorker) ->
 handle_query_result(Id, {error, _}, BlockWorker) ->
@@ -450,15 +453,10 @@ call_query(QM0, Id, Query, QueryOpts) ->
 apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
 apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
     ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
     ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
-    ?APPLY_RESOURCE(
-        begin
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
-            Result = Mod:on_query(Id, Request, ResSt),
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
-            Result
-        end,
-        Request
-    );
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
+    Result = ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
+    Result;
 apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
 apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
     Name = maps:get(inflight_name, QueryOpts, undefined),
     Name = maps:get(inflight_name, QueryOpts, undefined),
@@ -483,16 +481,12 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
 apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
 apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Requests = [Request || ?QUERY(_From, Request) <- Batch],
     Requests = [Request || ?QUERY(_From, Request) <- Batch],
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
-    ?APPLY_RESOURCE(
-        begin
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
-            Result = Mod:on_batch_query(Id, Requests, ResSt),
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
-            Result
-        end,
-        Batch
-    );
+    BatchLen = length(Batch),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
+    Result = ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
+    Result;
 apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
 apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Name = maps:get(inflight_name, QueryOpts, undefined),
     Name = maps:get(inflight_name, QueryOpts, undefined),
@@ -503,8 +497,9 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
                 ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
                 ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
                 {async_return, inflight_full};
             false ->
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
+                BatchLen = length(Batch),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
                 ReplyFun = fun ?MODULE:batch_reply_after_query/6,
                 ReplyFun = fun ?MODULE:batch_reply_after_query/6,
                 Ref = make_message_ref(),
                 Ref = make_message_ref(),
                 Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
                 Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@@ -517,20 +512,29 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
     ).
     ).
 
 
 reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
 reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
+    %% NOTE: 'sent.inflight' is the count of messages that were sent but have not been ACKed yet,
+    %%        NOT the number of messages queued in the inflight window.
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
     case reply_caller(Id, ?REPLY(From, Request, Result)) of
     case reply_caller(Id, ?REPLY(From, Request, Result)) of
         true ->
         true ->
+            %% we count this message as 'queuing' although it is in the inflight window
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
             ?MODULE:block(Pid);
             ?MODULE:block(Pid);
         false ->
         false ->
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
             inflight_drop(Name, Ref)
             inflight_drop(Name, Ref)
     end.
     end.
 
 
 batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
 batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
+    %% NOTE: 'sent.inflight' is the count of messages that were sent but have not been ACKed yet,
+    %%        NOT the number of messages queued in the inflight window.
+    BatchLen = length(Batch),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
     case batch_reply_caller(Id, Result, Batch) of
     case batch_reply_caller(Id, Result, Batch) of
         true ->
         true ->
+            %% we count these messages as 'queuing' although they are in the inflight window
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
             ?MODULE:block(Pid);
             ?MODULE:block(Pid);
         false ->
         false ->
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)),
             inflight_drop(Name, Ref)
             inflight_drop(Name, Ref)
     end.
     end.
 %%==============================================================================
 %%==============================================================================

+ 1 - 1
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
     Pid ! {From, {inc, N}},
     Pid ! {From, {inc, N}},
     receive
     receive
         {ReqRef, ok} -> ok;
         {ReqRef, ok} -> ok;
-        {ReqRef, incorrect_status} -> {recoverable_error, incorrect_status}
+        {ReqRef, incorrect_status} -> {error, {recoverable_error, incorrect_status}}
     after 1000 ->
     after 1000 ->
         {error, timeout}
         {error, timeout}
     end;
     end;