Feng Lee пре 10 година
родитељ
комит
e0bbc72f91

+ 3 - 2
apps/emqttd/include/emqttd.hrl

@@ -80,9 +80,10 @@
     clientid    :: binary() | undefined,
     username    :: binary() | undefined,
     ipaddress   :: inet:ip_address(),
+    client_pid  :: pid(),
+    client_mon  :: reference(),
     clean_sess  :: boolean(),
-    proto_ver   :: 3 | 4,
-    client_pid  :: pid()
+    proto_ver   :: 3 | 4
 }).
 
 -type mqtt_client() :: #mqtt_client{}.

+ 22 - 11
apps/emqttd/src/emqttd_cm.erl

@@ -82,10 +82,10 @@ lookup(ClientId) when is_binary(ClientId) ->
 %% @doc Register clientId with pid.
 %% @end
 %%------------------------------------------------------------------------------
--spec register(ClientId :: binary()) -> ok.
-register(ClientId) when is_binary(ClientId) ->
+-spec register(Client :: mqtt_client()) -> ok.
+register(Client = #mqtt_client{clientid = ClientId}) ->
     CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
-    gen_server:call(CmPid, {register, ClientId, self()}, infinity).
+    gen_server:call(CmPid, {register, Client}, infinity).
 
 %%------------------------------------------------------------------------------
 %% @doc Unregister clientId with pid.
@@ -104,18 +104,18 @@ init([Id, StatsFun]) ->
     gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
     {ok, #state{id = Id, statsfun = StatsFun}}.
 
-handle_call({register, ClientId, Pid}, _From, State) ->
+handle_call({register, Client = #mqtt_client{clientid = ClientId, client_pid = Pid}}, _From, State) ->
 	case ets:lookup(?CLIENT_TAB, ClientId) of
-        [{_, Pid, _}] ->
+        [#mqtt_client{client_pid = Pid}] ->
 			lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
             ignore;
-		[{_, OldPid, MRef}] ->
+		[#mqtt_client{client_pid = OldPid, client_mon = MRef}] ->
 			lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
 			OldPid ! {stop, duplicate_id, Pid},
 			erlang:demonitor(MRef),
-            ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)});
+            ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)});
 		[] -> 
-            ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)})
+            ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)})
 	end,
     {reply, ok, setstats(State)};
 
@@ -125,7 +125,7 @@ handle_call(Req, _From, State) ->
 
 handle_cast({unregister, ClientId, Pid}, State) ->
 	case ets:lookup(?CLIENT_TAB, ClientId) of
