Explorar el Código

Merge pull request #10901 from fix/EMQX-9985/ft-fin-checksum

fix(ft): respect checksum in `fin` packets
Andrew Mayorov hace 2 años
padre
commit
76e5243211

+ 2 - 2
apps/emqx/src/emqx_maybe.erl

@@ -45,8 +45,8 @@ define(Term, _) ->
     Term.
 
 %% @doc Apply a function to a maybe argument.
--spec apply(fun((A) -> maybe(A)), maybe(A)) ->
-    maybe(A).
+-spec apply(fun((A) -> B), maybe(A)) ->
+    maybe(B).
 apply(_Fun, undefined) ->
     undefined;
 apply(Fun, Term) when is_function(Fun) ->

+ 17 - 11
apps/emqx_ft/src/emqx_ft.erl

@@ -45,7 +45,8 @@
     offset/0,
     filemeta/0,
     segment/0,
-    checksum/0
+    checksum/0,
+    finopts/0
 ]).
 
 %% Number of bytes
@@ -80,6 +81,10 @@
 
 -type segment() :: {offset(), _Content :: binary()}.
 
+-type finopts() :: #{
+    checksum => checksum()
+}.
+
 %%--------------------------------------------------------------------
 %% API for app
 %%--------------------------------------------------------------------
@@ -170,8 +175,8 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
             ChecksumBin = emqx_maybe:from_list(MaybeChecksum),
             validate(
                 [{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}],
-                fun([FinalSize, Checksum]) ->
-                    on_fin(PacketId, Msg, Transfer, FinalSize, Checksum)
+                fun([FinalSize, FinalChecksum]) ->
+                    on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum)
                 end
             );
         [<<"abort">>] ->
@@ -251,13 +256,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
         end
     end).
 
-on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
+on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
     ?tp(info, "file_transfer_fin", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
         transfer => Transfer,
         final_size => FinalSize,
-        checksum => Checksum
+        checksum => FinalChecksum
     }),
     %% TODO: handle checksum? Do we need it?
     FinPacketKey = {self(), PacketId},
@@ -265,7 +270,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
         ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
     end,
     with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
-        case assemble(Transfer, FinalSize) of
+        case assemble(Transfer, FinalSize, FinalChecksum) of
             %% Assembling completed, ack through the responder right away
             ok ->
                 emqx_ft_responder:ack(FinPacketKey, ok);
@@ -314,9 +319,10 @@ store_segment(Transfer, Segment) ->
             {error, {internal_error, E}}
     end.
 
-assemble(Transfer, FinalSize) ->
+assemble(Transfer, FinalSize, FinalChecksum) ->
     try
-        emqx_ft_storage:assemble(Transfer, FinalSize)
+        FinOpts = [{checksum, FinalChecksum} || FinalChecksum /= undefined],
+        emqx_ft_storage:assemble(Transfer, FinalSize, maps:from_list(FinOpts))
     catch
         C:E:S ->
             ?tp(error, "start_assemble_failed", #{
@@ -397,8 +403,8 @@ do_validate([{checksum, Checksum} | Rest], Parsed) ->
         {error, _Reason} ->
             {error, {invalid_checksum, Checksum}}
     end;
-do_validate([{integrity, Payload, Checksum} | Rest], Parsed) ->
-    case crypto:hash(sha256, Payload) of
+do_validate([{integrity, Payload, {Algo, Checksum}} | Rest], Parsed) ->
+    case crypto:hash(Algo, Payload) of
         Checksum ->
             do_validate(Rest, [Payload | Parsed]);
         Mismatch ->
@@ -411,7 +417,7 @@ do_validate([{{maybe, T}, Value} | Rest], Parsed) ->
 
 parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 ->
     try
-        {ok, binary:decode_hex(Checksum)}
+        {ok, {sha256, binary:decode_hex(Checksum)}}
     catch
         error:badarg ->
             {error, invalid_checksum}

+ 8 - 6
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -16,7 +16,7 @@
 
 -module(emqx_ft_assembler).
 
--export([start_link/3]).
+-export([start_link/4]).
 
 -behaviour(gen_statem).
 -export([callback_mode/0]).
@@ -29,6 +29,7 @@
 -type stdata() :: #{
     storage := emqx_ft_storage_fs:storage(),
     transfer := emqx_ft:transfer(),
+    finopts := emqx_ft:finopts(),
     assembly := emqx_ft_assembly:t(),
     export => emqx_ft_storage_exporter:export()
 }.
