Просмотр исходного кода

Merge remote-tracking branch 'origin/master' into 0623-fix-relup

Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
b85abde9c0

+ 0 - 125
apps/emqx/etc/emqx_edge/vm.args

@@ -1,125 +0,0 @@
-######################################################################
-## Erlang VM Args
-######################################################################
-
-## NOTE:
-##
-## Arguments configured in this file might be overridden by configs from `emqx.conf`.
-##
-## Some basic VM arguments are to be configured in `emqx.conf`,
-## such as `node.name` for `-name` and `node.cooke` for `-setcookie`.
-
-## Sets the maximum number of simultaneously existing processes for this system.
-+P 16384
-## Sets the maximum number of simultaneously existing ports for this system.
-+Q 4096
-
-## Sets the maximum number of ETS tables
-+e 512
-
-## Sets the maximum number of atoms the virtual machine can handle.
-+t 262144
-
-## Set the location of crash dumps
--env ERL_CRASH_DUMP {{ platform_log_dir }}/crash.dump
-
-## Set how many times generational garbages collections can be done without
-## forcing a fullsweep collection.
--env ERL_FULLSWEEP_AFTER 0
-
-## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
-## (Disabled by default..use with caution!)
--heart
-
-## Specify the erlang distributed protocol.
-## Can be one of: inet_tcp, inet6_tcp, inet_tls
-#-proto_dist inet_tcp
-
-## The shell is started in a restricted mode.
-## In this mode, the shell evaluates a function call only if allowed.
-## Prevent user from accidentally calling a function from the prompt that could harm a running system.
--stdlib restricted_shell emqx_restricted_shell
-
-## Specify SSL Options in the file if using SSL for Erlang Distribution.
-## Used only when -proto_dist set to inet_tls
-#-ssl_dist_optfile {{ platform_etc_dir }}/ssl_dist.conf
-
-## Specifies the net_kernel tick time in seconds.
-## This is the approximate time a connected node may be unresponsive until
-## it is considered down and thereby disconnected.
-#-kernel net_ticktime 60
-
-## Sets the distribution buffer busy limit (dist_buf_busy_limit).
-+zdbbl 1024
-
-## Sets default scheduler hint for port parallelism.
-+spp false
-
-## Sets the number of threads in async thread pool. Valid range is 0-1024.
-## Increase the parameter if there are many simultaneous file I/O operations.
-+A 1
-
-## Sets the default heap size of processes to the size Size.
-#+hms 233
-
-## Sets the default binary virtual heap size of processes to the size Size.
-#+hmbs 46422
-
-## Sets the default maximum heap size of processes to the size Size.
-## Defaults to 0, which means that no maximum heap size is used.
-##For more information, see process_flag(max_heap_size, MaxHeapSize).
-#+hmax 0
-
-## Sets the default value for process flag message_queue_data. Defaults to on_heap.
-#+hmqd on_heap | off_heap
-
-## Sets the number of IO pollsets to use when polling for I/O.
-+IOp 1
-
-## Sets the number of IO poll threads to use when polling for I/O.
-+IOt 1
-
-## Sets the number of scheduler threads to create and scheduler threads to set online.
-+S 1:1
-
-## Sets the number of dirty CPU scheduler threads to create and dirty CPU scheduler threads to set online.
-+SDcpu 1:1
-
-## Sets the number of dirty I/O scheduler threads to create.
-+SDio 1
-
-## Suggested stack size, in kilowords, for scheduler threads.
-#+sss 32
-
-## Suggested stack size, in kilowords, for dirty CPU scheduler threads.
-#+sssdcpu 40
-
-## Suggested stack size, in kilowords, for dirty IO scheduler threads.
-#+sssdio 40
-
-## Sets scheduler bind type.
-## Can be one of: u, ns, ts, ps, s, nnts, nnps, tnnps, db
-#+sbt db
-
-## Sets a user-defined CPU topology.
-#+sct L0-3c0-3p0N0:L4-7c0-3p1N1
-
-## Sets the mapping of warning messages for error_logger
-#+W w
-
-## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp
-#+C no_time_warp
-
-## Prevents loading information about source filenames and line numbers.
-+L
-
-## Specifies how long time (in milliseconds) to spend shutting down the system.
-## See: http://erlang.org/doc/man/erl.html
--shutdown_time 10000
-
-## patches dir
--pa "{{ platform_data_dir }}/patches"
-
-## Mnesia thresholds
--mnesia dump_log_write_threshold 5000
--mnesia dump_log_time_threshold 60000

