Browse Source

Merge remote-tracking branch 'origin/master' into release-51

Zaiming (Stone) Shi 2 years ago
parent
commit
af5c6720de

+ 11 - 0
apps/emqx/src/emqx_config_handler.erl

@@ -457,6 +457,17 @@ bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(B) when is_binary(B) -> B.
 
+atom(Bin) when is_binary(Bin), size(Bin) > 255 ->
+    erlang:throw(
+        iolist_to_binary(
+            io_lib:format(
+                "Name is is too long."
+                " Please provide a shorter name (<= 255 bytes)."
+                " The name that is too long: \"~s\"",
+                [Bin]
+            )
+        )
+    );
 atom(Bin) when is_binary(Bin) ->
     binary_to_atom(Bin, utf8);
 atom(Str) when is_list(Str) ->

+ 2 - 2
apps/emqx/src/emqx_schema.erl

@@ -3018,7 +3018,7 @@ non_empty_string(_) -> {error, invalid_string}.
 %%               hosts can be successfully parsed.
 %% 3. parsing: Done at runtime in each module which uses this config
 servers_sc(Meta0, ParseOpts) ->
-    %% if this filed has a default value
+    %% if this field has a default value
     %% then it is not NOT required
     %% NOTE: maps:is_key is not the solution because #{default => undefined} is legit
     HasDefault = (maps:get(default, Meta0, undefined) =/= undefined),
@@ -3079,7 +3079,7 @@ servers_validator(Opts, Required) ->
                 %% we should remove field from config if it's empty
                 throw("cannot_be_empty");
             "undefined" when Required ->
-                %% when the filed is not set in config file
+                %% when the field is not set in config file
                 %% NOTE: assuming nobody is going to name their server "undefined"
                 throw("cannot_be_empty");
             "undefined" ->

+ 1 - 1
apps/emqx_authz/test/emqx_authz_redis_SUITE.erl

@@ -181,7 +181,7 @@ t_create_with_config_values_wont_work(_Config) ->
         InvalidConfigs
     ).
 
-%% creating without a require filed should return error
+%% creating without a require field should return error
 t_create_invalid_config(_Config) ->
     AuthzConfig = raw_redis_authz_config(),
     C = maps:without([<<"server">>], AuthzConfig),

+ 2 - 2
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -602,8 +602,8 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
     case emqx_bridge:create(BridgeType, BridgeName, Conf) of
         {ok, _} ->
             lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode);
-        {error, #{kind := validation_error} = Reason} ->
-            ?BAD_REQUEST(map_to_json(Reason))
+        {error, Reason} when is_map(Reason) ->
+            ?BAD_REQUEST(map_to_json(emqx_utils:redact(Reason)))
     end.
 
 get_metrics_from_local_node(BridgeType, BridgeName) ->

+ 20 - 0
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -421,6 +421,26 @@ t_http_crud_apis(Config) ->
     ),
 
     %% Test bad updates