@@ -38,8 +39,8 @@
 
 %%
 
-start_link(Storage, Transfer, Size) ->
-    gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []).
+start_link(Storage, Transfer, Size, Opts) ->
+    gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size, Opts}, []).
 
 where(Transfer) ->
     gproc:where(?NAME(Transfer)).
@@ -60,11 +61,12 @@ callback_mode() ->
     handle_event_function.
 
 -spec init(_Args) -> {ok, state(), stdata()}.
-init({Storage, Transfer, Size}) ->
+init({Storage, Transfer, Size, Opts}) ->
     _ = erlang:process_flag(trap_exit, true),
     St = #{
         storage => Storage,
         transfer => Transfer,
+        finopts => Opts,
         assembly => emqx_ft_assembly:new(Size)
     },
     {ok, idle, St}.
@@ -164,8 +166,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export :=
     end;
 handle_event(internal, _, {assemble, []}, St = #{}) ->
     {next_state, complete, St, ?internal([])};
-handle_event(internal, _, complete, St = #{export := Export}) ->
-    Result = emqx_ft_storage_exporter:complete(Export),
+handle_event(internal, _, complete, St = #{export := Export, finopts := Opts}) ->
+    Result = emqx_ft_storage_exporter:complete(Export, Opts),
     _ = maybe_garbage_collect(Result, St),
     {stop, {shutdown, Result}, maps:remove(export, St)}.
 

+ 3 - 3
apps/emqx_ft/src/emqx_ft_assembler_sup.erl

@@ -17,7 +17,7 @@
 -module(emqx_ft_assembler_sup).
 
 -export([start_link/0]).
--export([ensure_child/3]).
+-export([ensure_child/4]).
 
 -behaviour(supervisor).
 -export([init/1]).
@@ -25,10 +25,10 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-ensure_child(Storage, Transfer, Size) ->
+ensure_child(Storage, Transfer, Size, Opts) ->
     Childspec = #{
         id => Transfer,
-        start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]},
+        start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size, Opts]},
         restart => temporary
     },
     case supervisor:start_child(?MODULE, Childspec) of

+ 5 - 5
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -20,7 +20,7 @@
     [
         store_filemeta/2,
         store_segment/2,
-        assemble/2,
+        assemble/3,
 
         files/0,
         files/1,
@@ -88,7 +88,7 @@
     ok | {async, pid()} | {error, term()}.
 -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
     ok | {async, pid()} | {error, term()}.
--callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
+-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) ->
     ok | {async, pid()} | {error, term()}.
 
 -callback files(storage(), query(Cursor)) ->
@@ -114,10 +114,10 @@ store_filemeta(Transfer, FileMeta) ->
 store_segment(Transfer, Segment) ->
     dispatch(store_segment, [Transfer, Segment]).
 
--spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
+-spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
     ok | {async, pid()} | {error, term()}.
-assemble(Transfer, Size) ->
-    dispatch(assemble, [Transfer, Size]).
+assemble(Transfer, Size, FinOpts) ->
+    dispatch(assemble, [Transfer, Size, FinOpts]).
 
 -spec files() ->
     {ok, page(file_info(), _)} | {error, term()}.

+ 15 - 8
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -24,7 +24,7 @@
 %% Export API
 -export([start_export/3]).
 -export([write/2]).
--export([complete/1]).
+-export([complete/2]).
 -export([discard/1]).
 
 %% Listing API
@@ -117,12 +117,19 @@ write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) ->
             Error
     end.
 
--spec complete(export()) ->
+-spec complete(export(), emqx_ft:finopts()) ->
     ok | {error, _Reason}.
-complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) ->
-    case verify_checksum(Hash, Filemeta) of
-        {ok, Checksum} ->
-            ExporterMod:complete(ExportSt, Checksum);
+complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}, Opts) ->
+    Checksum = emqx_maybe:define(
+        % NOTE
+        % Checksum in `Opts` takes precedence over one in `Filemeta` according to the spec.
+        % We do not care if they differ.
+        maps:get(checksum, Opts, undefined),
+        maps:get(checksum, Filemeta, undefined)
+    ),
+    case verify_checksum(Hash, Checksum) of
+        {ok, ExportChecksum} ->
+            ExporterMod:complete(ExportSt, ExportChecksum);
         {error, _} = Error ->
             _ = ExporterMod:discard(ExportSt),
             Error
