|
|
@@ -206,6 +206,7 @@ datalayers_connector_config(Name, DatalayersHost, DatalayersPort, Config) ->
|
|
|
" enable = true\n"
|
|
|
" server = \"~s:~b\"\n"
|
|
|
" parameters {\n"
|
|
|
+ " driver_type = influxdb_v1\n"
|
|
|
" database = mqtt\n"
|
|
|
" username = admin\n"
|
|
|
" password = public\n"
|
|
|
@@ -451,7 +452,7 @@ t_start_ok(Config) ->
|
|
|
ok
|
|
|
end,
|
|
|
fun(Trace0) ->
|
|
|
- Trace = ?of_kind(datalayers_connector_send_query, Trace0),
|
|
|
+ Trace = ?of_kind(influxdb_connector_send_query, Trace0),
|
|
|
?assertMatch([#{points := [_]}], Trace),
|
|
|
[#{points := [Point]}] = Trace,
|
|
|
ct:pal("sent point: ~p", [Point]),
|
|
|
@@ -474,7 +475,7 @@ t_start_ok(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_start_stop(Config) ->
|
|
|
- ok = emqx_bridge_v2_testlib:t_start_stop(Config, datalayers_client_stopped),
|
|
|
+ ok = emqx_bridge_v2_testlib:t_start_stop(Config, influxdb_client_stopped),
|
|
|
ok.
|
|
|
|
|
|
t_start_already_started(Config) ->
|
|
|
@@ -494,7 +495,7 @@ t_start_already_started(Config) ->
|
|
|
emqx_bridge_datalayers_connector:on_start(ConnectorId, ConnConfigMap),
|
|
|
fun(Result, Trace) ->
|
|
|
?assertMatch({ok, _}, Result),
|
|
|
- ?assertMatch([_], ?of_kind(datalayers_connector_start_already_started, Trace)),
|
|
|
+ ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
|
|
|
ok
|
|
|
end
|
|
|
),
|
|
|
@@ -853,7 +854,7 @@ t_bad_timestamp(Config) ->
|
|
|
?check_trace(
|
|
|
?wait_async_action(
|
|
|
send_message(Config1, SentData),
|
|
|
- #{?snk_kind := datalayers_connector_send_query_error},
|
|
|
+ #{?snk_kind := influxdb_connector_send_query_error},
|
|
|
10_000
|
|
|
),
|
|
|
fun(Result, Trace) ->
|
|
|
@@ -865,7 +866,7 @@ t_bad_timestamp(Config) ->
|
|
|
?assertEqual(ok, Return),
|
|
|
?assertMatch(
|
|
|
[#{error := points_trans_failed}],
|
|
|
- ?of_kind(datalayers_connector_send_query_error, Trace)
|
|
|
+ ?of_kind(influxdb_connector_send_query_error, Trace)
|
|
|
);
|
|
|
{async, false} ->
|
|
|
?assertEqual(ok, Return),
|
|
|
@@ -877,7 +878,7 @@ t_bad_timestamp(Config) ->
|
|
|
]
|
|
|
}
|
|
|
],
|
|
|
- ?of_kind(datalayers_connector_send_query_error, Trace)
|
|
|
+ ?of_kind(influxdb_connector_send_query_error, Trace)
|
|
|
);
|
|
|
{sync, false} ->
|
|
|
?assertEqual(
|
|
|
@@ -915,14 +916,14 @@ t_create_disconnected(Config) ->
|
|
|
?assertMatch({ok, _}, create_bridge(Config))
|
|
|
end),
|
|
|
fun(Trace) ->
|
|
|
- [#{error := datalayers_client_not_alive, reason := Reason}] =
|
|
|
- ?of_kind(datalayers_connector_start_failed, Trace),
|
|
|
+ [#{error := influxdb_client_not_alive, reason := Reason}] =
|
|
|
+ ?of_kind(influxdb_connector_start_failed, Trace),
|
|
|
case Reason of
|
|
|
econnrefused -> ok;
|
|
|
closed -> ok;
|
|
|
{closed, _} -> ok;
|
|
|
{shutdown, closed} -> ok;
|
|
|
- _ -> ct:fail("datalayers_client_not_alive with wrong reason: ~p", [Reason])
|
|
|
+ _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
|
|
|
end,
|
|
|
ok
|
|
|
end
|
|
|
@@ -939,7 +940,7 @@ t_start_error(Config) ->
|
|
|
fun() ->
|
|
|
?wait_async_action(
|
|
|
?assertMatch({ok, _}, create_bridge(Config)),
|
|
|
- #{?snk_kind := datalayers_connector_start_failed},
|
|
|
+ #{?snk_kind := influxdb_connector_start_failed},
|
|
|
10_000
|
|
|
)
|
|
|
end
|
|
|
@@ -947,7 +948,7 @@ t_start_error(Config) ->
|
|
|
fun(Trace) ->
|
|
|
?assertMatch(
|
|
|
[#{error := some_error}],
|
|
|
- ?of_kind(datalayers_connector_start_failed, Trace)
|
|
|
+ ?of_kind(influxdb_connector_start_failed, Trace)
|
|
|
),
|
|
|
ok
|
|
|
end
|
|
|
@@ -964,7 +965,7 @@ t_start_exception(Config) ->
|
|
|
fun() ->
|
|
|
?wait_async_action(
|
|
|
?assertMatch({ok, _}, create_bridge(Config)),
|
|
|
- #{?snk_kind := datalayers_connector_start_exception},
|
|
|
+ #{?snk_kind := influxdb_connector_start_exception},
|
|
|
10_000
|
|
|
)
|
|
|
end
|
|
|
@@ -972,7 +973,7 @@ t_start_exception(Config) ->
|
|
|
fun(Trace) ->
|
|
|
?assertMatch(
|
|
|
[#{error := {error, boom}}],
|
|
|
- ?of_kind(datalayers_connector_start_exception, Trace)
|
|
|
+ ?of_kind(influxdb_connector_start_exception, Trace)
|
|
|
),
|
|
|
ok
|
|
|
end
|
|
|
@@ -1026,7 +1027,7 @@ t_write_failure(Config) ->
|
|
|
?assertMatch([_ | _], Trace),
|
|
|
[#{result := Result} | _] = Trace,
|
|
|
?assert(
|
|
|
- not emqx_bridge_datalayers_connector:is_unrecoverable_error(Result),
|
|
|
+ not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
|
|
|
#{got => Result}
|
|
|
);
|
|
|
async ->
|
|
|
@@ -1034,7 +1035,7 @@ t_write_failure(Config) ->
|
|
|
?assertMatch([#{action := nack} | _], Trace),
|
|
|
[#{result := Result} | _] = Trace,
|
|
|
?assert(
|
|
|
- not emqx_bridge_datalayers_connector:is_unrecoverable_error(Result),
|
|
|
+ not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
|
|
|
#{got => Result}
|
|
|
)
|
|
|
end,
|
|
|
@@ -1073,7 +1074,7 @@ t_missing_field(Config) ->
|
|
|
{ok, _} =
|
|
|
snabbkaffe:block_until(
|
|
|
?match_n_events(NEvents, #{
|
|
|
- ?snk_kind := datalayers_connector_send_query_error
|
|
|
+ ?snk_kind := influxdb_connector_send_query_error
|
|
|
}),
|
|
|
_Timeout1 = 10_000
|
|
|
),
|
|
|
@@ -1086,12 +1087,12 @@ t_missing_field(Config) ->
|
|
|
true ->
|
|
|
?assertMatch(
|
|
|
[#{error := points_trans_failed} | _],
|
|
|
- ?of_kind(datalayers_connector_send_query_error, Trace)
|
|
|
+ ?of_kind(influxdb_connector_send_query_error, Trace)
|
|
|
);
|
|
|
false ->
|
|
|
?assertMatch(
|
|
|
[#{error := [{error, no_fields}]} | _],
|
|
|
- ?of_kind(datalayers_connector_send_query_error, Trace)
|
|
|
+ ?of_kind(influxdb_connector_send_query_error, Trace)
|
|
|
)
|
|
|
end,
|
|
|
%% nothing should have been persisted
|
|
|
@@ -1113,14 +1114,14 @@ t_authentication_error(Config0) ->
|
|
|
begin
|
|
|
?wait_async_action(
|
|
|
create_bridge(Config),
|
|
|
- #{?snk_kind := datalayers_connector_start_failed},
|
|
|
+ #{?snk_kind := influxdb_connector_start_failed},
|
|
|
10_000
|
|
|
)
|
|
|
end,
|
|
|
fun(Trace) ->
|
|
|
?assertMatch(
|
|
|
[#{error := auth_error} | _],
|
|
|
- ?of_kind(datalayers_connector_start_failed, Trace)
|
|
|
+ ?of_kind(influxdb_connector_start_failed, Trace)
|
|
|
),
|
|
|
ok
|
|
|
end
|
|
|
@@ -1217,7 +1218,7 @@ t_authentication_error_on_send_message(Config0) ->
|
|
|
fun(Trace) ->
|
|
|
?assertMatch(
|
|
|
[#{error := <<"authorization failure">>} | _],
|
|
|
- ?of_kind(datalayers_connector_do_query_failure, Trace)
|
|
|
+ ?of_kind(influxdb_connector_do_query_failure, Trace)
|
|
|
),
|
|
|
ok
|
|
|
end
|