+    %% ================
+
+    %% Add bridge with a name that is too long
+    %% We only support bridge names up to 255 characters
+    LongName = list_to_binary(lists:duplicate(256, $a)),
+    NameTooLongRequestResult = request_json(
+        post,
+        uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, LongName),
+        Config
+    ),
+    ?assertMatch(
+        {ok, 400, _},
+        NameTooLongRequestResult
+    ),
+    {ok, 400, #{<<"message">> := NameTooLongMessage}} = NameTooLongRequestResult,
+    %% Use regex to check that the message contains the name
+    Match = re:run(NameTooLongMessage, LongName),
+    ?assertMatch({match, _}, Match),
+    %% Add bridge without the URL field
     {ok, 400, PutFail1} = request_json(
         put,
         uri(["bridges", BridgeID]),

+ 9 - 9
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl

@@ -95,33 +95,33 @@ namespace() -> "bridge_influxdb".
 roots() -> [].
 
 fields("post_api_v1") ->
-    method_fileds(post, influxdb_api_v1);
+    method_fields(post, influxdb_api_v1);
 fields("post_api_v2") ->
-    method_fileds(post, influxdb_api_v2);
+    method_fields(post, influxdb_api_v2);
 fields("put_api_v1") ->
-    method_fileds(put, influxdb_api_v1);
+    method_fields(put, influxdb_api_v1);
 fields("put_api_v2") ->
-    method_fileds(put, influxdb_api_v2);
+    method_fields(put, influxdb_api_v2);
 fields("get_api_v1") ->
-    method_fileds(get, influxdb_api_v1);
+    method_fields(get, influxdb_api_v1);
 fields("get_api_v2") ->
-    method_fileds(get, influxdb_api_v2);
+    method_fields(get, influxdb_api_v2);
 fields(Type) when
     Type == influxdb_api_v1 orelse Type == influxdb_api_v2
 ->
     influxdb_bridge_common_fields() ++
         connector_fields(Type).
 
-method_fileds(post, ConnectorType) ->
+method_fields(post, ConnectorType) ->
     influxdb_bridge_common_fields() ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType);
-method_fileds(get, ConnectorType) ->
+method_fields(get, ConnectorType) ->
     influxdb_bridge_common_fields() ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
         emqx_bridge_schema:status_fields();
-method_fileds(put, ConnectorType) ->
+method_fields(put, ConnectorType) ->
     influxdb_bridge_common_fields() ++
         connector_fields(ConnectorType).
 

+ 1 - 1
apps/emqx_conf/etc/emqx_conf.conf

@@ -6,7 +6,7 @@
 ## are stored in data/configs/cluster.hocon.
 ## To avoid confusion, please do not store the same configs in both files.
 ##
-## See https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html
+## See {{ emqx_configuration_doc }} for more details.
 ## Configuration full example can be found in emqx.conf.example
 
 node {

+ 64 - 23
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -22,36 +22,32 @@
     unload/0
 ]).
 
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% kept cluster_call for compatibility
 -define(CLUSTER_CALL, cluster_call).
 -define(CONF, conf).
 
 load() ->
-    emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, []),
+    emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]),
     emqx_ctl:register_command(?CONF, {?MODULE, conf}, []).
 
 unload() ->
     emqx_ctl:unregister_command(?CLUSTER_CALL),
     emqx_ctl:unregister_command(?CONF).
 
-conf(["show", "--keys-only"]) ->
-    print(emqx_config:get_root_names());
+conf(["show_keys" | _]) ->
+    print_keys(get_config());
 conf(["show"]) ->
     print_hocon(get_config());
 conf(["show", Key]) ->
     print_hocon(get_config(Key));
 conf(["load", Path]) ->
     load_config(Path);
+conf(["cluster_sync" | Args]) ->
+    admins(Args);
 conf(_) ->
-    emqx_ctl:usage(
-        [
-            %% TODO add reload
-            %{"conf reload", "reload etc/emqx.conf on local node"},
-            {"conf show --keys-only", "print all keys"},
-            {"conf show", "print all running configures"},
-            {"conf show <key>", "print a specific configuration"},
-            {"conf load <path>", "load a hocon file to all nodes"}
-        ]
-    ).
+    emqx_ctl:usage(usage_conf() ++ usage_sync()).
 
 admins(["status"]) ->
     status();
@@ -87,14 +83,34 @@ admins(["fast_forward", Node0, ToTnxId]) ->
     emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId),
     status();
 admins(_) ->
-    emqx_ctl:usage(
-        [
-            {"cluster_call status", "status"},
-            {"cluster_call skip [node]", "increase one commit on specific node"},
-            {"cluster_call tnxid <TnxId>", "get detailed about TnxId"},
-            {"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id"}
-        ]
-    ).
+    emqx_ctl:usage(usage_sync()).
+
+usage_conf() ->
+    [
+        %% TODO add reload
+        %{"conf reload", "reload etc/emqx.conf on local node"},
+        {"conf show_keys", "Print all config keys"},
+        {"conf show [<key>]",
+            "Print in-use configs (including default values) under the given key. "
+            "Print ALL keys if key is not provided"},
+        {"conf load <path>",
+            "Load a HOCON format config file."
+            "The config is overlay on top of the existing configs. "
+            "The current node will initiate a cluster wide config change "
+            "transaction to sync the changes to other nodes in the cluster. "
+            "NOTE: do not make runtime config changes during rolling upgrade."}
+    ].
+
+usage_sync() ->
+    [
+        {"conf cluster_sync status", "Show cluster config sync status summary"},
+        {"conf cluster_sync skip [node]", "Increase one commit on specific node"},
+        {"conf cluster_sync tnxid <TnxId>",
+            "Display detailed information of the config change transaction at TnxId"},
+        {"conf cluster_sync fast_forward [node] [tnx_id]",
+            "Fast-forward config change transaction to tnx_id on the given node."
+            "WARNING: This results in inconsistent configs among the clustered nodes."}
+    ].
 
 status() ->
     emqx_ctl:print("-----------------------------------------------\n"),
