|
|
@@ -94,19 +94,19 @@ start_link() ->
|
|
|
|
|
|
%% @doc Register a channel.
|
|
|
%% Channel will be unregistered automatically when the channel process dies
|
|
|
--spec(register_channel(emqx_types:client_id()) -> ok).
|
|
|
+-spec(register_channel(emqx_types:clientid()) -> ok).
|
|
|
register_channel(ClientId) when is_binary(ClientId) ->
|
|
|
register_channel(ClientId, self()).
|
|
|
|
|
|
%% @doc Register a channel with pid.
|
|
|
--spec(register_channel(emqx_types:client_id(), chan_pid()) -> ok).
|
|
|
+-spec(register_channel(emqx_types:clientid(), chan_pid()) -> ok).
|
|
|
register_channel(ClientId, ChanPid) ->
|
|
|
Chan = {ClientId, ChanPid},
|
|
|
true = ets:insert(?CHAN_TAB, Chan),
|
|
|
ok = emqx_cm_registry:register_channel(Chan),
|
|
|
cast({registered, Chan}).
|
|
|
|
|
|
--spec(unregister_channel(emqx_types:client_id()) -> ok).
|
|
|
+-spec(unregister_channel(emqx_types:clientid()) -> ok).
|
|
|
unregister_channel(ClientId) when is_binary(ClientId) ->
|
|
|
true = do_unregister_channel({ClientId, self()}),
|
|
|
ok.
|
|
|
@@ -119,31 +119,31 @@ do_unregister_channel(Chan) ->
|
|
|
true = ets:delete(?CHAN_STATS_TAB, Chan),
|
|
|
ets:delete_object(?CHAN_TAB, Chan).
|
|
|
|
|
|
-%% @doc Get attrs of a channel.
|
|
|
--spec(get_chan_attrs(emqx_types:client_id()) -> maybe(emqx_types:attrs())).
|
|
|
+%% @doc Get info of a channel.
|
|
|
+-spec(get_chan_attrs(emqx_types:clientid()) -> maybe(emqx_types:attrs())).
|
|
|
get_chan_attrs(ClientId) ->
|
|
|
with_channel(ClientId, fun(ChanPid) -> get_chan_attrs(ClientId, ChanPid) end).
|
|
|
|
|
|
--spec(get_chan_attrs(emqx_types:client_id(), chan_pid()) -> maybe(emqx_types:attrs())).
|
|
|
+-spec(get_chan_attrs(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:attrs())).
|
|
|
get_chan_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
Chan = {ClientId, ChanPid},
|
|
|
emqx_tables:lookup_value(?CHAN_ATTRS_TAB, Chan);
|
|
|
get_chan_attrs(ClientId, ChanPid) ->
|
|
|
rpc_call(node(ChanPid), get_chan_attrs, [ClientId, ChanPid]).
|
|
|
|
|
|
-%% @doc Set attrs of a channel.
|
|
|
--spec(set_chan_attrs(emqx_types:client_id(), emqx_types:attrs()) -> ok).
|
|
|
-set_chan_attrs(ClientId, Attrs) when is_binary(ClientId) ->
|
|
|
+%% @doc Set info of a channel.
|
|
|
+-spec(set_chan_attrs(emqx_types:clientid(), emqx_types:attrs()) -> ok).
|
|
|
+set_chan_attrs(ClientId, Info) when is_binary(ClientId) ->
|
|
|
Chan = {ClientId, self()},
|
|
|
- true = ets:insert(?CHAN_ATTRS_TAB, {Chan, Attrs}),
|
|
|
+ true = ets:insert(?CHAN_ATTRS_TAB, {Chan, Info}),
|
|
|
ok.
|
|
|
|
|
|
%% @doc Get channel's stats.
|
|
|
--spec(get_chan_stats(emqx_types:client_id()) -> maybe(emqx_types:stats())).
|
|
|
+-spec(get_chan_stats(emqx_types:clientid()) -> maybe(emqx_types:stats())).
|
|
|
get_chan_stats(ClientId) ->
|
|
|
with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end).
|
|
|
|
|
|
--spec(get_chan_stats(emqx_types:client_id(), chan_pid()) -> maybe(emqx_types:stats())).
|
|
|
+-spec(get_chan_stats(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:stats())).
|
|
|
get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
Chan = {ClientId, ChanPid},
|
|
|
emqx_tables:lookup_value(?CHAN_STATS_TAB, Chan);
|
|
|
@@ -151,23 +151,23 @@ get_chan_stats(ClientId, ChanPid) ->
|
|
|
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
|
|
|
|
|
|
%% @doc Set channel's stats.
|
|
|
--spec(set_chan_stats(emqx_types:client_id(), emqx_types:stats()) -> ok).
|
|
|
+-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> ok).
|
|
|
set_chan_stats(ClientId, Stats) when is_binary(ClientId) ->
|
|
|
set_chan_stats(ClientId, self(), Stats).
|
|
|
|
|
|
--spec(set_chan_stats(emqx_types:client_id(), chan_pid(), emqx_types:stats()) -> ok).
|
|
|
+-spec(set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) -> ok).
|
|
|
set_chan_stats(ClientId, ChanPid, Stats) ->
|
|
|
Chan = {ClientId, ChanPid},
|
|
|
true = ets:insert(?CHAN_STATS_TAB, {Chan, Stats}),
|
|
|
ok.
|
|
|
|
|
|
%% @doc Open a session.
|
|
|
--spec(open_session(boolean(), emqx_types:client_info(), emqx_types:conninfo())
|
|
|
+-spec(open_session(boolean(), emqx_types:clientinfo(), emqx_types:conninfo())
|
|
|
-> {ok, #{session := emqx_session:session(),
|
|
|
present := boolean(),
|
|
|
pendings => list()}}
|
|
|
| {error, Reason :: term()}).
|
|
|
-open_session(true, ClientInfo = #{client_id := ClientId}, ConnInfo) ->
|
|
|
+open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
|
|
CleanStart = fun(_) ->
|
|
|
ok = discard_session(ClientId),
|
|
|
Session = emqx_session:init(ClientInfo, ConnInfo),
|
|
|
@@ -175,7 +175,7 @@ open_session(true, ClientInfo = #{client_id := ClientId}, ConnInfo) ->
|
|
|
end,
|
|
|
emqx_cm_locker:trans(ClientId, CleanStart);
|
|
|
|
|
|
-open_session(false, ClientInfo = #{client_id := ClientId}, ConnInfo) ->
|
|
|
+open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
|
|
ResumeStart = fun(_) ->
|
|
|
case takeover_session(ClientId) of
|
|
|
{ok, ConnMod, ChanPid, Session} ->
|
|
|
@@ -192,7 +192,7 @@ open_session(false, ClientInfo = #{client_id := ClientId}, ConnInfo) ->
|
|
|
emqx_cm_locker:trans(ClientId, ResumeStart).
|
|
|
|
|
|
%% @doc Try to takeover a session.
|
|
|
--spec(takeover_session(emqx_types:client_id())
|
|
|
+-spec(takeover_session(emqx_types:clientid())
|
|
|
-> {ok, emqx_session:session()} | {error, Reason :: term()}).
|
|
|
takeover_session(ClientId) ->
|
|
|
case lookup_channels(ClientId) of
|
|
|
@@ -221,7 +221,7 @@ takeover_session(ClientId, ChanPid) ->
|
|
|
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
|
|
|
|
|
|
%% @doc Discard all the sessions identified by the ClientId.
|
|
|
--spec(discard_session(emqx_types:client_id()) -> ok).
|
|
|
+-spec(discard_session(emqx_types:clientid()) -> ok).
|
|
|
discard_session(ClientId) when is_binary(ClientId) ->
|
|
|
case lookup_channels(ClientId) of
|
|
|
[] -> ok;
|
|
|
@@ -259,12 +259,12 @@ with_channel(ClientId, Fun) ->
|
|
|
end.
|
|
|
|
|
|
%% @doc Lookup channels.
|
|
|
--spec(lookup_channels(emqx_types:client_id()) -> list(chan_pid())).
|
|
|
+-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
|
|
|
lookup_channels(ClientId) ->
|
|
|
lookup_channels(global, ClientId).
|
|
|
|
|
|
%% @doc Lookup local or global channels.
|
|
|
--spec(lookup_channels(local | global, emqx_types:client_id()) -> list(chan_pid())).
|
|
|
+-spec(lookup_channels(local | global, emqx_types:clientid()) -> list(chan_pid())).
|
|
|
lookup_channels(global, ClientId) ->
|
|
|
case emqx_cm_registry:is_enabled() of
|
|
|
true ->
|