|
@@ -156,7 +156,7 @@ handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, Stat
|
|
|
{reply, {ok, SessPid, true}, State};
|
|
{reply, {ok, SessPid, true}, State};
|
|
|
{error, Erorr} ->
|
|
{error, Erorr} ->
|
|
|
{reply, {error, Erorr}, State}
|
|
{reply, {error, Erorr}, State}
|
|
|
- end
|
|
|
|
|
|
|
+ end
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
%% Transient Session
|
|
%% Transient Session
|
|
@@ -183,16 +183,14 @@ handle_cast(Msg, State) ->
|
|
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
|
|
case dict:find(MRef, State#state.monitors) of
|
|
case dict:find(MRef, State#state.monitors) of
|
|
|
{ok, ClientId} ->
|
|
{ok, ClientId} ->
|
|
|
- mnesia:transaction(fun() ->
|
|
|
|
|
- case mnesia:wread({mqtt_session, ClientId}) of
|
|
|
|
|
- [] ->
|
|
|
|
|
- ok;
|
|
|
|
|
- [Sess = #mqtt_session{sess_pid = DownPid}] ->
|
|
|
|
|
- mnesia:delete_object(mqtt_session, Sess, write);
|
|
|
|
|
- [_Sess] ->
|
|
|
|
|
- ok
|
|
|
|
|
- end
|
|
|
|
|
- end),
|
|
|
|
|
|
|
+ case mnesia:dirty_read({mqtt_session, ClientId}) of
|
|
|
|
|
+ [] ->
|
|
|
|
|
+ ok;
|
|
|
|
|
+ [Sess = #mqtt_session{sess_pid = DownPid}] ->
|
|
|
|
|
+ mnesia:dirty_delete_object(Sess);
|
|
|
|
|
+ [_Sess] ->
|
|
|
|
|
+ ok
|
|
|
|
|
+ end,
|
|
|
{noreply, erase_monitor(MRef, State), hibernate};
|
|
{noreply, erase_monitor(MRef, State), hibernate};
|
|
|
error ->
|
|
error ->
|
|
|
lager:error("MRef of session ~p not found", [DownPid]),
|
|
lager:error("MRef of session ~p not found", [DownPid]),
|
|
@@ -216,8 +214,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
|
|
create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
|
|
|
case create_session(CleanSess, {ClientId, Username}, ClientPid) of
|
|
case create_session(CleanSess, {ClientId, Username}, ClientPid) of
|
|
|
{ok, SessPid} ->
|
|
{ok, SessPid} ->
|
|
|
- {reply, {ok, SessPid, false},
|
|
|
|
|
- monitor_session(ClientId, SessPid, State)};
|
|
|
|
|
|
|
+ {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)};
|
|
|
{error, Error} ->
|
|
{error, Error} ->
|
|
|
{reply, {error, Error}, State}
|
|
{reply, {error, Error}, State}
|
|
|
end.
|
|
end.
|
|
@@ -284,15 +281,14 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPi
|
|
|
remove_session(Session);
|
|
remove_session(Session);
|
|
|
|
|
|
|
|
%% Remote node
|
|
%% Remote node
|
|
|
-destroy_session(Session = #mqtt_session{client_id = ClientId,
|
|
|
|
|
- sess_pid = SessPid}) ->
|
|
|
|
|
|
|
+destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) ->
|
|
|
Node = node(SessPid),
|
|
Node = node(SessPid),
|
|
|
case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
|
|
case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
|
|
|
ok ->
|
|
ok ->
|
|
|
remove_session(Session);
|
|
remove_session(Session);
|
|
|
{badrpc, nodedown} ->
|
|
{badrpc, nodedown} ->
|
|
|
?LOG(error, "Node '~s' down", [Node], Session),
|
|
?LOG(error, "Node '~s' down", [Node], Session),
|
|
|
- remove_session(Session);
|
|
|
|
|
|
|
+ remove_session(Session);
|
|
|
{badrpc, Reason} ->
|
|
{badrpc, Reason} ->
|
|
|
?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
|
|
?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
|
|
|
[SessPid, Node, Reason], Session),
|
|
[SessPid, Node, Reason], Session),
|
|
@@ -300,15 +296,13 @@ destroy_session(Session = #mqtt_session{client_id = ClientId,
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
remove_session(Session) ->
|
|
remove_session(Session) ->
|
|
|
- case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of
|
|
|
|
|
- {atomic, ok} -> ok;
|
|
|
|
|
- {aborted, Error} -> {error, Error}
|
|
|
|
|
- end.
|
|
|
|
|
|
|
+ mnesia:dirty_delete_object(Session).
|
|
|
|
|
|
|
|
monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
|
|
monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
|
|
|
MRef = erlang:monitor(process, SessPid),
|
|
MRef = erlang:monitor(process, SessPid),
|
|
|
State#state{monitors = dict:store(MRef, ClientId, Monitors)}.
|
|
State#state{monitors = dict:store(MRef, ClientId, Monitors)}.
|
|
|
|
|
|
|
|
erase_monitor(MRef, State = #state{monitors = Monitors}) ->
|
|
erase_monitor(MRef, State = #state{monitors = Monitors}) ->
|
|
|
|
|
+ erlang:demonitor(MRef, [flush]),
|
|
|
State#state{monitors = dict:erase(MRef, Monitors)}.
|
|
State#state{monitors = dict:erase(MRef, Monitors)}.
|
|
|
|
|
|