Przeglądaj źródła

fix(bufworker): handle replies of simple async queries

Before that change, simple queries were treated as "retries"
essentially, thus skipping all the reply processing there is.
Andrew Mayorov 2 lat temu
rodzic
commit
2b4e49e7df

+ 14 - 6
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -109,6 +109,7 @@ start_link(Id, Index, Opts) ->
 
 
 -spec sync_query(id(), request(), query_opts()) -> Result :: term().
 -spec sync_query(id(), request(), query_opts()) -> Result :: term().
 sync_query(Id, Request, Opts0) ->
 sync_query(Id, Request, Opts0) ->
+    ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}),
     Opts1 = ensure_timeout_query_opts(Opts0, sync),
     Opts1 = ensure_timeout_query_opts(Opts0, sync),
     Opts = ensure_expire_at(Opts1),
     Opts = ensure_expire_at(Opts1),
     PickKey = maps:get(pick_key, Opts, self()),
     PickKey = maps:get(pick_key, Opts, self()),
@@ -118,6 +119,7 @@ sync_query(Id, Request, Opts0) ->
 
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts0) ->
 async_query(Id, Request, Opts0) ->
+    ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}),
     Opts1 = ensure_timeout_query_opts(Opts0, async),
     Opts1 = ensure_timeout_query_opts(Opts0, async),
     Opts = ensure_expire_at(Opts1),
     Opts = ensure_expire_at(Opts1),
     PickKey = maps:get(pick_key, Opts, self()),
     PickKey = maps:get(pick_key, Opts, self()),
@@ -133,6 +135,7 @@ simple_sync_query(Id, Request) ->
     %% call ends up calling buffering functions, that's a bug and
     %% call ends up calling buffering functions, that's a bug and
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     %% `emqx_resource_metrics:*_shift/3'.
+    ?tp(simple_sync_query, #{id => Id, request => Request}),
     Index = undefined,
     Index = undefined,
     QueryOpts = simple_query_opts(),
     QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
     emqx_resource_metrics:matched_inc(Id),
@@ -144,6 +147,7 @@ simple_sync_query(Id, Request) ->
 %% simple async-query the resource without batching and queuing.
 %% simple async-query the resource without batching and queuing.
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 simple_async_query(Id, Request, QueryOpts0) ->
 simple_async_query(Id, Request, QueryOpts0) ->
+    ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
     Index = undefined,
     Index = undefined,
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     emqx_resource_metrics:matched_inc(Id),
@@ -877,7 +881,7 @@ handle_async_worker_down(Data0, Pid) ->
     {keep_state, Data}.
     {keep_state, Data}.
 
 
 call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
 call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
-    ?tp(call_query_enter, #{id => Id, query => Query}),
+    ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}),
     case emqx_resource_manager:ets_lookup(Id) of
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{status := stopped}} ->
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
@@ -994,11 +998,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
 handle_async_reply(
 handle_async_reply(
     #{
     #{
         request_ref := Ref,
         request_ref := Ref,
-        inflight_tid := InflightTID
+        inflight_tid := InflightTID,
+        query_opts := Opts
     } = ReplyContext,
     } = ReplyContext,
     Result
     Result
 ) ->
 ) ->
-    case maybe_handle_unknown_async_reply(InflightTID, Ref) of
+    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
         discard ->
         discard ->
             ok;
             ok;
         continue ->
         continue ->
@@ -1068,11 +1073,12 @@ do_handle_async_reply(
 handle_async_batch_reply(
 handle_async_batch_reply(
     #{
     #{
         inflight_tid := InflightTID,
         inflight_tid := InflightTID,
-        request_ref := Ref
+        request_ref := Ref,
+        query_opts := Opts
     } = ReplyContext,
     } = ReplyContext,
     Result
     Result
 ) ->
 ) ->
-    case maybe_handle_unknown_async_reply(InflightTID, Ref) of
+    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
         discard ->
         discard ->
             ok;
             ok;
         continue ->
         continue ->
@@ -1206,7 +1212,9 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
 %% 2. If the request was previously failed and now pending on a retry,
 %% 2. If the request was previously failed and now pending on a retry,
 %%    then this function will return 'continue' as there is no way to
 %%    then this function will return 'continue' as there is no way to
 %%    tell if this reply is stae or not.
 %%    tell if this reply is stae or not.
-maybe_handle_unknown_async_reply(InflightTID, Ref) ->
+maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) ->
+    continue;
+maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) ->
     try ets:member(InflightTID, Ref) of
     try ets:member(InflightTID, Ref) of
         true ->
         true ->
             continue;
             continue;