|
|
@@ -98,8 +98,13 @@ handle_cast(_Msg, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
|
|
|
- #{interval := Interval, push_gateway_server := Server} = opts(),
|
|
|
- PushRes = push_to_push_gateway(Server),
|
|
|
+ #{
|
|
|
+ interval := Interval,
|
|
|
+ headers := Headers,
|
|
|
+ job_name := JobName,
|
|
|
+ push_gateway_server := Server
|
|
|
+ } = opts(),
|
|
|
+ PushRes = push_to_push_gateway(Server, Headers, JobName),
|
|
|
NewTimer = ensure_timer(Interval),
|
|
|
NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}),
|
|
|
%% Data is too big, hibernate for saving memory and stop system monitor warning.
|
|
|
@@ -107,18 +112,27 @@ handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
|
|
|
handle_info(_Msg, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
-push_to_push_gateway(Uri) ->
|
|
|
+push_to_push_gateway(Uri, Headers, JobName) when is_list(Headers) ->
|
|
|
[Name, Ip] = string:tokens(atom_to_list(node()), "@"),
|
|
|
- Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/", Name, "~", Ip]),
|
|
|
+ JobName1 = emqx_placeholder:preproc_tmpl(JobName),
|
|
|
+ JobName2 = binary_to_list(
|
|
|
+ emqx_placeholder:proc_tmpl(
|
|
|
+ JobName1,
|
|
|
+ #{<<"name">> => Name, <<"host">> => Ip}
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ Url = lists:concat([Uri, "/metrics/job/", JobName2]),
|
|
|
Data = prometheus_text_format:format(),
|
|
|
- case httpc:request(post, {Url, [], "text/plain", Data}, ?HTTP_OPTIONS, []) of
|
|
|
- {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, _Body}} ->
|
|
|
+ case httpc:request(post, {Url, Headers, "text/plain", Data}, ?HTTP_OPTIONS, []) of
|
|
|
+ {ok, {{"HTTP/1.1", 200, _}, _RespHeaders, _RespBody}} ->
|
|
|
ok;
|
|
|
Error ->
|
|
|
?SLOG(error, #{
|
|
|
msg => "post_to_push_gateway_failed",
|
|
|
error => Error,
|
|
|
- url => Url
|
|
|
+ url => Url,
|
|
|
+ headers => Headers
|
|
|
}),
|
|
|
failed
|
|
|
end.
|