@@ -183,13 +190,13 @@ init_checksum(#{}) ->
 update_checksum(Ctx, IoData) ->
     crypto:hash_update(Ctx, IoData).
 
-verify_checksum(Ctx, #{checksum := {Algo, Digest} = Checksum}) ->
+verify_checksum(Ctx, {Algo, Digest} = Checksum) ->
     case crypto:hash_final(Ctx) of
         Digest ->
             {ok, Checksum};
         Mismatch ->
             {error, {checksum, Algo, binary:encode_hex(Mismatch)}}
     end;
-verify_checksum(Ctx, #{}) ->
+verify_checksum(Ctx, undefined) ->
     Digest = crypto:hash_final(Ctx),
     {ok, {sha256, Digest}}.

+ 6 - 6
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -36,7 +36,7 @@
 -export([list/3]).
 -export([pread/5]).
 -export([lookup_local_assembler/1]).
--export([assemble/3]).
+-export([assemble/4]).
 
 -export([transfers/1]).
 
@@ -211,14 +211,14 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
             {error, Reason}
     end.
 
--spec assemble(storage(), transfer(), emqx_ft:bytes()) ->
+-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
     {async, _Assembler :: pid()} | ok | {error, _TODO}.
-assemble(Storage, Transfer, Size) ->
+assemble(Storage, Transfer, Size, Opts) ->
     LookupSources = [
         fun() -> lookup_local_assembler(Transfer) end,
         fun() -> lookup_remote_assembler(Transfer) end,
         fun() -> check_if_already_exported(Storage, Transfer) end,
-        fun() -> ensure_local_assembler(Storage, Transfer, Size) end
+        fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end
     ],
     lookup_assembler(LookupSources).
 
@@ -295,8 +295,8 @@ lookup_remote_assembler(Transfer) ->
         _ -> {error, not_found}
     end.
 
-ensure_local_assembler(Storage, Transfer, Size) ->
-    {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
+ensure_local_assembler(Storage, Transfer, Size, Opts) ->
+    {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts),
     {async, Pid}.
 
 -spec transfers(storage()) ->

+ 15 - 2
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -159,6 +159,10 @@ t_invalid_topic_format(Config) ->
         unspecified_error,
         emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1)
     ),
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, <<"$file/fileid/fin/42/xyz">>, <<>>, 1)
+    ),
     ?assertRCName(
         unspecified_error,
         emqtt:publish(C, <<"$file/">>, <<>>, 1)
@@ -390,9 +394,18 @@ t_invalid_checksum(Config) ->
         with_offsets(Data)
     ),
 
+    % Send `fin` w/o checksum, should fail since filemeta checksum is invalid
+    FinTopic = mk_fin_topic(FileId, Filesize),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ),
+
+    % Send `fin` with the correct checksum
+    Checksum = binary:encode_hex(sha256(Data)),
+    ?assertRCName(
+        success,
+        emqtt:publish(C, <<FinTopic/binary, "/", Checksum/binary>>, <<>>, 1)
     ).
 
 t_corrupted_segment_retry(Config) ->
@@ -507,7 +520,7 @@ t_assemble_crash(Config) ->
     C = ?config(client, Config),
 
     meck:new(emqx_ft_storage_fs),
-    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end),
+    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end),
 
     ?assertRCName(
         unspecified_error,

+ 1 - 1
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -178,7 +178,7 @@ complete_assemble(Storage, Transfer, Size) ->
     complete_assemble(Storage, Transfer, Size, 1000).
 
 complete_assemble(Storage, Transfer, Size, Timeout) ->
-    {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
+    {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}),
     MRef = erlang:monitor(process, Pid),
     Pid ! kickoff,
     receive

+ 1 - 1
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -381,7 +381,7 @@ complete_transfer(Storage, Transfer, Size) ->
     complete_transfer(Storage, Transfer, Size, 100).
 
 complete_transfer(Storage, Transfer, Size, Timeout) ->
-    case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of
+    case emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}) of
         ok ->
             ok;
         {async, Pid} ->