Explorar o código

refactor(emqx_mgmt): Decorate RPCs

k32 %!s(int64=4) %!d(string=hai) anos
pai
achega
2ccf02cea9

+ 3 - 0
apps/emqx/src/emqx_alarm.erl

@@ -104,6 +104,7 @@ activate(Name, Details) ->
 activate(Name, Details, Message) ->
     gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}).
 
+-spec deactivate(binary() | atom()) -> ok | {error, not_found}.
 deactivate(Name) ->
     deactivate(Name, no_details, <<"">>).
 
@@ -113,12 +114,14 @@ deactivate(Name, Details) ->
 deactivate(Name, Details, Message) ->
     gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}).
 
+-spec delete_all_deactivated_alarms() -> ok.
 delete_all_deactivated_alarms() ->
     gen_server:call(?MODULE, delete_all_deactivated_alarms).
 
 get_alarms() ->
     get_alarms(all).
 
+-spec get_alarms(all | activated | deactivated) -> [map()].
 get_alarms(all) ->
     gen_server:call(?MODULE, {get_alarms, all});
 

+ 3 - 3
apps/emqx/src/emqx_alarm_handler.erl

@@ -72,11 +72,11 @@ handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
     {ok, State};
 
 handle_event({clear_alarm, system_memory_high_watermark}, State) ->
-    emqx_alarm:deactivate(high_system_memory_usage),
+    _ = emqx_alarm:deactivate(high_system_memory_usage),
     {ok, State};
 
 handle_event({clear_alarm, process_memory_high_watermark}, State) ->
-    emqx_alarm:deactivate(high_process_memory_usage),
+    _ = emqx_alarm:deactivate(high_process_memory_usage),
     {ok, State};
 
 handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
@@ -86,7 +86,7 @@ handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
     {ok, State};
 
 handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) ->
-    emqx_alarm:deactivate(runq_overload),
+    _ = emqx_alarm:deactivate(runq_overload),
     {ok, State};
 
 handle_event(_, State) ->

+ 6 - 0
apps/emqx/src/emqx_broker.erl

@@ -44,6 +44,7 @@
 
 %% PubSub Infos
 -export([ subscriptions/1
+        , subscriptions_via_topic/1
         , subscribers/1
         , subscribed/2
         ]).
@@ -359,6 +360,11 @@ subscriptions(SubId) ->
         undefined -> []
     end.
 
+-spec(subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]).
+subscriptions_via_topic(Topic) ->
+    MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=', '$1', Topic}], ['$_']}],
+    ets:select(?SUBOPTION, MatchSpec).
+
 -spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()).
 subscribed(SubPid, Topic) when is_pid(SubPid) ->
     ets:member(?SUBOPTION, {SubPid, Topic});

+ 28 - 7
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -22,10 +22,15 @@
 
         , forward/3
         , forward_async/3
-        , client_subscriptions/2
+        , list_client_subscriptions/2
+        , list_subscriptions_via_topic/2
 
         , lookup_client/2
         , kickout_client/2
+
+        , start_listener/2
+        , stop_listener/2
+        , restart_listener/2
         ]).
 
 -include("bpapi.hrl").
@@ -43,12 +48,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
 forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
 
--spec client_subscriptions(node(), emqx_types:clientid()) ->
-                [{emqx_types:topic(), emqx_types:subopts()}]
-              | emqx_rpc:badrpc().
-client_subscriptions(Node, ClientId) ->
-    rpc:call(Node, emqx_broker, subscriptions, [ClientId]).
-
 -spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
 kickout_client(Node, ClientId) ->
     rpc:call(Node, emqx_cm, kick_session, [ClientId]).
@@ -57,3 +56,25 @@ kickout_client(Node, ClientId) ->
           [emqx_cm:channel_info()] | {badrpc, _}.
 lookup_client(Node, Key) ->
     rpc:call(Node, emqx_cm, lookup_client, [Key]).
+
+-spec list_client_subscriptions(node(), emqx_types:clientid()) ->
+                [{emqx_types:topic(), emqx_types:subopts()}]
+              | emqx_rpc:badrpc().
+list_client_subscriptions(Node, ClientId) ->
+    rpc:call(Node, emqx_broker, subscriptions, [ClientId]).
+
+-spec list_subscriptions_via_topic(node(), emqx_types:topic()) -> [emqx_types:subopts()].
+list_subscriptions_via_topic(Node, Topic) ->
+    rpc:call(Node, emqx_broker, subscriptions_via_topic, [Topic]).
+
+-spec start_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
+start_listener(Node, Id) ->
+    rpc:call(Node, emqx_listeners, start_listener, [Id]).
+
+-spec stop_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
+stop_listener(Node, Id) ->
+    rpc:call(Node, emqx_listeners, stop_listener, [Id]).
+
+-spec restart_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
+restart_listener(Node, Id) ->
+    rpc:call(Node, emqx_listeners, restart_listener, [Id]).

