Przeglądaj źródła

fix(channel): update the calls to emqx_session APIs

Shawn 4 lat temu
rodzic
commit
9f7f5070b2

+ 7 - 1
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -452,7 +452,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
     }.
     }.
 
 
 test_columns('message.dropped') ->
 test_columns('message.dropped') ->
-    test_columns('message.publish');
+    [ {<<"reason">>, <<"no_subscribers">>}
+    ] ++ test_columns('message.publish');
 test_columns('message.publish') ->
 test_columns('message.publish') ->
     [ {<<"clientid">>, <<"c_emqx">>}
     [ {<<"clientid">>, <<"c_emqx">>}
     , {<<"username">>, <<"u_emqx">>}
     , {<<"username">>, <<"u_emqx">>}
@@ -471,6 +472,9 @@ test_columns('message.delivered') ->
     , {<<"qos">>, 1}
     , {<<"qos">>, 1}
     , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
     , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
     ];
     ];
+test_columns('delivery.dropped') ->
+    [ {<<"reason">>, <<"queue_full">>}
+    ] ++ test_columns('message.delivered');
 test_columns('client.connected') ->
 test_columns('client.connected') ->
     [ {<<"clientid">>, <<"c_emqx">>}
     [ {<<"clientid">>, <<"c_emqx">>}
     , {<<"username">>, <<"u_emqx">>}
     , {<<"username">>, <<"u_emqx">>}
@@ -657,6 +661,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) ->
 event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
 event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
 event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
 event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
 event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
 event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
+event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
 event_name(_) -> 'message.publish'.
 event_name(_) -> 'message.publish'.
 
 
 event_topic('client.connected') -> <<"$events/client_connected">>;
 event_topic('client.connected') -> <<"$events/client_connected">>;
@@ -666,6 +671,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
 event_topic('message.delivered') -> <<"$events/message_delivered">>;
 event_topic('message.delivered') -> <<"$events/message_delivered">>;
 event_topic('message.acked') -> <<"$events/message_acked">>;
 event_topic('message.acked') -> <<"$events/message_acked">>;
 event_topic('message.dropped') -> <<"$events/message_dropped">>;
 event_topic('message.dropped') -> <<"$events/message_dropped">>;
+event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
 event_topic('message.publish') -> <<"$events/message_publish">>.
 event_topic('message.publish') -> <<"$events/message_publish">>.
 
 
 printable_maps(undefined) -> #{};
 printable_maps(undefined) -> #{};

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -101,7 +101,7 @@ do_apply_rule(#rule{id = RuleId,
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
         true ->
             Collection2 = filter_collection(Input, InCase, DoEach, Collection),
             Collection2 = filter_collection(Input, InCase, DoEach, Collection),
-            case Collection2 of 
+            case Collection2 of
                 [] -> emqx_rule_metrics:inc_rules_no_result(RuleId);
                 [] -> emqx_rule_metrics:inc_rules_no_result(RuleId);
                 _ -> emqx_rule_metrics:inc_rules_passed(RuleId)
                 _ -> emqx_rule_metrics:inc_rules_passed(RuleId)
             end,
             end,

+ 4 - 2
apps/emqx_sn/src/emqx_sn_gateway.erl

@@ -408,7 +408,8 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)},
     inc_ping_counter(),
     inc_ping_counter(),
     case ClientIdPing of
     case ClientIdPing of
         ClientId ->
         ClientId ->
