Просмотр исходного кода

fix(ruleeng): parse bridge id on create / update

Instead of parsing it every time in the `send_message/2` hot path.
Andrew Mayorov 2 лет назад
Родитель
Сommit
29fe201676

+ 9 - 3
apps/emqx_bridge/src/emqx_bridge.erl

@@ -42,7 +42,10 @@
     list/0
 ]).
 
--export([send_message/2]).
+-export([
+    send_message/2,
+    send_message/4
+]).
 
 -export([config_key_path/0]).
 
@@ -199,14 +202,17 @@ send_to_matched_egress_bridges(Topic, Msg) ->
 send_message(BridgeId, Message) ->
     {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
     ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    send_message(BridgeType, BridgeName, ResId, Message).
+
+send_message(BridgeType, BridgeName, ResId, Message) ->
     case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
         not_found ->
-            {error, {bridge_not_found, BridgeId}};
+            {error, bridge_not_found};
         #{enable := true} = Config ->
             QueryOpts = query_opts(Config),
             emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
         #{enable := false} ->
-            {error, {bridge_stopped, BridgeId}}
+            {error, bridge_stopped}
     end.
 
 query_opts(Config) ->

+ 4 - 1
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -112,7 +112,10 @@ validate_name(Name0, Opts) ->
             case lists:all(fun is_id_char/1, Name) of
                 true ->
                     case maps:get(atom_name, Opts, true) of
-                        true -> list_to_existing_atom(Name);
+                        % NOTE
+                        % Rule may be created before bridge, thus not `list_to_existing_atom/1`,
+                        % also it is infrequent user input anyway.
+                        true -> list_to_atom(Name);
                         false -> Name0
                     end;
                 false ->

+ 7 - 3
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -146,21 +146,25 @@ get_action_mod_func(ActionFunc) when is_binary(ActionFunc) ->
         try binary_to_existing_atom(Bin) of
             Atom -> Atom
         catch
-            error:badarg -> error({unknown_action_function, ActionFunc})
+            error:badarg -> validation_error(unknown_action_function)
         end
     end,
     case string:split(ActionFunc, ":", all) of
         [Func1] -> {emqx_rule_actions, ToAtom(Func1)};
         [Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)};
-        _ -> error({invalid_action_function, ActionFunc})
+        _ -> validation_error(invalid_action_function)
     end.
 
 assert_function_supported(Mod, Func) ->
     case erlang:function_exported(Mod, Func, 3) of
         true -> ok;
-        false -> error({action_function_not_supported, Func})
+        false -> validation_error(action_function_not_supported)
     end.
 
