Просмотр исходного кода

Merge branch 'release-50' into merge-r50-into-v50-20230524

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
324459990f

+ 24 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl

@@ -623,6 +623,30 @@ t_publish_success(Config) ->
     ),
     ok.
 
+t_publish_success_infinity_timeout(Config) ->
+    ServiceAccountJSON = ?config(service_account_json, Config),
+    Topic = <<"t/topic">>,
+    {ok, _} = create_bridge(Config, #{
+        <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>}
+    }),
+    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+    on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    Payload = <<"payload">>,
+    Message = emqx_message:make(Topic, Payload),
+    emqx:publish(Message),
+    DecodedMessages = assert_http_request(ServiceAccountJSON),
+    ?assertMatch(
+        [
+            #{
+                <<"topic">> := Topic,
+                <<"payload">> := Payload,
+                <<"metadata">> := #{<<"rule_id">> := RuleId}
+            }
+        ],
+        DecodedMessages
+    ),
+    ok.
+
 t_publish_success_local_topic(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),

+ 1 - 1
apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl

@@ -422,7 +422,7 @@ decode_cursor(Cursor) ->
         true = is_list(Name),
         {Node, #{transfer => {ClientId, FileId}, name => Name}}
     catch
-        error:{_, invalid_json} ->
+        error:{Loc, JsonError} when is_integer(Loc), is_atom(JsonError) ->
             error({badarg, cursor});
         error:{badmatch, _} ->
             error({badarg, cursor});

+ 1 - 1
apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl

@@ -167,7 +167,7 @@ parse_filepath(PathBin) ->
             throw({invalid, PathBin})
     end,
     PathComponents = filename:split(PathBin),
-    case lists:any(fun is_special_component/1, PathComponents) of
+    case PathComponents == [] orelse lists:any(fun is_special_component/1, PathComponents) of
         false ->
             filename:join(PathComponents);
         true ->

+ 17 - 4
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -47,6 +47,7 @@ groups() ->
             t_invalid_topic_format,
             t_meta_conflict,
             t_nasty_clientids_fileids,
+            t_nasty_filenames,
             t_no_meta,
             t_no_segment,
             t_simple_transfer
@@ -205,10 +206,6 @@ t_invalid_filename(Config) ->
             encode_meta(meta(lists:duplicate(1000, $A), <<>>)),
             1
         )
-    ),
-    ?assertRCName(
-        success,
-        emqtt:publish(C, mk_init_topic(<<"f5">>), encode_meta(meta("146%", <<>>)), 1)
     ).
 
 t_simple_transfer(Config) ->
@@ -265,6 +262,22 @@ t_nasty_clientids_fileids(_Config) ->
         Transfers
     ).
 
+t_nasty_filenames(_Config) ->
+    Filenames = [
+        {<<"nasty1">>, "146%"},
+        {<<"nasty2">>, "🌚"},
+        {<<"nasty3">>, "中文.txt"}
+    ],
+    ok = lists:foreach(
+        fun({ClientId, Filename}) ->
+            FileId = unicode:characters_to_binary(Filename),
+            ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId),
+            [Export] = list_files(ClientId),
+            ?assertEqual({ok, FileId}, read_export(Export))
+        end,
+        Filenames
+    ).
+
 t_meta_conflict(Config) ->
     C = ?config(client, Config),
 

+ 30 - 8
apps/emqx_ft/test/emqx_ft_api_SUITE.erl

@@ -140,10 +140,7 @@ t_download_transfer(Config) ->
         request(
             get,
             uri(["file_transfer", "file"]) ++
-                query(#{
-                    fileref => FileId,
-                    node => <<"nonode@nohost">>
-                })
+                query(#{fileref => FileId, node => <<"nonode@nohost">>})
         )
     ),
 