-            case emqx_session:replay(emqx_channel:get_session(Channel)) of
+            case emqx_session:replay(emqx_channel:info(clientinfo, Channel),
+                        emqx_channel:get_session(Channel)) of
                 {ok, [], Session0} ->
                 {ok, [], Session0} ->
                     State0 = send_message(?SN_PINGRESP_MSG(), State),
                     State0 = send_message(?SN_PINGRESP_MSG(), State),
                     {keep_state, State0#state{
                     {keep_state, State0#state{
@@ -521,7 +522,8 @@ handle_event(info, {deliver, _Topic, Msg}, asleep,
     % section 6.14, Support of sleeping clients
     % section 6.14, Support of sleeping clients
     ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
     ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
          [Msg, Pendings]),
          [Msg, Pendings]),
-    Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)),
+    Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
+                Msg, emqx_channel:get_session(Channel)),
     {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
     {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
 
 
 handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName,
 handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName,

+ 32 - 24
src/emqx_channel.erl

@@ -342,7 +342,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
 
 
 handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
 handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
           = #channel{clientinfo = ClientInfo, session = Session}) ->
           = #channel{clientinfo = ClientInfo, session = Session}) ->
-    case emqx_session:puback(PacketId, Session) of
+    case emqx_session:puback(ClientInfo, PacketId, Session) of
         {ok, Msg, NSession} ->
         {ok, Msg, NSession} ->
             ok = after_message_acked(ClientInfo, Msg, Properties),
             ok = after_message_acked(ClientInfo, Msg, Properties),
             {ok, Channel#channel{session = NSession}};
             {ok, Channel#channel{session = NSession}};
@@ -387,8 +387,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
             handle_out(pubcomp, {PacketId, RC}, Channel)
             handle_out(pubcomp, {PacketId, RC}, Channel)
     end;
     end;
 
 
-handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
-    case emqx_session:pubcomp(PacketId, Session) of
+handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{
+        clientinfo = ClientInfo, session = Session}) ->
+    case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
         {ok, NSession} ->
         {ok, NSession} ->
             {ok, Channel#channel{session = NSession}};
             {ok, Channel#channel{session = NSession}};
         {ok, Publishes, NSession} ->
         {ok, Publishes, NSession} ->
@@ -720,27 +721,33 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
 
 
 -spec(handle_deliver(list(emqx_types:deliver()), channel())
 -spec(handle_deliver(list(emqx_types:deliver()), channel())
       -> {ok, channel()} | {ok, replies(), channel()}).
       -> {ok, channel()} | {ok, replies(), channel()}).
-handle_deliver(Delivers, Channel = #channel{takeover = true,
-                                            pendings = Pendings,
-                                            session = Session,
-                                            clientinfo = #{clientid := ClientId}}) ->
+handle_deliver(Delivers, Channel = #channel{
+        takeover = true,
+        pendings = Pendings,
+        session = Session,
+        clientinfo = #{clientid := ClientId} = ClientInfo}) ->
     %% NOTE: Order is important here. While the takeover is in
     %% NOTE: Order is important here. While the takeover is in
     %% progress, the session cannot enqueue messages, since it already
     %% progress, the session cannot enqueue messages, since it already
     %% passed on the queue to the new connection in the session state.
     %% passed on the queue to the new connection in the session state.
-    NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
+    NPendings = lists:append(Pendings,
+        ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)),
     {ok, Channel#channel{pendings = NPendings}};
     {ok, Channel#channel{pendings = NPendings}};
 
 
-handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
-                                            takeover   = false,
-                                            session    = Session,
-                                            clientinfo = #{clientid := ClientId}}) ->
-    NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
+handle_deliver(Delivers, Channel = #channel{
+        conn_state = disconnected,
+        takeover   = false,
+        session    = Session,
+        clientinfo = #{clientid := ClientId} = ClientInfo}) ->
+    NSession = emqx_session:enqueue(ClientInfo,
+        ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session),
     {ok, Channel#channel{session = NSession}};
     {ok, Channel#channel{session = NSession}};
 
 
-handle_deliver(Delivers, Channel = #channel{session = Session,
-                                            takeover = false,
-                                            clientinfo = #{clientid := ClientId}}) ->
-    case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
+handle_deliver(Delivers, Channel = #channel{
+        session = Session,
+        takeover = false,
+        clientinfo = #{clientid := ClientId} = ClientInfo}) ->
+    case emqx_session:deliver(ClientInfo,
+            ignore_local(ClientInfo, Delivers, ClientId, Session), Session) of
         {ok, Publishes, NSession} ->
         {ok, Publishes, NSession} ->
             NChannel = Channel#channel{session = NSession},
             NChannel = Channel#channel{session = NSession},
             handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
             handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
@@ -748,12 +755,12 @@ handle_deliver(Delivers, Channel = #channel{session = Session,
             {ok, Channel#channel{session = NSession}}
             {ok, Channel#channel{session = NSession}}
     end.
     end.
 
 
-ignore_local(Delivers, Subscriber, Session) ->
+ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
     Subs = emqx_session:info(subscriptions, Session),
     Subs = emqx_session:info(subscriptions, Session),
     lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
     lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
                         case maps:find(Topic, Subs) of
                         case maps:find(Topic, Subs) of
                             {ok, #{nl := 1}} when Subscriber =:= Publisher ->
                             {ok, #{nl := 1}} when Subscriber =:= Publisher ->
-                                ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, no_local]),
+                                ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
                                 ok = emqx_metrics:inc('delivery.dropped'),
                                 ok = emqx_metrics:inc('delivery.dropped'),
                                 ok = emqx_metrics:inc('delivery.dropped.no_local'),
                                 ok = emqx_metrics:inc('delivery.dropped.no_local'),
                                 true;
                                 true;
@@ -1026,8 +1033,8 @@ handle_timeout(_TRef, retry_delivery,
                Channel = #channel{conn_state = disconnected}) ->
                Channel = #channel{conn_state = disconnected}) ->
     {ok, Channel};
     {ok, Channel};
 handle_timeout(_TRef, retry_delivery,
 handle_timeout(_TRef, retry_delivery,
-               Channel = #channel{session = Session}) ->
-    case emqx_session:retry(Session) of
+               Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:retry(ClientInfo, Session) of
         {ok, NSession} ->
         {ok, NSession} ->
             {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
             {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
         {ok, Publishes, Timeout, NSession} ->
         {ok, Publishes, Timeout, NSession} ->
@@ -1589,9 +1596,10 @@ maybe_resume_session(#channel{resuming = false}) ->
     ignore;
     ignore;
 maybe_resume_session(#channel{session  = Session,
 maybe_resume_session(#channel{session  = Session,
                               resuming = true,
                               resuming = true,
-                              pendings = Pendings}) ->
-    {ok, Publishes, Session1} = emqx_session:replay(Session),
-    case emqx_session:deliver(Pendings, Session1) of
+                              pendings = Pendings,
+                              clientinfo = ClientInfo}) ->
+    {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
+    case emqx_session:deliver(ClientInfo, Pendings, Session1) of
         {ok, Session2} ->
         {ok, Session2} ->
             {ok, Publishes, Session2};
             {ok, Publishes, Session2};
         {ok, More, Session2} ->
         {ok, More, Session2} ->