|
|
@@ -20,220 +20,152 @@
|
|
|
|
|
|
-include("emqx.hrl").
|
|
|
|
|
|
-%% Mnesia Callbacks
|
|
|
--export([mnesia/1]).
|
|
|
+-export([start_link/1]).
|
|
|
|
|
|
--boot_mnesia({mnesia, [boot]}).
|
|
|
--copy_mnesia({mnesia, [copy]}).
|
|
|
+-export([open_session/1, lookup_session/1, close_session/1]).
|
|
|
+-export([resume_session/1, discard_session/1]).
|
|
|
+-export([register_session/1, unregister_session/2]).
|
|
|
|
|
|
-%% API Function Exports
|
|
|
--export([start_link/2]).
|
|
|
-
|
|
|
--export([open_session/1, start_session/2, lookup_session/1, register_session/3,
|
|
|
- unregister_session/1, unregister_session/2]).
|
|
|
+%% lock_session/1, create_session/1, unlock_session/1,
|
|
|
|
|
|
-export([dispatch/3]).
|
|
|
|
|
|
--export([local_sessions/0]).
|
|
|
-
|
|
|
-%% gen_server Function Exports
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
terminate/2, code_change/3]).
|
|
|
|
|
|
--record(state, {pool, id, monitors}).
|
|
|
-
|
|
|
--define(POOL, ?MODULE).
|
|
|
-
|
|
|
--define(TIMEOUT, 120000).
|
|
|
-
|
|
|
--define(LOG(Level, Format, Args, Session),
|
|
|
- lager:Level("SM(~s): " ++ Format, [Session#session.client_id | Args])).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Mnesia callbacks
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-mnesia(boot) ->
|
|
|
- %% Global Session Table
|
|
|
- ok = ekka_mnesia:create_table(session, [
|
|
|
- {type, set},
|
|
|
- {ram_copies, [node()]},
|
|
|
- {record_name, mqtt_session},
|
|
|
- {attributes, record_info(fields, mqtt_session)}]);
|
|
|
+-record(state, {stats_fun, stats_timer, monitors = #{}}).
|
|
|
|
|
|
-mnesia(copy) ->
|
|
|
- ok = ekka_mnesia:copy_table(mqtt_session).
|
|
|
+-spec(start_link(StatsFun :: fun()) -> {ok, pid()} | ignore | {error, term()}).
|
|
|
+start_link(StatsFun) ->
|
|
|
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% API
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-%% Open a clean start session.
|
|
|
-open_session(Session = #{client_id := ClientId, clean_start := true, expiry_interval := Interval}) ->
|
|
|
+open_session(Session = #{client_id := ClientId, clean_start := true}) ->
|
|
|
with_lock(ClientId,
|
|
|
fun() ->
|
|
|
- {ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]),
|
|
|
- io:format("ResL: ~p, BadNodes: ~p~n", [ResL, BadNodes]),
|
|
|
- case Interval > 0 of
|
|
|
- true ->
|
|
|
- {ok, emqx_session_sup:start_session_process(Session)};
|
|
|
- false ->
|
|
|
- {ok, emqx_session:init_state(Session)}
|
|
|
- end
|
|
|
- end).
|
|
|
-
|
|
|
-open_session(Session = #{client_id := ClientId, clean_start := false, expiry_interval := Interval}) ->
|
|
|
+ case rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]) of
|
|
|
+ {_Res, []} -> ok;
|
|
|
+ {_Res, BadNodes} -> emqx_log:error("[SM] Bad nodes found when lock a session: ~p", [BadNodes])
|
|
|
+ end,
|
|
|
+ {ok, emqx_session_sup:start_session(Session)}
|
|
|
+ end);
|
|
|
+
|
|
|
+open_session(Session = #{client_id := ClientId, clean_start := false}) ->
|
|
|
with_lock(ClientId,
|
|
|
fun() ->
|
|
|
- {ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]),
|
|
|
- [SessionPid | _] = lists:flatten(ResL),
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ {ResL, _BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]),
|
|
|
+ case lists:flatten([Pid || Pid <- ResL, Pid =/= undefined]) of
|
|
|
+ [] ->
|
|
|
+ {ok, emqx_session_sup:start_session(Session)};
|
|
|
+ [SessPid|_] ->
|
|
|
+ case resume_session(SessPid) of
|
|
|
+ ok -> {ok, SessPid};
|
|
|
+ {error, Reason} ->
|
|
|
+ emqx_log:error("[SM] Failed to resume session: ~p, ~p", [Session, Reason]),
|
|
|
+ {ok, emqx_session_sup:start_session(Session)}
|
|
|
+ end
|
|
|
+ end
|
|
|
end).
|
|
|
|
|
|
-lookup_session(ClientId) ->
|
|
|
- ets:lookup(session, ClientId).
|
|
|
+resume_session(SessPid) when node(SessPid) == node() ->
|
|
|
+ case is_process_alive(SessPid) of
|
|
|
+ true ->
|
|
|
+ emqx_session:resume(SessPid, self());
|
|
|
+ false ->
|
|
|
+ emqx_log:error("Cannot resume ~p which seems already dead!", [SessPid]),
|
|
|
+ {error, session_died}
|
|
|
+ end;
|
|
|
+
|
|
|
+resume_session(SessPid) ->
|
|
|
+ case rpc:call(node(SessPid), emqx_session, resume, [SessPid]) of
|
|
|
+ ok -> {ok, SessPid};
|
|
|
+ {badrpc, Reason} ->
|
|
|
+ {error, Reason};
|
|
|
+ {error, Reason} ->
|
|
|
+ {error, Reason}
|
|
|
+ end.
|
|
|
|
|
|
+discard_session(ClientId) ->
|
|
|
+ case lookup_session(ClientId) of
|
|
|
+ undefined -> ok;
|
|
|
+ Pid -> emqx_session:discard(Pid)
|
|
|
+ end.
|
|
|
|
|
|
lookup_session(ClientId) ->
|
|
|
- ets:lookup(session, ClientId).
|
|
|
-
|
|
|
-with_lock(undefined, Fun) ->
|
|
|
- Fun();
|
|
|
+ try ets:lookup_element(session, ClientId, 2) catch error:badarg -> undefined end.
|
|
|
|
|
|
+close_session(SessPid) ->
|
|
|
+ emqx_session:close(SessPid).
|
|
|
|
|
|
with_lock(ClientId, Fun) ->
|
|
|
case emqx_sm_locker:lock(ClientId) of
|
|
|
- true -> Result = Fun(),
|
|
|
- ok = emqx_sm_locker:unlock(ClientId),
|
|
|
- Result;
|
|
|
- false -> {error, client_id_unavailable}
|
|
|
- end.
|
|
|
-
|
|
|
-%% @doc Start a session manager
|
|
|
--spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
|
|
|
-start_link(Pool, Id) ->
|
|
|
- gen_server:start_link(?MODULE, [Pool, Id], []).
|
|
|
-
|
|
|
-%% @doc Start a session
|
|
|
--spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}).
|
|
|
-start_session(CleanSess, {ClientId, Username}) ->
|
|
|
- SM = gproc_pool:pick_worker(?POOL, ClientId),
|
|
|
- call(SM, {start_session, CleanSess, {ClientId, Username}, self()}).
|
|
|
-
|
|
|
-%% @doc Lookup a Session
|
|
|
--spec(lookup_session(binary()) -> mqtt_session() | undefined).
|
|
|
-lookup_session(ClientId) ->
|
|
|
- case mnesia:dirty_read(mqtt_session, ClientId) of
|
|
|
- [Session] -> Session;
|
|
|
- [] -> undefined
|
|
|
+ true -> Result = Fun(),
|
|
|
+ emqx_sm_locker:unlock(ClientId),
|
|
|
+ Result;
|
|
|
+ false -> {error, client_id_unavailable};
|
|
|
+ {error, Reason} -> {error, Reason}
|
|
|
end.
|
|
|
|
|
|
-%% @doc Register a session with info.
|
|
|
--spec(register_session(binary(), boolean(), [tuple()]) -> true).
|
|
|
-register_session(ClientId, CleanSess, Properties) ->
|
|
|
- ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
|
|
|
-
|
|
|
-%% @doc Unregister a Session.
|
|
|
--spec(unregister_session(binary()) -> boolean()).
|
|
|
-unregister_session(ClientId) ->
|
|
|
- unregister_session(ClientId, self()).
|
|
|
+-spec(register_session(client_id()) -> true).
|
|
|
+register_session(ClientId) ->
|
|
|
+ ets:insert(session, {ClientId, self()}).
|
|
|
|
|
|
unregister_session(ClientId, Pid) ->
|
|
|
- case ets:lookup(mqtt_local_session, ClientId) of
|
|
|
- [LocalSess = {_, Pid, _, _}] ->
|
|
|
- emqx_stats:del_session_stats(ClientId),
|
|
|
- ets:delete_object(mqtt_local_session, LocalSess);
|
|
|
+ case ets:lookup(session, ClientId) of
|
|
|
+ [{_, Pid}] ->
|
|
|
+ ets:delete_object(session, {ClientId, Pid});
|
|
|
_ ->
|
|
|
false
|
|
|
end.
|
|
|
|
|
|
dispatch(ClientId, Topic, Msg) ->
|
|
|
- try ets:lookup_element(mqtt_local_session, ClientId, 2) of
|
|
|
- Pid -> Pid ! {dispatch, Topic, Msg}
|
|
|
- catch
|
|
|
- error:badarg ->
|
|
|
- emqx_hooks:run('message.dropped', [ClientId, Msg]),
|
|
|
- ok %%TODO: How??
|
|
|
+ case lookup_session(ClientId) of
|
|
|
+ Pid when is_pid(Pid) ->
|
|
|
+ Pid ! {dispatch, Topic, Msg};
|
|
|
+ undefined ->
|
|
|
+ emqx_hooks:run('message.dropped', [ClientId, Msg])
|
|
|
end.
|
|
|
|
|
|
-call(SM, Req) ->
|
|
|
- gen_server:call(SM, Req, ?TIMEOUT). %%infinity).
|
|
|
-
|
|
|
-%% @doc for debug.
|
|
|
-local_sessions() ->
|
|
|
- ets:tab2list(mqtt_local_session).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
-%% gen_server Callbacks
|
|
|
+%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-init([Pool, Id]) ->
|
|
|
- gproc_pool:connect_worker(Pool, {Pool, Id}),
|
|
|
- {ok, #state{pool = Pool, id = Id, monitors = dict:new()}}.
|
|
|
-
|
|
|
-%% Persistent Session
|
|
|
-handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, State) ->
|
|
|
- case lookup_session(ClientId) of
|
|
|
- undefined ->
|
|
|
- %% Create session locally
|
|
|
- create_session({false, {ClientId, Username}, ClientPid}, State);
|
|
|
- Session ->
|
|
|
- case resume_session(Session, ClientPid) of
|
|
|
- {ok, SessPid} ->
|
|
|
- {reply, {ok, SessPid, true}, State};
|
|
|
- {error, Erorr} ->
|
|
|
- {reply, {error, Erorr}, State}
|
|
|
- end
|
|
|
- end;
|
|
|
-
|
|
|
-%% Transient Session
|
|
|
-handle_call({start_session, true, {ClientId, Username}, ClientPid}, _From, State) ->
|
|
|
- Client = {true, {ClientId, Username}, ClientPid},
|
|
|
- case lookup_session(ClientId) of
|
|
|
- undefined ->
|
|
|
- create_session(Client, State);
|
|
|
- Session ->
|
|
|
- case destroy_session(Session) of
|
|
|
- ok ->
|
|
|
- create_session(Client, State);
|
|
|
- {error, Error} ->
|
|
|
- {reply, {error, Error}, State}
|
|
|
- end
|
|
|
- end;
|
|
|
+init([StatsFun]) ->
|
|
|
+ {ok, TRef} = timer:send_interval(timer:seconds(1), stats),
|
|
|
+ {ok, #state{stats_fun = StatsFun, stats_timer = TRef}}.
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
- lager:error("[MQTT-SM] Unexpected Request: ~p", [Req]),
|
|
|
+ emqx_log:error("[SM] Unexpected request: ~p", [Req]),
|
|
|
{reply, ignore, State}.
|
|
|
|
|
|
+handle_cast({monitor_session, SessionPid, ClientId},
|
|
|
+ State = #state{monitors = Monitors}) ->
|
|
|
+ MRef = erlang:monitor(process, SessionPid),
|
|
|
+ {noreply, State#state{monitors = maps:put(MRef, ClientId, Monitors)}};
|
|
|
+
|
|
|
handle_cast(Msg, State) ->
|
|
|
- lager:error("[MQTT-SM] Unexpected Message: ~p", [Msg]),
|
|
|
+ emqx_log:error("[SM] Unexpected msg: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
|
|
- case dict:find(MRef, State#state.monitors) of
|
|
|
- {ok, ClientId} ->
|
|
|
- case mnesia:dirty_read({mqtt_session, ClientId}) of
|
|
|
- [] ->
|
|
|
- ok;
|
|
|
- [Sess = #mqtt_session{sess_pid = DownPid}] ->
|
|
|
- mnesia:dirty_delete_object(Sess);
|
|
|
- [_Sess] ->
|
|
|
- ok
|
|
|
- end,
|
|
|
- {noreply, erase_monitor(MRef, State), hibernate};
|
|
|
+handle_info(stats, State) ->
|
|
|
+ {noreply, setstats(State), hibernate};
|
|
|
+
|
|
|
+handle_info({'DOWN', MRef, process, DownPid, _Reason},
|
|
|
+ State = #state{monitors = Monitors}) ->
|
|
|
+ case maps:find(MRef, Monitors) of
|
|
|
+ {ok, {ClientId, Pid}} ->
|
|
|
+ ets:delete_object(session, {ClientId, Pid}),
|
|
|
+ {noreply, setstats(State#state{monitors = maps:remove(MRef, Monitors)})};
|
|
|
error ->
|
|
|
- lager:error("MRef of session ~p not found", [DownPid]),
|
|
|
+ emqx_log:error("session ~p not found", [DownPid]),
|
|
|
{noreply, State}
|
|
|
end;
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
- lager:error("[MQTT-SM] Unexpected Info: ~p", [Info]),
|
|
|
+ emqx_log:error("[SM] Unexpected info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
|
|
- gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
|
|
+terminate(_Reason, _State = #state{stats_timer = TRef}) ->
|
|
|
+ timer:cancel(TRef).
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
@@ -242,100 +174,6 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%% Internal functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-%% Create Session Locally
|
|
|
-create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
|
|
|
- case create_session(CleanSess, {ClientId, Username}, ClientPid) of
|
|
|
- {ok, SessPid} ->
|
|
|
- {reply, {ok, SessPid, false},
|
|
|
- monitor_session(ClientId, SessPid, State)};
|
|
|
- {error, Error} ->
|
|
|
- {reply, {error, Error}, State}
|
|
|
- end.
|
|
|
-
|
|
|
-create_session(CleanSess, {ClientId, Username}, ClientPid) ->
|
|
|
- case emqx_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of
|
|
|
- {ok, SessPid} ->
|
|
|
- Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, clean_sess = CleanSess},
|
|
|
- case insert_session(Session) of
|
|
|
- {aborted, {conflict, ConflictPid}} ->
|
|
|
- %% Conflict with othe node?
|
|
|
- lager:error("SM(~s): Conflict with ~p", [ClientId, ConflictPid]),
|
|
|
- {error, mnesia_conflict};
|
|
|
- {atomic, ok} ->
|
|
|
- {ok, SessPid}
|
|
|
- end;
|
|
|
- {error, Error} ->
|
|
|
- {error, Error}
|
|
|
- end.
|
|
|
-
|
|
|
-insert_session(Session = #mqtt_session{client_id = ClientId}) ->
|
|
|
- mnesia:transaction(
|
|
|
- fun() ->
|
|
|
- case mnesia:wread({mqtt_session, ClientId}) of
|
|
|
- [] ->
|
|
|
- mnesia:write(mqtt_session, Session, write);
|
|
|
- [#mqtt_session{sess_pid = SessPid}] ->
|
|
|
- mnesia:abort({conflict, SessPid})
|
|
|
- end
|
|
|
- end).
|
|
|
-
|
|
|
-%% Local node
|
|
|
-resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid)
|
|
|
- when node(SessPid) =:= node() ->
|
|
|
-
|
|
|
- case is_process_alive(SessPid) of
|
|
|
- true ->
|
|
|
- emqx_session:resume(SessPid, ClientId, ClientPid),
|
|
|
- {ok, SessPid};
|
|
|
- false ->
|
|
|
- ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session),
|
|
|
- {error, session_died}
|
|
|
- end;
|
|
|
-
|
|
|
-%% Remote node
|
|
|
-resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) ->
|
|
|
- Node = node(SessPid),
|
|
|
- case rpc:call(Node, emqx_session, resume, [SessPid, ClientId, ClientPid]) of
|
|
|
- ok ->
|
|
|
- {ok, SessPid};
|
|
|
- {badrpc, nodedown} ->
|
|
|
- ?LOG(error, "Session died for node '~s' down", [Node], Session),
|
|
|
- remove_session(Session),
|
|
|
- {error, session_nodedown};
|
|
|
- {badrpc, Reason} ->
|
|
|
- ?LOG(error, "Failed to resume from node ~s for ~p", [Node, Reason], Session),
|
|
|
- {error, Reason}
|
|
|
- end.
|
|
|
-
|
|
|
-%% Local node
|
|
|
-destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid})
|
|
|
- when node(SessPid) =:= node() ->
|
|
|
- emqx_session:destroy(SessPid, ClientId),
|
|
|
- remove_session(Session);
|
|
|
-
|
|
|
-%% Remote node
|
|
|
-destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) ->
|
|
|
- Node = node(SessPid),
|
|
|
- case rpc:call(Node, emqx_session, destroy, [SessPid, ClientId]) of
|
|
|
- ok ->
|
|
|
- remove_session(Session);
|
|
|
- {badrpc, nodedown} ->
|
|
|
- ?LOG(error, "Node '~s' down", [Node], Session),
|
|
|
- remove_session(Session);
|
|
|
- {badrpc, Reason} ->
|
|
|
- ?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
|
|
|
- [SessPid, Node, Reason], Session),
|
|
|
- {error, Reason}
|
|
|
- end.
|
|
|
-
|
|
|
-remove_session(Session) ->
|
|
|
- mnesia:dirty_delete_object(Session).
|
|
|
-
|
|
|
-monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
|
|
|
- MRef = erlang:monitor(process, SessPid),
|
|
|
- State#state{monitors = dict:store(MRef, ClientId, Monitors)}.
|
|
|
-
|
|
|
-erase_monitor(MRef, State = #state{monitors = Monitors}) ->
|
|
|
- erlang:demonitor(MRef, [flush]),
|
|
|
- State#state{monitors = dict:erase(MRef, Monitors)}.
|
|
|
+setstats(State = #state{stats_fun = StatsFun}) ->
|
|
|
+ StatsFun(ets:info(session, size)), State.
|
|
|
|