Browse Source

feat(rule_engine): default timeout for jq/2 and jq/3 with timeout

This commit adds a default timeout of 10 seconds to the rule engine's
`jq/2` function, and adds a new function `jq/3` (where the last parameter is
a timeout value). The default timeout can be configured with the setting
"rule_engine.jq_function_default_timeout".

Having a timeout when executing jq code in the rule engine is important
as jq code can potentially run forever. Also, the Erlang jq library
limits the number of jq programs that can execute concurrently so a jq
program that loops forever could potentially also prevent a "non-buggy"
jq program from ever starting.
Kjell Winblad 3 năm trước cách đây
mục cha
commit
05032467bd

+ 1 - 0
apps/emqx_rule_engine/etc/emqx_rule_engine.conf

@@ -3,6 +3,7 @@
 ##====================================================================
 ##====================================================================
 rule_engine {
 rule_engine {
     ignore_sys_message = true
     ignore_sys_message = true
+    jq_function_default_timeout = 10s
     #rules.my_republish_rule {
     #rules.my_republish_rule {
     #    description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'"
     #    description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'"
     #    enable = true
     #    enable = true

+ 11 - 0
apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf

@@ -239,6 +239,17 @@ of the rule, then the string "undefined" is used.
                           }
                           }
                   }
                   }
 
 
+    rule_engine_jq_function_default_timeout {
+      desc {
+          en: "Default timeout for the jq/2 rule engine function"
+          zh: "Default timeout for the jq/2 rule engine function"
+      }
+      label: {
+          en: "Rule engine jq function default timeout"
+          zh: "Rule engine jq function default timeout"
+      }
+    }
+
     desc_rule_engine {
     desc_rule_engine {
                    desc {
                    desc {
                          en: """Configuration for the EMQX Rule Engine."""
                          en: """Configuration for the EMQX Rule Engine."""

+ 39 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -19,8 +19,16 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
 
+-type duration_ms() :: integer().
+
+-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}).
+
 -behaviour(hocon_schema).
 -behaviour(hocon_schema).
 
 
+-reflect_type([
+    duration_ms/0
+]).
+
 -export([
 -export([
     namespace/0,
     namespace/0,
     roots/0,
     roots/0,
@@ -30,6 +38,11 @@
 
 
 -export([validate_sql/1, validate_rule_name/1]).
 -export([validate_sql/1, validate_rule_name/1]).
 
 
+% workaround: prevent being recognized as unused functions
+-export([
+    to_duration_ms/1
+]).
+
 namespace() -> rule_engine.
 namespace() -> rule_engine.
 
 
 roots() -> ["rule_engine"].
 roots() -> ["rule_engine"].
@@ -41,7 +54,15 @@ fields("rule_engine") ->
         {rules,
         {rules,
             sc(hoconsc:map("id", ref("rules")), #{
             sc(hoconsc:map("id", ref("rules")), #{
                 desc => ?DESC("rule_engine_rules"), default => #{}
                 desc => ?DESC("rule_engine_rules"), default => #{}
-            })}
+            })},
+        {jq_function_default_timeout,
+            sc(
+                duration_ms(),
+                #{
+                    default => "10s",
+                    desc => ?DESC("rule_engine_jq_function_default_timeout")
+                }
+            )}
     ];
     ];
 fields("rules") ->
 fields("rules") ->
     [
     [
@@ -218,3 +239,20 @@ validate_sql(Sql) ->
 
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 ref(Field) -> hoconsc:ref(?MODULE, Field).
 ref(Field) -> hoconsc:ref(?MODULE, Field).
+
+-spec ceiling(number()) -> integer().
+ceiling(X) ->
+    T = erlang:trunc(X),
+    case (X - T) of
+        Neg when Neg < 0 -> T;
+        Pos when Pos > 0 -> T + 1;
+        _ -> T
+    end.
+
+-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when
+    Input :: string() | binary().
+to_duration_ms(Str) ->
+    case hocon_postprocess:duration(Str) of
+        I when is_number(I) -> {ok, ceiling(I)};
+        _ -> {error, Str}
+    end.

+ 23 - 6
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -148,7 +148,8 @@
     ascii/1,
     ascii/1,
     find/2,
     find/2,
     find/3,
     find/3,
-    jq/2
+    jq/2,
+    jq/3
 ]).
 ]).
 
 
 %% Map Funcs
 %% Map Funcs
@@ -780,22 +781,38 @@ find_s(S, P, Dir) ->
         SubStr -> SubStr
         SubStr -> SubStr
     end.
     end.
 
 
--spec jq(FilterProgram, JSON) -> Result when
+-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when
     FilterProgram :: binary(),
     FilterProgram :: binary(),
     JSON :: binary() | term(),
     JSON :: binary() | term(),
+    TimeoutMS :: non_neg_integer(),
     Result :: [term()].
     Result :: [term()].
