Преглед на файлове

Merge branch 'release-50' into merge-r50-into-v50-a

Thales Macedo Garitezi преди 2 години
родител
ревизия
447b76464b
променени са 27 файла, в които са добавени 218 реда и са изтрити 143 реда
  1. 1 1
      apps/emqx/include/emqx_release.hrl
  2. 1 1
      apps/emqx/src/emqx.app.src
  3. 2 1
      apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
  4. 3 2
      apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
  5. 18 0
      apps/emqx/test/emqx_ratelimiter_SUITE.erl
  6. 1 1
      apps/emqx_bridge/src/emqx_bridge.app.src
  7. 9 2
      apps/emqx_bridge/src/emqx_bridge_api.erl
  8. 11 8
      apps/emqx_bridge/src/emqx_bridge_resource.erl
  9. 3 29
      apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
  10. 21 7
      apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl
  11. 30 8
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  12. 14 20
      apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
  13. 1 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
  14. 1 1
      apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src
  15. 6 0
      apps/emqx_node_rebalance/src/emqx_node_rebalance.erl
  16. 1 1
      apps/emqx_resource/src/emqx_resource.app.src
  17. 53 52
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl
  18. 5 1
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  19. 1 1
      apps/emqx_rule_engine/src/emqx_rule_engine.app.src
  20. 2 2
      apps/emqx_rule_engine/src/emqx_rule_runtime.erl
  21. 10 2
      apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
  22. 3 0
      changes/ce/feat-10713.en.md
  23. 6 0
      changes/ce/fix-10340-en.md
  24. 1 0
      changes/ce/fix-10708.md
  25. 1 0
      changes/ce/fix-10717.en.md
  26. 11 0
      changes/ce/fix-10728.en.md
  27. 2 2
      rel/i18n/emqx_connector_schema_lib.hocon

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -32,7 +32,7 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Community edition
--define(EMQX_RELEASE_CE, "5.0.25-rc.1").
+-define(EMQX_RELEASE_CE, "5.0.25").
 
 %% Enterprise edition
 -define(EMQX_RELEASE_EE, "5.0.4-alpha.1").

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -3,7 +3,7 @@
     {id, "emqx"},
     {description, "EMQX Core"},
     % strict semver, bump manually!
-    {vsn, "5.0.25"},
+    {vsn, "5.0.26"},
     {modules, []},
     {registered, []},
     {applications, [

+ 2 - 1
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -36,6 +36,7 @@
     calc_capacity/1,
     extract_with_type/2,
     default_client_config/0,
+    default_bucket_config/0,
     short_paths_fields/1,
     get_listener_opts/1,
     get_node_opts/1,
@@ -61,7 +62,7 @@
 -type limiter_id() :: atom().
 -type bucket_name() :: atom().
 -type rate() :: infinity | float().
--type burst_rate() :: 0 | float().
+-type burst_rate() :: number().
 %% this is a compatible type for the deprecated field and type `capacity`.
 -type burst() :: burst_rate().
 %% the capacity of the token bucket

+ 3 - 2
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -131,6 +131,9 @@ connect(Id, Type, Cfg) ->
 -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
 add_bucket(_Id, _Type, undefined) ->
     ok;
+%% a bucket with an infinity rate shouldn't be added to this server, because it is always full
+add_bucket(_Id, _Type, #{rate := infinity}) ->
+    ok;
 add_bucket(Id, Type, Cfg) ->
     ?CALL(Type, {add_bucket, Id, Cfg}).
 
@@ -507,8 +510,6 @@ make_root(#{rate := Rate, burst := Burst}) ->
         correction => 0
     }.
 
-do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) ->
-    State;
 do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) ->
     case maps:get(Id, Buckets, undefined) of
         undefined ->

+ 18 - 0
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -617,6 +617,24 @@ t_extract_with_type(_) ->
         )
     ).
 
