Explorar o código

refactor(session): allow peeking at mqueue less intrusively

Andrew Mayorov %!s(int64=2) %!d(string=hai) anos
pai
achega
e1e4c64a30

+ 3 - 6
apps/emqx/src/emqx_channel.erl

@@ -151,7 +151,7 @@
 info(Channel) ->
     maps:from_list(info(?INFO_KEYS, Channel)).
 
--spec info(list(atom()) | atom(), channel()) -> term().
+-spec info(list(atom()) | atom() | tuple(), channel()) -> term().
 info(Keys, Channel) when is_list(Keys) ->
     [{Key, info(Key, Channel)} || Key <- Keys];
 info(conninfo, #channel{conninfo = ConnInfo}) ->
@@ -180,6 +180,8 @@ info(username, #channel{clientinfo = ClientInfo}) ->
     maps:get(username, ClientInfo, undefined);
 info(session, #channel{session = Session}) ->
     maybe_apply(fun emqx_session:info/1, Session);
+info({session, Info}, #channel{session = Session}) ->
+    maybe_apply(fun(S) -> emqx_session:info(Info, S) end, Session);
 info(conn_state, #channel{conn_state = ConnState}) ->
     ConnState;
 info(keepalive, #channel{keepalive = Keepalive}) ->
@@ -1195,8 +1197,6 @@ handle_call(
     ChanInfo1 = info(NChannel),
     emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
     reply(ok, reset_timer(alive_timer, NChannel));
-handle_call(get_mqueue, Channel) ->
-    reply({ok, get_mqueue(Channel)}, Channel);
 handle_call(Req, Channel) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     reply(ignored, Channel).
@@ -2240,6 +2240,3 @@ get_mqtt_conf(Zone, Key, Default) ->
 set_field(Name, Value, Channel) ->
     Pos = emqx_utils:index_of(Name, record_info(fields, channel)),
     setelement(Pos + 1, Channel, Value).
-
-get_mqueue(#channel{session = Session}) ->
-    emqx_session:get_mqueue(Session).

+ 6 - 4
apps/emqx/src/emqx_connection.erl

@@ -44,6 +44,7 @@
 
 -export([
     info/1,
+    info/2,
     stats/1
 ]).
 
@@ -221,11 +222,10 @@ info(CPid) when is_pid(CPid) ->
     call(CPid, info);
 info(State = #state{channel = Channel}) ->
     ChanInfo = emqx_channel:info(Channel),
-    SockInfo = maps:from_list(
-        info(?INFO_KEYS, State)
-    ),
+    SockInfo = maps:from_list(info(?INFO_KEYS, State)),
     ChanInfo#{sockinfo => SockInfo}.
 
+-spec info([atom()] | atom() | tuple(), pid() | state()) -> term().
 info(Keys, State) when is_list(Keys) ->
     [{Key, info(Key, State)} || Key <- Keys];
 info(socktype, #state{transport = Transport, socket = Socket}) ->
@@ -241,7 +241,9 @@ info(stats_timer, #state{stats_timer = StatsTimer}) ->
 info(limiter, #state{limiter = Limiter}) ->
     Limiter;
 info(limiter_timer, #state{limiter_timer = Timer}) ->
-    Timer.
+    Timer;
+info({channel, Info}, #state{channel = Channel}) ->
+    emqx_channel:info(Info, Channel).
 
 %% @doc Get stats of the connection/channel.
 -spec stats(pid() | state()) -> emqx_types:stats().

+ 1 - 5
apps/emqx/src/emqx_session.erl

@@ -65,8 +65,7 @@
     info/1,
     info/2,
     stats/1,
-    obtain_next_pkt_id/1,
-    get_mqueue/1
+    obtain_next_pkt_id/1
 ]).
 
 -export([
@@ -955,6 +954,3 @@ age(Now, Ts) -> Now - Ts.
 set_field(Name, Value, Session) ->
     Pos = emqx_utils:index_of(Name, record_info(fields, session)),
     setelement(Pos + 1, Session, Value).
-
-get_mqueue(#session{mqueue = Q}) ->
-    emqx_mqueue:to_list(Q).

+ 5 - 2
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -758,13 +758,16 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
 
     {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
     ct:sleep(100),
-    {ok, Msgs1} = gen_server:call(Pid1, get_mqueue),
-    {ok, Msgs2} = gen_server:call(Pid2, get_mqueue),
+    Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)),
+    Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)),
     %% assert the message is in mqueue (because socket is closed)
     ?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2),
     emqtt:stop(ConnPub),
     ok.
 
+get_mqueue(ConnPid) ->
+    emqx_connection:info({channel, {session, mqueue}}, sys:get_state(ConnPid)).
+
 %% No ack, QoS 2 subscriptions,
 %% client1 receives one message, send pubrec, then suspend
 %% client2 acts normal (auto_ack=true)