|
|
@@ -144,6 +144,7 @@ init_per_testcase(t_events, Config) ->
|
|
|
"\"$events/message_acked\", "
|
|
|
"\"$events/message_delivered\", "
|
|
|
"\"$events/message_dropped\", "
|
|
|
+ "\"$events/delivery_dropped\", "
|
|
|
"\"t1\"",
|
|
|
{ok, Rule} = emqx_rule_engine:create_rule(
|
|
|
#{id => <<"rule:t_events">>,
|
|
|
@@ -322,18 +323,20 @@ t_events(_Config) ->
|
|
|
]),
|
|
|
ct:pal("====== verify $events/client_connected"),
|
|
|
client_connected(Client, Client2),
|
|
|
+ ct:pal("====== verify $events/message_dropped"),
|
|
|
+ message_dropped(Client),
|
|
|
ct:pal("====== verify $events/session_subscribed"),
|
|
|
session_subscribed(Client2),
|
|
|
ct:pal("====== verify t1"),
|
|
|
message_publish(Client),
|
|
|
+ ct:pal("====== verify $events/delivery_dropped"),
|
|
|
+ delivery_dropped(Client),
|
|
|
ct:pal("====== verify $events/message_delivered"),
|
|
|
message_delivered(Client),
|
|
|
ct:pal("====== verify $events/message_acked"),
|
|
|
message_acked(Client),
|
|
|
ct:pal("====== verify $events/session_unsubscribed"),
|
|
|
session_unsubscribed(Client2),
|
|
|
- ct:pal("====== verify $events/message_dropped"),
|
|
|
- message_dropped(Client),
|
|
|
ct:pal("====== verify $events/client_disconnected"),
|
|
|
client_disconnected(Client, Client2),
|
|
|
ok.
|
|
|
@@ -365,6 +368,15 @@ session_unsubscribed(Client2) ->
|
|
|
message_delivered(_Client) ->
|
|
|
verify_event('message.delivered'),
|
|
|
ok.
|
|
|
+delivery_dropped(Client) ->
|
|
|
+ %% subscribe "t1" and then publish to "t1", the message will not be received by itself
|
|
|
+ %% because we have set the subscribe flag 'nl' = true
|
|
|
+ {ok, _, _} = emqtt:subscribe(Client, #{}, <<"t1">>, [{nl, true}, {qos, 1}]),
|
|
|
+ ct:sleep(50),
|
|
|
+ message_publish(Client),
|
|
|
+ ct:pal("--- current emqx hooks: ~p", [ets:tab2list(emqx_hooks)]),
|
|
|
+ verify_event('delivery.dropped'),
|
|
|
+ ok.
|
|
|
message_dropped(Client) ->
|
|
|
message_publish(Client),
|
|
|
verify_event('message.dropped'),
|
|
|
@@ -1490,6 +1502,45 @@ verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed'
|
|
|
maps:get(PropKey, Fields)),
|
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000);
|
|
|
|
|
|
+verify_event_fields('delivery.dropped', Fields) ->
|
|
|
+ #{event := 'delivery.dropped',
|
|
|
+ id := ID,
|
|
|
+ metadata := #{rule_id := RuleId},
|
|
|
+ reason := Reason,
|
|
|
+ clientid := ClientId,
|
|
|
+ username := Username,
|
|
|
+ from_clientid := FromClientId,
|
|
|
+ from_username := FromUsername,
|
|
|
+ node := Node,
|
|
|
+ payload := Payload,
|
|
|
+ peerhost := PeerHost,
|
|
|
+ pub_props := Properties,
|
|
|
+ publish_received_at := EventAt,
|
|
|
+ qos := QoS,
|
|
|
+ flags := Flags,
|
|
|
+ timestamp := Timestamp,
|
|
|
+ topic := Topic} = Fields,
|
|
|
+ Now = erlang:system_time(millisecond),
|
|
|
+ TimestampElapse = Now - Timestamp,
|
|
|
+ RcvdAtElapse = Now - EventAt,
|
|
|
+ ?assert(is_binary(ID)),
|
|
|
+ ?assertEqual(<<"rule:t_events">>, RuleId),
|
|
|
+ ?assertEqual(no_local, Reason),
|
|
|
+ ?assertEqual(node(), Node),
|
|
|
+ ?assertEqual(<<"c_event">>, ClientId),
|
|
|
+ ?assertEqual(<<"u_event">>, Username),
|
|
|
+ ?assertEqual(<<"c_event">>, FromClientId),
|
|
|
+ ?assertEqual(<<"u_event">>, FromUsername),
|
|
|
+ ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
|
|
|
+ verify_ipaddr(PeerHost),
|
|
|
+ ?assertEqual(<<"t1">>, Topic),
|
|
|
+ ?assertEqual(1, QoS),
|
|
|
+ ?assert(is_map(Flags)),
|
|
|
+ ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
|
|
+ ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
|
|
+ ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
|
|
+ ?assert(EventAt =< Timestamp);
|
|
|
+
|
|
|
verify_event_fields('message.dropped', Fields) ->
|
|
|
#{id := ID,
|
|
|
reason := Reason,
|