emqx_plugin_libs_rule.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_rule).
  17. -elvis([{elvis_style, god_modules, disable}]).
  18. %% preprocess and process template string with place holders
  19. -export([
  20. preproc_tmpl/1,
  21. proc_tmpl/2,
  22. proc_tmpl/3,
  23. preproc_cmd/1,
  24. proc_cmd/2,
  25. proc_cmd/3,
  26. preproc_sql/1,
  27. preproc_sql/2,
  28. proc_sql/2,
  29. proc_sql_param_str/2,
  30. proc_cql_param_str/2,
  31. split_insert_sql/1,
  32. detect_sql_type/1,
  33. proc_batch_sql/3
  34. ]).
  35. %% type converting
  36. -export([
  37. str/1,
  38. bin/1,
  39. bool/1,
  40. int/1,
  41. float/1,
  42. float2str/2,
  43. map/1,
  44. utf8_bin/1,
  45. utf8_str/1,
  46. number_to_binary/1,
  47. atom_key/1,
  48. unsafe_atom_key/1
  49. ]).
  50. %% connectivity check
  51. -export([
  52. http_connectivity/1,
  53. http_connectivity/2,
  54. tcp_connectivity/2,
  55. tcp_connectivity/3
  56. ]).
  57. -export([
  58. now_ms/0,
  59. can_topic_match_oneof/2
  60. ]).
  61. -export_type([tmpl_token/0]).
  62. -compile({no_auto_import, [float/1]}).
  63. -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
  64. %% Space and CRLF
  65. -define(EX_WITHE_CHARS, "\\s").
  66. -type uri_string() :: iodata().
  67. -type tmpl_token() :: list({var, binary()} | {str, binary()}).
  68. -type tmpl_cmd() :: list(tmpl_token()).
  69. -type prepare_statement_key() :: binary().
  70. %% preprocess template string with place holders
  71. -spec preproc_tmpl(binary()) -> tmpl_token().
  72. preproc_tmpl(Str) ->
  73. emqx_placeholder:preproc_tmpl(Str).
  74. -spec proc_tmpl(tmpl_token(), map()) -> binary().
  75. proc_tmpl(Tokens, Data) ->
  76. emqx_placeholder:proc_tmpl(Tokens, Data).
  77. -spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
  78. proc_tmpl(Tokens, Data, Opts) ->
  79. emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
  80. -spec preproc_cmd(binary()) -> tmpl_cmd().
  81. preproc_cmd(Str) ->
  82. emqx_placeholder:preproc_cmd(Str).
  83. -spec proc_cmd([tmpl_token()], map()) -> binary() | list().
  84. proc_cmd(Tokens, Data) ->
  85. emqx_placeholder:proc_cmd(Tokens, Data).
  86. -spec proc_cmd([tmpl_token()], map(), map()) -> list().
  87. proc_cmd(Tokens, Data, Opts) ->
  88. emqx_placeholder:proc_cmd(Tokens, Data, Opts).
  89. %% preprocess SQL with place holders
  90. -spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
  91. preproc_sql(Sql) ->
  92. emqx_placeholder:preproc_sql(Sql).
  93. -spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') ->
  94. {prepare_statement_key(), tmpl_token()}.
  95. preproc_sql(Sql, ReplaceWith) ->
  96. emqx_placeholder:preproc_sql(Sql, ReplaceWith).
  97. -spec proc_sql(tmpl_token(), map()) -> list().
  98. proc_sql(Tokens, Data) ->
  99. emqx_placeholder:proc_sql(Tokens, Data).
  100. -spec proc_sql_param_str(tmpl_token(), map()) -> binary().
  101. proc_sql_param_str(Tokens, Data) ->
  102. emqx_placeholder:proc_sql_param_str(Tokens, Data).
  103. -spec proc_cql_param_str(tmpl_token(), map()) -> binary().
  104. proc_cql_param_str(Tokens, Data) ->
  105. emqx_placeholder:proc_cql_param_str(Tokens, Data).
  106. %% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
  107. -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
  108. InsertSQL :: binary(),
  109. Params :: binary().
  110. split_insert_sql(SQL) ->
  111. case re:split(SQL, "((?i)values)", [{return, binary}]) of
  112. [Part1, _, Part3] ->
  113. case string:trim(Part1, leading) of
  114. <<"insert", _/binary>> = InsertSQL ->
  115. {ok, {InsertSQL, Part3}};
  116. <<"INSERT", _/binary>> = InsertSQL ->
  117. {ok, {InsertSQL, Part3}};
  118. _ ->
  119. {error, not_insert_sql}
  120. end;
  121. _ ->
  122. {error, not_insert_sql}
  123. end.
  124. -spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
  125. Type :: insert | select.
  126. detect_sql_type(SQL) ->
  127. case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
  128. {match, [First]} ->
  129. Types = [select, insert],
  130. PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
  131. LowFirst = string:lowercase(First),
  132. case proplists:lookup(LowFirst, PropTypes) of
  133. {LowFirst, Type} ->
  134. {ok, Type};
  135. _ ->
  136. {error, invalid_sql}
  137. end;
  138. _ ->
  139. {error, invalid_sql}
  140. end.
  141. -spec proc_batch_sql(
  142. BatchReqs :: list({atom(), map()}),
  143. InsertPart :: binary(),
  144. Tokens :: tmpl_token()
  145. ) -> InsertSQL :: binary().
  146. proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
  147. ValuesPart = erlang:iolist_to_binary(
  148. lists:join(", ", [
  149. emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Msg)
  150. || {_, Msg} <- BatchReqs
  151. ])
  152. ),
  153. <<InsertPart/binary, " values ", ValuesPart/binary>>.
  154. unsafe_atom_key(Key) when is_atom(Key) ->
  155. Key;
  156. unsafe_atom_key(Key) when is_binary(Key) ->
  157. binary_to_atom(Key, utf8);
  158. unsafe_atom_key(Keys = [_Key | _]) ->
  159. [unsafe_atom_key(SubKey) || SubKey <- Keys];
  160. unsafe_atom_key(Key) ->
  161. error({invalid_key, Key}).
  162. atom_key(Key) when is_atom(Key) ->
  163. Key;
  164. atom_key(Key) when is_binary(Key) ->
  165. try
  166. binary_to_existing_atom(Key, utf8)
  167. catch
  168. error:badarg -> error({invalid_key, Key})
  169. end;
  170. %% nested keys
  171. atom_key(Keys = [_Key | _]) ->
  172. [atom_key(SubKey) || SubKey <- Keys];
  173. atom_key(Key) ->
  174. error({invalid_key, Key}).
  175. -spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
  176. http_connectivity(Url) ->
  177. http_connectivity(Url, 3000).
  178. -spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
  179. http_connectivity(Url, Timeout) ->
  180. case emqx_http_lib:uri_parse(Url) of
  181. {ok, #{host := Host, port := Port}} ->
  182. tcp_connectivity(Host, Port, Timeout);
  183. {error, Reason} ->
  184. {error, Reason}
  185. end.
  186. -spec tcp_connectivity(
  187. Host :: inet:socket_address() | inet:hostname(),
  188. Port :: inet:port_number()
  189. ) ->
  190. ok | {error, Reason :: term()}.
  191. tcp_connectivity(Host, Port) ->
  192. tcp_connectivity(Host, Port, 3000).
  193. -spec tcp_connectivity(
  194. Host :: inet:socket_address() | inet:hostname(),
  195. Port :: inet:port_number(),
  196. Timeout :: integer()
  197. ) ->
  198. ok | {error, Reason :: term()}.
  199. tcp_connectivity(Host, Port, Timeout) ->
  200. case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
  201. {ok, Sock} ->
  202. gen_tcp:close(Sock),
  203. ok;
  204. {error, Reason} ->
  205. {error, Reason}
  206. end.
  207. str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
  208. str(Num) when is_number(Num) -> number_to_list(Num);
  209. str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
  210. str(Map) when is_map(Map) -> binary_to_list(emqx_json:encode(Map));
  211. str(List) when is_list(List) ->
  212. case io_lib:printable_list(List) of
  213. true -> List;
  214. false -> binary_to_list(emqx_json:encode(List))
  215. end;
  216. str(Data) ->
  217. error({invalid_str, Data}).
  218. utf8_bin(Str) when is_binary(Str); is_list(Str) ->
  219. unicode:characters_to_binary(Str);
  220. utf8_bin(Str) ->
  221. unicode:characters_to_binary(bin(Str)).
  222. utf8_str(Str) when is_binary(Str); is_list(Str) ->
  223. unicode:characters_to_list(Str);
  224. utf8_str(Str) ->
  225. unicode:characters_to_list(str(Str)).
  226. bin(Bin) when is_binary(Bin) -> Bin;
  227. bin(Num) when is_number(Num) -> number_to_binary(Num);
  228. bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
  229. bin(Map) when is_map(Map) -> emqx_json:encode(Map);
  230. bin(List) when is_list(List) ->
  231. case io_lib:printable_list(List) of
  232. true -> list_to_binary(List);
  233. false -> emqx_json:encode(List)
  234. end;
  235. bin(Data) ->
  236. error({invalid_bin, Data}).
  237. int(List) when is_list(List) ->
  238. try
  239. list_to_integer(List)
  240. catch
  241. error:badarg ->
  242. int(list_to_float(List))
  243. end;
  244. int(Bin) when is_binary(Bin) ->
  245. try
  246. binary_to_integer(Bin)
  247. catch
  248. error:badarg ->
  249. int(binary_to_float(Bin))
  250. end;
  251. int(Int) when is_integer(Int) -> Int;
  252. int(Float) when is_float(Float) -> erlang:floor(Float);
  253. int(true) ->
  254. 1;
  255. int(false) ->
  256. 0;
  257. int(Data) ->
  258. error({invalid_number, Data}).
  259. float(List) when is_list(List) ->
  260. try
  261. list_to_float(List)
  262. catch
  263. error:badarg ->
  264. float(list_to_integer(List))
  265. end;
  266. float(Bin) when is_binary(Bin) ->
  267. try
  268. binary_to_float(Bin)
  269. catch
  270. error:badarg ->
  271. float(binary_to_integer(Bin))
  272. end;
  273. float(Num) when is_number(Num) -> erlang:float(Num);
  274. float(Data) ->
  275. error({invalid_number, Data}).
  276. float2str(Float, Precision) when is_float(Float) and is_integer(Precision) ->
  277. float_to_binary(Float, [{decimals, Precision}, compact]).
  278. map(Bin) when is_binary(Bin) ->
  279. case emqx_json:decode(Bin, [return_maps]) of
  280. Map = #{} -> Map;
  281. _ -> error({invalid_map, Bin})
  282. end;
  283. map(List) when is_list(List) -> maps:from_list(List);
  284. map(Map) when is_map(Map) -> Map;
  285. map(Data) ->
  286. error({invalid_map, Data}).
  287. bool(Bool) when
  288. Bool == true;
  289. Bool == <<"true">>;
  290. Bool == 1
  291. ->
  292. true;
  293. bool(Bool) when
  294. Bool == false;
  295. Bool == <<"false">>;
  296. Bool == 0
  297. ->
  298. false;
  299. bool(Bool) ->
  300. error({invalid_boolean, Bool}).
  301. number_to_binary(Int) when is_integer(Int) ->
  302. integer_to_binary(Int);
  303. number_to_binary(Float) when is_float(Float) ->
  304. float_to_binary(Float, [{decimals, 10}, compact]).
  305. number_to_list(Int) when is_integer(Int) ->
  306. integer_to_list(Int);
  307. number_to_list(Float) when is_float(Float) ->
  308. float_to_list(Float, [{decimals, 10}, compact]).
  309. now_ms() ->
  310. erlang:system_time(millisecond).
  311. can_topic_match_oneof(Topic, Filters) ->
  312. lists:any(
  313. fun(Fltr) ->
  314. emqx_topic:match(Topic, Fltr)
  315. end,
  316. Filters
  317. ).