+ 17 - 0
apps/emqx/src/proto/emqx_proto_v1.erl

@@ -24,9 +24,13 @@
 
         , is_running/1
 
+        , get_alarms/2
         , get_stats/1
         , get_metrics/1
 
+        , deactivate_alarm/2
+        , delete_all_deactivated_alarms/1
+
         , clean_authz_cache/1
         , clean_authz_cache/2
         ]).
@@ -38,6 +42,10 @@ introduced_in() ->
 is_running(Node) ->
     rpc:call(Node, emqx, is_running, []).
 
+-spec get_alarms(node(), all | activated | deactivated) -> [map()].
+get_alarms(Node, Type) ->
+    rpc:call(Node, emqx_alarm, get_alarms, [Type]).
+
 -spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}.
 get_stats(Node) ->
     rpc:call(Node, emqx_stats, getstats, []).
@@ -56,3 +64,12 @@ clean_authz_cache(Node, ClientId) ->
 -spec clean_authz_cache(node()) -> ok | {badrpc, _}.
 clean_authz_cache(Node) ->
     rpc:call(Node, emqx_authz_cache, drain_cache, []).
+
+-spec deactivate_alarm(node(), binary() | atom()) ->
+          ok | {error, not_found} | {badrpc, _}.
+deactivate_alarm(Node, Name) ->
+    rpc:call(Node, emqx_alarm, deactivate, [Name]).
+
+-spec delete_all_deactivated_alarms(node()) -> ok | {badrpc, _}.
+delete_all_deactivated_alarms(Node) ->
+    rpc:call(Node, emqx_alarm, delete_all_deactivated_alarms, []).

+ 55 - 63
apps/emqx_management/src/emqx_mgmt.erl

@@ -59,7 +59,7 @@
         ]).
 
 %% Internal funcs
--export([call_client/3]).
+-export([do_call_client/2]).
 
 %% Subscriptions
 -export([ list_subscriptions/1
@@ -90,8 +90,10 @@
         , list_listeners_by_id/1
         , get_listener/2
         , manage_listener/2
+        , do_update_listener/2
         , update_listener/2
         , update_listener/3
+        , do_remove_listener/1
         , remove_listener/1
         , remove_listener/2
         ]).
@@ -121,12 +123,6 @@
 
 -elvis([{elvis_style, god_modules, disable}]).
 
--export_type([listener_manage_op/0]).
-
--type listener_manage_op() :: start_listener
-                            | stop_listener
-                            | restart_listener.
-
 %% TODO: remove these function after all api use minirest version 1.X
 return() ->
     ok.
@@ -280,7 +276,7 @@ list_client_subscriptions(ClientId) ->
     end.
 
 client_subscriptions(Node, ClientId) ->
-    {Node, wrap_rpc(emqx_broker_proto_v1:client_subscriptions(Node, ClientId))}.
+    {Node, wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
 
 clean_authz_cache(ClientId) ->
     Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
@@ -305,7 +301,7 @@ set_ratelimit_policy(ClientId, Policy) ->
 set_quota_policy(ClientId, Policy) ->
     call_client(ClientId, {quota, Policy}).
 
-set_keepalive(ClientId, Interval)when Interval >= 0 andalso Interval =< 65535 ->
+set_keepalive(ClientId, Interval) when Interval >= 0 andalso Interval =< 65535 ->
     call_client(ClientId, {keepalive, Interval});
 set_keepalive(_ClientId, _Interval) ->
     {error, <<"mqtt3.1.1 specification: keepalive must between 0~65535">>}.
@@ -322,7 +318,8 @@ call_client(ClientId, Req) ->
     end.
 
 %% @private
-call_client(Node, ClientId, Req) when Node =:= node() ->
+-spec do_call_client(emqx_types:clientid(), term()) -> term().
+do_call_client(ClientId, Req) ->
     case emqx_cm:lookup_channels(ClientId) of
         [] -> {error, not_found};
         Pids when is_list(Pids) ->
@@ -332,9 +329,11 @@ call_client(Node, ClientId, Req) when Node =:= node() ->
                     erlang:apply(ConnMod, call, [Pid, Req]);
                 undefined -> {error, not_found}
             end
-    end;
+    end.
+
+%% @private
 call_client(Node, ClientId, Req) ->
-    rpc_call(Node, call_client, [Node, ClientId, Req]).
+    wrap_rpc(emqx_management_proto_v1:call_client(Node, ClientId, Req)).
 
 %%--------------------------------------------------------------------
 %% Subscriptions
@@ -354,26 +353,17 @@ list_subscriptions_via_topic(Topic, FormatFun) ->
     lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
                   || Node <- mria_mnesia:running_nodes()]).
 
