Jelajahi Sumber

Merge pull request #10206 from thalesmg/decouple-buffer-worker-query-call-mode-v50

feat(buffer_worker): decouple query mode from underlying connector call mode
Thales Macedo Garitezi 3 tahun lalu
induk
melakukan
ff272a2071

+ 1 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -265,7 +265,7 @@ query(ResId, Request, Opts) ->
             IsBufferSupported = is_buffer_supported(Module),
             case {IsBufferSupported, QM} of
                 {true, _} ->
-                    %% only Kafka so far
+                    %% only Kafka producer so far
                     Opts1 = Opts#{is_buffer_supported => true},
                     emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
                 {false, sync} ->

+ 11 - 15
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -142,7 +142,7 @@ simple_sync_query(Id, Request) ->
     QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
-    Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
+    Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
 
@@ -154,7 +154,7 @@ simple_async_query(Id, Request, QueryOpts0) ->
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
-    Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
+    Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
 
@@ -381,7 +381,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
     } = Data0,
     ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
     QueryOpts = #{simple_query => false},
-    Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
+    Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
     ReplyResult =
         case QueryOrBatch of
             ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
@@ -570,7 +570,7 @@ do_flush(
     %% unwrap when not batching (i.e., batch size == 1)
     [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
-    Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
+    Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
     Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
     case reply_caller(Id, Reply, QueryOpts) of
         %% Failed; remove the request from the queue, as we cannot pop
@@ -655,7 +655,7 @@ do_flush(#{queue := Q1} = Data0, #{
         inflight_tid := InflightTID
     } = Data0,
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
-    Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
+    Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts),
     case batch_reply_caller(Id, Result, Batch, QueryOpts) of
         %% Failed; remove the request from the queue, as we cannot pop
         %% from it again, but we'll retry it using the inflight table.
@@ -887,17 +887,13 @@ handle_async_worker_down(Data0, Pid) ->
     mark_inflight_items_as_retriable(Data, WorkerMRef),
     {keep_state, Data}.
 
-call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
-    ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}),
+-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
+call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
+    ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}),
     case emqx_resource_manager:lookup_cached(Id) of
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
         {ok, _Group, Resource} ->
-            QM =
-                case QM0 =:= configured of
-                    true -> maps:get(query_mode, Resource);
-                    false -> QM0
-                end,
             do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
@@ -1515,9 +1511,9 @@ inc_sent_success(Id, _HasBeenSent = true) ->
 inc_sent_success(Id, _HasBeenSent) ->
     emqx_resource_metrics:success_inc(Id).
 
-call_mode(sync, _) -> sync;
-call_mode(async, always_sync) -> sync;
-call_mode(async, async_if_possible) -> async.
+call_mode(force_sync, _) -> sync;
+call_mode(async_if_possible, always_sync) -> sync;
+call_mode(async_if_possible, async_if_possible) -> async.
 
 assert_ok_result(ok) ->
     true;

+ 15 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -146,6 +146,12 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
         {error, timeout}
     end.
 