@@ -116,14 +132,39 @@ status() ->
     ),
     emqx_ctl:print("-----------------------------------------------\n").
 
+print_keys(Config) ->
+    print(lists:sort(maps:keys(Config))).
+
 print(Json) ->
     emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]).
 
 print_hocon(Hocon) ->
     emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]).
 
-get_config() -> emqx_config:fill_defaults(emqx:get_raw_config([])).
-get_config(Key) -> emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}).
+get_config() ->
+    drop_hidden_roots(emqx_config:fill_defaults(emqx:get_raw_config([]))).
+
+drop_hidden_roots(Conf) ->
+    Hidden = hidden_roots(),
+    maps:without(Hidden, Conf).
+
+hidden_roots() ->
+    SchemaModule = emqx_conf:schema_module(),
+    Roots = hocon_schema:roots(SchemaModule),
+    lists:filtermap(
+        fun({BinName, {_RefName, Schema}}) ->
+            case hocon_schema:field_schema(Schema, importance) =/= ?IMPORTANCE_HIDDEN of
+                true ->
+                    false;
+                false ->
+                    {true, BinName}
+            end
+        end,
+        Roots
+    ).
+
+get_config(Key) ->
+    emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}).
 
 -define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}).
 load_config(Path) ->

+ 1 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -1323,7 +1323,7 @@ roots(Module) ->
     lists:map(fun({_BinName, Root}) -> Root end, hocon_schema:roots(Module)).
 
 %% Like authentication schema, authorization schema is incomplete in emqx_schema
-%% module, this function replaces the root filed "authorization" with a new schema
+%% module, this function replaces the root field "authorization" with a new schema
 emqx_schema_high_prio_roots() ->
     Roots = emqx_schema:roots(high),
     Authz =

+ 10 - 7
apps/emqx_ctl/src/emqx_ctl.erl

@@ -157,18 +157,21 @@ help() ->
                     print("No commands available.~n");
                 Cmds ->
                     print("Usage: ~ts~n", ["emqx ctl"]),
-                    lists:foreach(
-                        fun({_, {Mod, Cmd}, _}) ->
-                            print("~110..-s~n", [""]),
-                            apply(Mod, Cmd, [usage])
-                        end,
-                        Cmds
-                    )
+                    lists:foreach(fun print_usage/1, Cmds)
             end;
         false ->
             print("Command table is initializing.~n")
     end.
 
+print_usage({_, {Mod, Cmd}, Opts}) ->
+    case proplists:get_bool(hidden, Opts) of
+        true ->
+            ok;
+        false ->
+            print("~110..-s~n", [""]),
+            apply(Mod, Cmd, [usage])
+    end.
+
 -spec print(io:format()) -> ok.
 print(Msg) ->
     io:format("~ts", [format(Msg, [])]).

