emqx_exhook_handler.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_exhook_handler).
  17. -include("emqx_exhook.hrl").
  18. -include_lib("emqx/include/emqx.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -export([
  21. on_client_connect/2,
  22. on_client_connack/3,
  23. on_client_connected/2,
  24. on_client_disconnected/3,
  25. on_client_authenticate/2,
  26. on_client_authorize/4,
  27. on_client_subscribe/3,
  28. on_client_unsubscribe/3
  29. ]).
  30. %% Session Lifecycle Hooks
  31. -export([
  32. on_session_created/2,
  33. on_session_subscribed/3,
  34. on_session_unsubscribed/3,
  35. on_session_resumed/2,
  36. on_session_discarded/2,
  37. on_session_takenover/2,
  38. on_session_terminated/3
  39. ]).
  40. -export([
  41. on_message_publish/1,
  42. on_message_dropped/3,
  43. on_message_delivered/2,
  44. on_message_acked/2
  45. ]).
  46. %% Utils
  47. -export([
  48. message/1,
  49. headers/1,
  50. stringfy/1,
  51. merge_responsed_bool/2,
  52. merge_responsed_message/2,
  53. assign_to_message/2,
  54. clientinfo/1,
  55. request_meta/0
  56. ]).
  57. -import(
  58. emqx_exhook,
  59. [
  60. cast/2,
  61. call_fold/3
  62. ]
  63. ).
  64. -elvis([{elvis_style, god_modules, disable}]).
  65. %%--------------------------------------------------------------------
  66. %% Clients
  67. %%--------------------------------------------------------------------
  %% @doc Hook callback for 'client.connect'.
  %% Converts ConnInfo with conninfo/1 and Props with properties/1, then
  %% hands the resulting request map to cast/2 (imported from emqx_exhook).
  on_client_connect(ConnInfo, Props) ->
      cast('client.connect', #{
          conninfo => conninfo(ConnInfo),
          props => properties(Props)
      }).
  %% @doc Hook callback for 'client.connack'.
  %% The request carries the converted connection info, the result code Rc
  %% rendered as a binary by stringfy/1, and the converted properties.
  on_client_connack(ConnInfo, Rc, Props) ->
      cast('client.connack', #{
          conninfo => conninfo(ConnInfo),
          result_code => stringfy(Rc),
          props => properties(Props)
      }).
  %% @doc Hook callback for 'client.connected'.
  %% Only ClientInfo is forwarded (converted by clientinfo/1); the second
  %% argument is ignored.
  on_client_connected(ClientInfo, _ConnInfo) ->
      cast('client.connected', #{clientinfo => clientinfo(ClientInfo)}).
  %% @doc Hook callback for 'client.disconnected'.
  %% Forwards the converted client info together with Reason rendered as a
  %% binary by stringfy/1. The third argument is ignored.
  on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
      cast('client.disconnected', #{
          clientinfo => clientinfo(ClientInfo),
          reason => stringfy(Reason)
      }).
  %% @doc Hook callback for 'client.authenticate'.
  %%
  %% The incoming AuthResult is reduced to a boolean (true only when it is
  %% the atom ok) before being placed in the request.
  %% XXX: that boolean drops whatever detail AuthResult carried, and the
  %% value coming back from call_fold/3 is likewise only a boolean.
  %%
  %% Returns:
  %%   {StopOrOk, ok}                      when the folded result is true
  %%   {StopOrOk, {error, not_authorized}} when the folded result is false
  %%   {ok, AuthResult}                    for any other call_fold/3 outcome
  on_client_authenticate(ClientInfo, AuthResult) ->
      Request = #{
          clientinfo => clientinfo(ClientInfo),
          result => AuthResult =:= ok
      },
      case call_fold('client.authenticate', Request, fun merge_responsed_bool/2) of
          {StopOrOk, #{result := true}} ->
              {StopOrOk, ok};
          {StopOrOk, #{result := false}} ->
              {StopOrOk, {error, not_authorized}};
          _ ->
              {ok, AuthResult}
      end.
  %% @doc Hook callback for 'client.authorize'.
  %%
  %% Result is a map; its `result' key (default deny) is reduced to a
  %% boolean that is true only for allow. PubSub must be publish or
  %% subscribe and is translated to 'PUBLISH' / 'SUBSCRIBE'.
  %%
  %% Returns {StopOrOk, #{result => allow | deny, from => exhook}} when the
  %% fold yields a boolean result, otherwise {ok, Result} unchanged.
  on_client_authorize(ClientInfo, PubSub, Topic, Result) ->
      Allowed = allow =:= maps:get(result, Result, deny),
      Type =
          case PubSub of
              publish -> 'PUBLISH';
              subscribe -> 'SUBSCRIBE'
          end,
      Request = #{
          clientinfo => clientinfo(ClientInfo),
          type => Type,
          topic => Topic,
          result => Allowed
      },
      case call_fold('client.authorize', Request, fun merge_responsed_bool/2) of
          {StopOrOk, #{result := true}} ->
              {StopOrOk, #{result => allow, from => exhook}};
          {StopOrOk, #{result := false}} ->
              {StopOrOk, #{result => deny, from => exhook}};
          _ ->
              {ok, Result}
      end.
  %% @doc Hook callback for 'client.subscribe'.
  %% Forwards the converted client info, properties and topic filters.
  on_client_subscribe(ClientInfo, Props, TopicFilters) ->
      cast('client.subscribe', #{
          clientinfo => clientinfo(ClientInfo),
          props => properties(Props),
          topic_filters => topicfilters(TopicFilters)
      }).
  %% @doc Hook callback for 'client.unsubscribe'.
  %% Forwards the converted client info, properties and topic filters.
  on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
      cast('client.unsubscribe', #{
          clientinfo => clientinfo(ClientInfo),
          props => properties(Props),
          topic_filters => topicfilters(TopicFilters)
      }).
  163. %%--------------------------------------------------------------------
  164. %% Session
  165. %%--------------------------------------------------------------------
  %% @doc Hook callback for 'session.created'.
  %% Forwards only the converted client info; the session info is ignored.
  on_session_created(ClientInfo, _SessInfo) ->
      cast('session.created', #{clientinfo => clientinfo(ClientInfo)}).
  %% @doc Hook callback for 'session.subscribed'.
  %% Of SubOpts only the keys qos, share, rh, rap and nl are forwarded;
  %% any other key is dropped by maps:with/2.
  on_session_subscribed(ClientInfo, Topic, SubOpts) ->
      cast('session.subscribed', #{
          clientinfo => clientinfo(ClientInfo),
          topic => Topic,
          subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
      }).
  %% @doc Hook callback for 'session.unsubscribed'.
  %% Forwards the converted client info and the topic; the subscription
  %% options are ignored.
  on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
      cast('session.unsubscribed', #{
          clientinfo => clientinfo(ClientInfo),
          topic => Topic
      }).
  %% @doc Hook callback for 'session.resumed'.
  %% Forwards only the converted client info; the session info is ignored.
  on_session_resumed(ClientInfo, _SessInfo) ->
      cast('session.resumed', #{clientinfo => clientinfo(ClientInfo)}).
  %% @doc Hook callback for 'session.discarded'.
  %% Forwards only the converted client info; the session info is ignored.
  on_session_discarded(ClientInfo, _SessInfo) ->
      cast('session.discarded', #{clientinfo => clientinfo(ClientInfo)}).
  %% @doc Hook callback for 'session.takenover'.
  %% Forwards only the converted client info; the session info is ignored.
  on_session_takenover(ClientInfo, _SessInfo) ->
      cast('session.takenover', #{clientinfo => clientinfo(ClientInfo)}).
  %% @doc Hook callback for 'session.terminated'.
  %% Forwards the converted client info and Reason rendered as a binary by
  %% stringfy/1. The session info is ignored.
  on_session_terminated(ClientInfo, Reason, _SessInfo) ->
      cast('session.terminated', #{
          clientinfo => clientinfo(ClientInfo),
          reason => stringfy(Reason)
      }).
  197. %%--------------------------------------------------------------------
  198. %% Message
  199. %%--------------------------------------------------------------------
  %% @doc Hook callback for 'message.publish'.
  %%
  %% Messages whose topic starts with "$SYS/" are skipped and yield ok.
  %% For any other message the converted message is folded through
  %% call_fold/3; when the fold returns a map with a `message' key, that
  %% value is merged back into the original record by assign_to_message/2
  %% and returned as {StopOrOk, NewMessage}. Any other fold outcome returns
  %% {ok, Message} unchanged.
  on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
      ok;
  on_message_publish(Message) ->
      Folded = call_fold(
          'message.publish',
          #{message => message(Message)},
          %% fully-qualified fun, exactly as the handler is referenced upstream
          fun emqx_exhook_handler:merge_responsed_message/2
      ),
      case Folded of
          {StopOrOk, #{message := NMessage}} ->
              {StopOrOk, assign_to_message(NMessage, Message)};
          _ ->
              {ok, Message}
      end.
  %% @doc Hook callback for 'message.dropped'.
  %% Messages on a "$SYS/" topic are skipped and yield ok. Otherwise the
  %% converted message and the stringified Reason are forwarded; the second
  %% argument is ignored.
  on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
      ok;
  on_message_dropped(Message, _By, Reason) ->
      cast('message.dropped', #{
          message => message(Message),
          reason => stringfy(Reason)
      }).
  %% @doc Hook callback for 'message.delivered'.
  %% Messages on a "$SYS/" topic are skipped and yield ok. Otherwise the
  %% converted client info and message are forwarded.
  on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
      ok;
  on_message_delivered(ClientInfo, Message) ->
      cast('message.delivered', #{
          clientinfo => clientinfo(ClientInfo),
          message => message(Message)
      }).
  %% @doc Hook callback for 'message.acked'.
  %% Messages on a "$SYS/" topic are skipped and yield ok. Otherwise the
  %% converted client info and message are forwarded.
  on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
      ok;
  on_message_acked(ClientInfo, Message) ->
      cast('message.acked', #{
          clientinfo => clientinfo(ClientInfo),
          message => message(Message)
      }).
  240. %%--------------------------------------------------------------------
  241. %% Types
  %% Converts a properties map into a list of #{name, value} maps whose
  %% fields are both rendered as binaries by stringfy/1.
  %% `undefined' yields the empty list. Any other non-map argument raises
  %% function_clause.
  properties(undefined) ->
      [];
  properties(Props) when is_map(Props) ->
      Prepend = fun(Name, Value, Items) ->
          Item = #{name => stringfy(Name), value => stringfy(Value)},
          [Item | Items]
      end,
      maps:fold(Prepend, [], Props).
  %% Converts a connection-info map into the flat map sent with hook
  %% requests.
  %%
  %% Required keys: clientid, peername (a 2-tuple whose first element is
  %% passed to ntoa/1) and sockname (a 2-tuple whose second element becomes
  %% sockport). A map lacking any of them raises function_clause.
  %% Optional keys: username, proto_name and proto_ver (default undefined)
  %% and keepalive (default 0).
  %%
  %% The helper is invoked as 'maybe'/1 with the atom quoted: `maybe' is a
  %% reserved word once the maybe_expr feature is enabled (the default from
  %% OTP 27), so an unquoted call no longer parses there. The quoted atom
  %% names the very same function, so older releases are unaffected.
  %%
  %% NOTE(review): proto_name is passed through untouched, so it is the
  %% atom undefined when the key is absent - confirm the receiver accepts
  %% that.
  conninfo(
      #{
          clientid := ClientId,
          peername := {PeerHost, _PeerPort},
          sockname := {_SockHost, SockPort}
      } = ConnInfo
  ) ->
      Username = maps:get(username, ConnInfo, undefined),
      ProtoName = maps:get(proto_name, ConnInfo, undefined),
      ProtoVer = maps:get(proto_ver, ConnInfo, undefined),
      Keepalive = maps:get(keepalive, ConnInfo, 0),
      #{
          node => stringfy(node()),
          clientid => ClientId,
          username => 'maybe'(Username),
          peerhost => ntoa(PeerHost),
          sockport => SockPort,
          proto_name => ProtoName,
          proto_ver => stringfy(ProtoVer),
          keepalive => Keepalive
      }.
  %% @doc Converts a client-info map into the flat map sent with hook
  %% requests.
  %%
  %% Required keys: clientid, username, peerhost, sockport, protocol and
  %% mountpoint; a map lacking any of them raises function_clause.
  %% Optional keys: password, cn, dn (default undefined, sent as `<<>>'),
  %% is_superuser (default false) and anonymous (default true).
  %%
  %% The helper is invoked as 'maybe'/1 with the atom quoted: `maybe' is a
  %% reserved word once the maybe_expr feature is enabled (the default from
  %% OTP 27), so an unquoted call no longer parses there. The quoted atom
  %% names the very same function, so older releases are unaffected.
  clientinfo(
      #{
          clientid := ClientId,
          username := Username,
          peerhost := PeerHost,
          sockport := SockPort,
          protocol := Protocol,
          mountpoint := Mountpoint
      } = ClientInfo
  ) ->
      #{
          node => stringfy(node()),
          clientid => ClientId,
          username => 'maybe'(Username),
          password => 'maybe'(maps:get(password, ClientInfo, undefined)),
          peerhost => ntoa(PeerHost),
          sockport => SockPort,
          protocol => stringfy(Protocol),
          mountpoint => 'maybe'(Mountpoint),
          is_superuser => maps:get(is_superuser, ClientInfo, false),
          anonymous => maps:get(anonymous, ClientInfo, true),
          cn => 'maybe'(maps:get(cn, ClientInfo, undefined)),
          dn => 'maybe'(maps:get(dn, ClientInfo, undefined))
      }.
  %% @doc Converts a #message{} record into the map sent with hook
  %% requests.
  %% The id is rendered by emqx_guid:to_hexstr/1, `from' by stringfy/1 and
  %% the headers by headers/1; qos, topic, payload and timestamp are copied
  %% as they are. The local node name is added under `node'.
  message(#message{} = Msg) ->
      #{
          node => stringfy(node()),
          id => emqx_guid:to_hexstr(Msg#message.id),
          qos => Msg#message.qos,
          from => stringfy(Msg#message.from),
          topic => Msg#message.topic,
          payload => Msg#message.payload,
          timestamp => Msg#message.timestamp,
          headers => headers(Msg#message.headers)
      }.
  %% @doc Picks the forwarded headers out of a message header map.
  %% Only username, protocol, peerhost and allow_publish are considered;
  %% a key that is absent or whose value is `undefined' is left out. Keys
  %% are emitted as binaries and values are converted by bin/2.
  headers(Headers) ->
      Forwarded = [username, protocol, peerhost, allow_publish],
      maps:from_list([
          {atom_to_binary(Key), bin(Key, Value)}
       || Key <- Forwarded,
          %% single-element generator: binds Value only when Key is present
          {ok, Value} <- [maps:find(Key, Headers)],
          Value =/= undefined
      ]).
  %% Converts the value of a forwarded header to a binary, chosen by key.
  %% peerhost values are first formatted with inet:ntoa/1; username,
  %% protocol and allow_publish values go straight to bin/1. Any other key
  %% raises function_clause.
  bin(peerhost, Address) ->
      bin(inet:ntoa(Address));
  bin(username, Value) ->
      bin(Value);
  bin(protocol, Value) ->
      bin(Value);
  bin(allow_publish, Value) ->
      bin(Value).
  %% Converts a binary, atom or iolist to a binary. Binaries are returned
  %% unchanged; any other term raises function_clause.
  bin(Chars) when is_list(Chars) -> iolist_to_binary(Chars);
  bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
  bin(Bin) when is_binary(Bin) -> Bin.
  %% @doc Merges a message map returned by a hook back into the original
  %% #message{} record.
  %% qos, topic and payload are required in InMessage and overwrite the
  %% corresponding record fields. The optional `headers' map (default
  %% `#{}') is then applied through enrich_header/2.
  assign_to_message(
      #{qos := Qos, topic := Topic, payload := Payload} = InMessage,
      Message
  ) ->
      Updated = Message#message{qos = Qos, topic = Topic, payload = Payload},
      ReturnedHeaders = maps:get(headers, InMessage, #{}),
      enrich_header(ReturnedHeaders, Updated).
  %% Applies the `<<"allow_publish">>' entry of Headers to Message.
  %% The binaries `<<"true">>' and `<<"false">>' set the allow_publish
  %% header to the matching boolean; a missing key or any other value
  %% leaves Message untouched.
  enrich_header(Headers, Message) ->
      case maps:get(<<"allow_publish">>, Headers, undefined) of
          Flag when Flag =:= <<"true">>; Flag =:= <<"false">> ->
              emqx_message:set_header(allow_publish, Flag =:= <<"true">>, Message);
          _ ->
              Message
      end.
  %% Converts a list of {Topic, SubOpts} pairs into #{name, qos} maps.
  %% Elements that are not a 2-tuple whose second element is a map with a
  %% qos key are silently skipped.
  topicfilters(Tfs) when is_list(Tfs) ->
      lists:filtermap(
          fun
              ({Topic, #{qos := Qos}}) -> {true, #{name => Topic, qos => Qos}};
              (_Other) -> false
          end,
          Tfs
      ).
  %% Formats an IP address tuple as a binary via inet_parse:ntoa/1.
  %% An 8-tuple of the form ::ffff:AB:CD is first unpacked into the
  %% 4-tuple built from the high and low bytes of AB and CD, so it is
  %% printed in dotted IPv4 notation.
  ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
      %% the 4-tuple can only match the generic clause below
      ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
  ntoa(Address) ->
      Text = inet_parse:ntoa(Address),
      list_to_binary(Text).
  %% Maps `undefined' to the empty binary and returns every other term
  %% unchanged.
  %%
  %% The name is written as the quoted atom 'maybe': `maybe' is a reserved
  %% word once the maybe_expr feature is enabled (the default from OTP 27),
  %% so the unquoted definition no longer parses there. Quoting does not
  %% change the function name, so callers on older releases still resolve
  %% it.
  'maybe'(undefined) -> <<>>;
  'maybe'(Value) -> Value.
  %% @private
  %% Renders a term as a binary.
  %% Binaries are returned unchanged, integers are printed in decimal and
  %% atoms are converted as UTF-8. Every other term is printed with the
  %% "~0p" format and the resulting characters are turned into a binary.
  stringfy(Term) when is_atom(Term) ->
      atom_to_binary(Term, utf8);
  stringfy(Term) when is_integer(Term) ->
      integer_to_binary(Term);
  stringfy(Term) when is_binary(Term) ->
      Term;
  stringfy(Term) ->
      Printed = io_lib:format("~0p", [Term]),
      unicode:characters_to_binary(Printed).
  389. %%--------------------------------------------------------------------
  390. %% Acc funcs
  391. %% see exhook.proto
  %% @doc Accumulator function for boolean hook responses (see exhook.proto).
  %%
  %% A response of type 'IGNORE' yields `ignore'.
  %% A response carrying {bool_result, Bool} with a boolean Bool yields
  %% {ret(Type), Req} with Req's `result' key set to Bool.
  %% Any other response is logged at warning level and yields `ignore'.
  merge_responsed_bool(Req, Resp) ->
      case Resp of
          #{type := 'IGNORE'} ->
              ignore;
          #{type := Type, value := {bool_result, NewBool}} when is_boolean(NewBool) ->
              {ret(Type), Req#{result => NewBool}};
          _ ->
              ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
              ignore
      end.
  %% @doc Accumulator function for message hook responses (see exhook.proto).
  %%
  %% A response of type 'IGNORE' yields `ignore'.
  %% A response carrying {message, NMessage} yields {ret(Type), Req} with
  %% Req's `message' key set to NMessage.
  %% Any other response is logged at warning level and yields `ignore'.
  merge_responsed_message(Req, Resp) ->
      case Resp of
          #{type := 'IGNORE'} ->
              ignore;
          #{type := Type, value := {message, NMessage}} ->
              {ret(Type), Req#{message => NMessage}};
          _ ->
              ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
              ignore
      end.
  %% Translates a response type into the tag returned from the accumulator
  %% functions: 'STOP_AND_RETURN' becomes stop and 'CONTINUE' becomes ok.
  %% Any other type raises function_clause.
  ret('STOP_AND_RETURN') ->
      stop;
  ret('CONTINUE') ->
      ok.
  %% @doc Builds the metadata map describing this node: the node name as a
  %% binary plus the version, system description and cluster name reported
  %% by emqx_sys.
  request_meta() ->
      Node = stringfy(node()),
      Version = emqx_sys:version(),
      SysDescr = emqx_sys:sysdescr(),
      ClusterName = emqx_sys:cluster_name(),
      #{
          node => Node,
          version => Version,
          sysdescr => SysDescr,
          cluster_name => ClusterName
      }.