Просмотр исходного кода

fix(actions): use action query mode instead of connector's query mode

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
b4a5c141ad

+ 72 - 8
apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

@@ -913,14 +913,6 @@ t_rule_pointing_to_non_operational_channel(_Config) ->
     ct:pal("connector config:\n  ~p", [ConnectorConfig]),
     ?check_trace(
         begin
-            %% FIXME: this should only matter for the action.  yet, currently the query
-            %% mode from the connector is stored once by the resource manager and later
-            %% used to decide how to call the resource...
-            meck:new(con_mod(), [passthrough, no_history, non_strict]),
-            on_exit(fun() -> catch meck:unload([con_mod()]) end),
-            meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
-            meck:expect(con_mod(), callback_mode, 0, async_if_possible),
-
             {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
 
             ActionName = my_test_action,
@@ -930,6 +922,12 @@ t_rule_pointing_to_non_operational_channel(_Config) ->
                 <<"connector">> => atom_to_binary(ConnectorName)
             },
             ct:pal("action config:\n  ~p", [ActionConfig]),
+
+            meck:new(con_mod(), [passthrough, no_history, non_strict]),
+            on_exit(fun() -> catch meck:unload([con_mod()]) end),
+            meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
+            meck:expect(con_mod(), callback_mode, 0, async_if_possible),
+
             {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
 
             ?assertMatch(
@@ -984,6 +982,72 @@ t_rule_pointing_to_non_operational_channel(_Config) ->
 
     ok.
 
+t_query_uses_action_query_mode(_Config) ->
+    %% Check that we compute the query mode from the action and not from the connector
+    %% when querying the resource.
+
+    %% Set one query mode for the connector...
+    meck:new(con_mod(), [passthrough, no_history, non_strict]),
+    on_exit(fun() -> catch meck:unload([con_mod()]) end),
+    meck:expect(con_mod(), query_mode, 1, sync),
+    meck:expect(con_mod(), callback_mode, 0, always_sync),
+
+    ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
+        <<"resource_opts">> => #{<<"start_timeout">> => 100}
+    }),
+    ConnectorName = ?FUNCTION_NAME,
+    ct:pal("connector config:\n  ~p", [ConnectorConfig]),
+    ?check_trace(
+        begin
+            {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
+
+            ActionName = my_test_action,
+            ActionConfig = (bridge_config())#{
+                <<"connector">> => atom_to_binary(ConnectorName)
+            },
+            ct:pal("action config:\n  ~p", [ActionConfig]),
+
+            %% ... now we use a quite different query mode for the action
+            meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
+            meck:expect(con_mod(), callback_mode, 0, async_if_possible),
+
+            {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
+
+            {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
+                #{
+                    sql => <<"select * from \"t/a\"">>,
+                    id => atom_to_binary(?FUNCTION_NAME),
+                    actions => [
+                        <<
+                            (atom_to_binary(bridge_type()))/binary,
+                            ":",
+                            (atom_to_binary(ActionName))/binary
+                        >>
+                    ]
+                }
+            ),
+            on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
+
+            Msg = emqx_message:make(<<"t/a">>, <<"payload">>),
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    emqx:publish(Msg),
+                    #{?snk_kind := call_query},
+                    2_000
+                ),
+
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{query_mode := simple_async_internal_buffer}],
+                ?of_kind(simple_query_override, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 %% Helper Functions
 
 wait_until(Fun) ->

+ 8 - 0
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1148,6 +1148,14 @@ error_if_channel_is_not_installed(Id, QueryOpts) ->
             ok
     end.
 
+do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when
+    ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer
+->
+    %% The query overrides the query mode of the resource, send even in disconnected state
+    ?tp(simple_query_override, #{query_mode => ReqQM}),
+    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
+    CallMode = call_mode(QM, CBM),
+    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
     ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
 ->