+ 7 - 3
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -970,6 +970,12 @@ close_socket(State = #state{socket = Socket}) ->
 %% Inc incoming/outgoing stats
 
 inc_incoming_stats(Ctx, FrameMod, Packet) ->
+    do_inc_incoming_stats(FrameMod:type(Packet), Ctx, FrameMod, Packet).
+
+%% If a mailformed packet is received, the type of the packet is undefined.
+do_inc_incoming_stats(undefined, _Ctx, _FrameMod, _Packet) ->
+    ok;
+do_inc_incoming_stats(Type, Ctx, FrameMod, Packet) ->
     inc_counter(recv_pkt, 1),
     case FrameMod:is_message(Packet) of
         true ->
@@ -978,9 +984,7 @@ inc_incoming_stats(Ctx, FrameMod, Packet) ->
         false ->
             ok
     end,
-    Name = list_to_atom(
-        lists:concat(["packets.", FrameMod:type(Packet), ".received"])
-    ),
+    Name = list_to_atom(lists:concat(["packets.", Type, ".received"])),
     emqx_gateway_ctx:metrics_inc(Ctx, Name).
 
 inc_outgoing_stats(Ctx, FrameMod, Packet) ->

+ 1 - 1
apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_stomp, [
     {description, "Stomp Gateway"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 18 - 8
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -499,7 +499,7 @@ handle_in(
                 [{MountedTopic, SubOpts} | _] ->
                     NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs],
                     NChannel1 = NChannel#channel{subscriptions = NSubs},
-                    handle_out(receipt, receipt_id(Headers), NChannel1)
+                    handle_out_and_update(receipt, receipt_id(Headers), NChannel1)
             end;
         {error, ErrMsg, NChannel} ->
             ?SLOG(error, #{
@@ -541,7 +541,7 @@ handle_in(
             false ->
                 {ok, Channel}
         end,
-    handle_out(receipt, receipt_id(Headers), NChannel);
+    handle_out_and_update(receipt, receipt_id(Headers), NChannel);
 %% XXX: How to ack a frame ???
 handle_in(Frame = ?PACKET(?CMD_ACK, Headers), Channel) ->
     case header(<<"transaction">>, Headers) of
@@ -638,12 +638,12 @@ handle_in(
                 ]
         end,
     {ok, Outgoings, Channel};
+handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
+    shutdown(Reason, Channel);
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
-    ?SLOG(error, #{
-        msg => "unexpected_frame_error",
-        reason => Reason
-    }),
-    shutdown(Reason, Channel).
+    ErrMsg = io_lib:format("Frame error: ~0p", [Reason]),
+    Frame = error_frame(undefined, ErrMsg),
+    shutdown(Reason, Frame, Channel).
 
 with_transaction(Headers, Channel = #channel{transaction = Trans}, Fun) ->
     Id = header(<<"transaction">>, Headers),
@@ -769,6 +769,12 @@ handle_out(receipt, ReceiptId, Channel) ->
     Frame = receipt_frame(ReceiptId),
     {ok, {outgoing, Frame}, Channel}.
 
+handle_out_and_update(receipt, undefined, Channel) ->
+    {ok, [{event, updated}], Channel};
+handle_out_and_update(receipt, ReceiptId, Channel) ->
+    Frame = receipt_frame(ReceiptId),
+    {ok, [{outgoing, Frame}, {event, updated}], Channel}.
+
 %%--------------------------------------------------------------------
 %% Handle call
 %%--------------------------------------------------------------------
@@ -812,7 +818,7 @@ handle_call(
                     ),
                     NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts} | Subs],
                     NChannel1 = NChannel#channel{subscriptions = NSubs},
-                    reply({ok, {MountedTopic, NSubOpts}}, NChannel1);
+                    reply({ok, {MountedTopic, NSubOpts}}, [{event, updated}], NChannel1);
                 {error, ErrMsg, NChannel} ->
                     ?SLOG(error, #{
                         msg => "failed_to_subscribe_topic",
@@ -841,6 +847,7 @@ handle_call(
     ),
     reply(
         ok,
+        [{event, updated}],
         Channel#channel{
             subscriptions = lists:keydelete(MountedTopic, 2, Subs)
         }
@@ -1107,6 +1114,9 @@ terminate(Reason, #channel{
 reply(Reply, Channel) ->
     {reply, Reply, Channel}.
 
+reply(Reply, Msgs, Channel) ->
+    {reply, Reply, Msgs, Channel}.
+
 shutdown(Reason, Channel) ->
     {shutdown, Reason, Channel}.
 

+ 63 - 5
apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl

@@ -129,8 +129,8 @@ initial_parse_state(Opts) ->
 
 limit(Opts) ->
     #frame_limit{
-        max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM),
-        max_header_length = g(max_header_length, Opts, ?MAX_HEADER_LENGTH),
+        max_header_num = g(max_headers, Opts, ?MAX_HEADER_NUM),
+        max_header_length = g(max_headers_length, Opts, ?MAX_HEADER_LENGTH),
         max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH)
     }.
 
@@ -243,7 +243,9 @@ content_len(#parser_state{headers = Headers}) ->
         false -> none
     end.
 
-new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc}) ->
+new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc, limit = Limit}) ->
+    ok = check_max_headers(Headers, Limit),
+    ok = check_max_body(Acc, Limit),
     #stomp_frame{command = Cmd, headers = Headers, body = Acc}.
 
 acc(Chunk, State = #parser_state{acc = Acc}) when is_binary(Chunk) ->
@@ -261,6 +263,57 @@ unescape($c) -> ?COLON;
 unescape($\\) -> ?BSL;
 unescape(_Ch) -> error(cannnot_unescape).
 
+check_max_headers(
+    Headers,
+    #frame_limit{
+        max_header_num = MaxNum,
+        max_header_length = MaxLen
+    }
+) ->
+    HeadersLen = length(Headers),
+    case HeadersLen > MaxNum of
+        true ->
+            error(
+                {too_many_headers, #{
+                    max_header_num => MaxNum,
+                    received_headers_num => length(Headers)
+                }}
+            );
+        false ->
+            ok
+    end,
+    lists:foreach(
+        fun({Name, Val}) ->
+            Len = byte_size(Name) + byte_size(Val),
+            case Len > MaxLen of
+                true ->
+                    error(
+                        {too_long_header, #{
+                            max_header_length => MaxLen,
+                            found_header_length => Len
+                        }}
+                    );
+                false ->
+                    ok
+            end
+        end,
+        Headers
+    ).
+
+check_max_body(Acc, #frame_limit{max_body_length = MaxLen}) ->
+    Len = byte_size(Acc),
+    case Len > MaxLen of
+        true ->
+            error(
+                {too_long_body, #{
+                    max_body_length => MaxLen,
+                    received_body_length => Len
+                }}
+            );
+        false ->
+            ok
+    end.
+
 %%--------------------------------------------------------------------
 %% Serialize funcs
 %%--------------------------------------------------------------------
