emqx_plugin_libs_rule.erl 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_rule).
  17. -elvis([{elvis_style, god_modules, disable}]).
  18. %% preprocess and process template string with place holders
  19. -export([
  20. preproc_tmpl/1,
  21. proc_tmpl/2,
  22. proc_tmpl/3,
  23. preproc_cmd/1,
  24. proc_cmd/2,
  25. proc_cmd/3,
  26. preproc_sql/1,
  27. preproc_sql/2,
  28. proc_sql/2,
  29. proc_sql_param_str/2,
  30. proc_cql_param_str/2,
  31. split_insert_sql/1,
  32. detect_sql_type/1
  33. ]).
  34. %% type converting
  35. -export([
  36. str/1,
  37. bin/1,
  38. bool/1,
  39. int/1,
  40. float/1,
  41. float2str/2,
  42. map/1,
  43. utf8_bin/1,
  44. utf8_str/1,
  45. number_to_binary/1,
  46. atom_key/1,
  47. unsafe_atom_key/1
  48. ]).
  49. %% connectivity check
  50. -export([
  51. http_connectivity/1,
  52. http_connectivity/2,
  53. tcp_connectivity/2,
  54. tcp_connectivity/3
  55. ]).
  56. -export([
  57. now_ms/0,
  58. can_topic_match_oneof/2
  59. ]).
  60. -export_type([tmpl_token/0]).
  61. -compile({no_auto_import, [float/1]}).
  62. -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
  63. %% Space and CRLF
  64. -define(EX_WITHE_CHARS, "\\s").
  65. -type uri_string() :: iodata().
  66. -type tmpl_token() :: list({var, binary()} | {str, binary()}).
  67. -type tmpl_cmd() :: list(tmpl_token()).
  68. -type prepare_statement_key() :: binary().
  69. %% preprocess template string with place holders
  70. -spec preproc_tmpl(binary()) -> tmpl_token().
  71. preproc_tmpl(Str) ->
  72. emqx_placeholder:preproc_tmpl(Str).
  73. -spec proc_tmpl(tmpl_token(), map()) -> binary().
  74. proc_tmpl(Tokens, Data) ->
  75. emqx_placeholder:proc_tmpl(Tokens, Data).
  76. -spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
  77. proc_tmpl(Tokens, Data, Opts) ->
  78. emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
  79. -spec preproc_cmd(binary()) -> tmpl_cmd().
  80. preproc_cmd(Str) ->
  81. emqx_placeholder:preproc_cmd(Str).
  82. -spec proc_cmd([tmpl_token()], map()) -> binary() | list().
  83. proc_cmd(Tokens, Data) ->
  84. emqx_placeholder:proc_cmd(Tokens, Data).
  85. -spec proc_cmd([tmpl_token()], map(), map()) -> list().
  86. proc_cmd(Tokens, Data, Opts) ->
  87. emqx_placeholder:proc_cmd(Tokens, Data, Opts).
  88. %% preprocess SQL with place holders
  89. -spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
  90. preproc_sql(Sql) ->
  91. emqx_placeholder:preproc_sql(Sql).
  92. -spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') ->
  93. {prepare_statement_key(), tmpl_token()}.
  94. preproc_sql(Sql, ReplaceWith) ->
  95. emqx_placeholder:preproc_sql(Sql, ReplaceWith).
  96. -spec proc_sql(tmpl_token(), map()) -> list().
  97. proc_sql(Tokens, Data) ->
  98. emqx_placeholder:proc_sql(Tokens, Data).
  99. -spec proc_sql_param_str(tmpl_token(), map()) -> binary().
  100. proc_sql_param_str(Tokens, Data) ->
  101. emqx_placeholder:proc_sql_param_str(Tokens, Data).
  102. -spec proc_cql_param_str(tmpl_token(), map()) -> binary().
  103. proc_cql_param_str(Tokens, Data) ->
  104. emqx_placeholder:proc_cql_param_str(Tokens, Data).
  105. %% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
  106. -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
  107. InsertSQL :: binary(),
  108. Params :: binary().
  109. split_insert_sql(SQL) ->
  110. case re:split(SQL, "((?i)values)", [{return, binary}]) of
  111. [Part1, _, Part3] ->
  112. case string:trim(Part1, leading) of
  113. <<"insert", _/binary>> = InsertSQL ->
  114. {ok, {InsertSQL, Part3}};
  115. <<"INSERT", _/binary>> = InsertSQL ->
  116. {ok, {InsertSQL, Part3}};
  117. _ ->
  118. {error, not_insert_sql}
  119. end;
  120. _ ->
  121. {error, not_insert_sql}
  122. end.
  123. -spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
  124. Type :: insert | select.
  125. detect_sql_type(SQL) ->
  126. case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
  127. {match, [First]} ->
  128. Types = [select, insert],
  129. PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
  130. LowFirst = string:lowercase(First),
  131. case proplists:lookup(LowFirst, PropTypes) of
  132. {LowFirst, Type} ->
  133. {ok, Type};
  134. _ ->
  135. {error, invalid_sql}
  136. end;
  137. _ ->
  138. {error, invalid_sql}
  139. end.
  140. unsafe_atom_key(Key) when is_atom(Key) ->
  141. Key;
  142. unsafe_atom_key(Key) when is_binary(Key) ->
  143. binary_to_atom(Key, utf8);
  144. unsafe_atom_key(Keys = [_Key | _]) ->
  145. [unsafe_atom_key(SubKey) || SubKey <- Keys];
  146. unsafe_atom_key(Key) ->
  147. error({invalid_key, Key}).
  148. atom_key(Key) when is_atom(Key) ->
  149. Key;
  150. atom_key(Key) when is_binary(Key) ->
  151. try
  152. binary_to_existing_atom(Key, utf8)
  153. catch
  154. error:badarg -> error({invalid_key, Key})
  155. end;
  156. %% nested keys
  157. atom_key(Keys = [_Key | _]) ->
  158. [atom_key(SubKey) || SubKey <- Keys];
  159. atom_key(Key) ->
  160. error({invalid_key, Key}).
  161. -spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
  162. http_connectivity(Url) ->
  163. http_connectivity(Url, 3000).
  164. -spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
  165. http_connectivity(Url, Timeout) ->
  166. case emqx_http_lib:uri_parse(Url) of
  167. {ok, #{host := Host, port := Port}} ->
  168. tcp_connectivity(Host, Port, Timeout);
  169. {error, Reason} ->
  170. {error, Reason}
  171. end.
  172. -spec tcp_connectivity(
  173. Host :: inet:socket_address() | inet:hostname(),
  174. Port :: inet:port_number()
  175. ) ->
  176. ok | {error, Reason :: term()}.
  177. tcp_connectivity(Host, Port) ->
  178. tcp_connectivity(Host, Port, 3000).
  179. -spec tcp_connectivity(
  180. Host :: inet:socket_address() | inet:hostname(),
  181. Port :: inet:port_number(),
  182. Timeout :: integer()
  183. ) ->
  184. ok | {error, Reason :: term()}.
  185. tcp_connectivity(Host, Port, Timeout) ->
  186. case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
  187. {ok, Sock} ->
  188. gen_tcp:close(Sock),
  189. ok;
  190. {error, Reason} ->
  191. {error, Reason}
  192. end.
  193. str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
  194. str(Num) when is_number(Num) -> number_to_list(Num);
  195. str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
  196. str(Map) when is_map(Map) -> binary_to_list(emqx_json:encode(Map));
  197. str(List) when is_list(List) ->
  198. case io_lib:printable_list(List) of
  199. true -> List;
  200. false -> binary_to_list(emqx_json:encode(List))
  201. end;
  202. str(Data) ->
  203. error({invalid_str, Data}).
  204. utf8_bin(Str) when is_binary(Str); is_list(Str) ->
  205. unicode:characters_to_binary(Str);
  206. utf8_bin(Str) ->
  207. unicode:characters_to_binary(bin(Str)).
  208. utf8_str(Str) when is_binary(Str); is_list(Str) ->
  209. unicode:characters_to_list(Str);
  210. utf8_str(Str) ->
  211. unicode:characters_to_list(str(Str)).
  212. bin(Bin) when is_binary(Bin) -> Bin;
  213. bin(Num) when is_number(Num) -> number_to_binary(Num);
  214. bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
  215. bin(Map) when is_map(Map) -> emqx_json:encode(Map);
  216. bin(List) when is_list(List) ->
  217. case io_lib:printable_list(List) of
  218. true -> list_to_binary(List);
  219. false -> emqx_json:encode(List)
  220. end;
  221. bin(Data) ->
  222. error({invalid_bin, Data}).
  223. int(List) when is_list(List) ->
  224. try
  225. list_to_integer(List)
  226. catch
  227. error:badarg ->
  228. int(list_to_float(List))
  229. end;
  230. int(Bin) when is_binary(Bin) ->
  231. try
  232. binary_to_integer(Bin)
  233. catch
  234. error:badarg ->
  235. int(binary_to_float(Bin))
  236. end;
  237. int(Int) when is_integer(Int) -> Int;
  238. int(Float) when is_float(Float) -> erlang:floor(Float);
  239. int(true) ->
  240. 1;
  241. int(false) ->
  242. 0;
  243. int(Data) ->
  244. error({invalid_number, Data}).
  245. float(List) when is_list(List) ->
  246. try
  247. list_to_float(List)
  248. catch
  249. error:badarg ->
  250. float(list_to_integer(List))
  251. end;
  252. float(Bin) when is_binary(Bin) ->
  253. try
  254. binary_to_float(Bin)
  255. catch
  256. error:badarg ->
  257. float(binary_to_integer(Bin))
  258. end;
  259. float(Num) when is_number(Num) -> erlang:float(Num);
  260. float(Data) ->
  261. error({invalid_number, Data}).
  262. float2str(Float, Precision) when is_float(Float) and is_integer(Precision) ->
  263. float_to_binary(Float, [{decimals, Precision}, compact]).
  264. map(Bin) when is_binary(Bin) ->
  265. case emqx_json:decode(Bin, [return_maps]) of
  266. Map = #{} -> Map;
  267. _ -> error({invalid_map, Bin})
  268. end;
  269. map(List) when is_list(List) -> maps:from_list(List);
  270. map(Map) when is_map(Map) -> Map;
  271. map(Data) ->
  272. error({invalid_map, Data}).
  273. bool(Bool) when
  274. Bool == true;
  275. Bool == <<"true">>;
  276. Bool == 1
  277. ->
  278. true;
  279. bool(Bool) when
  280. Bool == false;
  281. Bool == <<"false">>;
  282. Bool == 0
  283. ->
  284. false;
  285. bool(Bool) ->
  286. error({invalid_boolean, Bool}).
  287. number_to_binary(Int) when is_integer(Int) ->
  288. integer_to_binary(Int);
  289. number_to_binary(Float) when is_float(Float) ->
  290. float_to_binary(Float, [{decimals, 10}, compact]).
  291. number_to_list(Int) when is_integer(Int) ->
  292. integer_to_list(Int);
  293. number_to_list(Float) when is_float(Float) ->
  294. float_to_list(Float, [{decimals, 10}, compact]).
  295. now_ms() ->
  296. erlang:system_time(millisecond).
  297. can_topic_match_oneof(Topic, Filters) ->
  298. lists:any(
  299. fun(Fltr) ->
  300. emqx_topic:match(Topic, Fltr)
  301. end,
  302. Filters
  303. ).