-
-list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
-    MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
-    erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]);
-
-list_subscriptions_via_topic(Node, Topic, FormatFun) ->
-    rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
+list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
+    case wrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of
+        {error, Reason} -> {error, Reason};
+        Result          -> M:F(Result)
+    end.
 
 lookup_subscriptions(ClientId) ->
     lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
 
-lookup_subscriptions(Node, ClientId) when Node =:= node() ->
-    case ets:lookup(emqx_subid, ClientId) of
-        [] -> [];
-        [{_, Pid}] ->
-            ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
-    end;
-
 lookup_subscriptions(Node, ClientId) ->
-    rpc_call(Node, lookup_subscriptions, [Node, ClientId]).
+    wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)).
 
 %%--------------------------------------------------------------------
 %% Routes
@@ -390,7 +380,7 @@ subscribe(ClientId, TopicTables) ->
     subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
 
 subscribe([Node | Nodes], ClientId, TopicTables) ->
-    case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
+    case wrap_rpc(emqx_management_proto_v1:subscribe(Node, ClientId, TopicTables)) of
         {error, _} -> subscribe(Nodes, ClientId, TopicTables);
         Re -> Re
     end;
@@ -398,6 +388,8 @@ subscribe([Node | Nodes], ClientId, TopicTables) ->
 subscribe([], _ClientId, _TopicTables) ->
     {error, channel_not_found}.
 
+-spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) ->
+          {subscribe, _} | {error, atom()}.
 do_subscribe(ClientId, TopicTables) ->
     case ets:lookup(emqx_channel, ClientId) of
         [] -> {error, channel_not_found};
@@ -410,18 +402,23 @@ publish(Msg) ->
     emqx_metrics:inc_msg(Msg),
     emqx:publish(Msg).
 
+-spec unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
+          {unsubscribe, _} | {error, channel_not_found}.
 unsubscribe(ClientId, Topic) ->
     unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic).
 
+-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
+          {unsubscribe, _} | {error, channel_not_found}.
 unsubscribe([Node | Nodes], ClientId, Topic) ->
-    case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
+    case wrap_rpc(emqx_management_proto_v1:unsubscribe(Node, ClientId, Topic)) of
         {error, _} -> unsubscribe(Nodes, ClientId, Topic);
         Re -> Re
     end;
-
 unsubscribe([], _ClientId, _Topic) ->
     {error, channel_not_found}.
 
+-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
+          {unsubscribe, _} | {error, _}.
 do_unsubscribe(ClientId, Topic) ->
     case ets:lookup(emqx_channel, ClientId) of
         [] -> {error, channel_not_found};
@@ -457,44 +454,51 @@ listener_id_filter(Id, Listeners) ->
     Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
     lists:filter(Filter, Listeners).
 