@@ -330,7 +383,10 @@ make(Command, Headers, Body) ->
     #stomp_frame{command = Command, headers = Headers, body = Body}.
 
 %% @doc Format a frame
-format(Frame) -> serialize_pkt(Frame, #{}).
+format({frame_error, _Reason} = Error) ->
+    Error;
+format(Frame) ->
+    serialize_pkt(Frame, #{}).
 
 is_message(#stomp_frame{command = CMD}) when
     CMD == ?CMD_SEND;
@@ -373,4 +429,6 @@ type(?CMD_RECEIPT) ->
 type(?CMD_ERROR) ->
     error;
 type(?CMD_HEARTBEAT) ->
-    heartbeat.
+    heartbeat;
+type(_) ->
+    undefined.

+ 148 - 4
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -40,7 +40,12 @@
     "    username = \"${Packet.headers.login}\"\n"
     "    password = \"${Packet.headers.passcode}\"\n"
     "  }\n"
-    "  listeners.tcp.default {\n"
+    " frame {\n"
+    "    max_headers = 10\n"
+    "    max_headers_length = 100\n"
+    "    max_body_length = 1024\n"
+    "  }\n"
+    " listeners.tcp.default {\n"
     "    bind = 61613\n"
     "  }\n"
     "}\n"
@@ -256,6 +261,10 @@ t_subscribe(_) ->
             ]
         ),
 
+        %% assert subscription stats
+        [ClientInfo1] = clients(),
+        ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1),
+
         %% Unsubscribe
         gen_tcp:send(
             Sock,
@@ -278,6 +287,10 @@ t_subscribe(_) ->
             },
             _, _} = parse(Data2),
 
