emqx_rule_engine_api.erl 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_api).
  17. -include("rule_engine.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include_lib("hocon/include/hoconsc.hrl").
  20. -include_lib("typerefl/include/types.hrl").
  21. -behaviour(minirest_api).
  22. -import(hoconsc, [mk/2, ref/2, array/1]).
  23. -export([printable_function_name/2]).
  24. %% Swagger specs from hocon schema
  25. -export([api_spec/0, paths/0, schema/1, namespace/0]).
  26. %% API callbacks
  27. -export([
  28. '/rule_events'/2,
  29. '/rule_test'/2,
  30. '/rules'/2,
  31. '/rules/:id'/2,
  32. '/rules/:id/metrics'/2,
  33. '/rules/:id/metrics/reset'/2
  34. ]).
  35. %% query callback
  36. -export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]).
  37. -define(ERR_BADARGS(REASON), begin
  38. R0 = err_msg(REASON),
  39. <<"Bad Arguments: ", R0/binary>>
  40. end).
  41. -define(CHECK_PARAMS(PARAMS, TAG, EXPR),
  42. case emqx_rule_api_schema:check_params(PARAMS, TAG) of
  43. {ok, CheckedParams} ->
  44. EXPR;
  45. {error, REASON} ->
  46. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}}
  47. end
  48. ).
  49. -define(METRICS(
  50. MATCH,
  51. PASS,
  52. FAIL,
  53. FAIL_EX,
  54. FAIL_NORES,
  55. O_TOTAL,
  56. O_FAIL,
  57. O_FAIL_OOS,
  58. O_FAIL_UNKNOWN,
  59. O_SUCC,
  60. RATE,
  61. RATE_MAX,
  62. RATE_5
  63. ),
  64. #{
  65. 'matched' => MATCH,
  66. 'passed' => PASS,
  67. 'failed' => FAIL,
  68. 'failed.exception' => FAIL_EX,
  69. 'failed.no_result' => FAIL_NORES,
  70. 'actions.total' => O_TOTAL,
  71. 'actions.failed' => O_FAIL,
  72. 'actions.failed.out_of_service' => O_FAIL_OOS,
  73. 'actions.failed.unknown' => O_FAIL_UNKNOWN,
  74. 'actions.success' => O_SUCC,
  75. 'matched.rate' => RATE,
  76. 'matched.rate.max' => RATE_MAX,
  77. 'matched.rate.last5m' => RATE_5
  78. }
  79. ).
  80. -define(metrics(
  81. MATCH,
  82. PASS,
  83. FAIL,
  84. FAIL_EX,
  85. FAIL_NORES,
  86. O_TOTAL,
  87. O_FAIL,
  88. O_FAIL_OOS,
  89. O_FAIL_UNKNOWN,
  90. O_SUCC,
  91. RATE,
  92. RATE_MAX,
  93. RATE_5
  94. ),
  95. #{
  96. 'matched' := MATCH,
  97. 'passed' := PASS,
  98. 'failed' := FAIL,
  99. 'failed.exception' := FAIL_EX,
  100. 'failed.no_result' := FAIL_NORES,
  101. 'actions.total' := O_TOTAL,
  102. 'actions.failed' := O_FAIL,
  103. 'actions.failed.out_of_service' := O_FAIL_OOS,
  104. 'actions.failed.unknown' := O_FAIL_UNKNOWN,
  105. 'actions.success' := O_SUCC,
  106. 'matched.rate' := RATE,
  107. 'matched.rate.max' := RATE_MAX,
  108. 'matched.rate.last5m' := RATE_5
  109. }
  110. ).
  111. -define(RULE_QS_SCHEMA, [
  112. {<<"enable">>, atom},
  113. {<<"from">>, binary},
  114. {<<"like_id">>, binary},
  115. {<<"like_from">>, binary},
  116. {<<"match_from">>, binary},
  117. {<<"like_description">>, binary}
  118. ]).
  119. namespace() -> "rule".
  120. api_spec() ->
  121. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
  122. paths() ->
  123. [
  124. "/rule_events",
  125. "/rule_test",
  126. "/rules",
  127. "/rules/:id",
  128. "/rules/:id/metrics",
  129. "/rules/:id/metrics/reset"
  130. ].
  131. error_schema(Code, Message) when is_atom(Code) ->
  132. emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).
  133. rule_creation_schema() ->
  134. ref(emqx_rule_api_schema, "rule_creation").
  135. rule_test_schema() ->
  136. ref(emqx_rule_api_schema, "rule_test").
  137. rule_info_schema() ->
  138. ref(emqx_rule_api_schema, "rule_info").
  139. rule_metrics_schema() ->
  140. ref(emqx_rule_api_schema, "rule_metrics").
  141. schema("/rules") ->
  142. #{
  143. 'operationId' => '/rules',
  144. get => #{
  145. tags => [<<"rules">>],
  146. description => ?DESC("api1"),
  147. parameters => [
  148. {enable,
  149. mk(boolean(), #{desc => ?DESC("api1_enable"), in => query, required => false})},
  150. {from, mk(binary(), #{desc => ?DESC("api1_from"), in => query, required => false})},
  151. {like_id,
  152. mk(binary(), #{desc => ?DESC("api1_like_id"), in => query, required => false})},
  153. {like_from,
  154. mk(binary(), #{desc => ?DESC("api1_like_from"), in => query, required => false})},
  155. {like_description,
  156. mk(binary(), #{
  157. desc => ?DESC("api1_like_description"), in => query, required => false
  158. })},
  159. {match_from,
  160. mk(binary(), #{desc => ?DESC("api1_match_from"), in => query, required => false})},
  161. ref(emqx_dashboard_swagger, page),
  162. ref(emqx_dashboard_swagger, limit)
  163. ],
  164. summary => <<"List Rules">>,
  165. responses => #{
  166. 200 =>
  167. [
  168. {data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})},
  169. {meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
  170. ],
  171. 400 => error_schema('BAD_REQUEST', "Invalid Parameters")
  172. }
  173. },
  174. post => #{
  175. tags => [<<"rules">>],
  176. description => ?DESC("api2"),
  177. summary => <<"Create a Rule">>,
  178. 'requestBody' => rule_creation_schema(),
  179. responses => #{
  180. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  181. 201 => rule_info_schema()
  182. }
  183. }
  184. };
  185. schema("/rule_events") ->
  186. #{
  187. 'operationId' => '/rule_events',
  188. get => #{
  189. tags => [<<"rules">>],
  190. description => ?DESC("api3"),
  191. summary => <<"List Events">>,
  192. responses => #{
  193. 200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{})
  194. }
  195. }
  196. };
  197. schema("/rules/:id") ->
  198. #{
  199. 'operationId' => '/rules/:id',
  200. get => #{
  201. tags => [<<"rules">>],
  202. description => ?DESC("api4"),
  203. summary => <<"Get a Rule">>,
  204. parameters => param_path_id(),
  205. responses => #{
  206. 404 => error_schema('NOT_FOUND', "Rule not found"),
  207. 200 => rule_info_schema()
  208. }
  209. },
  210. put => #{
  211. tags => [<<"rules">>],
  212. description => ?DESC("api5"),
  213. summary => <<"Update a Rule">>,
  214. parameters => param_path_id(),
  215. 'requestBody' => rule_creation_schema(),
  216. responses => #{
  217. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  218. 200 => rule_info_schema()
  219. }
  220. },
  221. delete => #{
  222. tags => [<<"rules">>],
  223. description => ?DESC("api6"),
  224. summary => <<"Delete a Rule">>,
  225. parameters => param_path_id(),
  226. responses => #{
  227. 204 => <<"Delete rule successfully">>
  228. }
  229. }
  230. };
  231. schema("/rules/:id/metrics") ->
  232. #{
  233. 'operationId' => '/rules/:id/metrics',
  234. get => #{
  235. tags => [<<"rules">>],
  236. description => ?DESC("api4_1"),
  237. summary => <<"Get a Rule's Metrics">>,
  238. parameters => param_path_id(),
  239. responses => #{
  240. 404 => error_schema('NOT_FOUND', "Rule not found"),
  241. 200 => rule_metrics_schema()
  242. }
  243. }
  244. };
  245. schema("/rules/:id/metrics/reset") ->
  246. #{
  247. 'operationId' => '/rules/:id/metrics/reset',
  248. put => #{
  249. tags => [<<"rules">>],
  250. description => ?DESC("api7"),
  251. summary => <<"Reset a Rule Metrics">>,
  252. parameters => param_path_id(),
  253. responses => #{
  254. 404 => error_schema('NOT_FOUND', "Rule not found"),
  255. 204 => <<"Reset Success">>
  256. }
  257. }
  258. };
  259. schema("/rule_test") ->
  260. #{
  261. 'operationId' => '/rule_test',
  262. post => #{
  263. tags => [<<"rules">>],
  264. description => ?DESC("api8"),
  265. summary => <<"Test a Rule">>,
  266. 'requestBody' => rule_test_schema(),
  267. responses => #{
  268. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  269. 412 => error_schema('NOT_MATCH', "SQL Not Match"),
  270. 200 => <<"Rule Test Pass">>
  271. }
  272. }
  273. }.
  274. param_path_id() ->
  275. [{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}].
  276. %%------------------------------------------------------------------------------
  277. %% Rules API
  278. %%------------------------------------------------------------------------------
  279. '/rule_events'(get, _Params) ->
  280. {200, emqx_rule_events:event_info()}.
  281. '/rules'(get, #{query_string := QueryString}) ->
  282. case
  283. emqx_mgmt_api:node_query(
  284. node(),
  285. ?RULE_TAB,
  286. QueryString,
  287. ?RULE_QS_SCHEMA,
  288. fun ?MODULE:qs2ms/2,
  289. fun ?MODULE:format_rule_resp/1
  290. )
  291. of
  292. {error, page_limit_invalid} ->
  293. {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
  294. Result ->
  295. {200, Result}
  296. end;
  297. '/rules'(post, #{body := Params0}) ->
  298. case maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))) of
  299. <<>> ->
  300. {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
  301. Id ->
  302. Params = filter_out_request_body(add_metadata(Params0)),
  303. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  304. case emqx_rule_engine:get_rule(Id) of
  305. {ok, _Rule} ->
  306. {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
  307. not_found ->
  308. case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
  309. {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
  310. [Rule] = get_one_rule(AllRules, Id),
  311. {201, format_rule_resp(Rule)};
  312. {error, Reason} ->
  313. ?SLOG(error, #{
  314. msg => "create_rule_failed",
  315. id => Id,
  316. reason => Reason
  317. }),
  318. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
  319. end
  320. end
  321. end.
  322. '/rule_test'(post, #{body := Params}) ->
  323. ?CHECK_PARAMS(
  324. Params,
  325. rule_test,
  326. case emqx_rule_sqltester:test(CheckedParams) of
  327. {ok, Result} ->
  328. {200, Result};
  329. {error, {parse_error, Reason}} ->
  330. {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
  331. {error, nomatch} ->
  332. {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}};
  333. {error, Reason} ->
  334. {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
  335. end
  336. ).
  337. '/rules/:id'(get, #{bindings := #{id := Id}}) ->
  338. case emqx_rule_engine:get_rule(Id) of
  339. {ok, Rule} ->
  340. {200, format_rule_resp(Rule)};
  341. not_found ->
  342. {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
  343. end;
  344. '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
  345. Params = filter_out_request_body(Params0),
  346. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  347. case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
  348. {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
  349. [Rule] = get_one_rule(AllRules, Id),
  350. {200, format_rule_resp(Rule)};
  351. {error, Reason} ->
  352. ?SLOG(error, #{
  353. msg => "update_rule_failed",
  354. id => Id,
  355. reason => Reason
  356. }),
  357. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
  358. end;
  359. '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
  360. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  361. case emqx_conf:remove(ConfPath, #{override_to => cluster}) of
  362. {ok, _} ->
  363. {204};
  364. {error, Reason} ->
  365. ?SLOG(error, #{
  366. msg => "delete_rule_failed",
  367. id => Id,
  368. reason => Reason
  369. }),
  370. {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}}
  371. end.
  372. '/rules/:id/metrics'(get, #{bindings := #{id := Id}}) ->
  373. case emqx_rule_engine:get_rule(Id) of
  374. {ok, _Rule} ->
  375. NodeMetrics = get_rule_metrics(Id),
  376. MetricsResp =
  377. #{
  378. id => Id,
  379. metrics => aggregate_metrics(NodeMetrics),
  380. node_metrics => NodeMetrics
  381. },
  382. {200, MetricsResp};
  383. not_found ->
  384. {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
  385. end.
  386. '/rules/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
  387. case emqx_rule_engine:get_rule(Id) of
  388. {ok, _Rule} ->
  389. ok = emqx_rule_engine_proto_v1:reset_metrics(Id),
  390. {204};
  391. not_found ->
  392. {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
  393. end.
  394. %%------------------------------------------------------------------------------
  395. %% Internal functions
  396. %%------------------------------------------------------------------------------
  397. err_msg(Msg) -> emqx_misc:readable_error_msg(Msg).
  398. format_rule_resp(Rules) when is_list(Rules) ->
  399. [format_rule_resp(R) || R <- Rules];
  400. format_rule_resp({Id, Rule}) ->
  401. format_rule_resp(Rule#{id => Id});
  402. format_rule_resp(#{
  403. id := Id,
  404. name := Name,
  405. created_at := CreatedAt,
  406. from := Topics,
  407. actions := Action,
  408. sql := SQL,
  409. enable := Enable,
  410. description := Descr
  411. }) ->
  412. #{
  413. id => Id,
  414. name => Name,
  415. from => Topics,
  416. actions => format_action(Action),
  417. sql => SQL,
  418. enable => Enable,
  419. created_at => format_datetime(CreatedAt, millisecond),
  420. description => Descr
  421. }.
  422. format_datetime(Timestamp, Unit) ->
  423. list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
  424. format_action(Actions) ->
  425. [do_format_action(Act) || Act <- Actions].
  426. do_format_action(#{mod := Mod, func := Func, args := Args}) ->
  427. #{
  428. function => printable_function_name(Mod, Func),
  429. args => maps:remove(preprocessed_tmpl, Args)
  430. };
  431. do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
  432. BridgeChannelId.
  433. printable_function_name(emqx_rule_actions, Func) ->
  434. Func;
  435. printable_function_name(Mod, Func) ->
  436. list_to_binary(lists:concat([Mod, ":", Func])).
  437. get_rule_metrics(Id) ->
  438. Format = fun(
  439. Node,
  440. #{
  441. counters :=
  442. #{
  443. 'matched' := Matched,
  444. 'passed' := Passed,
  445. 'failed' := Failed,
  446. 'failed.exception' := FailedEx,
  447. 'failed.no_result' := FailedNoRes,
  448. 'actions.total' := OTotal,
  449. 'actions.failed' := OFailed,
  450. 'actions.failed.out_of_service' := OFailedOOS,
  451. 'actions.failed.unknown' := OFailedUnknown,
  452. 'actions.success' := OFailedSucc
  453. },
  454. rate :=
  455. #{
  456. 'matched' :=
  457. #{current := Current, max := Max, last5m := Last5M}
  458. }
  459. }
  460. ) ->
  461. #{
  462. metrics => ?METRICS(
  463. Matched,
  464. Passed,
  465. Failed,
  466. FailedEx,
  467. FailedNoRes,
  468. OTotal,
  469. OFailed,
  470. OFailedOOS,
  471. OFailedUnknown,
  472. OFailedSucc,
  473. Current,
  474. Max,
  475. Last5M
  476. ),
  477. node => Node
  478. }
  479. end,
  480. [
  481. Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id))
  482. || Node <- mria_mnesia:running_nodes()
  483. ].
  484. aggregate_metrics(AllMetrics) ->
  485. InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
  486. lists:foldl(
  487. fun(
  488. #{
  489. metrics := ?metrics(
  490. Match1,
  491. Passed1,
  492. Failed1,
  493. FailedEx1,
  494. FailedNoRes1,
  495. OTotal1,
  496. OFailed1,
  497. OFailedOOS1,
  498. OFailedUnknown1,
  499. OFailedSucc1,
  500. Rate1,
  501. RateMax1,
  502. Rate5m1
  503. )
  504. },
  505. ?metrics(
  506. Match0,
  507. Passed0,
  508. Failed0,
  509. FailedEx0,
  510. FailedNoRes0,
  511. OTotal0,
  512. OFailed0,
  513. OFailedOOS0,
  514. OFailedUnknown0,
  515. OFailedSucc0,
  516. Rate0,
  517. RateMax0,
  518. Rate5m0
  519. )
  520. ) ->
  521. ?METRICS(
  522. Match1 + Match0,
  523. Passed1 + Passed0,
  524. Failed1 + Failed0,
  525. FailedEx1 + FailedEx0,
  526. FailedNoRes1 + FailedNoRes0,
  527. OTotal1 + OTotal0,
  528. OFailed1 + OFailed0,
  529. OFailedOOS1 + OFailedOOS0,
  530. OFailedUnknown1 + OFailedUnknown0,
  531. OFailedSucc1 + OFailedSucc0,
  532. Rate1 + Rate0,
  533. RateMax1 + RateMax0,
  534. Rate5m1 + Rate5m0
  535. )
  536. end,
  537. InitMetrics,
  538. AllMetrics
  539. ).
  540. get_one_rule(AllRules, Id) ->
  541. [R || R = #{id := Id0} <- AllRules, Id0 == Id].
  542. add_metadata(Params) ->
  543. Params#{
  544. <<"metadata">> => #{
  545. <<"created_at">> => emqx_rule_engine:now_ms()
  546. }
  547. }.
  548. filter_out_request_body(Conf) ->
  549. ExtraConfs = [
  550. <<"id">>,
  551. <<"status">>,
  552. <<"node_status">>,
  553. <<"node_metrics">>,
  554. <<"metrics">>,
  555. <<"node">>
  556. ],
  557. maps:without(ExtraConfs, Conf).
  558. -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
  559. qs2ms(_Tab, {Qs, Fuzzy}) ->
  560. case lists:keytake(from, 1, Qs) of
  561. false ->
  562. #{match_spec => generate_match_spec(Qs), fuzzy_fun => fuzzy_match_fun(Fuzzy)};
  563. {value, {from, '=:=', From}, Ls} ->
  564. #{
  565. match_spec => generate_match_spec(Ls),
  566. fuzzy_fun => fuzzy_match_fun([{from, '=:=', From} | Fuzzy])
  567. }
  568. end.
  569. generate_match_spec(Qs) ->
  570. {MtchHead, Conds} = generate_match_spec(Qs, 2, {#{}, []}),
  571. [{{'_', MtchHead}, Conds, ['$_']}].
  572. generate_match_spec([], _, {MtchHead, Conds}) ->
  573. {MtchHead, lists:reverse(Conds)};
  574. generate_match_spec([Qs | Rest], N, {MtchHead, Conds}) ->
  575. Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
  576. NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
  577. NConds = put_conds(Qs, Holder, Conds),
  578. generate_match_spec(Rest, N + 1, {NMtchHead, NConds}).
  579. put_conds({_, Op, V}, Holder, Conds) ->
  580. [{Op, Holder, V} | Conds].
  581. ms(enable, X) ->
  582. #{enable => X}.
  583. fuzzy_match_fun([]) ->
  584. undefined;
  585. fuzzy_match_fun(Fuzzy) ->
  586. {fun ?MODULE:run_fuzzy_match/2, [Fuzzy]}.
  587. run_fuzzy_match(_, []) ->
  588. true;
  589. run_fuzzy_match(E = {Id, _}, [{id, like, Pattern} | Fuzzy]) ->
  590. binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
  591. run_fuzzy_match(E = {_Id, #{description := Desc}}, [{description, like, Pattern} | Fuzzy]) ->
  592. binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
  593. run_fuzzy_match(E = {_, #{from := Topics}}, [{from, '=:=', Pattern} | Fuzzy]) ->
  594. lists:member(Pattern, Topics) /= false andalso run_fuzzy_match(E, Fuzzy);
  595. run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, match, Pattern} | Fuzzy]) ->
  596. lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) andalso
  597. run_fuzzy_match(E, Fuzzy);
  598. run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, like, Pattern} | Fuzzy]) ->
  599. lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics) andalso
  600. run_fuzzy_match(E, Fuzzy);
  601. run_fuzzy_match(E, [_ | Fuzzy]) ->
  602. run_fuzzy_match(E, Fuzzy).