|
|
@@ -20,90 +20,98 @@
|
|
|
|
|
|
-export([start_link/0]).
|
|
|
|
|
|
--export([lookup_client/1]).
|
|
|
--export([register_client/1, register_client/2, unregister_client/1]).
|
|
|
-
|
|
|
--export([get_client_attrs/1, lookup_client_pid/1]).
|
|
|
--export([get_client_stats/1, set_client_stats/2]).
|
|
|
+-export([lookup_connection/1]).
|
|
|
+-export([register_connection/1, register_connection/2]).
|
|
|
+-export([unregister_connection/1]).
|
|
|
+-export([get_conn_attrs/1, set_conn_attrs/2]).
|
|
|
+-export([get_conn_stats/1, set_conn_stats/2]).
|
|
|
+-export([lookup_conn_pid/1]).
|
|
|
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
|
code_change/3]).
|
|
|
|
|
|
--record(state, {client_pmon}).
|
|
|
-
|
|
|
-define(CM, ?MODULE).
|
|
|
+
|
|
|
%% ETS Tables.
|
|
|
--define(CLIENT, emqx_client).
|
|
|
--define(CLIENT_ATTRS, emqx_client_attrs).
|
|
|
--define(CLIENT_STATS, emqx_client_stats).
|
|
|
+-define(CONN_TAB, emqx_conn).
|
|
|
+-define(CONN_ATTRS_TAB, emqx_conn_attrs).
|
|
|
+-define(CONN_STATS_TAB, emqx_conn_stats).
|
|
|
|
|
|
-%% @doc Start the client manager.
|
|
|
--spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
|
|
+%% @doc Start the connection manager.
|
|
|
+-spec(start_link() -> emqx_types:startlink_ret()).
|
|
|
start_link() ->
|
|
|
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
|
|
|
|
|
-%% @doc Lookup a client.
|
|
|
--spec(lookup_client(client_id()) -> list({client_id(), pid()})).
|
|
|
-lookup_client(ClientId) when is_binary(ClientId) ->
|
|
|
- ets:lookup(?CLIENT, ClientId).
|
|
|
-
|
|
|
-%% @doc Register a client.
|
|
|
--spec(register_client(client_id() | {client_id(), pid()}) -> ok).
|
|
|
-register_client(ClientId) when is_binary(ClientId) ->
|
|
|
- register_client({ClientId, self()});
|
|
|
-
|
|
|
-register_client({ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
|
|
- register_client({ClientId, ClientPid}, []).
|
|
|
-
|
|
|
--spec(register_client({client_id(), pid()}, list()) -> ok).
|
|
|
-register_client(CObj = {ClientId, ClientPid}, Attrs) when is_binary(ClientId), is_pid(ClientPid) ->
|
|
|
- _ = ets:insert(?CLIENT, CObj),
|
|
|
- _ = ets:insert(?CLIENT_ATTRS, {CObj, Attrs}),
|
|
|
- notify({registered, ClientId, ClientPid}).
|
|
|
-
|
|
|
-%% @doc Get client attrs
|
|
|
--spec(get_client_attrs({client_id(), pid()}) -> list()).
|
|
|
-get_client_attrs(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
|
|
+%% @doc Lookup a connection.
|
|
|
+-spec(lookup_connection(client_id()) -> list({client_id(), pid()})).
|
|
|
+lookup_connection(ClientId) when is_binary(ClientId) ->
|
|
|
+ ets:lookup(?CONN_TAB, ClientId).
|
|
|
+
|
|
|
+%% @doc Register a connection.
|
|
|
+-spec(register_connection(client_id() | {client_id(), pid()}) -> ok).
|
|
|
+register_connection(ClientId) when is_binary(ClientId) ->
|
|
|
+ register_connection({ClientId, self()});
|
|
|
+
|
|
|
+register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
|
+ _ = ets:insert(?CONN_TAB, Conn),
|
|
|
+ notify({registered, ClientId, ConnPid}).
|
|
|
+
|
|
|
+-spec(register_connection(client_id() | {client_id(), pid()}, list()) -> ok).
|
|
|
+register_connection(ClientId, Attrs) when is_binary(ClientId) ->
|
|
|
+ register_connection({ClientId, self()}, Attrs);
|
|
|
+register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
|
+ set_conn_attrs(Conn, Attrs),
|
|
|
+ register_connection(Conn).
|
|
|
+
|
|
|
+%% @doc Get conn attrs
|
|
|
+-spec(get_conn_attrs({client_id(), pid()}) -> list()).
|
|
|
+get_conn_attrs(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
|
try
|
|
|
- ets:lookup_element(?CLIENT_ATTRS, CObj, 2)
|
|
|
+ ets:lookup_element(?CONN_ATTRS_TAB, Conn, 2)
|
|
|
catch
|
|
|
error:badarg -> []
|
|
|
end.
|
|
|
|
|
|
-%% @doc Unregister a client.
|
|
|
--spec(unregister_client(client_id() | {client_id(), pid()}) -> ok).
|
|
|
-unregister_client(ClientId) when is_binary(ClientId) ->
|
|
|
- unregister_client({ClientId, self()});
|
|
|
-
|
|
|
-unregister_client(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
|
|
- _ = ets:delete(?CLIENT_STATS, CObj),
|
|
|
- _ = ets:delete(?CLIENT_ATTRS, CObj),
|
|
|
- _ = ets:delete_object(?CLIENT, CObj),
|
|
|
- notify({unregistered, ClientId, ClientPid}).
|
|
|
-
|
|
|
-%% @doc Lookup client pid
|
|
|
--spec(lookup_client_pid(client_id()) -> pid() | undefined).
|
|
|
-lookup_client_pid(ClientId) when is_binary(ClientId) ->
|
|
|
- case ets:lookup(?CLIENT, ClientId) of
|
|
|
+%% @doc Set conn attrs
|
|
|
+set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) ->
|
|
|
+ set_conn_attrs({ClientId, self()}, Attrs);
|
|
|
+set_conn_attrs(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
|
+ ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}).
|
|
|
+
|
|
|
+%% @doc Unregister a conn.
|
|
|
+-spec(unregister_connection(client_id() | {client_id(), pid()}) -> ok).
|
|
|
+unregister_connection(ClientId) when is_binary(ClientId) ->
|
|
|
+ unregister_connection({ClientId, self()});
|
|
|
+
|
|
|
+unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
|
+ _ = ets:delete(?CONN_STATS_TAB, Conn),
|
|
|
+ _ = ets:delete(?CONN_ATTRS_TAB, Conn),
|
|
|
+ _ = ets:delete_object(?CONN_TAB, Conn),
|
|
|
+ notify({unregistered, ClientId, ConnPid}).
|
|
|
+
|
|
|
+%% @doc Lookup connection pid
|
|
|
+-spec(lookup_conn_pid(client_id()) -> pid() | undefined).
|
|
|
+lookup_conn_pid(ClientId) when is_binary(ClientId) ->
|
|
|
+ case ets:lookup(?CONN_TAB, ClientId) of
|
|
|
[] -> undefined;
|
|
|
[{_, Pid}] -> Pid
|
|
|
end.
|
|
|
|
|
|
-%% @doc Get client stats
|
|
|
--spec(get_client_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
|
|
|
-get_client_stats(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
|
|
- try ets:lookup_element(?CLIENT_STATS, CObj, 2)
|
|
|
+%% @doc Get conn stats
|
|
|
+-spec(get_conn_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
|
|
|
+get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
|
+ try ets:lookup_element(?CONN_STATS_TAB, Conn, 2)
|
|
|
catch
|
|
|
error:badarg -> []
|
|
|
end.
|
|
|
|
|
|
-%% @doc Set client stats.
|
|
|
--spec(set_client_stats(client_id(), list(emqx_stats:stats())) -> boolean()).
|
|
|
-set_client_stats(ClientId, Stats) when is_binary(ClientId) ->
|
|
|
- set_client_stats({ClientId, self()}, Stats);
|
|
|
+%% @doc Set conn stats.
|
|
|
+-spec(set_conn_stats(client_id(), list(emqx_stats:stats())) -> boolean()).
|
|
|
+set_conn_stats(ClientId, Stats) when is_binary(ClientId) ->
|
|
|
+ set_conn_stats({ClientId, self()}, Stats);
|
|
|
|
|
|
-set_client_stats(CObj = {ClientId, ClientPid}, Stats) when is_binary(ClientId), is_pid(ClientPid) ->
|
|
|
- ets:insert(?CLIENT_STATS, {CObj, Stats}).
|
|
|
+set_conn_stats(Conn = {ClientId, ConnPid}, Stats) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
|
+ ets:insert(?CONN_STATS_TAB, {Conn, Stats}).
|
|
|
|
|
|
notify(Msg) ->
|
|
|
gen_server:cast(?CM, {notify, Msg}).
|
|
|
@@ -114,52 +122,52 @@ notify(Msg) ->
|
|
|
|
|
|
init([]) ->
|
|
|
TabOpts = [public, set, {write_concurrency, true}],
|
|
|
- _ = emqx_tables:new(?CLIENT, [{read_concurrency, true} | TabOpts]),
|
|
|
- _ = emqx_tables:new(?CLIENT_ATTRS, TabOpts),
|
|
|
- _ = emqx_tables:new(?CLIENT_STATS, TabOpts),
|
|
|
- ok = emqx_stats:update_interval(cm_stats, fun update_client_stats/0),
|
|
|
- {ok, #state{client_pmon = emqx_pmon:new()}}.
|
|
|
+ _ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
|
|
|
+ _ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
|
|
|
+ _ = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
|
|
|
+ ok = emqx_stats:update_interval(cm_stats, fun update_conn_stats/0),
|
|
|
+ {ok, #{conn_pmon => emqx_pmon:new()}}.
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
emqx_logger:error("[CM] unexpected call: ~p", [Req]),
|
|
|
{reply, ignored, State}.
|
|
|
|
|
|
-handle_cast({notify, {registered, ClientId, Pid}}, State = #state{client_pmon = PMon}) ->
|
|
|
- {noreply, State#state{client_pmon = emqx_pmon:monitor(Pid, ClientId, PMon)}};
|
|
|
+handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
|
|
+ {noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
|
|
|
|
|
|
-handle_cast({notify, {unregistered, _ClientId, Pid}}, State = #state{client_pmon = PMon}) ->
|
|
|
- {noreply, State#state{client_pmon = emqx_pmon:demonitor(Pid, PMon)}};
|
|
|
+handle_cast({notify, {unregistered, _ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
|
|
+ {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
|
|
|
|
|
|
handle_cast(Msg, State) ->
|
|
|
emqx_logger:error("[CM] unexpected cast: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{client_pmon = PMon}) ->
|
|
|
- case emqx_pmon:find(DownPid, PMon) of
|
|
|
- undefined -> {noreply, State};
|
|
|
- ClientId ->
|
|
|
- unregister_client({ClientId, DownPid}),
|
|
|
- {noreply, State#state{client_pmon = emqx_pmon:erase(DownPid, PMon)}}
|
|
|
+handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := PMon}) ->
|
|
|
+ case emqx_pmon:find(ConnPid, PMon) of
|
|
|
+ undefined ->
|
|
|
+ {noreply, State};
|
|
|
+ ClientId ->
|
|
|
+ unregister_connection({ClientId, ConnPid}),
|
|
|
+ {noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}}
|
|
|
end;
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
emqx_logger:error("[CM] unexpected info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-terminate(_Reason, _State = #state{}) ->
|
|
|
+terminate(_Reason, _State) ->
|
|
|
emqx_stats:cancel_update(cm_stats).
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
-%%-----------------------------------------------------------------------------
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
-%%-----------------------------------------------------------------------------
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
|
|
|
-update_client_stats() ->
|
|
|
- case ets:info(?CLIENT, size) of
|
|
|
+update_conn_stats() ->
|
|
|
+ case ets:info(?CONN_TAB, size) of
|
|
|
undefined -> ok;
|
|
|
- Size ->
|
|
|
- emqx_stats:setstat('clients/count', 'clients/max', Size)
|
|
|
+ Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
|
|
|
end.
|
|
|
|