emqx_rule_engine.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine).
  17. -behaviour(gen_server).
  18. -behaviour(emqx_config_handler).
  19. -behaiour(emqx_config_backup).
  20. -include("rule_engine.hrl").
  21. -include_lib("emqx/include/logger.hrl").
  22. -include_lib("stdlib/include/qlc.hrl").
  23. -export([start_link/0]).
  24. -export([
  25. post_config_update/5
  26. ]).
  27. %% Rule Management
  28. -export([load_rules/0]).
  29. -export([
  30. create_rule/1,
  31. update_rule/1,
  32. delete_rule/1,
  33. get_rule/1
  34. ]).
  35. -export([
  36. get_rules/0,
  37. get_rules_for_topic/1,
  38. get_rules_with_same_event/1,
  39. get_rule_ids_by_action/1,
  40. ensure_action_removed/2,
  41. get_rules_ordered_by_ts/0
  42. ]).
  43. %% exported for cluster_call
  44. -export([
  45. do_delete_rule/1,
  46. do_insert_rule/1
  47. ]).
  48. -export([
  49. load_hooks_for_rule/1,
  50. unload_hooks_for_rule/1,
  51. maybe_add_metrics_for_rule/1,
  52. clear_metrics_for_rule/1,
  53. reset_metrics_for_rule/1
  54. ]).
  55. %% exported for `emqx_telemetry'
  56. -export([get_basic_usage_info/0]).
  57. -export([now_ms/0]).
  58. %% gen_server Callbacks
  59. -export([
  60. init/1,
  61. handle_call/3,
  62. handle_cast/2,
  63. handle_info/2,
  64. terminate/2,
  65. code_change/3
  66. ]).
  67. %% Data backup
  68. -export([
  69. import_config/1
  70. ]).
  71. -define(RULE_ENGINE, ?MODULE).
  72. -define(T_CALL, infinity).
  73. %% NOTE: This order cannot be changed! This is to make the metric working during relup.
  74. %% Append elements to this list to add new metrics.
  75. -define(METRICS, [
  76. 'matched',
  77. 'passed',
  78. 'failed',
  79. 'failed.exception',
  80. 'failed.no_result',
  81. 'actions.total',
  82. 'actions.success',
  83. 'actions.failed',
  84. 'actions.failed.out_of_service',
  85. 'actions.failed.unknown'
  86. ]).
  87. -define(RATE_METRICS, ['matched']).
  88. -type action_name() :: binary() | #{function := binary()}.
  89. -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
  90. start_link() ->
  91. gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
  92. %%----------------------------------------------------------------------------------------
  93. %% The config handler for emqx_rule_engine
  94. %%------------------------------------------------------------------------------
  95. post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
  96. create_rule(NewRule#{id => bin(RuleId)});
  97. post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) ->
  98. delete_rule(bin(RuleId));
  99. post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) ->
  100. update_rule(NewRule#{id => bin(RuleId)});
  101. post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) ->
  102. #{added := Added, removed := Removed, changed := Updated} =
  103. emqx_utils_maps:diff_maps(NewRules, OldRules),
  104. try
  105. maps_foreach(
  106. fun({Id, {_Old, New}}) ->
  107. {ok, _} = update_rule(New#{id => bin(Id)})
  108. end,
  109. Updated
  110. ),
  111. maps_foreach(
  112. fun({Id, _Rule}) ->
  113. ok = delete_rule(bin(Id))
  114. end,
  115. Removed
  116. ),
  117. maps_foreach(
  118. fun({Id, Rule}) ->
  119. {ok, _} = create_rule(Rule#{id => bin(Id)})
  120. end,
  121. Added
  122. ),
  123. ok
  124. catch
  125. throw:#{kind := _} = Error ->
  126. {error, Error}
  127. end.
  128. %%----------------------------------------------------------------------------------------
  129. %% APIs for rules
  130. %%----------------------------------------------------------------------------------------
  131. -spec load_rules() -> ok.
  132. load_rules() ->
  133. maps_foreach(
  134. fun
  135. ({Id, #{metadata := #{created_at := CreatedAt}} = Rule}) ->
  136. create_rule(Rule#{id => bin(Id)}, CreatedAt);
  137. ({Id, Rule}) ->
  138. create_rule(Rule#{id => bin(Id)})
  139. end,
  140. emqx:get_config([rule_engine, rules], #{})
  141. ).
  142. -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
  143. create_rule(Params) ->
  144. create_rule(Params, maps:get(created_at, Params, now_ms())).
  145. create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
  146. case get_rule(RuleId) of
  147. not_found -> parse_and_insert(Params, CreatedAt);
  148. {ok, _} -> {error, already_exists}
  149. end.
  150. -spec update_rule(map()) -> {ok, rule()} | {error, term()}.
  151. update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
  152. case get_rule(RuleId) of
  153. not_found ->
  154. {error, not_found};
  155. {ok, #{created_at := CreatedAt}} ->
  156. parse_and_insert(Params, CreatedAt)
  157. end.
  158. -spec delete_rule(RuleId :: rule_id()) -> ok.
  159. delete_rule(RuleId) when is_binary(RuleId) ->
  160. gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL).
  161. -spec insert_rule(Rule :: rule()) -> ok.
  162. insert_rule(Rule) ->
  163. gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
  164. %%----------------------------------------------------------------------------------------
  165. %% Rule Management
  166. %%----------------------------------------------------------------------------------------
  167. -spec get_rules() -> [rule()].
  168. get_rules() ->
  169. get_all_records(?RULE_TAB).
  170. get_rules_ordered_by_ts() ->
  171. lists:sort(
  172. fun(#{created_at := CreatedA}, #{created_at := CreatedB}) ->
  173. CreatedA =< CreatedB
  174. end,
  175. get_rules()
  176. ).
  177. -spec get_rules_for_topic(Topic :: binary()) -> [rule()].
  178. get_rules_for_topic(Topic) ->
  179. [
  180. Rule
  181. || Rule = #{from := From} <- get_rules(),
  182. emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)
  183. ].
  184. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
  185. get_rules_with_same_event(Topic) ->
  186. EventName = emqx_rule_events:event_name(Topic),
  187. [
  188. Rule
  189. || Rule = #{from := From} <- get_rules(),
  190. lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)
  191. ].
  192. -spec get_rule_ids_by_action(action_name()) -> [rule_id()].
  193. get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) ->
  194. [
  195. Id
  196. || #{actions := Acts, id := Id, from := Froms} <- get_rules(),
  197. forwards_to_bridge(Acts, BridgeId) orelse
  198. references_ingress_bridge(Froms, BridgeId)
  199. ];
  200. get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
  201. {Mod, Fun} =
  202. case string:split(FuncName, ":", leading) of
  203. [M, F] -> {binary_to_module(M), F};
  204. [F] -> {emqx_rule_actions, F}
  205. end,
  206. [
  207. Id
  208. || #{actions := Acts, id := Id} <- get_rules(),
  209. contains_actions(Acts, Mod, Fun)
  210. ].
  211. -spec ensure_action_removed(rule_id(), action_name()) -> ok.
  212. ensure_action_removed(RuleId, ActionName) ->
  213. FilterFunc =
  214. fun
  215. (Func, Func) -> false;
  216. (#{<<"function">> := Func}, #{function := Func}) -> false;
  217. (_, _) -> true
  218. end,
  219. case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
  220. not_found ->
  221. ok;
  222. #{<<"actions">> := Acts} = Conf ->
  223. NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)],
  224. {ok, _} = emqx_conf:update(
  225. ?RULE_PATH(RuleId),
  226. Conf#{<<"actions">> => NewActs},
  227. #{override_to => cluster}
  228. ),
  229. ok
  230. end.
  231. is_of_event_name(EventName, Topic) ->
  232. EventName =:= emqx_rule_events:event_name(Topic).
  233. -spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found.
  234. get_rule(Id) ->
  235. case ets:lookup(?RULE_TAB, Id) of
  236. [{Id, Rule}] -> {ok, Rule#{id => Id}};
  237. [] -> not_found
  238. end.
  239. load_hooks_for_rule(#{from := Topics}) ->
  240. lists:foreach(fun emqx_rule_events:load/1, Topics).
  241. maybe_add_metrics_for_rule(Id) ->
  242. case emqx_metrics_worker:has_metrics(rule_metrics, Id) of
  243. true ->
  244. ok = reset_metrics_for_rule(Id);
  245. false ->
  246. ok = emqx_metrics_worker:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS)
  247. end.
  248. clear_metrics_for_rule(Id) ->
  249. ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id).
  250. -spec reset_metrics_for_rule(rule_id()) -> ok.
  251. reset_metrics_for_rule(Id) ->
  252. emqx_metrics_worker:reset_metrics(rule_metrics, Id).
  253. unload_hooks_for_rule(#{id := Id, from := Topics}) ->
  254. lists:foreach(
  255. fun(Topic) ->
  256. case get_rules_with_same_event(Topic) of
  257. %% we are now deleting the last rule
  258. [#{id := Id0}] when Id0 == Id ->
  259. emqx_rule_events:unload(Topic);
  260. _ ->
  261. ok
  262. end
  263. end,
  264. Topics
  265. ).
  266. %%----------------------------------------------------------------------------------------
  267. %% Telemetry helper functions
  268. %%----------------------------------------------------------------------------------------
  269. -spec get_basic_usage_info() ->
  270. #{
  271. num_rules => non_neg_integer(),
  272. referenced_bridges =>
  273. #{BridgeType => non_neg_integer()}
  274. }
  275. when
  276. BridgeType :: atom().
  277. get_basic_usage_info() ->
  278. try
  279. Rules = get_rules(),
  280. EnabledRules =
  281. lists:filter(
  282. fun(#{enable := Enabled}) -> Enabled end,
  283. Rules
  284. ),
  285. NumRules = length(EnabledRules),
  286. ReferencedBridges =
  287. lists:foldl(
  288. fun(#{actions := Actions, from := Froms}, Acc) ->
  289. BridgeIDs0 = get_referenced_hookpoints(Froms),
  290. BridgeIDs1 = get_egress_bridges(Actions),
  291. tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
  292. end,
  293. #{},
  294. EnabledRules
  295. ),
  296. #{
  297. num_rules => NumRules,
  298. referenced_bridges => ReferencedBridges
  299. }
  300. catch
  301. _:_ ->
  302. #{
  303. num_rules => 0,
  304. referenced_bridges => #{}
  305. }
  306. end.
  307. tally_referenced_bridges(BridgeIDs, Acc0) ->
  308. lists:foldl(
  309. fun(BridgeID, Acc) ->
  310. {BridgeType, _BridgeName} = emqx_bridge_resource:parse_bridge_id(
  311. BridgeID,
  312. #{atom_name => false}
  313. ),
  314. maps:update_with(
  315. BridgeType,
  316. fun(X) -> X + 1 end,
  317. 1,
  318. Acc
  319. )
  320. end,
  321. Acc0,
  322. BridgeIDs
  323. ).
  324. %%----------------------------------------------------------------------------------------
  325. %% Data backup
  326. %%----------------------------------------------------------------------------------------
  327. import_config(#{<<"rule_engine">> := #{<<"rules">> := NewRules} = RuleEngineConf}) ->
  328. OldRules = emqx:get_raw_config(?KEY_PATH, #{}),
  329. RuleEngineConf1 = RuleEngineConf#{<<"rules">> => maps:merge(OldRules, NewRules)},
  330. case emqx_conf:update([rule_engine], RuleEngineConf1, #{override_to => cluster}) of
  331. {ok, #{raw_config := #{<<"rules">> := NewRawRules}}} ->
  332. Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawRules, OldRules)),
  333. ChangedPaths = [?RULE_PATH(Id) || Id <- maps:keys(Changed)],
  334. {ok, #{root_key => rule_engine, changed => ChangedPaths}};
  335. Error ->
  336. {error, #{root_key => rule_engine, reason => Error}}
  337. end;
  338. import_config(_RawConf) ->
  339. {ok, #{root_key => rule_engine, changed => []}}.
  340. %%----------------------------------------------------------------------------------------
  341. %% gen_server callbacks
  342. %%----------------------------------------------------------------------------------------
  343. init([]) ->
  344. _TableId = ets:new(?KV_TAB, [
  345. named_table,
  346. set,
  347. public,
  348. {write_concurrency, true},
  349. {read_concurrency, true}
  350. ]),
  351. ok = emqx_conf:add_handler(
  352. [rule_engine, jq_implementation_module],
  353. emqx_rule_engine_schema
  354. ),
  355. {ok, #{}}.
  356. handle_call({insert_rule, Rule}, _From, State) ->
  357. do_insert_rule(Rule),
  358. {reply, ok, State};
  359. handle_call({delete_rule, Rule}, _From, State) ->
  360. do_delete_rule(Rule),
  361. {reply, ok, State};
  362. handle_call(Req, _From, State) ->
  363. ?SLOG(error, #{msg => "unexpected_call", request => Req}),
  364. {reply, ignored, State}.
  365. handle_cast(Msg, State) ->
  366. ?SLOG(error, #{msg => "unexpected_cast", request => Msg}),
  367. {noreply, State}.
  368. handle_info(Info, State) ->
  369. ?SLOG(error, #{msg => "unexpected_info", request => Info}),
  370. {noreply, State}.
  371. terminate(_Reason, _State) ->
  372. ok.
  373. code_change(_OldVsn, State, _Extra) ->
  374. {ok, State}.
  375. %%----------------------------------------------------------------------------------------
  376. %% Internal Functions
  377. %%----------------------------------------------------------------------------------------
  378. parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) ->
  379. case emqx_rule_sqlparser:parse(Sql) of
  380. {ok, Select} ->
  381. Rule = #{
  382. id => RuleId,
  383. name => maps:get(name, Params, <<"">>),
  384. created_at => CreatedAt,
  385. updated_at => now_ms(),
  386. enable => maps:get(enable, Params, true),
  387. sql => Sql,
  388. actions => parse_actions(Actions),
  389. description => maps:get(description, Params, ""),
  390. %% -- calculated fields:
  391. from => emqx_rule_sqlparser:select_from(Select),
  392. is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
  393. fields => emqx_rule_sqlparser:select_fields(Select),
  394. doeach => emqx_rule_sqlparser:select_doeach(Select),
  395. incase => emqx_rule_sqlparser:select_incase(Select),
  396. conditions => emqx_rule_sqlparser:select_where(Select)
  397. %% -- calculated fields end
  398. },
  399. ok = insert_rule(Rule),
  400. {ok, Rule};
  401. {error, Reason} ->
  402. {error, Reason}
  403. end.
  404. do_insert_rule(#{id := Id} = Rule) ->
  405. ok = load_hooks_for_rule(Rule),
  406. ok = maybe_add_metrics_for_rule(Id),
  407. true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
  408. ok.
  409. do_delete_rule(RuleId) ->
  410. case get_rule(RuleId) of
  411. {ok, Rule} ->
  412. ok = unload_hooks_for_rule(Rule),
  413. ok = clear_metrics_for_rule(RuleId),
  414. true = ets:delete(?RULE_TAB, RuleId),
  415. ok;
  416. not_found ->
  417. ok
  418. end.
  419. parse_actions(Actions) ->
  420. [do_parse_action(Act) || Act <- Actions].
  421. do_parse_action(Action) when is_map(Action) ->
  422. emqx_rule_actions:parse_action(Action);
  423. do_parse_action(BridgeId) when is_binary(BridgeId) ->
  424. {Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
  425. {bridge, Type, Name, emqx_bridge_resource:resource_id(Type, Name)}.
  426. get_all_records(Tab) ->
  427. [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].
  428. maps_foreach(Fun, Map) ->
  429. lists:foreach(Fun, maps:to_list(Map)).
  430. now_ms() ->
  431. erlang:system_time(millisecond).
  432. bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
  433. bin(B) when is_binary(B) -> B.
  434. binary_to_module(ModName) ->
  435. try
  436. binary_to_existing_atom(ModName, utf8)
  437. catch
  438. error:badarg ->
  439. not_exist_mod
  440. end.
  441. contains_actions(Actions, Mod0, Func0) ->
  442. lists:any(
  443. fun
  444. (#{mod := Mod, func := Func}) when Mod =:= Mod0; Func =:= Func0 -> true;
  445. (_) -> false
  446. end,
  447. Actions
  448. ).
  449. forwards_to_bridge(Actions, BridgeId) ->
  450. Action = do_parse_action(BridgeId),
  451. lists:any(fun(A) -> A =:= Action end, Actions).
  452. references_ingress_bridge(Froms, BridgeId) ->
  453. lists:member(
  454. BridgeId,
  455. [
  456. RefBridgeId
  457. || From <- Froms,
  458. {ok, RefBridgeId} <-
  459. [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
  460. ]
  461. ).
  462. get_referenced_hookpoints(Froms) ->
  463. [
  464. BridgeID
  465. || From <- Froms,
  466. {ok, BridgeID} <-
  467. [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
  468. ].
  469. get_egress_bridges(Actions) ->
  470. [
  471. emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
  472. || {bridge, BridgeType, BridgeName, _ResId} <- Actions
  473. ].