Prechádzať zdrojové kódy

feat(rules): support output functions in <<"Mod:Func">> format

Shawn 4 rokov pred
rodič
commit
0e7a3f89a9

+ 10 - 8
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -26,19 +26,21 @@
 -type mf() :: {Module::atom(), Fun::atom()}.
 
 -type hook() :: atom() | 'any'.
-
 -type topic() :: binary().
--type bridge_channel_id() :: binary().
+
 -type selected_data() :: map().
 -type envs() :: map().
--type output_target() :: bridge_channel_id() | atom() | output_fun().
+
+-type builtin_output_func() :: republish | console.
+-type builtin_output_module() :: emqx_rule_outputs.
+-type bridge_channel_id() :: binary().
 -type output_fun_args() :: map().
--type output() :: #{
-        function := output_target(),
-        args => output_fun_args()
-}.
 
--type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()).
+-type output() :: #{
+    mod := builtin_output_module() | module(),
+    func := builtin_output_func() | atom(),
+    args => output_fun_args()
+} | bridge_channel_id().
 
 -type rule() ::
        #{ id := rule_id()

+ 5 - 19
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -260,26 +260,12 @@ do_delete_rule(RuleId) ->
     end.
 
 parse_outputs(Outputs) ->
-    [do_parse_outputs(Out) || Out <- Outputs].
-
-do_parse_outputs(#{function := Repub, args := Args})
-        when Repub == republish; Repub == <<"republish">> ->
-    #{function => republish, args => emqx_rule_outputs:pre_process_repub_args(Args)};
-do_parse_outputs(#{function := Func} = Output) ->
-    #{function => parse_output_func(Func), args => maps:get(args, Output, #{})};
-do_parse_outputs(BridgeChannelId) when is_binary(BridgeChannelId) ->
-    BridgeChannelId.
+    [do_parse_output(Out) || Out <- Outputs].
 
-parse_output_func(FuncName) when is_atom(FuncName) ->
-    FuncName;
-parse_output_func(BinFunc) when is_binary(BinFunc) ->
-    try binary_to_existing_atom(BinFunc) of
-        Func -> emqx_rule_outputs:assert_builtin_output(Func)
-    catch
-        error:badarg -> error({unknown_builtin_function, BinFunc})
-    end;
-parse_output_func(Func) when is_function(Func) ->
-    Func.
+do_parse_output(Output) when is_map(Output) ->
+    emqx_rule_outputs:parse_output(Output);
+do_parse_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
+    BridgeChannelId.
 
 get_all_records(Tab) ->
     [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].

+ 3 - 7
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -330,13 +330,9 @@ format_datetime(Timestamp, Unit) ->
 format_output(Outputs) ->
     [do_format_output(Out) || Out <- Outputs].
 
-do_format_output(#{function := Func}) when is_function(Func) ->
-    FunInfo = erlang:fun_info(Func),
-    FunMod = proplists:get_value(module, FunInfo),
-    FunName = proplists:get_value(name, FunInfo),
-    #{function => list_to_binary(lists:concat([FunMod,":",FunName]))};
-do_format_output(#{function := Name, args := Args}) ->
-    #{function => Name, args => maps:remove(preprocessed_tmpl, Args)};
+do_format_output(#{mod := Mod, func := Func, args := Args}) ->
+    #{function => list_to_binary(lists:concat([Mod,":",Func])),
+      args => maps:remove(preprocessed_tmpl, Args)};
 do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
     BridgeChannelId.
 

+ 77 - 19
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -37,24 +37,64 @@ fields("rule_engine") ->
     ];
 
 fields("rules") ->
-    [ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false,
-                             validator => fun ?MODULE:validate_sql/1})}
+    [ {"sql", sc(binary(),
+        #{ desc => """
+SQL query to transform the messages.<br>
+Example: <code>SELECT * FROM \"test/topic\" WHERE payload.x = 1</code><br>
+"""
+         , nullable => false
+         , validator => fun ?MODULE:validate_sql/1})}
     , {"outputs", sc(hoconsc:array(hoconsc:union(
                                    [ binary()
                                    , ref("builtin_output_republish")
                                    , ref("builtin_output_console")
                                    ])),
-          #{desc => "The outputs of the rule. An output can be a string refers to the channel Id "
-                    "of a emqx bridge, or a object refers to a built-in function.",
-            default => []})}
+        #{ desc => """
+A list of outputs of the rule.<br>
+An output can be a string that refers to the channel Id of a emqx bridge, or a object
+that refers to a function.<br>
+There a some built-in functions like \"republish\" and \"console\", and we also support user
+provided functions like \"<SomeModule>:<SomeFunction>\".<br>
+The outputs in the list is executed one by one in order.
+This means that if one of the output is executing slowly, all of the outputs comes after it will not
+be executed until it returns.<br>
+If one of the output crashed, all other outputs come after it will still be executed, in the
+original order.<br>
+If there's any error when running an output, there will be an error message, and the 'failure'
+counter of the function output or the bridge channel will increase.
+"""
+        , default => []
+        })}
     , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
     , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
     ];
 
 fields("builtin_output_republish") ->
     [ {function, sc(republish, #{desc => "Republish the message as a new MQTT message"})}
-    , {args, sc(ref("republish_args"), #{desc => "The arguments of the built-in 'republish' output",
-        default => #{}})}
+    , {args, sc(ref("republish_args"),
+        #{ desc => """
+The arguments of the built-in 'republish' output.<br>
+We can use variables in the args.<br>
+
+The variables are selected by the rule. For exmaple, if the rule SQL is defined as following:
+<code>
+    SELECT clientid, qos, payload FROM \"t/1\"
+</code>
+Then there are 3 variables available: <code>clientid</code>, <code>qos</code> and
+<code>payload</code>. And if we've set the args to:
+<code>
+    {
+        topic = \"t/${clientid}\"
+        qos = \"${qos}\"
+        payload = \"msg: ${payload}\"
+    }
+</code>
+When the rule is triggered by an MQTT message with payload = \"hello\", qos = 1,
+clientid = \"steve\", the rule will republish a new MQTT message to topic \"t/steve\",
+payload = \"msg: hello\", and qos = 1.
+"""
+         , default => #{}
+         })}
     ];
 
 fields("builtin_output_console") ->
@@ -66,21 +106,39 @@ fields("builtin_output_console") ->
 
 fields("republish_args") ->
     [ {topic, sc(binary(),
-        #{desc => "The target topic of the re-published message."
-                  " Template with with variables is allowed.",
-          nullable => false})}
+        #{ desc =>"""
+The target topic of message to be re-published.<br>
+Template with variables is allowed, see description of the 'republish_args'.
+"""
+          , nullable => false
+          })}
     , {qos, sc(binary(),
-        #{desc => "The qos of the re-published message."
-                  " Template with with variables is allowed. Defaults to ${qos}.",
-          default => <<"${qos}">> })}
+        #{ desc => """
+The qos of the message to be re-published.
+Template with with variables is allowed, see description of the 'republish_args.<br>
+Defaults to ${qos}. If variable ${qos} is not found from the selected result of the rule,
+0 is used.
+"""
+         , default => <<"${qos}">>
+         })}
     , {retain, sc(binary(),
-        #{desc => "The retain of the re-published message."
-                  " Template with with variables is allowed. Defaults to ${retain}.",
-          default => <<"${retain}">> })}
+        #{ desc => """
+The retain flag of the message to be re-published.
+Template with with variables is allowed, see description of the 'republish_args.<br>
+Defaults to ${retain}. If variable ${retain} is not found from the selected result
+of the rule, false is used.
+"""
+        , default => <<"${retain}">>
+        })}
     , {payload, sc(binary(),
-        #{desc => "The payload of the re-published message."
-                  " Template with with variables is allowed. Defaults to ${payload}.",
-          default => <<"${payload}">>})}
+        #{ desc => """
+The payload of the message to be re-published.
+Template with with variables is allowed, see description of the 'republish_args.<br>.
+Defaults to ${payload}. If variable ${payload} is not found from the selected result
+of the rule, then the string \"undefined\" is used.
+"""
+         , default => <<"${payload}">>
+         })}
     ].
 
 validate_sql(Sql) ->

+ 81 - 27
apps/emqx_rule_engine/src/emqx_rule_outputs.erl

@@ -16,28 +16,54 @@
 
 %% Define the default actions.
 -module(emqx_rule_outputs).
+
+-include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx.hrl").
 
--define(OUTPUT_FUNCS,
-        [ console
-        , republish
+%% APIs
+-export([ parse_output/1
+        ]).
+
+%% callbacks of emqx_rule_output
+-export([ pre_process_output_args/2
         ]).
 
+%% output functions
 -export([ console/3
         , republish/3
         ]).
 
--export([ pre_process_repub_args/1
-        , assert_builtin_output/1
-        ]).
+-optional_callbacks([ pre_process_output_args/2
+                    ]).
 
-assert_builtin_output(FuncName) ->
-    case lists:member(FuncName, ?OUTPUT_FUNCS) of
-        true -> FuncName;
-        false -> error({unknown_builtin_function, FuncName})
-    end.
+-callback pre_process_output_args(FuncName :: atom(), output_fun_args()) -> output_fun_args().
 
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+parse_output(#{function := OutputFunc} = Output) ->
+    {Mod, Func} = parse_output_func(OutputFunc),
+    #{mod => Mod, func => Func,
+      args => pre_process_args(Mod, Func, maps:get(args, Output, #{}))}.
+
+%%--------------------------------------------------------------------
+%% callbacks of emqx_rule_output
+%%--------------------------------------------------------------------
+pre_process_output_args(republish, #{topic := Topic, qos := QoS, retain := Retain,
+        payload := Payload} = Args) ->
+    Args#{preprocessed_tmpl => #{
+            topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
+            qos => preproc_vars(QoS),
+            retain => preproc_vars(Retain),
+            payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
+        }};
+pre_process_output_args(_, Args) ->
+    Args.
+
+%%--------------------------------------------------------------------
+%% output functions
+%%--------------------------------------------------------------------
 -spec console(map(), map(), map()) -> any().
 console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
     ?ULOG("[rule output] ~ts~n"
@@ -57,8 +83,8 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
             payload := PayloadTks}}) ->
     Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
     Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
-    QoS = replace_simple_var(QoSTks, Selected),
-    Retain = replace_simple_var(RetainTks, Selected),
+    QoS = replace_simple_var(QoSTks, Selected, 0),
+    Retain = replace_simple_var(RetainTks, Selected, false),
     ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
     safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
 
@@ -71,11 +97,45 @@ republish(Selected, #{metadata := #{rule_id := RuleId}},
                 payload := PayloadTks}}) ->
     Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
     Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
-    QoS = replace_simple_var(QoSTks, Selected),
-    Retain = replace_simple_var(RetainTks, Selected),
+    QoS = replace_simple_var(QoSTks, Selected, 0),
+    Retain = replace_simple_var(RetainTks, Selected, false),
     ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
     safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
 
+%%--------------------------------------------------------------------
+%% internal functions
+%%--------------------------------------------------------------------
+parse_output_func(OutputFunc) ->
+    {Mod, Func} = get_output_mod_func(OutputFunc),
+    assert_function_supported(Mod, Func),
+    {Mod, Func}.
+
+get_output_mod_func(OutputFunc) when is_atom(OutputFunc) ->
+    {emqx_rule_outputs, OutputFunc};
+get_output_mod_func(OutputFunc) when is_binary(OutputFunc) ->
+    ToAtom = fun(Bin) ->
+        try binary_to_existing_atom(Bin) of Atom -> Atom
+        catch error:badarg -> error({unknown_output_function, OutputFunc})
+        end
+    end,
+    case string:split(OutputFunc, ":", all) of
+        [Func1] -> {emqx_rule_outputs, ToAtom(Func1)};
+        [Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)};
+        _ -> error({invalid_output_function, OutputFunc})
+    end.
+
+assert_function_supported(Mod, Func) ->
+    case erlang:function_exported(Mod, Func, 3) of
+        true -> ok;
+        false -> error({output_function_not_supported, Func})
+    end.
+
+pre_process_args(Mod, Func, Args) ->
+    case erlang:function_exported(Mod, pre_process_output_args, 2) of
+        true -> Mod:pre_process_output_args(Func, Args);
+        false -> Args
+    end.
+
 safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
     Msg = #message{
         id = emqx_guid:gen(),
@@ -90,22 +150,16 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
     _ = emqx_broker:safe_publish(Msg),
     emqx_metrics:inc_msg(Msg).
 
-pre_process_repub_args(#{topic := Topic, qos := QoS, retain := Retain,
-                         payload := Payload} = Args) ->
-    Args#{preprocessed_tmpl => #{
-            topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
-            qos => preproc_vars(QoS),
-            retain => preproc_vars(Retain),
-            payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
-        }}.
-
 preproc_vars(Data) when is_binary(Data) ->
     emqx_plugin_libs_rule:preproc_tmpl(Data);
 preproc_vars(Data) ->
     Data.
 
-replace_simple_var(Tokens, Data) when is_list(Tokens) ->
+replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
     [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
-    Var;
-replace_simple_var(Val, _Data) ->
+    case Var of
+        undefined -> Default; %% cannot find the variable from Data
+        _ -> Var
+    end;
+replace_simple_var(Val, _Data, _Default) ->
     Val.

+ 2 - 7
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -250,13 +250,8 @@ handle_output(OutId, Selected, Envs) ->
 do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) ->
     ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}),
     emqx_bridge:send_message(ChannelId, Selected);
-do_handle_output(#{function := Func} = Out, Selected, Envs) when is_function(Func) ->
-    erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);
-do_handle_output(#{function := Func} = Out, Selected, Envs) when is_atom(Func) ->
-    case erlang:function_exported(emqx_rule_outputs, Func, 3) of
-        true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, maps:get(args, Out, #{})]);
-        false -> error(not_found)
-    end.
+do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
+    Mod:Func(Selected, Envs, Args).
 
 eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
     nested_get({path, Path}, may_decode_payload(Payload));

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -46,7 +46,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
         id => RuleId,
         sql => Sql,
         from => EventTopics,
-        outputs => [#{function => fun ?MODULE:get_selected_data/3, args => #{}}],
+        outputs => [#{mod => ?MODULE, func => get_selected_data, args => #{}}],
         enabled => true,
         is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
         fields => emqx_rule_sqlparser:select_fields(Select),

+ 2 - 2
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -150,7 +150,7 @@ init_per_testcase(t_events, Config) ->
             sql => SQL,
             outputs => [
                 #{function => console},
-                #{function => fun ?MODULE:output_record_triggered_events/3,
+                #{function => <<"emqx_rule_engine_SUITE:output_record_triggered_events">>,
                   args => #{}}
             ],
             description => <<"to console and record triggered events">>}),
@@ -1318,7 +1318,7 @@ make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
       fields => [<<"*">>],
       is_foreach => false,
       conditions => {},
-      ouputs => [#{function => console}],
+      outputs => [#{mod => emqx_rule_outputs, func => console, args => #{}}],
       description => <<"simple rule">>,
       created_at => Ts
     }.