apps/emqx/etc/emqx_cloud/vm.args → apps/emqx/etc/vm.args.cloud


+ 4 - 2
apps/emqx/src/emqx.appup.src

@@ -2,12 +2,14 @@
 %% Unless you know what you are doing, DO NOT edit manually!!
 %% Unless you know what you are doing, DO NOT edit manually!!
 {VSN,
 {VSN,
   [{"5.0.0",
   [{"5.0.0",
-    [{load_module,emqx_schema,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_schema,brutal_purge,soft_purge,[]},
      {load_module,emqx_release,brutal_purge,soft_purge,[]},
      {load_module,emqx_release,brutal_purge,soft_purge,[]},
      {load_module,emqx_relup}]},
      {load_module,emqx_relup}]},
    {<<".*">>,[]}],
    {<<".*">>,[]}],
   [{"5.0.0",
   [{"5.0.0",
-    [{load_module,emqx_schema,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_schema,brutal_purge,soft_purge,[]},
      {load_module,emqx_release,brutal_purge,soft_purge,[]},
      {load_module,emqx_release,brutal_purge,soft_purge,[]},
      {load_module,emqx_relup}]},
      {load_module,emqx_relup}]},
    {<<".*">>,[]}]}.
    {<<".*">>,[]}]}.

+ 12 - 7
apps/emqx/src/emqx_channel.erl

