emqx_exhook_server.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_exhook_server).
  17. -include("emqx_exhook.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include_lib("emqx/include/emqx_hooks.hrl").
  20. %% The exhook proto version should be fixed as `v2` in EMQX v5.x
  21. %% to make sure the exhook proto version is compatible
  22. -define(PB_CLIENT_MOD, emqx_exhook_v_2_hook_provider_client).
  23. %% Load/Unload
  24. -export([
  25. load/2,
  26. unload/1
  27. ]).
  28. %% APIs
  29. -export([call/3]).
  30. %% Infos
  31. -export([
  32. name/1,
  33. hooks/1,
  34. format/1,
  35. failed_action/1
  36. ]).
  37. -ifdef(TEST).
  38. -export([hk2func/1]).
  39. -endif.
  40. %% Server name (equal to grpc client channel name)
  41. -type server() :: #{
  42. name := binary(),
  43. %% The function options
  44. options := map(),
  45. %% gRPC channel pid
  46. channel := pid(),
  47. %% Registered hook names and options
  48. hookspec := #{hookpoint() => map()},
  49. %% Metrcis name prefix
  50. prefix := list()
  51. }.
  52. -type hookpoint() ::
  53. 'client.connect'
  54. | 'client.connack'
  55. | 'client.connected'
  56. | 'client.disconnected'
  57. | 'client.authenticate'
  58. | 'client.authorize'
  59. | 'client.subscribe'
  60. | 'client.unsubscribe'
  61. | 'session.created'
  62. | 'session.subscribed'
  63. | 'session.unsubscribed'
  64. | 'session.resumed'
  65. | 'session.discarded'
  66. | 'session.takenover'
  67. | 'session.terminated'
  68. | 'message.publish'
  69. | 'message.delivered'
  70. | 'message.acked'
  71. | 'message.dropped'.
  72. -export_type([server/0, hookpoint/0]).
  73. -dialyzer({nowarn_function, [inc_metrics/2]}).
  74. -elvis([{elvis_style, dont_repeat_yourself, disable}]).
  75. %%--------------------------------------------------------------------
  76. %% Load/Unload APIs
  77. %%--------------------------------------------------------------------
  78. -spec load(binary(), map()) -> {ok, server()} | {error, term()} | {load_error, term()} | disable.
  79. load(_Name, #{enable := false}) ->
  80. disable;
  81. load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
  82. ReqOpts = #{timeout => Timeout, failed_action => FailedAction},
  83. case channel_opts(Opts) of
  84. {ok, {SvrAddr, ClientOpts}} ->
  85. case
  86. emqx_exhook_sup:start_grpc_client_channel(
  87. Name,
  88. SvrAddr,
  89. ClientOpts
  90. )
  91. of
  92. {ok, _ChannPoolPid} ->
  93. case do_init(Name, ReqOpts) of
  94. {ok, HookSpecs} ->
  95. %% Register metrics
  96. Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])),
  97. ensure_metrics(Prefix, HookSpecs),
  98. %% Ensure hooks
  99. ensure_hooks(HookSpecs),
  100. {ok, #{
  101. name => Name,
  102. options => ReqOpts,
  103. channel => _ChannPoolPid,
  104. hookspec => HookSpecs,
  105. prefix => Prefix
  106. }};
  107. {error, Reason} ->
  108. emqx_exhook_sup:stop_grpc_client_channel(Name),
  109. {load_error, Reason}
  110. end;
  111. {error, _} = E ->
  112. E
  113. end;
  114. Error ->
  115. Error
  116. end.
  117. %% @private
  118. channel_opts(Opts = #{url := URL, socket_options := SockOptsT}) ->
  119. ClientOpts = maps:merge(
  120. #{pool_size => erlang:system_info(schedulers)},
  121. Opts
  122. ),
  123. SockOpts = maps:to_list(SockOptsT),
  124. case uri_string:parse(URL) of
  125. #{scheme := <<"http">>, host := Host, port := Port} ->
  126. NClientOpts = ClientOpts#{
  127. gun_opts =>
  128. #{transport_opts => SockOpts}
  129. },
  130. {ok, {format_http_uri("http", Host, Port), NClientOpts}};
  131. #{scheme := <<"https">>, host := Host, port := Port} ->
  132. SslOpts =
  133. case maps:get(ssl, Opts, undefined) of
  134. undefined ->
  135. [];
  136. #{enable := false} ->
  137. [];
  138. MapOpts ->
  139. filter(
  140. [
  141. {cacertfile, maps:get(cacertfile, MapOpts, undefined)},
  142. {certfile, maps:get(certfile, MapOpts, undefined)},
  143. {keyfile, maps:get(keyfile, MapOpts, undefined)}
  144. ]
  145. )
  146. end,
  147. NClientOpts = ClientOpts#{
  148. gun_opts =>
  149. #{
  150. transport => ssl,
  151. transport_opts => SockOpts ++ SslOpts
  152. }
  153. },
  154. {ok, {format_http_uri("https", Host, Port), NClientOpts}};
  155. Error ->
  156. {error, {bad_server_url, URL, Error}}
  157. end.
  158. format_http_uri(Scheme, Host, Port) ->
  159. lists:flatten(io_lib:format("~ts://~ts:~w", [Scheme, Host, Port])).
  160. filter(Ls) ->
  161. [E || E <- Ls, E /= undefined].
  162. -spec unload(server()) -> ok.
  163. unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
  164. _ = may_unload_hooks(HookSpecs),
  165. _ = do_deinit(Name, ReqOpts),
  166. _ = emqx_exhook_sup:stop_grpc_client_channel(Name),
  167. ok.
  168. do_deinit(Name, ReqOpts) ->
  169. %% Override the request timeout to deinit grpc server to
  170. %% avoid emqx_exhook_mgr force killed by upper supervisor
  171. NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT},
  172. _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts),
  173. ok.
  174. do_init(ChannName, ReqOpts) ->
  175. %% BrokerInfo defined at: exhook.protos
  176. BrokerInfo = maps:with(
  177. [version, sysdescr, uptime, datetime],
  178. maps:from_list(emqx_sys:info())
  179. ),
  180. Req = #{broker => BrokerInfo},
  181. case do_call(ChannName, undefined, 'on_provider_loaded', Req, ReqOpts) of
  182. {ok, InitialResp} ->
  183. try
  184. {ok, resolve_hookspec(maps:get(hooks, InitialResp, []))}
  185. catch
  186. _:Reason:Stk ->
  187. ?SLOG(error, #{
  188. msg => "failed_to_init_channel",
  189. channel_name => ChannName,
  190. reason => Reason,
  191. stacktrace => Stk
  192. }),
  193. {error, Reason}
  194. end;
  195. {error, Reason} ->
  196. {error, Reason}
  197. end.
  198. %% @private
  199. resolve_hookspec(HookSpecs) when is_list(HookSpecs) ->
  200. MessageHooks = message_hooks(),
  201. AvailableHooks = available_hooks(),
  202. lists:foldr(
  203. fun(HookSpec, Acc) ->
  204. case maps:get(name, HookSpec, undefined) of
  205. undefined ->
  206. Acc;
  207. Name0 ->
  208. Name =
  209. try
  210. binary_to_existing_atom(Name0, utf8)
  211. catch
  212. T:R:_ -> {T, R}
  213. end,
  214. case {lists:member(Name, AvailableHooks), lists:member(Name, MessageHooks)} of
  215. {false, _} ->
  216. error({unknown_hookpoint, Name0});
  217. {true, false} ->
  218. Acc#{Name => #{}};
  219. {true, true} ->
  220. Acc#{
  221. Name => #{
  222. topics => maps:get(topics, HookSpec, [])
  223. }
  224. }
  225. end
  226. end
  227. end,
  228. #{},
  229. HookSpecs
  230. ).
  231. ensure_metrics(Prefix, HookSpecs) ->
  232. Keys = [
  233. list_to_atom(Prefix ++ atom_to_list(Hookpoint))
  234. || Hookpoint <- maps:keys(HookSpecs)
  235. ],
  236. lists:foreach(fun emqx_metrics:ensure/1, Keys).
  237. ensure_hooks(HookSpecs) ->
  238. lists:foreach(
  239. fun(Hookpoint) ->
  240. case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
  241. false ->
  242. ?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
  243. {Hookpoint, {M, F, A}} ->
  244. emqx_hooks:put(Hookpoint, {M, F, A}, ?HP_EXHOOK),
  245. ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
  246. end
  247. end,
  248. maps:keys(HookSpecs)
  249. ).
  250. may_unload_hooks(HookSpecs) ->
  251. lists:foreach(
  252. fun(Hookpoint) ->
  253. case ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
  254. Cnt when Cnt =< 0 ->
  255. case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
  256. {Hookpoint, {M, F, _A}} ->
  257. emqx_hooks:del(Hookpoint, {M, F});
  258. _ ->
  259. ok
  260. end,
  261. ets:delete(?HOOKS_REF_COUNTER, Hookpoint);
  262. _ ->
  263. ok
  264. end
  265. end,
  266. maps:keys(HookSpecs)
  267. ).
  268. format(#{name := Name, hookspec := Hooks}) ->
  269. lists:flatten(
  270. io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])
  271. ).
  272. %%--------------------------------------------------------------------
  273. %% APIs
  274. %%--------------------------------------------------------------------
  275. name(#{name := Name}) ->
  276. Name.
  277. hooks(#{hookspec := Hooks}) ->
  278. FoldFun = fun(Hook, Params, Acc) ->
  279. [
  280. #{
  281. name => Hook,
  282. params => Params
  283. }
  284. | Acc
  285. ]
  286. end,
  287. maps:fold(FoldFun, [], Hooks).
  288. -spec call(hookpoint(), map(), server()) ->
  289. ignore
  290. | {ok, Resp :: term()}
  291. | {error, term()}.
  292. call(Hookpoint, Req, #{
  293. name := ChannName,
  294. options := ReqOpts,
  295. hookspec := Hooks,
  296. prefix := Prefix
  297. }) ->
  298. case maps:get(Hookpoint, Hooks, undefined) of
  299. undefined ->
  300. ignore;
  301. Opts ->
  302. NeedCall =
  303. case lists:member(Hookpoint, message_hooks()) of
  304. false ->
  305. true;
  306. _ ->
  307. #{message := #{topic := Topic}} = Req,
  308. match_topic_filter(Topic, maps:get(topics, Opts, []))
  309. end,
  310. case NeedCall of
  311. false ->
  312. ignore;
  313. _ ->
  314. inc_metrics(Prefix, Hookpoint),
  315. GrpcFun = hk2func(Hookpoint),
  316. do_call(ChannName, Hookpoint, GrpcFun, Req, ReqOpts)
  317. end
  318. end.
  319. %% @private
  320. inc_metrics(IncFun, Name) when is_function(IncFun) ->
  321. %% BACKW: e4.2.0-e4.2.2
  322. {env, [Prefix | _]} = erlang:fun_info(IncFun, env),
  323. inc_metrics(Prefix, Name);
  324. inc_metrics(Prefix, Name) when is_list(Prefix) ->
  325. emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
  326. -compile({inline, [match_topic_filter/2]}).
  327. match_topic_filter(_, []) ->
  328. true;
  329. match_topic_filter(TopicName, TopicFilter) ->
  330. lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
  331. -ifdef(TEST).
  332. -define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
  333. apply(?PB_CLIENT_MOD, Fun, [Req, #{<<"channel">> => ChannName}, Options])
  334. ).
  335. -else.
  336. -define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
  337. apply(?PB_CLIENT_MOD, Fun, [Req, Options])
  338. ).
  339. -endif.
  340. -spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
  341. do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
  342. NReq = Req#{meta => emqx_exhook_handler:request_meta()},
  343. Options = ReqOpts#{
  344. channel => ChannName,
  345. key_dispatch => key_dispatch(NReq)
  346. },
  347. ?SLOG(debug, #{
  348. msg => "do_call",
  349. module => ?PB_CLIENT_MOD,
  350. function => Fun,
  351. req => NReq,
  352. options => Options
  353. }),
  354. case catch ?CALL_PB_CLIENT(ChanneName, Fun, NReq, Options) of
  355. {ok, Resp, Metadata} ->
  356. ?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
  357. update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
  358. {ok, Resp};
  359. {error, {Code, Msg}, _Metadata} ->
  360. ?SLOG(error, #{
  361. msg => "exhook_call_error",
  362. module => ?PB_CLIENT_MOD,
  363. function => Fun,
  364. req => NReq,
  365. options => Options,
  366. code => Code,
  367. packet => Msg
  368. }),
  369. update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
  370. {error, {Code, Msg}};
  371. {error, Reason} ->
  372. ?SLOG(error, #{
  373. msg => "exhook_call_error",
  374. module => ?PB_CLIENT_MOD,
  375. function => Fun,
  376. req => NReq,
  377. options => Options,
  378. reason => Reason
  379. }),
  380. update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
  381. {error, Reason};
  382. {'EXIT', {Reason, Stk}} ->
  383. ?SLOG(error, #{
  384. msg => "exhook_call_exception",
  385. module => ?PB_CLIENT_MOD,
  386. function => Fun,
  387. req => NReq,
  388. options => Options,
  389. stacktrace => Stk
  390. }),
  391. update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
  392. {error, Reason}
  393. end.
  394. update_metrics(undefined, _ChannName, _Fun) ->
  395. ok;
  396. update_metrics(Hookpoint, ChannName, Fun) ->
  397. Fun(ChannName, Hookpoint).
  398. failed_action(#{options := Opts}) ->
  399. maps:get(failed_action, Opts).
  400. %%--------------------------------------------------------------------
  401. %% Internal funcs
  402. %%--------------------------------------------------------------------
  403. -compile({inline, [hk2func/1]}).
  404. hk2func('client.connect') -> 'on_client_connect';
  405. hk2func('client.connack') -> 'on_client_connack';
  406. hk2func('client.connected') -> 'on_client_connected';
  407. hk2func('client.disconnected') -> 'on_client_disconnected';
  408. hk2func('client.authenticate') -> 'on_client_authenticate';
  409. hk2func('client.authorize') -> 'on_client_authorize';
  410. hk2func('client.subscribe') -> 'on_client_subscribe';
  411. hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
  412. hk2func('session.created') -> 'on_session_created';
  413. hk2func('session.subscribed') -> 'on_session_subscribed';
  414. hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
  415. hk2func('session.resumed') -> 'on_session_resumed';
  416. hk2func('session.discarded') -> 'on_session_discarded';
  417. hk2func('session.takenover') -> 'on_session_takenover';
  418. hk2func('session.terminated') -> 'on_session_terminated';
  419. hk2func('message.publish') -> 'on_message_publish';
  420. hk2func('message.delivered') -> 'on_message_delivered';
  421. hk2func('message.acked') -> 'on_message_acked';
  422. hk2func('message.dropped') -> 'on_message_dropped'.
  423. -compile({inline, [message_hooks/0]}).
  424. message_hooks() ->
  425. [
  426. 'message.publish',
  427. 'message.delivered',
  428. 'message.acked',
  429. 'message.dropped'
  430. ].
  431. -compile({inline, [available_hooks/0]}).
  432. available_hooks() ->
  433. [
  434. 'client.connect',
  435. 'client.connack',
  436. 'client.connected',
  437. 'client.disconnected',
  438. 'client.authenticate',
  439. 'client.authorize',
  440. 'client.subscribe',
  441. 'client.unsubscribe',
  442. 'session.created',
  443. 'session.subscribed',
  444. 'session.unsubscribed',
  445. 'session.resumed',
  446. 'session.discarded',
  447. 'session.takenover',
  448. 'session.terminated'
  449. | message_hooks()
  450. ].
  451. %% @doc Get dispatch_key for each request
  452. key_dispatch(_Req = #{clientinfo := #{clientid := ClientId}}) ->
  453. ClientId;
  454. key_dispatch(_Req = #{conninfo := #{clientid := ClientId}}) ->
  455. ClientId;
  456. key_dispatch(_Req = #{message := #{from := From}}) ->
  457. From;
  458. key_dispatch(_Req) ->
  459. self().