Przeglądaj źródła

feat(ft): add emqx_ft tests and fixes

Ilya Averyanov 3 lat temu
rodzic
commit
2e889f4ac7

+ 10 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -63,6 +63,7 @@
 ]).
 
 -export([
+    maybe_fix_gen_rpc/0,
     emqx_cluster/1,
     emqx_cluster/2,
     start_epmd/0,
@@ -616,6 +617,15 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
     listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
 }.
 
+-spec maybe_fix_gen_rpc() -> ok.
+maybe_fix_gen_rpc() ->
+    %% When many tests run in an obscure order, it may occur that
+    %% `gen_rpc` started with its default settings before `emqx_conf`.
+    %% `gen_rpc` and `emqx_conf` have different default `port_discovery` modes,
+    %% so we reinitialize `gen_rpc` explicitly.
+    ok = application:stop(gen_rpc),
+    ok = application:start(gen_rpc).
+
 -spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].
 emqx_cluster(Specs) ->
     emqx_cluster(Specs, #{}).

+ 90 - 10
apps/emqx_ft/src/emqx_ft.erl

@@ -31,6 +31,10 @@
     on_message_puback/4
 ]).
 
+-export([
+    decode_filemeta/1
+]).
+
 -export([on_assemble_timeout/1]).
 
 -export_type([
@@ -86,6 +90,27 @@ unhook() ->
     ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
     ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
 
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+decode_filemeta(Payload) when is_binary(Payload) ->
+    case emqx_json:safe_decode(Payload, [return_maps]) of
+        {ok, Map} ->
+            decode_filemeta(Map);
+        {error, Error} ->
+            {error, {invalid_filemeta_json, Error}}
+    end;
+decode_filemeta(Map) when is_map(Map) ->
+    Schema = emqx_ft_schema:schema(filemeta),
+    try
+        Meta = hocon_tconf:check_plain(Schema, Map, #{atom_key => true, required => false}),
+        {ok, Meta}
+    catch
+        throw:Error ->
+            {error, {invalid_filemeta, Error}}
+    end.
+
 %%--------------------------------------------------------------------
 %% Hooks
 %%--------------------------------------------------------------------
@@ -113,6 +138,8 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
 %% Handlers for transfer messages
 %%--------------------------------------------------------------------
 
+%% TODO Move to emqx_ft_mqtt?
+
 on_file_command(PacketId, Msg, FileCommand) ->
     case string:split(FileCommand, <<"/">>, all) of
         [FileId, <<"init">>] ->
@@ -123,10 +150,14 @@ on_file_command(PacketId, Msg, FileCommand) ->
             on_fin(PacketId, Msg, FileId, Checksum);
         [FileId, <<"abort">>] ->
             on_abort(Msg, FileId);
-        [FileId, Offset] ->
-            on_segment(Msg, FileId, Offset, undefined);
-        [FileId, Offset, Checksum] ->
-            on_segment(Msg, FileId, Offset, Checksum);
+        [FileId, OffsetBin] ->
+            validate([{offset, OffsetBin}], fun([Offset]) ->
+                on_segment(Msg, FileId, Offset, undefined)
+            end);
+        [FileId, OffsetBin, ChecksumBin] ->
+            validate([{offset, OffsetBin}, {checksum, ChecksumBin}], fun([Offset, Checksum]) ->
+                on_segment(Msg, FileId, Offset, Checksum)
+            end);
         _ ->
             ?RC_UNSPECIFIED_ERROR
     end.
@@ -139,11 +170,21 @@ on_init(Msg, FileId) ->
     }),
     Payload = Msg#message.payload,
     % %% Add validations here
-    Meta = emqx_json:decode(Payload, [return_maps]),
-    case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
-        ok ->
-            ?RC_SUCCESS;
-        {error, _Reason} ->
+    case decode_filemeta(Payload) of
+        {ok, Meta} ->
+            case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
+                ok ->
+                    ?RC_SUCCESS;
+                {error, _Reason} ->
+                    ?RC_UNSPECIFIED_ERROR
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "on_init: invalid filemeta",
+                mqtt_msg => Msg,
+                file_id => FileId,
+                reason => Reason
+            }),
             ?RC_UNSPECIFIED_ERROR
     end.
 
@@ -161,7 +202,7 @@ on_segment(Msg, FileId, Offset, Checksum) ->
     }),
     %% TODO: handle checksum
     Payload = Msg#message.payload,
-    Segment = {binary_to_integer(Offset), Payload},
+    Segment = {Offset, Payload},
     %% Add offset/checksum validations
     case emqx_ft_storage:store_segment(transfer(Msg, FileId), Segment) of
         ok ->
