Просмотр исходного кода

feat: hide broker rootkey and move broker.shared_subscription_strategy to mqtt.shared_subscription_strategy

zhongwencool 2 лет назад
Родитель
Сommit
50041fdddf

+ 1 - 0
apps/emqx/include/emqx_schema.hrl

@@ -19,5 +19,6 @@
 -define(TOMBSTONE_TYPE, marked_for_deletion).
 -define(TOMBSTONE_VALUE, <<"marked_for_deletion">>).
 -define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion).
+-define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
 
 -endif.

+ 1 - 1
apps/emqx/src/emqx_config.erl

@@ -19,6 +19,7 @@
 -elvis([{elvis_style, god_modules, disable}]).
 -include("logger.hrl").
 -include("emqx.hrl").
+-include("emqx_schema.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -export([
@@ -108,7 +109,6 @@
 -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
 -define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]).
 
--define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
 -define(MAX_KEEP_BACKUP_CONFIGS, 10).
 
 -export_type([

+ 286 - 262
apps/emqx/src/emqx_schema.erl

@@ -226,7 +226,10 @@ roots(medium) ->
         {"broker",
             sc(
                 ref("broker"),
-                #{desc => ?DESC(broker)}
+                #{
+                    desc => ?DESC(broker),
+                    importance => ?IMPORTANCE_HIDDEN
+                }
             )},
         {"sys_topics",
             sc(
@@ -439,251 +442,7 @@ fields("authz_cache") ->
             )}
     ];
 fields("mqtt") ->