+t_add_bucket(_) ->
+    Checker = fun(Size) ->
+        #{buckets := Buckets} = sys:get_state(emqx_limiter_server:whereis(bytes)),
+        ?assertEqual(Size, maps:size(Buckets), Buckets)
+    end,
+    DefBucket = emqx_limiter_schema:default_bucket_config(),
+    ?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, undefined)),
+    Checker(0),
+    ?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket)),
+    Checker(0),
+    ?assertEqual(
+        ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket#{rate := 100})
+    ),
+    Checker(1),
+    ?assertEqual(ok, emqx_limiter_server:del_bucket(?FUNCTION_NAME, bytes)),
+    Checker(0),
+    ok.
+
 %%--------------------------------------------------------------------
 %% Test Cases  Create Instance
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.18"},
+    {vsn, "0.1.19"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 9 - 2
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -892,10 +892,17 @@ fill_defaults(Type, RawConf) ->
 pack_bridge_conf(Type, RawConf) ->
     #{<<"bridges">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
 
+%% Hide webhook's resource_opts.request_timeout from user.
+filter_raw_conf(<<"webhook">>, RawConf0) ->
+    emqx_utils_maps:deep_remove([<<"resource_opts">>, <<"request_timeout">>], RawConf0);
+filter_raw_conf(_TypeBin, RawConf) ->
+    RawConf.
+
 unpack_bridge_conf(Type, PackedConf) ->
+    TypeBin = bin(Type),
     #{<<"bridges">> := Bridges} = PackedConf,
-    #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges),
-    RawConf.
+    #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges),
+    filter_raw_conf(TypeBin, RawConf).
 
 is_ok(ok) ->
     ok;

+ 11 - 8
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -165,20 +165,20 @@ create(BridgeId, Conf) ->
 create(Type, Name, Conf) ->
     create(Type, Name, Conf, #{}).
 
-create(Type, Name, Conf, Opts0) ->
+create(Type, Name, Conf, Opts) ->
     ?SLOG(info, #{
         msg => "create bridge",
         type => Type,
         name => Name,
         config => emqx_utils:redact(Conf)
     }),
-    Opts = override_start_after_created(Conf, Opts0),
+    TypeBin = bin(Type),
     {ok, _Data} = emqx_resource:create_local(
         resource_id(Type, Name),
         <<"emqx_bridge">>,
         bridge_to_resource_type(Type),
-        parse_confs(bin(Type), Name, Conf),
-        Opts
+        parse_confs(TypeBin, Name, Conf),
+        parse_opts(Conf, Opts)
     ),
     ok.
 
@@ -189,7 +189,7 @@ update(BridgeId, {OldConf, Conf}) ->
 update(Type, Name, {OldConf, Conf}) ->
     update(Type, Name, {OldConf, Conf}, #{}).
 
-update(Type, Name, {OldConf, Conf}, Opts0) ->
+update(Type, Name, {OldConf, Conf}, Opts) ->
     %% TODO: sometimes its not necessary to restart the bridge connection.
     %%
     %% - if the connection related configs like `servers` is updated, we should restart/start
@@ -198,7 +198,6 @@ update(Type, Name, {OldConf, Conf}, Opts0) ->
     %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated
     %% without restarting the bridge.
     %%
-    Opts = override_start_after_created(Conf, Opts0),
     case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
         false ->
             ?SLOG(info, #{
@@ -241,11 +240,12 @@ recreate(Type, Name, Conf) ->
     recreate(Type, Name, Conf, #{}).
 
 recreate(Type, Name, Conf, Opts) ->
+    TypeBin = bin(Type),
     emqx_resource:recreate_local(
         resource_id(Type, Name),
         bridge_to_resource_type(Type),
-        parse_confs(bin(Type), Name, Conf),
-        Opts
+        parse_confs(TypeBin, Name, Conf),
+        parse_opts(Conf, Opts)
     ).
 
 create_dry_run(Type, Conf0) ->
@@ -402,6 +402,9 @@ bin(Bin) when is_binary(Bin) -> Bin;
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
 
+parse_opts(Conf, Opts0) ->
+    override_start_after_created(Conf, Opts0).
+
 override_start_after_created(Config, Opts) ->
     Enabled = maps:get(enable, Config, true),
     StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled),

+ 3 - 29
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -238,36 +238,10 @@ webhook_bridge_converter(Conf0, _HoconOpts) ->
             )
     end.
 
+%% We hide resource_opts.request_timeout from user.
 do_convert_webhook_config(
-    #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf
+    #{<<"request_timeout">> := ReqT, <<"resource_opts">> := ResOpts} = Conf
 ) ->
-    %% ok: same values
-    Conf;
-do_convert_webhook_config(
-    #{
-        <<"request_timeout">> := ReqTRootRaw,
-        <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw}
-    } = Conf0
-) ->
-    %% different values; we set them to the same, if they are valid
-    %% durations
-    MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw),
-    MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw),
-    case {MReqTRoot, MReqTResource} of
-        {{ok, ReqTRoot}, {ok, ReqTResource}} ->
-            {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}),
-            Conf1 = emqx_utils_maps:deep_merge(
-                Conf0,
-                #{
-                    <<"request_timeout">> => ReqTRaw,
-                    <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw}
-                }
-            ),
-            Conf1;
-        _ ->
-            %% invalid values; let the type checker complain about
-            %% that.
-            Conf0
-    end;
+    Conf#{<<"resource_opts">> => ResOpts#{<<"request_timeout">> => ReqT}};
 do_convert_webhook_config(Conf) ->
     Conf.

