emqx_utils.erl 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils).
  17. -compile(inline).
  18. %% [TODO] Cleanup so the instruction below is not necessary.
  19. -elvis([{elvis_style, god_modules, disable}]).
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -export([
  22. merge_opts/2,
  23. maybe_apply/2,
  24. compose/1,
  25. compose/2,
  26. cons/2,
  27. run_fold/3,
  28. pipeline/3,
  29. start_timer/2,
  30. start_timer/3,
  31. cancel_timer/1,
  32. drain_deliver/0,
  33. drain_deliver/1,
  34. drain_down/1,
  35. check_oom/1,
  36. check_oom/2,
  37. tune_heap_size/1,
  38. proc_name/2,
  39. proc_stats/0,
  40. proc_stats/1,
  41. rand_seed/0,
  42. now_to_secs/1,
  43. now_to_ms/1,
  44. index_of/2,
  45. maybe_parse_ip/1,
  46. ipv6_probe/1,
  47. gen_id/0,
  48. gen_id/1,
  49. explain_posix/1,
  50. pforeach/2,
  51. pforeach/3,
  52. pmap/2,
  53. pmap/3,
  54. readable_error_msg/1,
  55. safe_to_existing_atom/1,
  56. safe_to_existing_atom/2,
  57. pub_props_to_packet/1,
  58. safe_filename/1,
  59. diff_lists/3,
  60. merge_lists/3,
  61. flattermap/2,
  62. tcp_keepalive_opts/4,
  63. format/1,
  64. call_first_defined/1,
  65. ntoa/1,
  66. foldl_while/3,
  67. is_restricted_str/1
  68. ]).
  69. -export([
  70. bin_to_hexstr/2,
  71. hexstr_to_bin/1
  72. ]).
  73. -export([
  74. nolink_apply/1,
  75. nolink_apply/2
  76. ]).
  77. -export([clamp/3, redact/1, redact/2, is_redacted/2, is_redacted/3]).
  78. -export([deobfuscate/2]).
  79. -export_type([
  80. readable_error_msg/1
  81. ]).
  82. -type readable_error_msg(_Error) :: binary().
  83. -type option(T) :: undefined | T.
  84. -dialyzer({nowarn_function, [nolink_apply/2]}).
  85. -define(SHORT, 8).
  86. -define(DEFAULT_PMAP_TIMEOUT, 5000).
  %% @doc Try to read `Host' as a textual IPv4 or IPv6 address.
  %% Returns the address tuple produced by `inet:parse_address/1' on success.
  %% When parsing reports `einval', `Host' is handed back untouched.
  maybe_parse_ip(Host) ->
      Parsed = inet:parse_address(Host),
      case Parsed of
          {error, einval} -> Host;
          {ok, Addr} when is_tuple(Addr) -> Addr
      end.
  %% @doc Prepend the `{ipv6_probe, true}' socket option when the running OTP
  %% exports `gen_tcp:ipv6_probe/0' (a function added by EMQ's OTP forks).
  %% Otherwise `Opts' is returned unchanged.
  ipv6_probe(Opts) ->
      Supported = erlang:function_exported(gen_tcp, ipv6_probe, 0),
      if
          Supported -> [{ipv6_probe, true} | Opts];
          true -> Opts
      end.
  %% @doc Merge `Options' into `Defaults'.
  %% `{Key, Value}' entries replace (or are appended after) an existing entry
  %% with the same key. Any other term is added as a flag, after which the
  %% whole accumulated list is sorted and de-duplicated by `lists:usort/1'.
  -spec merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist().
  merge_opts(Defaults, Options) ->
      lists:foldl(fun merge_opt/2, Defaults, Options).

  %% Fold step for merge_opts/2: stores one option into the accumulator.
  merge_opt({Key, _Val} = Pair, Merged) ->
      lists:keystore(Key, 1, Merged, Pair);
  merge_opt(Flag, Merged) ->
      lists:usort([Flag | Merged]).
  %% @doc Call `Fun' on an optional value.
  %% `undefined' short-circuits to `undefined' without calling `Fun';
  %% any other argument is passed to `Fun' and its result returned.
  -spec maybe_apply(fun((option(A)) -> option(A)), option(A)) ->
      option(A)
  when
      A :: any().
  maybe_apply(_Fun, undefined) -> undefined;
  maybe_apply(Fun, Arg) when is_function(Fun) -> Fun(Arg).
  %% @doc Compose a list of unary functions, applied left to right:
  %% `(compose([F, G, H]))(X) =:= H(G(F(X)))'.
  %% A single-element list yields that function itself; an empty list yields
  %% the identity function.
  -spec compose(list(F)) -> G when
      F :: fun((any()) -> any()),
      G :: fun((any()) -> any()).
  compose([]) -> fun(X) -> X end;
  compose([F | More]) -> compose(F, More).

  %% @doc Compose `F' with a single function `G' or with a list of functions.
  %% `F' is applied first, then the remaining functions in order.
  -spec compose(F, G | [Gs]) -> C when
      F :: fun((X1) -> X2),
      G :: fun((X2) -> X3),
      Gs :: [fun((Xn) -> Xn1)],
      C :: fun((X1) -> Xm),
      X3 :: any(),
      Xn :: any(),
      Xn1 :: any(),
      Xm :: any().
  compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end;
  %% Nothing left to compose with: this is what makes compose([F]) work
  %% instead of failing with function_clause.
  compose(F, []) -> F;
  compose(F, [G]) -> compose(F, G);
  compose(F, [G | More]) -> compose(compose(F, G), More).
  %% @doc Prepend `Elem' to `List'; a function form of the `[H | T]' constructor.
  -spec cons(X, [X]) -> [X, ...].
  cons(Elem, List) -> [Elem | List].
  %% @doc Thread an accumulator through a list of binary functions.
  %% Each function is called as `Fun(Acc, State)' and its result becomes the
  %% accumulator for the next one; `State' is passed unchanged to every call.
  run_fold(Funs, Acc0, State) ->
      Step = fun(Fun, Acc) -> Fun(Acc, State) end,
      lists:foldl(Step, Acc0, Funs).
  %% @doc Run `Input' and `State' through a list of step functions.
  %% Each step is called through apply_fun/3 and may return:
  %%   `ok'                    - keep input and state as they are
  %%   `{ok, NState}'          - replace the state
  %%   `{ok, Output, NState}'  - replace both input and state
  %%   `{error, Reason}'       - stop, reporting the current state
  %%   `{error, Reason, NState}' - stop, reporting the returned state
  %% Returns `{ok, Input, State}' when every step succeeds, otherwise
  %% `{error, Reason, State}'.
  pipeline([], Input, State) ->
      {ok, Input, State};
  pipeline([Step | Steps], Input, State) ->
      Outcome = apply_fun(Step, Input, State),
      case Outcome of
          {error, Reason, NewState} -> {error, Reason, NewState};
          {error, Reason} -> {error, Reason, State};
          {ok, Output, NewState} -> pipeline(Steps, Output, NewState);
          {ok, NewState} -> pipeline(Steps, Input, NewState);
          ok -> pipeline(Steps, Input, State)
      end.
  %% @doc Left fold that can stop early.
  %% `Fun(X, Acc)' returns `{cont, NewAcc}' to continue with the next element
  %% or `{halt, NewAcc}' to return `NewAcc' immediately.
  -spec foldl_while(fun((X, Acc) -> {cont | halt, Acc}), Acc, [X]) -> Acc.
  foldl_while(_Fun, Acc, []) ->
      Acc;
  foldl_while(Fun, Acc0, [Head | Tail]) ->
      case Fun(Head, Acc0) of
          {halt, Acc} -> Acc;
          {cont, Acc} -> foldl_while(Fun, Acc, Tail)
      end.
  -compile({inline, [apply_fun/3]}).
  %% Call a pipeline step according to its arity: unary steps receive only
  %% the input, binary steps receive the input and the state.
  apply_fun(Fun, Input, State) ->
      ArityInfo = erlang:fun_info(Fun, arity),
      case ArityInfo of
          {arity, 2} -> Fun(Input, State);
          {arity, 1} -> Fun(Input)
      end.
  %% @doc Start a timer that delivers `Msg' to the calling process.
  %% See start_timer/3.
  -spec start_timer(integer() | atom(), term()) -> option(reference()).
  start_timer(Interval, Msg) ->
      start_timer(Interval, self(), Msg).

  %% @doc Start a timer that delivers `Msg' to `Dest' via `erlang:start_timer/3'.
  %% A numeric `Interval' is rounded up to a whole number first. For any
  %% non-numeric interval no timer is started and `undefined' is returned.
  -spec start_timer(integer() | atom(), pid() | atom(), term()) -> option(reference()).
  start_timer(Interval, Dest, Msg) when is_number(Interval) ->
      Millis = erlang:ceil(Interval),
      erlang:start_timer(Millis, Dest, Msg);
  start_timer(_NotANumber, _Dest, _Msg) ->
      undefined.
  %% @doc Cancel a timer reference; anything that is not a reference is ignored.
  %% When `erlang:cancel_timer/1' returns `false', a `{timeout, Timer, _}'
  %% message already sitting in the caller's mailbox is removed (without
  %% waiting), so the caller never sees the timeout of a cancelled timer.
  -spec cancel_timer(option(reference())) -> ok.
  cancel_timer(Timer) when not is_reference(Timer) ->
      ok;
  cancel_timer(Timer) ->
      case erlang:cancel_timer(Timer) of
          false -> flush_timeout(Timer);
          _TimeLeft -> ok
      end.

  %% Drop one pending timeout message for `Timer', if there is one.
  flush_timeout(Timer) ->
      receive
          {timeout, Timer, _} -> ok
      after 0 -> ok
      end.
  %% @doc Remove every `{deliver, Topic, Msg}' message currently in the
  %% caller's mailbox and return them in arrival order.
  drain_deliver() ->
      %% A negative budget never reaches zero, so draining only stops when
      %% no deliver message is left.
      drain_deliver(-1).

  %% @doc Like drain_deliver/0, but takes at most `N' messages.
  drain_deliver(N) when is_integer(N) ->
      drain_deliver(N, []).

  drain_deliver(0, Collected) ->
      lists:reverse(Collected);
  drain_deliver(Budget, Collected) ->
      receive
          {deliver, _Topic, _Msg} = Deliver ->
              drain_deliver(Budget - 1, [Deliver | Collected])
      after 0 ->
          lists:reverse(Collected)
      end.
  %% @doc Remove up to `Cnt' process `'DOWN'' messages from the caller's
  %% mailbox (without waiting) and return the pids they refer to, in
  %% arrival order.
  -spec drain_down(pos_integer()) -> list(pid()).
  drain_down(Cnt) when Cnt > 0 ->
      drain_down(Cnt, []).

  drain_down(0, Pids) ->
      lists:reverse(Pids);
  drain_down(Budget, Pids) ->
      receive
          {'DOWN', _MRef, process, Pid, _Reason} ->
              drain_down(Budget - 1, [Pid | Pids])
      after 0 ->
          lists:reverse(Pids)
      end.
  %% @doc Check the calling process against an OOM policy.
  %% See check_oom/2.
  %% [FIXME] cross-dependency on `emqx_types`.
  -spec check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}.
  check_oom(Policy) ->
      check_oom(self(), Policy).

  %% @doc Compare the mailbox length and total heap size of `Pid' with the
  %% limits in the policy.
  %% Returns `ok' when the policy is disabled, when the process no longer
  %% exists, or when no limit is exceeded. Otherwise returns
  %% `{shutdown, #{reason, value, max}}' for the first limit exceeded
  %% (mailbox is checked before heap).
  -spec check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}.
  check_oom(_Pid, #{enable := false}) ->
      ok;
  check_oom(Pid, #{max_mailbox_size := MaxQLen, max_heap_size := MaxHeapSize}) ->
      Info = process_info(Pid, [message_queue_len, total_heap_size]),
      case Info of
          [{message_queue_len, QLen}, {total_heap_size, HeapSize}] ->
              do_check_oom([
                  {QLen, MaxQLen, mailbox_overflow},
                  {HeapSize, MaxHeapSize, proc_heap_too_large}
              ]);
          undefined ->
              ok
      end.

  %% Walk the `{Value, Max, Reason}' checks; a limit only applies when `Max'
  %% is a positive integer.
  do_check_oom([]) ->
      ok;
  do_check_oom([{Val, Max, Reason} | _Rest]) when is_integer(Max), 0 < Max, Max < Val ->
      {shutdown, #{reason => Reason, value => Val, max => Max}};
  do_check_oom([{_Val, _Max, _Reason} | Rest]) ->
      do_check_oom(Rest).
  %% @doc Apply the policy's `max_heap_size' to the calling process.
  %% Returns `ok' without touching the process when the policy is disabled or
  %% when `max_heap_size' is zero. Otherwise sets the `max_heap_size' process
  %% flag (kill + error log on overflow) to 1.5 times the configured value,
  %% capped at the largest value accepted for the VM word size, and returns
  %% the result of `erlang:process_flag/2'.
  tune_heap_size(#{enable := false}) ->
      ok;
  %% If the max_heap_size is set to zero, the limit is disabled.
  tune_heap_size(#{max_heap_size := 0}) ->
      ok;
  tune_heap_size(#{max_heap_size := MaxHeapSize}) when MaxHeapSize > 0 ->
      MaxSize =
          case erlang:system_info(wordsize) of
              % arch_64
              8 ->
                  (1 bsl 59) - 1;
              % arch_32
              4 ->
                  (1 bsl 27) - 1
          end,
      %% Leave 50% headroom above the policy value, but never exceed MaxSize.
      OverflowedSize =
          case erlang:trunc(MaxHeapSize * 1.5) of
              SZ when SZ > MaxSize -> MaxSize;
              SZ -> SZ
          end,
      erlang:process_flag(max_heap_size, #{
          size => OverflowedSize,
          kill => true,
          error_logger => true
      }).
  %% @doc Build the atom `Mod_Id', e.g. `proc_name(emqx_pool, 1)' gives
  %% `emqx_pool_1'.
  -spec proc_name(atom(), pos_integer()) -> atom().
  proc_name(Mod, Id) ->
      Name = lists:concat([Mod, "_", Id]),
      list_to_atom(Name).
  %% @doc Collect statistics of the calling process; see proc_stats/1.
  %% [FIXME] cross-dependency on `emqx_types`.
  -spec proc_stats() -> emqx_types:stats().
  proc_stats() -> proc_stats(self()).

  %% @doc Collect mailbox length, heap sizes, reductions and memory of `Pid'
  %% from `process_info/2'. The `message_queue_len' entry is renamed to
  %% `mailbox_len'. Returns `[]' when the process does not exist.
  -spec proc_stats(pid()) -> emqx_types:stats().
  proc_stats(Pid) ->
      Items = [message_queue_len, heap_size, total_heap_size, reductions, memory],
      case process_info(Pid, Items) of
          [{message_queue_len, Len} | Rest] -> [{mailbox_len, Len} | Rest];
          undefined -> []
      end.
  %% @doc Seed the calling process's `rand' generator with the `exsplus'
  %% algorithm, using the current `erlang:timestamp/0' as seed.
  rand_seed() ->
      Seed = erlang:timestamp(),
      rand:seed(exsplus, Seed).
  %% @doc Convert an `erlang:timestamp()' triple to whole seconds; the
  %% microsecond part is discarded.
  -spec now_to_secs(erlang:timestamp()) -> pos_integer().
  now_to_secs({MegaSecs, Secs, _MicroSecs}) ->
      Secs + MegaSecs * 1000000.
  %% @doc Convert an `erlang:timestamp()' triple to milliseconds; the
  %% microsecond part is rounded to the nearest millisecond.
  -spec now_to_ms(erlang:timestamp()) -> pos_integer().
  now_to_ms({MegaSecs, Secs, MicroSecs}) ->
      WholeSecs = MegaSecs * 1000000 + Secs,
      Millis = round(MicroSecs / 1000),
      WholeSecs * 1000 + Millis.
  %% @doc Return the 1-based position of the first element of `L' that
  %% matches `E' exactly. Raises `error:badarg' when `E' is not in the list.
  index_of(E, L) ->
      index_of(E, 1, L).

  index_of(E, Pos, [E | _]) ->
      Pos;
  index_of(E, Pos, [_Other | Rest]) ->
      index_of(E, Pos + 1, Rest);
  index_of(_E, _Pos, []) ->
      error(badarg).
  %% @doc Encode a binary as a hexadecimal string (two characters per byte),
  %% using upper- or lower-case letters as requested.
  -spec bin_to_hexstr(binary(), lower | upper) -> binary().
  bin_to_hexstr(B, Case) when is_binary(B), (Case =:= upper orelse Case =:= lower) ->
      <<<<(int2hexchar(Hi, Case)), (int2hexchar(Lo, Case))>> || <<Hi:4, Lo:4>> <= B>>.

  %% Map a nibble (0..15) to its hex character in the requested letter case.
  int2hexchar(I, _Case) when I >= 0 andalso I < 10 -> $0 + I;
  int2hexchar(I, lower) -> $a + (I - 10);
  int2hexchar(I, upper) -> $A + (I - 10).
  %% @doc Decode a hexadecimal string (upper or lower case) into a binary.
  %% A string with an odd number of characters is treated as if it had a
  %% leading "0". Characters outside 0-9, a-f, A-F raise `function_clause'.
  -spec hexstr_to_bin(binary()) -> binary().
  hexstr_to_bin(B) when is_binary(B) ->
      hexstr_to_bin(B, erlang:bit_size(B)).

  %% `Size' is the bit size of `B': a multiple of 16 means an even number of
  %% hex characters, a remainder of 8 means one character is missing.
  hexstr_to_bin(B, Size) when is_binary(B), Size rem 16 =:= 0 ->
      make_binary(B);
  hexstr_to_bin(B, Size) when is_binary(B), Size rem 16 =:= 8 ->
      make_binary(<<"0", B/binary>>);
  hexstr_to_bin(B, Size) when is_binary(B) ->
      throw({unsupport_hex_string, B, Size}).

  %% Fold each pair of hex characters into one byte.
  make_binary(B) ->
      <<<<((hexchar2int(Hi) bsl 4) bor hexchar2int(Lo))>> || <<Hi:8, Lo:8>> <= B>>.

  %% Numeric value (0..15) of a single hex character.
  hexchar2int(C) when C >= $0 andalso C =< $9 -> C - $0;
  hexchar2int(C) when C >= $a andalso C =< $f -> C - $a + 10;
  hexchar2int(C) when C >= $A andalso C =< $F -> C - $A + 10.
  %% @doc Generate a random lower-case hex identifier of the default length
  %% (`?SHORT' characters).
  -spec gen_id() -> list().
  gen_id() ->
      gen_id(?SHORT).

  %% @doc Generate a random lower-case hex identifier of exactly `Len'
  %% characters, left-padded with zeros. `Len' must be a positive integer.
  -spec gen_id(integer()) -> list().
  gen_id(Len) when is_integer(Len), Len > 0 ->
      BitLen = Len * 4,
      %% Round the byte count up so that an odd `Len' still has enough random
      %% bits; the surplus nibble (if any) is discarded by the match.
      <<R:BitLen, _/bitstring>> = crypto:strong_rand_bytes((Len + 1) div 2),
      int_to_hex(R, Len).
  %% @doc Restrict `Val' to the range `[Min, Max]'.
  %% The lower bound is checked first, so `Min' is returned whenever
  %% `Val < Min'.
  -spec clamp(number(), number(), number()) -> number().
  clamp(Val, Min, Max) ->
      if
          Val < Min -> Min;
          Val > Max -> Max;
          true -> Val
      end.
  %% @doc Translate a POSIX error atom into a human-readable description.
  %% Any term that is not one of the listed error codes is returned as is.
  %% See https://www.erlang.org/doc/man/file.html#posix-error-codes
  -spec explain_posix(Reason) -> string() | Reason when Reason :: term().
  explain_posix(eacces) -> "Permission denied";
  explain_posix(eagain) -> "Resource temporarily unavailable";
  explain_posix(ebadf) -> "Bad file number";
  explain_posix(ebusy) -> "File busy";
  explain_posix(edquot) -> "Disk quota exceeded";
  explain_posix(eexist) -> "File already exists";
  explain_posix(efault) -> "Bad address in system call argument";
  explain_posix(efbig) -> "File too large";
  explain_posix(eintr) -> "Interrupted system call";
  %% The original text had a duplicated word ("Invalid argument argument").
  explain_posix(einval) -> "Invalid argument for file/socket";
  explain_posix(eio) -> "I/O error";
  explain_posix(eisdir) -> "Illegal operation on a directory";
  explain_posix(eloop) -> "Too many levels of symbolic links";
  explain_posix(emfile) -> "Too many open files";
  explain_posix(emlink) -> "Too many links";
  explain_posix(enametoolong) -> "Filename too long";
  explain_posix(enfile) -> "File table overflow";
  explain_posix(enodev) -> "No such device";
  explain_posix(enoent) -> "No such file or directory";
  explain_posix(enomem) -> "Not enough memory";
  explain_posix(enospc) -> "No space left on device";
  explain_posix(enotblk) -> "Block device required";
  explain_posix(enotdir) -> "Not a directory";
  explain_posix(enotsup) -> "Operation not supported";
  explain_posix(enxio) -> "No such device or address";
  explain_posix(eperm) -> "Not owner";
  explain_posix(epipe) -> "Broken pipe";
  explain_posix(erofs) -> "Read-only file system";
  explain_posix(espipe) -> "Invalid seek";
  explain_posix(esrch) -> "No such process";
  explain_posix(estale) -> "Stale remote file handle";
  explain_posix(exdev) -> "Cross-domain link";
  explain_posix(NotPosix) -> NotPosix.
  %% @doc Evaluate `Fun' on every element of `List' concurrently, using the
  %% default pmap timeout, and discard the results.
  -spec pforeach(fun((A) -> term()), list(A)) -> ok.
  pforeach(Fun, List) when is_function(Fun, 1), is_list(List) ->
      pforeach(Fun, List, ?DEFAULT_PMAP_TIMEOUT).

  %% @doc Same as pforeach/2 with an explicit timeout. The work is done by
  %% pmap/3, so its failure modes apply here as well.
  -spec pforeach(fun((A) -> term()), list(A), timeout()) -> ok.
  pforeach(Fun, List, Timeout) ->
      _Results = pmap(Fun, List, Timeout),
      ok.
  %% @doc Concurrent version of `lists:map/2': the callback is evaluated for
  %% all elements in parallel and the results are returned in input order.
  %% Uses the default pmap timeout.
  -spec pmap(fun((A) -> B), list(A)) -> list(B).
  pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
      pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).

  %% @doc Same as pmap/2 with an explicit timeout (a non-negative integer or
  %% `infinity'). The map runs under nolink_apply/2, so the caller is not
  %% linked to the worker processes.
  -spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
  pmap(Fun, List, Timeout) when
      is_function(Fun, 1),
      is_list(List),
      (Timeout =:= infinity orelse (is_integer(Timeout) andalso Timeout >= 0))
  ->
      Job = fun() -> do_parallel_map(Fun, List) end,
      nolink_apply(Job, Timeout).
  %% @doc Run `Fun' in a separate worker process and return its result.
  %% `Fun' may spawn_link further processes; to keep the caller free of such
  %% links the worker is linked to an unlinked middleman process instead of
  %% to the caller. Waits without a time limit.
  nolink_apply(Fun) -> nolink_apply(Fun, infinity).

  %% @doc Same as `nolink_apply/1', with a timeout.
  %% Outcomes, as seen by the caller:
  %%   - `Fun' returns a value: that value is returned;
  %%   - `Fun' raises: the same class, reason and stacktrace are re-raised;
  %%   - the worker exits abnormally: the caller exits with the same reason;
  %%   - no reply within `Timeout': the middleman is killed and the caller
  %%     exits with `timeout' (unless a reply is already in the mailbox).
  -spec nolink_apply(function(), timeout()) -> term().
  nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
      Caller = self(),
      %% Replies are sent through a process alias; it doubles as the message tag.
      ResRef = alias([reply]),
      RunWorker =
          fun() ->
              Res =
                  try
                      {normal, Fun()}
                  catch
                      Class:Error:Stack ->
                          {exception, {Class, Error, Stack}}
                  end,
              _ = erlang:send(ResRef, {ResRef, Res}),
              ?tp(pmap_middleman_sent_response, #{}),
              exit(normal)
          end,
      RunMiddleman =
          fun() ->
              process_flag(trap_exit, true),
              CallerMRef = erlang:monitor(process, Caller),
              Worker = erlang:spawn_link(RunWorker),
              receive
                  {'DOWN', CallerMRef, process, _, _} ->
                      %% For whatever reason, if the caller is dead,
                      %% there is no reason to continue
                      exit(Worker, kill),
                      exit(normal);
                  {'EXIT', Worker, normal} ->
                      exit(normal);
                  {'EXIT', Worker, Reason} ->
                      %% worker exited with some reason other than 'normal'
                      _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}),
                      exit(normal)
              end
          end,
      Middleman = erlang:spawn(RunMiddleman),
      receive
          {ResRef, {normal, Result}} ->
              Result;
          {ResRef, {exception, {C, E, S}}} ->
              erlang:raise(C, E, S);
          {ResRef, {'EXIT', Reason}} ->
              exit(Reason)
      after Timeout ->
          %% possible race condition: a message was received just as we enter the after
          %% block.
          ?tp(pmap_timeout, #{}),
          unalias(ResRef),
          exit(Middleman, kill),
          %% Pick up a reply that arrived before the alias was deactivated.
          receive
              {ResRef, {normal, Result}} ->
                  Result;
              {ResRef, {exception, {C, E, S}}} ->
                  erlang:raise(C, E, S);
              {ResRef, {'EXIT', Reason}} ->
                  exit(Reason)
          after 0 ->
              exit(timeout)
          end
      end.
  %% @doc Convert a binary, string or atom to an already existing atom,
  %% assuming UTF-8 encoding for binaries. See safe_to_existing_atom/2.
  safe_to_existing_atom(In) ->
      safe_to_existing_atom(In, utf8).

  %% @doc Convert `In' to an already existing atom without creating new atoms.
  %% Returns `{ok, Atom}' on success, `{error, Reason}' when the conversion
  %% raises, and `{error, invalid_type}' for inputs that are neither atom,
  %% binary nor list. `Encoding' is only used for binaries.
  safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) ->
      {ok, Atom};
  safe_to_existing_atom(Bin, Encoding) when is_binary(Bin) ->
      try_to_existing_atom(fun erlang:binary_to_existing_atom/2, Bin, Encoding);
  safe_to_existing_atom(List, Encoding) when is_list(List) ->
      FromList = fun(Str, _Enc) -> erlang:list_to_existing_atom(Str) end,
      try_to_existing_atom(FromList, List, Encoding);
  safe_to_existing_atom(_Other, _Encoding) ->
      {error, invalid_type}.
  %% @doc Build the socket options that enable TCP keepalive with the given
  %% idle time, probe interval and probe count on Linux and macOS.
  %% Returns `{error, {unsupported_os, OS}}' for any other OS type.
  -spec tcp_keepalive_opts(term(), non_neg_integer(), non_neg_integer(), non_neg_integer()) ->
      {ok, [{keepalive, true} | {raw, non_neg_integer(), non_neg_integer(), binary()}]}
      | {error, {unsupported_os, term()}}.
  tcp_keepalive_opts({unix, linux}, Idle, Interval, Probes) ->
      {ok, keepalive_raw_opts({4, 5, 6}, Idle, Interval, Probes)};
  tcp_keepalive_opts({unix, darwin}, Idle, Interval, Probes) ->
      {ok, keepalive_raw_opts({16#10, 16#101, 16#102}, Idle, Interval, Probes)};
  tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
      {error, {unsupported_os, OS}}.

  %% Assemble the option list from the three OS-specific raw option numbers.
  %% Each value is encoded as a 32-bit integer in native byte order.
  %% NOTE(review): protocol level 6 is presumably IPPROTO_TCP and the option
  %% numbers the platform's keepalive idle/interval/count options - confirm.
  keepalive_raw_opts({IdleOpt, IntervalOpt, ProbesOpt}, Idle, Interval, Probes) ->
      [
          {keepalive, true},
          {raw, 6, IdleOpt, <<Idle:32/native>>},
          {raw, 6, IntervalOpt, <<Interval:32/native>>},
          {raw, 6, ProbesOpt, <<Probes:32/native>>}
      ].
  %% @doc Render any term as a single-line binary using the `~0p' format.
  format(Term) ->
      Chars = io_lib:format("~0p", [Term]),
      iolist_to_binary(Chars).
  %% @doc Call the first `{Module, Function, Args}' candidate that is
  %% actually defined and return its result.
  %% A candidate is skipped only when the call fails with `undef' and the top
  %% stack frame names that very module and function; an `undef' raised
  %% deeper inside the call is re-raised unchanged.
  %% Raises `error:none_fun_is_defined' when no candidate is left.
  -spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return().
  call_first_defined([]) ->
      error(none_fun_is_defined);
  call_first_defined([{Module, Function, Args} | Candidates]) ->
      try
          apply(Module, Function, Args)
      catch
          error:undef:Stack ->
              case Stack of
                  [{Module, Function, _, _} | _] ->
                      call_first_defined(Candidates);
                  _ ->
                      erlang:raise(error, undef, Stack)
              end
      end.
  518. %%------------------------------------------------------------------------------
  519. %% Internal Functions
  520. %%------------------------------------------------------------------------------
  %% Spawn one linked worker per list element, run `Fun' on it and collect
  %% the results in input order. A worker that raises reports the exception
  %% back, and it is re-raised here with its original stacktrace.
  do_parallel_map(Fun, List) ->
      Collector = self(),
      RunItem =
          fun(Item) ->
              Outcome =
                  try
                      {normal, Fun(Item)}
                  catch
                      Class:Error:Stack ->
                          {exception, {Class, Error, Stack}}
                  end,
              Collector ! {self(), Outcome}
          end,
      Workers = [erlang:spawn_link(fun() -> RunItem(Item) end) || Item <- List],
      %% Results are awaited per worker pid, starting with the last worker,
      %% which builds the result list in the original order.
      Collect =
          fun(Worker, Results) ->
              receive
                  {Worker, {normal, Result}} ->
                      [Result | Results];
                  {Worker, {exception, {C, E, St}}} ->
                      erlang:raise(C, E, St)
              end
          end,
      lists:foldr(Collect, [], Workers).
  %% Convert a non-negative integer to a lower-case hex string, left-padded
  %% with zeros to at least `N' characters. A value needing more than `N'
  %% digits is returned in full, without padding.
  int_to_hex(I, N) when is_integer(I), I >= 0 ->
      int_to_hex([], I, 1, N).

  %% Accumulate digits from least to most significant; `Count' is the number
  %% of digits produced once the current one is added.
  int_to_hex(L, I, Count, N) when
      I < 16
  ->
      pad([int_to_hex(I) | L], N - Count);
  int_to_hex(L, I, Count, N) ->
      int_to_hex([int_to_hex(I rem 16) | L], I div 16, Count + 1, N).

  %% Hex character for a single nibble (0..15).
  int_to_hex(I) when 0 =< I, I =< 9 ->
      I + $0;
  int_to_hex(I) when 10 =< I, I =< 15 ->
      (I - 10) + $a.

  %% Prepend `Count' zeros. A zero or negative count means nothing is left
  %% to pad; without this guard a negative count would never terminate.
  pad(L, Count) when Count =< 0 ->
      L;
  pad(L, Count) ->
      pad([$0 | L], Count - 1).
  %% @doc Turn an error term into a human-readable binary.
  %% Binaries are returned as is and printable unicode strings are converted
  %% to UTF-8 binaries. Other terms are first offered to
  %% `emqx_hocon:format_error/2'; when that returns `false' the generic
  %% mapping in to_hr_error/1 is used.
  readable_error_msg(Msg) when is_binary(Msg) ->
      Msg;
  readable_error_msg(Error) ->
      IsPrintable = io_lib:printable_unicode_list(Error),
      case IsPrintable of
          false ->
              Formatted = emqx_hocon:format_error(Error, #{no_stacktrace => true}),
              case Formatted of
                  false -> to_hr_error(Error);
                  {ok, Msg} -> Msg
              end;
          true ->
              unicode:characters_to_binary(Error, utf8)
      end.
  %% Map a handful of well-known error terms to fixed human-readable
  %% messages; everything else is rendered with format/1.
  to_hr_error(Error) ->
      case Error of
          nxdomain -> <<"Could not resolve host">>;
          econnrefused -> <<"Connection refused">>;
          {unauthorized_client, _} -> <<"Unauthorized client">>;
          {not_authorized, _} -> <<"Not authorized">>;
          {malformed_username_or_password, _} -> <<"Bad username or password">>;
          _ -> format(Error)
      end.
  %% Run the conversion fun and wrap its outcome: `{ok, Atom}' on success,
  %% `{error, Reason}' when the conversion raises (any exception class).
  try_to_existing_atom(Convert, Data, Encoding) ->
      try
          {ok, Convert(Data, Encoding)}
      catch
          _Class:Reason -> {error, Reason}
      end.
  %% The functions below are thin entry points that forward their arguments
  %% unchanged to the function of the same name in `emqx_utils_redact'.

  %% @doc See `emqx_utils_redact:redact/1'.
  redact(Term) -> emqx_utils_redact:redact(Term).

  %% @doc See `emqx_utils_redact:redact/2'.
  redact(Term, Checker) -> emqx_utils_redact:redact(Term, Checker).

  %% @doc See `emqx_utils_redact:deobfuscate/2'.
  deobfuscate(NewConf, OldConf) -> emqx_utils_redact:deobfuscate(NewConf, OldConf).

  %% @doc See `emqx_utils_redact:is_redacted/2'.
  is_redacted(Key, Value) -> emqx_utils_redact:is_redacted(Key, Value).

  %% @doc See `emqx_utils_redact:is_redacted/3'.
  is_redacted(Key, Value, Fun) -> emqx_utils_redact:is_redacted(Key, Value, Fun).
  610. -ifdef(TEST).
  611. -include_lib("eunit/include/eunit.hrl").
  %% When the running OTP provides gen_tcp:ipv6_probe/0, ipv6_probe/1 must add
  %% the option. On a stock OTP the call is undefined and the test is a no-op.
  ipv6_probe_test() ->
      try gen_tcp:ipv6_probe() of
          true ->
              ?assertEqual([{ipv6_probe, true}], ipv6_probe([]))
      catch
          %% Only the "function does not exist" case is expected here; any
          %% other failure should surface instead of being swallowed.
          error:undef ->
              ok
      end.
  620. -endif.
  %% @doc Prepare a PUBLISH properties map for use in a packet:
  %%   - a non-empty `'User-Property'' map becomes a list of pairs;
  %%   - an empty or non-map `'User-Property'' is dropped;
  %%   - `'User-Property-Pairs'' is always dropped;
  %%   - every other property is kept as is.
  pub_props_to_packet(Properties) ->
      maps:filtermap(fun pub_prop_to_packet/2, Properties).

  %% filtermap callback for pub_props_to_packet/1.
  pub_prop_to_packet('User-Property', Props) when is_map(Props), map_size(Props) > 0 ->
      {true, maps:to_list(Props)};
  pub_prop_to_packet('User-Property', _Other) ->
      false;
  pub_prop_to_packet('User-Property-Pairs', _Pairs) ->
      false;
  pub_prop_to_packet(_Name, _Value) ->
      true.
  %% @doc Make a filename safe for filesystems that reject certain characters
  %% by replacing every ":" with "-". Binaries stay binaries; lists are
  %% returned flattened.
  -spec safe_filename(binary() | unicode:chardata()) -> binary() | [unicode:chardata()].
  safe_filename(Filename) when is_binary(Filename) ->
      binary:replace(Filename, <<":">>, <<"-">>, [global]);
  safe_filename(Filename) when is_list(Filename) ->
      Replaced = string:replace(Filename, ":", "-", all),
      lists:flatten(Replaced).
  %% @doc Compare two lists and report how `New' differs from `Old'.
  %% Elements are paired through `KeyFunc', which extracts a comparison key
  %% from an element; the first element with a matching key is used.
  %%
  %% Note the argument order: the NEW list comes first.
  %%
  %% The result is a map with four lists, each in the order of the list it
  %% was derived from:
  %% * 'removed'   - elements of `Old' whose key does not occur in `New'.
  %% * 'added'     - elements of `New' whose key does not occur in `Old'.
  %% * 'identical' - elements of `New' whose counterpart in `Old' (same key)
  %%                 is exactly the same term.
  %% * 'changed'   - `{OldElement, NewElement}' pairs that share a key but
  %%                 are different terms.
  -spec diff_lists(list(T), list(T), Func) ->
      #{
          added := list(T),
          identical := list(T),
          removed := list(T),
          changed := list({Old :: T, New :: T})
      }
  when
      Func :: fun((T) -> any()),
      T :: any().
  diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
      Removed = [E || E <- Old, search(KeyFunc(E), KeyFunc, New) =:= false],
      Classify =
          fun(E, {AddedAcc, IdenticalAcc, ChangedAcc}) ->
              case search(KeyFunc(E), KeyFunc, Old) of
                  false ->
                      {[E | AddedAcc], IdenticalAcc, ChangedAcc};
                  E ->
                      %% `E' is already bound: this clause matches only when
                      %% the old element is the very same term.
                      {AddedAcc, [E | IdenticalAcc], ChangedAcc};
                  Previous ->
                      {AddedAcc, IdenticalAcc, [{Previous, E} | ChangedAcc]}
              end
          end,
      {AddedRev, IdenticalRev, ChangedRev} = lists:foldl(Classify, {[], [], []}, New),
      #{
          removed => Removed,
          added => lists:reverse(AddedRev),
          identical => lists:reverse(IdenticalRev),
          changed => lists:reverse(ChangedRev)
      }.
  %% @doc Merge two lists while keeping the relative order of both.
  %% `KeyFunc' must extract a unique key from each element.
  %% An element of `List1' whose key also occurs in `List2' is replaced by the
  %% `List2' element but keeps its `List1' position. Elements of `List2' with
  %% keys unknown to `List1' are appended afterwards, in `List2' order.
  %% Example:
  %% emqx_utils:merge_lists(
  %%   [#{id => a, val => old}, #{id => b, val => old}],
  %%   [#{id => a, val => new}, #{id => c}, #{id => b, val => new}, #{id => d}],
  %%   fun(#{id := Id}) -> Id end).
  %% [#{id => a,val => new},
  %%  #{id => b,val => new},
  %%  #{id => c},
  %%  #{id => d}]
  -spec merge_lists(list(T), list(T), KeyFunc) -> list(T) when
      KeyFunc :: fun((T) -> any()),
      T :: any().
  merge_lists(List1, List2, KeyFunc) ->
      Keyed2 = [{KeyFunc(E), E} || E <- List2],
      %% For every List1 element, prefer the List2 entry carrying the same key.
      Keyed1 = [
          begin
              Key = KeyFunc(E),
              case lists:keyfind(Key, 1, Keyed2) of
                  false -> {Key, E};
                  Override -> Override
              end
          end
       || E <- List1
      ],
      OnlyIn2 = [Entry || {Key, _} = Entry <- Keyed2, not lists:keymember(Key, 1, Keyed1)],
      [E || {_, E} <- Keyed1 ++ OnlyIn2].
  %% Return the first item whose key (as extracted by `KeyFunc') is exactly
  %% equal to `ExpectValue', or `false' when there is none.
  search(_ExpectValue, _KeyFunc, []) ->
      false;
  search(ExpectValue, KeyFunc, [Item | Rest]) ->
      Key = KeyFunc(Item),
      if
          Key =:= ExpectValue -> Item;
          true -> search(ExpectValue, KeyFunc, Rest)
      end.
  %% @doc Map `Fun' over a list and splice the results into one flat list.
  %% Similar to `lists:flatmap/2', except that `Fun' may also return a single
  %% non-list term, which is then included as one element (hence "flatter").
  %% Returning `[]' contributes nothing. This suits callbacks that return
  %% either `[]' or one term, without wrapping every result in a list.
  -spec flattermap(Fun, [X]) -> [X] when
      Fun :: fun((X) -> [X] | X).
  flattermap(_Fun, []) ->
      [];
  flattermap(Fun, [X | Xs]) ->
      Mapped = Fun(X),
      Tail = flattermap(Fun, Xs),
      flatcomb(Mapped, Tail).

  %% Put one mapping result in front of the already-flattened tail.
  flatcomb([], Tail) ->
      Tail;
  flatcomb([_ | _] = Items, []) ->
      %% Nothing follows, so the result list can be reused as is.
      Items;
  flatcomb([_ | _] = Items, [_ | _] = Tail) ->
      Items ++ Tail;
  flatcomb(Item, Tail) ->
      [Item | Tail].
  %% @doc Format an IP address tuple, or an `{IP, Port}' pair, as a string.
  %% An IPv4-mapped IPv6 address (`::ffff:a.b.c.d') is printed in plain IPv4
  %% form. With a port, the result is the address followed by ":" and the
  %% port number.
  ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
      %% v6 piggyback v4: split the two 16-bit groups into four octets
      V4 = {AB bsr 8, AB rem 256, CD bsr 8, CD rem 256},
      inet_parse:ntoa(V4);
  ntoa({IP, Port}) ->
      lists:append([ntoa(IP), ":", integer_to_list(Port)]);
  ntoa(IP) ->
      inet_parse:ntoa(IP).
  %% @doc Return true if the provided string is a restricted string:
  %% it starts with a letter or a digit, and the remaining characters are
  %% letters, digits, '-' or '_'. The empty string is not restricted.
  is_restricted_str(String) ->
      RE = <<"^[A-Za-z0-9]+[A-Za-z0-9-_]*$">>,
      %% `dollar_endonly' makes `$' match only at the very end of the subject;
      %% by default it would also match before a trailing newline and let
      %% "abc\n" through.
      match =:= re:run(String, RE, [{capture, none}, dollar_endonly]).
  777. -ifdef(TEST).
  778. -include_lib("eunit/include/eunit.hrl").
  %% Exercises diff_lists/3 on empty lists, on each single-category case and
  %% on a mixed case that produces all four categories at once.
  diff_lists_test() ->
      KeyFunc = fun(#{name := Name}) -> Name end,
      A1 = #{name => a, value => 1},
      NoDiff = #{removed => [], added => [], identical => [], changed => []},
      ?assertEqual(NoDiff, diff_lists([], [], KeyFunc)),
      %% test removed list
      ?assertEqual(NoDiff#{removed := [A1]}, diff_lists([], [A1], KeyFunc)),
      %% test added list
      ?assertEqual(NoDiff#{added := [A1]}, diff_lists([A1], [], KeyFunc)),
      %% test identical list
      ?assertEqual(NoDiff#{identical := [A1]}, diff_lists([A1], [A1], KeyFunc)),
      %% mixed case: b changes, c is added, d is removed, a and e stay
      B4 = #{name => b, value => 4},
      B2 = #{name => b, value => 2},
      C3 = #{name => c, value => 3},
      D4 = #{name => d, value => 4},
      E2 = #{name => e, value => 2},
      Old = [A1, B4, E2, D4],
      New = [A1, B2, E2, C3],
      Expected = #{
          added => [C3],
          identical => [A1, E2],
          removed => [D4],
          changed => [{B4, B2}]
      },
      ?assertEqual(Expected, diff_lists(New, Old, KeyFunc)),
      ok.
  850. -endif.