emqx_flapping.erl 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_flapping).
  17. -behaviour(gen_server).
  18. -include("emqx.hrl").
  19. -include("types.hrl").
  20. -include("logger.hrl").
  21. -export([start_link/0, stop/0]).
  22. %% API
  23. -export([detect/1]).
  24. %% gen_server callbacks
  25. -export([ init/1
  26. , handle_call/3
  27. , handle_cast/2
  28. , handle_info/2
  29. , terminate/2
  30. , code_change/3
  31. ]).
  32. %% Tab
  33. -define(FLAPPING_TAB, ?MODULE).
  34. %% Default Policy
  35. -define(FLAPPING_THRESHOLD, 30).
  36. -define(FLAPPING_DURATION, 60000).
  37. -define(FLAPPING_BANNED_INTERVAL, 300000).
  38. -define(DEFAULT_DETECT_POLICY,
  39. #{max_count => ?FLAPPING_THRESHOLD,
  40. window_time => ?FLAPPING_DURATION,
  41. ban_time => ?FLAPPING_BANNED_INTERVAL
  42. }).
  %% One row of the flapping table, keyed by client id.
  -record(flapping,
          { %% Client identifier; this field is the ETS key position.
            clientid :: emqx_types:clientid()
            %% Peer address of the client; only used when logging.
          , peerhost :: emqx_types:peerhost()
            %% System time (milliseconds) at which counting started.
          , started_at :: pos_integer()
            %% Number of disconnects counted since `started_at'.
          , detect_cnt :: integer()
          }).

  -opaque flapping() :: #flapping{}.

  -export_type([flapping/0]).
  %% @doc Start the flapping detector as a gen_server registered locally
  %% under the module name. `init/1' receives an empty argument list.
  -spec start_link() -> emqx_types:startlink_ret().
  start_link() ->
      ServerName = {local, ?MODULE},
      gen_server:start_link(ServerName, ?MODULE, [], []).
  %% @doc Stop the flapping detector process registered under the module name.
  %% Exits with `noproc' when no such process is registered.
  -spec(stop() -> ok).
  stop() ->
      gen_server:stop(?MODULE).
  %% @doc Detect flapping when a MQTT client disconnected.
  %%
  %% Atomically increments the per-client disconnect counter; the record is
  %% created with a zero counter if the client has no entry yet. Once the
  %% counter reaches the zone's `max_count', the record is taken out of the
  %% table and cast to the detector process together with the policy.
  %%
  %% Returns `true' only when this call removed the record and notified the
  %% detector process; returns `false' otherwise.
  -spec detect(emqx_types:clientinfo()) -> boolean().
  detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
      Policy = get_policy(Zone),
      #{max_count := Threshold} = Policy,
      %% Seed record used only when the client has no entry yet.
      Seed = #flapping{clientid = ClientId,
                       peerhost = PeerHost,
                       started_at = erlang:system_time(millisecond),
                       detect_cnt = 0},
      CounterOp = {#flapping.detect_cnt, 1},
      Count = ets:update_counter(?FLAPPING_TAB, ClientId, CounterOp, Seed),
      %% The record is only taken when the threshold has been reached.
      case Count >= Threshold andalso ets:take(?FLAPPING_TAB, ClientId) of
          false ->
              false;
          [] ->
              %% Another process already removed the record.
              false;
          [Record] ->
              ok = gen_server:cast(?MODULE, {detected, Record, Policy}),
              true
      end.
  %% Read the `flapping_detect' configuration of `Zone'.
  get_policy(Zone) ->
      ConfPath = [flapping_detect],
      emqx_config:get_zone_conf(Zone, ConfPath).
  %% Difference between the current system time in milliseconds and `TS'.
  now_diff(TS) ->
      NowMs = erlang:system_time(millisecond),
      NowMs - TS.
  79. %%--------------------------------------------------------------------
  80. %% gen_server callbacks
  81. %%--------------------------------------------------------------------
  %% Create the public ETS set keyed on the record's client id, start the
  %% garbage-collection timers, and hibernate with an empty state map.
  init([]) ->
      TabOpts = [ public
                , set
                , {keypos, #flapping.clientid}
                , {read_concurrency, true}
                , {write_concurrency, true}
                ],
      ok = emqx_tables:new(?FLAPPING_TAB, TabOpts),
      start_timers(),
      {ok, #{}, hibernate}.
  %% No synchronous request is supported: log the call at error level and
  %% reply `ignored', leaving the state untouched.
  handle_call(Request, _Caller, ServerState) ->
      ?SLOG(error, #{msg => "unexpected_call", call => Request}),
      {reply, ignored, ServerState}.
  %% Handle a `{detected, Flapping, Policy}' cast; any other cast is logged as
  %% unexpected.
  %%
  %% When the time elapsed since the record's `started_at' is shorter than the
  %% policy's `window_time', the client is considered flapping: a warning is
  %% logged and the client id is banned for `ban_time' milliseconds, converted
  %% to whole seconds. Otherwise only a warning is logged.
  handle_cast({detected, #flapping{} = Flapping,
               #{window_time := WindTime, ban_time := Interval}}, State) ->
      #flapping{clientid = ClientId,
                peerhost = PeerHost,
                started_at = StartedAt,
                detect_cnt = DetectCnt} = Flapping,
      Elapsed = now_diff(StartedAt),
      if
          Elapsed < WindTime ->
              %% Threshold reached inside the window: flapping.
              ?SLOG(warning, #{
                  msg => "flapping_detected",
                  client_id => ClientId,
                  peer_host => fmt_host(PeerHost),
                  detect_cnt => DetectCnt,
                  wind_time_in_ms => WindTime
              }),
              BannedAt = erlang:system_time(second),
              %% `ban_time' is in milliseconds, the ban record uses seconds.
              BanSeconds = Interval div 1000,
              emqx_banned:create(#banned{who = {clientid, ClientId},
                                         by = <<"flapping detector">>,
                                         reason = <<"flapping is detected">>,
                                         at = BannedAt,
                                         until = BannedAt + BanSeconds});
          true ->
              %% Threshold reached, but not within the window: log only.
              ?SLOG(warning, #{
                  msg => "client_disconnected",
                  client_id => ClientId,
                  peer_host => fmt_host(PeerHost),
                  detect_cnt => DetectCnt,
                  interval => Interval
              })
      end,
      {noreply, State};
  handle_cast(Unexpected, State) ->
      ?SLOG(error, #{msg => "unexpected_cast", cast => Unexpected}),
      {noreply, State}.
  %% Periodic garbage collection for one zone: delete every record whose
  %% `started_at' is older than the zone's `window_time', re-arm the zone's
  %% timer, and hibernate. Any other message is logged as unexpected.
  handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
      NowMs = erlang:system_time(millisecond),
      Cutoff = NowMs - maps:get(window_time, get_policy(Zone)),
      %% Tuple layout of the record:
      %% {flapping, clientid, peerhost, started_at, detect_cnt};
      %% '$1' binds `started_at'.
      ExpiredSpec = [{{'_', '_', '_', '$1', '_'},
                      [{'<', '$1', Cutoff}],
                      [true]}],
      ets:select_delete(?FLAPPING_TAB, ExpiredSpec),
      start_timer(Zone),
      {noreply, State, hibernate};
  handle_info(Unexpected, State) ->
      ?SLOG(error, #{msg => "unexpected_info", info => Unexpected}),
      {noreply, State}.
  %% gen_server shutdown callback; performs no cleanup and returns `ok'.
  terminate(_Why, _FinalState) -> ok.
  %% Hot-upgrade callback; the state is carried over unchanged.
  code_change(_FromVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  %% Arm the garbage-collection timer of `Zone'. The delay is the zone's
  %% `window_time'; handle_info/2 receives the payload wrapped as
  %% `{timeout, TRef, {garbage_collect, Zone}}'.
  start_timer(Zone) ->
      Policy = get_policy(Zone),
      Delay = maps:get(window_time, Policy),
      emqx_misc:start_timer(Delay, {garbage_collect, Zone}).
  %% Start one garbage-collection timer per zone found under the `zones'
  %% config key; `#{}' is passed to emqx:get_config/2 as the default.
  start_timers() ->
      Zones = emqx:get_config([zones], #{}),
      lists:foreach(fun start_timer/1, maps:keys(Zones)).
  %% Render a peer address for logging.
  %%
  %% Returns the textual form produced by inet:ntoa/1 for a valid IPv4/IPv6
  %% address tuple, and `PeerHost' unchanged for anything else.
  %%
  %% inet:ntoa/1 reports an invalid address by returning `{error, einval}'
  %% rather than raising, so that result is matched explicitly; otherwise the
  %% error tuple would be logged in place of the original value. The catch
  %% clause is kept as a best-effort guard so formatting never crashes the
  %% caller.
  fmt_host(PeerHost) ->
      try inet:ntoa(PeerHost) of
          {error, _Reason} -> PeerHost;
          Address -> Address
      catch
          _:_ -> PeerHost
      end.