|
|
@@ -114,11 +114,15 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
|
|
|
|
|
on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
|
|
|
apply_event('session.subscribed',
|
|
|
- fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env).
|
|
|
+ fun() ->
|
|
|
+ eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
|
|
|
+ end, Env).
|
|
|
|
|
|
on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
|
|
|
apply_event('session.unsubscribed',
|
|
|
- fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, Env).
|
|
|
+ fun() ->
|
|
|
+ eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
|
|
|
+ end, Env).
|
|
|
|
|
|
on_message_dropped(Message, _, Reason, Env) ->
|
|
|
case ignore_sys_message(Message) of
|
|
|
@@ -151,7 +155,8 @@ on_message_acked(ClientInfo, Message, Env) ->
|
|
|
%% Event Messages
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-eventmsg_publish(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
|
|
|
+eventmsg_publish(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
|
|
|
+ topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
|
|
|
with_basic_columns('message.publish',
|
|
|
#{id => emqx_guid:to_hexstr(Id),
|
|
|
clientid => ClientId,
|
|
|
@@ -236,7 +241,8 @@ eventmsg_sub_or_unsub(Event, _ClientInfo = #{
|
|
|
qos => QoS
|
|
|
}).
|
|
|
|
|
|
-eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}, Reason) ->
|
|
|
+eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
|
|
|
+ topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}, Reason) ->
|
|
|
with_basic_columns('message.dropped',
|
|
|
#{id => emqx_guid:to_hexstr(Id),
|
|
|
reason => Reason,
|
|
|
@@ -257,7 +263,9 @@ eventmsg_delivered(_ClientInfo = #{
|
|
|
peerhost := PeerHost,
|
|
|
clientid := ReceiverCId,
|
|
|
username := ReceiverUsername
|
|
|
- }, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
|
|
|
+ }, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
|
|
|
+ topic = Topic, headers = Headers, payload = Payload,
|
|
|
+ timestamp = Timestamp}) ->
|
|
|
with_basic_columns('message.delivered',
|
|
|
#{id => emqx_guid:to_hexstr(Id),
|
|
|
from_clientid => ClientId,
|
|
|
@@ -279,7 +287,10 @@ eventmsg_acked(_ClientInfo = #{
|
|
|
peerhost := PeerHost,
|
|
|
clientid := ReceiverCId,
|
|
|
username := ReceiverUsername
|
|
|
- }, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
|
|
|
+ },
|
|
|
+ Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
|
|
|
+ topic = Topic, headers = Headers, payload = Payload,
|
|
|
+ timestamp = Timestamp}) ->
|
|
|
with_basic_columns('message.acked',
|
|
|
#{id => emqx_guid:to_hexstr(Id),
|
|
|
from_clientid => ClientId,
|
|
|
@@ -455,37 +466,9 @@ columns_with_exam('message.publish') ->
|
|
|
, {<<"node">>, node()}
|
|
|
];
|
|
|
columns_with_exam('message.delivered') ->
|
|
|
- [ {<<"event">>, 'message.delivered'}
|
|
|
- , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
|
|
|
- , {<<"from_clientid">>, <<"c_emqx_1">>}
|
|
|
- , {<<"from_username">>, <<"u_emqx_1">>}
|
|
|
- , {<<"clientid">>, <<"c_emqx_2">>}
|
|
|
- , {<<"username">>, <<"u_emqx_2">>}
|
|
|
- , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
|
- , {<<"peerhost">>, <<"192.168.0.10">>}
|
|
|
- , {<<"topic">>, <<"t/a">>}
|
|
|
- , {<<"qos">>, 1}
|
|
|
- , {<<"flags">>, #{}}
|
|
|
- , {<<"publish_received_at">>, erlang:system_time(millisecond)}
|
|
|
- , {<<"timestamp">>, erlang:system_time(millisecond)}
|
|
|
- , {<<"node">>, node()}
|
|
|
- ];
|
|
|
+ columns_message_ack_delivered('message.delivered');
|
|
|
columns_with_exam('message.acked') ->
|
|
|
- [ {<<"event">>, 'message.acked'}
|
|
|
- , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
|
|
|
- , {<<"from_clientid">>, <<"c_emqx_1">>}
|
|
|
- , {<<"from_username">>, <<"u_emqx_1">>}
|
|
|
- , {<<"clientid">>, <<"c_emqx_2">>}
|
|
|
- , {<<"username">>, <<"u_emqx_2">>}
|
|
|
- , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
|
- , {<<"peerhost">>, <<"192.168.0.10">>}
|
|
|
- , {<<"topic">>, <<"t/a">>}
|
|
|
- , {<<"qos">>, 1}
|
|
|
- , {<<"flags">>, #{}}
|
|
|
- , {<<"publish_received_at">>, erlang:system_time(millisecond)}
|
|
|
- , {<<"timestamp">>, erlang:system_time(millisecond)}
|
|
|
- , {<<"node">>, node()}
|
|
|
- ];
|
|
|
+ columns_message_ack_delivered('message.acked');
|
|
|
columns_with_exam('message.dropped') ->
|
|
|
[ {<<"event">>, 'message.dropped'}
|
|
|
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
|
|
|
@@ -530,7 +513,12 @@ columns_with_exam('client.disconnected') ->
|
|
|
, {<<"node">>, node()}
|
|
|
];
|
|
|
columns_with_exam('session.subscribed') ->
|
|
|
- [ {<<"event">>, 'session.subscribed'}
|
|
|
+ columns_message_sub_unsub('session.subscribed');
|
|
|
+columns_with_exam('session.unsubscribed') ->
|
|
|
+ columns_message_sub_unsub('session.unsubscribed').
|
|
|
+
|
|
|
+columns_message_sub_unsub(EventName) ->
|
|
|
+ [ {<<"event">>, EventName}
|
|
|
, {<<"clientid">>, <<"c_emqx">>}
|
|
|
, {<<"username">>, <<"u_emqx">>}
|
|
|
, {<<"peerhost">>, <<"192.168.0.10">>}
|
|
|
@@ -538,14 +526,21 @@ columns_with_exam('session.subscribed') ->
|
|
|
, {<<"qos">>, 1}
|
|
|
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
|
|
, {<<"node">>, node()}
|
|
|
- ];
|
|
|
-columns_with_exam('session.unsubscribed') ->
|
|
|
- [ {<<"event">>, 'session.unsubscribed'}
|
|
|
- , {<<"clientid">>, <<"c_emqx">>}
|
|
|
- , {<<"username">>, <<"u_emqx">>}
|
|
|
+ ].
|
|
|
+
|
|
|
+columns_message_ack_delivered(EventName) ->
|
|
|
+ [ {<<"event">>, EventName}
|
|
|
+ , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
|
|
|
+ , {<<"from_clientid">>, <<"c_emqx_1">>}
|
|
|
+ , {<<"from_username">>, <<"u_emqx_1">>}
|
|
|
+ , {<<"clientid">>, <<"c_emqx_2">>}
|
|
|
+ , {<<"username">>, <<"u_emqx_2">>}
|
|
|
+ , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
|
, {<<"peerhost">>, <<"192.168.0.10">>}
|
|
|
, {<<"topic">>, <<"t/a">>}
|
|
|
, {<<"qos">>, 1}
|
|
|
+ , {<<"flags">>, #{}}
|
|
|
+ , {<<"publish_received_at">>, erlang:system_time(millisecond)}
|
|
|
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
|
|
, {<<"node">>, node()}
|
|
|
].
|