Просмотр исходного кода

feat: avoid mixing request with and without the stop_after_render flag

Previously a batch of requests that was sent to a connector could
contain both requests with the stop_after_rendering flag and requests
without this flag. When this happened a warning message was generated and
the stop_after_render flags for the batch would be ignored. This commit
fixes so that a mixed batch is never created so there is no longer any
need for a warning message or ignoring flags.
Kjell Winblad 1 год назад
Родитель
Сommit
cf56050759

+ 9 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -41,6 +41,9 @@
 ]).
 -export([clean_cache/0]).
 
+%% For tests
+-export([hard_coded_test_action_info_modules/0]).
+
 -callback bridge_v1_type_name() ->
     atom()
     | {
@@ -128,8 +131,13 @@ hard_coded_action_info_modules_common() ->
         emqx_bridge_mqtt_pubsub_action_info
     ].
 
+%% This exists so that it can be mocked for test cases
+hard_coded_test_action_info_modules() -> [].
+
 hard_coded_action_info_modules() ->
-    hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
+    hard_coded_action_info_modules_common() ++
+        hard_coded_action_info_modules_ee() ++
+        ?MODULE:hard_coded_test_action_info_modules().
 
 %% ====================================================================
 %% API

+ 9 - 1
apps/emqx_connector/src/emqx_connector_info.erl

@@ -31,6 +31,9 @@
 
 -export([clean_cache/0]).
 
+%% For tests
+-export([hard_coded_test_connector_info_modules/0]).
+
 %% The type name for the conncector
 -callback type_name() -> atom().
 
@@ -117,8 +120,13 @@ hard_coded_connector_info_modules_common() ->
         emqx_bridge_mqtt_pubsub_connector_info
     ].
 
+%% This exists so that it can be mocked for test cases
+hard_coded_test_connector_info_modules() -> [].
+
 hard_coded_connector_info_modules() ->
-    hard_coded_connector_info_modules_common() ++ hard_coded_connector_info_modules_ee().
+    hard_coded_connector_info_modules_common() ++
+        hard_coded_connector_info_modules_ee() ++
+        ?MODULE:hard_coded_test_connector_info_modules().
 
 %% --------------------------------------------------------------------
 %% Atom macros to avoid typos

