Browse Source

feat(channel): Pass clientinfo and all unknown messages to session

ieQu1 1 year ago
parent
commit
c64cce5e5a

+ 1 - 4
apps/emqx/include/emqx_session.hrl

@@ -22,9 +22,6 @@
 
 %% (Erlang) messages that a connection process should forward to the
 %% session handler.
--record(session_message, {
-    message :: term()
-}).
--define(session_message(MSG), #session_message{message = MSG}).
+-define(session_message(MSG), MSG).
 
 -endif.

+ 3 - 6
apps/emqx/src/emqx_channel.erl

@@ -1365,12 +1365,9 @@ handle_info({'DOWN', Ref, process, Pid, Reason}, Channel) ->
         [] -> {ok, Channel};
         Msgs -> {ok, Msgs, Channel}
     end;
-handle_info(?session_message(Message), #channel{session = Session} = Channel) ->
-    NSession = emqx_session:handle_info(Message, Session),
-    {ok, Channel#channel{session = NSession}};
-handle_info(Info, Channel) ->
-    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
-    {ok, Channel}.
+handle_info(Info, Channel = #channel{session = Session0, clientinfo = ClientInfo}) ->
+    Session = emqx_session:handle_info(Info, Session0, ClientInfo),
+    {ok, Channel#channel{session = Session}}.
 
 -ifdef(TEST).
 

+ 9 - 4
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -64,7 +64,7 @@
     deliver/3,
     replay/3,
     handle_timeout/3,
-    handle_info/2,
+    handle_info/3,
     disconnect/2,
     terminate/2
 ]).
@@ -661,10 +661,15 @@ handle_timeout(_ClientInfo, Timeout, Session) ->
 %% Generic messages
 %%--------------------------------------------------------------------
 
--spec handle_info(term(), session()) -> session().
-handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := SharedSubS0}) ->
+-spec handle_info(term(), session(), clientinfo()) -> session().
+handle_info(
+    ?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := SharedSubS0}, _ClientInfo
+) ->
     {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_info(S0, SharedSubS0, Msg),
-    Session#{s => S, shared_sub_s => SharedSubS}.
+    Session#{s => S, shared_sub_s => SharedSubS};
+handle_info(Msg, Session, _ClientInfo) ->
+    ?SLOG(warning, #{msg => emqx_session_ds_unknown_message, message => Msg}),
+    Session.
 
 %%--------------------------------------------------------------------
 %% Shared subscription outgoing messages

+ 5 - 5
apps/emqx/src/emqx_session.erl

@@ -83,7 +83,7 @@
 
 -export([
     deliver/3,
-    handle_info/2,
+    handle_info/3,
     handle_timeout/3,
     disconnect/3,
     terminate/3
@@ -192,7 +192,7 @@
 -callback handle_timeout(clientinfo(), common_timer_name() | custom_timer_name(), t()) ->
     {ok, replies(), t()}
     | {ok, replies(), timeout(), t()}.
--callback handle_info(term(), t()) -> t().
+-callback handle_info(term(), t(), clientinfo()) -> t().
 
 %%--------------------------------------------------------------------
 %% Create a Session
@@ -493,9 +493,9 @@ handle_timeout(ClientInfo, Timer, Session) ->
 %% Generic Messages
 %%--------------------------------------------------------------------
 
--spec handle_info(term(), t()) -> t().
-handle_info(Info, Session) ->
-    ?IMPL(Session):handle_info(Info, Session).
+-spec handle_info(term(), t(), clientinfo()) -> t().
+handle_info(Info, Session, ClientInfo) ->
+    ?IMPL(Session):handle_info(Info, Session, ClientInfo).
 
 %%--------------------------------------------------------------------
 

+ 3 - 3
apps/emqx/src/emqx_session_mem.erl

@@ -87,7 +87,7 @@
     deliver/3,
     replay/3,
     handle_timeout/3,
-    handle_info/2,
+    handle_info/3,
     disconnect/2,
     terminate/2
 ]).
@@ -602,8 +602,8 @@ handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
 %% Geneic messages
 %%--------------------------------------------------------------------
 
--spec handle_info(term(), session()) -> session().
-handle_info(Msg, Session) ->
+-spec handle_info(term(), session(), clientinfo()) -> session().
+handle_info(Msg, Session, _ClientInfo) ->
     ?SLOG(warning, #{msg => emqx_session_mem_unknown_message, message => Msg}),
     Session.