| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_exhook_server).
- -include("emqx_exhook.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include_lib("emqx/include/emqx_hooks.hrl").
- %% The exhook proto version should be fixed as `v2` in EMQX v5.x
- %% to make sure the exhook proto version is compatible
- -define(PB_CLIENT_MOD, emqx_exhook_v_2_hook_provider_client).
- %% Load/Unload
- -export([
- load/2,
- unload/1
- ]).
- %% APIs
- -export([call/3]).
- %% Infos
- -export([
- name/1,
- hooks/1,
- format/1,
- failed_action/1
- ]).
- -ifdef(TEST).
- -export([hk2func/1]).
- -endif.
- %% Server name (equal to grpc client channel name)
- -type server() :: #{
- name := binary(),
- %% The function options
- options := map(),
- %% gRPC channel pid
- channel := pid(),
- %% Registered hook names and options
- hookspec := #{hookpoint() => map()},
- %% Metrcis name prefix
- prefix := list()
- }.
- -type hookpoint() ::
- 'client.connect'
- | 'client.connack'
- | 'client.connected'
- | 'client.disconnected'
- | 'client.authenticate'
- | 'client.authorize'
- | 'client.subscribe'
- | 'client.unsubscribe'
- | 'session.created'
- | 'session.subscribed'
- | 'session.unsubscribed'
- | 'session.resumed'
- | 'session.discarded'
- | 'session.takenover'
- | 'session.terminated'
- | 'message.publish'
- | 'message.delivered'
- | 'message.acked'
- | 'message.dropped'.
- -export_type([server/0, hookpoint/0]).
- -dialyzer({nowarn_function, [inc_metrics/2]}).
- -elvis([{elvis_style, dont_repeat_yourself, disable}]).
- %%--------------------------------------------------------------------
- %% Load/Unload APIs
- %%--------------------------------------------------------------------
- -spec load(binary(), map()) -> {ok, server()} | {error, term()} | {load_error, term()} | disable.
- load(_Name, #{enable := false}) ->
- disable;
- load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
- ReqOpts = #{timeout => Timeout, failed_action => FailedAction},
- case channel_opts(Opts) of
- {ok, {SvrAddr, ClientOpts}} ->
- case
- emqx_exhook_sup:start_grpc_client_channel(
- Name,
- SvrAddr,
- ClientOpts
- )
- of
- {ok, _ChannPoolPid} ->
- case do_init(Name, ReqOpts) of
- {ok, HookSpecs} ->
- %% Register metrics
- Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])),
- ensure_metrics(Prefix, HookSpecs),
- %% Ensure hooks
- ensure_hooks(HookSpecs),
- {ok, #{
- name => Name,
- options => ReqOpts,
- channel => _ChannPoolPid,
- hookspec => HookSpecs,
- prefix => Prefix
- }};
- {error, Reason} ->
- emqx_exhook_sup:stop_grpc_client_channel(Name),
- {load_error, Reason}
- end;
- {error, _} = E ->
- E
- end;
- Error ->
- Error
- end.
- %% @private
- channel_opts(Opts = #{url := URL, socket_options := SockOptsT}) ->
- ClientOpts = maps:merge(
- #{pool_size => erlang:system_info(schedulers)},
- Opts
- ),
- SockOpts = maps:to_list(SockOptsT),
- case uri_string:parse(URL) of
- #{scheme := <<"http">>, host := Host, port := Port} ->
- NClientOpts = ClientOpts#{
- gun_opts =>
- #{transport_opts => SockOpts}
- },
- {ok, {format_http_uri("http", Host, Port), NClientOpts}};
- #{scheme := <<"https">>, host := Host, port := Port} ->
- SslOpts =
- case maps:get(ssl, Opts, undefined) of
- undefined ->
- [];
- #{enable := false} ->
- [];
- MapOpts ->
- filter(
- [
- {cacertfile, maps:get(cacertfile, MapOpts, undefined)},
- {certfile, maps:get(certfile, MapOpts, undefined)},
- {keyfile, maps:get(keyfile, MapOpts, undefined)}
- ]
- )
- end,
- NClientOpts = ClientOpts#{
- gun_opts =>
- #{
- transport => ssl,
- transport_opts => SockOpts ++ SslOpts
- }
- },
- {ok, {format_http_uri("https", Host, Port), NClientOpts}};
- Error ->
- {error, {bad_server_url, URL, Error}}
- end.
- format_http_uri(Scheme, Host, Port) ->
- lists:flatten(io_lib:format("~ts://~ts:~w", [Scheme, Host, Port])).
- filter(Ls) ->
- [E || E <- Ls, E /= undefined].
- -spec unload(server()) -> ok.
- unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
- _ = may_unload_hooks(HookSpecs),
- _ = do_deinit(Name, ReqOpts),
- _ = emqx_exhook_sup:stop_grpc_client_channel(Name),
- ok.
- do_deinit(Name, ReqOpts) ->
- %% Override the request timeout to deinit grpc server to
- %% avoid emqx_exhook_mgr force killed by upper supervisor
- NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT},
- _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts),
- ok.
- do_init(ChannName, ReqOpts) ->
- %% BrokerInfo defined at: exhook.protos
- BrokerInfo = maps:with(
- [version, sysdescr, uptime, datetime],
- maps:from_list(emqx_sys:info())
- ),
- Req = #{broker => BrokerInfo},
- case do_call(ChannName, undefined, 'on_provider_loaded', Req, ReqOpts) of
- {ok, InitialResp} ->
- try
- {ok, resolve_hookspec(maps:get(hooks, InitialResp, []))}
- catch
- _:Reason:Stk ->
- ?SLOG(error, #{
- msg => "failed_to_init_channel",
- channel_name => ChannName,
- reason => Reason,
- stacktrace => Stk
- }),
- {error, Reason}
- end;
- {error, Reason} ->
- {error, Reason}
- end.
- %% @private
- resolve_hookspec(HookSpecs) when is_list(HookSpecs) ->
- MessageHooks = message_hooks(),
- AvailableHooks = available_hooks(),
- lists:foldr(
- fun(HookSpec, Acc) ->
- case maps:get(name, HookSpec, undefined) of
- undefined ->
- Acc;
- Name0 ->
- Name =
- try
- binary_to_existing_atom(Name0, utf8)
- catch
- T:R:_ -> {T, R}
- end,
- case {lists:member(Name, AvailableHooks), lists:member(Name, MessageHooks)} of
- {false, _} ->
- error({unknown_hookpoint, Name0});
- {true, false} ->
- Acc#{Name => #{}};
- {true, true} ->
- Acc#{
- Name => #{
- topics => maps:get(topics, HookSpec, [])
- }
- }
- end
- end
- end,
- #{},
- HookSpecs
- ).
- ensure_metrics(Prefix, HookSpecs) ->
- Keys = [
- list_to_atom(Prefix ++ atom_to_list(Hookpoint))
- || Hookpoint <- maps:keys(HookSpecs)
- ],
- lists:foreach(fun emqx_metrics:ensure/1, Keys).
- ensure_hooks(HookSpecs) ->
- lists:foreach(
- fun(Hookpoint) ->
- case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
- false ->
- ?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
- {Hookpoint, {M, F, A}} ->
- emqx_hooks:put(Hookpoint, {M, F, A}, ?HP_EXHOOK),
- ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
- end
- end,
- maps:keys(HookSpecs)
- ).
- may_unload_hooks(HookSpecs) ->
- lists:foreach(
- fun(Hookpoint) ->
- case ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
- Cnt when Cnt =< 0 ->
- case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
- {Hookpoint, {M, F, _A}} ->
- emqx_hooks:del(Hookpoint, {M, F});
- _ ->
- ok
- end,
- ets:delete(?HOOKS_REF_COUNTER, Hookpoint);
- _ ->
- ok
- end
- end,
- maps:keys(HookSpecs)
- ).
- format(#{name := Name, hookspec := Hooks}) ->
- lists:flatten(
- io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])
- ).
- %%--------------------------------------------------------------------
- %% APIs
- %%--------------------------------------------------------------------
- name(#{name := Name}) ->
- Name.
- hooks(#{hookspec := Hooks}) ->
- FoldFun = fun(Hook, Params, Acc) ->
- [
- #{
- name => Hook,
- params => Params
- }
- | Acc
- ]
- end,
- maps:fold(FoldFun, [], Hooks).
- -spec call(hookpoint(), map(), server()) ->
- ignore
- | {ok, Resp :: term()}
- | {error, term()}.
- call(Hookpoint, Req, #{
- name := ChannName,
- options := ReqOpts,
- hookspec := Hooks,
- prefix := Prefix
- }) ->
- case maps:get(Hookpoint, Hooks, undefined) of
- undefined ->
- ignore;
- Opts ->
- NeedCall =
- case lists:member(Hookpoint, message_hooks()) of
- false ->
- true;
- _ ->
- #{message := #{topic := Topic}} = Req,
- match_topic_filter(Topic, maps:get(topics, Opts, []))
- end,
- case NeedCall of
- false ->
- ignore;
- _ ->
- inc_metrics(Prefix, Hookpoint),
- GrpcFun = hk2func(Hookpoint),
- do_call(ChannName, Hookpoint, GrpcFun, Req, ReqOpts)
- end
- end.
- %% @private
- inc_metrics(IncFun, Name) when is_function(IncFun) ->
- %% BACKW: e4.2.0-e4.2.2
- {env, [Prefix | _]} = erlang:fun_info(IncFun, env),
- inc_metrics(Prefix, Name);
- inc_metrics(Prefix, Name) when is_list(Prefix) ->
- emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
- -compile({inline, [match_topic_filter/2]}).
- match_topic_filter(_, []) ->
- true;
- match_topic_filter(TopicName, TopicFilter) ->
- lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
- -ifdef(TEST).
- -define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
- apply(?PB_CLIENT_MOD, Fun, [Req, #{<<"channel">> => ChannName}, Options])
- ).
- -else.
- -define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
- apply(?PB_CLIENT_MOD, Fun, [Req, Options])
- ).
- -endif.
- -spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
- do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
- NReq = Req#{meta => emqx_exhook_handler:request_meta()},
- Options = ReqOpts#{
- channel => ChannName,
- key_dispatch => key_dispatch(NReq)
- },
- ?SLOG(debug, #{
- msg => "do_call",
- module => ?PB_CLIENT_MOD,
- function => Fun,
- req => NReq,
- options => Options
- }),
- case catch ?CALL_PB_CLIENT(ChanneName, Fun, NReq, Options) of
- {ok, Resp, Metadata} ->
- ?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
- update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
- {ok, Resp};
- {error, {Code, Msg}, _Metadata} ->
- ?SLOG(error, #{
- msg => "exhook_call_error",
- module => ?PB_CLIENT_MOD,
- function => Fun,
- req => NReq,
- options => Options,
- code => Code,
- packet => Msg
- }),
- update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
- {error, {Code, Msg}};
- {error, Reason} ->
- ?SLOG(error, #{
- msg => "exhook_call_error",
- module => ?PB_CLIENT_MOD,
- function => Fun,
- req => NReq,
- options => Options,
- reason => Reason
- }),
- update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
- {error, Reason};
- {'EXIT', {Reason, Stk}} ->
- ?SLOG(error, #{
- msg => "exhook_call_exception",
- module => ?PB_CLIENT_MOD,
- function => Fun,
- req => NReq,
- options => Options,
- stacktrace => Stk
- }),
- update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
- {error, Reason}
- end.
- update_metrics(undefined, _ChannName, _Fun) ->
- ok;
- update_metrics(Hookpoint, ChannName, Fun) ->
- Fun(ChannName, Hookpoint).
- failed_action(#{options := Opts}) ->
- maps:get(failed_action, Opts).
- %%--------------------------------------------------------------------
- %% Internal funcs
- %%--------------------------------------------------------------------
- -compile({inline, [hk2func/1]}).
- hk2func('client.connect') -> 'on_client_connect';
- hk2func('client.connack') -> 'on_client_connack';
- hk2func('client.connected') -> 'on_client_connected';
- hk2func('client.disconnected') -> 'on_client_disconnected';
- hk2func('client.authenticate') -> 'on_client_authenticate';
- hk2func('client.authorize') -> 'on_client_authorize';
- hk2func('client.subscribe') -> 'on_client_subscribe';
- hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
- hk2func('session.created') -> 'on_session_created';
- hk2func('session.subscribed') -> 'on_session_subscribed';
- hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
- hk2func('session.resumed') -> 'on_session_resumed';
- hk2func('session.discarded') -> 'on_session_discarded';
- hk2func('session.takenover') -> 'on_session_takenover';
- hk2func('session.terminated') -> 'on_session_terminated';
- hk2func('message.publish') -> 'on_message_publish';
- hk2func('message.delivered') -> 'on_message_delivered';
- hk2func('message.acked') -> 'on_message_acked';
- hk2func('message.dropped') -> 'on_message_dropped'.
- -compile({inline, [message_hooks/0]}).
- message_hooks() ->
- [
- 'message.publish',
- 'message.delivered',
- 'message.acked',
- 'message.dropped'
- ].
- -compile({inline, [available_hooks/0]}).
- available_hooks() ->
- [
- 'client.connect',
- 'client.connack',
- 'client.connected',
- 'client.disconnected',
- 'client.authenticate',
- 'client.authorize',
- 'client.subscribe',
- 'client.unsubscribe',
- 'session.created',
- 'session.subscribed',
- 'session.unsubscribed',
- 'session.resumed',
- 'session.discarded',
- 'session.takenover',
- 'session.terminated'
- | message_hooks()
- ].
- %% @doc Get dispatch_key for each request
- key_dispatch(_Req = #{clientinfo := #{clientid := ClientId}}) ->
- ClientId;
- key_dispatch(_Req = #{conninfo := #{clientid := ClientId}}) ->
- ClientId;
- key_dispatch(_Req = #{message := #{from := From}}) ->
- From;
- key_dispatch(_Req) ->
- self().
|