+ 77 - 33
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -583,7 +583,11 @@ flush(Data0) ->
             {keep_state, Data1};
         {_, false} ->
             ?tp(buffer_worker_flush_before_pop, #{}),
-            {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
+            PopOpts = #{
+                count_limit => BatchSize,
+                stop_before => {fun stop_before_mixed_stop_after_render/2, initial_state}
+            },
+            {Q1, QAckRef, Batch} = replayq:pop(Q0, PopOpts),
             Data2 = Data1#{queue := Q1},
             ?tp(buffer_worker_flush_before_sieve_expired, #{}),
             Now = now_(),
@@ -619,6 +623,73 @@ flush(Data0) ->
             end
     end.
 
+stop_before_mixed_stop_after_render(
+    ?QUERY(
+        _,
+        _,
+        _,
+        _,
+        #{stop_action_after_render := true} = _TraceCtx
+    ),
+    initial_state
+) ->
+    stop_action_after_render;
+stop_before_mixed_stop_after_render(
+    ?QUERY(
+        _,
+        _,
+        _,
+        _,
+        _TraceCtx
+    ),
+    initial_state
+) ->
+    no_stop_action_after_render;
+stop_before_mixed_stop_after_render(
+    ?QUERY(
+        _,
+        _,
+        _,
+        _,
+        #{stop_action_after_render := true} = _TraceCtx
+    ),
+    no_stop_action_after_render
+) ->
+    true;
+stop_before_mixed_stop_after_render(
+    ?QUERY(
+        _,
+        _,
+        _,
+        _,
+        #{stop_action_after_render := true} = _TraceCtx
+    ),
+    stop_action_after_render
+) ->
+    stop_action_after_render;
+stop_before_mixed_stop_after_render(
+    ?QUERY(
+        _,
+        _,
+        _,
+        _,
+        _TraceCtx
+    ),
+    stop_action_after_render
+) ->
+    true;
+stop_before_mixed_stop_after_render(
+    ?QUERY(
+        _,
+        _,
+        _,
+        _,
+        _TraceCtx
+    ),
+    State
+) ->
+    State.
+
 -spec do_flush(data(), #{
     is_batch := boolean(),
     batch := [queue_query()],
@@ -1119,25 +1190,13 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
     %% Get the rule ids from requests
     RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
     ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
-    StopAfterRender = lists:foldl(fun collect_stop_after_render/2, no_info, Requests),
     StopAfterRenderVal =
-        case StopAfterRender of
-            only_true ->
-                logger:update_process_metadata(#{stop_action_after_render => false}),
+        case Requests of
+            %% We know that the batch is not mixed since we prevent this by
+            %% using a stop_after function in the replayq:pop call
+            [?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] ->
                 true;
-            only_false ->
-                false;
-            mixed ->
-                ?TRACE(
-                    warning,
-                    "ACTION",
-                    "mixed_stop_action_after_render_batch "
-                    "(A batch will be sent to connector where some but "
-                    "not all requests has stop_action_after_render set. "
-                    "The batch will get assigned "
-                    "stop_action_after_render = false)",
-                    #{rule_ids => RuleIDs, client_ids => ClientIDs}
-                ),
+            [?QUERY(_, _, _, _, _TraceCTX) | _] ->
                 false
         end,
     logger:update_process_metadata(#{
@@ -1158,21 +1217,6 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
 collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
     Acc.
 
-collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), no_info) ->
-    only_true;
-collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_true) ->
-    only_true;
-collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_false) ->
-    mixed;
-collect_stop_after_render(?QUERY(_, _, _, _, _), no_info) ->
-    only_false;
-collect_stop_after_render(?QUERY(_, _, _, _, _), only_true) ->
-    mixed;
-collect_stop_after_render(?QUERY(_, _, _, _, _), only_false) ->
-    only_false;
-collect_stop_after_render(?QUERY(_, _, _, _, _), mixed) ->
-    mixed.
-
 unset_rule_id_trace_meta_data() ->
     logger:update_process_metadata(#{
         rule_ids => #{}, client_ids => #{}, stop_action_after_render => false

+ 165 - 17
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -94,9 +94,6 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
     %% ===================================
     %% Create trace for RuleId
     %% ===================================
-    Now = erlang:system_time(second) - 10,
-    Start = Now,
-    End = Now + 60,
     TraceName = atom_to_binary(?FUNCTION_NAME),
     TraceValue =
         case TraceType of
@@ -105,16 +102,7 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
             clientid ->
                 ClientId
         end,
-    Trace = #{
-        name => TraceName,
-        type => TraceType,
-        TraceType => TraceValue,
-        start_at => Start,
-        end_at => End
-    },
-    emqx_trace_SUITE:reload(),
-    ok = emqx_trace:clear(),
-    {ok, _} = emqx_trace:create(Trace),
+    create_trace(TraceName, TraceType, TraceValue),
     %% ===================================
     Context = #{
         clientid => ClientId,
@@ -125,13 +113,12 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
         username => <<"u_emqx">>
     },
     Params = #{
-        % body => #{
         <<"context">> => Context,
         <<"stop_action_after_template_rendering">> => StopAfterRender
-        % }
     },
     emqx_trace:check(),
     ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType),