+on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
+    Pid ! {block, ReplyFun},
+    {ok, Pid};
+on_query_async(_InstId, resume, ReplyFun, #{pid := Pid}) ->
+    Pid ! {resume, ReplyFun},
+    {ok, Pid};
 on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
     Pid ! {inc, N, ReplyFun},
     {ok, Pid};
@@ -274,6 +280,10 @@ counter_loop(
             block ->
                 ct:pal("counter recv: ~p", [block]),
                 State#{status => blocked};
+            {block, ReplyFun} ->
+                ct:pal("counter recv: ~p", [block]),
+                apply_reply(ReplyFun, ok),
+                State#{status => blocked};
             {block_now, ReplyFun} ->
                 ct:pal("counter recv: ~p", [block_now]),
                 apply_reply(
@@ -284,6 +294,11 @@ counter_loop(
                 {messages, Msgs} = erlang:process_info(self(), messages),
                 ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
                 State#{status => running};
+            {resume, ReplyFun} ->
+                {messages, Msgs} = erlang:process_info(self(), messages),
+                ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
+                apply_reply(ReplyFun, ok),
+                State#{status => running};
             {inc, N, ReplyFun} when Status == running ->
                 %ct:pal("async counter recv: ~p", [{inc, N}]),
                 apply_reply(ReplyFun, ok),

+ 83 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -2561,6 +2561,84 @@ do_t_recursive_flush() ->
     ),
     ok.
 
+t_call_mode_uncoupled_from_query_mode(_Config) ->
+    DefaultOpts = #{
+        batch_size => 1,
+        batch_time => 5,
+        worker_pool_size => 1
+    },
+    ?check_trace(
+        begin
+            %% We check that we can call the buffer workers with async
+            %% calls, even if the underlying connector itself only
+            %% supports sync calls.
+            emqx_connector_demo:set_callback_mode(always_sync),
+            {ok, _} = emqx_resource:create(
+                ?ID,
+                ?DEFAULT_RESOURCE_GROUP,
+                ?TEST_RESOURCE,
+                #{name => test_resource},
+                DefaultOpts#{query_mode => async}
+            ),
+            ?tp_span(
+                async_query_sync_driver,
+                #{},
+                ?assertMatch(
+                    {ok, {ok, _}},
+                    ?wait_async_action(
+                        emqx_resource:query(?ID, {inc_counter, 1}),
+                        #{?snk_kind := buffer_worker_flush_ack},
+                        500
+                    )
+                )
+            ),
+            ?assertEqual(ok, emqx_resource:remove_local(?ID)),
+
+            %% And we check the converse: a connector that allows async
+            %% calls can be called synchronously, but the underlying
+            %% call should be async.
+            emqx_connector_demo:set_callback_mode(async_if_possible),
+            {ok, _} = emqx_resource:create(
+                ?ID,
+                ?DEFAULT_RESOURCE_GROUP,
+                ?TEST_RESOURCE,
+                #{name => test_resource},
+                DefaultOpts#{query_mode => sync}
+            ),
+            ?tp_span(
+                sync_query_async_driver,
+                #{},
+                ?assertEqual(ok, emqx_resource:query(?ID, {inc_counter, 2}))
+            ),
+            ?assertEqual(ok, emqx_resource:remove_local(?ID)),
+            ?tp(sync_query_async_driver, #{}),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace1 = trace_between_span(Trace0, async_query_sync_driver),
+            ct:pal("async query calling sync driver\n  ~p", [Trace1]),
+            ?assert(
+                ?strict_causality(
+                    #{?snk_kind := async_query, request := {inc_counter, 1}},
+                    #{?snk_kind := call_query, call_mode := sync},
+                    Trace1
+                )
+            ),
+
+            Trace2 = trace_between_span(Trace0, sync_query_async_driver),
+            ct:pal("sync query calling async driver\n  ~p", [Trace2]),
+            ?assert(
+                ?strict_causality(
+                    #{?snk_kind := sync_query, request := {inc_counter, 2}},
+                    #{?snk_kind := call_query_async},
+                    Trace2
+                )
+            ),
+
+            ok
+        end
+    ).
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -2742,3 +2820,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
         )
     ),
     ok.
+
+trace_between_span(Trace0, Marker) ->
+    {Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0),
+    {[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1),
+    Trace2.

+ 7 - 0
changes/ce/feat-10206.en.md

@@ -0,0 +1,7 @@
+Decouple the query mode from the underlying call mode for buffer
+workers.
+
+Prior to this change, setting the query mode of a resource
+such as a bridge to `sync` would force the buffer to call the
+underlying connector in a synchronous way, even if it supports async
+calls.

+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl

@@ -291,7 +291,7 @@ t_setup_via_config_and_publish(Config) ->
         end,
         fun(Trace0) ->
             Trace = ?of_kind(dynamo_connector_query_return, Trace0),
-            ?assertMatch([#{result := {ok, _}}], Trace),
+            ?assertMatch([#{result := ok}], Trace),
             ok
         end
     ),
@@ -328,7 +328,7 @@ t_setup_via_http_api_and_publish(Config) ->
         end,
         fun(Trace0) ->
             Trace = ?of_kind(dynamo_connector_query_return, Trace0),
-            ?assertMatch([#{result := {ok, _}}], Trace),
+            ?assertMatch([#{result := ok}], Trace),
             ok
         end
     ),

+ 18 - 64
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl

@@ -1023,7 +1023,6 @@ t_publish_timeout(Config) ->
     do_econnrefused_or_timeout_test(Config, timeout).
 
 do_econnrefused_or_timeout_test(Config, Error) ->
-    QueryMode = ?config(query_mode, Config),
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
     Topic = <<"t/topic">>,
@@ -1031,15 +1030,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
     Message = emqx_message:make(Topic, Payload),
     ?check_trace(
         begin
-            case {QueryMode, Error} of
-                {sync, _} ->
-                    {_, {ok, _}} =
-                        ?wait_async_action(
-                            emqx:publish(Message),
-                            #{?snk_kind := gcp_pubsub_request_failed, recoverable_error := true},
-                            15_000
-                        );
-                {async, econnrefused} ->
+            case Error of
+                econnrefused ->
                     %% at the time of writing, async requests
                     %% are never considered expired by ehttpc
                     %% (even if they arrive late, or never
@@ -1059,7 +1051,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                             },
                             15_000
                         );
-                {async, timeout} ->
+                timeout ->
                     %% at the time of writing, async requests
                     %% are never considered expired by ehttpc
                     %% (even if they arrive late, or never
@@ -1077,18 +1069,13 @@ do_econnrefused_or_timeout_test(Config, Error) ->
             end
         end,
         fun(Trace) ->
-            case {QueryMode, Error} of
-                {sync, _} ->
+            case Error of
+                econnrefused ->
                     ?assertMatch(
                         [#{reason := Error, connector := ResourceId} | _],
                         ?of_kind(gcp_pubsub_request_failed, Trace)
                     );
-                {async, econnrefused} ->
-                    ?assertMatch(
-                        [#{reason := Error, connector := ResourceId} | _],
-                        ?of_kind(gcp_pubsub_request_failed, Trace)
-                    );
-                {async, timeout} ->
+                timeout ->
                     ?assertMatch(
                         [_, _ | _],
                         ?of_kind(gcp_pubsub_response, Trace)
@@ -1098,11 +1085,11 @@ do_econnrefused_or_timeout_test(Config, Error) ->
         end
     ),
 
-    case {Error, QueryMode} of
+    case Error of
         %% apparently, async with disabled queue doesn't mark the
         %% message as dropped; and since it never considers the
         %% response expired, this succeeds.
-        {econnrefused, async} ->
+        econnrefused ->
             wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
                 timeout => 10_000, n_events => 1
             }),
@@ -1124,7 +1111,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 } when Matched >= 1 andalso Inflight + Queueing + Dropped + Failed =< 2,
                 CurrentMetrics
             );
-        {timeout, async} ->
+        timeout ->
             wait_until_gauge_is(inflight, 0, _Timeout = 400),
             wait_until_gauge_is(queuing, 0, _Timeout = 400),
             assert_metrics(
@@ -1139,21 +1126,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                     late_reply => 2
                 },
                 ResourceId
-            );
-        {_, sync} ->
-            wait_until_gauge_is(queuing, 0, 500),
-            wait_until_gauge_is(inflight, 1, 500),
-            assert_metrics(
-                #{
-                    dropped => 0,
-                    failed => 0,
-                    inflight => 1,
-                    matched => 1,
-                    queuing => 0,
-                    retried => 0,
-                    success => 0
-                },
-                ResourceId
             )
     end,
 
@@ -1277,7 +1249,6 @@ t_failure_no_body(Config) ->
 
 t_unrecoverable_error(Config) ->
     ResourceId = ?config(resource_id, Config),
-    QueryMode = ?config(query_mode, Config),
     TestPid = self(),
     FailureNoBodyHandler =
         fun(Req0, State) ->
@@ -1308,33 +1279,16 @@ t_unrecoverable_error(Config) ->
     Message = emqx_message:make(Topic, Payload),
     ?check_trace(
         {_, {ok, _}} =
-            case QueryMode of
-                sync ->
-                    ?wait_async_action(
-                        emqx:publish(Message),
-                        #{?snk_kind := gcp_pubsub_request_failed},
-                        5_000
-                    );
-                async ->
-                    ?wait_async_action(
-                        emqx:publish(Message),
-                        #{?snk_kind := gcp_pubsub_response},
-                        5_000
-                    )
-            end,
+            ?wait_async_action(
+                emqx:publish(Message),
+                #{?snk_kind := gcp_pubsub_response},
+                5_000
+            ),
         fun(Trace) ->
-            case QueryMode of
-                sync ->
-                    ?assertMatch(
-                        [#{reason := killed}],
-                        ?of_kind(gcp_pubsub_request_failed, Trace)
-                    );
-                async ->
-                    ?assertMatch(
-                        [#{response := {error, killed}}],
-                        ?of_kind(gcp_pubsub_response, Trace)
-                    )
-            end,
+            ?assertMatch(
+                [#{response := {error, killed}}],
+                ?of_kind(gcp_pubsub_response, Trace)
+            ),
             ok
         end
     ),

+ 23 - 17
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl

@@ -532,10 +532,12 @@ t_start_ok(Config) ->
     },
     ?check_trace(
         begin
-            ?assertEqual(ok, send_message(Config, SentData)),
             case QueryMode of
-                async -> ct:sleep(500);
-                sync -> ok
+                async ->
+                    ?assertMatch(ok, send_message(Config, SentData)),
+                    ct:sleep(500);
+                sync ->
+                    ?assertMatch({ok, 204, _}, send_message(Config, SentData))
             end,
             PersistedData = query_by_clientid(ClientId, Config),
             Expected = #{
@@ -689,10 +691,12 @@ t_const_timestamp(Config) ->
         <<"payload">> => Payload,
         <<"timestamp">> => erlang:system_time(millisecond)
     },
-    ?assertEqual(ok, send_message(Config, SentData)),
     case QueryMode of
-        async -> ct:sleep(500);
-        sync -> ok
+        async ->
+            ?assertMatch(ok, send_message(Config, SentData)),
+            ct:sleep(500);
+        sync ->
+            ?assertMatch({ok, 204, _}, send_message(Config, SentData))
     end,
     PersistedData = query_by_clientid(ClientId, Config),
     Expected = #{foo => <<"123">>},
@@ -745,7 +749,12 @@ t_boolean_variants(Config) ->
                 <<"timestamp">> => erlang:system_time(millisecond),
                 <<"payload">> => Payload
             },
-            ?assertEqual(ok, send_message(Config, SentData)),
+            case QueryMode of
+                sync ->
+                    ?assertMatch({ok, 204, _}, send_message(Config, SentData));
+                async ->
+                    ?assertMatch(ok, send_message(Config, SentData))
+            end,
             case QueryMode of
                 async -> ct:sleep(500);
                 sync -> ok
@@ -841,10 +850,9 @@ t_bad_timestamp(Config) ->
                     );
                 {sync, false} ->
                     ?assertEqual(
-                        {error,
-                            {unrecoverable_error, [
-                                {error, {bad_timestamp, <<"bad_timestamp">>}}
-                            ]}},
+                        {error, [
+                            {error, {bad_timestamp, <<"bad_timestamp">>}}
+                        ]},
                         Return
                     );
                 {sync, true} ->
@@ -964,7 +972,7 @@ t_write_failure(Config) ->
                                 {error, {resource_error, #{reason := timeout}}},
                                 send_message(Config, SentData)
                             ),
-                            #{?snk_kind := buffer_worker_flush_nack},
+                            #{?snk_kind := handle_async_reply, action := nack},
                             1_000
                         );
                 async ->
@@ -978,13 +986,13 @@ t_write_failure(Config) ->
         fun(Trace0) ->
             case QueryMode of
                 sync ->
-                    Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
+                    Trace = ?of_kind(handle_async_reply, Trace0),
                     ?assertMatch([_ | _], Trace),
                     [#{result := Result} | _] = Trace,
                     ?assert(
                         {error, {error, {closed, "The connection was lost."}}} =:= Result orelse
                             {error, {error, closed}} =:= Result orelse
-                            {error, {recoverable_error, {error, econnrefused}}} =:= Result,
+                            {error, {recoverable_error, econnrefused}} =:= Result,
                         #{got => Result}
                     );
                 async ->
@@ -1006,7 +1014,6 @@ t_write_failure(Config) ->
     ok.
 
 t_missing_field(Config) ->
-    QueryMode = ?config(query_mode, Config),
     BatchSize = ?config(batch_size, Config),
     IsBatch = BatchSize > 1,
     {ok, _} =
@@ -1034,8 +1041,7 @@ t_missing_field(Config) ->
             {ok, _} =
                 snabbkaffe:block_until(
                     ?match_n_events(NEvents, #{
-                        ?snk_kind := influxdb_connector_send_query_error,
-                        mode := QueryMode
+                        ?snk_kind := influxdb_connector_send_query_error
                     }),
                     _Timeout1 = 10_000
                 ),

+ 2 - 2
lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl

@@ -94,7 +94,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         emqx_resource:get_instance(PoolName),
     ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
     % % Perform query as further check that the resource is working as expected
-    ?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
+    ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
     ?assertEqual(ok, emqx_resource:stop(PoolName)),
     % Resource will be listed still, but state will be changed and healthcheck will fail
     % as the worker no longer exists.
@@ -116,7 +116,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
     {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
         emqx_resource:get_instance(PoolName),
     ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
-    ?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
+    ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
     % Stop and remove the resource in one go.
     ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),