-jq(FilterProgram, JSONBin) when
+jq(FilterProgram, JSONBin, TimeoutMS) when
     is_binary(FilterProgram), is_binary(JSONBin)
     is_binary(FilterProgram), is_binary(JSONBin)
 ->
 ->
-    case jq:process_json(FilterProgram, JSONBin) of
+    case jq:process_json(FilterProgram, JSONBin, TimeoutMS) of
         {ok, Result} ->
         {ok, Result} ->
             [json_decode(JSONString) || JSONString <- Result];
             [json_decode(JSONString) || JSONString <- Result];
         {error, ErrorReason} ->
         {error, ErrorReason} ->
             erlang:throw({jq_exception, ErrorReason})
             erlang:throw({jq_exception, ErrorReason})
     end;
     end;
-jq(FilterProgram, JSONTerm) when is_binary(FilterProgram) ->
+jq(FilterProgram, JSONTerm, TimeoutMS) when is_binary(FilterProgram) ->
     JSONBin = json_encode(JSONTerm),
     JSONBin = json_encode(JSONTerm),
-    jq(FilterProgram, JSONBin).
+    jq(FilterProgram, JSONBin, TimeoutMS).
+
+-spec jq(FilterProgram, JSON) -> Result when
+    FilterProgram :: binary(),
+    JSON :: binary() | term(),
+    Result :: [term()].
+jq(FilterProgram, JSONBin) ->
+    ConfigRootKey = emqx_rule_engine_schema:namespace(),
+    jq(
+        FilterProgram,
+        JSONBin,
+        emqx_config:get([
+            ConfigRootKey,
+            jq_function_default_timeout
+        ])
+    ).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Array Funcs
 %% Array Funcs

+ 48 - 1
apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl

@@ -28,6 +28,14 @@
 -define(PROPTEST(F), ?assert(proper:quickcheck(F()))).
 -define(PROPTEST(F), ?assert(proper:quickcheck(F()))).
 %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
 %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
 
 
+init_per_suite(Config) ->
+    application:load(emqx_conf),
+    ConfigConf = <<"rule_engine {jq_function_default_timeout {}}">>,
+    ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf),
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Test cases for IoT Funcs
 %% Test cases for IoT Funcs
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -645,7 +653,46 @@ t_jq(_) ->
     ?assertEqual(
     ?assertEqual(
         jq_1_elm_res("{\"b\":2}"),
         jq_1_elm_res("{\"b\":2}"),
         apply_func(jq, [<<".">>, <<"{\"b\": 2}">>])
         apply_func(jq, [<<".">>, <<"{\"b\": 2}">>])
-    ).
+    ),
+    %% Expicitly set timeout
+    ?assertEqual(
+        jq_1_elm_res("{\"b\":2}"),
+        apply_func(jq, [<<".">>, <<"{\"b\": 2}">>, 10000])
+    ),
+    TOProgram = erlang:iolist_to_binary(
+        "def while(cond; update):"
+        "  def _while:"
+        "    if cond then  (update | _while) else . end;"
+        "  _while;"
+        "while(. < 42; . * 2)"
+    ),
+    got_timeout =
+        try
+            apply_func(jq, [TOProgram, <<"-2">>, 10])
+        catch
+            throw:{jq_exception, {timeout, _}} ->
+                %% Got timeout as expected
+                got_timeout
+        end,
+    ConfigRootKey = emqx_rule_engine_schema:namespace(),
+    DefaultTimeOut = emqx_config:get([
+        ConfigRootKey,
+        jq_function_default_timeout
+    ]),
+    case DefaultTimeOut =< 15000 of
+        true ->
+            got_timeout =
+                try
+                    apply_func(jq, [TOProgram, <<"-2">>])
+                catch
+                    throw:{jq_exception, {timeout, _}} ->
+                        %% Got timeout as expected
+                        got_timeout
+                end;
+        false ->
+            %% Skip test as we don't want it to take to long time to run
+            ok
+    end.
 
 
 ascii_string() -> list(range(0, 127)).
 ascii_string() -> list(range(0, 127)).
 
 

+ 1 - 1
mix.exs

@@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do
 
 
   defp jq_dep() do
   defp jq_dep() do
     if enable_jq?(),
     if enable_jq?(),
-      do: [{:jq, github: "emqx/jq", tag: "v0.2.2", override: true}],
+      do: [{:jq, github: "emqx/jq", tag: "v0.3.0-beta.1", override: true}],
       else: []
       else: []
   end
   end
 
 

+ 1 - 1
rebar.config.erl

@@ -41,7 +41,7 @@ quicer() ->
     {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}.
     {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}.
 
 
 jq() ->
 jq() ->
-    {jq, {git, "https://github.com/emqx/jq", {tag, "v0.2.2"}}}.
+    {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.0-beta.1"}}}.
 
 
 deps(Config) ->
 deps(Config) ->
     {deps, OldDeps} = lists:keyfind(deps, 1, Config),
     {deps, OldDeps} = lists:keyfind(deps, 1, Config),