@@ -247,3 +288,42 @@ transfer(Msg, FileId) ->
 on_assemble_timeout({ChanPid, PacketId}) ->
     ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
     erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).
+
+validate(Validations, Fun) ->
+    case do_validate(Validations, []) of
+        {ok, Parsed} ->
+            Fun(Parsed);
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "validate: invalid $file command",
+                reason => Reason
+            }),
+            ?RC_UNSPECIFIED_ERROR
+    end.
+
+do_validate([], Parsed) ->
+    {ok, lists:reverse(Parsed)};
+do_validate([{offset, Offset} | Rest], Parsed) ->
+    case string:to_integer(Offset) of
+        {Int, <<>>} ->
+            do_validate(Rest, [Int | Parsed]);
+        _ ->
+            {error, {invalid_offset, Offset}}
+    end;
+do_validate([{checksum, Checksum} | Rest], Parsed) ->
+    case parse_checksum(Checksum) of
+        {ok, Bin} ->
+            do_validate(Rest, [Bin | Parsed]);
+        {error, _Reason} ->
+            {error, {invalid_checksum, Checksum}}
+    end.
+
+parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 ->
+    try
+        {ok, binary:decode_hex(Checksum)}
+    catch
+        error:badarg ->
+            {error, invalid_checksum}
+    end;
+parse_checksum(_Checksum) ->
+    {error, invalid_checksum}.

+ 8 - 4
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -387,10 +387,14 @@ encode_filemeta(Meta) ->
     Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}),
     emqx_json:encode(?PRELUDE(_Vsn = 1, Term)).
 
-decode_filemeta(Binary) ->
-    Schema = emqx_ft_schema:schema(filemeta),
-    ?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]),
-    hocon_tconf:check_plain(Schema, Term, #{atom_key => true, required => false}).
+decode_filemeta(Binary) when is_binary(Binary) ->
+    ?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]),
+    case emqx_ft:decode_filemeta(Map) of
+        {ok, Meta} ->
+            Meta;
+        {error, Reason} ->
+            error(Reason)
+    end.
 
 mk_segment_filename({Offset, Content}) ->
     lists:concat([?SEGMENT, ".", Offset, ".", byte_size(Content)]).

