emqx_event_message.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_event_message).
  17. -include_lib("emqx/include/emqx.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include("emqx_modules.hrl").
  20. -export([ list/0
  21. , update/1
  22. , enable/0
  23. , disable/0
  24. ]).
  25. -export([ on_client_connected/2
  26. , on_client_disconnected/3
  27. , on_client_subscribed/3
  28. , on_client_unsubscribed/3
  29. , on_message_dropped/3
  30. , on_message_delivered/2
  31. , on_message_acked/2
  32. ]).
  33. -ifdef(TEST).
  34. -export([reason/1]).
  35. -endif.
  36. list() ->
  37. emqx_conf:get([event_message], #{}).
  38. update(Params) ->
  39. disable(),
  40. case emqx_conf:update([event_message],
  41. Params,
  42. #{rawconf_with_defaults => true, override_to => cluster}) of
  43. {ok, #{raw_config := NewEventMessage}} ->
  44. enable(),
  45. {ok, NewEventMessage};
  46. {error, Reason} ->
  47. {error, Reason}
  48. end.
  49. enable() ->
  50. lists:foreach(fun({_Topic, false}) -> ok;
  51. ({Topic, true}) ->
  52. case Topic of
  53. client_connected ->
  54. emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []});
  55. client_disconnected ->
  56. emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []});
  57. client_subscribed ->
  58. emqx_hooks:put('session.subscribed', {?MODULE, on_client_subscribed, []});
  59. client_unsubscribed ->
  60. emqx_hooks:put('session.unsubscribed', {?MODULE, on_client_unsubscribed, []});
  61. message_delivered ->
  62. emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []});
  63. message_acked ->
  64. emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []});
  65. message_dropped ->
  66. emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []});
  67. _ ->
  68. ok
  69. end
  70. end, maps:to_list(list())).
  71. disable() ->
  72. lists:foreach(fun({_Topic, false}) -> ok;
  73. ({Topic, true}) ->
  74. case Topic of
  75. client_connected ->
  76. emqx_hooks:del('client.connected', {?MODULE, on_client_connected});
  77. client_disconnected ->
  78. emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected});
  79. client_subscribed ->
  80. emqx_hooks:del('session.subscribed', {?MODULE, on_client_subscribed});
  81. client_unsubscribed ->
  82. emqx_hooks:del('session.unsubscribed', {?MODULE, on_client_unsubscribed});
  83. message_delivered ->
  84. emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered});
  85. message_acked ->
  86. emqx_hooks:del('message.acked', {?MODULE, on_message_acked});
  87. message_dropped ->
  88. emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped});
  89. _ ->
  90. ok
  91. end
  92. end, maps:to_list(list())).
  93. %%--------------------------------------------------------------------
  94. %% Callbacks
  95. %%--------------------------------------------------------------------
  96. on_client_connected(ClientInfo, ConnInfo) ->
  97. Payload0 = common_infos(ClientInfo, ConnInfo),
  98. Payload = Payload0#{
  99. keepalive => maps:get(keepalive, ConnInfo, 0),
  100. clean_start => maps:get(clean_start, ConnInfo, true),
  101. expiry_interval => maps:get(expiry_interval, ConnInfo, 0),
  102. connected_at => maps:get(connected_at, ConnInfo)
  103. },
  104. publish_event_msg(<<"$event/client_connected">>, Payload).
  105. on_client_disconnected(ClientInfo,
  106. Reason, ConnInfo = #{disconnected_at := DisconnectedAt}) ->
  107. Payload0 = common_infos(ClientInfo, ConnInfo),
  108. Payload = Payload0#{
  109. reason => reason(Reason),
  110. disconnected_at => DisconnectedAt
  111. },
  112. publish_event_msg(<<"$event/client_disconnected">>, Payload).
  113. on_client_subscribed(_ClientInfo = #{clientid := ClientId,
  114. username := Username},
  115. Topic, SubOpts) ->
  116. Payload = #{clientid => ClientId,
  117. username => Username,
  118. topic => Topic,
  119. subopts => SubOpts,
  120. ts => erlang:system_time(millisecond)
  121. },
  122. publish_event_msg(<<"$event/client_subscribed">>, Payload).
  123. on_client_unsubscribed(_ClientInfo = #{clientid := ClientId,
  124. username := Username},
  125. Topic, _SubOpts) ->
  126. Payload = #{clientid => ClientId,
  127. username => Username,
  128. topic => Topic,
  129. ts => erlang:system_time(millisecond)
  130. },
  131. publish_event_msg(<<"$event/client_unsubscribed">>, Payload).
  132. on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
  133. case ignore_sys_message(Message) of
  134. true -> ok;
  135. false ->
  136. Payload0 = base_message(Message),
  137. Payload = Payload0#{
  138. reason => Reason,
  139. clientid => ClientId,
  140. username => emqx_message:get_header(username, Message, undefined),
  141. peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined))
  142. },
  143. publish_event_msg(<<"$event/message_dropped">>, Payload)
  144. end,
  145. {ok, Message}.
  146. on_message_delivered(_ClientInfo = #{
  147. peerhost := PeerHost,
  148. clientid := ReceiverCId,
  149. username := ReceiverUsername},
  150. #message{from = ClientId} = Message) ->
  151. case ignore_sys_message(Message) of
  152. true -> ok;
  153. false ->
  154. Payload0 = base_message(Message),
  155. Payload = Payload0#{
  156. from_clientid => ClientId,
  157. from_username => emqx_message:get_header(username, Message, undefined),
  158. clientid => ReceiverCId,
  159. username => ReceiverUsername,
  160. peerhost => ntoa(PeerHost)
  161. },
  162. publish_event_msg(<<"$event/message_delivered">>, Payload)
  163. end,
  164. {ok, Message}.
  165. on_message_acked(_ClientInfo = #{
  166. peerhost := PeerHost,
  167. clientid := ReceiverCId,
  168. username := ReceiverUsername},
  169. #message{from = ClientId} = Message) ->
  170. case ignore_sys_message(Message) of
  171. true -> ok;
  172. false ->
  173. Payload0 = base_message(Message),
  174. Payload = Payload0#{
  175. from_clientid => ClientId,
  176. from_username => emqx_message:get_header(username, Message, undefined),
  177. clientid => ReceiverCId,
  178. username => ReceiverUsername,
  179. peerhost => ntoa(PeerHost)
  180. },
  181. publish_event_msg(<<"$event/message_acked">>, Payload)
  182. end,
  183. {ok, Message}.
  184. %%--------------------------------------------------------------------
  185. %% Helper functions
  186. %%--------------------------------------------------------------------
  187. common_infos(
  188. _ClientInfo = #{clientid := ClientId,
  189. username := Username,
  190. peerhost := PeerHost,
  191. sockport := SockPort
  192. },
  193. _ConnInfo = #{proto_name := ProtoName,
  194. proto_ver := ProtoVer
  195. }) ->
  196. #{clientid => ClientId,
  197. username => Username,
  198. ipaddress => ntoa(PeerHost),
  199. sockport => SockPort,
  200. proto_name => ProtoName,
  201. proto_ver => ProtoVer
  202. }.
  203. make_msg(Topic, Payload) ->
  204. emqx_message:set_flag(
  205. sys, emqx_message:make(
  206. ?MODULE, 0, Topic, iolist_to_binary(Payload))).
  207. -compile({inline, [reason/1]}).
  208. reason(Reason) when is_atom(Reason) -> Reason;
  209. reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
  210. reason({Error, _}) when is_atom(Error) -> Error;
  211. reason(_) -> internal_error.
  212. ntoa(undefined) -> undefined;
  213. ntoa({IpAddr, Port}) ->
  214. iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
  215. ntoa(IpAddr) ->
  216. iolist_to_binary(inet:ntoa(IpAddr)).
  217. printable_maps(undefined) -> #{};
  218. printable_maps(Headers) ->
  219. maps:fold(
  220. fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
  221. AccIn#{K => ntoa(V0)};
  222. ('User-Property', V0, AccIn) when is_list(V0) ->
  223. AccIn#{
  224. 'User-Property' => maps:from_list(V0),
  225. 'User-Property-Pairs' => [#{
  226. key => Key,
  227. value => Value
  228. } || {Key, Value} <- V0]
  229. };
  230. (K, V0, AccIn) -> AccIn#{K => V0}
  231. end, #{}, Headers).
  232. base_message(Message) ->
  233. #message{
  234. id = Id,
  235. qos = QoS,
  236. flags = Flags,
  237. topic = Topic,
  238. headers = Headers,
  239. payload = Payload,
  240. timestamp = Timestamp} = Message,
  241. #{
  242. id => emqx_guid:to_hexstr(Id),
  243. payload => Payload,
  244. topic => Topic,
  245. qos => QoS,
  246. flags => Flags,
  247. headers => printable_maps(Headers),
  248. pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
  249. publish_received_at => Timestamp
  250. }.
  251. ignore_sys_message(#message{flags = Flags}) ->
  252. maps:get(sys, Flags, false).
  253. publish_event_msg(Topic, Payload) ->
  254. _ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))),
  255. ok.