emqx_plugin_libs_rule.erl 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_rule).
  17. -elvis([{elvis_style, god_modules, disable}]).
  18. %% preprocess and process template string with place holders
  19. -export([ preproc_tmpl/1
  20. , proc_tmpl/2
  21. , proc_tmpl/3
  22. , preproc_cmd/1
  23. , proc_cmd/2
  24. , proc_cmd/3
  25. , preproc_sql/1
  26. , preproc_sql/2
  27. , proc_sql/2
  28. , proc_sql_param_str/2
  29. , proc_cql_param_str/2
  30. ]).
  31. %% type converting
  32. -export([ str/1
  33. , bin/1
  34. , bool/1
  35. , int/1
  36. , float/1
  37. , map/1
  38. , utf8_bin/1
  39. , utf8_str/1
  40. , number_to_binary/1
  41. , atom_key/1
  42. , unsafe_atom_key/1
  43. ]).
  44. %% connectivity check
  45. -export([ http_connectivity/1
  46. , http_connectivity/2
  47. , tcp_connectivity/2
  48. , tcp_connectivity/3
  49. ]).
  50. -export([ now_ms/0
  51. , can_topic_match_oneof/2
  52. ]).
  53. -export([cluster_call/3]).
  54. -compile({no_auto_import,
  55. [ float/1
  56. ]}).
  %% Regular expression source for a placeholder: "${" followed by one or more
  %% letters, digits, dots or underscores, then "}". Captured as a group.
  -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
  %% Regular expression source for whitespace (space and CRLF).
  -define(EX_WITHE_CHARS, "\\s").

  %% A URI given as iodata.
  -type uri_string() :: iodata().
  %% A pre-processed template: literal text parts and variable references.
  -type tmpl_token() :: [{var, binary()} | {str, binary()}].
  %% A pre-processed command: a list of pre-processed templates.
  -type tmpl_cmd() :: [tmpl_token()].
  %% Key returned together with the tokens when pre-processing SQL.
  -type prepare_statement_key() :: binary().
  %% @doc Pre-process a template string that contains placeholders.
  %% Thin wrapper: all work is delegated to emqx_placeholder:preproc_tmpl/1.
  -spec preproc_tmpl(binary()) -> tmpl_token().
  preproc_tmpl(Template) ->
      emqx_placeholder:preproc_tmpl(Template).
  %% @doc Render pre-processed template tokens against the given data map.
  %% Delegates to emqx_placeholder:proc_tmpl/2.
  -spec proc_tmpl(tmpl_token(), map()) -> binary().
  proc_tmpl(Tmpl, Bindings) ->
      emqx_placeholder:proc_tmpl(Tmpl, Bindings).

  %% @doc Same as proc_tmpl/2 with an extra options map that is passed through
  %% unchanged to emqx_placeholder:proc_tmpl/3. Per the spec the result may be
  %% a binary or a list.
  -spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
  proc_tmpl(Tmpl, Bindings, Options) ->
      emqx_placeholder:proc_tmpl(Tmpl, Bindings, Options).
  %% @doc Pre-process a command string that contains placeholders.
  %% Delegates to emqx_placeholder:preproc_cmd/1.
  -spec preproc_cmd(binary()) -> tmpl_cmd().
  preproc_cmd(Command) ->
      emqx_placeholder:preproc_cmd(Command).
  %% @doc Render a pre-processed command against the given data map.
  %% Delegates to emqx_placeholder:proc_cmd/2.
  -spec proc_cmd([tmpl_token()], map()) -> binary() | list().
  proc_cmd(CmdTokens, Bindings) ->
      emqx_placeholder:proc_cmd(CmdTokens, Bindings).

  %% @doc Same as proc_cmd/2 with an extra options map that is passed through
  %% unchanged to emqx_placeholder:proc_cmd/3.
  -spec proc_cmd([tmpl_token()], map(), map()) -> list().
  proc_cmd(CmdTokens, Bindings, Options) ->
      emqx_placeholder:proc_cmd(CmdTokens, Bindings, Options).
  %% @doc Pre-process an SQL statement that contains placeholders.
  %% Returns the prepared-statement key together with the template tokens.
  %% Delegates to emqx_placeholder:preproc_sql/1.
  -spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
  preproc_sql(Statement) ->
      emqx_placeholder:preproc_sql(Statement).

  %% @doc Same as preproc_sql/1 with an explicit marker style, '?' or '$n'.
  %% NOTE(review): presumably the marker substituted for each placeholder in
  %% the prepared statement; confirm against emqx_placeholder:preproc_sql/2.
  -spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') ->
      {prepare_statement_key(), tmpl_token()}.
  preproc_sql(Statement, Marker) ->
      emqx_placeholder:preproc_sql(Statement, Marker).
  %% @doc Process pre-processed SQL tokens against the data map; per the spec
  %% the result is a list. Delegates to emqx_placeholder:proc_sql/2.
  -spec proc_sql(tmpl_token(), map()) -> list().
  proc_sql(SqlTokens, Bindings) ->
      emqx_placeholder:proc_sql(SqlTokens, Bindings).
  %% @doc Render pre-processed tokens against the data map into an SQL
  %% parameter string (a binary, per the spec).
  %% Delegates to emqx_placeholder:proc_sql_param_str/2.
  -spec proc_sql_param_str(tmpl_token(), map()) -> binary().
  proc_sql_param_str(SqlTokens, Bindings) ->
      emqx_placeholder:proc_sql_param_str(SqlTokens, Bindings).
  %% @doc Render pre-processed tokens against the data map into a CQL
  %% parameter string (a binary, per the spec).
  %% Delegates to emqx_placeholder:proc_cql_param_str/2.
  -spec proc_cql_param_str(tmpl_token(), map()) -> binary().
  proc_cql_param_str(CqlTokens, Bindings) ->
      emqx_placeholder:proc_cql_param_str(CqlTokens, Bindings).
  %% @doc Convert a key, or a non-empty list of nested keys, to atoms.
  %%
  %% Binaries are converted with binary_to_atom/2, so previously unknown atoms
  %% ARE created. That is why this variant is "unsafe": do not feed it
  %% untrusted input. Atoms pass through unchanged. Anything else, including
  %% the empty list, raises error({invalid_key, Key}).
  unsafe_atom_key([_ | _] = Path) ->
      %% nested keys: convert every element recursively
      [unsafe_atom_key(Segment) || Segment <- Path];
  unsafe_atom_key(Name) when is_binary(Name) ->
      binary_to_atom(Name, utf8);
  unsafe_atom_key(Name) when is_atom(Name) ->
      Name;
  unsafe_atom_key(Other) ->
      error({invalid_key, Other}).
  %% @doc Convert a key, or a non-empty list of nested keys, to atoms without
  %% ever creating new atoms.
  %%
  %% Binaries are resolved with binary_to_existing_atom/2; a binary that does
  %% not name an existing atom raises error({invalid_key, Key}). Atoms pass
  %% through unchanged. Anything else, including the empty list, also raises
  %% error({invalid_key, Key}).
  atom_key([_ | _] = Path) ->
      %% nested keys: convert every element recursively
      [atom_key(Segment) || Segment <- Path];
  atom_key(Name) when is_binary(Name) ->
      try binary_to_existing_atom(Name, utf8) of
          Atom -> Atom
      catch
          error:badarg -> error({invalid_key, Name})
      end;
  atom_key(Name) when is_atom(Name) ->
      Name;
  atom_key(Other) ->
      error({invalid_key, Other}).
  %% @doc Check that the host and port named by Url can be reached, using a
  %% default timeout of 3000 (passed on to http_connectivity/2).
  -spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
  http_connectivity(Url) ->
      DefaultTimeout = 3000,
      http_connectivity(Url, DefaultTimeout).
  %% @doc Check that the host and port named by Url can be reached.
  %%
  %% The URL is parsed with emqx_http_lib:uri_parse/1. On success the host and
  %% port from the parse result are probed with tcp_connectivity/3; a parse
  %% error is returned to the caller unchanged.
  -spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
  http_connectivity(Url, Timeout) ->
      case emqx_http_lib:uri_parse(Url) of
          {error, _Reason} = ParseError ->
              ParseError;
          {ok, #{host := Host, port := Port}} ->
              tcp_connectivity(Host, Port, Timeout)
      end.
  %% @doc Check that a TCP connection to Host:Port can be opened, using a
  %% default timeout of 3000 (passed on to tcp_connectivity/3).
  -spec tcp_connectivity(
      Host :: inet:socket_address() | inet:hostname(),
      Port :: inet:port_number()
  ) -> ok | {error, Reason :: term()}.
  tcp_connectivity(Host, Port) ->
      DefaultTimeout = 3000,
      tcp_connectivity(Host, Port, DefaultTimeout).
  %% @doc Check that a TCP connection to Host:Port can be opened within Timeout.
  %%
  %% The connect options come from emqx_misc:ipv6_probe/1. A socket that opens
  %% successfully is closed again right away and `ok' is returned; a connect
  %% failure is returned as the `{error, Reason}' given by gen_tcp:connect/4.
  -spec tcp_connectivity(
      Host :: inet:socket_address() | inet:hostname(),
      Port :: inet:port_number(),
      Timeout :: integer()
  ) -> ok | {error, Reason :: term()}.
  tcp_connectivity(Host, Port, Timeout) ->
      ConnectOpts = emqx_misc:ipv6_probe([]),
      case gen_tcp:connect(Host, Port, ConnectOpts, Timeout) of
          {error, _Reason} = Failure ->
              Failure;
          {ok, Socket} ->
              %% only reachability matters, so drop the connection immediately
              gen_tcp:close(Socket),
              ok
      end.
  %% @doc Convert a term to a string (character list).
  %%
  %% Atoms, binaries and numbers use the matching *_to_list conversion. Maps
  %% are JSON-encoded. A list is returned unchanged when
  %% io_lib:printable_list/1 accepts it, otherwise it is JSON-encoded.
  %% Any other term raises error({invalid_str, Data}).
  str(Atom) when is_atom(Atom) ->
      atom_to_list(Atom);
  str(Bin) when is_binary(Bin) ->
      binary_to_list(Bin);
  str(Num) when is_number(Num) ->
      number_to_list(Num);
  str(Chars) when is_list(Chars) ->
      case io_lib:printable_list(Chars) of
          false -> binary_to_list(emqx_json:encode(Chars));
          true -> Chars
      end;
  str(Map) when is_map(Map) ->
      binary_to_list(emqx_json:encode(Map));
  str(Other) ->
      error({invalid_str, Other}).
  %% @doc Convert a term to a binary via unicode:characters_to_binary/1.
  %% Lists and binaries are passed to it directly; any other term is first
  %% converted with bin/1.
  utf8_bin(Chars) when is_list(Chars) ->
      unicode:characters_to_binary(Chars);
  utf8_bin(Bin) when is_binary(Bin) ->
      unicode:characters_to_binary(Bin);
  utf8_bin(Other) ->
      unicode:characters_to_binary(bin(Other)).
  %% @doc Convert a term to a character list via unicode:characters_to_list/1.
  %% Lists and binaries are passed to it directly; any other term is first
  %% converted with str/1.
  utf8_str(Chars) when is_list(Chars) ->
      unicode:characters_to_list(Chars);
  utf8_str(Bin) when is_binary(Bin) ->
      unicode:characters_to_list(Bin);
  utf8_str(Other) ->
      unicode:characters_to_list(str(Other)).
  %% @doc Convert a term to a binary.
  %%
  %% Binaries pass through unchanged. Atoms are encoded as UTF-8, numbers go
  %% through number_to_binary/1 and maps are JSON-encoded. A list is converted
  %% with list_to_binary/1 when io_lib:printable_list/1 accepts it, otherwise
  %% it is JSON-encoded. Any other term raises error({invalid_bin, Data}).
  bin(Atom) when is_atom(Atom) ->
      atom_to_binary(Atom, utf8);
  bin(Bin) when is_binary(Bin) ->
      Bin;
  bin(Num) when is_number(Num) ->
      number_to_binary(Num);
  bin(Chars) when is_list(Chars) ->
      case io_lib:printable_list(Chars) of
          false -> emqx_json:encode(Chars);
          true -> list_to_binary(Chars)
      end;
  bin(Map) when is_map(Map) ->
      emqx_json:encode(Map);
  bin(Other) ->
      error({invalid_bin, Other}).
  %% @doc Convert a term to an integer.
  %%
  %% Integers pass through unchanged and floats are rounded down with
  %% erlang:floor/1. `true' becomes 1 and `false' becomes 0. Strings and
  %% binaries are parsed as an integer first; if that fails they are parsed as
  %% a float and rounded down. Text that is neither fails with the `badarg'
  %% raised by the float conversion. Any other term raises
  %% error({invalid_number, Data}).
  -spec int(term()) -> integer().
  int(Int) when is_integer(Int) ->
      Int;
  int(Float) when is_float(Float) ->
      erlang:floor(Float);
  int(true) ->
      1;
  int(false) ->
      0;
  int(Bin) when is_binary(Bin) ->
      try binary_to_integer(Bin) of
          Parsed -> Parsed
      catch
          %% not an integer literal: fall back to a float literal
          error:badarg -> erlang:floor(binary_to_float(Bin))
      end;
  int(Chars) when is_list(Chars) ->
      try list_to_integer(Chars) of
          Parsed -> Parsed
      catch
          %% not an integer literal: fall back to a float literal
          error:badarg -> erlang:floor(list_to_float(Chars))
      end;
  int(Other) ->
      error({invalid_number, Other}).
  %% @doc Convert a term to a float.
  %%
  %% Numbers are converted with erlang:float/1. Strings and binaries are parsed
  %% as a float first; if that fails they are parsed as an integer and then
  %% converted. Text that is neither fails with the `badarg' raised by the
  %% integer conversion. Any other term raises error({invalid_number, Data}).
  -spec float(term()) -> float().
  float(Num) when is_number(Num) ->
      erlang:float(Num);
  float(Bin) when is_binary(Bin) ->
      try binary_to_float(Bin) of
          Parsed -> Parsed
      catch
          %% not a float literal: fall back to an integer literal
          error:badarg -> erlang:float(binary_to_integer(Bin))
      end;
  float(Chars) when is_list(Chars) ->
      try list_to_float(Chars) of
          Parsed -> Parsed
      catch
          %% not a float literal: fall back to an integer literal
          error:badarg -> erlang:float(list_to_integer(Chars))
      end;
  float(Other) ->
      error({invalid_number, Other}).
  %% @doc Convert a term to a map.
  %%
  %% Maps pass through unchanged and lists are converted with
  %% maps:from_list/1. A binary is decoded as JSON with `return_maps'; a
  %% decoded value that is not a map raises error({invalid_map, Bin}).
  %% Any other term raises error({invalid_map, Data}).
  map(Map) when is_map(Map) ->
      Map;
  map(Pairs) when is_list(Pairs) ->
      maps:from_list(Pairs);
  map(Json) when is_binary(Json) ->
      case emqx_json:decode(Json, [return_maps]) of
          Decoded when is_map(Decoded) -> Decoded;
          _NotAMap -> error({invalid_map, Json})
      end;
  map(Other) ->
      error({invalid_map, Other}).
  %% @doc Convert a term to a boolean.
  %%
  %% `true', the binary "true" and any number equal to 1 yield `true'.
  %% `false', the binary "false" and any number equal to 0 yield `false'.
  %% The numeric checks use `==' on purpose, so 1.0 and 0.0 are accepted too.
  %% Any other term raises error({invalid_boolean, Bool}).
  -spec bool(term()) -> boolean().
  bool(true) -> true;
  bool(<<"true">>) -> true;
  bool(One) when One == 1 -> true;
  bool(false) -> false;
  bool(<<"false">>) -> false;
  bool(Zero) when Zero == 0 -> false;
  bool(Other) -> error({invalid_boolean, Other}).
  %% @doc Format a number as a binary.
  %% Integers use integer_to_binary/1. Floats are printed with up to 10
  %% decimals in compact form (trailing zeros trimmed).
  -spec number_to_binary(number()) -> binary().
  number_to_binary(Float) when is_float(Float) ->
      FormatOpts = [{decimals, 10}, compact],
      float_to_binary(Float, FormatOpts);
  number_to_binary(Int) when is_integer(Int) ->
      integer_to_binary(Int).
  %% @doc Format a number as a string (character list).
  %% Integers use integer_to_list/1. Floats are printed with up to 10
  %% decimals in compact form (trailing zeros trimmed).
  -spec number_to_list(number()) -> string().
  number_to_list(Float) when is_float(Float) ->
      FormatOpts = [{decimals, 10}, compact],
      float_to_list(Float, FormatOpts);
  number_to_list(Int) when is_integer(Int) ->
      integer_to_list(Int).
  %% @doc Current Erlang system time in milliseconds.
  -spec now_ms() -> integer().
  now_ms() ->
      Unit = millisecond,
      erlang:system_time(Unit).
  %% @doc Return `true' if Topic matches at least one filter in Filters,
  %% as decided by emqx_topic:match/2. An empty filter list yields `false'.
  can_topic_match_oneof(Topic, Filters) ->
      MatchesTopic = fun(Filter) -> emqx_topic:match(Topic, Filter) end,
      lists:any(MatchesTopic, Filters).
  %% @doc Run Module:Func(Args) through emqx_cluster_rpc:multicall/3 and
  %% return only the result part of its reply.
  %%
  %% The reply is asserted to be `{ok, TnxId, Result}'; any other reply
  %% crashes the caller with a badmatch error.
  cluster_call(Module, Func, Args) ->
      {ok, _TxnId, Reply} = emqx_cluster_rpc:multicall(Module, Func, Args),
      Reply.