-    [
-        {"idle_timeout",
-            sc(
-                hoconsc:union([infinity, duration()]),
-                #{
-                    default => <<"15s">>,
-                    desc => ?DESC(mqtt_idle_timeout)
-                }
-            )},
-        {"max_packet_size",
-            sc(
-                bytesize(),
-                #{
-                    default => <<"1MB">>,
-                    desc => ?DESC(mqtt_max_packet_size)
-                }
-            )},
-        {"max_clientid_len",
-            sc(
-                range(23, 65535),
-                #{
-                    default => 65535,
-                    desc => ?DESC(mqtt_max_clientid_len)
-                }
-            )},
-        {"max_topic_levels",
-            sc(
-                range(1, 65535),
-                #{
-                    default => 128,
-                    desc => ?DESC(mqtt_max_topic_levels)
-                }
-            )},
-        {"max_qos_allowed",
-            sc(
-                qos(),
-                #{
-                    default => 2,
-                    desc => ?DESC(mqtt_max_qos_allowed)
-                }
-            )},
-        {"max_topic_alias",
-            sc(
-                range(0, 65535),
-                #{
-                    default => 65535,
-                    desc => ?DESC(mqtt_max_topic_alias)
-                }
-            )},
-        {"retain_available",
-            sc(
-                boolean(),
-                #{
-                    default => true,
-                    desc => ?DESC(mqtt_retain_available)
-                }
-            )},
-        {"wildcard_subscription",
-            sc(
-                boolean(),
-                #{
-                    default => true,
-                    desc => ?DESC(mqtt_wildcard_subscription)
-                }
-            )},
-        {"shared_subscription",
-            sc(
-                boolean(),
-                #{
-                    default => true,
-                    desc => ?DESC(mqtt_shared_subscription)
-                }
-            )},
-        {"exclusive_subscription",
-            sc(
-                boolean(),
-                #{
-                    default => false,
-                    desc => ?DESC(mqtt_exclusive_subscription)
-                }
-            )},
-        {"ignore_loop_deliver",
-            sc(
-                boolean(),
-                #{
-                    default => false,
-                    desc => ?DESC(mqtt_ignore_loop_deliver)
-                }
-            )},
-        {"strict_mode",
-            sc(
-                boolean(),
-                #{
-                    default => false,
-                    desc => ?DESC(mqtt_strict_mode)
-                }
-            )},
-        {"response_information",
-            sc(
-                string(),
-                #{
-                    default => <<"">>,
-                    desc => ?DESC(mqtt_response_information)
-                }
-            )},
-        {"server_keepalive",
-            sc(
-                hoconsc:union([integer(), disabled]),
-                #{
-                    default => disabled,
-                    desc => ?DESC(mqtt_server_keepalive)
-                }
-            )},
-        {"keepalive_backoff",
-            sc(
-                number(),
-                #{
-                    default => ?DEFAULT_BACKOFF,
-                    %% Must add required => false, zone schema has no default.
-                    required => false,
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )},
-        {"keepalive_multiplier",
-            sc(
-                number(),
-                #{
-                    default => ?DEFAULT_MULTIPLIER,
-                    validator => fun ?MODULE:validate_keepalive_multiplier/1,
-                    desc => ?DESC(mqtt_keepalive_multiplier)
-                }
-            )},
-        {"max_subscriptions",
-            sc(
-                hoconsc:union([range(1, inf), infinity]),
-                #{
-                    default => infinity,
-                    desc => ?DESC(mqtt_max_subscriptions)
-                }
-            )},
-        {"upgrade_qos",
-            sc(
-                boolean(),
-                #{
-                    default => false,
-                    desc => ?DESC(mqtt_upgrade_qos)
-                }
-            )},
-        {"max_inflight",
-            sc(
-                range(1, 65535),
-                #{
-                    default => 32,
-                    desc => ?DESC(mqtt_max_inflight)
-                }
-            )},
-        {"retry_interval",
-            sc(
-                duration(),
-                #{
-                    default => <<"30s">>,
-                    desc => ?DESC(mqtt_retry_interval)
-                }
-            )},
-        {"max_awaiting_rel",
-            sc(
-                hoconsc:union([integer(), infinity]),
-                #{
-                    default => 100,
-                    desc => ?DESC(mqtt_max_awaiting_rel)
-                }
-            )},
-        {"await_rel_timeout",
-            sc(
-                duration(),
-                #{
-                    default => <<"300s">>,
-                    desc => ?DESC(mqtt_await_rel_timeout)
-                }
-            )},
-        {"session_expiry_interval",
-            sc(
-                duration(),
-                #{
-                    default => <<"2h">>,
-                    desc => ?DESC(mqtt_session_expiry_interval)
-                }
-            )},
-        {"max_mqueue_len",
-            sc(
-                hoconsc:union([non_neg_integer(), infinity]),
-                #{
-                    default => 1000,
-                    desc => ?DESC(mqtt_max_mqueue_len)
-                }
-            )},
-        {"mqueue_priorities",
-            sc(
-                hoconsc:union([disabled, map()]),
-                #{
-                    default => disabled,
-                    desc => ?DESC(mqtt_mqueue_priorities)
-                }
-            )},
-        {"mqueue_default_priority",
-            sc(
-                hoconsc:enum([highest, lowest]),
-                #{
-                    default => lowest,
-                    desc => ?DESC(mqtt_mqueue_default_priority)
-                }
-            )},
-        {"mqueue_store_qos0",
-            sc(
-                boolean(),
-                #{
-                    default => true,
-                    desc => ?DESC(mqtt_mqueue_store_qos0)
-                }
-            )},
-        {"use_username_as_clientid",
-            sc(
-                boolean(),
-                #{
-                    default => false,
-                    desc => ?DESC(mqtt_use_username_as_clientid)
-                }
-            )},
-        {"peer_cert_as_username",
-            sc(
-                hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
-                #{
-                    default => disabled,
-                    desc => ?DESC(mqtt_peer_cert_as_username)
-                }
-            )},
-        {"peer_cert_as_clientid",
-            sc(
-                hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
-                #{
-                    default => disabled,
-                    desc => ?DESC(mqtt_peer_cert_as_clientid)
-                }
-            )}
-    ];
+    mqtt_general() ++ mqtt_session();
 fields("zone") ->
     emqx_zone_schema:zones_without_default();
 fields("flapping_detect") ->
@@ -1563,22 +1322,7 @@ fields("broker") ->
                     desc => ?DESC(broker_session_locking_strategy)
                 }
             )},
-        {"shared_subscription_strategy",
-            sc(
-                hoconsc:enum([
-                    random,
-                    round_robin,
-                    round_robin_per_group,
-                    sticky,
-                    local,
-                    hash_topic,
-                    hash_clientid
-                ]),
-                #{
-                    default => round_robin,
-                    desc => ?DESC(broker_shared_subscription_strategy)
-                }
-            )},
+        shared_subscription_strategy(),
         {"shared_dispatch_ack_enabled",
             sc(
                 boolean(),
@@ -3564,3 +3308,283 @@ flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) -
     Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false};
 flapping_detect_converter(Conf, _Opts) ->
     Conf.
