|
@@ -29,7 +29,7 @@
|
|
|
callback_module :: module(),
|
|
callback_module :: module(),
|
|
|
container :: emqx_connector_aggreg_csv:container(),
|
|
container :: emqx_connector_aggreg_csv:container(),
|
|
|
reader :: emqx_connector_aggreg_buffer:reader(),
|
|
reader :: emqx_connector_aggreg_buffer:reader(),
|
|
|
- upload :: impl_specific_upload_state(),
|
|
|
|
|
|
|
+ transfer :: transfer_state(),
|
|
|
empty :: boolean()
|
|
empty :: boolean()
|
|
|
}).
|
|
}).
|
|
|
|
|
|
|
@@ -40,16 +40,15 @@
|
|
|
any() => term()
|
|
any() => term()
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
--type impl_specific_upload_state() :: term().
|
|
|
|
|
|
|
+-type transfer_state() :: term().
|
|
|
|
|
|
|
|
--callback init_upload_state(buffer(), map()) -> impl_specific_upload_state().
|
|
|
|
|
|
|
+-callback init_transfer_state(buffer(), map()) -> transfer_state().
|
|
|
|
|
|
|
|
--callback process_append(iodata(), impl_specific_upload_state()) ->
|
|
|
|
|
- {ok, impl_specific_upload_state()}.
|
|
|
|
|
|
|
+-callback process_append(iodata(), transfer_state()) -> {ok, transfer_state()}.
|
|
|
|
|
|
|
|
--callback process_write(impl_specific_upload_state()) -> impl_specific_upload_state().
|
|
|
|
|
|
|
+-callback process_write(transfer_state()) -> transfer_state().
|
|
|
|
|
|
|
|
--callback process_complete(impl_specific_upload_state()) -> {ok, term()}.
|
|
|
|
|
|
|
+-callback process_complete(transfer_state()) -> {ok, term()}.
|
|
|
|
|
|
|
|
%%
|
|
%%
|
|
|
|
|
|
|
@@ -81,7 +80,7 @@ init_delivery(
|
|
|
callback_module = Mod,
|
|
callback_module = Mod,
|
|
|
container = mk_container(ContainerOpts),
|
|
container = mk_container(ContainerOpts),
|
|
|
reader = Reader,
|
|
reader = Reader,
|
|
|
- upload = Mod:init_upload_state(Buffer, Opts),
|
|
|
|
|
|
|
+ transfer = Mod:init_transfer_state(Buffer, Opts),
|
|
|
empty = true
|
|
empty = true
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
@@ -131,31 +130,31 @@ process_append_records(
|
|
|
Delivery = #delivery{
|
|
Delivery = #delivery{
|
|
|
callback_module = Mod,
|
|
callback_module = Mod,
|
|
|
container = Container0,
|
|
container = Container0,
|
|
|
- upload = Upload0
|
|
|
|
|
|
|
+ transfer = Transfer0
|
|
|
}
|
|
}
|
|
|
) ->
|
|
) ->
|
|
|
{Writes, Container} = emqx_connector_aggreg_csv:fill(Records, Container0),
|
|
{Writes, Container} = emqx_connector_aggreg_csv:fill(Records, Container0),
|
|
|
- {ok, Upload} = Mod:process_append(Writes, Upload0),
|
|
|
|
|
|
|
+ {ok, Transfer} = Mod:process_append(Writes, Transfer0),
|
|
|
Delivery#delivery{
|
|
Delivery#delivery{
|
|
|
container = Container,
|
|
container = Container,
|
|
|
- upload = Upload,
|
|
|
|
|
|
|
+ transfer = Transfer,
|
|
|
empty = false
|
|
empty = false
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
-process_write(Delivery = #delivery{callback_module = Mod, upload = Upload0}) ->
|
|
|
|
|
- Upload = Mod:process_write(Upload0),
|
|
|
|
|
- Delivery#delivery{upload = Upload}.
|
|
|
|
|
|
|
+process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) ->
|
|
|
|
|
+ Transfer = Mod:process_write(Transfer0),
|
|
|
|
|
+ Delivery#delivery{transfer = Transfer}.
|
|
|
|
|
|
|
|
process_complete(#delivery{name = Name, empty = true}) ->
|
|
process_complete(#delivery{name = Name, empty = true}) ->
|
|
|
- ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => empty}),
|
|
|
|
|
|
|
+ ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}),
|
|
|
exit({shutdown, {skipped, empty}});
|
|
exit({shutdown, {skipped, empty}});
|
|
|
process_complete(#delivery{
|
|
process_complete(#delivery{
|
|
|
- name = Name, callback_module = Mod, container = Container, upload = Upload0
|
|
|
|
|
|
|
+ name = Name, callback_module = Mod, container = Container, transfer = Transfer0
|
|
|
}) ->
|
|
}) ->
|
|
|
Trailer = emqx_connector_aggreg_csv:close(Container),
|
|
Trailer = emqx_connector_aggreg_csv:close(Container),
|
|
|
- {ok, Upload} = Mod:process_append(Trailer, Upload0),
|
|
|
|
|
- {ok, Completed} = Mod:process_complete(Upload),
|
|
|
|
|
- ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => Completed}),
|
|
|
|
|
|
|
+ {ok, Transfer} = Mod:process_append(Trailer, Transfer0),
|
|
|
|
|
+ {ok, Completed} = Mod:process_complete(Transfer),
|
|
|
|
|
+ ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
%%
|
|
%%
|
|
@@ -172,12 +171,12 @@ system_continue(Parent, Debug, Delivery) ->
|
|
|
loop(Delivery, Parent, Debug).
|
|
loop(Delivery, Parent, Debug).
|
|
|
|
|
|
|
|
-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
|
|
-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
|
|
|
-system_terminate(_Reason, _Parent, _Debug, #delivery{callback_module = Mod, upload = Upload}) ->
|
|
|
|
|
- Mod:process_terminate(Upload).
|
|
|
|
|
|
|
+system_terminate(_Reason, _Parent, _Debug, #delivery{callback_module = Mod, transfer = Transfer}) ->
|
|
|
|
|
+ Mod:process_terminate(Transfer).
|
|
|
|
|
|
|
|
-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
|
|
-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
|
|
|
format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
|
|
format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
|
|
|
#delivery{callback_module = Mod} = Delivery,
|
|
#delivery{callback_module = Mod} = Delivery,
|
|
|
Delivery#delivery{
|
|
Delivery#delivery{
|
|
|
- upload = Mod:process_format_status(Delivery#delivery.upload)
|
|
|
|
|
|
|
+ transfer = Mod:process_format_status(Delivery#delivery.transfer)
|
|
|
}.
|
|
}.
|