emqx_exhook_handler.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_exhook_handler).
  17. -include_lib("emqx/include/emqx.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include_lib("emqx/include/emqx_access_control.hrl").
  20. -export([
  21. on_client_connect/2,
  22. on_client_connack/3,
  23. on_client_connected/2,
  24. on_client_disconnected/3,
  25. on_client_authenticate/2,
  26. on_client_authorize/4,
  27. on_client_subscribe/3,
  28. on_client_unsubscribe/3
  29. ]).
  30. %% Session Lifecircle Hooks
  31. -export([
  32. on_session_created/2,
  33. on_session_subscribed/3,
  34. on_session_unsubscribed/3,
  35. on_session_resumed/2,
  36. on_session_discarded/2,
  37. on_session_takenover/2,
  38. on_session_terminated/3
  39. ]).
  40. -export([
  41. on_message_publish/1,
  42. on_message_dropped/3,
  43. on_message_delivered/2,
  44. on_message_acked/2
  45. ]).
  46. %% Utils
  47. -export([
  48. message/1,
  49. headers/1,
  50. stringfy/1,
  51. merge_responsed_bool/2,
  52. merge_responsed_message/2,
  53. assign_to_message/2,
  54. clientinfo/1,
  55. request_meta/0
  56. ]).
  57. -import(
  58. emqx_exhook,
  59. [
  60. cast/2,
  61. call_fold/3
  62. ]
  63. ).
  64. -elvis([{elvis_style, god_modules, disable}]).
  65. %%--------------------------------------------------------------------
  66. %% Clients
  67. %%--------------------------------------------------------------------
  68. on_client_connect(ConnInfo, Props) ->
  69. Req = #{
  70. conninfo => conninfo(ConnInfo),
  71. props => properties(Props)
  72. },
  73. cast('client.connect', Req).
  74. on_client_connack(ConnInfo, Rc, Props) ->
  75. Req = #{
  76. conninfo => conninfo(ConnInfo),
  77. result_code => stringfy(Rc),
  78. props => properties(Props)
  79. },
  80. cast('client.connack', Req).
  81. on_client_connected(ClientInfo, _ConnInfo) ->
  82. Req = #{clientinfo => clientinfo(ClientInfo)},
  83. cast('client.connected', Req).
  84. on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
  85. Req = #{
  86. clientinfo => clientinfo(ClientInfo),
  87. reason => stringfy(Reason)
  88. },
  89. cast('client.disconnected', Req).
  90. on_client_authenticate(ClientInfo, AuthResult) ->
  91. %% XXX: Bool is missing more information about the atom of the result
  92. %% So, the `Req` has missed detailed info too.
  93. %%
  94. %% The return value of `call_fold` just a bool, that has missed
  95. %% detailed info too.
  96. %%
  97. Bool = AuthResult == ok,
  98. Req = #{
  99. clientinfo => clientinfo(ClientInfo),
  100. result => Bool
  101. },
  102. case
  103. call_fold(
  104. 'client.authenticate',
  105. Req,
  106. fun merge_responsed_bool/2
  107. )
  108. of
  109. {StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
  110. Result =
  111. case Result0 of
  112. true -> ok;
  113. _ -> {error, not_authorized}
  114. end,
  115. {StopOrOk, Result};
  116. _ ->
  117. {ok, AuthResult}
  118. end.
  119. on_client_authorize(ClientInfo, Action, Topic, Result) ->
  120. Bool = maps:get(result, Result, deny) == allow,
  121. %% TODO: Support full action in major release
  122. Type =
  123. case Action of
  124. ?authz_action(publish) -> 'PUBLISH';
  125. ?authz_action(subscribe) -> 'SUBSCRIBE'
  126. end,
  127. Req = #{
  128. clientinfo => clientinfo(ClientInfo),
  129. type => Type,
  130. topic => emqx_topic:get_shared_real_topic(Topic),
  131. result => Bool
  132. },
  133. case
  134. call_fold(
  135. 'client.authorize',
  136. Req,
  137. fun merge_responsed_bool/2
  138. )
  139. of
  140. {StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
  141. NResult =
  142. case Result0 of
  143. true -> allow;
  144. _ -> deny
  145. end,
  146. {StopOrOk, #{result => NResult, from => exhook}};
  147. _ ->
  148. {ok, Result}
  149. end.
  150. on_client_subscribe(ClientInfo, Props, TopicFilters) ->
  151. Req = #{
  152. clientinfo => clientinfo(ClientInfo),
  153. props => properties(Props),
  154. topic_filters => topicfilters(TopicFilters)
  155. },
  156. cast('client.subscribe', Req).
  157. on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
  158. Req = #{
  159. clientinfo => clientinfo(ClientInfo),
  160. props => properties(Props),
  161. topic_filters => topicfilters(TopicFilters)
  162. },
  163. cast('client.unsubscribe', Req).
  164. %%--------------------------------------------------------------------
  165. %% Session
  166. %%--------------------------------------------------------------------
  167. on_session_created(ClientInfo, _SessInfo) ->
  168. Req = #{clientinfo => clientinfo(ClientInfo)},
  169. cast('session.created', Req).
  170. on_session_subscribed(ClientInfo, Topic, SubOpts) ->
  171. Req = #{
  172. clientinfo => clientinfo(ClientInfo),
  173. topic => emqx_topic:maybe_format_share(Topic),
  174. subopts => subopts(SubOpts)
  175. },
  176. cast('session.subscribed', Req).
  177. on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
  178. Req = #{
  179. clientinfo => clientinfo(ClientInfo),
  180. topic => emqx_topic:maybe_format_share(Topic)
  181. %% no subopts when unsub
  182. },
  183. cast('session.unsubscribed', Req).
  184. on_session_resumed(ClientInfo, _SessInfo) ->
  185. Req = #{clientinfo => clientinfo(ClientInfo)},
  186. cast('session.resumed', Req).
  187. on_session_discarded(ClientInfo, _SessInfo) ->
  188. Req = #{clientinfo => clientinfo(ClientInfo)},
  189. cast('session.discarded', Req).
  190. on_session_takenover(ClientInfo, _SessInfo) ->
  191. Req = #{clientinfo => clientinfo(ClientInfo)},
  192. cast('session.takenover', Req).
  193. on_session_terminated(ClientInfo, Reason, _SessInfo) ->
  194. Req = #{
  195. clientinfo => clientinfo(ClientInfo),
  196. reason => stringfy(Reason)
  197. },
  198. cast('session.terminated', Req).
  199. %%--------------------------------------------------------------------
  200. %% Message
  201. %%--------------------------------------------------------------------
  202. on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
  203. ok;
  204. on_message_publish(Message) ->
  205. Req = #{message => message(Message)},
  206. case
  207. call_fold(
  208. 'message.publish',
  209. Req,
  210. fun emqx_exhook_handler:merge_responsed_message/2
  211. )
  212. of
  213. {StopOrOk, #{message := NMessage}} ->
  214. {StopOrOk, assign_to_message(NMessage, Message)};
  215. _ ->
  216. {ok, Message}
  217. end.
  218. on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
  219. ok;
  220. on_message_dropped(Message, _By, Reason) ->
  221. Req = #{
  222. message => message(Message),
  223. reason => stringfy(Reason)
  224. },
  225. cast('message.dropped', Req).
  226. on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
  227. ok;
  228. on_message_delivered(ClientInfo, Message) ->
  229. Req = #{
  230. clientinfo => clientinfo(ClientInfo),
  231. message => message(Message)
  232. },
  233. cast('message.delivered', Req).
  234. on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
  235. ok;
  236. on_message_acked(ClientInfo, Message) ->
  237. Req = #{
  238. clientinfo => clientinfo(ClientInfo),
  239. message => message(Message)
  240. },
  241. cast('message.acked', Req).
  242. %%--------------------------------------------------------------------
  243. %% Types
  244. properties(undefined) ->
  245. [];
  246. properties(M) when is_map(M) ->
  247. maps:fold(
  248. fun(K, V, Acc) ->
  249. [
  250. #{
  251. name => stringfy(K),
  252. value => stringfy(V)
  253. }
  254. | Acc
  255. ]
  256. end,
  257. [],
  258. M
  259. ).
  260. conninfo(
  261. ConnInfo =
  262. #{
  263. clientid := ClientId,
  264. peername := {Peerhost, PeerPort},
  265. sockname := {_, SockPort}
  266. }
  267. ) ->
  268. Username = maps:get(username, ConnInfo, undefined),
  269. ProtoName = maps:get(proto_name, ConnInfo, undefined),
  270. ProtoVer = maps:get(proto_ver, ConnInfo, undefined),
  271. Keepalive = maps:get(keepalive, ConnInfo, 0),
  272. #{
  273. node => stringfy(node()),
  274. clientid => ClientId,
  275. username => maybe(Username),
  276. peerhost => ntoa(Peerhost),
  277. peerport => PeerPort,
  278. sockport => SockPort,
  279. proto_name => ProtoName,
  280. proto_ver => stringfy(ProtoVer),
  281. keepalive => Keepalive
  282. }.
  283. clientinfo(
  284. ClientInfo =
  285. #{
  286. clientid := ClientId,
  287. username := Username,
  288. peerhost := PeerHost,
  289. peerport := PeerPort,
  290. sockport := SockPort,
  291. protocol := Protocol,
  292. mountpoint := Mountpoiont
  293. }
  294. ) ->
  295. #{
  296. node => stringfy(node()),
  297. clientid => ClientId,
  298. username => maybe(Username),
  299. password => maybe(maps:get(password, ClientInfo, undefined)),
  300. peerhost => ntoa(PeerHost),
  301. peerport => PeerPort,
  302. sockport => SockPort,
  303. protocol => stringfy(Protocol),
  304. mountpoint => maybe(Mountpoiont),
  305. is_superuser => maps:get(is_superuser, ClientInfo, false),
  306. anonymous => maps:get(anonymous, ClientInfo, true),
  307. cn => maybe(maps:get(cn, ClientInfo, undefined)),
  308. dn => maybe(maps:get(dn, ClientInfo, undefined))
  309. }.
  310. message(#message{
  311. id = Id,
  312. qos = Qos,
  313. from = From,
  314. topic = Topic,
  315. payload = Payload,
  316. timestamp = Ts,
  317. headers = Headers
  318. }) ->
  319. #{
  320. node => stringfy(node()),
  321. id => emqx_guid:to_hexstr(Id),
  322. qos => Qos,
  323. from => stringfy(From),
  324. topic => Topic,
  325. payload => Payload,
  326. timestamp => Ts,
  327. headers => headers(Headers)
  328. }.
  329. headers(Headers) ->
  330. Ls = [username, protocol, peerhost, allow_publish],
  331. maps:fold(
  332. fun
  333. (_, undefined, Acc) ->
  334. %% Ignore undefined value
  335. Acc;
  336. (K, V, Acc) ->
  337. case lists:member(K, Ls) of
  338. true ->
  339. Acc#{atom_to_binary(K) => bin(K, V)};
  340. _ ->
  341. Acc
  342. end
  343. end,
  344. #{},
  345. Headers
  346. ).
  347. bin(K, V) when
  348. K == username;
  349. K == protocol;
  350. K == allow_publish
  351. ->
  352. bin(V);
  353. bin(peerhost, V) ->
  354. bin(inet:ntoa(V)).
  355. bin(V) when is_binary(V) -> V;
  356. bin(V) when is_atom(V) -> atom_to_binary(V);
  357. bin(V) when is_list(V) -> iolist_to_binary(V).
  358. assign_to_message(
  359. InMessage = #{
  360. qos := Qos,
  361. topic := Topic,
  362. payload := Payload
  363. },
  364. Message
  365. ) ->
  366. NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
  367. enrich_header(maps:get(headers, InMessage, #{}), NMsg).
  368. enrich_header(Headers, Message) ->
  369. case maps:get(<<"allow_publish">>, Headers, undefined) of
  370. <<"false">> ->
  371. emqx_message:set_header(allow_publish, false, Message);
  372. <<"true">> ->
  373. emqx_message:set_header(allow_publish, true, Message);
  374. _ ->
  375. Message
  376. end.
  377. topicfilters(Tfs) when is_list(Tfs) ->
  378. [
  379. #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
  380. || {Topic, SubOpts} <- Tfs
  381. ].
  382. subopts(SubOpts) ->
  383. #{
  384. qos => maps:get(qos, SubOpts, 0),
  385. rh => maps:get(rh, SubOpts, 0),
  386. rap => maps:get(rap, SubOpts, 0),
  387. nl => maps:get(nl, SubOpts, 0)
  388. }.
  389. ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
  390. list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
  391. ntoa(IP) ->
  392. list_to_binary(inet_parse:ntoa(IP)).
  393. maybe(undefined) -> <<>>;
  394. maybe(B) -> B.
  395. %% @private
  396. stringfy(Term) when is_binary(Term) ->
  397. Term;
  398. stringfy(Term) when is_integer(Term) ->
  399. integer_to_binary(Term);
  400. stringfy(Term) when is_atom(Term) ->
  401. atom_to_binary(Term, utf8);
  402. stringfy(Term) ->
  403. unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
  404. %%--------------------------------------------------------------------
  405. %% Acc funcs
  406. %% see exhook.proto
  407. merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
  408. ignore;
  409. merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) when
  410. is_boolean(NewBool)
  411. ->
  412. {ret(Type), Req#{result => NewBool}};
  413. merge_responsed_bool(_Req, Resp) ->
  414. ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
  415. ignore.
  416. merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
  417. ignore;
  418. merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
  419. {ret(Type), Req#{message => NMessage}};
  420. merge_responsed_message(_Req, Resp) ->
  421. ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
  422. ignore.
  423. ret('CONTINUE') -> ok;
  424. ret('STOP_AND_RETURN') -> stop.
  425. request_meta() ->
  426. #{
  427. node => stringfy(node()),
  428. version => emqx_sys:version(),
  429. sysdescr => emqx_sys:sysdescr(),
  430. cluster_name => emqx_sys:cluster_name()
  431. }.