+        %% assert subscription stats
+        [ClientInfo2] = clients(),
+        ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2),
+
         gen_tcp:send(
             Sock,
             serialize(
@@ -697,6 +710,129 @@ t_sticky_packets_truncate_after_headers(_) ->
             ?assert(false, "waiting message timeout")
         end
     end).
+
+t_frame_error_in_connect(_) ->
+    with_connection(fun(Sock) ->
+        gen_tcp:send(
+            Sock,
+            serialize(
+                <<"CONNECT">>,
+                [
+                    {<<"accept-version">>, ?STOMP_VER},
+                    {<<"host">>, <<"127.0.0.1:61613">>},
+                    {<<"login">>, <<"guest">>},
+                    {<<"passcode">>, <<"guest">>},
+                    {<<"heart-beat">>, <<"0,0">>},
+                    {<<"custome_header1">>, <<"val">>},
+                    {<<"custome_header2">>, <<"val">>},
+                    {<<"custome_header3">>, <<"val">>},
+                    {<<"custome_header4">>, <<"val">>},
+                    {<<"custome_header5">>, <<"val">>},
+                    {<<"custome_header6">>, <<"val">>}
+                ]
+            )
+        ),
+        ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0))
+    end).
+
+t_frame_error_too_many_headers(_) ->
+    Frame = serialize(
+        <<"SEND">>,
+        [
+            {<<"destination">>, <<"/queue/foo">>},
+            {<<"custome_header1">>, <<"val">>},
+            {<<"custome_header2">>, <<"val">>},
+            {<<"custome_header3">>, <<"val">>},
+            {<<"custome_header4">>, <<"val">>},
+            {<<"custome_header5">>, <<"val">>},
+            {<<"custome_header6">>, <<"val">>},
+            {<<"custome_header7">>, <<"val">>},
+            {<<"custome_header8">>, <<"val">>},
+            {<<"custome_header9">>, <<"val">>},
+            {<<"custome_header10">>, <<"val">>}
+        ],
+        <<"test">>
+    ),
+    Assert =
+        fun(Sock) ->
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, ErrorFrame, _, _} = parse(Data),
+            ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
+            ?assertMatch(
+                match, re:run(ErrorFrame#stomp_frame.body, "too_many_headers", [{capture, none}])
+            ),
+            ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0))
+        end,
+    test_frame_error(Frame, Assert).
+
+t_frame_error_too_long_header(_) ->
+    LongHeaderVal = emqx_utils:bin_to_hexstr(crypto:strong_rand_bytes(50), upper),
+    Frame = serialize(
+        <<"SEND">>,
+        [
+            {<<"destination">>, <<"/queue/foo">>},
+            {<<"custome_header10">>, LongHeaderVal}
+        ],
+        <<"test">>
+    ),
+    Assert =
+        fun(Sock) ->
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, ErrorFrame, _, _} = parse(Data),
+            ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
+            ?assertMatch(
+                match, re:run(ErrorFrame#stomp_frame.body, "too_long_header", [{capture, none}])
+            ),
+            ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0))
+        end,
+    test_frame_error(Frame, Assert).
+
+t_frame_error_too_long_body(_) ->
+    LongBody = emqx_utils:bin_to_hexstr(crypto:strong_rand_bytes(513), upper),
+    Frame = serialize(
+        <<"SEND">>,
+        [{<<"destination">>, <<"/queue/foo">>}],
+        LongBody
+    ),
+    Assert =
+        fun(Sock) ->
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, ErrorFrame, _, _} = parse(Data),
+            ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
+            ?assertMatch(
+                match, re:run(ErrorFrame#stomp_frame.body, "too_long_body", [{capture, none}])
+            ),
+            ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0))
+        end,
+    test_frame_error(Frame, Assert).
+
+test_frame_error(Frame, AssertFun) ->
+    with_connection(fun(Sock) ->
+        gen_tcp:send(
+            Sock,
+            serialize(
+                <<"CONNECT">>,
+                [
+                    {<<"accept-version">>, ?STOMP_VER},
+                    {<<"host">>, <<"127.0.0.1:61613">>},
+                    {<<"login">>, <<"guest">>},
+                    {<<"passcode">>, <<"guest">>},
+                    {<<"heart-beat">>, <<"0,0">>}
+                ]
+            )
+        ),
+        {ok, Data} = gen_tcp:recv(Sock, 0),
+        {ok,
+            #stomp_frame{
+                command = <<"CONNECTED">>,
+                headers = _,
+                body = _
+            },
+            _, _} = parse(Data),
+        gen_tcp:send(Sock, Frame),
+        AssertFun(Sock)
+    end).
+
 t_rest_clienit_info(_) ->
     with_connection(fun(Sock) ->
         gen_tcp:send(
@@ -802,10 +938,14 @@ t_rest_clienit_info(_) ->
 
         {200, Subs1} = request(get, ClientPath ++ "/subscriptions"),
         ?assertEqual(2, length(Subs1)),
+        {200, StompClient2} = request(get, ClientPath),
+        ?assertMatch(#{subscriptions_cnt := 2}, StompClient2),
 
         {204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"),
         {200, Subs2} = request(get, ClientPath ++ "/subscriptions"),
         ?assertEqual(1, length(Subs2)),
+        {200, StompClient3} = request(get, ClientPath),
+        ?assertMatch(#{subscriptions_cnt := 1}, StompClient3),
 
         %% kickout
         {204, _} = request(delete, ClientPath),
@@ -844,9 +984,9 @@ serialize(Command, Headers, Body) ->
 
 parse(Data) ->
     ProtoEnv = #{
-        max_headers => 10,
-        max_header_length => 1024,
-        max_body_length => 8192
+        max_headers => 1024,
+        max_header_length => 10240,
+        max_body_length => 81920
     },
     Parser = emqx_stomp_frame:initial_parse_state(ProtoEnv),
     emqx_stomp_frame:parse(Data, Parser).
@@ -855,3 +995,7 @@ get_field(command, #stomp_frame{command = Command}) ->
     Command;
 get_field(body, #stomp_frame{body = Body}) ->
     Body.
+
+clients() ->
+    {200, Clients} = request(get, "/gateways/stomp/clients"),
+    maps:get(data, Clients).

+ 1 - 1
apps/emqx_prometheus/grafana_template/ErlangVM.json

@@ -1207,7 +1207,7 @@
             "type": "prometheus",
             "uid": "${datasource}"
           },
-          "expr": "erlang_vm_statistics_run_queues_length_total{job=~\"$job\", instance=\"$instance\"}",
+          "expr": "erlang_vm_statistics_run_queues_length{job=~\"$job\", instance=\"$instance\"}",
           "format": "time_series",
           "intervalFactor": 2,
           "legendFormat": "Run queue length",

+ 1 - 1
apps/emqx_prometheus/rebar.config

@@ -3,7 +3,7 @@
 {deps, [
     {emqx, {path, "../emqx"}},
     {emqx_utils, {path, "../emqx_utils"}},
-    {prometheus, {git, "https://github.com/deadtrickster/prometheus.erl", {tag, "v4.8.1"}}}
+    {prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.1"}}}
 ]}.
 
 {edoc_opts, [{preprocess, true}]}.

+ 3 - 0
apps/emqx_prometheus/src/emqx_prometheus_schema.erl

@@ -170,4 +170,7 @@ validate_push_gateway_server(Url) ->
 
 %% for CI test, CI don't load the whole emqx_conf_schema.
 translation(Name) ->
+    %% translate 'vm_dist_collector', 'mnesia_collector', 'vm_statistics_collector',
+    %% 'vm_system_info_collector', 'vm_memory_collector', 'vm_msacc_collector'
+    %% to prometheus envrionments
     emqx_conf_schema:translation(Name).

+ 5 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -388,7 +388,11 @@ call_start(ResId, Mod, Config) ->
         throw:Error ->
             {error, Error};
         Kind:Error:Stacktrace ->
-            {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}}
+            {error, #{
+                exception => Kind,
+                reason => Error,
+                stacktrace => emqx_utils:redact(Stacktrace)
+            }}
     end.
 
 -spec call_health_check(resource_id(), module(), resource_state()) ->

+ 2 - 0
changes/ce/feat-10985.en.md

@@ -0,0 +1,2 @@
+Renamed emqx ctl command 'cluster_call' to 'conf cluster_sync'.
+The old command 'cluster_call' is still a valid command, but not included in usage info.

+ 1 - 0
changes/ce/fix-10911.en.md

@@ -0,0 +1 @@
+The error message and log entry that appear when one tries to create a bridge with a name the exceeds 255 bytes is now easier to understand.

+ 1 - 0
changes/ce/fix-10977.en.md

@@ -0,0 +1 @@
+Fix delay in updating subscription count metric and correct configuration issues in Stomp gateway.

+ 3 - 0
changes/ce/perf-10941.en.md

@@ -0,0 +1,3 @@
+Improve the collection speed of Prometheus metrics when setting
+`prometheus.vm_dist_collector=disabled` and
+metric `erlang_vm_statistics_run_queues_length_total` is renamed to `erlang_vm_statistics_run_queues_length`

+ 1 - 0
changes/ce/perf-10988.en.md

@@ -0,0 +1 @@
+Improve log security when data bridge creation fails to ensure sensitive data is always obfuscated.

+ 12 - 12
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl

@@ -101,23 +101,23 @@ namespace() -> "bridge_redis".
 roots() -> [].
 
 fields("post_single") ->
-    method_fileds(post, redis_single);
+    method_fields(post, redis_single);
 fields("post_sentinel") ->
-    method_fileds(post, redis_sentinel);
+    method_fields(post, redis_sentinel);
 fields("post_cluster") ->
-    method_fileds(post, redis_cluster);
+    method_fields(post, redis_cluster);
 fields("put_single") ->
-    method_fileds(put, redis_single);
+    method_fields(put, redis_single);
 fields("put_sentinel") ->
-    method_fileds(put, redis_sentinel);
+    method_fields(put, redis_sentinel);
 fields("put_cluster") ->
-    method_fileds(put, redis_cluster);
+    method_fields(put, redis_cluster);
 fields("get_single") ->
-    method_fileds(get, redis_single);
+    method_fields(get, redis_single);
 fields("get_sentinel") ->
-    method_fileds(get, redis_sentinel);
+    method_fields(get, redis_sentinel);
 fields("get_cluster") ->
-    method_fileds(get, redis_cluster);
+    method_fields(get, redis_cluster);
 fields(Type) when
     Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster
 ->
@@ -126,16 +126,16 @@ fields(Type) when
 fields("creation_opts_" ++ Type) ->
     resource_creation_fields(Type).
 
-method_fileds(post, ConnectorType) ->
+method_fields(post, ConnectorType) ->
     redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType);
-method_fileds(get, ConnectorType) ->
+method_fields(get, ConnectorType) ->
     redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
         emqx_bridge_schema:status_fields();
-method_fileds(put, ConnectorType) ->
+method_fields(put, ConnectorType) ->
     redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType).
 

+ 9 - 1
mix.exs

@@ -6,7 +6,7 @@ defmodule EMQXUmbrella.MixProject do
   The purpose of this file is to configure the release of EMQX under
   Mix.  Since EMQX uses its own configuration conventions and startup
   procedures, one cannot simply use `iex -S mix`.  Instead, it's
-  recommendd to build and use the release.
+  recommended to build and use the release.
 
   ## Profiles
 
@@ -736,6 +736,7 @@ defmodule EMQXUmbrella.MixProject do
   defp template_vars(release, release_type, :bin = _package_type, edition_type) do
     [
       emqx_default_erlang_cookie: default_cookie(),
+      emqx_configuration_doc: emqx_configuration_doc(edition_type),
       platform_data_dir: "data",
       platform_etc_dir: "etc",
       platform_plugins_dir: "plugins",
@@ -758,6 +759,7 @@ defmodule EMQXUmbrella.MixProject do
   defp template_vars(release, release_type, :pkg = _package_type, edition_type) do
     [
       emqx_default_erlang_cookie: default_cookie(),
+      emqx_configuration_doc: emqx_configuration_doc(edition_type),
       platform_data_dir: "/var/lib/emqx",
       platform_etc_dir: "/etc/emqx",
       platform_plugins_dir: "/var/lib/emqx/plugins",
@@ -791,6 +793,12 @@ defmodule EMQXUmbrella.MixProject do
     end
   end
 
+  defp emqx_configuration_doc(:enterprise),
+    do: "https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html"
+
+  defp emqx_configuration_doc(:community),
+    do: "https://www.emqx.io/docs/en/v5.0/configuration/configuration.html"
+
   defp emqx_schema_mod(:enterprise), do: :emqx_enterprise_schema
   defp emqx_schema_mod(:community), do: :emqx_conf_schema
 

+ 4 - 0
rebar.config.erl

@@ -345,11 +345,15 @@ overlay_vars(cloud, PkgType, Edition) ->
 overlay_vars_edition(ce) ->
     [
         {emqx_schema_mod, emqx_conf_schema},
+        {emqx_configuration_doc,
+            "https://www.emqx.io/docs/en/v5.0/configuration/configuration.html"},
         {is_enterprise, "no"}
     ];
 overlay_vars_edition(ee) ->
     [
         {emqx_schema_mod, emqx_enterprise_schema},
+        {emqx_configuration_doc,
+            "https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html"},
         {is_enterprise, "yes"}
     ].
 

+ 2 - 2
rel/emqx_conf.template.en.md

@@ -84,7 +84,7 @@ There are 4 complex data types in EMQX's HOCON config:
 1. Array: `[ElementType]`
 
 ::: tip Tip
-If map filed name is a positive integer number, it is interpreted as an alternative representation of an `Array`.
+If map field name is a positive integer number, it is interpreted as an alternative representation of an `Array`.
 For example:
 ```
 myarray.1 = 74
@@ -120,7 +120,7 @@ If we consider the whole EMQX config as a tree,
 to reference a primitive value, we can use a dot-separated names form string for
 the path from the tree-root (always a Struct) down to the primitive values at tree-leaves.
 
-Each segment of the dotted string is a Struct filed name or Map key.
+Each segment of the dotted string is a Struct field name or Map key.
 For Array elements, 1-based index is used.
 
 below are some examples