Просмотр исходного кода

feat(rule): add 'delivery.dropped' hook for rules

Shawn 4 лет назад
Родитель
Сommit
b2f027bcf7
1 измененных файлов с 72 добавлено и 3 удалено
  1. 72 3
      apps/emqx_rule_engine/src/emqx_rule_events.erl

+ 72 - 3
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -39,6 +39,7 @@
         , on_message_dropped/4
         , on_message_delivered/3
         , on_message_acked/3
+        , on_delivery_dropped/4
         , on_bridge_message_received/2
         ]).
 
@@ -63,6 +64,7 @@ event_names() ->
     , 'message.delivered'
     , 'message.acked'
     , 'message.dropped'
+    , 'delivery.dropped'
     ].
 
 reload() ->
@@ -153,6 +155,15 @@ on_message_acked(ClientInfo, Message, Env) ->
     end,
     {ok, Message}.
 
+on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
+    case ignore_sys_message(Message) of
+        true -> ok;
+        false ->
+            apply_event('delivery.dropped',
+                fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env)
+    end,
+    {ok, Message}.
+
 %%--------------------------------------------------------------------
 %% Event Messages
 %%--------------------------------------------------------------------
@@ -311,6 +322,32 @@ eventmsg_acked(_ClientInfo = #{
           publish_received_at => Timestamp
         }).
 
+eventmsg_delivery_dropped(_ClientInfo = #{
+            peerhost := PeerHost,
+            clientid := ReceiverCId,
+            username := ReceiverUsername
+        },
+        Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic,
+            headers = Headers, payload = Payload, timestamp = Timestamp},
+        Reason) ->
+    with_basic_columns('delivery.dropped',
+        #{id => emqx_guid:to_hexstr(Id),
+        reason => Reason,
+        from_clientid => ClientId,
+        from_username => emqx_message:get_header(username, Message, undefined),
+        clientid => ReceiverCId,
+        username => ReceiverUsername,
+        payload => Payload,
+        peerhost => ntoa(PeerHost),
+        topic => Topic,
+        qos => QoS,
+        flags => Flags,
+        %% the column 'headers' will be removed in the next major release
+        headers => printable_maps(Headers),
+        pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
+        publish_received_at => Timestamp
+        }).
+
 sub_unsub_prop_key('session.subscribed') -> sub_props;
 sub_unsub_prop_key('session.unsubscribed') -> unsub_props.
 
@@ -345,6 +382,7 @@ event_info() ->
     , event_info_client_disconnected()
     , event_info_session_subscribed()
     , event_info_session_unsubscribed()
+    , event_info_delivery_dropped()
     ].
 
 event_info_message_publish() ->
@@ -371,10 +409,18 @@ event_info_message_acked() ->
 event_info_message_dropped() ->
     event_info_common(
         'message.dropped',
-        {<<"message dropped">>, <<"消息丢弃"/utf8>>},
-        {<<"message dropped">>, <<"消息丢弃"/utf8>>},
+        {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>},
+        {<<"messages are discarded during routing, usually because there are no subscribers">>, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
         <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
     ).
+event_info_delivery_dropped() ->
+    event_info_common(
+        'delivery.dropped',
+        {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>},
+        {<<"messages are discarded during delivery, i.e. because the message queue is full">>,
+        <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>},
+        <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">>
+    ).
 event_info_client_connected() ->
     event_info_common(
         'client.connected',
@@ -414,7 +460,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
     }.
 
 test_columns('message.dropped') ->
-    test_columns('message.publish');
+    [ {<<"reason">>, <<"no_subscribers">>}
+    ] ++ test_columns('message.publish');
 test_columns('message.publish') ->
     [ {<<"clientid">>, <<"c_emqx">>}
     , {<<"username">>, <<"u_emqx">>}
@@ -422,6 +469,9 @@ test_columns('message.publish') ->
     , {<<"qos">>, 1}
     , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
     ];
+test_columns('delivery.dropped') ->
+    [ {<<"reason">>, <<"queue_full">>}
+    ] ++ test_columns('message.delivered');
 test_columns('message.acked') ->
     test_columns('message.delivered');
 test_columns('message.delivered') ->
@@ -486,6 +536,23 @@ columns_with_exam('message.dropped') ->
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
     ];
+columns_with_exam('delivery.dropped') ->
+    [ {<<"event">>, 'delivery.dropped'}
+    , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
+    , {<<"reason">>, queue_full}
+    , {<<"from_clientid">>, <<"c_emqx_1">>}
+    , {<<"from_username">>, <<"u_emqx_1">>}
+    , {<<"clientid">>, <<"c_emqx_2">>}
+    , {<<"username">>, <<"u_emqx_2">>}
+    , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
+    , {<<"peerhost">>, <<"192.168.0.10">>}
+    , {<<"topic">>, <<"t/a">>}
+    , {<<"qos">>, 1}
+    , {<<"flags">>, #{}}
+    , {<<"publish_received_at">>, erlang:system_time(millisecond)}
+    , {<<"timestamp">>, erlang:system_time(millisecond)}
+    , {<<"node">>, node()}
+    ];
 columns_with_exam('client.connected') ->
     [ {<<"event">>, 'client.connected'}
     , {<<"clientid">>, <<"c_emqx">>}
@@ -578,6 +645,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) ->
 event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
 event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
 event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
+event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
 event_name(_) -> 'message.publish'.
 
 event_topic('client.connected') -> <<"$events/client_connected">>;
@@ -587,6 +655,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
 event_topic('message.delivered') -> <<"$events/message_delivered">>;
 event_topic('message.acked') -> <<"$events/message_acked">>;
 event_topic('message.dropped') -> <<"$events/message_dropped">>;
+event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
 event_topic('message.publish') -> <<"$events/message_publish">>.
 
 printable_maps(undefined) -> #{};