+ 21 - 7
apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl

@@ -40,12 +40,15 @@ fields("put") ->
 fields("get") ->
     emqx_bridge_schema:status_fields() ++ fields("post");
 fields("creation_opts") ->
-    lists:filter(
-        fun({K, _V}) ->
-            not lists:member(K, unsupported_opts())
-        end,
-        emqx_resource_schema:fields("creation_opts")
-    ).
+    [
+        hidden_request_timeout()
+        | lists:filter(
+            fun({K, _V}) ->
+                not lists:member(K, unsupported_opts())
+            end,
+            emqx_resource_schema:fields("creation_opts")
+        )
+    ].
 
 desc("config") ->
     ?DESC("desc_config");
@@ -163,7 +166,8 @@ unsupported_opts() ->
     [
         enable_batch,
         batch_size,
-        batch_time
+        batch_time,
+        request_timeout
     ].
 
 %%======================================================================================
@@ -190,3 +194,13 @@ name_field() ->
 
 method() ->
     enum([post, put, get, delete]).
+
+hidden_request_timeout() ->
+    {request_timeout,
+        mk(
+            hoconsc:union([infinity, emqx_schema:duration_ms()]),
+            #{
+                required => false,
+                importance => ?IMPORTANCE_HIDDEN
+            }
+        )}.

+ 30 - 8
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -1284,21 +1284,43 @@ t_inconsistent_webhook_request_timeouts(Config) ->
                 <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
             }
         ),
-    ?assertMatch(
-        {ok, 201, #{
-            %% note: same value on both fields
-            <<"request_timeout">> := <<"2s">>,
-            <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>}
-        }},
+    {ok, 201, #{
+        <<"request_timeout">> := <<"1s">>,
+        <<"resource_opts">> := ResourceOpts
+    }} =
         request_json(
             post,
             uri(["bridges"]),
             BadBridgeParams,
             Config
-        )
-    ),
+        ),
+    ?assertNot(maps:is_key(<<"request_timeout">>, ResourceOpts)),
+    validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name),
     ok.
 
+validate_resource_request_timeout(single, Timeout, Name) ->
+    SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
+    ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
+    ?check_trace(
+        begin
+            {ok, Res} =
+                ?wait_async_action(
+                    emqx_bridge:send_message(BridgeID, SentData),
+                    #{?snk_kind := async_query},
+                    1000
+                ),
+            ?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res)
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(async_query, Trace0),
+            ?assertMatch([#{query_opts := #{timeout := Timeout}}], Trace),
+            ok
+        end
+    );
+validate_resource_request_timeout(_Cluster, _Timeout, _Name) ->
+    ignore.
+
 %%
 
 request(Method, URL, Config) ->

+ 14 - 20
apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl

@@ -59,27 +59,21 @@ webhook_config_test() ->
         },
         check(Conf2)
     ),