+-spec validation_error(any()) -> no_return().
+validation_error(Reason) ->
+    throw(#{kind => validation_error, reason => Reason}).
+
 pre_process_args(Mod, Func, Args) ->
     case erlang:function_exported(Mod, pre_process_action_args, 2) of
         true -> Mod:pre_process_action_args(Func, Args);

+ 34 - 25
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -36,7 +36,6 @@
 
 -export([
     create_rule/1,
-    insert_rule/1,
     update_rule/1,
     delete_rule/1,
     get_rule/1
@@ -116,25 +115,30 @@ start_link() ->
 post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
     #{added := Added, removed := Removed, changed := Updated} =
         emqx_utils_maps:diff_maps(NewRules, OldRules),
-    maps_foreach(
-        fun({Id, {_Old, New}}) ->
-            {ok, _} = update_rule(New#{id => bin(Id)})
-        end,
-        Updated
-    ),
-    maps_foreach(
-        fun({Id, _Rule}) ->
-            ok = delete_rule(bin(Id))
-        end,
-        Removed
-    ),
-    maps_foreach(
-        fun({Id, Rule}) ->
-            {ok, _} = create_rule(Rule#{id => bin(Id)})
-        end,
-        Added
-    ),
-    {ok, get_rules()}.
+    try
+        maps_foreach(
+            fun({Id, {_Old, New}}) ->
+                {ok, _} = update_rule(New#{id => bin(Id)})
+            end,
+            Updated
+        ),
+        maps_foreach(
+            fun({Id, _Rule}) ->
+                ok = delete_rule(bin(Id))
+            end,
+            Removed
+        ),
+        maps_foreach(
+            fun({Id, Rule}) ->
+                {ok, _} = create_rule(Rule#{id => bin(Id)})
+            end,
+            Added
+        ),
+        {ok, get_rules()}
+    catch
+        throw:#{kind := _} = Error ->
+            {error, Error}
+    end.
 
 %%------------------------------------------------------------------------------
 %% APIs for rules
@@ -154,7 +158,7 @@ load_rules() ->
 
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
 create_rule(Params) ->
-    create_rule(Params, now_ms()).
+    create_rule(Params, maps:get(created_at, Params, now_ms())).
 
 create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
     case get_rule(RuleId) of
@@ -451,8 +455,9 @@ parse_actions(Actions) ->
 
 do_parse_action(Action) when is_map(Action) ->
     emqx_rule_actions:parse_action(Action);
-do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
-    BridgeChannelId.
+do_parse_action(BridgeId) when is_binary(BridgeId) ->
+    {Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
+    {bridge, Type, Name, emqx_bridge_resource:resource_id(Type, Name)}.
 
 get_all_records(Tab) ->
     [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].
@@ -484,7 +489,8 @@ contains_actions(Actions, Mod0, Func0) ->
     ).
 
 forwards_to_bridge(Actions, BridgeId) ->
-    lists:any(fun(A) -> A =:= BridgeId end, Actions).
+    Action = do_parse_action(BridgeId),
+    lists:any(fun(A) -> A =:= Action end, Actions).
 
 references_ingress_bridge(Froms, BridgeId) ->
     lists:member(
@@ -506,4 +512,7 @@ get_referenced_hookpoints(Froms) ->
     ].
 
 get_egress_bridges(Actions) ->
-    lists:filter(fun is_binary/1, Actions).
+    [
+        emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
+     || {bridge, BridgeType, BridgeName, _ResId} <- Actions
+    ].

+ 3 - 3
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -515,13 +515,13 @@ format_datetime(Timestamp, Unit) ->
 format_action(Actions) ->
     [do_format_action(Act) || Act <- Actions].
 
+do_format_action({bridge, BridgeType, BridgeName, _ResId}) ->
+    emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
 do_format_action(#{mod := Mod, func := Func, args := Args}) ->
     #{
         function => printable_function_name(Mod, Func),
         args => maps:remove(preprocessed_tmpl, Args)
-    };
-do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
-    BridgeChannelId.
+    }.
 
 printable_function_name(emqx_rule_actions, Func) ->
     Func;

+ 8 - 7
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -17,7 +17,6 @@
 -module(emqx_rule_runtime).
 
 -include("rule_engine.hrl").
--include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_resource/include/emqx_resource_errors.hrl").
 
@@ -50,8 +49,6 @@
     iolist_to_binary(io_lib:format("_v_~ts_~p_~p", [TYPE, NAME, erlang:system_time()]))
 ).
 
--define(ActionMaxRetry, 3).
-
 %%------------------------------------------------------------------------------
 %% Apply rules
 %%------------------------------------------------------------------------------
@@ -348,10 +345,14 @@ handle_action(RuleId, ActId, Selected, Envs) ->
             })
     end.
 
-do_handle_action(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
-    ?TRACE("BRIDGE", "bridge_action", #{bridge_id => BridgeId}),
-    case emqx_bridge:send_message(BridgeId, Selected) of
-        {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped ->
+do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
+    ?TRACE(
+        "BRIDGE",
+        "bridge_action",
+        #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
+    ),
+    case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of
+        {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
             throw(out_of_service);
         Result ->
             Result

+ 44 - 68
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -250,14 +250,14 @@ t_kv_store(_) ->
 
 t_add_get_remove_rule(_Config) ->
     RuleId0 = <<"rule-debug-0">>,
-    ok = emqx_rule_engine:insert_rule(make_simple_rule(RuleId0)),
+    ok = create_rule(make_simple_rule(RuleId0)),
     ?assertMatch({ok, #{id := RuleId0}}, emqx_rule_engine:get_rule(RuleId0)),
     ok = delete_rule(RuleId0),
     ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId0)),
 
     RuleId1 = <<"rule-debug-1">>,
     Rule1 = make_simple_rule(RuleId1),
-    ok = emqx_rule_engine:insert_rule(Rule1),
+    ok = create_rule(Rule1),
     ?assertMatch({ok, #{id := RuleId1}}, emqx_rule_engine:get_rule(RuleId1)),
     ok = delete_rule(Rule1),
     ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId1)),
@@ -265,7 +265,7 @@ t_add_get_remove_rule(_Config) ->
 
 t_add_get_remove_rules(_Config) ->
     delete_rules_by_ids([Id || #{id := Id} <- emqx_rule_engine:get_rules()]),
-    ok = insert_rules(
+    ok = create_rules(
         [
             make_simple_rule(<<"rule-debug-1">>),
             make_simple_rule(<<"rule-debug-2">>)
@@ -294,7 +294,7 @@ t_create_existing_rule(_Config) ->
 
 t_get_rules_for_topic(_Config) ->
     Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>)),
-    ok = insert_rules(
+    ok = create_rules(
         [
             make_simple_rule(<<"rule-debug-1">>),
             make_simple_rule(<<"rule-debug-2">>)
@@ -305,12 +305,12 @@ t_get_rules_for_topic(_Config) ->
     ok.
 
 t_get_rules_ordered_by_ts(_Config) ->
-    Now = fun() -> erlang:system_time(nanosecond) end,
-    ok = insert_rules(
+    Now = erlang:system_time(microsecond),
+    ok = create_rules(
         [
-            make_simple_rule_with_ts(<<"rule-debug-0">>, Now()),
-            make_simple_rule_with_ts(<<"rule-debug-1">>, Now()),
-            make_simple_rule_with_ts(<<"rule-debug-2">>, Now())
+            make_simple_rule_with_ts(<<"rule-debug-0">>, Now + 1),
+            make_simple_rule_with_ts(<<"rule-debug-1">>, Now + 2),
+            make_simple_rule_with_ts(<<"rule-debug-2">>, Now + 3)
         ]
     ),
     ?assertMatch(
@@ -324,23 +324,19 @@ t_get_rules_ordered_by_ts(_Config) ->
 
 t_get_rules_for_topic_2(_Config) ->
     Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>)),
-    ok = insert_rules(
+    ok = create_rules(
         [
-            make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
-            make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]),
-            make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [
-                <<"simple/+/1">>
-            ]),
-            make_simple_rule(<<"rule-debug-4">>, <<"select * from \"simple/1\"">>, [<<"simple/1">>]),
+            make_simple_rule(<<"rule-debug-1">>, _1 = <<"select * from \"simple/#\"">>),
+            make_simple_rule(<<"rule-debug-2">>, _2 = <<"select * from \"simple/+\"">>),
+            make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>),
+            make_simple_rule(<<"rule-debug-4">>, _3 = <<"select * from \"simple/1\"">>),
             make_simple_rule(
                 <<"rule-debug-5">>,
-                <<"select * from \"simple/2,simple/+,simple/3\"">>,
-                [<<"simple/2">>, <<"simple/+">>, <<"simple/3">>]
+                _4 = <<"select * from \"simple/2\", \"simple/+\", \"simple/3\"">>
             ),
             make_simple_rule(
                 <<"rule-debug-6">>,
-                <<"select * from \"simple/2,simple/3,simple/4\"">>,
-                [<<"simple/2">>, <<"simple/3">>, <<"simple/4">>]
+                <<"select * from \"simple/2\", \"simple/3\", \"simple/4\"">>
             )
         ]
     ),
@@ -367,52 +363,44 @@ t_get_rules_with_same_event(_Config) ->
     ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>)),
     ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>)),
     ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>)),
-    ok = insert_rules(
+    ok = create_rules(
         [
-            make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
-            make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]),
+            make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>),
+            make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>),
             make_simple_rule(
                 <<"r3">>,
-                <<"select * from \"$events/client_connected\"">>,
-                [<<"$events/client_connected">>]
+                <<"select * from \"$events/client_connected\"">>
             ),
             make_simple_rule(
                 <<"r4">>,
-                <<"select * from \"$events/client_disconnected\"">>,
-                [<<"$events/client_disconnected">>]
+                <<"select * from \"$events/client_disconnected\"">>
             ),
             make_simple_rule(
                 <<"r5">>,
-                <<"select * from \"$events/session_subscribed\"">>,
-                [<<"$events/session_subscribed">>]
+                <<"select * from \"$events/session_subscribed\"">>
             ),
             make_simple_rule(
                 <<"r6">>,
-                <<"select * from \"$events/session_unsubscribed\"">>,
-                [<<"$events/session_unsubscribed">>]
+                <<"select * from \"$events/session_unsubscribed\"">>
             ),
             make_simple_rule(
                 <<"r7">>,
-                <<"select * from \"$events/message_delivered\"">>,
-                [<<"$events/message_delivered">>]
+                <<"select * from \"$events/message_delivered\"">>
             ),
             make_simple_rule(
                 <<"r8">>,
-                <<"select * from \"$events/message_acked\"">>,
-                [<<"$events/message_acked">>]
+                <<"select * from \"$events/message_acked\"">>
             ),
             make_simple_rule(
                 <<"r9">>,
-                <<"select * from \"$events/message_dropped\"">>,
-                [<<"$events/message_dropped">>]
+                <<"select * from \"$events/message_dropped\"">>
             ),
             make_simple_rule(
                 <<"r10">>,
                 <<
-                    "select * from \"t/1, "
-                    "$events/session_subscribed, $events/client_connected\""
-                >>,
-                [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>]
+                    "select * from \"t/1\", "
+                    "\"$events/session_subscribed\", \"$events/client_connected\""
+                >>
             )
         ]
     ),
@@ -455,23 +443,18 @@ t_get_rules_with_same_event(_Config) ->
 t_get_rule_ids_by_action(_) ->
     ID = <<"t_get_rule_ids_by_action">>,
     Rule1 = #{
-        enable => false,
         id => ID,
         sql => <<"SELECT * FROM \"t\"">>,
-        from => [<<"t">>],
-        fields => [<<"*">>],
-        is_foreach => false,
-        conditions => {},
         actions => [
-            #{mod => emqx_rule_actions, func => console, args => #{}},
-            #{mod => emqx_rule_actions, func => republish, args => #{}},
+            #{function => console, args => #{}},
+            #{function => republish, args => #{}},
             <<"mqtt:my_mqtt_bridge">>,
             <<"mysql:foo">>
         ],
         description => ID,
         created_at => erlang:system_time(millisecond)
     },
