|
|
@@ -1,298 +0,0 @@
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
-%%
|
|
|
-%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
-%% you may not use this file except in compliance with the License.
|
|
|
-%% You may obtain a copy of the License at
|
|
|
-%%
|
|
|
-%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
-%%
|
|
|
-%% Unless required by applicable law or agreed to in writing, software
|
|
|
-%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
-%% See the License for the specific language governing permissions and
|
|
|
-%% limitations under the License.
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
--module(emqx_zone).
|
|
|
-
|
|
|
--behaviour(gen_server).
|
|
|
-
|
|
|
--include("emqx.hrl").
|
|
|
--include("emqx_mqtt.hrl").
|
|
|
--include("logger.hrl").
|
|
|
--include("types.hrl").
|
|
|
-
|
|
|
--logger_header("[Zone]").
|
|
|
-
|
|
|
--compile({inline,
|
|
|
- [ idle_timeout/1
|
|
|
- , publish_limit/1
|
|
|
- , ratelimit/1
|
|
|
- , mqtt_frame_options/1
|
|
|
- , mqtt_strict_mode/1
|
|
|
- , max_packet_size/1
|
|
|
- , mountpoint/1
|
|
|
- , use_username_as_clientid/1
|
|
|
- , stats_timer/1
|
|
|
- , enable_stats/1
|
|
|
- , enable_acl/1
|
|
|
- , enable_ban/1
|
|
|
- , enable_flapping_detect/1
|
|
|
- , ignore_loop_deliver/1
|
|
|
- , server_keepalive/1
|
|
|
- , keepalive_backoff/1
|
|
|
- , max_inflight/1
|
|
|
- , session_expiry_interval/1
|
|
|
- , force_gc_policy/1
|
|
|
- , force_shutdown_policy/1
|
|
|
- , response_information/1
|
|
|
- , quota_policy/1
|
|
|
- , get_env/2
|
|
|
- , get_env/3
|
|
|
- ]}).
|
|
|
-
|
|
|
-%% APIs
|
|
|
--export([start_link/0, stop/0]).
|
|
|
-
|
|
|
-%% Zone Option API
|
|
|
--export([ idle_timeout/1
|
|
|
- %% XXX: Dedeprecated at v4.2
|
|
|
- , publish_limit/1
|
|
|
- , ratelimit/1
|
|
|
- , mqtt_frame_options/1
|
|
|
- , mqtt_strict_mode/1
|
|
|
- , max_packet_size/1
|
|
|
- , mountpoint/1
|
|
|
- , use_username_as_clientid/1
|
|
|
- , stats_timer/1
|
|
|
- , enable_stats/1
|
|
|
- , enable_acl/1
|
|
|
- , enable_ban/1
|
|
|
- , enable_flapping_detect/1
|
|
|
- , ignore_loop_deliver/1
|
|
|
- , server_keepalive/1
|
|
|
- , keepalive_backoff/1
|
|
|
- , max_inflight/1
|
|
|
- , session_expiry_interval/1
|
|
|
- , force_gc_policy/1
|
|
|
- , force_shutdown_policy/1
|
|
|
- , response_information/1
|
|
|
- , quota_policy/1
|
|
|
- ]).
|
|
|
-
|
|
|
--export([ init_gc_state/1
|
|
|
- , oom_policy/1
|
|
|
- ]).
|
|
|
-
|
|
|
-%% Zone API
|
|
|
--export([ get_env/2
|
|
|
- , get_env/3
|
|
|
- , set_env/3
|
|
|
- , unset_env/2
|
|
|
- , unset_all_env/0
|
|
|
- ]).
|
|
|
-
|
|
|
--export([force_reload/0]).
|
|
|
-
|
|
|
-%% gen_server callbacks
|
|
|
--export([ init/1
|
|
|
- , handle_call/3
|
|
|
- , handle_cast/2
|
|
|
- , handle_info/2
|
|
|
- , terminate/2
|
|
|
- , code_change/3
|
|
|
- ]).
|
|
|
-
|
|
|
--import(emqx_misc, [maybe_apply/2]).
|
|
|
-
|
|
|
--export_type([zone/0]).
|
|
|
-
|
|
|
--type(zone() :: atom()).
|
|
|
-
|
|
|
--define(TAB, ?MODULE).
|
|
|
--define(SERVER, ?MODULE).
|
|
|
--define(DEFAULT_IDLE_TIMEOUT, 30000).
|
|
|
--define(KEY(Zone, Key), {?MODULE, Zone, Key}).
|
|
|
-
|
|
|
--spec(start_link() -> startlink_ret()).
|
|
|
-start_link() ->
|
|
|
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
|
-
|
|
|
--spec(stop() -> ok).
|
|
|
-stop() ->
|
|
|
- gen_server:stop(?SERVER).
|
|
|
-
|
|
|
--spec(init_gc_state(zone()) -> maybe(emqx_gc:gc_state())).
|
|
|
-init_gc_state(Zone) ->
|
|
|
- maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)).
|
|
|
-
|
|
|
--spec(oom_policy(zone()) -> emqx_types:oom_policy()).
|
|
|
-oom_policy(Zone) -> force_shutdown_policy(Zone).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Zone Options API
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
--spec(idle_timeout(zone()) -> pos_integer()).
|
|
|
-idle_timeout(Zone) ->
|
|
|
- get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT).
|
|
|
-
|
|
|
--spec(publish_limit(zone()) -> maybe(esockd_rate_limit:config())).
|
|
|
-publish_limit(Zone) ->
|
|
|
- get_env(Zone, publish_limit).
|
|
|
-
|
|
|
--spec(ratelimit(zone()) -> [emqx_limiter:specs()]).
|
|
|
-ratelimit(Zone) ->
|
|
|
- get_env(Zone, ratelimit, []).
|
|
|
-
|
|
|
--spec(mqtt_frame_options(zone()) -> emqx_frame:options()).
|
|
|
-mqtt_frame_options(Zone) ->
|
|
|
- #{strict_mode => mqtt_strict_mode(Zone),
|
|
|
- max_size => max_packet_size(Zone)
|
|
|
- }.
|
|
|
-
|
|
|
--spec(mqtt_strict_mode(zone()) -> boolean()).
|
|
|
-mqtt_strict_mode(Zone) ->
|
|
|
- get_env(Zone, strict_mode, false).
|
|
|
-
|
|
|
--spec(max_packet_size(zone()) -> integer()).
|
|
|
-max_packet_size(Zone) ->
|
|
|
- get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE).
|
|
|
-
|
|
|
--spec(mountpoint(zone()) -> maybe(emqx_mountpoint:mountpoint())).
|
|
|
-mountpoint(Zone) -> get_env(Zone, mountpoint).
|
|
|
-
|
|
|
--spec(use_username_as_clientid(zone()) -> boolean()).
|
|
|
-use_username_as_clientid(Zone) ->
|
|
|
- get_env(Zone, use_username_as_clientid, false).
|
|
|
-
|
|
|
--spec(stats_timer(zone()) -> undefined | disabled).
|
|
|
-stats_timer(Zone) ->
|
|
|
- case enable_stats(Zone) of true -> undefined; false -> disabled end.
|
|
|
-
|
|
|
--spec(enable_stats(zone()) -> boolean()).
|
|
|
-enable_stats(Zone) ->
|
|
|
- get_env(Zone, enable_stats, true).
|
|
|
-
|
|
|
--spec(enable_acl(zone()) -> boolean()).
|
|
|
-enable_acl(Zone) ->
|
|
|
- get_env(Zone, enable_acl, true).
|
|
|
-
|
|
|
--spec(enable_ban(zone()) -> boolean()).
|
|
|
-enable_ban(Zone) ->
|
|
|
- get_env(Zone, enable_ban, false).
|
|
|
-
|
|
|
--spec(enable_flapping_detect(zone()) -> boolean()).
|
|
|
-enable_flapping_detect(Zone) ->
|
|
|
- get_env(Zone, enable_flapping_detect, false).
|
|
|
-
|
|
|
--spec(ignore_loop_deliver(zone()) -> boolean()).
|
|
|
-ignore_loop_deliver(Zone) ->
|
|
|
- get_env(Zone, ignore_loop_deliver, false).
|
|
|
-
|
|
|
--spec(server_keepalive(zone()) -> maybe(pos_integer())).
|
|
|
-server_keepalive(Zone) ->
|
|
|
- get_env(Zone, server_keepalive).
|
|
|
-
|
|
|
--spec(keepalive_backoff(zone()) -> float()).
|
|
|
-keepalive_backoff(Zone) ->
|
|
|
- get_env(Zone, keepalive_backoff, 0.75).
|
|
|
-
|
|
|
--spec(max_inflight(zone()) -> 0..65535).
|
|
|
-max_inflight(Zone) ->
|
|
|
- get_env(Zone, max_inflight, 65535).
|
|
|
-
|
|
|
--spec(session_expiry_interval(zone()) -> non_neg_integer()).
|
|
|
-session_expiry_interval(Zone) ->
|
|
|
- get_env(Zone, session_expiry_interval, 0).
|
|
|
-
|
|
|
--spec(force_gc_policy(zone()) -> maybe(emqx_gc:opts())).
|
|
|
-force_gc_policy(Zone) ->
|
|
|
- get_env(Zone, force_gc_policy).
|
|
|
-
|
|
|
--spec(force_shutdown_policy(zone()) -> maybe(emqx_oom:opts())).
|
|
|
-force_shutdown_policy(Zone) ->
|
|
|
- get_env(Zone, force_shutdown_policy).
|
|
|
-
|
|
|
--spec(response_information(zone()) -> string()).
|
|
|
-response_information(Zone) ->
|
|
|
- get_env(Zone, response_information).
|
|
|
-
|
|
|
--spec(quota_policy(zone()) -> emqx_quota:policy()).
|
|
|
-quota_policy(Zone) ->
|
|
|
- get_env(Zone, quota, []).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% APIs
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
--spec(get_env(maybe(zone()), atom()) -> maybe(term())).
|
|
|
-get_env(undefined, Key) -> emqx:get_env(Key);
|
|
|
-get_env(Zone, Key) ->
|
|
|
- get_env(Zone, Key, undefined).
|
|
|
-
|
|
|
--spec(get_env(maybe(zone()), atom(), term()) -> maybe(term())).
|
|
|
-get_env(undefined, Key, Def) ->
|
|
|
- emqx:get_env(Key, Def);
|
|
|
-get_env(Zone, Key, Def) ->
|
|
|
- try persistent_term:get(?KEY(Zone, Key))
|
|
|
- catch error:badarg ->
|
|
|
- emqx:get_env(Key, Def)
|
|
|
- end.
|
|
|
-
|
|
|
--spec(set_env(zone(), atom(), term()) -> ok).
|
|
|
-set_env(Zone, Key, Val) ->
|
|
|
- persistent_term:put(?KEY(Zone, Key), Val).
|
|
|
-
|
|
|
--spec(unset_env(zone(), atom()) -> boolean()).
|
|
|
-unset_env(Zone, Key) ->
|
|
|
- persistent_term:erase(?KEY(Zone, Key)).
|
|
|
-
|
|
|
--spec(unset_all_env() -> ok).
|
|
|
-unset_all_env() ->
|
|
|
- [unset_env(Zone, Key) || {?KEY(Zone, Key), _Val} <- persistent_term:get()],
|
|
|
- ok.
|
|
|
-
|
|
|
--spec(force_reload() -> ok).
|
|
|
-force_reload() ->
|
|
|
- gen_server:call(?SERVER, force_reload).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% gen_server callbacks
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-init([]) ->
|
|
|
- _ = do_reload(),
|
|
|
- {ok, #{}}.
|
|
|
-
|
|
|
-handle_call(force_reload, _From, State) ->
|
|
|
- _ = do_reload(),
|
|
|
- {reply, ok, State};
|
|
|
-
|
|
|
-handle_call(Req, _From, State) ->
|
|
|
- ?LOG(error, "Unexpected call: ~p", [Req]),
|
|
|
- {reply, ignored, State}.
|
|
|
-
|
|
|
-handle_cast(Msg, State) ->
|
|
|
- ?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
|
- {noreply, State}.
|
|
|
-
|
|
|
-handle_info(Info, State) ->
|
|
|
- ?LOG(error, "Unexpected info: ~p", [Info]),
|
|
|
- {noreply, State}.
|
|
|
-
|
|
|
-terminate(_Reason, _State) ->
|
|
|
- ok.
|
|
|
-
|
|
|
-code_change(_OldVsn, State, _Extra) ->
|
|
|
- {ok, State}.
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Internal functions
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-do_reload() ->
|
|
|
- [persistent_term:put(?KEY(Zone, Key), Val)
|
|
|
- || {Zone, Opts} <- emqx:get_env(zones, []), {Key, Val} <- Opts].
|
|
|
-
|