فهرست منبع

Merge branch 'dev-hd' into dev

Feng 10 سال پیش
والد
کامیت
a68fa754fe
1فایلهای تغییر یافته به همراه320 افزوده شده و 133 حذف شده
  1. 320 133
      src/emqttd_vm.erl

+ 320 - 133
src/emqttd_vm.erl

@@ -27,7 +27,46 @@
 
 
 -module(emqttd_vm).
 -module(emqttd_vm).
 
 
--author("Feng Lee <feng@emqtt.io>").
+-define(UTIL_ALLOCATORS, [temp_alloc,
+                          eheap_alloc,
+                          binary_alloc,
+                          ets_alloc,
+                          driver_alloc,
+                          sl_alloc,
+                          ll_alloc,
+                          fix_alloc,
+                          std_alloc
+                         ]).
+
+-define(PROCESS_LIST, [initial_call,
+		       reductions,
+		       memory, 
+		       message_queue_len, 
+		       current_function]).
+
+-define(PROCESS_INFO, [initial_call, 
+		       current_function,
+		       registered_name,
+		       status,
+		       message_queue_len,
+		       group_leader,
+		       priority,
+		       trap_exit,
+		       reductions,
+		       binary,
+		       last_calls,
+		       catchlevel,
+		       trace,
+		       suspending,
+		       sequential_trace_token,
+		       error_handler]).
+
+-define(PROCESS_GC, [memory,
+		     total_heap_size,
+		     heap_size,
+		     stack_size,
+		     min_heap_size]).%,
+		     %fullsweep_after]).
 
 
 -export([timestamp/0, microsecs/0]).
 -export([timestamp/0, microsecs/0]).
 
 
@@ -87,31 +126,57 @@
 -define(SOCKET_OPTS, [
 -define(SOCKET_OPTS, [
                       active,
                       active,
                       broadcast,
                       broadcast,
+		      buffer,
+		      buffer,
+		      buffer,
+		      buffer,
                       delay_send,
                       delay_send,
                       dontroute,
                       dontroute,
                       exit_on_close,
                       exit_on_close,
                       header,
                       header,
+		      high_watermark,
+		      ipv6_v6only,
                       keepalive,
                       keepalive,
+		      linger,
+		      low_watermark,
+		      mode,
                       nodelay,
                       nodelay,
                       packet,
                       packet,
                       packet_size,
                       packet_size,
+		      priority,
                       read_packets,
                       read_packets,
                       recbuf,
                       recbuf,
                       reuseaddr,
                       reuseaddr,
                       send_timeout,
                       send_timeout,
                       send_timeout_close,
                       send_timeout_close,
                       sndbuf,
                       sndbuf,
-                      priority,
                       tos
                       tos
                      ]).
                      ]).
 
 
 
 