--spec manage_listener( listener_manage_op()
-                     , Param :: map()) ->
-          ok | {error, Reason :: term()}.
-manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->
-    erlang:apply(emqx_listeners, Operation, [ID]);
-manage_listener(Operation, Param = #{node := Node}) ->
-    rpc_call(Node, manage_listener, [Operation, Param]).
-
-update_listener(Id, Config) ->
-    [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
-
-update_listener(Node, Id, Config) when Node =:= node() ->
+-spec manage_listener( start_listener | stop_listener | restart_listener
+                     , #{id := atom(), node := node()}
+                     ) -> ok | {error, Reason :: term()}.
+manage_listener(start_listener, #{id := ID, node := Node}) ->
+    wrap_rpc(emqx_broker_proto_v1:start_listener(Node, ID));
+manage_listener(stop_listener, #{id := ID, node := Node}) ->
+    wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, ID));
+manage_listener(restart_listener, #{id := ID, node := Node}) ->
+    wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, ID)).
+
+-spec do_update_listener(string(), emqx_config:update_request()) ->
+          map() | {error, _}.
+do_update_listener(Id, Config) ->
     case emqx_listeners:parse_listener_id(Id) of
         {error, {invalid_listener_id, Id}} ->
             {error, {invalid_listener_id, Id}};
         {Type, Name} ->
             case emqx:update_config([listeners, Type, Name], Config, #{}) of
                 {ok, #{raw_config := RawConf}} ->
-                    RawConf#{node => Node, id => Id, running => true};
+                    RawConf#{node => node(), id => Id, running => true};
                 {error, Reason} ->
                     {error, Reason}
             end
-    end;
+    end.
+
+update_listener(Id, Config) ->
+    [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
+
 update_listener(Node, Id, Config) ->
-    rpc_call(Node, update_listener, [Node, Id, Config]).
+    wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
 
 remove_listener(Id) ->
     [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
 
-remove_listener(Node, Id) when Node =:= node() ->
+-spec do_remove_listener(string()) -> ok.
+do_remove_listener(Id) ->
     {Type, Name} = emqx_listeners:parse_listener_id(Id),
     case emqx:remove_config([listeners, Type, Name], #{}) of
         {ok, _} -> ok;
         {error, Reason} ->
             error(Reason)
-    end;
+    end.
+
 remove_listener(Node, Id) ->
-    rpc_call(Node, remove_listener, [Node, Id]).
+    wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)).
 
 %%--------------------------------------------------------------------
 %% Get Alarms
@@ -503,23 +507,17 @@ remove_listener(Node, Id) ->
 get_alarms(Type) ->
     [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
 
-get_alarms(Node, Type) when Node =:= node() ->
-    add_duration_field(emqx_alarm:get_alarms(Type));
 get_alarms(Node, Type) ->
-    rpc_call(Node, get_alarms, [Node, Type]).
+    add_duration_field(wrap_rpc(emqx_proto_v1:get_alarms(Node, Type))).
 
-deactivate(Node, Name) when Node =:= node() ->
-    emqx_alarm:deactivate(Name);
 deactivate(Node, Name) ->
-    rpc_call(Node, deactivate, [Node, Name]).
+    wrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)).
 
 delete_all_deactivated_alarms() ->
     [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
 
-delete_all_deactivated_alarms(Node) when Node =:= node() ->
-    emqx_alarm:delete_all_deactivated_alarms();
 delete_all_deactivated_alarms(Node) ->
-    rpc_call(Node, delete_deactivated_alarms, [Node]).
+    wrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)).
 
 add_duration_field(Alarms) ->
     Now = erlang:system_time(microsecond),
@@ -562,12 +560,6 @@ item(route, {Topic, Node}) ->
 %% Internal Functions.
 %%--------------------------------------------------------------------
 
-rpc_call(Node, Fun, Args) ->
-    case rpc:call(Node, ?MODULE, Fun, Args) of
-        {badrpc, Reason} -> {error, Reason};
-        Res -> Res
-    end.
-
 wrap_rpc({badrpc, Reason}) ->
     {error, Reason};
 wrap_rpc(Res) ->

+ 2 - 3
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -579,9 +579,6 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
     case do_unsubscribe(ClientID, Topic) of
         {error, channel_not_found} ->
             {404, ?CLIENT_ID_NOT_FOUND};
-        {error, Reason} ->
-            Message = list_to_binary(io_lib:format("~p", [Reason])),
-            {500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
         {unsubscribe, [{Topic, #{}}]} ->
             {200}
     end.
@@ -608,6 +605,8 @@ do_subscribe(ClientID, Topic0, Qos) ->
             end
     end.
 
+-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
+          {unsubscribe, _} | {error, channel_not_found}.
 do_unsubscribe(ClientID, Topic) ->
     case emqx_mgmt:unsubscribe(ClientID, Topic) of
         {error, Reason} ->

+ 30 - 0
apps/emqx_management/src/proto/emqx_management_proto_v1.erl

@@ -25,6 +25,13 @@
         , list_subscriptions/1
 
         , list_listeners/1
+        , remove_listener/2
+
+        , update_listener/3
+        , subscribe/3
+        , unsubscribe/3
+
+        , call_client/3
         ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -47,3 +54,26 @@ list_subscriptions(Node) ->
 -spec list_listeners(node()) -> [map()] | {badrpc, _}.
 list_listeners(Node) ->
     rpc:call(Node, emqx_mgmt, do_list_listeners, []).
+
+-spec remove_listener(node(), string()) -> ok | {badrpc, _}.
+remove_listener(Node, Id) ->
+    rpc:call(Node, emqx_mgmt, do_remove_listener, [Id]).
+
+-spec update_listener(node(), string(), emqx_config:update_request()) ->
+          map() | {error, _} | {badrpc, _}.
+update_listener(Node, Id, Config) ->
+    rpc:call(Node, emqx_mgmt, do_update_listener, [Id, Config]).
+
+-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
+          {subscribe, _} | {error, atom()} | {badrpc, _}.
+subscribe(Node, ClientId, TopicTables) ->
+    rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
+
+-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
+          {unsubscribe, _} | {error, _} | {badrpc, _}.
+unsubscribe(Node, ClientId, Topic) ->
+    rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
+
+-spec call_client(node(), emqx_types:clientid(), term()) -> term().
+call_client(Node, ClientId, Req) ->
+    rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]).

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -89,7 +89,7 @@ t_clients(_) ->
     {ok, _} =  emqx_mgmt_api_test_util:request_api(post, SubscribePath,
         "", AuthHeader, SubscribeBody),
     timer:sleep(100),
-    [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
+    [{AfterSubTopic, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
     ?assertEqual(AfterSubTopic, Topic),
     ?assertEqual(AfterSubQos, Qos),