|
|
@@ -44,9 +44,9 @@
|
|
|
|
|
|
-callback init_transfer_state(buffer(), map()) -> transfer_state().
|
|
|
|
|
|
--callback process_append(iodata(), transfer_state()) -> {ok, transfer_state()}.
|
|
|
+-callback process_append(iodata(), transfer_state()) -> transfer_state().
|
|
|
|
|
|
--callback process_write(transfer_state()) -> transfer_state().
|
|
|
+-callback process_write(transfer_state()) -> {ok, transfer_state()} | {error, term()}.
|
|
|
|
|
|
-callback process_complete(transfer_state()) -> {ok, term()}.
|
|
|
|
|
|
@@ -134,7 +134,7 @@ process_append_records(
|
|
|
}
|
|
|
) ->
|
|
|
{Writes, Container} = emqx_connector_aggreg_csv:fill(Records, Container0),
|
|
|
- {ok, Transfer} = Mod:process_append(Writes, Transfer0),
|
|
|
+ Transfer = Mod:process_append(Writes, Transfer0),
|
|
|
Delivery#delivery{
|
|
|
container = Container,
|
|
|
transfer = Transfer,
|
|
|
@@ -142,8 +142,13 @@ process_append_records(
|
|
|
}.
|
|
|
|
|
|
process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) ->
|
|
|
- Transfer = Mod:process_write(Transfer0),
|
|
|
- Delivery#delivery{transfer = Transfer}.
|
|
|
+ case Mod:process_write(Transfer0) of
|
|
|
+ {ok, Transfer} ->
|
|
|
+ Delivery#delivery{transfer = Transfer};
|
|
|
+ {error, Reason} ->
|
|
|
+ %% Todo: handle more gracefully? Retry?
|
|
|
+ error({transfer_failed, Reason})
|
|
|
+ end.
|
|
|
|
|
|
process_complete(#delivery{name = Name, empty = true}) ->
|
|
|
?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}),
|
|
|
@@ -152,7 +157,7 @@ process_complete(#delivery{
|
|
|
name = Name, callback_module = Mod, container = Container, transfer = Transfer0
|
|
|
}) ->
|
|
|
Trailer = emqx_connector_aggreg_csv:close(Container),
|
|
|
- {ok, Transfer} = Mod:process_append(Trailer, Transfer0),
|
|
|
+ Transfer = Mod:process_append(Trailer, Transfer0),
|
|
|
{ok, Completed} = Mod:process_complete(Transfer),
|
|
|
?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}),
|
|
|
ok.
|