emqx_flapping.erl 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_flapping).
  17. -behaviour(gen_server).
  18. -include("emqx.hrl").
  19. -include("types.hrl").
  20. -include("logger.hrl").
  21. -export([start_link/0, update_config/0, stop/0]).
  22. %% API
  23. -export([detect/1]).
  24. -ifdef(TEST).
  25. -export([get_policy/1]).
  26. -endif.
  27. %% gen_server callbacks
  28. -export([
  29. init/1,
  30. handle_call/3,
  31. handle_cast/2,
  32. handle_info/2,
  33. terminate/2,
  34. code_change/3
  35. ]).
%% Tab
%% Name of the ETS table that stores the flapping records; it is the module name.
-define(FLAPPING_TAB, ?MODULE).

%% One tracking record per client, stored in ?FLAPPING_TAB.
-record(flapping, {
    %% Client identifier; init/1 declares this field as the ETS key position.
    clientid :: emqx_types:clientid(),
    %% Peer host taken from the clientinfo map passed to detect/1.
    peerhost :: emqx_types:peerhost(),
    %% Creation time of the record, from erlang:system_time(millisecond).
    started_at :: pos_integer(),
    %% Counter incremented by one on every detect/3 call for this client.
    detect_cnt :: integer()
}).

%% Opaque alias so that other modules do not depend on the record layout.
-opaque flapping() :: #flapping{}.

-export_type([flapping/0]).
  46. -spec start_link() -> emqx_types:startlink_ret().
  47. start_link() ->
  48. gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  49. update_config() ->
  50. gen_server:cast(?MODULE, update_config).
  51. stop() -> gen_server:stop(?MODULE).
  52. %% @doc Detect flapping when a MQTT client disconnected.
  53. -spec detect(emqx_types:clientinfo()) -> boolean().
  54. detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
  55. detect(ClientId, PeerHost, get_policy(Zone)).
  56. detect(ClientId, PeerHost, #{enable := true, max_count := Threshold} = Policy) ->
  57. %% The initial flapping record sets the detect_cnt to 0.
  58. InitVal = #flapping{
  59. clientid = ClientId,
  60. peerhost = PeerHost,
  61. started_at = erlang:system_time(millisecond),
  62. detect_cnt = 0
  63. },
  64. case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}, InitVal) of
  65. Cnt when Cnt < Threshold -> false;
  66. _Cnt ->
  67. case ets:take(?FLAPPING_TAB, ClientId) of
  68. [Flapping] ->
  69. ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}),
  70. true;
  71. [] ->
  72. false
  73. end
  74. end;
  75. detect(_ClientId, _PeerHost, #{enable := false}) ->
  76. false.
  77. get_policy(Zone) ->
  78. Flapping = [flapping_detect],
  79. case emqx_config:get_zone_conf(Zone, Flapping, undefined) of
  80. undefined ->
  81. %% If zone has be deleted at running time,
  82. %% we don't crash the connection and disable flapping detect.
  83. Policy = emqx_config:get(Flapping),
  84. Policy#{enable => false};
  85. Policy ->
  86. Policy
  87. end.
  88. now_diff(TS) -> erlang:system_time(millisecond) - TS.
  89. %%--------------------------------------------------------------------
  90. %% gen_server callbacks
  91. %%--------------------------------------------------------------------
  92. init([]) ->
  93. ok = emqx_utils_ets:new(?FLAPPING_TAB, [
  94. public,
  95. set,
  96. {keypos, #flapping.clientid},
  97. {read_concurrency, true},
  98. {write_concurrency, true}
  99. ]),
  100. Timers = start_timers(),
  101. {ok, Timers, hibernate}.
  102. handle_call(Req, _From, State) ->
  103. ?SLOG(error, #{msg => "unexpected_call", call => Req}),
  104. {reply, ignored, State}.
  105. handle_cast(
  106. {detected,
  107. #flapping{
  108. clientid = ClientId,
  109. peerhost = PeerHost,
  110. started_at = StartedAt,
  111. detect_cnt = DetectCnt
  112. },
  113. #{window_time := WindTime, ban_time := Interval}},
  114. State
  115. ) ->
  116. case now_diff(StartedAt) < WindTime of
  117. %% Flapping happened:(
  118. true ->
  119. ?SLOG(
  120. warning,
  121. #{
  122. msg => "flapping_detected",
  123. peer_host => fmt_host(PeerHost),
  124. detect_cnt => DetectCnt,
  125. wind_time_in_ms => WindTime
  126. },
  127. #{clientid => ClientId}
  128. ),
  129. Now = erlang:system_time(second),
  130. Banned = #banned{
  131. who = {clientid, ClientId},
  132. by = <<"flapping detector">>,
  133. reason = <<"flapping is detected">>,
  134. at = Now,
  135. until = Now + (Interval div 1000)
  136. },
  137. {ok, _} = emqx_banned:create(Banned),
  138. ok;
  139. false ->
  140. ?SLOG(
  141. warning,
  142. #{
  143. msg => "client_disconnected",
  144. peer_host => fmt_host(PeerHost),
  145. detect_cnt => DetectCnt,
  146. interval => Interval
  147. },
  148. #{clientid => ClientId}
  149. )
  150. end,
  151. {noreply, State};
  152. handle_cast(update_config, State) ->
  153. NState = update_timer(State),
  154. {noreply, NState};
  155. handle_cast(Msg, State) ->
  156. ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
  157. {noreply, State}.
  158. handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
  159. Policy = #{window_time := WindowTime} = get_policy(Zone),
  160. Timestamp = erlang:system_time(millisecond) - WindowTime,
  161. MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}],
  162. ets:select_delete(?FLAPPING_TAB, MatchSpec),
  163. Timer = start_timer(Policy, Zone),
  164. {noreply, State#{Zone => Timer}, hibernate};
  165. handle_info(Info, State) ->
  166. ?SLOG(error, #{msg => "unexpected_info", info => Info}),
  167. {noreply, State}.
  168. terminate(_Reason, _State) ->
  169. ok.
  170. code_change(_OldVsn, State, _Extra) ->
  171. {ok, State}.
  172. start_timer(#{enable := true, window_time := WindowTime}, Zone) ->
  173. emqx_utils:start_timer(WindowTime, {garbage_collect, Zone});
  174. start_timer(_Policy, _Zone) ->
  175. undefined.
  176. start_timers() ->
  177. maps:map(
  178. fun(ZoneName, #{flapping_detect := FlappingDetect}) ->
  179. start_timer(FlappingDetect, ZoneName)
  180. end,
  181. emqx:get_config([zones], #{})
  182. ).
  183. update_timer(Timers) ->
  184. maps:map(
  185. fun(ZoneName, #{flapping_detect := FlappingDetect = #{enable := Enable}}) ->
  186. case maps:get(ZoneName, Timers, undefined) of
  187. undefined ->
  188. start_timer(FlappingDetect, ZoneName);
  189. TRef when Enable -> TRef;
  190. TRef ->
  191. _ = erlang:cancel_timer(TRef),
  192. undefined
  193. end
  194. end,
  195. emqx:get_config([zones], #{})
  196. ).
  197. fmt_host(PeerHost) ->
  198. try
  199. inet:ntoa(PeerHost)
  200. catch
  201. _:_ -> PeerHost
  202. end.