+ 1 - 1
apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 1 - 1
apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 284 - 0
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -0,0 +1,284 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-define(assertRCName(RCName, PublishRes),
+    ?assertMatch(
+        {ok, #{reason_code_name := RCName}},
+        PublishRes
+    )
+).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun set_special_configs/1),
+    ok = emqx_common_test_helpers:maybe_fix_gen_rpc(),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
+    ok.
+
+set_special_configs(emqx_ft) ->
+    {ok, _} = emqx:update_config([file_transfer, storage], #{<<"type">> => <<"local">>}),
+    ok;
+set_special_configs(_App) ->
+    ok.
+
+init_per_testcase(_Case, Config) ->
+    _ = file:del_dir_r(filename:join(emqx:data_dir(), "file_transfer")),
+    ClientId = <<"client">>,
+    {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
+    {ok, _} = emqtt:connect(C),
+    [{client, C}, {clientid, ClientId} | Config].
+
+end_per_testcase(_Case, Config) ->
+    C = ?config(client, Config),
+    ok = emqtt:stop(C),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Tests
+%%--------------------------------------------------------------------
+
+t_invalid_topic_format(Config) ->
+    C = ?config(client, Config),
+
+    %% TODO: more invalid topics
+
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, <<"$file/XYZ">>, <<>>, 1)
+    ),
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, <<"$file/X/Y/Z">>, <<>>, 1)
+    ).
+
+t_simple_transfer(Config) ->
+    C = ?config(client, Config),
+
+    Filename = <<"topsecret.pdf">>,
+    FileId = <<"f1">>,
+
+    Data = [<<"first">>, <<"second">>, <<"third">>],
+
+    Meta = meta(Filename, Data),
+    MetaPayload = emqx_json:encode(Meta),
+
+    MetaTopic = <<"$file/", FileId/binary, "/init">>,
+    ?assertRCName(
+        success,
+        emqtt:publish(C, MetaTopic, MetaPayload, 1)
+    ),
+
+    lists:foreach(
+        fun({Chunk, Offset}) ->
+            OffsetBin = integer_to_binary(Offset),
+            SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>,
+            ?assertRCName(
+                success,
+                emqtt:publish(C, SegmentTopic, Chunk, 1)
+            )
+        end,
+        with_offsets(Data)
+    ),
+
+    FinTopic = <<"$file/", FileId/binary, "/fin">>,
+    ?assertRCName(
+        success,
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ),
+
+    ReadyTransferId = #{
+        <<"fileid">> => FileId,
+        <<"clientid">> => ?config(clientid, Config),
+        <<"node">> => atom_to_binary(node(), utf8)
+    },
+
+    {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
+
+    ?assertEqual(
+        iolist_to_binary(Data),
+        iolist_to_binary(qlc:eval(TableQH))
+    ).
+
+t_meta_conflict(Config) ->
+    C = ?config(client, Config),
+
+    Filename = <<"topsecret.pdf">>,
+    FileId = <<"f1">>,
+
+    Meta = meta(Filename, [<<"x">>]),
+    MetaPayload = emqx_json:encode(Meta),
+
+    MetaTopic = <<"$file/", FileId/binary, "/init">>,
+    ?assertRCName(
+        success,
+        emqtt:publish(C, MetaTopic, MetaPayload, 1)
+    ),
+
+    ConflictMeta = Meta#{name => <<"conflict.pdf">>},
+    ConflictMetaPayload = emqx_json:encode(ConflictMeta),
+
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, MetaTopic, ConflictMetaPayload, 1)
+    ).
+
+t_no_meta(Config) ->
+    C = ?config(client, Config),
+
+    FileId = <<"f1">>,
+    Data = <<"first">>,
+
+    SegmentTopic = <<"$file/", FileId/binary, "/0">>,
+    ?assertRCName(
+        success,
+        emqtt:publish(C, SegmentTopic, Data, 1)
+    ),
+
+    FinTopic = <<"$file/", FileId/binary, "/fin">>,
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ).
+
+t_no_segment(Config) ->
+    C = ?config(client, Config),
+
+    Filename = <<"topsecret.pdf">>,
+    FileId = <<"f1">>,
+
+    Data = [<<"first">>, <<"second">>, <<"third">>],
+
+    Meta = meta(Filename, Data),
+    MetaPayload = emqx_json:encode(Meta),
+
+    MetaTopic = <<"$file/", FileId/binary, "/init">>,
+    ?assertRCName(
+        success,
+        emqtt:publish(C, MetaTopic, MetaPayload, 1)
+    ),
+
+    lists:foreach(
+        fun({Chunk, Offset}) ->
+            OffsetBin = integer_to_binary(Offset),
+            SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>,
+            ?assertRCName(
+                success,
+                emqtt:publish(C, SegmentTopic, Chunk, 1)
+            )
+        end,
+        %% Skip the first segment
+        tl(with_offsets(Data))
+    ),
+
+    FinTopic = <<"$file/", FileId/binary, "/fin">>,
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ).
+
+t_invalid_meta(Config) ->
+    C = ?config(client, Config),
+
+    FileId = <<"f1">>,
+
+    MetaTopic = <<"$file/", FileId/binary, "/init">>,
+
+    %% Invalid schema
+    Meta = #{foo => <<"bar">>},
+    MetaPayload = emqx_json:encode(Meta),
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, MetaTopic, MetaPayload, 1)
+    ),
+
+    %% Invalid JSON
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, MetaTopic, <<"{oops;">>, 1)
+    ).
+
+t_invalid_checksum(Config) ->
+    C = ?config(client, Config),
+
+    Filename = <<"topsecret.pdf">>,
+    FileId = <<"f1">>,
+
+    Data = [<<"first">>, <<"second">>, <<"third">>],
+
+    Meta = meta(Filename, Data),
+    MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}),
+
+    MetaTopic = <<"$file/", FileId/binary, "/init">>,
+    ?assertRCName(
+        success,
+        emqtt:publish(C, MetaTopic, MetaPayload, 1)
+    ),
+
+    lists:foreach(
+        fun({Chunk, Offset}) ->
+            OffsetBin = integer_to_binary(Offset),
+            SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>,
+            ?assertRCName(
+                success,
+                emqtt:publish(C, SegmentTopic, Chunk, 1)
+            )
+        end,
+        with_offsets(Data)
+    ),
+
+    FinTopic = <<"$file/", FileId/binary, "/fin">>,
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ).
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+with_offsets(Items) ->
+    {List, _} = lists:mapfoldl(
+        fun(Item, Offset) ->
+            {{Item, Offset}, Offset + byte_size(Item)}
+        end,
+        0,
+        Items
+    ),
+    List.
+
+sha256hex(Data) ->
+    binary:encode_hex(crypto:hash(sha256, Data)).
+
+meta(FileName, Data) ->
+    FullData = iolist_to_binary(Data),
+    #{
+        name => FileName,
+        checksum => sha256hex(FullData),
+        expire_at => erlang:system_time(_Unit = second) + 3600,
+        size => byte_size(FullData)
+    }.

+ 1 - 1
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 1 - 1
apps/emqx_ft/test/emqx_ft_content_gen.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 1 - 1
apps/emqx_ft/test/emqx_ft_responder_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 1 - 1
apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.