-
-    %% the converter should pick the greater of the two
-    %% request_timeouts and place them in the root and inside
-    %% resource_opts.
-    ?assertMatch(
-        #{
-            <<"bridges">> := #{
-                <<"webhook">> := #{
-                    <<"the_name">> :=
-                        #{
-                            <<"method">> := get,
-                            <<"request_timeout">> := 60_000,
-                            <<"resource_opts">> := #{<<"request_timeout">> := 60_000},
-                            <<"body">> := <<"${payload}">>
-                        }
-                }
+    #{
+        <<"bridges">> := #{
+            <<"webhook">> := #{
+                <<"the_name">> :=
+                    #{
+                        <<"method">> := get,
+                        <<"request_timeout">> := RequestTime,
+                        <<"resource_opts">> := ResourceOpts,
+                        <<"body">> := <<"${payload}">>
+                    }
             }
-        },
-        check(Conf3)
-    ),
-
+        }
+    } = check(Conf3),
+    ?assertEqual(60_000, RequestTime),
+    ?assertMatch(#{<<"request_timeout">> := 60_000}, ResourceOpts),
     ok.
 
 up(#{<<"bridges">> := Bridges0} = Conf0) ->

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pulsar, [
     {description, "EMQX Pulsar Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src

@@ -1,6 +1,6 @@
 {application, emqx_node_rebalance, [
     {description, "EMQX Node Rebalance"},
-    {vsn, "5.0.0"},
+    {vsn, "5.0.1"},
     {registered, [
         emqx_node_rebalance_sup,
         emqx_node_rebalance,

+ 6 - 0
apps/emqx_node_rebalance/src/emqx_node_rebalance.erl

@@ -267,6 +267,9 @@ evict_conns(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts}
             ConnEvictRate = maps:get(conn_evict_rate, Opts),
             NodesToEvict = nodes_to_evict(RecipientAvg, DonorNodeCounts),
             ?SLOG(warning, #{
+                donor_conn_avg => DonorAvg,
+                recipient_conn_avg => RecipientAvg,
+                thresholds => Thresholds,
                 msg => "node_rebalance_evict_conns",
                 nodes => NodesToEvict,
                 counts => ConnEvictRate
@@ -297,6 +300,9 @@ evict_sessions(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opt
             SessEvictRate = maps:get(sess_evict_rate, Opts),
             NodesToEvict = nodes_to_evict(RecipientAvg, DonorNodeCounts),
             ?SLOG(warning, #{
+                donor_sess_avg => DonorAvg,
+                recipient_sess_avg => RecipientAvg,
+                thresholds => Thresholds,
                 msg => "node_rebalance_evict_sessions",
                 nodes => NodesToEvict,
                 counts => SessEvictRate

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.15"},
+    {vsn, "0.1.16"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 53 - 52
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -70,18 +70,6 @@
 -define(RETRY_IDX, 3).
 -define(WORKER_MREF_IDX, 4).
 
--define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR),
-    (fun() ->
-        IsFullBefore = is_inflight_full(InflightTID),
-        case (EXPR) of
-            blocked ->
-                ok;
-            ok ->
-                ok = maybe_flush_after_async_reply(IsFullBefore)
-        end
-    end)()
-).
-
 -type id() :: binary().
 -type index() :: pos_integer().
 -type expire_at() :: infinity | integer().
@@ -337,7 +325,8 @@ resume_from_blocked(Data) ->
                     {next_state, running, Data}
             end;
         {expired, Ref, Batch} ->
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+            WorkerPid = self(),
+            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
             ?tp(buffer_worker_retry_expired, #{expired => Batch}),
             resume_from_blocked(Data);
@@ -389,7 +378,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
             {keep_state, Data0, {state_timeout, ResumeT, unblock}};
         %% Send ok or failed but the resource is working
         {ack, PostFn} ->
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+            WorkerPid = self(),
+            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             %% we need to defer bumping the counters after
             %% `inflight_drop' to avoid the race condition when an
             %% inflight request might get completed concurrently with
@@ -495,8 +485,7 @@ flush(Data0) ->
             {keep_state, Data1};
         {_, true} ->
             ?tp(buffer_worker_flush_but_inflight_full, #{}),
-            Data2 = ensure_flush_timer(Data1),
-            {keep_state, Data2};
+            {keep_state, Data1};
         {_, false} ->
             ?tp(buffer_worker_flush_before_pop, #{}),
             {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
@@ -596,13 +585,14 @@ do_flush(
             %% must ensure the async worker is being monitored for
             %% such requests.
             IsUnrecoverableError = is_unrecoverable_error(Result),
+            WorkerPid = self(),
             case is_async_return(Result) of
                 true when IsUnrecoverableError ->
-                    ack_inflight(InflightTID, Ref, Id, Index);
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
                 true ->
                     ok;
                 false ->
-                    ack_inflight(InflightTID, Ref, Id, Index)
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
             end,
             {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
@@ -680,13 +670,14 @@ do_flush(#{queue := Q1} = Data0, #{
             %% must ensure the async worker is being monitored for
             %% such requests.
             IsUnrecoverableError = is_unrecoverable_error(Result),
+            WorkerPid = self(),
             case is_async_return(Result) of
                 true when IsUnrecoverableError ->
-                    ack_inflight(InflightTID, Ref, Id, Index);
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
                 true ->
                     ok;
                 false ->
-                    ack_inflight(InflightTID, Ref, Id, Index)
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
             end,
             {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
@@ -1006,7 +997,7 @@ handle_async_reply(
         discard ->
             ok;
         continue ->
-            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result))
+            handle_async_reply1(ReplyContext, Result)
     end.
 
 handle_async_reply1(
@@ -1015,6 +1006,7 @@ handle_async_reply1(
         inflight_tid := InflightTID,
         resource_id := Id,
         worker_index := Index,
+        buffer_worker := WorkerPid,
         min_query := ?QUERY(_, _, _, ExpireAt) = _Query
     } = ReplyContext,
     Result
@@ -1026,7 +1018,7 @@ handle_async_reply1(
     Now = now_(),
     case is_expired(ExpireAt, Now) of
         true ->
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
             ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
@@ -1040,7 +1032,7 @@ do_handle_async_reply(
         resource_id := Id,
         request_ref := Ref,
         worker_index := Index,
-        buffer_worker := Pid,
+        buffer_worker := WorkerPid,
         inflight_tid := InflightTID,
         min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
     },
@@ -1063,10 +1055,10 @@ do_handle_async_reply(
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ok = ?MODULE:block(Pid),
+            ok = ?MODULE:block(WorkerPid),
             blocked;
         ack ->
-            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
     end.
 
 handle_async_batch_reply(
@@ -1081,7 +1073,7 @@ handle_async_batch_reply(
         discard ->
             ok;
         continue ->
-            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result))
+            handle_async_batch_reply1(ReplyContext, Result)
     end.
 
 handle_async_batch_reply1(
@@ -1119,6 +1111,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     #{
         resource_id := Id,
         worker_index := Index,
+        buffer_worker := WorkerPid,
         inflight_tid := InflightTID,
         request_ref := Ref,
         min_batch := Batch
@@ -1141,7 +1134,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     case RealNotExpired of
         [] ->
             %% all expired, no need to update back the inflight batch
-            _ = ack_inflight(InflightTID, Ref, Id, Index),
+            _ = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             ok;
         _ ->
             %% some queries are not expired, put them back to the inflight batch
@@ -1152,7 +1145,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
 
 do_handle_async_batch_reply(
     #{
-        buffer_worker := Pid,
+        buffer_worker := WorkerPid,
         resource_id := Id,
         worker_index := Index,
         inflight_tid := InflightTID,
@@ -1173,14 +1166,14 @@ do_handle_async_batch_reply(
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ok = ?MODULE:block(Pid),
+            ok = ?MODULE:block(WorkerPid),
             blocked;
         ack ->
-            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
     end.
 
-do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
-    IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
+do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) ->
+    IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
     case maps:get(simple_query, QueryOpts, false) of
         true ->
             PostFn();
@@ -1191,18 +1184,6 @@ do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
     end,
     ok.
 
-maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
-    %% inflight was not full before async reply is handled,
-    %% after it is handled, the inflight table must be even smaller
-    %% hance we can rely on the buffer worker's flush timer to trigger
-    %% the next flush
-    ?tp(skip_flushing_worker, #{}),
-    ok;
-maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
-    %% the inflight table was full before handling aync reply
-    ?tp(do_flushing_worker, #{}),
-    ok = ?MODULE:flush_worker(self()).
-
 %% check if the async reply is valid.
 %% e.g. if a connector evaluates the callback more than once:
 %% 1. If the request was previously deleted from inflight table due to
@@ -1429,9 +1410,9 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
     ),
     ok.
 
-ack_inflight(undefined, _Ref, _Id, _Index) ->
+ack_inflight(undefined, _Ref, _Id, _Index, _WorkerPid) ->
     false;
-ack_inflight(InflightTID, Ref, Id, Index) ->
+ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) ->
     {Count, Removed} =
         case ets:take(InflightTID, Ref) of
             [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
@@ -1441,7 +1422,11 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
             [] ->
                 {0, false}
         end,
-    ok = dec_inflight_remove(InflightTID, Count, Removed),
+    FlushCheck = dec_inflight_remove(InflightTID, Count, Removed),
+    case FlushCheck of
+        continue -> ok;
+        flush -> ?MODULE:flush_worker(WorkerPid)
+    end,
     IsKnownRef = (Count > 0),
     case IsKnownRef of
         true ->
@@ -1476,16 +1461,32 @@ inc_inflight(InflightTID, Count) ->
     _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
     ok.
 
+-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
+    continue | flush.
 dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
-    ok;
+    continue;
 dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
-    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
-    ok;
+    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
+    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
+    %% if the new value is Max - 1, it means that we've just made room
+    %% in the inflight table, so we should poke the buffer worker to
+    %% make it continue flushing.
+    case NewValue =:= MaxValue - 1 of
+        true -> flush;
+        false -> continue
+    end;
 dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
     %% If Count > 0, it must have been removed
-    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
+    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
     _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
-    ok.
+    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
+    %% if the new value is Max - 1, it means that we've just made room
+    %% in the inflight table, so we should poke the buffer worker to
+    %% make it continue flushing.
+    case NewValue =:= MaxValue - 1 of
+        true -> flush;
+        false -> continue
+    end.
 
 dec_inflight_update(_InflightTID, _Count = 0) ->
     ok;

+ 5 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1627,7 +1627,11 @@ t_retry_async_inflight_full(_Config) ->
             end
         ]
     ),
-    ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
+    ),
     ok.
 
 %% this test case is to ensure the buffer worker will not go crazy even

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -2,7 +2,7 @@
 {application, emqx_rule_engine, [
     {description, "EMQX Rule Engine"},
     % strict semver, bump manually!
-    {vsn, "5.0.16"},
+    {vsn, "5.0.17"},
     {modules, []},
     {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
     {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -144,14 +144,14 @@ do_apply_rule(
         )
     of
         true ->
-            Collection2 = filter_collection(Columns, InCase, DoEach, Collection),
+            Collection2 = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
             case Collection2 of
                 [] ->
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
                 _ ->
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
             end,
-            NewEnvs = maps:merge(Columns, Envs),
+            NewEnvs = maps:merge(ColumnsAndSelected, Envs),
             {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]};
         false ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),

+ 10 - 2
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -1794,11 +1794,12 @@ t_sqlparse_foreach_7(_Config) ->
         )
     ).
 
+-define(COLL, #{<<"info">> := [<<"haha">>, #{<<"name">> := <<"cmd1">>, <<"cmd">> := <<"1">>}]}).
 t_sqlparse_foreach_8(_Config) ->
     %% Verify foreach-do-incase and cascaded AS
     Sql =
         "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
-        "do info.cmd as msg_type, info.name as name "
+        "do info.cmd as msg_type, info.name as name, s, c "
         "incase is_map(info) "
         "from \"t/#\" "
         "where s.page = '2' ",
@@ -1807,7 +1808,14 @@ t_sqlparse_foreach_8(_Config) ->
         "{\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }"
     >>,
     ?assertMatch(
-        {ok, [#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]},
+        {ok, [
+            #{
+                <<"name">> := <<"cmd1">>,
+                <<"msg_type">> := <<"1">>,
+                <<"s">> := #{<<"page">> := 2, <<"collection">> := ?COLL},
+                <<"c">> := ?COLL
+            }
+        ]},
         emqx_rule_sqltester:test(
             #{
                 sql => Sql,

+ 3 - 0
changes/ce/feat-10713.en.md

@@ -0,0 +1,3 @@
+The `request_timeout` option in the webhook's `resource_opts` is now hidden, so that it always stays consistent with the webhook's HTTP `request_timeout`.
+From now on, when configuring a webhook through the API or configuration files,
+it is no longer necessary to configure the resource's `request_timeout`. Configuring the HTTP `request_timeout` is sufficient: the resource's `request_timeout` is automatically set to the same value.

+ 6 - 0
changes/ce/fix-10340-en.md

@@ -0,0 +1,6 @@
+Fixed an issue that could lead to crash logs being printed when stopping EMQX via systemd.
+```
+2023-03-29T16:43:25.915761+08:00 [error] Generic server memsup terminating. Reason: {port_died,normal}. Last message: {'EXIT',<0.2117.0>,{port_died,normal}}. State: [{data,[{"Timeout",60000}]},{items,{"Memory Usage",[{"Allocated",929959936},{"Total",3832242176}]}},{items,{"Worst Memory User",[{"Pid",<0.2031.0>},{"Memory",4720472}]}}].
+2023-03-29T16:43:25.924764+08:00 [error] crasher: initial call: memsup:init/1, pid: <0.2116.0>, registered_name: memsup, exit: {{port_died,normal},[{gen_server,handle_common_reply,8,[{file,"gen_server.erl"},{line,811}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [os_mon_sup,<0.2114.0>], message_queue_len: 0, messages: [], links: [<0.2115.0>], dictionary: [], trap_exit: true, status: running, heap_size: 4185, stack_size: 29, reductions: 187637; neighbours:
+2023-03-29T16:43:25.924979+08:00 [error] Supervisor: {local,os_mon_sup}. Context: child_terminated. Reason: {port_died,normal}. Offender: id=memsup,pid=<0.2116.0>.
+```

+ 1 - 0
changes/ce/fix-10708.md

@@ -0,0 +1 @@
+Enhanced clarity of the descriptions for the bridge configuration fields (username and password) to better guide users during setup.

+ 1 - 0
changes/ce/fix-10717.en.md

@@ -0,0 +1 @@
+Fixed an issue where the buffering layer processes could use a lot of CPU when the inflight window is full.

+ 11 - 0
changes/ce/fix-10728.en.md

@@ -0,0 +1,11 @@
+Fixed an issue where the rule engine was unable to access variables exported by `FOREACH` in the `DO` clause. 
+
+  Given a payload: `{"date": "2023-05-06", "array": ["a"]}`, as well as the following SQL statement:
+  ```
+  FOREACH payload.date as date, payload.array as elem
+  DO date, elem
+  FROM "t/#"
+  ```
+  Prior to the fix, the `date` variable exported by `FOREACH` could not be accessed in the `DO` clause of the above SQL, resulting in the following output for the SQL statement:
+  `[{"elem": "a","date": "undefined"}]`.
+  After the fix, the output of the SQL statement is: `[{"elem": "a","date": "2023-05-06"}]`

+ 2 - 2
rel/i18n/emqx_connector_schema_lib.hocon

@@ -13,7 +13,7 @@ database_desc.label:
 """Database Name"""
 
 password.desc:
-"""EMQX's password in the external database."""
+"""The password associated with the bridge, used for authentication with the external database."""
 
 password.label:
 """Password"""
@@ -37,7 +37,7 @@ ssl.label:
 """Enable SSL"""
 
 username.desc:
-"""EMQX's username in the external database."""
+"""The username associated with the bridge in the external database used for authentication or identification purposes."""
 
 username.label:
 """Username"""