Просмотр исходного кода

Merge branch 'dev/v4.3.0' into e422_to_v430

JianBo He 5 лет назад
Родитель
Сommit
1c2d9767ae

+ 2 - 1
.gitignore

@@ -34,4 +34,5 @@ Mnesia.*/
 *.DS_Store
 _checkouts
 rebar.config.rendered
-/rebar3
+/rebar3
+rebar.lock

+ 8 - 0
Makefile

@@ -19,6 +19,14 @@ ensure-rebar3:
 
 $(REBAR): ensure-rebar3
 
+.PHONY: xref
+xref: $(REBAR)
+	$(REBAR) xref
+
+.PHONY: dialyzer
+dialyzer: $(REBAR)
+	$(REBAR) dialyzer
+
 .PHONY: distclean
 distclean:
 	@rm -rf _build

+ 13 - 4
README.md

@@ -40,13 +40,22 @@ Get the binary package of the corresponding OS from [EMQ X Download](https://www
 
 The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release.
 
-```
-git clone -b v4.0.0 https://github.com/emqx/emqx-rel.git
+For 4.3 and later versions.
 
-cd emqx-rel && make
+```bash
+git clone https://github.com/emqx/emqx.git
+cd emqx
+make
+_build/emqx/rel/emqx/bin console
+```
 
-cd _build/emqx/rel/emqx && ./bin/emqx console
+For earlier versions, release has to be built from another repo.
 
+```bash
+git clone https://github.com/emqx/emqx-rel.git
+cd emqx-rel
+make
+_build/emqx/rel/emqx/bin console
 ```
 
 ## Quick Start

+ 1 - 0
apps/emqx_auth_http/src/emqx_acl_http.erl

@@ -36,6 +36,7 @@
 
 -spec(register_metrics() -> ok).
 register_metrics() ->
+    io:format("testing"),
     lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
 
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx_auth_http/src/emqx_http_client.erl

@@ -191,7 +191,7 @@ handle_info(Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, #state{pool = Pool, id = Id}) ->
-    gropc:disconnect_worker(Pool, {Pool, Id}),
+    gproc_pool:disconnect_worker(Pool, {Pool, Id}),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -253,4 +253,4 @@ flush_stream(Client, StreamRef) ->
             flush_stream(Client, StreamRef)
 	after 0 ->
 		ok
-	end.
+	end.

+ 1 - 16
apps/emqx_management/src/emqx_mgmt.erl

@@ -128,7 +128,6 @@
         , export_auth_username/0
         , export_auth_mnesia/0
         , export_acl_mnesia/0
-        , export_schemas/0
         , import_rules/1
         , import_resources/1
         , import_blacklist/1
@@ -138,7 +137,6 @@
         , import_auth_username/1
         , import_auth_mnesia/1
         , import_acl_mnesia/1
-        , import_schemas/1
         , to_version/1
         ]).
 
@@ -679,13 +677,6 @@ export_acl_mnesia() ->
                         end, [], ets:tab2list(emqx_acl))
     end.
 
-export_schemas() ->
-    case ets:info(emqx_schema) of
-        undefined -> [];
-        _ ->
-            [emqx_schema_api:format_schema(Schema) || Schema <- emqx_schema_registry:get_all_schemas()]
-    end.
-
 import_rules(Rules) ->
     lists:foreach(fun(#{<<"id">> := RuleId,
                         <<"rawsql">> := RawSQL,
@@ -788,19 +779,13 @@ import_auth_mnesia(Auths) ->
 import_acl_mnesia(Acls) ->
     case ets:info(emqx_acl) of
         undefined -> ok;
-        _ -> 
+        _ ->
             [ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login, 
                                                                                <<"topic">> := Topic,
                                                                                <<"action">> := Action,
                                                                                <<"allow">> := Allow} <- Acls ]
     end.
 
-import_schemas(Schemas) -> 
-    case ets:info(emqx_schema) of
-        undefined -> ok;
-        _ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas]
-    end.
-
 any_to_atom(L) when is_list(L) -> list_to_atom(L);
 any_to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8);
 any_to_atom(A) when is_atom(A) -> A.

+ 6 - 9
apps/emqx_management/src/emqx_mgmt_api_data.erl

@@ -84,7 +84,6 @@ export(_Bindings, _Params) ->
     AuthUsername = emqx_mgmt:export_auth_username(),
     AuthMnesia = emqx_mgmt:export_auth_mnesia(),
     AclMnesia = emqx_mgmt:export_acl_mnesia(),
-    Schemas = emqx_mgmt:export_schemas(),
     Seconds = erlang:system_time(second),
     {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
     Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
@@ -100,8 +99,7 @@ export(_Bindings, _Params) ->
             {auth_clientid, AuthClientid},
             {auth_username, AuthUsername},
             {auth_mnesia, AuthMnesia},
-            {acl_mnesia, AclMnesia},
-            {schemas, Schemas}
+            {acl_mnesia, AclMnesia}
            ],
 
     Bin = emqx_json:encode(Data),
@@ -180,20 +178,19 @@ do_import(Filename) ->
             case lists:member(Version, ?VERSIONS) of
                 true  ->
                     try
-                        emqx_mgmt:import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
+                        %emqx_mgmt:import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
                         emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])),
                         emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])),
                         emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
                         emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
                         emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
-                        emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])),
+                        %emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])),
                         emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
                         emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
-                        emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
-                        emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
-                        emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
+                        %emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
+                        %emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
                         logger:debug("The emqx data has been imported successfully"),
-                        ok
+                        error({not_implemented, [import_confs,import_modules,import_auth_mnesia,import_acl_mnesia]})
                     catch Class:Reason:Stack ->
                         logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
                         {error, import_failed}

+ 2 - 4
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -567,7 +567,6 @@ data(["export"]) ->
     AuthUsername = emqx_mgmt:export_auth_username(),
     AuthMnesia = emqx_mgmt:export_auth_mnesia(),
     AclMnesia = emqx_mgmt:export_acl_mnesia(),
-    Schemas = emqx_mgmt:export_schemas(),
     Seconds = erlang:system_time(second),
     {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
     Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
@@ -583,8 +582,8 @@ data(["export"]) ->
             {auth_clientid, AuthClientID},
             {auth_username, AuthUsername},
             {auth_mnesia, AuthMnesia},
-            {acl_mnesia, AclMnesia},
-            {schemas, Schemas}],
+            {acl_mnesia, AclMnesia}
+            ],
     ok = filelib:ensure_dir(NFilename),
     case file:write_file(NFilename, emqx_json:encode(Data)) of
         ok ->
@@ -610,7 +609,6 @@ data(["import", Filename]) ->
                         emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
                         emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])),
                         emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])),
-                        emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
                         emqx_ctl:print("The emqx data has been imported successfully.~n")
                     catch Class:Reason:Stack ->
                         emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}])

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl

@@ -45,7 +45,7 @@ init([]) ->
                 shutdown => 5000,
                 type => worker,
                 modules => [emqx_rule_metrics]},
-    {ok, {{one_for_all, 10, 100}, [Registry, Metrics]}}.
+    {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
 
 start_locker() ->
     Locker = #{id => emqx_rule_locker,

+ 7 - 1
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -360,6 +360,12 @@ printable_maps(Headers) ->
         fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
                 AccIn#{K => ntoa(V0)};
             ('User-Property', V0, AccIn) when is_list(V0) ->
-                AccIn#{'User-Property' => maps:from_list(V0)};
+                AccIn#{
+                    'User-Property' => maps:from_list(V0),
+                    'User-Property-Pairs' => [#{
+                        key => Key,
+                        value => Value
+                     } || {Key, Value} <- V0]
+                };
             (K, V0, AccIn) -> AccIn#{K => V0}
         end, #{}, Headers).

+ 75 - 45
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -78,6 +78,10 @@
         , bitxor/2
         , bitsl/2
         , bitsr/2
+        , bitsize/1
+        , subbits/2
+        , subbits/3
+        , subbits/6
         ]).
 
 %% Data Type Convertion
@@ -233,7 +237,7 @@ payload() ->
 
 payload(Path) ->
     fun(#{payload := Payload}) when erlang:is_map(Payload) ->
-            emqx_rule_maps:nested_get(map_path(Path), Payload);
+            map_get(Path, Payload);
        (_) -> undefined
     end.
 
@@ -401,6 +405,74 @@ bitsl(X, I) when is_integer(X), is_integer(I) ->
 bitsr(X, I) when is_integer(X), is_integer(I) ->
     X bsr I.
 
+bitsize(Bits) when is_bitstring(Bits) ->
+    bit_size(Bits).
+
+subbits(Bits, Len) when is_integer(Len), is_bitstring(Bits) ->
+    subbits(Bits, 1, Len).
+
+subbits(Bits, Start, Len) when is_integer(Start), is_integer(Len), is_bitstring(Bits) ->
+    get_subbits(Bits, Start, Len, <<"integer">>, <<"unsigned">>, <<"big">>).
+
+subbits(Bits, Start, Len, Type, Signedness, Endianness) when is_integer(Start), is_integer(Len), is_bitstring(Bits) ->
+    get_subbits(Bits, Start, Len, Type, Signedness, Endianness).
+
+get_subbits(Bits, Start, Len, Type, Signedness, Endianness) ->
+    Begin = Start - 1,
+    case Bits of
+        <<_:Begin, Rem/bits>> when Rem =/= <<>> ->
+            Sz = bit_size(Rem),
+            do_get_subbits(Rem, Sz, Len, Type, Signedness, Endianness);
+        _ -> undefined
+    end.
+
+-define(match_bits(Bits0, Pattern, ElesePattern),
+    case Bits0 of
+        Pattern ->
+            SubBits;
+        ElesePattern ->
+            SubBits
+    end).
+do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"big">>) ->
+    ?match_bits(Bits, <<SubBits:Len/integer-unsigned-big-unit:1, _/bits>>,
+                      <<SubBits:Sz/integer-unsigned-big-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"big">>) ->
+    ?match_bits(Bits, <<SubBits:Len/float-unsigned-big-unit:1, _/bits>>,
+                      <<SubBits:Sz/float-unsigned-big-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"big">>) ->
+    ?match_bits(Bits, <<SubBits:Len/bits-unsigned-big-unit:1, _/bits>>,
+                      <<SubBits:Sz/bits-unsigned-big-unit:1>>);
+
+do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"big">>) ->
+    ?match_bits(Bits, <<SubBits:Len/integer-signed-big-unit:1, _/bits>>,
+                      <<SubBits:Sz/integer-signed-big-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"big">>) ->
+    ?match_bits(Bits, <<SubBits:Len/float-signed-big-unit:1, _/bits>>,
+                      <<SubBits:Sz/float-signed-big-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"big">>) ->
+    ?match_bits(Bits, <<SubBits:Len/bits-signed-big-unit:1, _/bits>>,
+                      <<SubBits:Sz/bits-signed-big-unit:1>>);
+
+do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"little">>) ->
+    ?match_bits(Bits, <<SubBits:Len/integer-unsigned-little-unit:1, _/bits>>,
+                      <<SubBits:Sz/integer-unsigned-little-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"little">>) ->
+    ?match_bits(Bits, <<SubBits:Len/float-unsigned-little-unit:1, _/bits>>,
+                      <<SubBits:Sz/float-unsigned-little-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"little">>) ->
+    ?match_bits(Bits, <<SubBits:Len/bits-unsigned-little-unit:1, _/bits>>,
+                      <<SubBits:Sz/bits-unsigned-little-unit:1>>);
+
+do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"little">>) ->
+    ?match_bits(Bits, <<SubBits:Len/integer-signed-little-unit:1, _/bits>>,
+                      <<SubBits:Sz/integer-signed-little-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"little">>) ->
+    ?match_bits(Bits, <<SubBits:Len/float-signed-little-unit:1, _/bits>>,
+                      <<SubBits:Sz/float-signed-little-unit:1>>);
+do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) ->
+    ?match_bits(Bits, <<SubBits:Len/bits-signed-little-unit:1, _/bits>>,
+                      <<SubBits:Sz/bits-signed-little-unit:1>>).
+
 %%------------------------------------------------------------------------------
 %% Data Type Convertion Funcs
 %%------------------------------------------------------------------------------
@@ -607,52 +679,10 @@ map_get(Key, Map) ->
     map_get(Key, Map, undefined).
 
 map_get(Key, Map, Default) ->
-    case maps:find(Key, Map) of
-        {ok, Val} -> Val;
-        error when is_atom(Key) ->
-            %% the map may have an equivalent binary-form key
-            BinKey = emqx_rule_utils:bin(Key),
-            case maps:find(BinKey, Map) of
-                {ok, Val} -> Val;
-                error -> Default
-            end;
-        error when is_binary(Key) ->
-            try %% the map may have an equivalent atom-form key
-                AtomKey = list_to_existing_atom(binary_to_list(Key)),
-                case maps:find(AtomKey, Map) of
-                    {ok, Val} -> Val;
-                    error -> Default
-                end
-            catch error:badarg ->
-                Default
-            end;
-        error ->
-            Default
-    end.
+    emqx_rule_maps:nested_get(map_path(Key), Map, Default).
 
 map_put(Key, Val, Map) ->
-    case maps:find(Key, Map) of
-        {ok, _} -> maps:put(Key, Val, Map);
-        error when is_atom(Key) ->
-            %% the map may have an equivalent binary-form key
-            BinKey = emqx_rule_utils:bin(Key),
-            case maps:find(BinKey, Map) of
-                {ok, _} -> maps:put(BinKey, Val, Map);
-                error -> maps:put(Key, Val, Map)
-            end;
-        error when is_binary(Key) ->
-            try %% the map may have an equivalent atom-form key
-                AtomKey = list_to_existing_atom(binary_to_list(Key)),
-                case maps:find(AtomKey, Map) of
-                    {ok, _} -> maps:put(AtomKey, Val, Map);
-                    error -> maps:put(Key, Val, Map)
-                end
-            catch error:badarg ->
-                maps:put(Key, Val, Map)
-            end;
-        error ->
-            maps:put(Key, Val, Map)
-    end.
+    emqx_rule_maps:nested_put(map_path(Key), Val, Map).
 
 mget(Key, Map) ->
     mget(Key, Map, undefined).

+ 13 - 9
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -234,20 +234,26 @@ take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = A
             = emqx_rule_registry:get_action_instance_params(Id),
         emqx_rule_metrics:inc_actions_taken(Id),
         apply_action_func(Selected, Envs, Apply, ActName)
+    of
+        {badact, Reason} ->
+            handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason);
+        Result -> Result
     catch
         error:{badfun, _Func}:_ST ->
             %?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
             %              "Func: ~p~nST:~0p", [Id, Func, ST]),
             trans_action_on(Id, fun() ->
-                    emqx_rule_engine:refresh_actions([ActInst])
+                emqx_rule_engine:refresh_actions([ActInst])
             end, 5000),
             emqx_rule_metrics:inc_actions_retry(Id),
             take_action(ActInst, Selected, Envs, OnFailed, RetryN-1);
         Error:Reason:Stack ->
+            emqx_rule_metrics:inc_actions_exception(Id),
             handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack})
     end;
 
 take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) ->
+    emqx_rule_metrics:inc_actions_error(Id),
     handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}).
 
 apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) ->
@@ -284,12 +290,10 @@ wait_action_on(Id, RetryN) ->
     end.
 
 handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) ->
-    emqx_rule_metrics:inc_actions_exception(Id),
     ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]),
     take_actions(Fallbacks, Selected, Envs, continue),
     failed;
 handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) ->
-    emqx_rule_metrics:inc_actions_exception(Id),
     ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]),
     take_actions(Fallbacks, Selected, Envs, continue),
     error({take_action_failed, {Id, Reason}}).
@@ -409,11 +413,13 @@ add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
 %%------------------------------------------------------------------------------
 %% Internal Functions
 %%------------------------------------------------------------------------------
-may_decode_payload(Payload) ->
+may_decode_payload(Payload) when is_binary(Payload) ->
     case get_cached_payload() of
-        undefined -> ensure_decoded(Payload);
+        undefined -> safe_decode_and_cache(Payload);
         DecodedP -> DecodedP
-    end.
+    end;
+may_decode_payload(Payload) ->
+    Payload.
 
 get_cached_payload() ->
     erlang:get(rule_payload).
@@ -422,9 +428,7 @@ cache_payload(DecodedP) ->
     erlang:put(rule_payload, DecodedP),
     DecodedP.
 
-ensure_decoded(Json) when is_map(Json); is_list(Json) ->
-    Json;
-ensure_decoded(MaybeJson) ->
+safe_decode_and_cache(MaybeJson) ->
     try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
     catch _:_ -> #{}
     end.

+ 94 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -37,6 +37,7 @@ all() ->
     , {group, runtime}
     , {group, events}
     , {group, multi_actions}
+    , {group, bugs}
     ].
 
 suite() ->
@@ -123,11 +124,15 @@ groups() ->
      {events, [],
       [t_events
       ]},
+     {bugs, [],
+      [t_sqlparse_payload_as
+      ]},
      {multi_actions, [],
       [t_sqlselect_multi_actoins_1,
        t_sqlselect_multi_actoins_1_1,
        t_sqlselect_multi_actoins_2,
        t_sqlselect_multi_actoins_3,
+       t_sqlselect_multi_actoins_3_1,
        t_sqlselect_multi_actoins_4
       ]}
     ].
@@ -200,6 +205,7 @@ init_per_testcase(Test, Config)
             ;Test =:= t_sqlselect_multi_actoins_1_1
             ;Test =:= t_sqlselect_multi_actoins_2
             ;Test =:= t_sqlselect_multi_actoins_3
+            ;Test =:= t_sqlselect_multi_actoins_3_1
             ;Test =:= t_sqlselect_multi_actoins_4
         ->
     ok = emqx_rule_engine:load_providers(),
@@ -209,6 +215,12 @@ init_per_testcase(Test, Config)
                     types=[], params_spec = #{},
                     title = #{en => <<"Crash Action">>},
                     description = #{en => <<"This action will always fail!">>}}),
+    ok = emqx_rule_registry:add_action(
+            #action{name = 'failure_action', app = ?APP,
+                    module = ?MODULE, on_create = failure_action,
+                    types=[], params_spec = #{},
+                    title = #{en => <<"Crash Action">>},
+                    description = #{en => <<"This action will always fail!">>}}),
     ok = emqx_rule_registry:add_action(
             #action{name = 'plus_by_one', app = ?APP,
                     module = ?MODULE, on_create = plus_by_one_action,
@@ -1288,6 +1300,44 @@ t_sqlselect_multi_actoins_3(Config) ->
 
     emqx_rule_registry:remove_rule(Rule).
 
+t_sqlselect_multi_actoins_3_1(Config) ->
+    %% We create 2 actions in the same rule (on_action_failed = continue):
+    %% The first will fail (with a 'badact' return) and we need to make sure the
+    %% fallback actions can be executed, and the next actoins
+    %% will be run without influence
+    {ok, Rule} = emqx_rule_engine:create_rule(
+                    #{rawsql => ?config(connsql, Config),
+                      on_action_failed => continue,
+                      actions => [
+                          #{name => 'failure_action', args => #{}, fallbacks =>[
+                              #{name => 'plus_by_one', args => #{}, fallbacks =>[]},
+                              #{name => 'plus_by_one', args => #{}, fallbacks =>[]}
+                          ]},
+                          #{name => 'republish',
+                            args => #{<<"target_topic">> => <<"t2">>,
+                                      <<"target_qos">> => -1,
+                                      <<"payload_tmpl">> => <<"clientid=${clientid}">>
+                                    },
+                            fallbacks => []}
+                      ]
+                     }),
+
+    (?config(conn_event, Config))(),
+    timer:sleep(100),
+
+    %% verfiy the fallback actions has been run
+    ?assertEqual(2, ets:lookup_element(plus_by_one_action, num, 2)),
+
+    %% verfiy the next actions can be run
+    receive {publish, #{topic := T, payload := Payload}} ->
+        ?assertEqual(<<"t2">>, T),
+        ?assertEqual(<<"clientid=c_emqx1">>, Payload)
+    after 1000 ->
+        ct:fail(wait_for_t2)
+    end,
+
+    emqx_rule_registry:remove_rule(Rule).
+
 t_sqlselect_multi_actoins_4(Config) ->
     %% We create 2 actions in the same rule (on_action_failed = continue):
     %% The first will fail and we need to make sure the
@@ -1920,6 +1970,44 @@ t_sqlparse_new_map(_Config) ->
                    <<"c">> := [#{}]
                    }, Res00).
 
+t_sqlparse_payload_as(_Config) ->
+    %% https://github.com/emqx/emqx/issues/3866
+    Sql00 = "SELECT "
+            " payload, map_get('engineWorkTime', payload.params, -1) as payload.params.engineWorkTime, "
+            " map_get('hydOilTem', payload.params, -1) as payload.params.hydOilTem "
+            "FROM \"t/#\" ",
+    Payload1 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42, \"hydOilTem\": 30 } }">>,
+    {ok, Res01} = emqx_rule_sqltester:test(
+                    #{<<"rawsql">> => Sql00,
+                      <<"ctx">> => #{<<"payload">> => Payload1,
+                                     <<"topic">> => <<"t/a">>}}),
+    ?assertMatch(#{
+        <<"payload">> := #{
+            <<"params">> := #{
+                <<"convertTemp">> := 20,
+                <<"engineSpeed">> := 42,
+                <<"engineWorkTime">> := -1,
+                <<"hydOilTem">> := 30
+            }
+        }
+    }, Res01),
+
+    Payload2 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42 } }">>,
+    {ok, Res02} = emqx_rule_sqltester:test(
+                    #{<<"rawsql">> => Sql00,
+                      <<"ctx">> => #{<<"payload">> => Payload2,
+                                     <<"topic">> => <<"t/a">>}}),
+    ?assertMatch(#{
+        <<"payload">> := #{
+            <<"params">> := #{
+                <<"convertTemp">> := 20,
+                <<"engineSpeed">> := 42,
+                <<"engineWorkTime">> := -1,
+                <<"hydOilTem">> := -1
+            }
+        }
+    }, Res02).
+
 %%------------------------------------------------------------------------------
 %% Internal helpers
 %%------------------------------------------------------------------------------
@@ -2006,6 +2094,12 @@ mfa_action(Id, _Params) ->
 mfa_action_do(_Data, _Envs, K) ->
     persistent_term:put(K, 1).
 
+failure_action(_Id, _Params) ->
+    fun(Data, _Envs) ->
+        ct:pal("applying crash action, Data: ~p", [Data]),
+        {badact, intentional_failure}
+    end.
+
 crash_action(_Id, _Params) ->
     fun(Data, _Envs) ->
         ct:pal("applying crash action, Data: ~p", [Data]),

+ 57 - 4
apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl

@@ -489,22 +489,75 @@ t_contains(_) ->
 
 t_map_get(_) ->
     ?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])),
-    ?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])).
+    ?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])),
+    ?assertEqual(1, apply_func(map_get, [<<"a.b">>, #{a => #{b => 1}}])),
+    ?assertEqual(undefined, apply_func(map_get, [<<"a.c">>, #{a => #{b => 1}}])).
 
 t_map_put(_) ->
     ?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])),
-    ?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
+    ?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])),
+    ?assertEqual(#{<<"a">> => #{<<"b">> => 1}}, apply_func(map_put, [<<"a.b">>, 1, #{}])),
+    ?assertEqual(#{a => #{b => 1, <<"c">> => 1}}, apply_func(map_put, [<<"a.c">>, 1, #{a => #{b => 1}}])).
 
- t_mget(_) ->
+t_mget(_) ->
     ?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])),
     ?assertEqual(1, apply_func(map_get, [<<"a">>, #{<<"a">> => 1}])),
     ?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])).
 
- t_mput(_) ->
+t_mput(_) ->
     ?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])),
     ?assertEqual(#{<<"a">> => 2}, apply_func(map_put, [<<"a">>, 2, #{<<"a">> => 1}])),
     ?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
 
+t_bitsize(_) ->
+    ?assertEqual(8, apply_func(bitsize, [<<"a">>])),
+    ?assertEqual(4, apply_func(bitsize, [<<15:4>>])).
+
+t_subbits(_) ->
+    ?assertEqual(1, apply_func(subbits, [<<255:8>>, 1])),
+    ?assertEqual(3, apply_func(subbits, [<<255:8>>, 2])),
+    ?assertEqual(7, apply_func(subbits, [<<255:8>>, 3])),
+    ?assertEqual(15, apply_func(subbits, [<<255:8>>, 4])),
+    ?assertEqual(31, apply_func(subbits, [<<255:8>>, 5])),
+    ?assertEqual(63, apply_func(subbits, [<<255:8>>, 6])),
+    ?assertEqual(127, apply_func(subbits, [<<255:8>>, 7])),
+    ?assertEqual(255, apply_func(subbits, [<<255:8>>, 8])).
+
+t_subbits2(_) ->
+    ?assertEqual(1, apply_func(subbits, [<<255:8>>, 1, 1])),
+    ?assertEqual(3, apply_func(subbits, [<<255:8>>, 1, 2])),
+    ?assertEqual(7, apply_func(subbits, [<<255:8>>, 1, 3])),
+    ?assertEqual(15, apply_func(subbits, [<<255:8>>, 1, 4])),
+    ?assertEqual(31, apply_func(subbits, [<<255:8>>, 1, 5])),
+    ?assertEqual(63, apply_func(subbits, [<<255:8>>, 1, 6])),
+    ?assertEqual(127, apply_func(subbits, [<<255:8>>, 1, 7])),
+    ?assertEqual(255, apply_func(subbits, [<<255:8>>, 1, 8])).
+
+t_subbits2_1(_) ->
+    ?assertEqual(1, apply_func(subbits, [<<255:8>>, 2, 1])),
+    ?assertEqual(3, apply_func(subbits, [<<255:8>>, 2, 2])),
+    ?assertEqual(7, apply_func(subbits, [<<255:8>>, 2, 3])),
+    ?assertEqual(15, apply_func(subbits, [<<255:8>>, 2, 4])),
+    ?assertEqual(31, apply_func(subbits, [<<255:8>>, 2, 5])),
+    ?assertEqual(63, apply_func(subbits, [<<255:8>>, 2, 6])),
+    ?assertEqual(127, apply_func(subbits, [<<255:8>>, 2, 7])),
+    ?assertEqual(127, apply_func(subbits, [<<255:8>>, 2, 8])).
+
+t_subbits2_integer(_) ->
+    ?assertEqual(456, apply_func(subbits, [<<456:32/integer>>, 1, 32, <<"integer">>, <<"signed">>, <<"big">>])),
+    ?assertEqual(-456, apply_func(subbits, [<<-456:32/integer>>, 1, 32, <<"integer">>, <<"signed">>, <<"big">>])).
+
+t_subbits2_float(_) ->
+    R = apply_func(subbits, [<<5.3:64/float>>, 1, 64, <<"float">>, <<"unsigned">>, <<"big">>]),
+    RL = (5.3 - R),
+    ct:pal(";;;;~p", [R]),
+    ?assert( (RL >= 0 andalso RL < 0.0001) orelse (RL =< 0 andalso RL > -0.0001)),
+
+    R2 = apply_func(subbits, [<<-5.3:64/float>>, 1, 64, <<"float">>, <<"signed">>, <<"big">>]),
+
+    RL2 = (5.3 + R2),
+    ct:pal(";;;;~p", [R2]),
+    ?assert( (RL2 >= 0 andalso RL2 < 0.0001) orelse (RL2 =< 0 andalso RL2 > -0.0001)).
 
 %%------------------------------------------------------------------------------
 %% Test cases for Hash funcs

+ 4 - 4
rebar.config

@@ -14,6 +14,7 @@
 {overrides,[{add,[{erl_opts,[no_debug_info,compressed,deterministic,
                              {parse_transform,mod_vsn}]}]}]}.
 
+
 {xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
               deprecated_function_calls,warnings_as_errors, deprecated_functions]}.
 
@@ -37,15 +38,14 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
-    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.1"}}}
+    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.2"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}}
     , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}}
-    , {erlport, {git, "https://github.com/emqx/erlport", {tag, "v1.2.2"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}}
-    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
+    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
     , {getopt, "1.0.1"}
     ]}.

+ 0 - 1
rebar.config.erl

@@ -28,7 +28,6 @@ plugins() ->
 
 test_deps() ->
     [ {bbmustache, "1.10.0"}
-    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
     , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}}
     , meck
     ].