-	[{_, Pid, MRef}] ->
+	[#mqtt_client{client_pid = Pid, client_mon = MRef}] ->
 		erlang:demonitor(MRef, [flush]),
 		ets:delete(?CLIENT_TAB, ClientId);
 	[_] -> 
@@ -138,8 +138,18 @@ handle_cast({unregister, ClientId, Pid}, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
-	ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
+handle_info({'DOWN', MRef, process, DownPid, Reason}, State) ->
+    case ets:match_object(?CLIENT_TAB, {mqtt_client, '$1', '_', '_', DownPid, MRef, '_', '_'}) of
+        [] ->
+            ignore;
+        Clients ->
+            lists:foreach(
+                fun(Client = #mqtt_client{clientid = ClientId}) ->
+                        ets:delete_object(?CLIENT_TAB, Client),
+                        lager:error("Client ~s is Down: ~p", [ClientId, Reason]),
+                        emqttd_broker:foreach_hooks(client_disconnected, [Reason, ClientId])
+                end, Clients)
+    end,
     {noreply, setstats(State)};
 
 handle_info(_Info, State) ->
@@ -158,3 +168,4 @@ code_change(_OldVsn, State, _Extra) ->
 setstats(State = #state{statsfun = StatsFun}) ->
     StatsFun(ets:info(?CLIENT_TAB, size)), State.
 
+

+ 1 - 1
apps/emqttd/src/emqttd_cm_sup.erl

@@ -42,7 +42,7 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    ets:new(emqttd_cm:table(), [set, named_table, public,
+    ets:new(emqttd_cm:table(), [set, named_table, public, {keypos, 2},
                                 {write_concurrency, true}]),
     Schedulers = erlang:system_info(schedulers),
     gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]),

+ 220 - 0
apps/emqttd/src/emqttd_vm.erl

@@ -33,6 +33,87 @@
 
 -export([loads/0]).
 
+-define(SYSTEM_INFO, [
+                      allocated_areas,
+                      allocator,
+                      alloc_util_allocators,
+                      build_type,
+                      check_io,
+                      compat_rel,
+                      creation,
+                      debug_compiled,
+                      dist,
+                      dist_ctrl,
+                      driver_version,
+                      elib_malloc,
+                      dist_buf_busy_limit,
+                      %fullsweep_after, % included in garbage_collection
+                      garbage_collection,
+                      %global_heaps_size, % deprecated
+                      heap_sizes,
+                      heap_type,
+                      info,
+                      kernel_poll,
+                      loaded,
+                      logical_processors,
+                      logical_processors_available,
+                      logical_processors_online,
+                      machine,
+                      %min_heap_size, % included in garbage_collection
+                      %min_bin_vheap_size, % included in garbage_collection
+                      modified_timing_level,
+                      multi_scheduling,
+                      multi_scheduling_blockers,
+                      otp_release,
+                      port_count,
+                      process_count,
+                      process_limit,
+                      scheduler_bind_type,
+                      scheduler_bindings,
+                      scheduler_id,
+                      schedulers,
+                      schedulers_online,
+                      smp_support,
+                      system_version,
+                      system_architecture,
+                      threads,
+                      thread_pool_size,
+                      trace_control_word,
+                      update_cpu_info,
+                      version,
+                      wordsize
+                  ]).
+
+-define(SOCKET_OPTS, [
+                      active,
+                      broadcast,
+                      delay_send,
+                      dontroute,
+                      exit_on_close,
+                      header,
+                      keepalive,
+                      nodelay,
+                      packet,
+                      packet_size,
+                      read_packets,
+                      recbuf,
+                      reuseaddr,
+                      send_timeout,
+                      send_timeout_close,
+                      sndbuf,
+                      priority,
+                      tos
+                     ]).
+
+
+
+-export([loads/0,
+	 get_system_info/0,
+ 	% get_statistics/0, 
+ 	% get_process_info/0,
+ 	 get_ports_info/0,
+ 	 get_ets_info/0]).
+
 timestamp() ->
     {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
     MegaSecs * 1000000 + Secs.
@@ -49,3 +130,142 @@ loads() ->
 ftos(F) -> 
     [S] = io_lib:format("~.2f", [F]), S.
 
+get_system_info() ->
+    [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO].
+
+get_system_info(Key) ->
+    try erlang:system_info(Key) catch
+                                    error:badarg->undefined
+                                end.
+
+%% conversion functions for erlang:system_info(Key)
+
+format_system_info(allocated_areas, List) ->
+    [convert_allocated_areas(Value) || Value <- List];
+format_system_info(allocator, {_,_,_,List}) ->
+    List;
+format_system_info(dist_ctrl, List) ->
+    lists:map(fun({Node, Socket}) ->
+                      {ok, Stats} = inet:getstat(Socket),
+                      {Node, Stats}
+              end, List);
+format_system_info(driver_version, Value) ->
+    list_to_binary(Value);
+format_system_info(machine, Value) ->
+    list_to_binary(Value);
+format_system_info(otp_release, Value) ->
+    list_to_binary(Value);
+format_system_info(scheduler_bindings, Value) ->
+    tuple_to_list(Value);
+format_system_info(system_version, Value) ->
+    list_to_binary(Value);
+format_system_info(system_architecture, Value) ->
+    list_to_binary(Value);
+format_system_info(version, Value) ->
+    list_to_binary(Value);
+format_system_info(_, Value) ->
+    Value.
+
+convert_allocated_areas({Key, Value1, Value2}) ->
+    {Key, [Value1, Value2]};
+convert_allocated_areas({Key, Value}) ->
+    {Key, Value}.
+
+
+get_ports_info()->
+    [{pid_port_fun_to_atom(Port), get_port_info(Port)} || Port <- erlang:ports()].
+
+get_port_info(Port) ->
+    Stat = get_socket_getstat(Port),
+    SockName = get_socket_sockname(Port),
+    Opts = get_socket_opts(Port),
+    Protocol = get_socket_protocol(Port),
+    Status = get_socket_status(Port),
+    Type = get_socket_type(Port),
+
+    lists:flatten(lists:append([
+                                Stat,
+                                SockName,
+                                Opts,
+                                Protocol,
+                                Status,
+                                Type
+                               ])).
+
+get_socket_getstat(Socket) ->
+    case catch inet:getstat(Socket) of
+        {ok, Info} ->
+            Info;
+        _ ->
+            []
+    end.
+
+get_socket_sockname(Socket) ->
+    case catch inet:sockname(Socket) of
+        {ok, {Ip, Port}} ->
+            [{ip, ip_to_binary(Ip)}, {port, Port}];
+        _ ->
+            []
+    end.
+
+ip_to_binary(Tuple) ->
+    iolist_to_binary(string:join(lists:map(fun integer_to_list/1, tuple_to_list(Tuple)), ".")).
+
+
+get_socket_protocol(Socket) ->
+    case erlang:port_info(Socket, name) of
+        {name, "tcp_inet"} ->
+            [{protocol, tcp}];
+        {name, "udp_inet"} ->
+            [{protocol, udp}];
+        {name,"sctp_inet"} ->
+            [{protocol, sctp}];
+        _ ->
+            []
+    end.
+
+get_socket_status(Socket) ->
+    case catch prim_inet:getstatus(Socket) of
+        {ok, Status} ->
+            [{status, Status}];
+        _ ->
+         []
+    end.
+
+get_socket_type(Socket) ->
+    case catch prim_inet:gettype(Socket) of
+        {ok, Type} ->
+            [{type, tuple_to_list(Type)}];
+        _ ->
+         []
+    end.
+
+get_socket_opts(Socket) ->
+    [get_socket_opts(Socket, Key) || Key <- ?SOCKET_OPTS].
+
+get_socket_opts(Socket, Key) ->
+    case catch inet:getopts(Socket, [Key]) of
+        {ok, Opt} ->
+            Opt;
+        _ ->
+            []
+    end.
+
+get_ets_info() ->
+    [{Tab, get_ets_dets_info(ets, Tab)} || Tab <- ets:all()].
+
+get_ets_dets_info(Type, Tab) ->
+    case Type:info(Tab) of
+        undefined -> [];
+        Entries when is_list(Entries) ->
+            [{Key, pid_port_fun_to_atom(Value)} || {Key, Value} <- Entries]
+    end. 
+
+pid_port_fun_to_atom(Term) when is_pid(Term) ->
+    erlang:list_to_atom(pid_to_list(Term));
+pid_port_fun_to_atom(Term) when is_port(Term) ->
+    erlang:list_to_atom(erlang:port_to_list(Term));
+pid_port_fun_to_atom(Term) when is_function(Term) ->
+    erlang:list_to_atom(erlang:fun_to_list(Term));
+pid_port_fun_to_atom(Term) ->
+    Term.