+    Now = erlang:system_time(second) - 10,
     {ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)),
     ?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)),
     ?retry(
@@ -173,14 +160,175 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
     emqx_trace:delete(TraceName),
     ok.
 
+create_trace(TraceName, TraceType, TraceValue) ->
+    Now = erlang:system_time(second) - 10,
+    Start = Now,
+    End = Now + 60,
+    Trace = #{
+        name => TraceName,
+        type => TraceType,
+        TraceType => TraceValue,
+        start_at => Start,
+        end_at => End
+    },
+    emqx_trace_SUITE:reload(),
+    ok = emqx_trace:clear(),
+    {ok, _} = emqx_trace:create(Trace).
+
+t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
+    MeckOpts = [passthrough, no_link, no_history, non_strict],
+    catch meck:new(emqx_connector_info, MeckOpts),
+    meck:expect(
+        emqx_connector_info,
+        hard_coded_test_connector_info_modules,
+        0,
+        [emqx_rule_engine_test_connector_info]
+    ),
+    emqx_connector_info:clean_cache(),
+    catch meck:new(emqx_action_info, MeckOpts),
+    meck:expect(
+        emqx_action_info,
+        hard_coded_test_action_info_modules,
+        0,
+        [emqx_rule_engine_test_action_info]
+    ),
+    emqx_action_info:clean_cache(),
+    {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}),
+    Name = atom_to_binary(?FUNCTION_NAME),
+    ActionConf =
+        #{
+            <<"connector">> => Name,
+            <<"parameters">> =>
+                #{
+                    <<"values">> =>
+                        #{
+                            <<"send_to_pid">> => emqx_utils:bin_to_hexstr(
+                                term_to_binary(self()), upper
+                            )
+                        }
+                },
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1000,
+                <<"batch_time">> => 500
+            }
+        },
+    {ok, _} = emqx_bridge_v2:create(
+        rule_engine_test,
+        ?FUNCTION_NAME,
+        ActionConf
+    ),
+    SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>,
+    {ok, RuleID} = create_rule_with_action(
+        rule_engine_test,
+        ?FUNCTION_NAME,
+        SQL
+    ),
+    create_trace(Name, ruleid, RuleID),
+    emqx_trace:check(),
+    ok = emqx_trace_handler_SUITE:filesync(Name, ruleid),
+    Now = erlang:system_time(second) - 10,
+    %% Stop
+    ParmsStopAfterRender = apply_rule_parms(true, Name),
+    ParmsNoStopAfterRender = apply_rule_parms(false, Name),
+    %% Check that batching is working
+    Count = 400,
+    CountMsgFun =
+        fun
+            CountMsgFunRec(0 = _CurCount, GotBatchWithAtLeastTwo) ->
+                GotBatchWithAtLeastTwo;
+            CountMsgFunRec(CurCount, GotBatchWithAtLeastTwo) ->
+                receive
+                    List ->
+                        Len = length(List),
+                        CountMsgFunRec(CurCount - Len, GotBatchWithAtLeastTwo orelse (Len > 1))
+                end
+        end,
+    lists:foreach(
+        fun(_) ->
+            {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender)
+        end,
+        lists:seq(1, Count)
+    ),
+    %% We should get the messages and at least one batch with more than 1
+    true = CountMsgFun(Count, false),
+    %% We should check that we don't get any mixed batch
+    CheckBatchesFun =
+        fun
+            CheckBatchesFunRec(0 = _CurCount) ->
+                ok;
+            CheckBatchesFunRec(CurCount) ->
+                receive
+                    [{_, #{<<"stop_after_render">> := StopValue}} | _] = List ->
+                        [
+                            ?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg)
+                         || {_, Msg} <- List
+                        ],
+                        Len = length(List),
+                        CheckBatchesFunRec(CurCount - Len)
+                end
+        end,
+    lists:foreach(
+        fun(_) ->
+            case rand:normal() < 0 of
+                true ->
+                    {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender);
+                false ->
+                    {ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender)
+            end
+        end,
+        lists:seq(1, Count)
+    ),
+    CheckBatchesFun(Count),
+    %% Just check that the log file is created as expected
+    ?retry(
+        _Interval0 = 200,
+        _NAttempts0 = 20,
+        begin
+            Bin = read_rule_trace_file(Name, ruleid, Now),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>]))
+        end
+    ),
+    ok.
+
+apply_rule_parms(StopAfterRender, Name) ->
+    Payload = #{<<"is_stop_after_render">> => StopAfterRender},
+    Context = #{
+        clientid => Name,
+        event_type => message_publish,
+        payload => emqx_utils_json:encode(Payload),
+        qos => 1,
+        topic => Name,
+        username => <<"u_emqx">>
+    },
+    #{
+        <<"context">> => Context,
+        <<"stop_action_after_template_rendering">> => StopAfterRender
+    }.
+
+create_rule_with_action(ActionType, ActionName, SQL) ->
+    BridgeId = emqx_bridge_resource:bridge_id(ActionType, ActionName),
+    Params = #{
+        enable => true,
+        sql => SQL,
+        actions => [BridgeId]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("rule action params: ~p", [Params]),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res0} ->
+            #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+            {ok, RuleId};
+        Error ->
+            Error
+    end.
+
 %% Helper Functions
 
 call_apply_rule_api(RuleId, Params) ->
     Method = post,
     Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]),
-    ct:pal("sql test (http):\n  ~p", [Params]),
     Res = request(Method, Path, Params),
-    ct:pal("sql test (http) result:\n  ~p", [Res]),
     Res.
 
 request(Method, Path, Params) ->

