emqx_plugin_libs_rule.erl 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_rule).
  17. -elvis([{elvis_style, god_modules, disable}]).
  18. %% preprocess and process template string with place holders
  19. -export([
  20. preproc_tmpl/1,
  21. proc_tmpl/2,
  22. proc_tmpl/3,
  23. preproc_cmd/1,
  24. proc_cmd/2,
  25. proc_cmd/3,
  26. preproc_sql/1,
  27. preproc_sql/2,
  28. proc_sql/2,
  29. proc_sql_param_str/2,
  30. proc_cql_param_str/2,
  31. split_insert_sql/1,
  32. detect_sql_type/1
  33. ]).
  34. %% type converting
  35. -export([
  36. str/1,
  37. bin/1,
  38. bool/1,
  39. int/1,
  40. float/1,
  41. float2str/2,
  42. map/1,
  43. utf8_bin/1,
  44. utf8_str/1,
  45. number_to_binary/1,
  46. atom_key/1,
  47. unsafe_atom_key/1
  48. ]).
  49. %% connectivity check
  50. -export([
  51. http_connectivity/1,
  52. http_connectivity/2,
  53. tcp_connectivity/2,
  54. tcp_connectivity/3
  55. ]).
  56. -export([
  57. now_ms/0,
  58. can_topic_match_oneof/2
  59. ]).
  60. -compile({no_auto_import, [float/1]}).
  61. -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
  62. %% Space and CRLF
  63. -define(EX_WITHE_CHARS, "\\s").
  64. -type uri_string() :: iodata().
  65. -type tmpl_token() :: list({var, binary()} | {str, binary()}).
  66. -type tmpl_cmd() :: list(tmpl_token()).
  67. -type prepare_statement_key() :: binary().
  68. %% preprocess template string with place holders
  69. -spec preproc_tmpl(binary()) -> tmpl_token().
  70. preproc_tmpl(Str) ->
  71. emqx_placeholder:preproc_tmpl(Str).
  72. -spec proc_tmpl(tmpl_token(), map()) -> binary().
  73. proc_tmpl(Tokens, Data) ->
  74. emqx_placeholder:proc_tmpl(Tokens, Data).
  75. -spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
  76. proc_tmpl(Tokens, Data, Opts) ->
  77. emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
  78. -spec preproc_cmd(binary()) -> tmpl_cmd().
  79. preproc_cmd(Str) ->
  80. emqx_placeholder:preproc_cmd(Str).
  81. -spec proc_cmd([tmpl_token()], map()) -> binary() | list().
  82. proc_cmd(Tokens, Data) ->
  83. emqx_placeholder:proc_cmd(Tokens, Data).
  84. -spec proc_cmd([tmpl_token()], map(), map()) -> list().
  85. proc_cmd(Tokens, Data, Opts) ->
  86. emqx_placeholder:proc_cmd(Tokens, Data, Opts).
  87. %% preprocess SQL with place holders
  88. -spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
  89. preproc_sql(Sql) ->
  90. emqx_placeholder:preproc_sql(Sql).
  91. -spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') ->
  92. {prepare_statement_key(), tmpl_token()}.
  93. preproc_sql(Sql, ReplaceWith) ->
  94. emqx_placeholder:preproc_sql(Sql, ReplaceWith).
  95. -spec proc_sql(tmpl_token(), map()) -> list().
  96. proc_sql(Tokens, Data) ->
  97. emqx_placeholder:proc_sql(Tokens, Data).
  98. -spec proc_sql_param_str(tmpl_token(), map()) -> binary().
  99. proc_sql_param_str(Tokens, Data) ->
  100. emqx_placeholder:proc_sql_param_str(Tokens, Data).
  101. -spec proc_cql_param_str(tmpl_token(), map()) -> binary().
  102. proc_cql_param_str(Tokens, Data) ->
  103. emqx_placeholder:proc_cql_param_str(Tokens, Data).
  104. %% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
  105. -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
  106. InsertSQL :: binary(),
  107. Params :: binary().
  108. split_insert_sql(SQL) ->
  109. case re:split(SQL, "((?i)values)", [{return, binary}]) of
  110. [Part1, _, Part3] ->
  111. case string:trim(Part1, leading) of
  112. <<"insert", _/binary>> = InsertSQL ->
  113. {ok, {InsertSQL, Part3}};
  114. <<"INSERT", _/binary>> = InsertSQL ->
  115. {ok, {InsertSQL, Part3}};
  116. _ ->
  117. {error, not_insert_sql}
  118. end;
  119. _ ->
  120. {error, not_insert_sql}
  121. end.
  122. -spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
  123. Type :: insert | select.
  124. detect_sql_type(SQL) ->
  125. case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
  126. {match, [First]} ->
  127. Types = [select, insert],
  128. PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
  129. LowFirst = string:lowercase(First),
  130. case proplists:lookup(LowFirst, PropTypes) of
  131. {LowFirst, Type} ->
  132. {ok, Type};
  133. _ ->
  134. {error, invalid_sql}
  135. end;
  136. _ ->
  137. {error, invalid_sql}
  138. end.
  139. unsafe_atom_key(Key) when is_atom(Key) ->
  140. Key;
  141. unsafe_atom_key(Key) when is_binary(Key) ->
  142. binary_to_atom(Key, utf8);
  143. unsafe_atom_key(Keys = [_Key | _]) ->
  144. [unsafe_atom_key(SubKey) || SubKey <- Keys];
  145. unsafe_atom_key(Key) ->
  146. error({invalid_key, Key}).
  147. atom_key(Key) when is_atom(Key) ->
  148. Key;
  149. atom_key(Key) when is_binary(Key) ->
  150. try
  151. binary_to_existing_atom(Key, utf8)
  152. catch
  153. error:badarg -> error({invalid_key, Key})
  154. end;
  155. %% nested keys
  156. atom_key(Keys = [_Key | _]) ->
  157. [atom_key(SubKey) || SubKey <- Keys];
  158. atom_key(Key) ->
  159. error({invalid_key, Key}).
  160. -spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
  161. http_connectivity(Url) ->
  162. http_connectivity(Url, 3000).
  163. -spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
  164. http_connectivity(Url, Timeout) ->
  165. case emqx_http_lib:uri_parse(Url) of
  166. {ok, #{host := Host, port := Port}} ->
  167. tcp_connectivity(Host, Port, Timeout);
  168. {error, Reason} ->
  169. {error, Reason}
  170. end.
  171. -spec tcp_connectivity(
  172. Host :: inet:socket_address() | inet:hostname(),
  173. Port :: inet:port_number()
  174. ) ->
  175. ok | {error, Reason :: term()}.
  176. tcp_connectivity(Host, Port) ->
  177. tcp_connectivity(Host, Port, 3000).
  178. -spec tcp_connectivity(
  179. Host :: inet:socket_address() | inet:hostname(),
  180. Port :: inet:port_number(),
  181. Timeout :: integer()
  182. ) ->
  183. ok | {error, Reason :: term()}.
  184. tcp_connectivity(Host, Port, Timeout) ->
  185. case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
  186. {ok, Sock} ->
  187. gen_tcp:close(Sock),
  188. ok;
  189. {error, Reason} ->
  190. {error, Reason}
  191. end.
  192. str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
  193. str(Num) when is_number(Num) -> number_to_list(Num);
  194. str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
  195. str(Map) when is_map(Map) -> binary_to_list(emqx_json:encode(Map));
  196. str(List) when is_list(List) ->
  197. case io_lib:printable_list(List) of
  198. true -> List;
  199. false -> binary_to_list(emqx_json:encode(List))
  200. end;
  201. str(Data) ->
  202. error({invalid_str, Data}).
  203. utf8_bin(Str) when is_binary(Str); is_list(Str) ->
  204. unicode:characters_to_binary(Str);
  205. utf8_bin(Str) ->
  206. unicode:characters_to_binary(bin(Str)).
  207. utf8_str(Str) when is_binary(Str); is_list(Str) ->
  208. unicode:characters_to_list(Str);
  209. utf8_str(Str) ->
  210. unicode:characters_to_list(str(Str)).
  211. bin(Bin) when is_binary(Bin) -> Bin;
  212. bin(Num) when is_number(Num) -> number_to_binary(Num);
  213. bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
  214. bin(Map) when is_map(Map) -> emqx_json:encode(Map);
  215. bin(List) when is_list(List) ->
  216. case io_lib:printable_list(List) of
  217. true -> list_to_binary(List);
  218. false -> emqx_json:encode(List)
  219. end;
  220. bin(Data) ->
  221. error({invalid_bin, Data}).
  222. int(List) when is_list(List) ->
  223. try
  224. list_to_integer(List)
  225. catch
  226. error:badarg ->
  227. int(list_to_float(List))
  228. end;
  229. int(Bin) when is_binary(Bin) ->
  230. try
  231. binary_to_integer(Bin)
  232. catch
  233. error:badarg ->
  234. int(binary_to_float(Bin))
  235. end;
  236. int(Int) when is_integer(Int) -> Int;
  237. int(Float) when is_float(Float) -> erlang:floor(Float);
  238. int(true) ->
  239. 1;
  240. int(false) ->
  241. 0;
  242. int(Data) ->
  243. error({invalid_number, Data}).
  244. float(List) when is_list(List) ->
  245. try
  246. list_to_float(List)
  247. catch
  248. error:badarg ->
  249. float(list_to_integer(List))
  250. end;
  251. float(Bin) when is_binary(Bin) ->
  252. try
  253. binary_to_float(Bin)
  254. catch
  255. error:badarg ->
  256. float(binary_to_integer(Bin))
  257. end;
  258. float(Num) when is_number(Num) -> erlang:float(Num);
  259. float(Data) ->
  260. error({invalid_number, Data}).
  261. float2str(Float, Precision) when is_float(Float) and is_integer(Precision) ->
  262. float_to_binary(Float, [{decimals, Precision}, compact]).
  263. map(Bin) when is_binary(Bin) ->
  264. case emqx_json:decode(Bin, [return_maps]) of
  265. Map = #{} -> Map;
  266. _ -> error({invalid_map, Bin})
  267. end;
  268. map(List) when is_list(List) -> maps:from_list(List);
  269. map(Map) when is_map(Map) -> Map;
  270. map(Data) ->
  271. error({invalid_map, Data}).
  272. bool(Bool) when
  273. Bool == true;
  274. Bool == <<"true">>;
  275. Bool == 1
  276. ->
  277. true;
  278. bool(Bool) when
  279. Bool == false;
  280. Bool == <<"false">>;
  281. Bool == 0
  282. ->
  283. false;
  284. bool(Bool) ->
  285. error({invalid_boolean, Bool}).
  286. number_to_binary(Int) when is_integer(Int) ->
  287. integer_to_binary(Int);
  288. number_to_binary(Float) when is_float(Float) ->
  289. float_to_binary(Float, [{decimals, 10}, compact]).
  290. number_to_list(Int) when is_integer(Int) ->
  291. integer_to_list(Int);
  292. number_to_list(Float) when is_float(Float) ->
  293. float_to_list(Float, [{decimals, 10}, compact]).
  294. now_ms() ->
  295. erlang:system_time(millisecond).
  296. can_topic_match_oneof(Topic, Filters) ->
  297. lists:any(
  298. fun(Fltr) ->
  299. emqx_topic:match(Topic, Fltr)
  300. end,
  301. Filters
  302. ).