Parcourir la source

feat(ft): add assembler tests

Ilya Averyanov il y a 3 ans
Parent
commit
c44fe92ef1

+ 11 - 0
apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf

@@ -11,4 +11,15 @@ emqx_ft_schema {
         }
     }
 
+    local_storage_root {
+        desc {
+            en: "File system path to keep uploaded files and temporary data."
+            zh: "保存上传文件和临时数据的文件系统路径。"
+        }
+        label: {
+            en: "Local Storage Root"
+            zh: "本地存储根"
+        }
+    }
+
 }

+ 26 - 9
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_ft_assembler).
 
+-include_lib("emqx/include/logger.hrl").
+
 -export([start_link/3]).
 
 -behaviour(gen_statem).
@@ -73,7 +75,7 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
         complete ->
             {next_state, start_assembling, NSt, ?internal([])};
         {incomplete, _} ->
-            Nodes = ekka:nodelist() -- [node()],
+            Nodes = mria_mnesia:running_nodes() -- [node()],
             {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}
         % TODO: recovery?
         % {error, _} = Reason ->
@@ -105,7 +107,7 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
             {next_state, start_assembling, NSt, ?internal([])};
         % TODO: retries / recovery?
         {incomplete, _} = Status ->
-            {stop, {error, Status}}
+            {next_state, {failure, {error, Status}}, NSt, ?internal([])}
     end;
 handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
     Filemeta = emqx_ft_assembly:filemeta(Asm),
@@ -124,21 +126,23 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
                 {ok, NHandle} ->
                     {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])};
                 %% TODO: better error handling
-                {error, Error} ->
-                    error(Error)
+                {error, _} = Error ->
+                    {next_state, {failure, Error}, St, ?internal([])}
             end;
-        {error, Error} ->
+        {error, _} = Error ->
             %% TODO: better error handling
-            error(Error)
+            {next_state, {failure, Error}, St, ?internal([])}
     end;
 handle_event(internal, _, {assemble, []}, St = #st{}) ->
     {next_state, complete, St, ?internal([])};
 handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) ->
     Filemeta = emqx_ft_assembly:filemeta(Asm),
     Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
-    %% TODO: safe apply
-    _ = Callback(Result),
-    {stop, shutdown}.
+    _ = safe_apply(Callback, Result),
+    {stop, shutdown};
+handle_event(internal, _, {failure, Error}, #st{callback = Callback}) ->
+    _ = safe_apply(Callback, Error),
+    {stop, Error}.
 
 % handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
 %     % TODO: what we do with non-transients errors here (e.g. `eacces`)?
@@ -170,3 +174,16 @@ pread(Node, Segment, St) ->
 
 segsize(#{fragment := {segment, Info}}) ->
     maps:get(size, Info).
+
+safe_apply(Callback, Result) ->
+    try apply(Callback, [Result]) of
+        _ -> ok
+    catch
+        Class:Reason:Stacktrace ->
+            ?SLOG(error, #{
+                msg => "safe_apply_failed",
+                class => Class,
+                reason => Reason,
+                stacktrace => Stacktrace
+            })
+    end.

+ 1 - 1
apps/emqx_ft/src/emqx_ft_assembler_sup.erl

@@ -29,7 +29,7 @@ start_child(Storage, Transfer, Callback) ->
     Childspec = #{
         id => {Storage, Transfer},
         start => {emqx_ft_assembler, start_link, [Storage, Transfer, Callback]},
-        restart => transient
+        restart => temporary
     },
     supervisor:start_child(?MODULE, Childspec).
 

+ 5 - 0
apps/emqx_ft/src/emqx_ft_schema.erl

@@ -59,6 +59,11 @@ fields(local_storage) ->
             default => local,
             required => false,
             desc => ?DESC("local")
+        }},
+        {root, #{
+            type => binary(),
+            desc => ?DESC("local_storage_root"),
+            required => false
         }}
     ].
 

+ 43 - 0
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -28,6 +28,8 @@ all() ->
     [
         t_assemble_empty_transfer,
         t_assemble_complete_local_transfer,
+        t_assemble_incomplete_transfer,
+        t_assemble_no_meta,
 
         % NOTE
         % It depends on the side effects of all previous testcases.
@@ -155,6 +157,46 @@ t_assemble_complete_local_transfer(Config) ->
         AssemblyFilename
     ).
 
+t_assemble_incomplete_transfer(Config) ->
+    Storage = storage(Config),
+    Transfer = {?CLIENTID2, ?config(file_id, Config)},
+    Filename = "incomplete.pdf",
+    TransferSize = 10000 + rand:uniform(50000),
+    SegmentSize = 4096,
+    Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
+    Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
+    Meta = #{
+        name => Filename,
+        checksum => {sha256, Hash},
+        size => TransferSize,
+        expire_at => 42
+    },
+    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
+    Self = self(),
+    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) ->
+        Self ! {test_assembly_finished, Result}
+    end),
+    receive
+        {test_assembly_finished, Result} ->
+            ?assertMatch({error, _}, Result)
+    after 1000 ->
+        ct:fail("Assembler did not called callback")
+    end.
+
+t_assemble_no_meta(Config) ->
+    Storage = storage(Config),
+    Transfer = {?CLIENTID2, ?config(file_id, Config)},
+    Self = self(),
+    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) ->
+        Self ! {test_assembly_finished, Result}
+    end),
+    receive
+        {test_assembly_finished, Result} ->
+            ?assertMatch({error, _}, Result)
+    after 1000 ->
+        ct:fail("Assembler did not called callback")
+    end.
+
 mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
     filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]).
 
@@ -205,5 +247,6 @@ mk_fileid() ->
 
 storage(Config) ->
     #{
+        type => local,
         root => ?config(storage_root, Config)
     }.