emqx_dashboard_monitor_api.erl 9.2 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_dashboard_monitor_api).
  17. -include("emqx_dashboard.hrl").
  18. -include_lib("typerefl/include/types.hrl").
  19. -include_lib("hocon/include/hocon_types.hrl").
  20. -include_lib("emqx_utils/include/emqx_utils_api.hrl").
  21. -behaviour(minirest_api).
  22. -export([api_spec/0]).
  23. -export([
  24. paths/0,
  25. schema/1,
  26. fields/1,
  27. namespace/0
  28. ]).
  29. -export([
  30. monitor/2,
  31. monitor_current/2
  32. ]).
  33. namespace() -> undefined.
  34. api_spec() ->
  35. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
  36. paths() ->
  37. [
  38. "/monitor",
  39. "/monitor/nodes/:node",
  40. "/monitor_current",
  41. "/monitor_current/nodes/:node"
  42. ].
  43. schema("/monitor") ->
  44. #{
  45. 'operationId' => monitor,
  46. get => #{
  47. tags => [<<"Metrics">>],
  48. description => ?DESC(list_monitor),
  49. parameters => [parameter_latest()],
  50. responses => #{
  51. 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
  52. 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
  53. }
  54. }
  55. };
  56. schema("/monitor/nodes/:node") ->
  57. #{
  58. 'operationId' => monitor,
  59. get => #{
  60. tags => [<<"Metrics">>],
  61. description => ?DESC(list_monitor_node),
  62. parameters => [parameter_node(), parameter_latest()],
  63. responses => #{
  64. 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
  65. 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Node not found">>)
  66. }
  67. }
  68. };
  69. schema("/monitor_current") ->
  70. #{
  71. 'operationId' => monitor_current,
  72. get => #{
  73. tags => [<<"Metrics">>],
  74. description => ?DESC(current_stats),
  75. responses => #{
  76. 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{})
  77. }
  78. }
  79. };
  80. schema("/monitor_current/nodes/:node") ->
  81. #{
  82. 'operationId' => monitor_current,
  83. get => #{
  84. tags => [<<"Metrics">>],
  85. description => ?DESC(current_stats_node),
  86. parameters => [parameter_node()],
  87. responses => #{
  88. 200 => hoconsc:mk(hoconsc:ref(sampler_current_node), #{}),
  89. 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Node not found">>)
  90. }
  91. }
  92. }.
  93. parameter_latest() ->
  94. Info = #{
  95. in => query,
  96. required => false,
  97. example => 5 * 60,
  98. desc => <<"The latest N seconds data. Like 300 for 5 min.">>
  99. },
  100. {latest, hoconsc:mk(range(1, inf), Info)}.
  101. parameter_node() ->
  102. Info = #{
  103. in => path,
  104. required => true,
  105. example => node(),
  106. desc => <<"EMQX node name.">>
  107. },
  108. {node, hoconsc:mk(binary(), Info)}.
  109. fields(sampler) ->
  110. Samplers =
  111. [
  112. {SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}
  113. || SamplerName <- ?SAMPLER_LIST
  114. ],
  115. [{time_stamp, hoconsc:mk(non_neg_integer(), #{desc => <<"Timestamp">>})} | Samplers];
  116. fields(sampler_current_node) ->
  117. fields_current(sample_names(sampler_current_node));
  118. fields(sampler_current) ->
  119. fields_current(sample_names(sampler_current)).
  120. sample_names(sampler_current_node) ->
  121. maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST ++ ?CURRENT_SAMPLE_NON_RATE;
  122. sample_names(sampler_current) ->
  123. sample_names(sampler_current_node) -- [node_uptime].
  124. fields_current(Names) ->
  125. [
  126. {SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}
  127. || SamplerName <- Names
  128. ].
  129. %% -------------------------------------------------------------------------------------------------
  130. %% API
  131. monitor(get, #{query_string := QS, bindings := Bindings}) ->
  132. Latest = maps:get(<<"latest">>, QS, infinity),
  133. RawNode = maps:get(node, Bindings, <<"all">>),
  134. emqx_utils_api:with_node_or_cluster(RawNode, dashboard_samplers_fun(Latest)).
  135. dashboard_samplers_fun(Latest) ->
  136. fun(NodeOrCluster) ->
  137. case emqx_dashboard_monitor:samplers(NodeOrCluster, Latest) of
  138. {badrpc, _} = Error -> {error, Error};
  139. Samplers -> {ok, Samplers}
  140. end
  141. end.
  142. monitor_current(get, #{bindings := Bindings}) ->
  143. RawNode = maps:get(node, Bindings, <<"all">>),
  144. case emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1) of
  145. ?OK(Rates) ->
  146. ?OK(maybe_reject_cluster_only_metrics(RawNode, Rates));
  147. Error ->
  148. Error
  149. end.
  150. -spec current_rate(atom()) ->
  151. {error, term()}
  152. | {ok, Result :: map()}.
  153. current_rate(Node) ->
  154. %% Node :: 'all' or `NodeName`
  155. case emqx_dashboard_monitor:current_rate(Node) of
  156. {badrpc, _} = BadRpc ->
  157. {error, BadRpc};
  158. {ok, _} = OkResult ->
  159. OkResult
  160. end.
  161. %% -------------------------------------------------------------------------------------------------
  162. %% Internal
  163. -define(APPROXIMATE_DESC, " Can only represent an approximate state.").
  164. swagger_desc(received) ->
  165. swagger_desc_format("Received messages ");
  166. swagger_desc(received_bytes) ->
  167. swagger_desc_format("Received bytes ");
  168. swagger_desc(sent) ->
  169. swagger_desc_format("Sent messages ");
  170. swagger_desc(sent_bytes) ->
  171. swagger_desc_format("Sent bytes ");
  172. swagger_desc(dropped) ->
  173. swagger_desc_format("Dropped messages ");
  174. swagger_desc(validation_succeeded) ->
  175. swagger_desc_format("Schema validations succeeded ");
  176. swagger_desc(validation_failed) ->
  177. swagger_desc_format("Schema validations failed ");
  178. swagger_desc(transformation_succeeded) ->
  179. swagger_desc_format("Message transformations succeeded ");
  180. swagger_desc(transformation_failed) ->
  181. swagger_desc_format("Message transformations failed ");
  182. swagger_desc(persisted) ->
  183. swagger_desc_format("Messages saved to the durable storage ");
  184. swagger_desc(disconnected_durable_sessions) ->
  185. <<"Disconnected durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
  186. swagger_desc(subscriptions_durable) ->
  187. <<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
  188. swagger_desc(subscriptions) ->
  189. <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
  190. swagger_desc(topics) ->
  191. <<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
  192. swagger_desc(connections) ->
  193. <<"Sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
  194. swagger_desc(live_connections) ->
  195. <<"Connections at the time of sampling.", ?APPROXIMATE_DESC>>;
  196. swagger_desc(cluster_sessions) ->
  197. <<
  198. "Total number of sessions in the cluster at the time of sampling. "
  199. "It includes expired sessions when `broker.session_history_retain` is set to a duration greater than `0s`."
  200. ?APPROXIMATE_DESC
  201. >>;
  202. swagger_desc(received_msg_rate) ->
  203. swagger_desc_format("Dropped messages ", per);
  204. %swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per);
  205. swagger_desc(sent_msg_rate) ->
  206. swagger_desc_format("Sent messages ", per);
  207. %swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per);
  208. swagger_desc(dropped_msg_rate) ->
  209. swagger_desc_format("Dropped messages ", per);
  210. swagger_desc(validation_succeeded_rate) ->
  211. swagger_desc_format("Schema validations succeeded ", per);
  212. swagger_desc(validation_failed_rate) ->
  213. swagger_desc_format("Schema validations failed ", per);
  214. swagger_desc(transformation_succeeded_rate) ->
  215. swagger_desc_format("Message transformations succeeded ", per);
  216. swagger_desc(transformation_failed_rate) ->
  217. swagger_desc_format("Message transformations failed ", per);
  218. swagger_desc(persisted_rate) ->
  219. swagger_desc_format("Messages saved to the durable storage ", per);
  220. swagger_desc(retained_msg_count) ->
  221. <<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>;
  222. swagger_desc(shared_subscriptions) ->
  223. <<"Shared subscriptions count at the time of sampling.", ?APPROXIMATE_DESC>>;
  224. swagger_desc(node_uptime) ->
  225. <<"Node up time in seconds. Only presented in endpoint: `/monitor_current/nodes/:node`.">>;
  226. swagger_desc(license_quota) ->
  227. <<"License quota. AKA: limited max_connections for cluster">>.
  228. swagger_desc_format(Format) ->
  229. swagger_desc_format(Format, last).
  230. swagger_desc_format(Format, Type) ->
  231. Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
  232. list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).
  233. maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
  234. Rates;
  235. maybe_reject_cluster_only_metrics(_Node, Rates) ->
  236. maps:without(?CLUSTERONLY_SAMPLER_LIST, Rates).