Преглед изворни кода

Merge pull request #12143 from thalesmg/fix-reply-to-async-r54-20231211

fix(actions): increment rule statistics even if channel is not installed
Thales Macedo Garitezi пре 2 година
родитељ
комит
4fc14fb4ba

+ 156 - 2
apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

@@ -21,6 +21,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -343,7 +344,7 @@ t_send_message_through_rule(_) ->
     BridgeName = my_test_bridge,
     {ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()),
     %% Create a rule to send message to the bridge
-    {ok, _} = emqx_rule_engine:create_rule(
+    {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
         #{
             sql => <<"select * from \"t/a\"">>,
             id => atom_to_binary(?FUNCTION_NAME),
@@ -357,6 +358,7 @@ t_send_message_through_rule(_) ->
             description => <<"bridge_v2 test rule">>
         }
     ),
+    on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
     %% Register name for this process
     register(registered_process_name(), self()),
     %% Send message to the topic
@@ -371,7 +373,6 @@ t_send_message_through_rule(_) ->
         ct:fail("Failed to receive message")
     end,
     unregister(registered_process_name()),
-    ok = emqx_rule_engine:delete_rule(atom_to_binary(?FUNCTION_NAME)),
     ok = emqx_bridge_v2:remove(bridge_type(), BridgeName),
     ok.
 
