emqx_rule_engine_api.erl 21 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_api).
  17. -include("rule_engine.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include_lib("hocon/include/hoconsc.hrl").
  20. -include_lib("typerefl/include/types.hrl").
  21. -behaviour(minirest_api).
  22. -import(hoconsc, [mk/2, ref/2, array/1]).
  23. -export([printable_function_name/2]).
  24. %% Swagger specs from hocon schema
  25. -export([api_spec/0, paths/0, schema/1, namespace/0]).
  26. %% API callbacks
  27. -export([
  28. '/rule_events'/2,
  29. '/rule_test'/2,
  30. '/rules'/2,
  31. '/rules/:id'/2,
  32. '/rules/:id/metrics'/2,
  33. '/rules/:id/metrics/reset'/2
  34. ]).
  35. %% query callback
  36. -export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]).
  37. -define(ERR_BADARGS(REASON), begin
  38. R0 = err_msg(REASON),
  39. <<"Bad Arguments: ", R0/binary>>
  40. end).
  41. -define(CHECK_PARAMS(PARAMS, TAG, EXPR),
  42. case emqx_rule_api_schema:check_params(PARAMS, TAG) of
  43. {ok, CheckedParams} ->
  44. EXPR;
  45. {error, REASON} ->
  46. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}}
  47. end
  48. ).
  49. -define(METRICS(
  50. MATCH,
  51. PASS,
  52. FAIL,
  53. FAIL_EX,
  54. FAIL_NORES,
  55. O_TOTAL,
  56. O_FAIL,
  57. O_FAIL_OOS,
  58. O_FAIL_UNKNOWN,
  59. O_SUCC,
  60. RATE,
  61. RATE_MAX,
  62. RATE_5
  63. ),
  64. #{
  65. 'matched' => MATCH,
  66. 'passed' => PASS,
  67. 'failed' => FAIL,
  68. 'failed.exception' => FAIL_EX,
  69. 'failed.no_result' => FAIL_NORES,
  70. 'actions.total' => O_TOTAL,
  71. 'actions.failed' => O_FAIL,
  72. 'actions.failed.out_of_service' => O_FAIL_OOS,
  73. 'actions.failed.unknown' => O_FAIL_UNKNOWN,
  74. 'actions.success' => O_SUCC,
  75. 'matched.rate' => RATE,
  76. 'matched.rate.max' => RATE_MAX,
  77. 'matched.rate.last5m' => RATE_5
  78. }
  79. ).
  80. -define(metrics(
  81. MATCH,
  82. PASS,
  83. FAIL,
  84. FAIL_EX,
  85. FAIL_NORES,
  86. O_TOTAL,
  87. O_FAIL,
  88. O_FAIL_OOS,
  89. O_FAIL_UNKNOWN,
  90. O_SUCC,
  91. RATE,
  92. RATE_MAX,
  93. RATE_5
  94. ),
  95. #{
  96. 'matched' := MATCH,
  97. 'passed' := PASS,
  98. 'failed' := FAIL,
  99. 'failed.exception' := FAIL_EX,
  100. 'failed.no_result' := FAIL_NORES,
  101. 'actions.total' := O_TOTAL,
  102. 'actions.failed' := O_FAIL,
  103. 'actions.failed.out_of_service' := O_FAIL_OOS,
  104. 'actions.failed.unknown' := O_FAIL_UNKNOWN,
  105. 'actions.success' := O_SUCC,
  106. 'matched.rate' := RATE,
  107. 'matched.rate.max' := RATE_MAX,
  108. 'matched.rate.last5m' := RATE_5
  109. }
  110. ).
  111. -define(RULE_QS_SCHEMA, [
  112. {<<"enable">>, atom},
  113. {<<"from">>, binary},
  114. {<<"like_id">>, binary},
  115. {<<"like_from">>, binary},
  116. {<<"match_from">>, binary},
  117. {<<"like_description">>, binary}
  118. ]).
  119. namespace() -> "rule".
  120. api_spec() ->
  121. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
  122. paths() ->
  123. [
  124. "/rule_events",
  125. "/rule_test",
  126. "/rules",
  127. "/rules/:id",
  128. "/rules/:id/metrics",
  129. "/rules/:id/metrics/reset"
  130. ].
  131. error_schema(Code, Message) when is_atom(Code) ->
  132. emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).
  133. rule_creation_schema() ->
  134. ref(emqx_rule_api_schema, "rule_creation").
  135. rule_test_schema() ->
  136. ref(emqx_rule_api_schema, "rule_test").
  137. rule_info_schema() ->
  138. ref(emqx_rule_api_schema, "rule_info").
  139. rule_metrics_schema() ->
  140. ref(emqx_rule_api_schema, "rule_metrics").
  141. schema("/rules") ->
  142. #{
  143. 'operationId' => '/rules',
  144. get => #{
  145. tags => [<<"rules">>],
  146. description => ?DESC("api1"),
  147. parameters => [
  148. {enable,
  149. mk(boolean(), #{desc => ?DESC("api1_enable"), in => query, required => false})},
  150. {from, mk(binary(), #{desc => ?DESC("api1_from"), in => query, required => false})},
  151. {like_id,
  152. mk(binary(), #{desc => ?DESC("api1_like_id"), in => query, required => false})},
  153. {like_from,
  154. mk(binary(), #{desc => ?DESC("api1_like_from"), in => query, required => false})},
  155. {like_description,
  156. mk(binary(), #{
  157. desc => ?DESC("api1_like_description"), in => query, required => false
  158. })},
  159. {match_from,
  160. mk(binary(), #{desc => ?DESC("api1_match_from"), in => query, required => false})},
  161. ref(emqx_dashboard_swagger, page),
  162. ref(emqx_dashboard_swagger, limit)
  163. ],
  164. summary => <<"List Rules">>,
  165. responses => #{
  166. 200 =>
  167. [
  168. {data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})},
  169. {meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
  170. ],
  171. 400 => error_schema('BAD_REQUEST', "Invalid Parameters")
  172. }
  173. },
  174. post => #{
  175. tags => [<<"rules">>],
  176. description => ?DESC("api2"),
  177. summary => <<"Create a Rule">>,
  178. 'requestBody' => rule_creation_schema(),
  179. responses => #{
  180. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  181. 201 => rule_info_schema()
  182. }
  183. }
  184. };
  185. schema("/rule_events") ->
  186. #{
  187. 'operationId' => '/rule_events',
  188. get => #{
  189. tags => [<<"rules">>],
  190. description => ?DESC("api3"),
  191. summary => <<"List Events">>,
  192. responses => #{
  193. 200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{})
  194. }
  195. }
  196. };
  197. schema("/rules/:id") ->
  198. #{
  199. 'operationId' => '/rules/:id',
  200. get => #{
  201. tags => [<<"rules">>],
  202. description => ?DESC("api4"),
  203. summary => <<"Get a Rule">>,
  204. parameters => param_path_id(),
  205. responses => #{
  206. 404 => error_schema('NOT_FOUND', "Rule not found"),
  207. 200 => rule_info_schema()
  208. }
  209. },
  210. put => #{
  211. tags => [<<"rules">>],
  212. description => ?DESC("api5"),
  213. summary => <<"Update a Rule">>,
  214. parameters => param_path_id(),
  215. 'requestBody' => rule_creation_schema(),
  216. responses => #{
  217. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  218. 200 => rule_info_schema()
  219. }
  220. },
  221. delete => #{
  222. tags => [<<"rules">>],
  223. description => ?DESC("api6"),
  224. summary => <<"Delete a Rule">>,
  225. parameters => param_path_id(),
  226. responses => #{
  227. 204 => <<"Delete rule successfully">>
  228. }
  229. }
  230. };
  231. schema("/rules/:id/metrics") ->
  232. #{
  233. 'operationId' => '/rules/:id/metrics',
  234. get => #{
  235. tags => [<<"rules">>],
  236. description => ?DESC("api4_1"),
  237. summary => <<"Get a Rule's Metrics">>,
  238. parameters => param_path_id(),
  239. responses => #{
  240. 404 => error_schema('NOT_FOUND', "Rule not found"),
  241. 200 => rule_metrics_schema()
  242. }
  243. }
  244. };
  245. schema("/rules/:id/metrics/reset") ->
  246. #{
  247. 'operationId' => '/rules/:id/metrics/reset',
  248. put => #{
  249. tags => [<<"rules">>],
  250. description => ?DESC("api7"),
  251. summary => <<"Reset a Rule Metrics">>,
  252. parameters => param_path_id(),
  253. responses => #{
  254. 404 => error_schema('NOT_FOUND', "Rule not found"),
  255. 204 => <<"Reset Success">>
  256. }
  257. }
  258. };
  259. schema("/rule_test") ->
  260. #{
  261. 'operationId' => '/rule_test',
  262. post => #{
  263. tags => [<<"rules">>],
  264. description => ?DESC("api8"),
  265. summary => <<"Test a Rule">>,
  266. 'requestBody' => rule_test_schema(),
  267. responses => #{
  268. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  269. 412 => error_schema('NOT_MATCH', "SQL Not Match"),
  270. 200 => <<"Rule Test Pass">>
  271. }
  272. }
  273. }.
  274. param_path_id() ->
  275. [{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}].
  276. %%------------------------------------------------------------------------------
  277. %% Rules API
  278. %%------------------------------------------------------------------------------
  279. '/rule_events'(get, _Params) ->
  280. {200, emqx_rule_events:event_info()}.
  281. '/rules'(get, #{query_string := QueryString}) ->
  282. case
  283. emqx_mgmt_api:node_query(
  284. node(),
  285. ?RULE_TAB,
  286. QueryString,
  287. ?RULE_QS_SCHEMA,
  288. fun ?MODULE:qs2ms/2,
  289. fun ?MODULE:format_rule_resp/1
  290. )
  291. of
  292. {error, page_limit_invalid} ->
  293. {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
  294. Result ->
  295. {200, Result}
  296. end;
  297. '/rules'(post, #{body := Params0}) ->
  298. case maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))) of
  299. <<>> ->
  300. {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
  301. Id ->
  302. Params = filter_out_request_body(add_metadata(Params0)),
  303. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  304. case emqx_rule_engine:get_rule(Id) of
  305. {ok, _Rule} ->
  306. {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
  307. not_found ->
  308. case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
  309. {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
  310. [Rule] = get_one_rule(AllRules, Id),
  311. {201, format_rule_resp(Rule)};
  312. {error, Reason} ->
  313. ?SLOG(error, #{
  314. msg => "create_rule_failed",
  315. id => Id,
  316. reason => Reason
  317. }),
  318. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
  319. end
  320. end
  321. end.
  322. '/rule_test'(post, #{body := Params}) ->
  323. ?CHECK_PARAMS(
  324. Params,
  325. rule_test,
  326. case emqx_rule_sqltester:test(CheckedParams) of
  327. {ok, Result} ->
  328. {200, Result};
  329. {error, {parse_error, Reason}} ->
  330. {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
  331. {error, nomatch} ->
  332. {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}};
  333. {error, Reason} ->
  334. {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
  335. end
  336. ).
  337. '/rules/:id'(get, #{bindings := #{id := Id}}) ->
  338. case emqx_rule_engine:get_rule(Id) of
  339. {ok, Rule} ->
  340. {200, format_rule_resp(Rule)};
  341. not_found ->
  342. {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
  343. end;
  344. '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
  345. Params = filter_out_request_body(Params0),
  346. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  347. case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
  348. {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
  349. [Rule] = get_one_rule(AllRules, Id),
  350. {200, format_rule_resp(Rule)};
  351. {error, Reason} ->
  352. ?SLOG(error, #{
  353. msg => "update_rule_failed",
  354. id => Id,
  355. reason => Reason
  356. }),
  357. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
  358. end;
  359. '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
  360. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  361. case emqx_conf:remove(ConfPath, #{override_to => cluster}) of
  362. {ok, _} ->
  363. {204};
  364. {error, Reason} ->
  365. ?SLOG(error, #{
  366. msg => "delete_rule_failed",
  367. id => Id,
  368. reason => Reason
  369. }),
  370. {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}}
  371. end.
  372. '/rules/:id/metrics'(get, #{bindings := #{id := Id}}) ->
  373. case emqx_rule_engine:get_rule(Id) of
  374. {ok, _Rule} ->
  375. NodeMetrics = get_rule_metrics(Id),
  376. MetricsResp =
  377. #{
  378. id => Id,
  379. metrics => aggregate_metrics(NodeMetrics),
  380. node_metrics => NodeMetrics
  381. },
  382. {200, MetricsResp};
  383. not_found ->
  384. {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
  385. end.
  386. '/rules/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
  387. case emqx_rule_engine:get_rule(Id) of
  388. {ok, _Rule} ->
  389. ok = emqx_rule_engine_proto_v1:reset_metrics(Id),
  390. {204};
  391. not_found ->
  392. {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
  393. end.
  394. %%------------------------------------------------------------------------------
  395. %% Internal functions
  396. %%------------------------------------------------------------------------------
  397. err_msg({RuleError, {_E, Reason, _S}}) ->
  398. emqx_misc:readable_error_msg(encode_nested_error(RuleError, Reason));
  399. err_msg({Reason, _Details}) ->
  400. emqx_misc:readable_error_msg(Reason);
  401. err_msg(Msg) ->
  402. emqx_misc:readable_error_msg(Msg).
  403. encode_nested_error(RuleError, Reason) when is_tuple(Reason) ->
  404. encode_nested_error(RuleError, element(1, Reason));
  405. encode_nested_error(RuleError, Reason) ->
  406. case emqx_json:safe_encode([{RuleError, Reason}]) of
  407. {ok, Json} ->
  408. Json;
  409. _ ->
  410. {RuleError, Reason}
  411. end.
  412. format_rule_resp(Rules) when is_list(Rules) ->
  413. [format_rule_resp(R) || R <- Rules];
  414. format_rule_resp({Id, Rule}) ->
  415. format_rule_resp(Rule#{id => Id});
  416. format_rule_resp(#{
  417. id := Id,
  418. name := Name,
  419. created_at := CreatedAt,
  420. from := Topics,
  421. actions := Action,
  422. sql := SQL,
  423. enable := Enable,
  424. description := Descr
  425. }) ->
  426. #{
  427. id => Id,
  428. name => Name,
  429. from => Topics,
  430. actions => format_action(Action),
  431. sql => SQL,
  432. enable => Enable,
  433. created_at => format_datetime(CreatedAt, millisecond),
  434. description => Descr
  435. }.
  436. format_datetime(Timestamp, Unit) ->
  437. list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
  438. format_action(Actions) ->
  439. [do_format_action(Act) || Act <- Actions].
  440. do_format_action(#{mod := Mod, func := Func, args := Args}) ->
  441. #{
  442. function => printable_function_name(Mod, Func),
  443. args => maps:remove(preprocessed_tmpl, Args)
  444. };
  445. do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
  446. BridgeChannelId.
  447. printable_function_name(emqx_rule_actions, Func) ->
  448. Func;
  449. printable_function_name(Mod, Func) ->
  450. list_to_binary(lists:concat([Mod, ":", Func])).
  451. get_rule_metrics(Id) ->
  452. Format = fun(
  453. Node,
  454. #{
  455. counters :=
  456. #{
  457. 'matched' := Matched,
  458. 'passed' := Passed,
  459. 'failed' := Failed,
  460. 'failed.exception' := FailedEx,
  461. 'failed.no_result' := FailedNoRes,
  462. 'actions.total' := OTotal,
  463. 'actions.failed' := OFailed,
  464. 'actions.failed.out_of_service' := OFailedOOS,
  465. 'actions.failed.unknown' := OFailedUnknown,
  466. 'actions.success' := OFailedSucc
  467. },
  468. rate :=
  469. #{
  470. 'matched' :=
  471. #{current := Current, max := Max, last5m := Last5M}
  472. }
  473. }
  474. ) ->
  475. #{
  476. metrics => ?METRICS(
  477. Matched,
  478. Passed,
  479. Failed,
  480. FailedEx,
  481. FailedNoRes,
  482. OTotal,
  483. OFailed,
  484. OFailedOOS,
  485. OFailedUnknown,
  486. OFailedSucc,
  487. Current,
  488. Max,
  489. Last5M
  490. ),
  491. node => Node
  492. }
  493. end,
  494. [
  495. Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id))
  496. || Node <- mria:running_nodes()
  497. ].
  498. aggregate_metrics(AllMetrics) ->
  499. InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
  500. lists:foldl(
  501. fun(
  502. #{
  503. metrics := ?metrics(
  504. Match1,
  505. Passed1,
  506. Failed1,
  507. FailedEx1,
  508. FailedNoRes1,
  509. OTotal1,
  510. OFailed1,
  511. OFailedOOS1,
  512. OFailedUnknown1,
  513. OFailedSucc1,
  514. Rate1,
  515. RateMax1,
  516. Rate5m1
  517. )
  518. },
  519. ?metrics(
  520. Match0,
  521. Passed0,
  522. Failed0,
  523. FailedEx0,
  524. FailedNoRes0,
  525. OTotal0,
  526. OFailed0,
  527. OFailedOOS0,
  528. OFailedUnknown0,
  529. OFailedSucc0,
  530. Rate0,
  531. RateMax0,
  532. Rate5m0
  533. )
  534. ) ->
  535. ?METRICS(
  536. Match1 + Match0,
  537. Passed1 + Passed0,
  538. Failed1 + Failed0,
  539. FailedEx1 + FailedEx0,
  540. FailedNoRes1 + FailedNoRes0,
  541. OTotal1 + OTotal0,
  542. OFailed1 + OFailed0,
  543. OFailedOOS1 + OFailedOOS0,
  544. OFailedUnknown1 + OFailedUnknown0,
  545. OFailedSucc1 + OFailedSucc0,
  546. Rate1 + Rate0,
  547. RateMax1 + RateMax0,
  548. Rate5m1 + Rate5m0
  549. )
  550. end,
  551. InitMetrics,
  552. AllMetrics
  553. ).
  554. get_one_rule(AllRules, Id) ->
  555. [R || R = #{id := Id0} <- AllRules, Id0 == Id].
  556. add_metadata(Params) ->
  557. Params#{
  558. <<"metadata">> => #{
  559. <<"created_at">> => emqx_rule_engine:now_ms()
  560. }
  561. }.
  562. filter_out_request_body(Conf) ->
  563. ExtraConfs = [
  564. <<"id">>,
  565. <<"status">>,
  566. <<"node_status">>,
  567. <<"node_metrics">>,
  568. <<"metrics">>,
  569. <<"node">>
  570. ],
  571. maps:without(ExtraConfs, Conf).
  572. -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
  573. qs2ms(_Tab, {Qs, Fuzzy}) ->
  574. case lists:keytake(from, 1, Qs) of
  575. false ->
  576. #{match_spec => generate_match_spec(Qs), fuzzy_fun => fuzzy_match_fun(Fuzzy)};
  577. {value, {from, '=:=', From}, Ls} ->
  578. #{
  579. match_spec => generate_match_spec(Ls),
  580. fuzzy_fun => fuzzy_match_fun([{from, '=:=', From} | Fuzzy])
  581. }
  582. end.
  583. generate_match_spec(Qs) ->
  584. {MtchHead, Conds} = generate_match_spec(Qs, 2, {#{}, []}),
  585. [{{'_', MtchHead}, Conds, ['$_']}].
  586. generate_match_spec([], _, {MtchHead, Conds}) ->
  587. {MtchHead, lists:reverse(Conds)};
  588. generate_match_spec([Qs | Rest], N, {MtchHead, Conds}) ->
  589. Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
  590. NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
  591. NConds = put_conds(Qs, Holder, Conds),
  592. generate_match_spec(Rest, N + 1, {NMtchHead, NConds}).
  593. put_conds({_, Op, V}, Holder, Conds) ->
  594. [{Op, Holder, V} | Conds].
  595. ms(enable, X) ->
  596. #{enable => X}.
  597. fuzzy_match_fun([]) ->
  598. undefined;
  599. fuzzy_match_fun(Fuzzy) ->
  600. {fun ?MODULE:run_fuzzy_match/2, [Fuzzy]}.
  601. run_fuzzy_match(_, []) ->
  602. true;
  603. run_fuzzy_match(E = {Id, _}, [{id, like, Pattern} | Fuzzy]) ->
  604. binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
  605. run_fuzzy_match(E = {_Id, #{description := Desc}}, [{description, like, Pattern} | Fuzzy]) ->
  606. binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
  607. run_fuzzy_match(E = {_, #{from := Topics}}, [{from, '=:=', Pattern} | Fuzzy]) ->
  608. lists:member(Pattern, Topics) /= false andalso run_fuzzy_match(E, Fuzzy);
  609. run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, match, Pattern} | Fuzzy]) ->
  610. lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) andalso
  611. run_fuzzy_match(E, Fuzzy);
  612. run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, like, Pattern} | Fuzzy]) ->
  613. lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics) andalso
  614. run_fuzzy_match(E, Fuzzy);
  615. run_fuzzy_match(E, [_ | Fuzzy]) ->
  616. run_fuzzy_match(E, Fuzzy).