Просмотр исходного кода

chore(emqx_management): merge enterprise to opensource

zhanghongtong 5 лет назад
Родитель
Сommit
4e9330a37e

+ 18 - 5
lib-ce/emqx_management/src/emqx_mgmt_api_pubsub.erl

@@ -78,7 +78,19 @@ subscribe(_Bindings, Params) ->
 publish(_Bindings, Params) ->
     logger:debug("API publish Params:~p", [Params]),
     {ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
-    return(do_publish(ClientId, Topic, Qos, Retain, Payload)).
+    case do_publish(ClientId, Topic, Qos, Retain, Payload) of
+        {ok, MsgIds} ->
+            case get_value(<<"return">>, Params, undefined) of
+                undefined -> return(ok);
+                _Val ->
+                    case get_value(<<"topics">>, Params, undefined) of
+                        undefined -> return({ok, #{msgid => lists:last(MsgIds)}});
+                        _ -> return({ok, #{msgids => MsgIds}})
+                    end
+            end;
+        Result ->
+            return(Result)
+    end.
 
 unsubscribe(_Bindings, Params) ->
     logger:debug("API unsubscribe Params:~p", [Params]),
@@ -119,7 +131,7 @@ loop_publish([], Result) ->
 loop_publish([Params | ParamsN], Acc) ->
     {ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
     Code = case do_publish(ClientId, Topic, Qos, Retain, Payload) of
-        ok -> 0;
+        {ok, _} -> 0;
         {_, Code0, _} -> Code0
     end,
     Result = #{topic => resp_topic(get_value(<<"topic">>, Params), get_value(<<"topics">>, Params, <<"">>)),
@@ -153,11 +165,12 @@ do_subscribe(ClientId, Topics, QoS) ->
 do_publish(_ClientId, [], _Qos, _Retain, _Payload) ->
     {ok, ?ERROR15, bad_topic};
 do_publish(ClientId, Topics, Qos, Retain, Payload) ->
-    lists:foreach(fun(Topic) ->
+    MsgIds = lists:map(fun(Topic) ->
         Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
-        emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}})
+        emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}}),
+        emqx_guid:to_hexstr(Msg#message.id)
     end, Topics),
-    ok.
+    {ok, MsgIds}.
 
 do_unsubscribe(ClientId, Topic) ->
     case validate_by_filter(Topic) of

+ 4 - 0
lib-ce/emqx_management/src/emqx_mgmt_http.erl

@@ -119,11 +119,15 @@ authorize_appid(Req) ->
          _  -> false
     end.
 
+-ifdef(EMQX_ENTERPRISE).
+filter(_) -> true.
+-else.
 filter(#{app := App}) ->
     case emqx_plugins:find_plugin(App) of
         false -> false;
         Plugin -> Plugin#plugin.active
     end.
+-endif.
 
 format(Port) when is_integer(Port) ->
     io_lib:format("0.0.0.0:~w", [Port]);

+ 79 - 6
lib-ce/emqx_management/src/emqx_mgmt_api_topic_metrics.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_mgmt_api_topic_metrics).
+-module(emqx_mod_api_topic_metrics).
 
 -import(minirest, [return/1]).
 
@@ -58,7 +58,7 @@ list(#{topic := Topic0}, _Params) ->
         Topic = emqx_mgmt_util:urldecode(Topic0),
         case safe_validate(Topic) of
             true -> 
-                case emqx_mgmt:get_topic_metrics(Topic) of
+                case get_topic_metrics(Topic) of
                     {error, Reason} -> return({error, Reason});
                     Metrics         -> return({ok, maps:from_list(Metrics)})
                 end;
@@ -69,7 +69,7 @@ list(#{topic := Topic0}, _Params) ->
 
 list(_Bindings, _Params) ->
     execute_when_enabled(fun() ->
-        case emqx_mgmt:get_all_topic_metrics() of
+        case get_all_topic_metrics() of
             {error, Reason} -> return({error, Reason});
             Metrics         -> return({ok, Metrics})
         end
@@ -83,7 +83,7 @@ register(_Bindings, Params) ->
             Topic ->
                 case safe_validate(Topic) of
                     true -> 
-                        emqx_mgmt:register_topic_metrics(Topic),
+                        register_topic_metrics(Topic),
                         return(ok);
                     false ->
                         return({error, invalid_topic_name})
@@ -93,7 +93,7 @@ register(_Bindings, Params) ->
 
 unregister(Bindings, _Params) when map_size(Bindings) =:= 0 ->
     execute_when_enabled(fun() ->
-        emqx_mgmt:unregister_all_topic_metrics(),
+        unregister_all_topic_metrics(),
         return(ok)
     end);
 
@@ -102,7 +102,7 @@ unregister(#{topic := Topic0}, _Params) ->
         Topic = emqx_mgmt_util:urldecode(Topic0),
         case safe_validate(Topic) of
             true -> 
-                emqx_mgmt:unregister_topic_metrics(Topic),
+                unregister_topic_metrics(Topic),
                 return(ok);
             false ->
                 return({error, invalid_topic_name})
@@ -128,3 +128,76 @@ safe_validate(Topic) ->
         error:_Error ->
             false
     end.
+
+get_all_topic_metrics() ->
+    lists:foldl(fun(Topic, Acc) ->
+                    case get_topic_metrics(Topic) of
+                        {error, _Reason} ->
+                            Acc;
+                        Metrics ->
+                            [#{topic => Topic, metrics => Metrics} | Acc]
+                    end
+                end, [], emqx_mod_topic_metrics:all_registered_topics()).
+
+get_topic_metrics(Topic) ->
+    lists:foldl(fun(Node, Acc) ->
+                    case get_topic_metrics(Node, Topic) of
+                        {error, _Reason} ->
+                            Acc;
+                        Metrics ->
+                            case Acc of
+                                [] -> Metrics;
+                                _ ->
+                                    lists:foldl(fun({K, V}, Acc0) ->
+                                                    [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0]
+                                                end, [], Acc)
+                            end
+                    end
+                end, [], ekka_mnesia:running_nodes()).
+
+get_topic_metrics(Node, Topic) when Node =:= node() ->
+    emqx_mod_topic_metrics:metrics(Topic);
+get_topic_metrics(Node, Topic) ->
+    rpc_call(Node, get_topic_metrics, [Node, Topic]).
+
+register_topic_metrics(Topic) ->
+    Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
+    case lists:any(fun(Item) -> Item =:= ok end, Results) of
+        true  -> ok;
+        false -> lists:last(Results)
+    end.
+
+register_topic_metrics(Node, Topic) when Node =:= node() ->
+    emqx_mod_topic_metrics:register(Topic);
+register_topic_metrics(Node, Topic) ->
+    rpc_call(Node, register_topic_metrics, [Node, Topic]).
+
+unregister_topic_metrics(Topic) ->
+    Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
+    case lists:any(fun(Item) -> Item =:= ok end, Results) of
+        true  -> ok;
+        false -> lists:last(Results)
+    end.
+
+unregister_topic_metrics(Node, Topic) when Node =:= node() ->
+    emqx_mod_topic_metrics:unregister(Topic);
+unregister_topic_metrics(Node, Topic) ->
+    rpc_call(Node, unregister_topic_metrics, [Node, Topic]).
+
+unregister_all_topic_metrics() ->
+    Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()],
+    case lists:any(fun(Item) -> Item =:= ok end, Results) of
+        true  -> ok;
+        false -> lists:last(Results)
+    end.
+
+unregister_all_topic_metrics(Node) when Node =:= node() ->
+    emqx_mod_topic_metrics:unregister_all();
+unregister_all_topic_metrics(Node) ->
+    rpc_call(Node, unregister_topic_metrics, [Node]).
+
+rpc_call(Node, Fun, Args) ->
+    case rpc:call(Node, ?MODULE, Fun, Args) of
+        {badrpc, Reason} -> {error, Reason};
+        Res -> Res
+    end.

+ 40 - 0
lib-ce/emqx_modules/src/emqx_modules.erl

@@ -30,6 +30,8 @@
         , load_module/2
         ]).
 
+-export([cli/1]).
+
 %% @doc List all available plugins
 -spec(list() -> [{atom(), boolean()}]).
 list() ->
@@ -170,3 +172,41 @@ write_loaded(true) ->
             ok
     end;
 write_loaded(false) -> ok.
+
+%%--------------------------------------------------------------------
+%% @doc Modules Command
+cli(["list"]) ->
+    foreach(fun(Module) -> print({module, Module}) end, emqx_modules:list());
+
+cli(["load", Name]) ->
+    case emqx_modules:load(list_to_atom(Name)) of
+        ok ->
+            emqx_ctl:print("Module ~s loaded successfully.~n", [Name]);
+        {error, Reason}   ->
+            emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason])
+    end;
+
+cli(["unload", Name]) ->
+    case emqx_modules:unload(list_to_atom(Name)) of
+        ok ->
+            emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]);
+        {error, Reason} ->
+            emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason])
+    end;
+
+cli(["reload", "emqx_mod_acl_internal" = Name]) ->
+    case emqx_modules:reload(list_to_atom(Name)) of
+        ok ->
+            emqx_ctl:print("Module ~s reloaded successfully.~n", [Name]);
+        {error, Reason} ->
+            emqx_ctl:print("Reload module ~s error: ~p.~n", [Name, Reason])
+    end;
+cli(["reload", Name]) ->
+    emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]);
+
+cli(_) ->
+    emqx_ctl:usage([{"modules list",            "Show loaded modules"},
+                    {"modules load <Module>",   "Load module"},
+                    {"modules unload <Module>", "Unload module"},
+                    {"modules reload <Module>", "Reload module"}
+                   ]).