+mqtt_general() ->
+    [
+        {"idle_timeout",
+            sc(
+                hoconsc:union([infinity, duration()]),
+                #{
+                    default => <<"15s">>,
+                    desc => ?DESC(mqtt_idle_timeout)
+                }
+            )},
+        {"max_packet_size",
+            sc(
+                bytesize(),
+                #{
+                    default => <<"1MB">>,
+                    desc => ?DESC(mqtt_max_packet_size)
+                }
+            )},
+        {"max_clientid_len",
+            sc(
+                range(23, 65535),
+                #{
+                    default => 65535,
+                    desc => ?DESC(mqtt_max_clientid_len)
+                }
+            )},
+        {"max_topic_levels",
+            sc(
+                range(1, 65535),
+                #{
+                    default => 128,
+                    desc => ?DESC(mqtt_max_topic_levels)
+                }
+            )},
+        {"max_topic_alias",
+            sc(
+                range(0, 65535),
+                #{
+                    default => 65535,
+                    desc => ?DESC(mqtt_max_topic_alias)
+                }
+            )},
+        {"retain_available",
+            sc(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(mqtt_retain_available)
+                }
+            )},
+        {"wildcard_subscription",
+            sc(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(mqtt_wildcard_subscription)
+                }
+            )},
+        {"shared_subscription",
+            sc(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(mqtt_shared_subscription)
+                }
+            )},
+        shared_subscription_strategy(),
+        {"exclusive_subscription",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    desc => ?DESC(mqtt_exclusive_subscription)
+                }
+            )},
+        {"ignore_loop_deliver",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    desc => ?DESC(mqtt_ignore_loop_deliver)
+                }
+            )},
+        {"strict_mode",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    desc => ?DESC(mqtt_strict_mode)
+                }
+            )},
+        {"response_information",
+            sc(
+                string(),
+                #{
+                    default => <<"">>,
+                    desc => ?DESC(mqtt_response_information)
+                }
+            )},
+        {"server_keepalive",
+            sc(
+                hoconsc:union([integer(), disabled]),
+                #{
+                    default => disabled,
+                    desc => ?DESC(mqtt_server_keepalive)
+                }
+            )},
+        {"keepalive_backoff",
+            sc(
+                number(),
+                #{
+                    default => ?DEFAULT_BACKOFF,
+                    %% Must add required => false, zone schema has no default.
+                    required => false,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {"keepalive_multiplier",
+            sc(
+                number(),
+                #{
+                    default => ?DEFAULT_MULTIPLIER,
+                    validator => fun ?MODULE:validate_keepalive_multiplier/1,
+                    desc => ?DESC(mqtt_keepalive_multiplier)
+                }
+            )},
+        {"retry_interval",
+            sc(
+                duration(),
+                #{
+                    default => <<"30s">>,
+                    desc => ?DESC(mqtt_retry_interval)
+                }
+            )},
+        {"use_username_as_clientid",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    desc => ?DESC(mqtt_use_username_as_clientid)
+                }
+            )},
+        {"peer_cert_as_username",
+            sc(
+                hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
+                #{
+                    default => disabled,
+                    desc => ?DESC(mqtt_peer_cert_as_username)
+                }
+            )},
+        {"peer_cert_as_clientid",
+            sc(
+                hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
+                #{
+                    default => disabled,
+                    desc => ?DESC(mqtt_peer_cert_as_clientid)
+                }
+            )}
+    ].
+%% All session's importance should be lower than general part to organize document.
+mqtt_session() ->
+    [
+        {"session_expiry_interval",
+            sc(
+                duration(),
+                #{
+                    default => <<"2h">>,
+                    desc => ?DESC(mqtt_session_expiry_interval),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"max_awaiting_rel",
+            sc(
+                hoconsc:union([integer(), infinity]),
+                #{
+                    default => 100,
+                    desc => ?DESC(mqtt_max_awaiting_rel),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"max_qos_allowed",
+            sc(
+                qos(),
+                #{
+                    default => 2,
+                    desc => ?DESC(mqtt_max_qos_allowed),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"mqueue_priorities",
+            sc(
+                hoconsc:union([disabled, map()]),
+                #{
+                    default => disabled,
+                    desc => ?DESC(mqtt_mqueue_priorities),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"mqueue_default_priority",
+            sc(
+                hoconsc:enum([highest, lowest]),
+                #{
+                    default => lowest,
+                    desc => ?DESC(mqtt_mqueue_default_priority),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"mqueue_store_qos0",
+            sc(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(mqtt_mqueue_store_qos0),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"max_mqueue_len",
+            sc(
+                hoconsc:union([non_neg_integer(), infinity]),
+                #{
+                    default => 1000,
+                    desc => ?DESC(mqtt_max_mqueue_len),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"max_inflight",
+            sc(
+                range(1, 65535),
+                #{
+                    default => 32,
+                    desc => ?DESC(mqtt_max_inflight),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"max_subscriptions",
+            sc(
+                hoconsc:union([range(1, inf), infinity]),
+                #{
+                    default => infinity,
+                    desc => ?DESC(mqtt_max_subscriptions),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"upgrade_qos",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    desc => ?DESC(mqtt_upgrade_qos),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
+        {"await_rel_timeout",
+            sc(
+                duration(),
+                #{
+                    default => <<"300s">>,
+                    desc => ?DESC(mqtt_await_rel_timeout),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )}
+    ].
+
+shared_subscription_strategy() ->
+    {"shared_subscription_strategy",
+        sc(
+            hoconsc:enum([
+                random,
+                round_robin,
+                round_robin_per_group,
+                sticky,
+                local,
+                hash_topic,
+                hash_clientid
+            ]),
+            #{
+                default => round_robin,
+                desc => ?DESC(broker_shared_subscription_strategy)
+            }
+        )}.

+ 12 - 23
apps/emqx/src/emqx_shared_sub.erl

@@ -18,6 +18,7 @@
 
 -behaviour(gen_server).
 
+-include("emqx_schema.hrl").
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
@@ -158,24 +159,18 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
 
 -spec strategy(emqx_types:group()) -> strategy().
 strategy(Group) ->
-    try
-        emqx:get_config([
-            broker,
-            shared_subscription_group,
-            binary_to_existing_atom(Group),
-            strategy
-        ])
+    try binary_to_existing_atom(Group) of
+        GroupAtom ->
+            Key = [broker, shared_subscription_group, GroupAtom, strategy],
+            case emqx:get_config(Key, ?CONFIG_NOT_FOUND_MAGIC) of
+                ?CONFIG_NOT_FOUND_MAGIC -> get_default_shared_subscription_strategy();
+                Strategy -> Strategy
+            end
     catch
-        error:{config_not_found, _} ->
-            get_default_shared_subscription_strategy();
         error:badarg ->
             get_default_shared_subscription_strategy()
     end.
 
--spec ack_enabled() -> boolean().
-ack_enabled() ->
-    emqx:get_config([broker, shared_dispatch_ack_enabled], false).
-
 do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
     %% Deadlock otherwise
     SubPid ! {deliver, Topic, Msg},
@@ -187,14 +182,8 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
 do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
     %% Retry implies all subscribers nack:ed, send again without ack
     send(SubPid, Topic, {deliver, Topic, Msg});
-do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
-    case ack_enabled() of
-        true ->
-            %% TODO: delete this clase after 5.1.0
-            do_dispatch_with_ack(SubPid, Group, Topic, Msg);
-        false ->
-            send(SubPid, Topic, {deliver, Topic, Msg})
-    end.
+do_dispatch(SubPid, _Group, Topic, Msg, fresh) ->
+    send(SubPid, Topic, {deliver, Topic, Msg}).
 
 -spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) ->
     ok | {error, _}.
@@ -240,7 +229,7 @@ with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
 with_redispatch_to(Msg, Group, Topic) ->
     emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
 
-%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
+%% @hidden Redispatch is needed only for the messages with redispatch_to header added.
 is_redispatch_needed(#message{} = Msg) ->
     case get_redispatch_to(Msg) of
         ?REDISPATCH_TO(_, _) ->
@@ -555,4 +544,4 @@ delete_route_if_needed({Group, Topic} = GroupTopic) ->
     end).
 
 get_default_shared_subscription_strategy() ->
-    emqx:get_config([broker, shared_subscription_strategy]).
+    emqx:get_config([mqtt, shared_subscription_strategy]).

+ 1 - 0
changes/ce/feat-11034.en.md

@@ -0,0 +1 @@
+Hide the broker and move the `broker.shared_subscription_strategy` to `mqtt.shared_subscription_strategy` as it belongs to `mqtt`.