Przeglądaj źródła

Merge pull request #6451 from k32/dev/fix-dispatch-when-emqx-is-stopped

fix(broker): Check broker status before dispatch
k32 4 lat temu
rodzic
commit
93acf8a3a7
2 zmienionych plików z 48 dodań i 28 usunięć
  1. 11 3
      apps/emqx/src/emqx.erl
  2. 37 25
      apps/emqx/src/emqx_broker.erl

+ 11 - 3
apps/emqx/src/emqx.erl

@@ -24,6 +24,7 @@
 
 
 %% Start/Stop the application
 %% Start/Stop the application
 -export([ start/0
 -export([ start/0
+        , is_running/0
         , is_running/1
         , is_running/1
         , stop/0
         , stop/0
         ]).
         ]).
@@ -85,10 +86,17 @@ stop() ->
 %% @doc Is emqx running?
 %% @doc Is emqx running?
 -spec(is_running(node()) -> boolean()).
 -spec(is_running(node()) -> boolean()).
 is_running(Node) ->
 is_running(Node) ->
-    case rpc:call(Node, erlang, whereis, [?APP]) of
+    case rpc:call(Node, ?MODULE, is_running, []) of
         {badrpc, _}          -> false;
         {badrpc, _}          -> false;
-        undefined            -> false;
-        Pid when is_pid(Pid) -> true
+        Result               -> Result
+    end.
+
+%% @doc Is emqx running on this node?
+-spec(is_running() -> boolean()).
+is_running() ->
+    case whereis(?APP) of
+        undefined -> false;
+        _         -> true
     end.
     end.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 37 - 25
apps/emqx/src/emqx_broker.erl

@@ -298,33 +298,18 @@ forward(Node, To, Delivery, sync) ->
     end.
     end.
 
 
 -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
 -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
-dispatch(Topic, #delivery{message = Msg}) ->
-    DispN = lists:foldl(
-                fun(Sub, N) ->
-                    N + dispatch(Sub, Topic, Msg)
-                end, 0, subscribers(Topic)),
-    case DispN of
-        0 ->
-            ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
-            ok = inc_dropped_cnt(Msg),
-            {error, no_subscribers};
-        _ ->
-            {ok, DispN}
+dispatch(Topic, Delivery) ->
+    case emqx:is_running() of
+        true  ->
+            do_dispatch(Topic, Delivery);
+        false ->
+            %% In a rare case emqx_router_helper process may delay
+            %% cleanup of the routing table and the peers will
+            %% dispatch messages to a node that is not fully
+            %% initialized. Handle this case gracefully:
+            {error, not_running}
     end.
     end.
 
 
-dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
-    case erlang:is_process_alive(SubPid) of
-        true ->
-            SubPid ! {deliver, Topic, Msg}, 1;
-        false -> 0
-    end;
-
-dispatch({shard, I}, Topic, Msg) ->
-    lists:foldl(
-        fun(SubPid, N) ->
-            N + dispatch(SubPid, Topic, Msg)
-        end, 0, subscribers({shard, Topic, I})).
-
 -compile({inline, [inc_dropped_cnt/1]}).
 -compile({inline, [inc_dropped_cnt/1]}).
 inc_dropped_cnt(Msg) ->
 inc_dropped_cnt(Msg) ->
     case emqx_message:is_sys(Msg) of
     case emqx_message:is_sys(Msg) of
@@ -516,3 +501,30 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
+
+-spec(do_dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
+do_dispatch(Topic, #delivery{message = Msg}) ->
+    DispN = lists:foldl(
+                fun(Sub, N) ->
+                    N + do_dispatch(Sub, Topic, Msg)
+                end, 0, subscribers(Topic)),
+    case DispN of
+        0 ->
+            ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
+            ok = inc_dropped_cnt(Msg),
+            {error, no_subscribers};
+        _ ->
+            {ok, DispN}
+    end.
+
+do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
+    case erlang:is_process_alive(SubPid) of
+        true ->
+            SubPid ! {deliver, Topic, Msg}, 1;
+        false -> 0
+    end;
+do_dispatch({shard, I}, Topic, Msg) ->
+    lists:foldl(
+        fun(SubPid, N) ->
+            N + do_dispatch(SubPid, Topic, Msg)
+        end, 0, subscribers({shard, Topic, I})).