@@ -513,12 +513,6 @@ handle_in(
                 true ->
                 true ->
                     handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
                     handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
                 false ->
                 false ->
-                    Replace = fun
-                        _Fun(TupleList, [Tuple = {Key, _Value} | More]) ->
-                            _Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More);
-                        _Fun(TupleList, []) ->
-                            TupleList
-                    end,
                     TopicFilters2 = [TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
                     TopicFilters2 = [TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
                     TopicFilters3 = run_hooks(
                     TopicFilters3 = run_hooks(
                         'client.subscribe',
                         'client.subscribe',
@@ -530,7 +524,18 @@ handle_in(
                         Properties,
                         Properties,
                         Channel
                         Channel
                     ),
                     ),
-                    TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1),
+                    TupleTopicFilters2 =
+                        lists:foldl(
+                            fun
+                                ({{Topic, Opts = #{delete := true}}, _QoS}, Acc) ->
+                                    Key = {Topic, maps:without([delete], Opts)},
+                                    lists:keydelete(Key, 1, Acc);
+                                (Tuple = {Key, _Value}, Acc) ->
+                                    lists:keyreplace(Key, 1, Acc, Tuple)
+                            end,
+                            TupleTopicFilters0,
+                            TupleTopicFilters1
+                        ),
                     ReasonCodes2 = [
                     ReasonCodes2 = [
                         ReasonCode
                         ReasonCode
                      || {_TopicFilter, ReasonCode} <- TupleTopicFilters2
                      || {_TopicFilter, ReasonCode} <- TupleTopicFilters2

+ 12 - 6
apps/emqx/src/emqx_config.erl

@@ -144,7 +144,7 @@ get_root([RootName | _]) ->
 %% @doc For the given path, get raw root value enclosed in a single-key map.
 %% @doc For the given path, get raw root value enclosed in a single-key map.
 %% key is ensured to be binary.
 %% key is ensured to be binary.
 get_root_raw([RootName | _]) ->
 get_root_raw([RootName | _]) ->
-    #{bin(RootName) => do_get(?RAW_CONF, [RootName], #{})}.
+    #{bin(RootName) => do_get_raw([RootName], #{})}.
 
 
 %% @doc Get a config value for the given path.
 %% @doc Get a config value for the given path.
 %% The path should at least include root config name.
 %% The path should at least include root config name.
@@ -173,7 +173,7 @@ find(KeyPath) ->
     {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
     {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
 find_raw([]) ->
 find_raw([]) ->
     Ref = make_ref(),
     Ref = make_ref(),
-    case do_get(?RAW_CONF, [], Ref) of
+    case do_get_raw([], Ref) of
         Ref -> {not_found, []};
         Ref -> {not_found, []};
         Res -> {ok, Res}
         Res -> {ok, Res}
     end;
     end;
@@ -281,10 +281,10 @@ get_default_value([RootName | _] = KeyPath) ->
     end.
     end.
 
 
 -spec get_raw(emqx_map_lib:config_key_path()) -> term().
 -spec get_raw(emqx_map_lib:config_key_path()) -> term().
-get_raw(KeyPath) -> hocon_tconf:remove_env_meta(do_get(?RAW_CONF, KeyPath)).
+get_raw(KeyPath) -> do_get_raw(KeyPath).
 
 
 -spec get_raw(emqx_map_lib:config_key_path(), term()) -> term().
 -spec get_raw(emqx_map_lib:config_key_path(), term()) -> term().
-get_raw(KeyPath, Default) -> hocon_tconf:remove_env_meta(do_get(?RAW_CONF, KeyPath, Default)).
+get_raw(KeyPath, Default) -> do_get_raw(KeyPath, Default).
 
 
 -spec put_raw(map()) -> ok.
 -spec put_raw(map()) -> ok.
 put_raw(Config) ->
 put_raw(Config) ->
@@ -398,11 +398,11 @@ include_dirs() ->
     [filename:join(emqx:data_dir(), "configs")].
     [filename:join(emqx:data_dir(), "configs")].
 
 
 merge_envs(SchemaMod, RawConf) ->
 merge_envs(SchemaMod, RawConf) ->
-    %% TODO: evil, remove, required should be declared in schema
     Opts = #{
     Opts = #{
         required => false,
         required => false,
         format => map,
         format => map,
-        apply_override_envs => true
+        apply_override_envs => true,
+        check_lazy => true
     },
     },
     hocon_tconf:merge_env_overrides(SchemaMod, RawConf, all, Opts).
     hocon_tconf:merge_env_overrides(SchemaMod, RawConf, all, Opts).
 
 
@@ -571,6 +571,12 @@ load_hocon_file(FileName, LoadType) ->
             #{}
             #{}
     end.
     end.
 
 
+do_get_raw(Path) ->
+    hocon_tconf:remove_env_meta(do_get(?RAW_CONF, Path)).
+
+do_get_raw(Path, Default) ->
+    hocon_tconf:remove_env_meta(do_get(?RAW_CONF, Path, Default)).
+
 do_get(Type, KeyPath) ->
 do_get(Type, KeyPath) ->
     Ref = make_ref(),
     Ref = make_ref(),
     Res = do_get(Type, KeyPath, Ref),
     Res = do_get(Type, KeyPath, Ref),

+ 12 - 12
apps/emqx/src/emqx_schema.erl

@@ -2216,29 +2216,29 @@ str(B) when is_binary(B) ->
 str(S) when is_list(S) ->
 str(S) when is_list(S) ->
     S.
     S.
 
 
-authentication(Type) ->
+authentication(Which) ->
     Desc =
     Desc =
-        case Type of
+        case Which of
             global -> ?DESC(global_authentication);
             global -> ?DESC(global_authentication);
             listener -> ?DESC(listener_authentication)
             listener -> ?DESC(listener_authentication)
         end,
         end,
-    %% authentication schema is lazy to make it more 'plugable'
-    %% the type checks are done in emqx_auth application when it boots.
-    %% and in emqx_authentication_config module for runtime changes.
-    Default = hoconsc:lazy(hoconsc:union([hoconsc:array(typerefl:map())])),
-    %% as the type is lazy, the runtime module injection
+    %% The runtime module injection
     %% from EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY
     %% from EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY
     %% is for now only affecting document generation.
     %% is for now only affecting document generation.
     %% maybe in the future, we can find a more straightforward way to support
     %% maybe in the future, we can find a more straightforward way to support
     %% * document generation (at compile time)
     %% * document generation (at compile time)
     %% * type checks before boot (in bin/emqx config generation)
     %% * type checks before boot (in bin/emqx config generation)
     %% * type checks at runtime (when changing configs via management API)
     %% * type checks at runtime (when changing configs via management API)
+    Type0 =
+        case persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, undefined) of
+            undefined -> hoconsc:array(typerefl:map());
+            Module -> Module:root_type()
+        end,
+    %% It is a lazy type because when handing runtime update requests
+    %% the config is not checked by emqx_schema, but by the injected schema
+    Type = hoconsc:lazy(Type0),
     #{
     #{
-        type =>
-            case persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, undefined) of
-                undefined -> Default;
-                Module -> hoconsc:lazy(Module:root_type())
-            end,
+        type => Type,
         desc => Desc
         desc => Desc
     }.
     }.
 
 

+ 22 - 0
apps/emqx/test/emqx_broker_SUITE.erl

@@ -715,6 +715,24 @@ t_connack_auth_error(Config) when is_list(Config) ->
     ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')),
     ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')),
     ok.
     ok.
 
 
+t_handle_in_empty_client_subscribe_hook({init, Config}) ->
+    Hook = {?MODULE, client_subscribe_delete_all_hook, []},
+    ok = emqx_hooks:put('client.subscribe', Hook, _Priority = 100),
+    Config;
+t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
+    emqx_hooks:del('client.subscribe', {?MODULE, client_subscribe_delete_all_hook}),
+    ok;
+t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
+    {ok, C} = emqtt:start_link(),
+    {ok, _} = emqtt:connect(C),
+    try
+        {ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
+        ?assertEqual([], RCs),
+        ok
+    after
+        emqtt:disconnect(C)
+    end.
+
 wait_for_events(Action, Kinds) ->
 wait_for_events(Action, Kinds) ->
     wait_for_events(Action, Kinds, 500).
     wait_for_events(Action, Kinds, 500).
 
 
@@ -771,3 +789,7 @@ recv_msgs(Count, Msgs) ->
     after 100 ->
     after 100 ->
         Msgs
         Msgs
     end.
     end.
+
+client_subscribe_delete_all_hook(_ClientInfo, _Username, TopicFilter) ->
+    EmptyFilters = [{T, Opts#{delete => true}} || {T, Opts} <- TopicFilter],
+    {stop, EmptyFilters}.

+ 7 - 7
apps/emqx_conf/src/emqx_conf_app.erl

@@ -139,13 +139,7 @@ copy_override_conf_from_core_node() ->
                             copy_override_conf_from_core_node()
                             copy_override_conf_from_core_node()
                     end;
                     end;
                 _ ->
                 _ ->
-                    SortFun = fun(
-                        {ok, #{wall_clock := W1}},
-                        {ok, #{wall_clock := W2}}
-                    ) ->
-                        W1 > W2
-                    end,
-                    [{ok, Info} | _] = lists:sort(SortFun, Ready),
+                    [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
                     #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
                     #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
                     Msg = #{
                     Msg = #{
                         msg => "copy_overide_conf_from_core_node_success",
                         msg => "copy_overide_conf_from_core_node_success",
@@ -173,3 +167,9 @@ should_proceed_with_boot() ->
             %% be up.  Try again.
             %% be up.  Try again.
             false
             false
     end.
     end.
+
+conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true;
+conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) ->
+    W1 > W2;
+conf_sort({ok, _}, {ok, _}) ->
+    false.

+ 1 - 1
mix.exs

@@ -370,7 +370,7 @@ defmodule EMQXUmbrella.MixProject do
     vm_args_template_path =
     vm_args_template_path =
       case release_type do
       case release_type do
         :cloud ->
         :cloud ->
-          "apps/emqx/etc/emqx_cloud/vm.args"
+          "apps/emqx/etc/vm.args.cloud"
       end
       end
 
 
     render_template(
     render_template(

+ 1 - 1
rebar.config.erl

@@ -412,7 +412,7 @@ emqx_etc_overlay(ReleaseType, Edition) ->
         emqx_etc_overlay_common().
         emqx_etc_overlay_common().
 
 
 emqx_etc_overlay_per_rel(cloud) ->
 emqx_etc_overlay_per_rel(cloud) ->
-    [{"{{base_dir}}/lib/emqx/etc/emqx_cloud/vm.args", "etc/vm.args"}].
+    [{"{{base_dir}}/lib/emqx/etc/vm.args.cloud", "etc/vm.args"}].
 
 
 emqx_etc_overlay_common() ->
 emqx_etc_overlay_common() ->
     [{"{{base_dir}}/lib/emqx/etc/ssl_dist.conf", "etc/ssl_dist.conf"}].
     [{"{{base_dir}}/lib/emqx/etc/ssl_dist.conf", "etc/ssl_dist.conf"}].