فهرست منبع

Monitor/Unmonitor membership events

Feng Lee 8 سال پیش
والد
کامیت
8a804c56f3
2فایلهای تغییر یافته به همراه10 افزوده شده و 12 حذف شده
  1. 7 6
      src/emqttd_router.erl
  2. 3 6
      src/emqttd_sm_helper.erl

+ 7 - 6
src/emqttd_router.erl

@@ -216,7 +216,7 @@ stop() -> gen_server:call(?ROUTER, stop).
 %%--------------------------------------------------------------------
 
 init([]) ->
-    ekka:subscribe(membership),
+    ekka:monitor(membership),
     ets:new(mqtt_local_route, [set, named_table, protected]),
     {ok, TRef}  = timer:send_interval(timer:seconds(1), stats),
     {ok, #state{stats_timer = TRef}}.
@@ -239,14 +239,15 @@ handle_cast({del_local_route, Topic}, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({membership, {member_up, _Node}}, State) ->
-    {noreply, State};
-
-handle_info({membership, {member_down, Node}}, State) ->
+handle_info({membership, {mnesia, down, Node}}, State) ->
     clean_routes_(Node),
     update_stats_(),
     {noreply, State, hibernate};
 
+handle_info({membership, _Event}, State) ->
+    %% ignore
+    {noreply, State};
+
 handle_info(stats, State) ->
     update_stats_(),
     {noreply, State, hibernate};
@@ -256,7 +257,7 @@ handle_info(_Info, State) ->
 
 terminate(_Reason, #state{stats_timer = TRef}) ->
     timer:cancel(TRef),
-    ekka:unsubscribe(membership).
+    ekka:unmonitor(membership).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

+ 3 - 6
src/emqttd_sm_helper.erl

@@ -42,7 +42,7 @@ start_link(StatsFun) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
 
 init([StatsFun]) ->
-    ekka:subscribe(membership),
+    ekka:monitor(membership),
     {ok, TRef} = timer:send_interval(timer:seconds(1), tick),
     {ok, #state{stats_fun = StatsFun, ticker = TRef}}.
 
@@ -52,7 +52,7 @@ handle_call(Req, _From, State) ->
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
 
-handle_info({membership, {mnesia_down, Node}}, State) ->
+handle_info({membership, {mnesia, down, Node}}, State) ->
     Fun = fun() ->
             ClientIds =
             mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'},
@@ -62,9 +62,6 @@ handle_info({membership, {mnesia_down, Node}}, State) ->
     mnesia:async_dirty(Fun),
     {noreply, State};
 
-handle_info({membership, {mnesia_up, _Node}}, State) ->
-    {noreply, State};
-
 handle_info({membership, _Event}, State) ->
     {noreply, State};
 
@@ -76,7 +73,7 @@ handle_info(Info, State) ->
 
 terminate(_Reason, _State = #state{ticker = TRef}) ->
     timer:cancel(TRef),
-    ekka:unsubscribe(membership).
+    ekka:unmonitor(membership).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.