|
|
@@ -10,6 +10,7 @@
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("snabbkaffe/include/test_macros.hrl").
|
|
|
+-include_lib("emqx_utils/include/emqx_message.hrl").
|
|
|
|
|
|
-import(emqx_utils_conv, [bin/1]).
|
|
|
|
|
|
@@ -351,58 +352,60 @@ t_aggreg_upload_restart(Config) ->
|
|
|
erl_csv:decode(Content)
|
|
|
).
|
|
|
|
|
|
+%% NOTE
|
|
|
+%% This test verifies that the bridge can recover from a buffer file corruption,
|
|
|
+%% and does so while preserving uncompromised data.
|
|
|
t_aggreg_upload_restart_corrupted(Config) ->
|
|
|
- %% NOTE
|
|
|
- %% This test verifies that the bridge can recover from a buffer file corruption,
|
|
|
- %% and does so while preserving uncompromised data.
|
|
|
Bucket = ?config(s3_bucket, Config),
|
|
|
BridgeName = ?config(bridge_name, Config),
|
|
|
- AggregId = aggreg_id(BridgeName),
|
|
|
BatchSize = ?CONF_MAX_RECORDS div 2,
|
|
|
- %% Create a bridge with the sample configuration.
|
|
|
- ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
|
|
- %% Send some sample messages that look like Rule SQL productions.
|
|
|
- Messages1 = [
|
|
|
- {integer_to_binary(N), <<"a/b/c">>, <<"{\"hello\":\"world\"}">>}
|
|
|
- || N <- lists:seq(1, BatchSize)
|
|
|
- ],
|
|
|
- %% Ensure that they span multiple batch queries.
|
|
|
- ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
|
|
|
- {ok, _} = ?block_until(
|
|
|
- #{?snk_kind := connector_aggreg_records_written, action := AggregId},
|
|
|
- infinity,
|
|
|
- 0
|
|
|
- ),
|
|
|
- %% Find out the buffer file.
|
|
|
- {ok, #{filename := Filename}} = ?block_until(
|
|
|
- #{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId}
|
|
|
- ),
|
|
|
- %% Stop the bridge, corrupt the buffer file, and restart the bridge.
|
|
|
- {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
|
|
|
- BufferFileSize = filelib:file_size(Filename),
|
|
|
- ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, BufferFileSize div 2),
|
|
|
- {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
|
|
- %% Send some more messages.
|
|
|
- Messages2 = [
|
|
|
- {integer_to_binary(N), <<"c/d/e">>, <<"{\"hello\":\"world\"}">>}
|
|
|
- || N <- lists:seq(1, BatchSize)
|
|
|
- ],
|
|
|
- ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
|
|
|
- %% Wait until the delivery is completed.
|
|
|
- {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
|
|
|
- %% Check that upload contains part of the first batch and all of the second batch.
|
|
|
- _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
|
|
- CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
|
|
- NRows = length(Rows),
|
|
|
- ?assert(
|
|
|
- NRows > BatchSize,
|
|
|
- CSV
|
|
|
- ),
|
|
|
- ?assertEqual(
|
|
|
- lists:sublist(Messages1, NRows - BatchSize) ++ Messages2,
|
|
|
- [{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
|
|
|
- CSV
|
|
|
- ).
|
|
|
+ Opts = #{
|
|
|
+ aggreg_id => aggreg_id(BridgeName),
|
|
|
+ batch_size => BatchSize,
|
|
|
+ rule_sql => <<
|
|
|
+ "SELECT"
|
|
|
+ " *,"
|
|
|
+ " strlen(payload) as psize,"
|
|
|
+ " unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
|
|
|
+ " FROM 's3/#'"
|
|
|
+ >>,
|
|
|
+ make_message_fn => fun(N) ->
|
|
|
+ mk_message(
|
|
|
+ {integer_to_binary(N), <<"s3/a/b/c">>, <<"{\"hello\":\"world\"}">>}
|
|
|
+ )
|
|
|
+ end,
|
|
|
+ message_check_fn => fun(Context) ->
|
|
|
+ #{
|
|
|
+ messages_before := Messages1,
|
|
|
+ messages_after := Messages2
|
|
|
+ } = Context,
|
|
|
+
|
|
|
+ _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
|
|
+ CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
|
|
+ NRows = length(Rows),
|
|
|
+ ?assert(
|
|
|
+ NRows > BatchSize,
|
|
|
+ CSV
|
|
|
+ ),
|
|
|
+ Expected = [
|
|
|
+ {ClientId, Topic, Payload}
|
|
|
+ || #message{
|
|
|
+ from = ClientId,
|
|
|
+ topic = Topic,
|
|
|
+ payload = Payload
|
|
|
+ } <- lists:sublist(Messages1, NRows - BatchSize) ++ Messages2
|
|
|
+ ],
|
|
|
+ ?assertEqual(
|
|
|
+ Expected,
|
|
|
+ [{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
|
|
|
+ CSV
|
|
|
+ ),
|
|
|
+
|
|
|
+ ok
|
|
|
+ end
|
|
|
+ },
|
|
|
+ emqx_bridge_v2_testlib:t_aggreg_upload_restart_corrupted(Config, Opts),
|
|
|
+ ok.
|
|
|
|
|
|
t_aggreg_pending_upload_restart(Config) ->
|
|
|
%% NOTE
|
|
|
@@ -511,6 +514,9 @@ receive_sender_reports([]) ->
|
|
|
|
|
|
%%
|
|
|
|
|
|
+mk_message({ClientId, Topic, Payload}) ->
|
|
|
+ emqx_message:make(bin(ClientId), bin(Topic), Payload).
|
|
|
+
|
|
|
mk_message_event({ClientID, Topic, Payload}) ->
|
|
|
emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload).
|
|
|
|