+ 41 - 11
lib-ce/emqx_management/src/emqx_mgmt_api_modules.erl

@@ -14,9 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_mgmt_api_modules).
-
--include("emqx_mgmt.hrl").
+-module(emqx_modules_api).
 
 -import(minirest, [return/1]).
 
@@ -75,16 +73,16 @@
         ]).
 
 list(#{node := Node}, _Params) ->
-    return({ok, [format(Module) || Module <- emqx_mgmt:list_modules(Node)]});
+    return({ok, [format(Module) || Module <- list_modules(Node)]});
 
 list(_Bindings, _Params) ->
-    return({ok, [format(Node, Modules) || {Node, Modules} <- emqx_mgmt:list_modules()]}).
+    return({ok, [format(Node, Modules) || {Node, Modules} <- list_modules()]}).
 
 load(#{node := Node, module := Module}, _Params) ->
-    return(emqx_mgmt:load_module(Node, Module));
+    return(load_module(Node, Module));
 
 load(#{module := Module}, _Params) ->
-    Results = [emqx_mgmt:load_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()],
+    Results = [load_module(Node, Module) || {Node, _Info} <- list_nodes()],
     case lists:filter(fun(Item) -> Item =/= ok end, Results) of
         [] ->
             return(ok);
@@ -93,10 +91,10 @@ load(#{module := Module}, _Params) ->
     end.
 
 unload(#{node := Node, module := Module}, _Params) ->
-    return(emqx_mgmt:unload_module(Node, Module));
+    return(unload_module(Node, Module));
 
 unload(#{module := Module}, _Params) ->
-    Results = [emqx_mgmt:unload_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()],
+    Results = [unload_module(Node, Module) || {Node, _Info} <- list_nodes()],
     case lists:filter(fun(Item) -> Item =/= ok end, Results) of
         [] ->
             return(ok);
@@ -105,13 +103,13 @@ unload(#{module := Module}, _Params) ->
     end.
 
 reload(#{node := Node, module := Module}, _Params) ->
-    case emqx_mgmt:reload_module(Node, Module) of
+    case reload_module(Node, Module) of
         ignore -> return(ok);
         Result -> return(Result)
     end;
 
 reload(#{module := Module}, _Params) ->
-    Results = [emqx_mgmt:reload_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()],
+    Results = [reload_module(Node, Module) || {Node, _Info} <- list_nodes()],
     case lists:filter(fun(Item) -> Item =/= ok end, Results) of
         [] ->
             return(ok);
@@ -119,6 +117,10 @@ reload(#{module := Module}, _Params) ->
             return(lists:last(Errors))
     end.
 
+%%------------------------------------------------------------------------------
+%% Internal Functions
+%%------------------------------------------------------------------------------
+
 format(Node, Modules) ->
     #{node => Node, modules => [format(Module) || Module <- Modules]}.
 
@@ -127,3 +129,31 @@ format({Name, Active}) ->
       description => iolist_to_binary(Name:description()),
       active => Active}.
 
+list_modules() ->
+    [{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()].
+
+list_modules(Node) when Node =:= node() ->
+    emqx_modules:list();
+list_modules(Node) ->
+    rpc_call(Node, list_modules, [Node]).
+
+load_module(Node, Module) when Node =:= node() ->
+    emqx_modules:load(Module);
+load_module(Node, Module) ->
+    rpc_call(Node, load_module, [Node, Module]).
+
+unload_module(Node, Module) when Node =:= node() ->
+    emqx_modules:unload(Module);
+unload_module(Node, Module) ->
+    rpc_call(Node, unload_module, [Node, Module]).
+
+reload_module(Node, Module) when Node =:= node() ->
+    emqx_modules:reload(Module);
+reload_module(Node, Module) ->
+    rpc_call(Node, reload_module, [Node, Module]).
+
+rpc_call(Node, Fun, Args) ->
+    case rpc:call(Node, ?MODULE, Fun, Args) of
+        {badrpc, Reason} -> {error, Reason};
+        Res -> Res
+    end.

+ 1 - 0
lib-ce/emqx_modules/src/emqx_modules_app.erl

@@ -30,6 +30,7 @@ start(_Type, _Args) ->
     application:load(emqx),
     {ok, Pid} = emqx_mod_sup:start_link(),
     ok = emqx_modules:load(),
+    emqx_ctl:register_command(modules, {emqx_modules, cli}, []),
     {ok, Pid}.
 
 stop(_State) ->