emqx_mod_api_topic_metrics.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  -module(emqx_mod_api_topic_metrics).

  %% REST API declarations. Each entry names an HTTP method, a path and the
  %% exported handler (`func') of this module that serves it. The
  %% `:bin:topic' path segment presumably surfaces as the `topic' key of the
  %% bindings map matched by list/2 and unregister/2 -- confirm against the
  %% REST framework in use.

  %% GET /topic-metrics -> list/2 (no topic binding).
  -rest_api(#{name   => list_all_topic_metrics,
              method => 'GET',
              path   => "/topic-metrics",
              func   => list,
              descr  => "A list of all topic metrics of all nodes in the cluster"}).

  %% GET /topic-metrics/:bin:topic -> list/2 (with topic binding).
  -rest_api(#{name   => list_topic_metrics,
              method => 'GET',
              path   => "/topic-metrics/:bin:topic",
              func   => list,
              descr  => "A list of specified topic metrics of all nodes in the cluster"}).

  %% POST /topic-metrics -> register/2.
  -rest_api(#{name   => register_topic_metrics,
              method => 'POST',
              path   => "/topic-metrics",
              func   => register,
              descr  => "Register topic metrics"}).

  %% DELETE /topic-metrics -> unregister/2 (no topic binding).
  -rest_api(#{name   => unregister_all_topic_metrics,
              method => 'DELETE',
              path   => "/topic-metrics",
              func   => unregister,
              descr  => "Unregister all topic metrics"}).

  %% DELETE /topic-metrics/:bin:topic -> unregister/2 (with topic binding).
  -rest_api(#{name   => unregister_topic_metrics,
              method => 'DELETE',
              path   => "/topic-metrics/:bin:topic",
              func   => unregister,
              descr  => "Unregister topic metrics"}).
  %% REST handlers named by the `func' field of the -rest_api attributes;
  %% each is called with (Bindings, Params).
  -export([ list/2
          , register/2
          , unregister/2
          ]).
  %% Per-node operations. They are exported because rpc_call/3 reaches them
  %% on other nodes through rpc:call(Node, ?MODULE, Fun, Args).
  -export([ get_topic_metrics/2
          , register_topic_metrics/2
          , unregister_topic_metrics/2
          , unregister_all_topic_metrics/1
          ]).
  %% @doc REST handler for the two `GET /topic-metrics' routes.
  %%
  %% With a `topic' binding: the topic is URL-decoded and validated, its
  %% metrics are fetched with get_topic_metrics/1 and converted to a map.
  %% An invalid topic yields `{error, invalid_topic_name}'.
  %% Without a `topic' binding: the result of get_all_topic_metrics/0 is used.
  %%
  %% In both cases the outcome is handed to return/1, and the work only runs
  %% when execute_when_enabled/1 allows it.
  list(#{topic := EncodedTopic}, _Params) ->
      execute_when_enabled(
        fun() ->
            Topic = emqx_mgmt_util:urldecode(EncodedTopic),
            Reply =
                case safe_validate(Topic) of
                    false ->
                        {error, invalid_topic_name};
                    true ->
                        case get_topic_metrics(Topic) of
                            {error, _Reason} = Error -> Error;
                            Metrics -> {ok, maps:from_list(Metrics)}
                        end
                end,
            return(Reply)
        end);
  list(_Bindings, _Params) ->
      execute_when_enabled(
        fun() ->
            Reply =
                case get_all_topic_metrics() of
                    {error, _Reason} = Error -> Error;
                    AllMetrics -> {ok, AllMetrics}
                end,
            return(Reply)
        end).
  %% @doc REST handler for `POST /topic-metrics'.
  %%
  %% Reads the `<<"topic">>' entry from Params (a proplist). A missing entry
  %% yields `{error, missing_required_params}' and a topic rejected by
  %% safe_validate/1 yields `{error, invalid_topic_name}'. Otherwise the topic
  %% is registered on the running nodes via register_topic_metrics/1.
  %%
  %% Nothing runs unless execute_when_enabled/1 allows it.
  register(_Bindings, Params) ->
      execute_when_enabled(
        fun() ->
            case proplists:get_value(<<"topic">>, Params) of
                undefined ->
                    return({error, missing_required_params});
                Topic ->
                    case safe_validate(Topic) of
                        true ->
                            %% Hand the real outcome to return/1 instead of
                            %% discarding it and reporting `ok'
                            %% unconditionally: register_topic_metrics/1
                            %% yields `ok' only if at least one node accepted
                            %% the registration, otherwise the last node's
                            %% result.
                            return(register_topic_metrics(Topic));
                        false ->
                            return({error, invalid_topic_name})
                    end
            end
        end).
  %% @doc REST handler for the two `DELETE /topic-metrics' routes.
  %%
  %% Empty bindings map: every topic metric is unregistered through
  %% unregister_all_topic_metrics/0.
  %% With a `topic' binding: the topic is URL-decoded and validated, then
  %% unregistered through unregister_topic_metrics/1. An invalid topic yields
  %% `{error, invalid_topic_name}'.
  %%
  %% The outcome of the cluster-wide operation is passed to return/1 rather
  %% than being dropped in favour of an unconditional `ok'. Nothing runs
  %% unless execute_when_enabled/1 allows it.
  unregister(Bindings, _Params) when map_size(Bindings) =:= 0 ->
      execute_when_enabled(
        fun() ->
            return(unregister_all_topic_metrics())
        end);
  unregister(#{topic := Topic0}, _Params) ->
      execute_when_enabled(
        fun() ->
            Topic = emqx_mgmt_util:urldecode(Topic0),
            case safe_validate(Topic) of
                true ->
                    return(unregister_topic_metrics(Topic));
                false ->
                    return({error, invalid_topic_name})
            end
        end).
  %% @doc Run `Fun' (arity 0) only if emqx_modules:find_module(topic_metrics)
  %% answers `true'; on `false' reply through return/1 with
  %% `{error, module_not_loaded}'. Any other answer is a case_clause error.
  execute_when_enabled(Fun) ->
      Enabled = emqx_modules:find_module(topic_metrics),
      case Enabled of
          false -> return({error, module_not_loaded});
          true  -> Fun()
      end.
  %% @doc Boolean wrapper around emqx_topic:validate(name, Topic).
  %% Returns `true' when validation returns `true' and `false' when it raises
  %% an exception of class `error'. Throws and exits are not intercepted.
  safe_validate(Topic) ->
      try emqx_topic:validate(name, Topic) of
          %% The `of' section is outside the protected part of the try, so a
          %% non-`true' result is not converted to `false' by the handler below.
          true ->
              true
      catch
          error:_ ->
              false
      end.
  %% @doc Collect the metrics of every topic reported by
  %% emqx_mod_topic_metrics:all_registered_topics().
  %% Returns a list of `#{topic => Topic, metrics => Metrics}' maps. Topics
  %% whose lookup yields `{error, _}' are left out. Entries are prepended, so
  %% the result is in reverse order of the registered-topics list.
  get_all_topic_metrics() ->
      collect_topic_metrics(emqx_mod_topic_metrics:all_registered_topics(), []).

  %% Walk the topic list front to back, accumulating one map per topic whose
  %% metrics could be fetched.
  collect_topic_metrics([], Collected) ->
      Collected;
  collect_topic_metrics([Topic | Rest], Collected) ->
      case get_topic_metrics(Topic) of
          {error, _Reason} ->
              collect_topic_metrics(Rest, Collected);
          Metrics ->
              Entry = #{topic => Topic, metrics => Metrics},
              collect_topic_metrics(Rest, [Entry | Collected])
      end.
  %% @doc Sum the metrics of `Topic' over ekka_mnesia:running_nodes().
  %% Nodes answering `{error, _}' are skipped. The result is a list of
  %% `{Key, Value}' pairs, or `[]' when no node contributed anything.
  get_topic_metrics(Topic) ->
      Step = fun(Node, Totals) ->
                 sum_node_metrics(get_topic_metrics(Node, Topic), Totals)
             end,
      lists:foldl(Step, [], ekka_mnesia:running_nodes()).

  %% Fold one node's answer into the running totals.
  %% - an error answer leaves the totals untouched;
  %% - while the totals are still empty, the node's metrics become the totals;
  %% - otherwise each key already in the totals is increased by the node's
  %%   value for it (0 if the node lacks the key). Keys present only in the
  %%   node's answer are not added, and the totals come back in reversed order.
  sum_node_metrics({error, _Reason}, Totals) ->
      Totals;
  sum_node_metrics(Metrics, []) ->
      Metrics;
  sum_node_metrics(Metrics, Totals) ->
      lists:foldl(fun({Key, Sum}, Merged) ->
                      [{Key, Sum + proplists:get_value(Key, Metrics, 0)} | Merged]
                  end, [], Totals).
  %% @doc Fetch the metrics of `Topic' as seen by `Node'.
  %% On the local node this calls emqx_mod_topic_metrics:metrics/1 directly;
  %% for any other node the same function is invoked there through rpc_call/3.
  get_topic_metrics(Node, Topic) ->
      case node() of
          Node ->
              emqx_mod_topic_metrics:metrics(Topic);
          _Other ->
              rpc_call(Node, get_topic_metrics, [Node, Topic])
      end.
  %% @doc Register `Topic' on every node in ekka_mnesia:running_nodes().
  %% Returns `ok' if at least one node answered `ok'; otherwise the answer of
  %% the last node in the list.
  register_topic_metrics(Topic) ->
      Nodes = ekka_mnesia:running_nodes(),
      Outcomes = [register_topic_metrics(Node, Topic) || Node <- Nodes],
      case lists:member(ok, Outcomes) of
          false -> lists:last(Outcomes);
          true  -> ok
      end.
  %% @doc Register `Topic' on `Node'.
  %% On the local node this calls emqx_mod_topic_metrics:register/1 directly;
  %% for any other node the same function is invoked there through rpc_call/3.
  register_topic_metrics(Node, Topic) ->
      case node() of
          Node ->
              emqx_mod_topic_metrics:register(Topic);
          _Other ->
              rpc_call(Node, register_topic_metrics, [Node, Topic])
      end.
  %% @doc Unregister `Topic' on every node in ekka_mnesia:running_nodes().
  %% Returns `ok' if at least one node answered `ok'; otherwise the answer of
  %% the last node in the list.
  unregister_topic_metrics(Topic) ->
      Nodes = ekka_mnesia:running_nodes(),
      Outcomes = [unregister_topic_metrics(Node, Topic) || Node <- Nodes],
      case lists:member(ok, Outcomes) of
          false -> lists:last(Outcomes);
          true  -> ok
      end.
  %% @doc Unregister `Topic' on `Node'.
  %% On the local node this calls emqx_mod_topic_metrics:unregister/1
  %% directly; for any other node the same function is invoked there through
  %% rpc_call/3.
  unregister_topic_metrics(Node, Topic) ->
      case node() of
          Node ->
              emqx_mod_topic_metrics:unregister(Topic);
          _Other ->
              rpc_call(Node, unregister_topic_metrics, [Node, Topic])
      end.
  %% @doc Unregister all topic metrics on every node in
  %% ekka_mnesia:running_nodes().
  %% Returns `ok' if at least one node answered `ok'; otherwise the answer of
  %% the last node in the list.
  unregister_all_topic_metrics() ->
      Nodes = ekka_mnesia:running_nodes(),
      Outcomes = [unregister_all_topic_metrics(Node) || Node <- Nodes],
      case lists:member(ok, Outcomes) of
          false -> lists:last(Outcomes);
          true  -> ok
      end.
  %% @doc Unregister all topic metrics on `Node'.
  %% On the local node this calls emqx_mod_topic_metrics:unregister_all/0
  %% directly; for any other node this same function is invoked there through
  %% rpc_call/3.
  unregister_all_topic_metrics(Node) when Node =:= node() ->
      emqx_mod_topic_metrics:unregister_all();
  unregister_all_topic_metrics(Node) ->
      %% The remote target must be unregister_all_topic_metrics/1. Calling
      %% `unregister_topic_metrics' with the single argument [Node] would
      %% address the unexported unregister_topic_metrics/1 (with the node name
      %% in the topic position), so the RPC would fail instead of clearing the
      %% remote node.
      rpc_call(Node, unregister_all_topic_metrics, [Node]).
  %% @doc Call `?MODULE:Fun(Args...)' on `Node' with rpc:call/4.
  %% A `{badrpc, Reason}' reply is rewritten to `{error, Reason}'; every other
  %% reply is passed through unchanged.
  rpc_call(Node, Fun, Args) ->
      Reply = rpc:call(Node, ?MODULE, Fun, Args),
      case Reply of
          {badrpc, Why} -> {error, Why};
          _ -> Reply
      end.
  %% @doc Placeholder for building the REST response: the argument is
  %% currently ignored and `ok' is returned for every input.
  %% TODO: V5 API
  return(_Result) ->
      ok.