+-author("Feng Lee <feng@emqtt.io>").
+
+-export([timestamp/0, microsecs/0]).
+
+-export([loads/0,
+	scheduler_usage/1]).
+
+-export([get_memory/0]).
+
+-export([get_process_list/0,
+ 	 get_process_info/0,
+ 	 get_process_gc/0,
+ 	 get_process_group_leader_info/1]).
+	
+-export([get_ets_list/0,
+ 	 get_ets_info/0,
+	 get_ets_info/1,
+ 	 get_ets_object/0,
+	 get_ets_object/1]).
 
 
--export([get_system_info/0,
- 	% get_statistics/0, 
- 	% get_process_info/0,
- 	 get_ports_info/0,
- 	 get_ets_info/0]).
+-export([get_port_types/0,
+	 get_port_info/0,
+ 	 get_port_info/1]).
 
 
 timestamp() ->
 timestamp() ->
     {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
     {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
@@ -129,136 +194,258 @@ loads() ->
 ftos(F) -> 
 ftos(F) -> 
     [S] = io_lib:format("~.2f", [F]), S.
     [S] = io_lib:format("~.2f", [F]), S.
 
 
-get_system_info() ->
-    [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO].
-
-get_system_info(Key) ->
-    try erlang:system_info(Key) catch
-                                    error:badarg->undefined
-                                end.
-
-%% conversion functions for erlang:system_info(Key)
-
-format_system_info(allocated_areas, List) ->
-    [convert_allocated_areas(Value) || Value <- List];
-format_system_info(allocator, {_,_,_,List}) ->
-    List;
-format_system_info(dist_ctrl, List) ->
-    lists:map(fun({Node, Socket}) ->
-                      {ok, Stats} = inet:getstat(Socket),
-                      {Node, Stats}
-              end, List);
-format_system_info(driver_version, Value) ->
-    list_to_binary(Value);
-format_system_info(machine, Value) ->
-    list_to_binary(Value);
-format_system_info(otp_release, Value) ->
-    list_to_binary(Value);
-format_system_info(scheduler_bindings, Value) ->
-    tuple_to_list(Value);
-format_system_info(system_version, Value) ->
-    list_to_binary(Value);
-format_system_info(system_architecture, Value) ->
-    list_to_binary(Value);
-format_system_info(version, Value) ->
-    list_to_binary(Value);
-format_system_info(_, Value) ->
-    Value.
-
-convert_allocated_areas({Key, Value1, Value2}) ->
-    {Key, [Value1, Value2]};
-convert_allocated_areas({Key, Value}) ->
-    {Key, Value}.
-
-
-get_ports_info()->
-    [{pid_port_fun_to_atom(Port), get_port_info(Port)} || Port <- erlang:ports()].
-
-get_port_info(Port) ->
-    Stat = get_socket_getstat(Port),
-    SockName = get_socket_sockname(Port),
-    Opts = get_socket_opts(Port),
-    Protocol = get_socket_protocol(Port),
-    Status = get_socket_status(Port),
-    Type = get_socket_type(Port),
-
-    lists:flatten(lists:append([
-                                Stat,
-                                SockName,
-                                Opts,
-                                Protocol,
-                                Status,
-                                Type
-                               ])).
-
-get_socket_getstat(Socket) ->
-    case catch inet:getstat(Socket) of
-        {ok, Info} ->
-            Info;
-        _ ->
-            []
-    end.
-
-get_socket_sockname(Socket) ->
-    case catch inet:sockname(Socket) of
-        {ok, {Ip, Port}} ->
-            [{ip, ip_to_binary(Ip)}, {port, Port}];
-        _ ->
-            []
-    end.
-
-ip_to_binary(Tuple) ->
-    iolist_to_binary(string:join(lists:map(fun integer_to_list/1, tuple_to_list(Tuple)), ".")).
-
-
-get_socket_protocol(Socket) ->
-    case erlang:port_info(Socket, name) of
-        {name, "tcp_inet"} ->
-            [{protocol, tcp}];
-        {name, "udp_inet"} ->
-            [{protocol, udp}];
-        {name,"sctp_inet"} ->
-            [{protocol, sctp}];
-        _ ->
-            []
-    end.
-
-get_socket_status(Socket) ->
-    case catch prim_inet:getstatus(Socket) of
-        {ok, Status} ->
-            [{status, Status}];
-        _ ->
-         []
-    end.
+%%%% erlang vm scheduler_usage  fun copied from recon 
+scheduler_usage(Interval) when is_integer(Interval) ->
+    %% We start and stop the scheduler_wall_time system flag
+    %% if it wasn't in place already. Usually setting the flag 
+    %% should have a CPU impact(make it higher) only when under low usage.
+    FormerFlag = erlang:system_flag(scheduler_wall_time, true),
+    First = erlang:statistics(scheduler_wall_time),
+    timer:sleep(Interval),
+    Last = erlang:statistics(scheduler_wall_time),
+    erlang:system_flag(scheduler_wall_time, FormerFlag),
+    scheduler_usage_diff(First, Last).
+
+scheduler_usage_diff(First, Last) ->
+    lists:map(
+	    fun({{I, A0, T0},{I, A1, T1}}) ->{I, (A1 - A0)/(T1 - T0)}end,
+	    lists:zip(lists:sort(First), lists:sort(Last))
+    ).
+
+get_memory()->
+    [{Key, get_memory(Key, current)} || Key <- [used, allocated, unused, usage]] ++ erlang:memory().
+
+get_memory(used, Keyword) ->
+    lists:sum(lists:map(fun({_, Prop}) ->
+			        container_size(Prop, Keyword, blocks_size)
+			end, util_alloc()));
+get_memory(allocated, Keyword) ->
+    lists:sum(lists:map(fun({_, Prop})->
+		   		container_size(Prop, Keyword, carriers_size)
+	            	end, util_alloc()));
+get_memory(unused, Keyword) ->
+    get_memory(allocated, Keyword) - get_memory(used, Keyword);
+get_memory(usage, Keyword) ->
+    get_memory(used, Keyword) / get_memory(allocated, Keyword).
+
+util_alloc()->
+    alloc(?UTIL_ALLOCATORS).
+
+alloc()->
+    {_Mem, Allocs} = snapshot_int(),
+    Allocs.
+alloc(Type) ->
+    [{{T, Instance}, Props} || {{T, Instance}, Props} <- alloc(), lists:member(T, Type)].
+
+snapshot_int() ->
+    {erlang:memory(), allocators()}.
+
+allocators() ->
+    UtilAllocators = erlang:system_info(alloc_util_allocators),
+    Allocators = [sys_alloc, mseg_alloc|UtilAllocators],
+    [{{A, N},lists:sort(proplists:delete(versions, Props))} || 
+        A <- Allocators,
+	Allocs <- [erlang:system_info({allocator, A})],
+	Allocs =/= false,
+	{_, N, Props} <- Allocs].
+
+container_size(Prop, Keyword, Container) ->
+    Sbcs = container_value(Prop, Keyword, sbcs, Container),
+    Mbcs = container_value(Prop, Keyword, mbcs, Container),
+    Sbcs+Mbcs.
+
+container_value(Prop, Keyword, Type, Container) when is_atom(Keyword)->
+    container_value(Prop, 2, Type, Container);
+container_value(Props, Pos, mbcs = Type, Container) when is_integer(Pos)->
+    Pool = case proplists:get_value(mbcs_pool, Props) of
+	    PoolProps when PoolProps =/= undefined ->
+		    element(Pos, lists:keyfind(Container, 1, PoolProps));
+	    _ -> 0
+	   end,
+    TypeProps = proplists:get_value(Type, Props),
+    Pool + element(Pos, lists:keyfind(Container, 1, TypeProps));
+
+container_value(Props, Pos, Type, Container) ->
+    TypeProps = proplists:get_value(Type, Props),
+    element(Pos, lists:keyfind(Container, 1, TypeProps)).
+
+get_process_list()->
+    [get_process_list(Pid) || Pid <- processes()].
+
+get_process_list(Pid) when is_pid(Pid) ->
+    Info =  [process_info(Pid, Key) || Key <- ?PROCESS_LIST],
+    [{pid, pid_port_fun_to_atom(Pid)}] ++ lists:flatten([convert_pid_info(Item) || Item <- Info]).
+
+get_process_info() ->
+    [get_process_info(Pid) || Pid <- processes()].
+get_process_info(Pid) when is_pid(Pid) ->
+    ProcessInfo = [process_info(Pid, Key) || Key <- ?PROCESS_INFO],
+    lists:flatten([convert_pid_info(Item) || Item <- ProcessInfo]).
+
+get_process_gc() ->
+    [get_process_gc(Pid) || Pid <- processes()].
+get_process_gc(Pid) when is_pid(Pid) ->
+    GcInfo = [process_info(Pid, Key) || Key <- ?PROCESS_GC],
+    lists:flatten([convert_pid_info(E) || E <- GcInfo]).
+
+get_process_group_leader_info(LeaderPid) when is_pid(LeaderPid) ->
+    LeaderInfo = [{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO)],
+    lists:flatten([convert_pid_info(E) || E <- LeaderInfo]).
+
+get_ets_list() ->
+     ets:all().
 
 
-get_socket_type(Socket) ->
-    case catch prim_inet:gettype(Socket) of
-        {ok, Type} ->
-            [{type, tuple_to_list(Type)}];
-        _ ->
-         []
+get_ets_info() ->
+    [get_ets_info(Tab) || Tab <- ets:all()].
+
+get_ets_info(Tab) ->
+    case ets:info(Tab) of
+	undefined ->
+	    [];
+	Entries when is_list(Entries) ->
+	    mapping(Entries)
+    end.	
+
+get_ets_object() ->
+    [{Tab, get_ets_object(Tab)} || Tab <- ets:all()].
+
+get_ets_object(Tab) ->
+    TabInfo = ets:info(Tab),
+    Size = proplists:get_value(size, TabInfo),
+    NameTab = proplists:get_value(named_table, TabInfo),
+    if  (Size == 0) or (NameTab == false) ->
+	[];
+    true ->
+	ets:tab2list(Tab)	
     end.
     end.
 
 
-get_socket_opts(Socket) ->
-    [get_socket_opts(Socket, Key) || Key <- ?SOCKET_OPTS].
+get_port_types() ->
+    lists:usort(fun({KA, VA},{KB, VB})-> {VA, KB} >{VB, KA} end,
+	ports_type_count([Type || {_Port, Type} <- ports_type_list()])).
+
+get_port_info() ->
+    [get_port_info(Port) ||Port <- erlang:ports()].
+
+get_port_info(PortTerm) ->
+    Port = transform_port(PortTerm),
+    [port_info(Port, Type) || Type <- [meta, signals, io, memory_used, specific]].
+
+port_info(Port, meta) ->
+	{meta, List} = port_info_type(Port, meta, [id, name, os_pid]),
+	case port_info(Port, registered_name)  of
+		[] -> {meta, List};
+		Name -> {meta, [Name | List]}
+end;
+
+port_info(PortTerm, signals) ->
+    port_info_type(PortTerm, signals, [connected, links, monitors]);
+
+port_info(PortTerm, io) ->
+    port_info_type(PortTerm, io, [input, output]);
+
+port_info(PortTerm, memory_used) ->
+    port_info_type(PortTerm, memory_used, [memory, queue_size]);
+
+port_info(PortTerm, specific) ->
+	Port = transform_port(PortTerm),
+	Props = case erlang:port_info(Port, name) of
+		{_, Type}  when Type =:= "udp_inet";
+				Type =:= "tcp_inet";
+				Type =:= "sctp_inet" ->
+					case catch inet:getstat(Port) of
+						{ok, Stats} -> [{statistics, Stats}];
+						_ ->[]
+
+		end ++
+		case catch inet:peername(Port) of
+			{ok, Peer} ->[{peername, Peer}];
+			{error, _} ->[]
+	end ++
+	case catch inet:sockname(Port) of
+		{ok, Local} ->[{sockname, Local}];
+		{error, _} -> []
+end ++
+case catch inet:getopts(Port, ?SOCKET_OPTS ) of
+	{ok, Opts} -> [{options, Opts}];
+	{error, _} -> []
+end;
+	{_, "efile"} ->
+		[];
+	_ ->[]
+end,
+	{specific, Props};
+port_info(PortTerm, Keys) when is_list(Keys) ->
+	Port = transform_port(PortTerm),
+	[erlang:port_info(Port, Key) || Key <- Keys];
+port_info(PortTerm, Key) when is_atom(Key) ->
+	Port = transform_port(PortTerm),
+	erlang:port_info(Port, Key).
+
+port_info_type(PortTerm, Type, Keys) ->
+	Port = transform_port(PortTerm),
+	{Type, [erlang:port_info(Port, Key) || Key <- Keys]}.
+
+transform_port(Port) when is_port(Port)  -> Port;
+transform_port("#Port<0." ++ Id) ->
+    N = list_to_integer(lists:sublist(Id, length(Id) - 1)),
+    transform_port(N);
+transform_port(N) when is_integer(N) ->
+    Name = iolist_to_binary(atom_to_list(node())),
+    NameLen = iolist_size(Name),
+    Vsn = binary:last(term_to_binary(self())),
+    Bin = <<131, 102, 100,
+    	    NameLen:2/unit:8,
+	    Name:NameLen/binary,
+	    N:4/unit:8,
+	    Vsn:8>>,
+    binary_to_term(Bin).
+
+ports_type_list() ->
+    [{Port, PortType} || Port <- erlang:ports(),
+		{_, PortType} <- [erlang:port_info(Port, name)]].
+
+ports_type_count(Types) ->
+    DictTypes = lists:foldl(fun(Type, Acc)->
+			dict:update_counter(Type, 1, Acc)	       
+		end, dict:new(), Types),
+    dict:to_list(DictTypes).
+
+mapping(Entries) ->
+    mapping(Entries, []).
+mapping([], Acc) ->
+    Acc;
+mapping([{owner, V}|Entries], Acc) when is_pid(V) ->
+    OwnerInfo = process_info(V),
+    Owner = proplists:get_value(registered_name, OwnerInfo, undefined),
+    mapping(Entries, [{owner, pid_port_fun_to_atom(Owner)}|Acc]);
+mapping([{Key, Value}|Entries], Acc) ->
+    mapping(Entries, [{Key, pid_port_fun_to_atom(Value)}|Acc]).
+
+%ip_to_binary(Tuple) ->
+%    iolist_to_binary(string:join(lists:map(fun integer_to_list/1, tuple_to_list(Tuple)), ".")).
+
+convert_pid_info({initial_call,{_M, F, _A}}) ->
+    {initial_call, F};
+convert_pid_info({current_function, {M, F, A}}) ->
+    {current_function, list_to_atom(lists:concat([atom_to_list(M),":",atom_to_list(F),"/",integer_to_list(A)]))};
+convert_pid_info({suspending, List}) ->
+    {suspending, [pid_port_fun_to_atom(E) || E <- List]};
+convert_pid_info({binary, List}) ->
+    {binary,[tuple_to_list(E) || E <- List]};
+convert_pid_info({Key, Term}) when is_pid(Term) or is_port(Term) or is_function(Term) ->
+    {Key, pid_port_fun_to_atom(Term)};
+convert_pid_info(Item) ->
+    Item.
+
+%convert_port_info({name, Name}) ->
+%    {name, list_to_binary(Name)};
+%convert_port_info({links, List}) ->
+%    {links, [pid_port_fun_to_atom(Item) || Item <- List]};
+%convert_port_info({connected, Pid}) ->
+%    erlang:process_info(Pid, registered_name);
+%convert_port_info(Item) ->
+%    Item.
 
 
-get_socket_opts(Socket, Key) ->
-    case catch inet:getopts(Socket, [Key]) of
-        {ok, Opt} ->
-            Opt;
-        _ ->
-            []
-    end.
-
-get_ets_info() ->
-    [{Tab, get_ets_dets_info(ets, Tab)} || Tab <- ets:all()].
-
-get_ets_dets_info(Type, Tab) ->
-    case Type:info(Tab) of
-        undefined -> [];
-        Entries when is_list(Entries) ->
-            [{Key, pid_port_fun_to_atom(Value)} || {Key, Value} <- Entries]
-    end. 
 
 
 pid_port_fun_to_atom(Term) when is_pid(Term) ->
 pid_port_fun_to_atom(Term) when is_pid(Term) ->
     erlang:list_to_atom(pid_to_list(Term));
     erlang:list_to_atom(pid_to_list(Term));