emqx_plugin_libs_rule.erl 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_rule).
  17. -elvis([{elvis_style, god_modules, disable}]).
  18. %% preprocess and process template string with place holders
  19. -export([
  20. preproc_tmpl/1,
  21. proc_tmpl/2,
  22. proc_tmpl/3,
  23. preproc_cmd/1,
  24. proc_cmd/2,
  25. proc_cmd/3,
  26. preproc_sql/1,
  27. preproc_sql/2,
  28. proc_sql/2,
  29. proc_sql_param_str/2,
  30. proc_cql_param_str/2,
  31. split_insert_sql/1,
  32. detect_sql_type/1,
  33. proc_batch_sql/3
  34. ]).
  35. %% type converting
  36. -export([
  37. str/1,
  38. bin/1,
  39. bool/1,
  40. int/1,
  41. float/1,
  42. float2str/2,
  43. map/1,
  44. utf8_bin/1,
  45. utf8_str/1,
  46. number_to_binary/1,
  47. atom_key/1,
  48. unsafe_atom_key/1
  49. ]).
  50. %% connectivity check
  51. -export([
  52. http_connectivity/1,
  53. http_connectivity/2,
  54. tcp_connectivity/2,
  55. tcp_connectivity/3
  56. ]).
  57. -export([
  58. now_ms/0,
  59. can_topic_match_oneof/2
  60. ]).
  61. -export_type([tmpl_token/0]).
  62. -compile({no_auto_import, [float/1]}).
  %% URI argument type used by the http_connectivity/1,2 specs.
  -type uri_string() :: iodata().
  %% A preprocessed template: a list of tagged binaries, each `{var, Bin}' or `{str, Bin}'.
  -type tmpl_token() :: [{var, binary()} | {str, binary()}].
  %% A preprocessed command: a list of preprocessed templates.
  -type tmpl_cmd() :: [tmpl_token()].
  %% First element of the tuple returned by preproc_sql/1,2.
  -type prepare_statement_key() :: binary().
  %% Preprocess a template string with placeholders.
  %% Thin wrapper: the work is done by emqx_placeholder:preproc_tmpl/1.
  -spec preproc_tmpl(binary()) -> tmpl_token().
  preproc_tmpl(Template) ->
      emqx_placeholder:preproc_tmpl(Template).
  %% Process a preprocessed template against a data map.
  %% Delegates to emqx_placeholder:proc_tmpl/2.
  -spec proc_tmpl(tmpl_token(), map()) -> binary().
  proc_tmpl(Tmpl, Bindings) ->
      emqx_placeholder:proc_tmpl(Tmpl, Bindings).

  %% Same as proc_tmpl/2, with an options map handed through unchanged to
  %% emqx_placeholder:proc_tmpl/3.
  -spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
  proc_tmpl(Tmpl, Bindings, Options) ->
      emqx_placeholder:proc_tmpl(Tmpl, Bindings, Options).
  %% Preprocess a command string with placeholders.
  %% Delegates to emqx_placeholder:preproc_cmd/1.
  -spec preproc_cmd(binary()) -> tmpl_cmd().
  preproc_cmd(Command) ->
      emqx_placeholder:preproc_cmd(Command).
  %% Process a preprocessed command against a data map.
  %% Delegates to emqx_placeholder:proc_cmd/2.
  -spec proc_cmd(tmpl_cmd(), map()) -> binary() | list().
  proc_cmd(CmdTokens, Bindings) ->
      emqx_placeholder:proc_cmd(CmdTokens, Bindings).

  %% Same as proc_cmd/2, with an options map handed through unchanged to
  %% emqx_placeholder:proc_cmd/3.
  -spec proc_cmd(tmpl_cmd(), map(), map()) -> list().
  proc_cmd(CmdTokens, Bindings, Options) ->
      emqx_placeholder:proc_cmd(CmdTokens, Bindings, Options).
  %% Preprocess an SQL statement with placeholders.
  %% Delegates to emqx_placeholder:preproc_sql/1.
  -spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
  preproc_sql(Statement) ->
      emqx_placeholder:preproc_sql(Statement).

  %% Same as preproc_sql/1, additionally passing the replacement marker
  %% (`?' or `$n') on to emqx_placeholder:preproc_sql/2.
  -spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') ->
      {prepare_statement_key(), tmpl_token()}.
  preproc_sql(Statement, Marker) ->
      emqx_placeholder:preproc_sql(Statement, Marker).
  %% Process preprocessed SQL tokens against a data map.
  %% Delegates to emqx_placeholder:proc_sql/2.
  -spec proc_sql(tmpl_token(), map()) -> list().
  proc_sql(SqlTokens, Bindings) ->
      emqx_placeholder:proc_sql(SqlTokens, Bindings).
  %% Process preprocessed SQL parameter tokens into a single binary.
  %% Delegates to emqx_placeholder:proc_sql_param_str/2.
  -spec proc_sql_param_str(tmpl_token(), map()) -> binary().
  proc_sql_param_str(ParamTokens, Bindings) ->
      emqx_placeholder:proc_sql_param_str(ParamTokens, Bindings).
  %% Process preprocessed CQL parameter tokens into a single binary.
  %% Delegates to emqx_placeholder:proc_cql_param_str/2.
  -spec proc_cql_param_str(tmpl_token(), map()) -> binary().
  proc_cql_param_str(ParamTokens, Bindings) ->
      emqx_placeholder:proc_cql_param_str(ParamTokens, Bindings).
  %% Split an INSERT statement around its VALUES keyword.
  %%
  %% Example input:
  %%   <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
  %%
  %% Returns `{ok, {InsertSQL, Params}}' where InsertSQL is the text before the
  %% keyword with leading whitespace removed, and Params is everything after the
  %% keyword, untouched.
  %% Returns `{error, not_insert_sql}' when the text does not start with INSERT
  %% or does not contain exactly one occurrence of `values'.
  %% Both keywords are matched case-insensitively.
  -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
      InsertSQL :: binary(),
      Params :: binary().
  split_insert_sql(SQL) ->
      case re:split(SQL, "((?i)values)", [{return, binary}]) of
          [Part1, _ValuesKeyword, Part3] ->
              InsertSQL = string:trim(Part1, leading),
              %% Caseless match so that "Insert"/"iNSERT" are accepted, consistent
              %% with how the VALUES keyword is matched above.
              case re:run(InsertSQL, "^insert", [caseless, {capture, none}]) of
                  match ->
                      {ok, {InsertSQL, Part3}};
                  nomatch ->
                      {error, not_insert_sql}
              end;
          _ ->
              {error, not_insert_sql}
      end.
  %% Classify an SQL statement by its first alphabetic word.
  %% Leading whitespace is skipped and the word is compared case-insensitively.
  %% Returns `{ok, select}' or `{ok, insert}'; anything else, including input
  %% that does not start with letters, yields `{error, invalid_sql}'.
  -spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
      Type :: insert | select.
  detect_sql_type(SQL) ->
      case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
          {match, [Keyword]} ->
              case string:lowercase(Keyword) of
                  "select" -> {ok, select};
                  "insert" -> {ok, insert};
                  _ -> {error, invalid_sql}
              end;
          _ ->
              {error, invalid_sql}
      end.
  %% Build one batched INSERT statement.
  %% Each `{_, Msg}' pair in BatchReqs is processed with proc_sql_param_str/2
  %% using Tokens; the results are joined with `,' and appended to InsertPart
  %% after a literal " values ". Elements of BatchReqs that are not 2-tuples
  %% are skipped by the comprehension.
  -spec proc_batch_sql(
      BatchReqs :: list({atom(), map()}),
      InsertPart :: binary(),
      Tokens :: tmpl_token()
  ) -> InsertSQL :: binary().
  proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
      Rows = [proc_sql_param_str(Tokens, Msg) || {_Tag, Msg} <- BatchReqs],
      ValuesPart = iolist_to_binary(lists:join($,, Rows)),
      <<InsertPart/binary, " values ", ValuesPart/binary>>.
  %% Convert a key to atom form.
  %% Atoms pass through; binaries are converted with binary_to_atom/2, which
  %% creates the atom if it does not exist yet (hence "unsafe" -- do not feed it
  %% unbounded untrusted input); a non-empty list is converted element by element.
  %% Anything else raises `error({invalid_key, Key})'.
  unsafe_atom_key(Atom) when is_atom(Atom) ->
      Atom;
  unsafe_atom_key(Bin) when is_binary(Bin) ->
      erlang:binary_to_atom(Bin, utf8);
  unsafe_atom_key([_ | _] = Path) ->
      [unsafe_atom_key(Segment) || Segment <- Path];
  unsafe_atom_key(Other) ->
      error({invalid_key, Other}).
  %% Convert a key to atom form without creating new atoms.
  %% Atoms pass through; binaries must name an already existing atom; a
  %% non-empty list is treated as nested keys and converted element by element.
  %% Raises `error({invalid_key, Key})' for an unknown atom name or any other
  %% kind of term.
  atom_key(Atom) when is_atom(Atom) ->
      Atom;
  atom_key(Bin) when is_binary(Bin) ->
      try binary_to_existing_atom(Bin, utf8) of
          Atom -> Atom
      catch
          error:badarg -> error({invalid_key, Bin})
      end;
  atom_key([_ | _] = Path) ->
      [atom_key(Segment) || Segment <- Path];
  atom_key(Other) ->
      error({invalid_key, Other}).
  %% Check that the host/port of Url accepts a TCP connection, using a
  %% timeout value of 3000.
  -spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
  http_connectivity(Url) ->
      http_connectivity(Url, 3000).

  %% Parse Url with emqx_http_lib:uri_parse/1 and probe the resulting host and
  %% port with tcp_connectivity/3. A parse error is returned as is.
  -spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
  http_connectivity(Url, Timeout) ->
      case emqx_http_lib:uri_parse(Url) of
          {error, _Reason} = ParseError ->
              ParseError;
          {ok, #{host := Host, port := Port}} ->
              tcp_connectivity(Host, Port, Timeout)
      end.
  %% Check that Host:Port accepts a TCP connection, using a timeout value of 3000.
  -spec tcp_connectivity(
      Host :: inet:socket_address() | inet:hostname(),
      Port :: inet:port_number()
  ) ->
      ok | {error, Reason :: term()}.
  tcp_connectivity(Host, Port) ->
      tcp_connectivity(Host, Port, 3000).

  %% Open a TCP connection to Host:Port and close it again straight away.
  %% Timeout is passed to gen_tcp:connect/4; the connect options come from
  %% emqx_misc:ipv6_probe/1. Returns `ok' when the connection was established,
  %% otherwise the `{error, Reason}' reported by gen_tcp:connect/4.
  -spec tcp_connectivity(
      Host :: inet:socket_address() | inet:hostname(),
      Port :: inet:port_number(),
      Timeout :: integer()
  ) ->
      ok | {error, Reason :: term()}.
  tcp_connectivity(Host, Port, Timeout) ->
      ConnectOpts = emqx_misc:ipv6_probe([]),
      case gen_tcp:connect(Host, Port, ConnectOpts, Timeout) of
          {error, _Reason} = ConnectError ->
              ConnectError;
          {ok, Socket} ->
              gen_tcp:close(Socket),
              ok
      end.
  %% Convert a term to a string (character list).
  %% Binaries, numbers and atoms are converted directly; maps are JSON-encoded
  %% with emqx_json:encode/1; a list is returned unchanged when
  %% io_lib:printable_list/1 accepts it and is JSON-encoded otherwise.
  %% Any other term raises `error({invalid_str, Data})'.
  str(Bin) when is_binary(Bin) ->
      binary_to_list(Bin);
  str(Num) when is_number(Num) ->
      number_to_list(Num);
  str(Atom) when is_atom(Atom) ->
      atom_to_list(Atom);
  str(Map) when is_map(Map) ->
      binary_to_list(emqx_json:encode(Map));
  str(List) when is_list(List) ->
      case io_lib:printable_list(List) of
          false -> binary_to_list(emqx_json:encode(List));
          true -> List
      end;
  str(Other) ->
      error({invalid_str, Other}).
  %% Convert a term to a binary via unicode:characters_to_binary/1.
  %% Binaries and lists are passed to it directly; any other term is first
  %% converted with bin/1.
  utf8_bin(Chars) when is_binary(Chars) orelse is_list(Chars) ->
      unicode:characters_to_binary(Chars);
  utf8_bin(Term) ->
      unicode:characters_to_binary(bin(Term)).
  %% Convert a term to a character list via unicode:characters_to_list/1.
  %% Binaries and lists are passed to it directly; any other term is first
  %% converted with str/1.
  utf8_str(Chars) when is_binary(Chars) orelse is_list(Chars) ->
      unicode:characters_to_list(Chars);
  utf8_str(Term) ->
      unicode:characters_to_list(str(Term)).
  %% Convert a term to a binary.
  %% Binaries pass through; numbers go through number_to_binary/1; atoms are
  %% rendered as UTF-8; maps are JSON-encoded with emqx_json:encode/1; a list is
  %% turned into a binary with list_to_binary/1 when io_lib:printable_list/1
  %% accepts it and is JSON-encoded otherwise.
  %% Any other term raises `error({invalid_bin, Data})'.
  bin(Bin) when is_binary(Bin) ->
      Bin;
  bin(Num) when is_number(Num) ->
      number_to_binary(Num);
  bin(Atom) when is_atom(Atom) ->
      atom_to_binary(Atom, utf8);
  bin(Map) when is_map(Map) ->
      emqx_json:encode(Map);
  bin(List) when is_list(List) ->
      case io_lib:printable_list(List) of
          false -> emqx_json:encode(List);
          true -> list_to_binary(List)
      end;
  bin(Other) ->
      error({invalid_bin, Other}).
  %% Convert a term to an integer.
  %% Strings and binaries are parsed as an integer first and, failing that, as
  %% a float which is then floored. Floats are floored (so -1.5 becomes -2),
  %% integers pass through, `true' is 1 and `false' is 0.
  %% Text that is neither an integer nor a float raises `badarg'; any other
  %% term raises `error({invalid_number, Data})'.
  int(Str) when is_list(Str) ->
      try list_to_integer(Str) of
          Int -> Int
      catch
          error:badarg -> erlang:floor(list_to_float(Str))
      end;
  int(Bin) when is_binary(Bin) ->
      try binary_to_integer(Bin) of
          Int -> Int
      catch
          error:badarg -> erlang:floor(binary_to_float(Bin))
      end;
  int(Num) when is_integer(Num) ->
      Num;
  int(Num) when is_float(Num) ->
      erlang:floor(Num);
  int(true) ->
      1;
  int(false) ->
      0;
  int(Other) ->
      error({invalid_number, Other}).
  %% Convert a term to a float.
  %% Strings and binaries are parsed as a float first and, failing that, as an
  %% integer which is then converted. Numbers are converted with erlang:float/1.
  %% Text that is neither a float nor an integer raises `badarg'; any other
  %% term raises `error({invalid_number, Data})'.
  float(Str) when is_list(Str) ->
      try list_to_float(Str) of
          Float -> Float
      catch
          error:badarg -> erlang:float(list_to_integer(Str))
      end;
  float(Bin) when is_binary(Bin) ->
      try binary_to_float(Bin) of
          Float -> Float
      catch
          error:badarg -> erlang:float(binary_to_integer(Bin))
      end;
  float(Num) when is_number(Num) ->
      erlang:float(Num);
  float(Other) ->
      error({invalid_number, Other}).
  %% Format a float as a binary with at most Precision decimals, using the
  %% `compact' option of float_to_binary/2.
  %% Only defined for a float and an integer precision.
  float2str(Float, Precision) when is_float(Float), is_integer(Precision) ->
      FormatOpts = [{decimals, Precision}, compact],
      float_to_binary(Float, FormatOpts).
  %% Convert a term to a map.
  %% A binary is decoded with emqx_json:decode/2 and must decode to a map; a
  %% list is converted with maps:from_list/1; a map passes through.
  %% Any other term, or a binary that decodes to a non-map, raises
  %% `error({invalid_map, Data})'.
  map(Bin) when is_binary(Bin) ->
      Decoded = emqx_json:decode(Bin, [return_maps]),
      is_map(Decoded) orelse error({invalid_map, Bin}),
      Decoded;
  map(Pairs) when is_list(Pairs) ->
      maps:from_list(Pairs);
  map(Map) when is_map(Map) ->
      Map;
  map(Other) ->
      error({invalid_map, Other}).
  %% Convert a term to a boolean.
  %% `true', <<"true">> and any number equal to 1 give `true';
  %% `false', <<"false">> and any number equal to 0 give `false'.
  %% The numeric checks use `==', so 1.0 and 0.0 are accepted as well.
  %% Any other term raises `error({invalid_boolean, Data})'.
  bool(true) ->
      true;
  bool(<<"true">>) ->
      true;
  bool(One) when One == 1 ->
      true;
  bool(false) ->
      false;
  bool(<<"false">>) ->
      false;
  bool(Zero) when Zero == 0 ->
      false;
  bool(Other) ->
      error({invalid_boolean, Other}).
  %% Render a number as a binary.
  %% Integers use integer_to_binary/1; floats are formatted with at most 10
  %% decimals and the `compact' option.
  -spec number_to_binary(number()) -> binary().
  number_to_binary(Num) when is_integer(Num) ->
      erlang:integer_to_binary(Num);
  number_to_binary(Num) when is_float(Num) ->
      erlang:float_to_binary(Num, [{decimals, 10}, compact]).
  %% Render a number as a string.
  %% Integers use integer_to_list/1; floats are formatted with at most 10
  %% decimals and the `compact' option.
  -spec number_to_list(number()) -> string().
  number_to_list(Num) when is_integer(Num) ->
      erlang:integer_to_list(Num);
  number_to_list(Num) when is_float(Num) ->
      erlang:float_to_list(Num, [{decimals, 10}, compact]).
  %% Current Erlang system time in milliseconds.
  -spec now_ms() -> integer().
  now_ms() ->
      Unit = millisecond,
      erlang:system_time(Unit).
  %% Return true when emqx_topic:match/2 succeeds for Topic against at least
  %% one element of Filters; false otherwise (including for an empty list).
  can_topic_match_oneof(Topic, Filters) ->
      MatchesTopic = fun(Filter) -> emqx_topic:match(Topic, Filter) end,
      lists:any(MatchesTopic, Filters).