|
|
@@ -25,27 +25,58 @@
|
|
|
|
|
|
-logger_header("[Zone]").
|
|
|
|
|
|
+-compile({inline,
|
|
|
+ [ idle_timeout/1
|
|
|
+ , publish_limit/1
|
|
|
+ , mqtt_frame_options/1
|
|
|
+ , mqtt_strict_mode/1
|
|
|
+ , max_packet_size/1
|
|
|
+ , mountpoint/1
|
|
|
+ , use_username_as_clientid/1
|
|
|
+ , stats_timer/1
|
|
|
+ , enable_stats/1
|
|
|
+ , enable_acl/1
|
|
|
+ , enable_ban/1
|
|
|
+ , enable_flapping_detect/1
|
|
|
+ , ignore_loop_deliver/1
|
|
|
+ , server_keepalive/1
|
|
|
+ , keepalive_backoff/1
|
|
|
+ , max_inflight/1
|
|
|
+ , session_expiry_interval/1
|
|
|
+ , force_gc_policy/1
|
|
|
+ , force_shutdown_policy/1
|
|
|
+ ]}).
|
|
|
+
|
|
|
%% APIs
|
|
|
-export([start_link/0, stop/0]).
|
|
|
|
|
|
--export([ frame_options/1
|
|
|
+%% Zone Option API
|
|
|
+-export([ idle_timeout/1
|
|
|
+ , publish_limit/1
|
|
|
+ , mqtt_frame_options/1
|
|
|
, mqtt_strict_mode/1
|
|
|
, max_packet_size/1
|
|
|
+ , mountpoint/1
|
|
|
, use_username_as_clientid/1
|
|
|
+ , stats_timer/1
|
|
|
, enable_stats/1
|
|
|
, enable_acl/1
|
|
|
, enable_ban/1
|
|
|
, enable_flapping_detect/1
|
|
|
, ignore_loop_deliver/1
|
|
|
, server_keepalive/1
|
|
|
+ , keepalive_backoff/1
|
|
|
, max_inflight/1
|
|
|
, session_expiry_interval/1
|
|
|
, force_gc_policy/1
|
|
|
, force_shutdown_policy/1
|
|
|
]).
|
|
|
|
|
|
--export([check_oom/2]).
|
|
|
+-export([ init_gc_state/1
|
|
|
+ , oom_policy/1
|
|
|
+ ]).
|
|
|
|
|
|
+%% Zone API
|
|
|
-export([ get_env/2
|
|
|
, get_env/3
|
|
|
, set_env/3
|
|
|
@@ -63,27 +94,46 @@
|
|
|
, code_change/3
|
|
|
]).
|
|
|
|
|
|
--export_type([zone/0]).
|
|
|
+-import(emqx_misc, [maybe_apply/2]).
|
|
|
|
|
|
-%% dummy state
|
|
|
--record(state, {}).
|
|
|
+-export_type([zone/0]).
|
|
|
|
|
|
-type(zone() :: atom()).
|
|
|
|
|
|
-define(TAB, ?MODULE).
|
|
|
-define(SERVER, ?MODULE).
|
|
|
+-define(DEFAULT_IDLE_TIMEOUT, 30000).
|
|
|
-define(KEY(Zone, Key), {?MODULE, Zone, Key}).
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% APIs
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-spec(start_link() -> startlink_ret()).
|
|
|
start_link() ->
|
|
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
|
|
|
|
--spec(frame_options(zone()) -> emqx_frame:options()).
|
|
|
-frame_options(Zone) ->
|
|
|
+-spec(stop() -> ok).
|
|
|
+stop() ->
|
|
|
+ gen_server:stop(?SERVER).
|
|
|
+
|
|
|
+-spec(init_gc_state(zone()) -> emqx_gc:gc_state()).
|
|
|
+init_gc_state(Zone) ->
|
|
|
+ maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)).
|
|
|
+
|
|
|
+-spec(oom_policy(zone()) -> emqx_types:oom_policy()).
|
|
|
+oom_policy(Zone) -> force_shutdown_policy(Zone).
|
|
|
+
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Zone Options API
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+-spec(idle_timeout(zone()) -> pos_integer()).
|
|
|
+idle_timeout(Zone) ->
|
|
|
+ get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT).
|
|
|
+
|
|
|
+-spec(publish_limit(zone()) -> maybe(esockd_rate_limit:config())).
|
|
|
+publish_limit(Zone) ->
|
|
|
+ get_env(Zone, publish_limit).
|
|
|
+
|
|
|
+-spec(mqtt_frame_options(zone()) -> emqx_frame:options()).
|
|
|
+mqtt_frame_options(Zone) ->
|
|
|
#{strict_mode => mqtt_strict_mode(Zone),
|
|
|
max_size => max_packet_size(Zone)
|
|
|
}.
|
|
|
@@ -96,10 +146,17 @@ mqtt_strict_mode(Zone) ->
|
|
|
max_packet_size(Zone) ->
|
|
|
get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE).
|
|
|
|
|
|
+-spec(mountpoint(zone()) -> maybe(emqx_mountpoint:mountpoint())).
|
|
|
+mountpoint(Zone) -> get_env(Zone, mountpoint).
|
|
|
+
|
|
|
-spec(use_username_as_clientid(zone()) -> boolean()).
|
|
|
use_username_as_clientid(Zone) ->
|
|
|
get_env(Zone, use_username_as_clientid, false).
|
|
|
|
|
|
+-spec(stats_timer(zone()) -> undefined | disabled).
|
|
|
+stats_timer(Zone) ->
|
|
|
+ case enable_stats(Zone) of true -> undefined; false -> disabled end.
|
|
|
+
|
|
|
-spec(enable_stats(zone()) -> boolean()).
|
|
|
enable_stats(Zone) ->
|
|
|
get_env(Zone, enable_stats, true).
|
|
|
@@ -124,6 +181,10 @@ ignore_loop_deliver(Zone) ->
|
|
|
server_keepalive(Zone) ->
|
|
|
get_env(Zone, server_keepalive).
|
|
|
|
|
|
+-spec(keepalive_backoff(zone()) -> float()).
|
|
|
+keepalive_backoff(Zone) ->
|
|
|
+ get_env(Zone, keepalive_backoff, 0.75).
|
|
|
+
|
|
|
-spec(max_inflight(zone()) -> 0..65535).
|
|
|
max_inflight(Zone) ->
|
|
|
get_env(Zone, max_inflight, 65535).
|
|
|
@@ -140,18 +201,9 @@ force_gc_policy(Zone) ->
|
|
|
force_shutdown_policy(Zone) ->
|
|
|
get_env(Zone, force_shutdown_policy).
|
|
|
|
|
|
--spec(check_oom(zone(), fun()) -> ok | term()).
|
|
|
-check_oom(Zone, Action) ->
|
|
|
- case emqx_zone:force_shutdown_policy(Zone) of
|
|
|
- undefined -> ok;
|
|
|
- Policy -> do_check_oom(emqx_oom:init(Policy), Action)
|
|
|
- end.
|
|
|
-
|
|
|
-do_check_oom(OomPolicy, Action) ->
|
|
|
- case emqx_oom:check(OomPolicy) of
|
|
|
- ok -> ok;
|
|
|
- Shutdown -> Action(Shutdown)
|
|
|
- end.
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% APIs
|
|
|
+%%--------------------------------------------------------------------
|
|
|
|
|
|
-spec(get_env(maybe(zone()), atom()) -> maybe(term())).
|
|
|
get_env(undefined, Key) -> emqx:get_env(Key);
|
|
|
@@ -179,17 +231,13 @@ unset_env(Zone, Key) ->
|
|
|
force_reload() ->
|
|
|
gen_server:call(?SERVER, force_reload).
|
|
|
|
|
|
--spec(stop() -> ok).
|
|
|
-stop() ->
|
|
|
- gen_server:stop(?SERVER, normal, infinity).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init([]) ->
|
|
|
_ = do_reload(),
|
|
|
- {ok, #state{}}.
|
|
|
+ {ok, #{}}.
|
|
|
|
|
|
handle_call(force_reload, _From, State) ->
|
|
|
_ = do_reload(),
|