+ 101 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl

@@ -0,0 +1,101 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_test_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(CONNECTOR_TYPE, rule_engine_test).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
+
+bridge_v1_type_name() -> ?ACTION_TYPE.
+
+action_type_name() -> ?ACTION_TYPE.
+
+connector_type_name() -> ?ACTION_TYPE.
+
+schema_module() -> emqx_rule_engine_test_action_info.
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+
+namespace() -> "bridge_test_action_info".
+
+roots() -> [].
+
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    Fields =
+        fields(connector_fields) ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
+    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rule_engine_test_action));
+fields(action) ->
+    {?ACTION_TYPE,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, rule_engine_test_action)),
+            #{
+                desc => <<"Test Action Config">>,
+                required => false
+            }
+        )};
+fields(rule_engine_test_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            hoconsc:ref(?MODULE, action_parameters),
+            #{
+                required => true,
+                desc => undefined
+            }
+        )
+    );
+fields(action_parameters) ->
+    [
+        {values,
+            hoconsc:mk(
+                typerefl:map(),
+                #{desc => undefined, default => #{}}
+            )}
+    ];
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        fields(connector_fields) ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields("config") ->
+    emqx_resource_schema:fields("resource_opts") ++
+        fields(connector_fields);
+fields(connector_fields) ->
+    [
+        {values,
+            hoconsc:mk(
+                typerefl:map(),
+                #{desc => undefined, default => #{}}
+            )}
+    ].
+desc(_) ->
+    undefined.

+ 99 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl

@@ -0,0 +1,99 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_rule_engine_test_connector).
+
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
+]).
+
+%% ===================================================================
+callback_mode() -> always_sync.
+
+on_start(
+    _InstId,
+    _Config
+) ->
+    {ok, #{installed_channels => #{}}}.
+
+on_stop(_InstId, _State) ->
+    ok.
+
+on_add_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels
+    } = OldState,
+    ChannelId,
+    ChannelConfig
+) ->
+    NewInstalledChannels = maps:put(ChannelId, ChannelConfig, InstalledChannels),
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+on_remove_channel(
+    _InstId,
+    OldState,
+    _ChannelId
+) ->
+    {ok, OldState}.
+
+on_get_channel_status(
+    _ResId,
+    _ChannelId,
+    _State
+) ->
+    connected.
+
+on_get_channels(ResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ResId).
+
+on_query(
+    _InstId,
+    _Query,
+    _State
+) ->
+    ok.
+
+on_batch_query(
+    _InstId,
+    [{ChannelId, _Req} | _] = Msg,
+    #{installed_channels := Channels} = _State
+) ->
+    #{parameters := #{values := #{send_to_pid := PidBin}}} = maps:get(ChannelId, Channels),
+    Pid = binary_to_term(emqx_utils:hexstr_to_bin(PidBin)),
+    Pid ! Msg,
+    ok.
+
+on_get_status(_InstId, _State) ->
+    connected.

+ 43 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl

@@ -0,0 +1,43 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_test_connector_info).
+
+-behaviour(emqx_connector_info).
+
+-export([
+    type_name/0,
+    bridge_types/0,
+    resource_callback_module/0,
+    config_schema/0,
+    schema_module/0,
+    api_schema/1
+]).
+
+type_name() ->
+    rule_engine_test.
+
+bridge_types() ->
+    [rule_engine_test].
+
+resource_callback_module() ->
+    emqx_rule_engine_test_connector.
+
+config_schema() ->
+    {rule_engine_test,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(emqx_rule_engine_test_action_info, "config_connector")),
+            #{
+                desc => <<"Test Connector Config">>,
+                required => false
+            }
+        )}.
+
+schema_module() ->
+    emqx_rule_engine_test_action_info.
+
+api_schema(Method) ->
+    emqx_connector_schema:api_ref(
+        ?MODULE, <<"rule_engine_test">>, Method ++ "_connector"
+    ).

+ 1 - 1
mix.exs

@@ -60,7 +60,7 @@ defmodule EMQXUmbrella.MixProject do
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.4.0", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true},
-      {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
+      {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       # maybe forbid to fetch quicer
       {:emqtt,

+ 1 - 1
rebar.config

@@ -88,7 +88,7 @@
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
     {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}},
     {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}},
-    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}},
+    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.8"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}},
     {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}},