@@ -894,6 +895,159 @@ t_lookup_status_when_connecting(_Config) ->
     ?assertMatch(#{status := ?status_disconnected}, ChannelData),
     ok.
 
+t_rule_pointing_to_non_operational_channel(_Config) ->
+    %% Check that, if a rule sends a message to an action that is not yet installed and
+    %% uses `simple_async_internal_buffer', then it eventually increments the rule's
+    %% failed counter.
+    ResponseETS = ets:new(response_ets, [public]),
+    ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
+    OnGetStatusFun = wrap_fun(fun() ->
+        ets:lookup_element(ResponseETS, on_get_status_value, 2)
+    end),
+
+    ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
+        <<"on_get_status_fun">> => OnGetStatusFun,
+        <<"resource_opts">> => #{<<"start_timeout">> => 100}
+    }),
+    ConnectorName = ?FUNCTION_NAME,
+    ct:pal("connector config:\n  ~p", [ConnectorConfig]),
+    ?check_trace(
+        begin
+            {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
+
+            ActionName = my_test_action,
+            ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
+            ActionConfig = (bridge_config())#{
+                <<"on_get_channel_status_fun">> => ChanStatusFun,
+                <<"connector">> => atom_to_binary(ConnectorName)
+            },
+            ct:pal("action config:\n  ~p", [ActionConfig]),
+
+            meck:new(con_mod(), [passthrough, no_history, non_strict]),
+            on_exit(fun() -> catch meck:unload([con_mod()]) end),
+            meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
+            meck:expect(con_mod(), callback_mode, 0, async_if_possible),
+
+            {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
+
+            ?assertMatch(
+                {ok, #{
+                    error := <<"Not installed">>,
+                    status := ?status_connecting,
+                    resource_data := #{status := ?status_connecting}
+                }},
+                emqx_bridge_v2:lookup(bridge_type(), ActionName)
+            ),
+
+            {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
+                #{
+                    sql => <<"select * from \"t/a\"">>,
+                    id => atom_to_binary(?FUNCTION_NAME),
+                    actions => [
+                        <<
+                            (atom_to_binary(bridge_type()))/binary,
+                            ":",
+                            (atom_to_binary(ActionName))/binary
+                        >>
+                    ]
+                }
+            ),
+            on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
+
+            Msg = emqx_message:make(<<"t/a">>, <<"payload">>),
+            emqx:publish(Msg),
+
+            ActionId = emqx_bridge_v2:id(bridge_type(), ActionName, ConnectorName),
+            ?assertEqual(1, emqx_resource_metrics:matched_get(ActionId)),
+            ?assertEqual(1, emqx_resource_metrics:failed_get(ActionId)),
+            ?retry(
+                _Sleep0 = 100,
+                _Attempts = 20,
+                ?assertMatch(
+                    #{
+                        counters :=
+                            #{
+                                matched := 1,
+                                'actions.failed' := 1
+                            }
+                    },
+                    emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
+                )
+            ),
+
+            ok
+        end,
+        []
+    ),
+
+    ok.
+
+t_query_uses_action_query_mode(_Config) ->
+    %% Check that we compute the query mode from the action and not from the connector
+    %% when querying the resource.
+
+    %% Set one query mode for the connector...
+    meck:new(con_mod(), [passthrough, no_history, non_strict]),
+    on_exit(fun() -> catch meck:unload([con_mod()]) end),
+    meck:expect(con_mod(), query_mode, 1, sync),
+    meck:expect(con_mod(), callback_mode, 0, always_sync),
+
+    ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
+        <<"resource_opts">> => #{<<"start_timeout">> => 100}
+    }),
+    ConnectorName = ?FUNCTION_NAME,
+    ct:pal("connector config:\n  ~p", [ConnectorConfig]),
+    ?check_trace(
+        begin
+            {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
+
+            ActionName = my_test_action,
+            ActionConfig = (bridge_config())#{
+                <<"connector">> => atom_to_binary(ConnectorName)
+            },
+            ct:pal("action config:\n  ~p", [ActionConfig]),
+
+            %% ... now we use a quite different query mode for the action
+            meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
+            meck:expect(con_mod(), callback_mode, 0, async_if_possible),
+
+            {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
+
+            {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
+                #{
+                    sql => <<"select * from \"t/a\"">>,
+                    id => atom_to_binary(?FUNCTION_NAME),
+                    actions => [
+                        <<
+                            (atom_to_binary(bridge_type()))/binary,
+                            ":",
+                            (atom_to_binary(ActionName))/binary
+                        >>
+                    ]
+                }
+            ),
+            on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
+
+            Msg = emqx_message:make(<<"t/a">>, <<"payload">>),
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    emqx:publish(Msg),
+                    #{?snk_kind := call_query},
+                    2_000
+                ),
+
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{query_mode := simple_async_internal_buffer}],
+                ?of_kind(simple_query_override, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 %% Helper Functions
 
 wait_until(Fun) ->

+ 45 - 23
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1122,14 +1122,14 @@ pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when
         true ->
             ok;
         false ->
-            maybe_throw_channel_not_installed(Id, QueryOpts)
+            error_if_channel_is_not_installed(Id, QueryOpts)
     end;
 pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) ->
-    maybe_throw_channel_not_installed(Id, QueryOpts);
+    error_if_channel_is_not_installed(Id, QueryOpts);
 pre_query_channel_check(_Request, _Channels, _QueryOpts) ->
     ok.
 
-maybe_throw_channel_not_installed(Id, QueryOpts) ->
+error_if_channel_is_not_installed(Id, QueryOpts) ->
     %% Fail with a recoverable error if the channel is not installed and there are buffer
     %% workers involved so that the operation can be retried.  Otherwise, this is
     %% unrecoverable.  It is emqx_resource_manager's responsibility to ensure that the
@@ -1137,19 +1137,25 @@ maybe_throw_channel_not_installed(Id, QueryOpts) ->
     IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
     case is_channel_id(Id) of
         true when IsSimpleQuery ->
-            error(
+            {error,
                 {unrecoverable_error,
-                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}
-            );
+                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
         true ->
-            error(
+            {error,
                 {recoverable_error,
-                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}
-            );
+                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
         false ->
             ok
     end.
 
+do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when
+    ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer
+->
+    %% The query overrides the query mode of the resource, send even in disconnected state
+    ?tp(simple_query_override, #{query_mode => ReqQM}),
+    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
+    CallMode = call_mode(QM, CBM),
+    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
     ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
 ->
@@ -1201,8 +1207,12 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_query,
             begin
-                pre_query_channel_check(Request, Channels, QueryOpts),
-                Mod:on_query(extract_connector_id(Id), Request, ResSt)
+                case pre_query_channel_check(Request, Channels, QueryOpts) of
+                    ok ->
+                        Mod:on_query(extract_connector_id(Id), Request, ResSt);
+                    Error ->
+                        Error
+                end
             end,
             Request
         ),
@@ -1232,11 +1242,15 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            pre_query_channel_check(Request, Channels, QueryOpts),
-            Result = Mod:on_query_async(
-                extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
-            ),
-            {async_return, Result}
+            case pre_query_channel_check(Request, Channels, QueryOpts) of
+                ok ->
+                    Result = Mod:on_query_async(
+                        extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
+                    ),
+                    {async_return, Result};
+                Error ->
+                    maybe_reply_to(Error, QueryOpts)
+            end
         end,
         Request
     );
@@ -1259,8 +1273,12 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_batch_query,
             begin
-                pre_query_channel_check(FirstRequest, Channels, QueryOpts),
-                Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt)
+                case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
+                    ok ->
+                        Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt);
+                    Error ->
+                        Error
+                end
             end,
             Batch
         ),
@@ -1301,11 +1319,15 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            pre_query_channel_check(FirstRequest, Channels, QueryOpts),
-            Result = Mod:on_batch_query_async(
-                extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
-            ),
-            {async_return, Result}
+            case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
+                ok ->
+                    Result = Mod:on_batch_query_async(
+                        extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
+                    ),
+                    {async_return, Result};
+                Error ->
+                    maybe_reply_to(Error, QueryOpts)
+            end
         end,
         Batch
     ).