emqx_utils.erl 30 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils).
  17. -compile(inline).
  18. %% [TODO] Cleanup so the instruction below is not necessary.
  19. -elvis([{elvis_style, god_modules, disable}]).
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -export([
  22. merge_opts/2,
  23. maybe_apply/2,
  24. compose/1,
  25. compose/2,
  26. cons/2,
  27. run_fold/3,
  28. pipeline/3,
  29. start_timer/2,
  30. start_timer/3,
  31. cancel_timer/1,
  32. drain_deliver/0,
  33. drain_deliver/1,
  34. drain_down/1,
  35. check_oom/1,
  36. check_oom/2,
  37. tune_heap_size/1,
  38. proc_name/2,
  39. proc_stats/0,
  40. proc_stats/1,
  41. rand_seed/0,
  42. rand_id/1,
  43. now_to_secs/1,
  44. now_to_ms/1,
  45. index_of/2,
  46. maybe_parse_ip/1,
  47. ipv6_probe/1,
  48. gen_id/0,
  49. gen_id/1,
  50. explain_posix/1,
  51. pforeach/2,
  52. pforeach/3,
  53. pmap/2,
  54. pmap/3,
  55. readable_error_msg/1,
  56. safe_to_existing_atom/1,
  57. safe_to_existing_atom/2,
  58. pub_props_to_packet/1,
  59. safe_filename/1,
  60. diff_lists/3,
  61. merge_lists/3,
  62. flattermap/2,
  63. tcp_keepalive_opts/4,
  64. format/1,
  65. format/2,
  66. format_mfal/2,
  67. call_first_defined/3,
  68. ntoa/1,
  69. foldl_while/3,
  70. is_restricted_str/1,
  71. interactive_load/1
  72. ]).
  73. -export([
  74. bin_to_hexstr/2,
  75. hexstr_to_bin/1
  76. ]).
  77. -export([
  78. nolink_apply/1,
  79. nolink_apply/2
  80. ]).
  81. -export([clamp/3, redact/1, redact/2, is_redacted/2, is_redacted/3]).
  82. -export([deobfuscate/2]).
  83. -export_type([
  84. readable_error_msg/1
  85. ]).
  86. -type readable_error_msg(_Error) :: binary().
  87. -type option(T) :: undefined | T.
  88. -dialyzer({nowarn_function, [nolink_apply/2]}).
  89. -define(SHORT, 8).
  90. -define(DEFAULT_PMAP_TIMEOUT, 5000).
  91. %% @doc Parse v4 or v6 string format address to tuple.
  92. %% `Host' itself is returned if it's not an ip string.
  93. maybe_parse_ip(Host) ->
  94. case inet:parse_address(Host) of
  95. {ok, Addr} when is_tuple(Addr) -> Addr;
  96. {error, einval} -> Host
  97. end.
  98. %% @doc Add `ipv6_probe' socket option if it's supported.
  99. %% gen_tcp:ipv6_probe() -> true. is added to EMQ's OTP forks
  100. ipv6_probe(Opts) ->
  101. case erlang:function_exported(gen_tcp, ipv6_probe, 0) of
  102. true -> [{ipv6_probe, true} | Opts];
  103. false -> Opts
  104. end.
  105. %% @doc Merge options
  106. -spec merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist().
  107. merge_opts(Defaults, Options) ->
  108. lists:foldl(
  109. fun
  110. ({Opt, Val}, Acc) ->
  111. lists:keystore(Opt, 1, Acc, {Opt, Val});
  112. (Opt, Acc) ->
  113. lists:usort([Opt | Acc])
  114. end,
  115. Defaults,
  116. Options
  117. ).
  118. %% @doc Apply a function to a maybe argument.
  119. -spec maybe_apply(fun((option(A)) -> option(A)), option(A)) ->
  120. option(A)
  121. when
  122. A :: any().
  123. maybe_apply(_Fun, undefined) ->
  124. undefined;
  125. maybe_apply(Fun, Arg) when is_function(Fun) ->
  126. erlang:apply(Fun, [Arg]).
  127. -spec compose(list(F)) -> G when
  128. F :: fun((any()) -> any()),
  129. G :: fun((any()) -> any()).
  130. compose([F | More]) -> compose(F, More).
  131. -spec compose(F, G | [Gs]) -> C when
  132. F :: fun((X1) -> X2),
  133. G :: fun((X2) -> X3),
  134. Gs :: [fun((Xn) -> Xn1)],
  135. C :: fun((X1) -> Xm),
  136. X3 :: any(),
  137. Xn :: any(),
  138. Xn1 :: any(),
  139. Xm :: any().
  140. compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end;
  141. compose(F, [G]) -> compose(F, G);
  142. compose(F, [G | More]) -> compose(compose(F, G), More).
  143. -spec cons(X, [X]) -> [X, ...].
  144. cons(Head, Tail) ->
  145. [Head | Tail].
  146. %% @doc RunFold
  147. run_fold([], Acc, _State) ->
  148. Acc;
  149. run_fold([Fun | More], Acc, State) ->
  150. run_fold(More, Fun(Acc, State), State).
  151. %% @doc Pipeline
  152. pipeline([], Input, State) ->
  153. {ok, Input, State};
  154. pipeline([Fun | More], Input, State) ->
  155. case apply_fun(Fun, Input, State) of
  156. ok -> pipeline(More, Input, State);
  157. {ok, NState} -> pipeline(More, Input, NState);
  158. {ok, Output, NState} -> pipeline(More, Output, NState);
  159. {error, Reason} -> {error, Reason, State};
  160. {error, Reason, NState} -> {error, Reason, NState}
  161. end.
  162. -spec foldl_while(fun((X, Acc) -> {cont | halt, Acc}), Acc, [X]) -> Acc.
  163. foldl_while(_Fun, Acc, []) ->
  164. Acc;
  165. foldl_while(Fun, Acc, [X | Xs]) ->
  166. case Fun(X, Acc) of
  167. {cont, NewAcc} ->
  168. foldl_while(Fun, NewAcc, Xs);
  169. {halt, NewAcc} ->
  170. NewAcc
  171. end.
  172. -compile({inline, [apply_fun/3]}).
  173. apply_fun(Fun, Input, State) ->
  174. case erlang:fun_info(Fun, arity) of
  175. {arity, 1} -> Fun(Input);
  176. {arity, 2} -> Fun(Input, State)
  177. end.
  178. -spec start_timer(integer() | atom(), term()) -> option(reference()).
  179. start_timer(Interval, Msg) ->
  180. start_timer(Interval, self(), Msg).
  181. -spec start_timer(integer() | atom(), pid() | atom(), term()) -> option(reference()).
  182. start_timer(Interval, Dest, Msg) when is_number(Interval) ->
  183. erlang:start_timer(erlang:ceil(Interval), Dest, Msg);
  184. start_timer(_Atom, _Dest, _Msg) ->
  185. undefined.
  186. -spec cancel_timer(option(reference())) -> ok.
  187. cancel_timer(Timer) when is_reference(Timer) ->
  188. case erlang:cancel_timer(Timer) of
  189. false ->
  190. receive
  191. {timeout, Timer, _} -> ok
  192. after 0 -> ok
  193. end;
  194. _ ->
  195. ok
  196. end;
  197. cancel_timer(_) ->
  198. ok.
  199. %% @doc Drain delivers
  200. drain_deliver() ->
  201. drain_deliver(-1).
  202. drain_deliver(N) when is_integer(N) ->
  203. drain_deliver(N, []).
  204. drain_deliver(0, Acc) ->
  205. lists:reverse(Acc);
  206. drain_deliver(N, Acc) ->
  207. receive
  208. Deliver = {deliver, _Topic, _Msg} ->
  209. drain_deliver(N - 1, [Deliver | Acc])
  210. after 0 ->
  211. lists:reverse(Acc)
  212. end.
  213. %% @doc Drain process 'DOWN' events.
  214. -spec drain_down(pos_integer()) -> list(pid()).
  215. drain_down(Cnt) when Cnt > 0 ->
  216. drain_down(Cnt, []).
  217. drain_down(0, Acc) ->
  218. lists:reverse(Acc);
  219. drain_down(Cnt, Acc) ->
  220. receive
  221. {'DOWN', _MRef, process, Pid, _Reason} ->
  222. drain_down(Cnt - 1, [Pid | Acc])
  223. after 0 ->
  224. lists:reverse(Acc)
  225. end.
  226. %% @doc Check process's mailbox and heapsize against OOM policy,
  227. %% return `ok | {shutdown, Reason}' accordingly.
  228. %% `ok': There is nothing out of the ordinary.
  229. %% `shutdown': Some numbers (message queue length hit the limit),
  230. %% hence shutdown for greater good (system stability).
  231. %% [FIXME] cross-dependency on `emqx_types`.
  232. -spec check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}.
  233. check_oom(Policy) ->
  234. check_oom(self(), Policy).
  235. -spec check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}.
  236. check_oom(_Pid, #{enable := false}) ->
  237. ok;
  238. check_oom(Pid, #{
  239. max_mailbox_size := MaxQLen,
  240. max_heap_size := MaxHeapSize
  241. }) ->
  242. case process_info(Pid, [message_queue_len, total_heap_size]) of
  243. undefined ->
  244. ok;
  245. [{message_queue_len, QLen}, {total_heap_size, HeapSize}] ->
  246. do_check_oom([
  247. {QLen, MaxQLen, mailbox_overflow},
  248. {HeapSize, MaxHeapSize, proc_heap_too_large}
  249. ])
  250. end.
  251. do_check_oom([]) ->
  252. ok;
  253. do_check_oom([{Val, Max, Reason} | Rest]) ->
  254. case is_integer(Max) andalso (0 < Max) andalso (Max < Val) of
  255. true -> {shutdown, #{reason => Reason, value => Val, max => Max}};
  256. false -> do_check_oom(Rest)
  257. end.
  258. tune_heap_size(#{enable := false}) ->
  259. ignore;
  260. %% If the max_heap_size is set to zero, the limit is disabled.
  261. tune_heap_size(#{max_heap_size := 0}) ->
  262. ignore;
  263. tune_heap_size(#{max_heap_size := MaxHeapSize}) when MaxHeapSize > 0 ->
  264. MaxSize =
  265. case erlang:system_info(wordsize) of
  266. % arch_64
  267. 8 ->
  268. (1 bsl 59) - 1;
  269. % arch_32
  270. 4 ->
  271. (1 bsl 27) - 1
  272. end,
  273. OverflowedSize =
  274. case erlang:trunc(MaxHeapSize * 1.5) of
  275. SZ when SZ > MaxSize -> MaxSize;
  276. SZ -> SZ
  277. end,
  278. erlang:process_flag(max_heap_size, #{
  279. size => OverflowedSize,
  280. kill => true,
  281. error_logger => true
  282. }).
  283. -spec proc_name(atom(), pos_integer()) -> atom().
  284. proc_name(Mod, Id) ->
  285. list_to_atom(lists:concat([Mod, "_", Id])).
  286. %% Get Proc's Stats.
  287. %% [FIXME] cross-dependency on `emqx_types`.
  288. -spec proc_stats() -> emqx_types:stats().
  289. proc_stats() -> proc_stats(self()).
  290. -spec proc_stats(pid()) -> emqx_types:stats().
  291. proc_stats(Pid) ->
  292. case
  293. process_info(Pid, [
  294. message_queue_len,
  295. heap_size,
  296. total_heap_size,
  297. reductions,
  298. memory
  299. ])
  300. of
  301. undefined -> [];
  302. [{message_queue_len, Len} | ProcStats] -> [{mailbox_len, Len} | ProcStats]
  303. end.
  304. rand_seed() ->
  305. rand:seed(exsplus, erlang:timestamp()).
  306. -spec now_to_secs(erlang:timestamp()) -> pos_integer().
  307. now_to_secs({MegaSecs, Secs, _MicroSecs}) ->
  308. MegaSecs * 1000000 + Secs.
  309. -spec now_to_ms(erlang:timestamp()) -> pos_integer().
  310. now_to_ms({MegaSecs, Secs, MicroSecs}) ->
  311. (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs / 1000).
  312. %% lists:index_of/2
  313. index_of(E, L) ->
  314. index_of(E, 1, L).
  315. index_of(_E, _I, []) ->
  316. error(badarg);
  317. index_of(E, I, [E | _]) ->
  318. I;
  319. index_of(E, I, [_ | L]) ->
  320. index_of(E, I + 1, L).
  321. -spec bin_to_hexstr(binary(), lower | upper) -> binary().
  322. bin_to_hexstr(B, upper) when is_binary(B) ->
  323. <<<<(int2hexchar(H, upper)), (int2hexchar(L, upper))>> || <<H:4, L:4>> <= B>>;
  324. bin_to_hexstr(B, lower) when is_binary(B) ->
  325. <<<<(int2hexchar(H, lower)), (int2hexchar(L, lower))>> || <<H:4, L:4>> <= B>>.
  326. int2hexchar(I, _) when I >= 0 andalso I < 10 -> I + $0;
  327. int2hexchar(I, upper) -> I - 10 + $A;
  328. int2hexchar(I, lower) -> I - 10 + $a.
  329. -spec hexstr_to_bin(binary()) -> binary().
  330. hexstr_to_bin(B) when is_binary(B) ->
  331. hexstr_to_bin(B, erlang:bit_size(B)).
  332. hexstr_to_bin(B, Size) when is_binary(B) ->
  333. case Size rem 16 of
  334. 0 ->
  335. make_binary(B);
  336. 8 ->
  337. make_binary(<<"0", B/binary>>);
  338. _ ->
  339. throw({unsupport_hex_string, B, Size})
  340. end.
  341. make_binary(B) -> <<<<(hexchar2int(H) * 16 + hexchar2int(L))>> || <<H:8, L:8>> <= B>>.
  342. hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0;
  343. hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
  344. hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
  345. -spec gen_id() -> list().
  346. gen_id() ->
  347. gen_id(?SHORT).
  348. -spec gen_id(integer()) -> list().
  349. gen_id(Len) ->
  350. BitLen = Len * 4,
  351. <<R:BitLen>> = crypto:strong_rand_bytes(Len div 2),
  352. int_to_hex(R, Len).
  353. -spec clamp(number(), number(), number()) -> number().
  354. clamp(Val, Min, _Max) when Val < Min -> Min;
  355. clamp(Val, _Min, Max) when Val > Max -> Max;
  356. clamp(Val, _Min, _Max) -> Val.
  357. %% @doc https://www.erlang.org/doc/man/file.html#posix-error-codes
  358. explain_posix(eacces) -> "Permission denied";
  359. explain_posix(eagain) -> "Resource temporarily unavailable";
  360. explain_posix(ebadf) -> "Bad file number";
  361. explain_posix(ebusy) -> "File busy";
  362. explain_posix(edquot) -> "Disk quota exceeded";
  363. explain_posix(eexist) -> "File already exists";
  364. explain_posix(efault) -> "Bad address in system call argument";
  365. explain_posix(efbig) -> "File too large";
  366. explain_posix(eintr) -> "Interrupted system call";
  367. explain_posix(einval) -> "Invalid argument argument file/socket";
  368. explain_posix(eio) -> "I/O error";
  369. explain_posix(eisdir) -> "Illegal operation on a directory";
  370. explain_posix(eloop) -> "Too many levels of symbolic links";
  371. explain_posix(emfile) -> "Too many open files";
  372. explain_posix(emlink) -> "Too many links";
  373. explain_posix(enametoolong) -> "Filename too long";
  374. explain_posix(enfile) -> "File table overflow";
  375. explain_posix(enodev) -> "No such device";
  376. explain_posix(enoent) -> "No such file or directory";
  377. explain_posix(enomem) -> "Not enough memory";
  378. explain_posix(enospc) -> "No space left on device";
  379. explain_posix(enotblk) -> "Block device required";
  380. explain_posix(enotdir) -> "Not a directory";
  381. explain_posix(enotsup) -> "Operation not supported";
  382. explain_posix(enxio) -> "No such device or address";
  383. explain_posix(eperm) -> "Not owner";
  384. explain_posix(epipe) -> "Broken pipe";
  385. explain_posix(erofs) -> "Read-only file system";
  386. explain_posix(espipe) -> "Invalid seek";
  387. explain_posix(esrch) -> "No such process";
  388. explain_posix(estale) -> "Stale remote file handle";
  389. explain_posix(exdev) -> "Cross-domain link";
  390. explain_posix(NotPosix) -> NotPosix.
  391. -spec pforeach(fun((A) -> term()), list(A)) -> ok.
  392. pforeach(Fun, List) when is_function(Fun, 1), is_list(List) ->
  393. pforeach(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
  394. -spec pforeach(fun((A) -> term()), list(A), timeout()) -> ok.
  395. pforeach(Fun, List, Timeout) ->
  396. _ = pmap(Fun, List, Timeout),
  397. ok.
  398. %% @doc Like lists:map/2, only the callback function is evaluated
  399. %% concurrently.
  400. -spec pmap(fun((A) -> B), list(A)) -> list(B).
  401. pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
  402. pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
  403. -spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
  404. pmap(Fun, List, Timeout) when
  405. is_function(Fun, 1),
  406. is_list(List),
  407. (is_integer(Timeout) andalso Timeout >= 0 orelse Timeout =:= infinity)
  408. ->
  409. nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
  410. %% @doc Delegate a function to a worker process.
  411. %% The function may spawn_link other processes but we do not
  412. %% want the caller process to be linked.
  413. %% This is done by isolating the possible link with a not-linked
  414. %% middleman process.
  415. nolink_apply(Fun) -> nolink_apply(Fun, infinity).
  416. %% @doc Same as `nolink_apply/1', with a timeout.
  417. -spec nolink_apply(function(), timeout()) -> term().
  418. nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
  419. Caller = self(),
  420. ResRef = alias([reply]),
  421. Middleman = erlang:spawn(
  422. fun() ->
  423. process_flag(trap_exit, true),
  424. CallerMRef = erlang:monitor(process, Caller),
  425. Worker = erlang:spawn_link(
  426. fun() ->
  427. Res =
  428. try
  429. {normal, Fun()}
  430. catch
  431. C:E:S ->
  432. {exception, {C, E, S}}
  433. end,
  434. _ = erlang:send(ResRef, {ResRef, Res}),
  435. ?tp(pmap_middleman_sent_response, #{}),
  436. exit(normal)
  437. end
  438. ),
  439. receive
  440. {'DOWN', CallerMRef, process, _, _} ->
  441. %% For whatever reason, if the caller is dead,
  442. %% there is no reason to continue
  443. exit(Worker, kill),
  444. exit(normal);
  445. {'EXIT', Worker, normal} ->
  446. exit(normal);
  447. {'EXIT', Worker, Reason} ->
  448. %% worker exited with some reason other than 'normal'
  449. _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}),
  450. exit(normal)
  451. end
  452. end
  453. ),
  454. receive
  455. {ResRef, {normal, Result}} ->
  456. Result;
  457. {ResRef, {exception, {C, E, S}}} ->
  458. erlang:raise(C, E, S);
  459. {ResRef, {'EXIT', Reason}} ->
  460. exit(Reason)
  461. after Timeout ->
  462. %% possible race condition: a message was received just as we enter the after
  463. %% block.
  464. ?tp(pmap_timeout, #{}),
  465. unalias(ResRef),
  466. exit(Middleman, kill),
  467. receive
  468. {ResRef, {normal, Result}} ->
  469. Result;
  470. {ResRef, {exception, {C, E, S}}} ->
  471. erlang:raise(C, E, S);
  472. {ResRef, {'EXIT', Reason}} ->
  473. exit(Reason)
  474. after 0 ->
  475. exit(timeout)
  476. end
  477. end.
  478. safe_to_existing_atom(In) ->
  479. safe_to_existing_atom(In, utf8).
  480. safe_to_existing_atom(Bin, Encoding) when is_binary(Bin) ->
  481. try_to_existing_atom(fun erlang:binary_to_existing_atom/2, Bin, Encoding);
  482. safe_to_existing_atom(List, Encoding) when is_list(List) ->
  483. try_to_existing_atom(fun(In, _) -> erlang:list_to_existing_atom(In) end, List, Encoding);
  484. safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) ->
  485. {ok, Atom};
  486. safe_to_existing_atom(_Any, _Encoding) ->
  487. {error, invalid_type}.
  488. -spec tcp_keepalive_opts(term(), non_neg_integer(), non_neg_integer(), non_neg_integer()) ->
  489. {ok, [{keepalive, true} | {raw, non_neg_integer(), non_neg_integer(), binary()}]}
  490. | {error, {unsupported_os, term()}}.
  491. tcp_keepalive_opts({unix, linux}, Idle, Interval, Probes) ->
  492. {ok, [
  493. {keepalive, true},
  494. {raw, 6, 4, <<Idle:32/native>>},
  495. {raw, 6, 5, <<Interval:32/native>>},
  496. {raw, 6, 6, <<Probes:32/native>>}
  497. ]};
  498. tcp_keepalive_opts({unix, darwin}, Idle, Interval, Probes) ->
  499. {ok, [
  500. {keepalive, true},
  501. {raw, 6, 16#10, <<Idle:32/native>>},
  502. {raw, 6, 16#101, <<Interval:32/native>>},
  503. {raw, 6, 16#102, <<Probes:32/native>>}
  504. ]};
  505. tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
  506. {error, {unsupported_os, OS}}.
  507. format(Term) ->
  508. unicode:characters_to_binary(io_lib:format("~0tp", [Term])).
  509. format(Fmt, Args) ->
  510. unicode:characters_to_binary(io_lib:format(Fmt, Args)).
  511. %% @doc Helper function for log formatters.
  512. -spec format_mfal(map(), map()) -> undefined | binary().
  513. format_mfal(Data, #{with_mfa := true}) ->
  514. Line =
  515. case maps:get(line, Data, undefined) of
  516. undefined ->
  517. <<"">>;
  518. Num ->
  519. ["(", integer_to_list(Num), ")"]
  520. end,
  521. case maps:get(mfa, Data, undefined) of
  522. {M, F, A} ->
  523. iolist_to_binary([
  524. atom_to_binary(M, utf8),
  525. $:,
  526. atom_to_binary(F, utf8),
  527. $/,
  528. integer_to_binary(A),
  529. Line
  530. ]);
  531. _ ->
  532. undefined
  533. end;
  534. format_mfal(_, _) ->
  535. undefined.
  536. -spec call_first_defined(module(), atom(), list()) -> term() | no_return().
  537. call_first_defined(Module, Function, []) ->
  538. error({not_exported, Module, Function});
  539. call_first_defined(Module, Function, [Args | Rest]) ->
  540. %% ensure module is loaded
  541. ok = interactive_load(Module),
  542. case erlang:function_exported(Module, Function, length(Args)) of
  543. true ->
  544. apply(Module, Function, Args);
  545. false ->
  546. call_first_defined(Module, Function, Rest)
  547. end.
  548. %%------------------------------------------------------------------------------
  549. %% Internal Functions
  550. %%------------------------------------------------------------------------------
  551. do_parallel_map(Fun, List) ->
  552. Parent = self(),
  553. PidList = lists:map(
  554. fun(Item) ->
  555. erlang:spawn_link(
  556. fun() ->
  557. Res =
  558. try
  559. {normal, Fun(Item)}
  560. catch
  561. C:E:St ->
  562. {exception, {C, E, St}}
  563. end,
  564. Parent ! {self(), Res}
  565. end
  566. )
  567. end,
  568. List
  569. ),
  570. lists:foldr(
  571. fun(Pid, Acc) ->
  572. receive
  573. {Pid, {normal, Result}} ->
  574. [Result | Acc];
  575. {Pid, {exception, {C, E, St}}} ->
  576. erlang:raise(C, E, St)
  577. end
  578. end,
  579. [],
  580. PidList
  581. ).
  582. int_to_hex(I, N) when is_integer(I), I >= 0 ->
  583. int_to_hex([], I, 1, N).
  584. int_to_hex(L, I, Count, N) when
  585. I < 16
  586. ->
  587. pad([int_to_hex(I) | L], N - Count);
  588. int_to_hex(L, I, Count, N) ->
  589. int_to_hex([int_to_hex(I rem 16) | L], I div 16, Count + 1, N).
  590. int_to_hex(I) when 0 =< I, I =< 9 ->
  591. I + $0;
  592. int_to_hex(I) when 10 =< I, I =< 15 ->
  593. (I - 10) + $a.
  594. pad(L, 0) ->
  595. L;
  596. pad(L, Count) ->
  597. pad([$0 | L], Count - 1).
  598. readable_error_msg(Msg) when is_binary(Msg) -> Msg;
  599. readable_error_msg(Error) ->
  600. case io_lib:printable_unicode_list(Error) of
  601. true ->
  602. unicode:characters_to_binary(Error, utf8);
  603. false ->
  604. case emqx_hocon:format_error(Error, #{no_stacktrace => true}) of
  605. {ok, Msg} ->
  606. Msg;
  607. false ->
  608. to_hr_error(Error)
  609. end
  610. end.
  611. to_hr_error(nxdomain) ->
  612. <<"Could not resolve host">>;
  613. to_hr_error(econnrefused) ->
  614. <<"Connection refused">>;
  615. to_hr_error({unauthorized_client, _}) ->
  616. <<"Unauthorized client">>;
  617. to_hr_error({not_authorized, _}) ->
  618. <<"Not authorized">>;
  619. to_hr_error({malformed_username_or_password, _}) ->
  620. <<"Bad username or password">>;
  621. to_hr_error(Error) ->
  622. format(Error).
  623. try_to_existing_atom(Convert, Data, Encoding) ->
  624. try Convert(Data, Encoding) of
  625. Atom ->
  626. {ok, Atom}
  627. catch
  628. _:Reason -> {error, Reason}
  629. end.
  630. redact(Term) ->
  631. emqx_utils_redact:redact(Term).
  632. redact(Term, Checker) ->
  633. emqx_utils_redact:redact(Term, Checker).
  634. deobfuscate(NewConf, OldConf) ->
  635. emqx_utils_redact:deobfuscate(NewConf, OldConf).
  636. is_redacted(K, V) ->
  637. emqx_utils_redact:is_redacted(K, V).
  638. is_redacted(K, V, Fun) ->
  639. emqx_utils_redact:is_redacted(K, V, Fun).
  640. -ifdef(TEST).
  641. -include_lib("eunit/include/eunit.hrl").
  642. ipv6_probe_test() ->
  643. try gen_tcp:ipv6_probe() of
  644. true ->
  645. ?assertEqual([{ipv6_probe, true}], ipv6_probe([]))
  646. catch
  647. _:_ ->
  648. ok
  649. end.
  650. -endif.
  651. pub_props_to_packet(Properties) ->
  652. F = fun
  653. ('User-Property', M) ->
  654. case is_map(M) andalso map_size(M) > 0 of
  655. true -> {true, maps:to_list(M)};
  656. false -> false
  657. end;
  658. ('User-Property-Pairs', _) ->
  659. false;
  660. (_, _) ->
  661. true
  662. end,
  663. maps:filtermap(F, Properties).
  664. %% fix filename by replacing characters which could be invalid on some filesystems
  665. %% with safe ones
  666. -spec safe_filename(binary() | unicode:chardata()) -> binary() | [unicode:chardata()].
  667. safe_filename(Filename) when is_binary(Filename) ->
  668. binary:replace(Filename, <<":">>, <<"-">>, [global]);
  669. safe_filename(Filename) when is_list(Filename) ->
  670. lists:flatten(string:replace(Filename, ":", "-", all)).
  671. %% @doc Compares two lists of maps and returns the differences between them in a
  672. %% map containing four keys – 'removed', 'added', 'identical', and 'changed' –
  673. %% each holding a list of maps. Elements are compared using key function KeyFunc
  674. %% to extract the comparison key used for matching.
  675. %%
  676. %% The return value is a map with the following keys and the list of maps as its values:
  677. %% * 'removed' – a list of maps that were present in the Old list, but not found in the New list.
  678. %% * 'added' – a list of maps that were present in the New list, but not found in the Old list.
  679. %% * 'identical' – a list of maps that were present in both lists and have the same comparison key value.
  680. %% * 'changed' – a list of pairs of maps representing the changes between maps present in the New and Old lists.
  681. %% The first map in the pair represents the map in the Old list, and the second map
  682. %% represents the potential modification in the New list.
  683. %% The KeyFunc parameter is a function that extracts the comparison key used
  684. %% for matching from each map. The function should return a comparable term,
  685. %% such as an atom, a number, or a string. This is used to determine if each
  686. %% element is the same in both lists.
  687. -spec diff_lists(list(T), list(T), Func) ->
  688. #{
  689. added := list(T),
  690. identical := list(T),
  691. removed := list(T),
  692. changed := list({Old :: T, New :: T})
  693. }
  694. when
  695. Func :: fun((T) -> any()),
  696. T :: any().
  697. diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
  698. Removed =
  699. lists:foldl(
  700. fun(E, RemovedAcc) ->
  701. case search(KeyFunc(E), KeyFunc, New) of
  702. false -> [E | RemovedAcc];
  703. _ -> RemovedAcc
  704. end
  705. end,
  706. [],
  707. Old
  708. ),
  709. {Added, Identical, Changed} =
  710. lists:foldl(
  711. fun(E, Acc) ->
  712. {Added0, Identical0, Changed0} = Acc,
  713. case search(KeyFunc(E), KeyFunc, Old) of
  714. false ->
  715. {[E | Added0], Identical0, Changed0};
  716. E ->
  717. {Added0, [E | Identical0], Changed0};
  718. E1 ->
  719. {Added0, Identical0, [{E1, E} | Changed0]}
  720. end
  721. end,
  722. {[], [], []},
  723. New
  724. ),
  725. #{
  726. removed => lists:reverse(Removed),
  727. added => lists:reverse(Added),
  728. identical => lists:reverse(Identical),
  729. changed => lists:reverse(Changed)
  730. }.
  731. %% @doc Merges two lists preserving the original order of elements in both lists.
  732. %% KeyFunc must extract a unique key from each element.
  733. %% If two keys exist in both lists, the value in List1 is superseded by the value in List2, but
  734. %% the element position in the result list will equal its position in List1.
  735. %% Example:
  736. %% emqx_utils:merge_append_lists(
  737. %% [#{id => a, val => old}, #{id => b, val => old}],
  738. %% [#{id => a, val => new}, #{id => c}, #{id => b, val => new}, #{id => d}],
  739. %% fun(#{id := Id}) -> Id end).
  740. %% [#{id => a,val => new},
  741. %% #{id => b,val => new},
  742. %% #{id => c},
  743. %% #{id => d}]
  744. -spec merge_lists(list(T), list(T), KeyFunc) -> list(T) when
  745. KeyFunc :: fun((T) -> any()),
  746. T :: any().
  747. merge_lists(List1, List2, KeyFunc) ->
  748. WithKeysList2 = lists:map(fun(E) -> {KeyFunc(E), E} end, List2),
  749. WithKeysList1 = lists:map(
  750. fun(E) ->
  751. K = KeyFunc(E),
  752. case lists:keyfind(K, 1, WithKeysList2) of
  753. false -> {K, E};
  754. WithKey1 -> WithKey1
  755. end
  756. end,
  757. List1
  758. ),
  759. NewWithKeysList2 = lists:filter(
  760. fun({K, _}) ->
  761. not lists:keymember(K, 1, WithKeysList1)
  762. end,
  763. WithKeysList2
  764. ),
  765. [E || {_, E} <- WithKeysList1 ++ NewWithKeysList2].
  766. search(_ExpectValue, _KeyFunc, []) ->
  767. false;
  768. search(ExpectValue, KeyFunc, [Item | List]) ->
  769. case KeyFunc(Item) =:= ExpectValue of
  770. true -> Item;
  771. false -> search(ExpectValue, KeyFunc, List)
  772. end.
  773. %% @doc Maps over a list of terms and flattens the result, giving back a flat
  774. %% list of terms. It's similar to `lists:flatmap/2`, but it also works on a
  775. %% single term as `Fun` output (thus, the wordplay on "flatter").
  776. %% The purpose of this function is to adapt to `Fun`s that return either a `[]`
  777. %% or a term, and to avoid costs of list construction and flattening when
  778. %% dealing with large lists.
  779. -spec flattermap(Fun, [X]) -> [X] when
  780. Fun :: fun((X) -> [X] | X).
  781. flattermap(_Fun, []) ->
  782. [];
  783. flattermap(Fun, [X | Xs]) ->
  784. flatcomb(Fun(X), flattermap(Fun, Xs)).
  785. flatcomb([], Zs) ->
  786. Zs;
  787. flatcomb(Ys = [_ | _], []) ->
  788. Ys;
  789. flatcomb(Ys = [_ | _], Zs = [_ | _]) ->
  790. Ys ++ Zs;
  791. flatcomb(Y, Zs) ->
  792. [Y | Zs].
  793. %% @doc Format IP address tuple or {IP, Port} tuple to string.
  794. ntoa({IP, Port}) ->
  795. ntoa(IP) ++ ":" ++ integer_to_list(Port);
  796. ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
  797. %% v6 piggyback v4
  798. inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
  799. ntoa(IP) ->
  800. inet_parse:ntoa(IP).
  801. %% @doc Return true if the provided string is a restricted string:
  802. %% Start with a letter or a digit,
  803. %% remaining characters can be '-' or '_' in addition to letters and digits
  804. is_restricted_str(String) ->
  805. RE = <<"^[A-Za-z0-9]+[A-Za-z0-9-_]*$">>,
  806. match =:= re:run(String, RE, [{capture, none}]).
  807. %% @doc Generate random, printable bytes as an ID.
  808. %% The first byte is ensured to be a-z or A-Z.
  809. rand_id(Len) when Len > 0 ->
  810. iolist_to_binary([rand_first_char(), rand_chars(Len - 1)]).
  811. rand_first_char() ->
  812. base62(rand:uniform(52) - 1).
  813. rand_chars(0) ->
  814. [];
  815. rand_chars(N) ->
  816. [rand_char() | rand_chars(N - 1)].
  817. rand_char() ->
  818. base62(rand:uniform(62) - 1).
  819. base62(I) when I < 26 -> $A + I;
  820. base62(I) when I < 52 -> $a + I - 26;
  821. base62(I) -> $0 + I - 52.
  822. %% In production code, EMQX is always booted in embedded mode.
  823. %% so making a dynamic call to Module:module_info(module) is cheap.
  824. %% In interactive mode (test, and emqx config check CLI), the first attempt
  825. %% to make the dynamic call to Module:module_info(module) will cost,
  826. %% once loaded, it's cheap for subsequent calls.
  827. %% NOTE: For non-existing modules, this call is not as effective!
  828. interactive_load(Module) ->
  829. _ = catch apply(Module, module_info, [module]),
  830. ok.
  831. -ifdef(TEST).
  832. -include_lib("eunit/include/eunit.hrl").
  833. diff_lists_test() ->
  834. KeyFunc = fun(#{name := Name}) -> Name end,
  835. ?assertEqual(
  836. #{
  837. removed => [],
  838. added => [],
  839. identical => [],
  840. changed => []
  841. },
  842. diff_lists([], [], KeyFunc)
  843. ),
  844. %% test removed list
  845. ?assertEqual(
  846. #{
  847. removed => [#{name => a, value => 1}],
  848. added => [],
  849. identical => [],
  850. changed => []
  851. },
  852. diff_lists([], [#{name => a, value => 1}], KeyFunc)
  853. ),
  854. %% test added list
  855. ?assertEqual(
  856. #{
  857. removed => [],
  858. added => [#{name => a, value => 1}],
  859. identical => [],
  860. changed => []
  861. },
  862. diff_lists([#{name => a, value => 1}], [], KeyFunc)
  863. ),
  864. %% test identical list
  865. ?assertEqual(
  866. #{
  867. removed => [],
  868. added => [],
  869. identical => [#{name => a, value => 1}],
  870. changed => []
  871. },
  872. diff_lists([#{name => a, value => 1}], [#{name => a, value => 1}], KeyFunc)
  873. ),
  874. Old = [
  875. #{name => a, value => 1},
  876. #{name => b, value => 4},
  877. #{name => e, value => 2},
  878. #{name => d, value => 4}
  879. ],
  880. New = [
  881. #{name => a, value => 1},
  882. #{name => b, value => 2},
  883. #{name => e, value => 2},
  884. #{name => c, value => 3}
  885. ],
  886. Diff = diff_lists(New, Old, KeyFunc),
  887. ?assertEqual(
  888. #{
  889. added => [
  890. #{name => c, value => 3}
  891. ],
  892. identical => [
  893. #{name => a, value => 1},
  894. #{name => e, value => 2}
  895. ],
  896. removed => [
  897. #{name => d, value => 4}
  898. ],
  899. changed => [{#{name => b, value => 4}, #{name => b, value => 2}}]
  900. },
  901. Diff
  902. ),
  903. ok.
  904. -endif.