@@ -152,10 +149,25 @@ t_download_transfer(Config) ->
         request(
             get,
             uri(["file_transfer", "file"]) ++
-                query(#{
-                    fileref => <<"unknown_file">>,
-                    node => node()
-                })
+                query(#{fileref => <<"unknown_file">>, node => node()})
+        )
+    ),
+
+    ?assertMatch(
+        {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
+        request_json(
+            get,
+            uri(["file_transfer", "file"]) ++
+                query(#{fileref => <<>>, node => node()})
+        )
+    ),
+
+    ?assertMatch(
+        {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
+        request_json(
+            get,
+            uri(["file_transfer", "file"]) ++
+                query(#{fileref => <<"/etc/passwd">>, node => node()})
         )
     ),
 
@@ -204,6 +216,16 @@ t_list_files_paging(Config) ->
         request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}))
     ),
 
+    ?assertMatch(
+        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}))
+    ),
+
+    ?assertMatch(
+        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}))
+    ),
+
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
         request_json(

+ 5 - 1
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -1115,7 +1115,11 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
 '$handle_undefined_function'(schema_decode, Args) ->
     error({args_count_error, {schema_decode, Args}});
 '$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
-    emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs);
+    %% encode outputs iolists, but when the rule actions process those
+    %% it might wrongly encode them as JSON lists, so we force them to
+    %% binaries here.
+    IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs),
+    iolist_to_binary(IOList);
 '$handle_undefined_function'(schema_encode, Args) ->
     error({args_count_error, {schema_encode, Args}});
 '$handle_undefined_function'(sprintf, [Format | Args]) ->

+ 13 - 4
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -82,8 +82,12 @@ make_trace_fn_action() ->
     #{function => Fn, args => #{}}.
 
 create_rule_http(RuleParams) ->
+    create_rule_http(RuleParams, _Overrides = #{}).
+
+create_rule_http(RuleParams, Overrides) ->
     RepublishTopic = <<"republish/schema_registry">>,
     emqx:subscribe(RepublishTopic),
+    PayloadTemplate = maps:get(payload_template, Overrides, <<>>),
     DefaultParams = #{
         enable => true,
         actions => [
@@ -93,7 +97,7 @@ create_rule_http(RuleParams) ->
                 <<"args">> =>
                     #{
                         <<"topic">> => RepublishTopic,
-                        <<"payload">> => <<>>,
+                        <<"payload">> => PayloadTemplate,
                         <<"qos">> => 0,
                         <<"retain">> => false,
                         <<"user_properties">> => <<>>
@@ -177,10 +181,12 @@ test_params_for(avro, encode1) ->
             "from t\n"
         >>,
     Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    PayloadTemplate = <<"${.encoded}">>,
     ExtraArgs = [],
     #{
         sql => SQL,
         payload => Payload,
+        payload_template => PayloadTemplate,
         extra_args => ExtraArgs
     };
 test_params_for(avro, decode1) ->
@@ -251,10 +257,12 @@ test_params_for(protobuf, encode1) ->
             "from t\n"
         >>,
     Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+    PayloadTemplate = <<"${.encoded}">>,
     ExtraArgs = [<<"Person">>],
     #{
         sql => SQL,
         payload => Payload,
+        payload_template => PayloadTemplate,
         extra_args => ExtraArgs
     };
 test_params_for(protobuf, union1) ->
@@ -487,17 +495,18 @@ t_encode(Config) ->
     #{
         sql := SQL,
         payload := Payload,
+        payload_template := PayloadTemplate,
         extra_args := ExtraArgs
     } = test_params_for(SerdeType, encode1),
-    {ok, _} = create_rule_http(#{sql => SQL}),
+    {ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}),
     PayloadBin = emqx_utils_json:encode(Payload),
     emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
     Published = receive_published(?LINE),
     ?assertMatch(
-        #{payload := #{<<"encoded">> := _}},
+        #{payload := P} when is_binary(P),
         Published
     ),
-    #{payload := #{<<"encoded">> := Encoded}} = Published,
+    #{payload := Encoded} = Published,
     {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
     ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
     ok.

+ 1 - 1
mix.exs

@@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do
       {:redbug, "2.0.8"},
       {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
       {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
-      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.8", override: true},
+      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true},
       {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},

+ 1 - 1
rebar.config

@@ -56,7 +56,7 @@
     , {gpb, "4.19.7"}
     , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
     , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
-    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.8"}}}
+    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}}
     , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}