Sfoglia il codice sorgente

feat(listeners): Add an api to ensure all listeners are started

Zaiming Shi 5 anni fa
parent
commit
c5a02c729a

+ 4 - 4
lib-opensource/emqx_management/src/emqx_mgmt_cli.erl

@@ -554,16 +554,16 @@ listeners(_) ->
                    ]).
 
 stop_listener(false, Input) ->
-    ok = emqx_ctl:print("No such listener ~p~n", [Input]);
+    emqx_ctl:print("No such listener ~p~n", [Input]);
 stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
     ID = emqx_listeners:identifier(Listener),
     ListenOnStr = emqx_listeners:format_listen_on(ListenOn),
     case emqx_listeners:stop_listener(Listener) of
         ok ->
-            ok = emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]);
+            emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]);
         {error, Reason} ->
-            ok = emqx_ctl:print("Failed to stop ~s listener on ~s - ~p~n.",
-                                [ID, ListenOnStr, Reason])
+            emqx_ctl:print("Failed to stop ~s listener on ~s - ~p~n.",
+                           [ID, ListenOnStr, Reason])
     end.
 
 %%--------------------------------------------------------------------

+ 16 - 4
lib-opensource/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -49,7 +49,8 @@ groups() ->
         t_broker_cmd,
         t_router_cmd,
         t_subscriptions_cmd,
-        t_listeners_cmd
+        t_listeners_cmd_old,
+        t_listeners_cmd_new
        ]}].
 
 apps() ->
@@ -275,12 +276,23 @@ t_subscriptions_cmd(_) ->
     ?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"),
     unmock_print().
 
-t_listeners_cmd(_) ->
+t_listeners_cmd_old(_) ->
+    ok = emqx_listeners:ensure_all_started(),
     mock_print(),
     ?assertEqual(emqx_mgmt_cli:listeners([]), ok),
     ?assertEqual(
-       emqx_mgmt_cli:listeners(["stop", "wss", "8084"]),
-       "Stop wss listener on 8084 successfully.\n"
+       "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
+       emqx_mgmt_cli:listeners(["stop", "wss", "8084"])
+      ),
+    unmock_print().
+
+t_listeners_cmd_new(_) ->
+    ok = emqx_listeners:ensure_all_started(),
+    mock_print(),
+    ?assertEqual(emqx_mgmt_cli:listeners([]), ok),
+    ?assertEqual(
+       "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
+       emqx_mgmt_cli:listeners(["stop", "mqtt:wss:external"])
       ),
     unmock_print().
 

+ 22 - 0
src/emqx_listeners.erl

@@ -21,6 +21,7 @@
 
 %% APIs
 -export([ start/0
+        , ensure_all_started/0
         , restart/0
         , stop/0
         ]).
@@ -76,6 +77,27 @@ identifier(#{proto := Proto, name := Name}) ->
 start() ->
     lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
 
+%% @doc Ensure all configured listeners are started.
+%% Raise exception if any of them failed to start.
+-spec(ensure_all_started() -> ok).
+ensure_all_started() ->
+    ensure_all_started(emqx:get_env(listeners, []), []).
+
+ensure_all_started([], []) -> ok;
+ensure_all_started([], Failed) -> error(Failed);
+ensure_all_started([L | Rest], Results) ->
+    #{proto := Proto, listen_on := ListenOn, opts := Options} = L,
+    NewResults =
+        case start_listener(Proto, ListenOn, Options) of
+            {ok, _Pid} ->
+                Results;
+            {error, {already_started, _Pid}} ->
+                Results;
+            {error, Reason} ->
+                [{identifier(L), Reason} | Results]
+        end,
+    ensure_all_started(Rest, NewResults).
+
 %% @doc Format address:port for logging.
 -spec(format_listen_on(esockd:listen_on()) -> binary()).
 format_listen_on(ListenOn) -> format(ListenOn).