-    ok = insert_rules([Rule1]),
+    ok = create_rules([Rule1]),
     ?assertMatch(
         [ID],
         emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:console">>})
@@ -2834,26 +2817,20 @@ republish_action(Topic, Payload, UserProperties) ->
 
 make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
     SQL = <<"select * from \"simple/topic\"">>,
-    Topics = [<<"simple/topic">>],
-    make_simple_rule(RuleId, SQL, Topics, Ts).
+    make_simple_rule(RuleId, SQL, Ts).
 
 make_simple_rule(RuleId) when is_binary(RuleId) ->
     SQL = <<"select * from \"simple/topic\"">>,
-    Topics = [<<"simple/topic">>],
-    make_simple_rule(RuleId, SQL, Topics).
+    make_simple_rule(RuleId, SQL).
 
-make_simple_rule(RuleId, SQL, Topics) when is_binary(RuleId) ->
-    make_simple_rule(RuleId, SQL, Topics, erlang:system_time(millisecond)).
+make_simple_rule(RuleId, SQL) when is_binary(RuleId) ->
+    make_simple_rule(RuleId, SQL, erlang:system_time(millisecond)).
 
-make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
+make_simple_rule(RuleId, SQL, Ts) when is_binary(RuleId) ->
     #{
         id => RuleId,
         sql => SQL,
-        from => Topics,
-        fields => [<<"*">>],
-        is_foreach => false,
-        conditions => {},
-        actions => [#{mod => emqx_rule_actions, func => console, args => #{}}],
+        actions => [#{function => console, args => #{}}],
         description => <<"simple rule">>,
         created_at => Ts
     }.
@@ -3233,13 +3210,12 @@ deps_path(App, RelativePath) ->
 local_path(RelativePath) ->
     deps_path(emqx_rule_engine, RelativePath).
 
-insert_rules(Rules) ->
-    lists:foreach(
-        fun(Rule) ->
-            ok = emqx_rule_engine:insert_rule(Rule)
-        end,
-        Rules
-    ).
+create_rules(Rules) ->
+    lists:foreach(fun create_rule/1, Rules).
+
+create_rule(Rule) ->
+    {ok, _} = emqx_rule_engine:create_rule(Rule),
+    ok.
 
 delete_rules_by_ids(Ids) ->
     lists:foreach(