|
|
@@ -153,7 +153,7 @@ unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
|
|
|
ok.
|
|
|
|
|
|
do_deinit(Name, ReqOpts) ->
|
|
|
- _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts),
|
|
|
+ _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts),
|
|
|
ok.
|
|
|
|
|
|
do_init(ChannName, ReqOpts) ->
|
|
|
@@ -161,7 +161,7 @@ do_init(ChannName, ReqOpts) ->
|
|
|
BrokerInfo = maps:with([version, sysdescr, uptime, datetime],
|
|
|
maps:from_list(emqx_sys:info())),
|
|
|
Req = #{broker => BrokerInfo},
|
|
|
- case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of
|
|
|
+ case do_call(ChannName, undefined, 'on_provider_loaded', Req, ReqOpts) of
|
|
|
{ok, InitialResp} ->
|
|
|
try
|
|
|
{ok, resolve_hookspec(maps:get(hooks, InitialResp, []))}
|
|
|
@@ -255,7 +255,6 @@ hooks(#{hookspec := Hooks}) ->
|
|
|
| {error, term()}.
|
|
|
call(Hookpoint, Req, #{name := ChannName, options := ReqOpts,
|
|
|
hookspec := Hooks, prefix := Prefix}) ->
|
|
|
- GrpcFunc = hk2func(Hookpoint),
|
|
|
case maps:get(Hookpoint, Hooks, undefined) of
|
|
|
undefined -> ignore;
|
|
|
Opts ->
|
|
|
@@ -269,7 +268,8 @@ call(Hookpoint, Req, #{name := ChannName, options := ReqOpts,
|
|
|
false -> ignore;
|
|
|
_ ->
|
|
|
inc_metrics(Prefix, Hookpoint),
|
|
|
- do_call(ChannName, GrpcFunc, Req, ReqOpts)
|
|
|
+ GrpcFun = hk2func(Hookpoint),
|
|
|
+ do_call(ChannName, Hookpoint, GrpcFun, Req, ReqOpts)
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
@@ -287,29 +287,39 @@ match_topic_filter(_, []) ->
|
|
|
match_topic_filter(TopicName, TopicFilter) ->
|
|
|
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
|
|
|
|
|
|
--spec do_call(binary(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
|
|
|
-do_call(ChannName, Fun, Req, ReqOpts) ->
|
|
|
+-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
|
|
|
+do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
|
|
|
Options = ReqOpts#{channel => ChannName},
|
|
|
?SLOG(debug, #{msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun,
|
|
|
req => Req, options => Options}),
|
|
|
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
|
|
|
{ok, Resp, Metadata} ->
|
|
|
?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
|
|
|
+ update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
|
|
|
{ok, Resp};
|
|
|
{error, {Code, Msg}, _Metadata} ->
|
|
|
?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun,
|
|
|
- req => Req, options => Options, code => Code, packet => Msg}),
|
|
|
+ req => Req, options => Options, code => Code, packet => Msg}),
|
|
|
+ update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
|
|
|
{error, {Code, Msg}};
|
|
|
{error, Reason} ->
|
|
|
?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun,
|
|
|
- req => Req, options => Options, reason => Reason}),
|
|
|
+ req => Req, options => Options, reason => Reason}),
|
|
|
+ update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
|
|
|
{error, Reason};
|
|
|
{'EXIT', {Reason, Stk}} ->
|
|
|
?SLOG(error, #{msg => "exhook_call_exception", module => ?PB_CLIENT_MOD, function => Fun,
|
|
|
- req => Req, options => Options, stacktrace => Stk}),
|
|
|
+ req => Req, options => Options, stacktrace => Stk}),
|
|
|
+ update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
|
|
|
{error, Reason}
|
|
|
end.
|
|
|
|
|
|
+update_metrics(undefined, _ChannName, _Fun) ->
|
|
|
+ ok;
|
|
|
+
|
|
|
+update_metrics(Hookpoint, ChannName, Fun) ->
|
|
|
+ Fun(ChannName, Hookpoint).
|
|
|
+
|
|
|
failed_action(#{options := Opts}) ->
|
|
|
maps:get(failed_action, Opts).
|
|
|
|