Parcourir la source

Merge pull request #13755 from HJianBo/refator-datalayer-impl

Refactor datalayers impl
JianBo He il y a 1 an
Parent
commit
d3364f4024

+ 1 - 1
apps/emqx_bridge_datalayers/src/emqx_bridge_datalayers.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_datalayers, [
     {description, "EMQX Enterprise Datalayers Bridge"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [
         kernel,

+ 63 - 696
apps/emqx_bridge_datalayers/src/emqx_bridge_datalayers_connector.erl

@@ -28,10 +28,8 @@
     on_batch_query/3,
     on_query_async/4,
     on_batch_query_async/4,
-    on_get_status/2,
-    on_format_query_result/1
+    on_get_status/2
 ]).
--export([reply_callback/2]).
 
 -export([
     roots/0,
@@ -42,14 +40,6 @@
 
 -export([precision_field/0]).
 
-%% only for test
--export([is_unrecoverable_error/1]).
-
--type ts_precision() :: ns | us | ms | s.
-
-%% Allocatable resources
--define(datalayers_client, datalayers_client).
-
 -define(DATALAYERS_DEFAULT_PORT, 8361).
 
 %% datalayers servers don't need parse
@@ -57,16 +47,6 @@
     default_port => ?DATALAYERS_DEFAULT_PORT
 }).
 
--define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
-
--define(set_tag, set_tag).
--define(set_field, set_field).
-
--define(IS_HTTP_ERROR(STATUS_CODE),
-    (is_integer(STATUS_CODE) andalso
-        (STATUS_CODE < 200 orelse STATUS_CODE >= 300))
-).
-
 %%--------------------------------------------------------------------
 %% resource callback
 
@@ -75,150 +55,79 @@ resource_type() -> datalayers.
 callback_mode() -> async_if_possible.
 
 on_add_channel(
-    _InstanceId,
-    #{channels := Channels, client := Client} = OldState,
+    InstId,
+    OldState,
     ChannelId,
-    #{parameters := Parameters} = ChannelConfig0
+    ChannelConf
 ) ->
-    #{write_syntax := WriteSytaxTmpl} = Parameters,
-    Precision = maps:get(precision, Parameters, ms),
-    ChannelConfig = maps:merge(
-        Parameters,
-        ChannelConfig0#{
-            channel_client => influxdb:update_precision(Client, Precision),
-            write_syntax => to_config(WriteSytaxTmpl, Precision)
-        }
-    ),
-    {ok, OldState#{
-        channels => maps:put(ChannelId, ChannelConfig, Channels)
-    }}.
-
-on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
-    NewState = State#{channels => maps:remove(ChannelId, Channels)},
-    {ok, NewState}.
-
-on_get_channel_status(InstanceId, _ChannelId, State) ->
-    case on_get_status(InstanceId, State) of
-        connected -> connected;
-        _ -> connecting
-    end.
+    emqx_bridge_influxdb_connector:on_add_channel(
+        InstId,
+        OldState,
+        ChannelId,
+        ChannelConf
+    ).
+
+on_remove_channel(InstId, State, ChannelId) ->
+    emqx_bridge_influxdb_connector:on_remove_channel(InstId, State, ChannelId).
 
-on_get_channels(InstanceId) ->
-    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+on_get_channel_status(InstId, ChannelId, State) ->
+    emqx_bridge_influxdb_connector:on_get_channel_status(InstId, ChannelId, State).
+
+on_get_channels(InstId) ->
+    emqx_bridge_influxdb_connector:on_get_channels(InstId).
 
 on_start(InstId, Config) ->
-    %% InstID as pool would be handled by influxdb client
-    %% so there is no need to allocate pool_name here
-    start_client(InstId, Config).
-
-on_stop(InstId, _State) ->
-    case emqx_resource:get_allocated_resources(InstId) of
-        #{?datalayers_client := Client} ->
-            Res = influxdb:stop_client(Client),
-            ?tp(datalayers_client_stopped, #{instance_id => InstId}),
-            Res;
-        _ ->
-            ok
+    case driver_type(Config) of
+        influxdb_v1 ->
+            Config1 = convert_config_to_influxdb(Config),
+            emqx_bridge_influxdb_connector:on_start(InstId, Config1)
     end.
 
-on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) ->
-    #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
-    #{channel_client := Client} = maps:get(Channel, ChannelConf),
-    case data_to_points(Message, SyntaxLines) of
-        {ok, Points} ->
-            ?tp(
-                datalayers_connector_send_query,
-                #{points => Points, batch => false, mode => sync}
-            ),
-            do_query(InstId, Channel, Client, Points);
-        {error, ErrorPoints} ->
-            ?tp(
-                datalayers_connector_send_query_error,
-                #{batch => false, mode => sync, error => ErrorPoints}
-            ),
-            log_error_points(InstId, ErrorPoints),
-            {error, {unrecoverable_error, ErrorPoints}}
-    end.
+driver_type(#{parameters := #{driver_type := influxdb_v1}}) ->
+    influxdb_v1.
 
-%% Once a Batched Data trans to points failed.
-%% This batch query failed
-on_batch_query(InstId, BatchData, #{channels := ChannelConf}) ->
-    [{Channel, _} | _] = BatchData,
-    #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
-    #{channel_client := Client} = maps:get(Channel, ChannelConf),
-    case parse_batch_data(InstId, BatchData, SyntaxLines) of
-        {ok, Points} ->
-            ?tp(
-                datalayers_connector_send_query,
-                #{points => Points, batch => true, mode => sync}
-            ),
-            do_query(InstId, Channel, Client, Points);
-        {error, Reason} ->
-            ?tp(
-                dayalayers_connector_send_query_error,
-                #{batch => true, mode => sync, error => Reason}
-            ),
-            {error, {unrecoverable_error, Reason}}
-    end.
+convert_config_to_influxdb(Config = #{parameters := Params = #{driver_type := influxdb_v1}}) ->
+    Config#{
+        parameters := maps:without([driver_type], Params#{influxdb_type => influxdb_api_v1})
+    }.
+
+on_stop(InstId, State) ->
+    emqx_bridge_influxdb_connector:on_stop(InstId, State).
+
+on_query(InstId, {Channel, Message}, State) ->
+    emqx_bridge_influxdb_connector:on_query(InstId, {Channel, Message}, State).
+
+on_batch_query(InstId, BatchData, State) ->
+    emqx_bridge_influxdb_connector:on_batch_query(InstId, BatchData, State).
 
 on_query_async(
     InstId,
     {Channel, Message},
     {ReplyFun, Args},
-    #{channels := ChannelConf}
+    State
 ) ->
-    #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
-    #{channel_client := Client} = maps:get(Channel, ChannelConf),
-    case data_to_points(Message, SyntaxLines) of
-        {ok, Points} ->
-            ?tp(
-                datalayers_connector_send_query,
-                #{points => Points, batch => false, mode => async}
-            ),
-            do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
-        {error, ErrorPoints} = Err ->
-            ?tp(
-                datalayers_connector_send_query_error,
-                #{batch => false, mode => async, error => ErrorPoints}
-            ),
-            log_error_points(InstId, ErrorPoints),
-            Err
-    end.
+    emqx_bridge_influxdb_connector:on_query_async(
+        InstId,
+        {Channel, Message},
+        {ReplyFun, Args},
+        State
+    ).
 
 on_batch_query_async(
     InstId,
     BatchData,
     {ReplyFun, Args},
-    #{channels := ChannelConf}
+    State
 ) ->
-    [{Channel, _} | _] = BatchData,
-    #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
-    #{channel_client := Client} = maps:get(Channel, ChannelConf),
-    case parse_batch_data(InstId, BatchData, SyntaxLines) of
-        {ok, Points} ->
-            ?tp(
-                datalayers_connector_send_query,
-                #{points => Points, batch => true, mode => async}
-            ),
-            do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
-        {error, Reason} ->
-            ?tp(
-                datalayers_connector_send_query_error,
-                #{batch => true, mode => async, error => Reason}
-            ),
-            {error, {unrecoverable_error, Reason}}
-    end.
-
-on_format_query_result(Result) ->
-    emqx_bridge_http_connector:on_format_query_result(Result).
+    emqx_bridge_influxdb_connector:on_batch_query_async(
+        InstId,
+        BatchData,
+        {ReplyFun, Args},
+        State
+    ).
 
-on_get_status(_InstId, #{client := Client}) ->
-    case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
-        true ->
-            connected;
-        false ->
-            disconnected
-    end.
+on_get_status(InstId, State) ->
+    emqx_bridge_influxdb_connector:on_get_status(InstId, State).
 
 %%--------------------------------------------------------------------
 %% schema
@@ -237,11 +146,13 @@ fields("connector") ->
         server_field(),
         {parameters,
             mk(
-                ref(?MODULE, "datalayers_parameters"),
+                hoconsc:union([
+                    ref(?MODULE, "datalayers_influxdb_v1_parameters")
+                ]),
                 #{required => true, desc => ?DESC("datalayers_parameters")}
             )}
     ] ++ emqx_connector_schema_lib:ssl_fields();
-fields("datalayers_parameters") ->
+fields("datalayers_influxdb_v1_parameters") ->
     datalayers_parameters_fields().
 
 server_field() ->
@@ -259,6 +170,10 @@ precision_field() ->
 
 datalayers_parameters_fields() ->
     [
+        {driver_type,
+            mk(enum([influxdb_v1]), #{
+                required => false, default => influxdb_v1, desc => ?DESC("driver_type")
+            })},
         {database, mk(binary(), #{required => true, desc => ?DESC("database")})},
         {username, mk(binary(), #{desc => ?DESC("username")})},
         {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
@@ -277,541 +192,16 @@ desc(common) ->
     ?DESC("common");
 desc(parameters) ->
     ?DESC("dayalayers_parameters");
-desc("datalayers_parameters") ->
+desc("datalayers_influxdb_v1_parameters") ->
     ?DESC("datalayers_parameters");
 desc(datalayers_api) ->
     ?DESC("datalayers_api");
 desc("connector") ->
     ?DESC("connector").
 
-%% -------------------------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% internal functions
 
-start_client(InstId, Config) ->
-    ClientConfig = client_config(InstId, Config),
-    ?SLOG(info, #{
-        msg => "starting_datalayers_connector",
-        connector => InstId,
-        config => emqx_utils:redact(Config),
-        client_config => emqx_utils:redact(ClientConfig)
-    }),
-    try do_start_client(InstId, ClientConfig, Config) of
-        Res = {ok, #{client := Client}} ->
-            ok = emqx_resource:allocate_resource(InstId, ?datalayers_client, Client),
-            Res;
-        {error, Reason} ->
-            {error, Reason}
-    catch
-        E:R:S ->
-            ?tp(datalayers_connector_start_exception, #{error => {E, R}}),
-            ?SLOG(warning, #{
-                msg => "start_datalayers_connector_error",
-                connector => InstId,
-                error => E,
-                reason => R,
-                stack => S
-            }),
-            {error, R}
-    end.
-
-do_start_client(InstId, ClientConfig, Config) ->
-    case influxdb:start_client(ClientConfig) of
-        {ok, Client} ->
-            case influxdb:is_alive(Client, true) of
-                true ->
-                    case influxdb:check_auth(Client) of
-                        ok ->
-                            State = #{client => Client, channels => #{}},
-                            ?SLOG(info, #{
-                                msg => "starting_datalayers_connector_success",
-                                connector => InstId,
-                                client => redact_auth(Client),
-                                state => redact_auth(State)
-                            }),
-                            {ok, State};
-                        Error ->
-                            ?tp(datalayers_connector_start_failed, #{error => auth_error}),
-                            ?SLOG(warning, #{
-                                msg => "failed_to_start_datalayers_connector",
-                                error => Error,
-                                connector => InstId,
-                                client => redact_auth(Client),
-                                reason => auth_error
-                            }),
-                            %% no leak
-                            _ = influxdb:stop_client(Client),
-                            {error, connect_ok_but_auth_failed}
-                    end;
-                {false, Reason} ->
-                    ?tp(datalayers_connector_start_failed, #{
-                        error => datalayers_client_not_alive, reason => Reason
-                    }),
-                    ?SLOG(warning, #{
-                        msg => "failed_to_start_datalayers_connector",
-                        connector => InstId,
-                        client => redact_auth(Client),
-                        reason => Reason
-                    }),
-                    %% no leak
-                    _ = influxdb:stop_client(Client),
-                    {error, {connect_failed, Reason}}
-            end;
-        {error, {already_started, Client0}} ->
-            ?tp(datalayers_connector_start_already_started, #{}),
-            ?SLOG(info, #{
-                msg => "restarting_datalayers_connector_found_already_started_client",
-                connector => InstId,
-                old_client => redact_auth(Client0)
-            }),
-            _ = influxdb:stop_client(Client0),
-            do_start_client(InstId, ClientConfig, Config);
-        {error, Reason} ->
-            ?tp(datalayers_connector_start_failed, #{error => Reason}),
-            ?SLOG(warning, #{
-                msg => "failed_to_start_datalayers_connector",
-                connector => InstId,
-                reason => Reason
-            }),
-            {error, Reason}
-    end.
-
-client_config(
-    InstId,
-    Config = #{
-        server := Server
-    }
-) ->
-    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DATALAYERS_HOST_OPTIONS),
-    [
-        {host, str(Host)},
-        {port, Port},
-        {pool_size, erlang:system_info(schedulers)},
-        {pool, InstId}
-    ] ++ protocol_config(Config).
-
-protocol_config(#{parameters := #{database := DB} = Params, ssl := SSL}) ->
-    [
-        {protocol, http},
-        {version, v1},
-        {database, str(DB)}
-    ] ++ username(Params) ++ password(Params) ++ ssl_config(SSL).
-
-ssl_config(#{enable := false}) ->
-    [
-        {https_enabled, false}
-    ];
-ssl_config(SSL = #{enable := true}) ->
-    [
-        {https_enabled, true},
-        {transport, ssl},
-        {transport_opts, emqx_tls_lib:to_client_opts(SSL)}
-    ].
-
-username(#{username := Username}) ->
-    [{username, str(Username)}];
-username(_) ->
-    [].
-
-password(#{password := Password}) ->
-    %% TODO: teach `influxdb` to accept 0-arity closures as passwords.
-    [{password, str(emqx_secret:unwrap(Password))}];
-password(_) ->
-    [].
-
-redact_auth(Term) ->
-    emqx_utils:redact(Term, fun is_auth_key/1).
-
-is_auth_key(Key) when is_binary(Key) ->
-    string:equal("authorization", Key, true);
-is_auth_key(_) ->
-    false.
-
-%% -------------------------------------------------------------------------------------------------
-%% Query
-do_query(InstId, Channel, Client, Points) ->
-    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}),
-    case influxdb:write(Client, Points) of
-        ok ->
-            ?SLOG(debug, #{
-                msg => "datalayers_write_point_success",
-                connector => InstId,
-                points => Points
-            });
-        {error, {401, _, _}} ->
-            ?tp(datalayers_connector_do_query_failure, #{error => <<"authorization failure">>}),
-            ?SLOG(error, #{
-                msg => "datalayers_authorization_failed",
-                client => redact_auth(Client),
-                connector => InstId
-            }),
-            {error, {unrecoverable_error, <<"authorization failure">>}};
-        {error, Reason} = Err ->
-            ?tp(datalayers_connector_do_query_failure, #{error => Reason}),
-            ?SLOG(error, #{
-                msg => "datalayers_write_point_failed",
-                connector => InstId,
-                reason => Reason
-            }),
-            case is_unrecoverable_error(Err) of
-                true ->
-                    {error, {unrecoverable_error, Reason}};
-                false ->
-                    {error, {recoverable_error, Reason}}
-            end
-    end.
-
-do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
-    ?SLOG(info, #{
-        msg => "datalayers_write_point_async",
-        connector => InstId,
-        points => Points
-    }),
-    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}),
-    WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
-    {ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs).
-
-reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
-    case is_unrecoverable_error(Error) of
-        true ->
-            Result = {error, {unrecoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
-        false ->
-            Result = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
-    end;
-reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) ->
-    ?tp(datalayers_connector_do_query_failure, #{error => <<"authorization failure">>}),
-    Result = {error, {unrecoverable_error, <<"authorization failure">>}},
-    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
-reply_callback(ReplyFunAndArgs, {ok, Code, _, Body}) when ?IS_HTTP_ERROR(Code) ->
-    ?tp(datalayers_connector_do_query_failure, #{error => Body}),
-    Result = {error, {unrecoverable_error, Body}},
-    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
-reply_callback(ReplyFunAndArgs, {ok, Code, Headers}) when ?IS_HTTP_ERROR(Code) ->
-    Error = #{code => Code, headers => Headers},
-    ?tp(datalayers_connector_do_query_failure, #{error => Error}),
-    Result = {error, {unrecoverable_error, Error}},
-    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
-reply_callback(ReplyFunAndArgs, Result) ->
-    ?tp(datalayers_connector_do_query_ok, #{result => Result}),
-    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
-
-%% -------------------------------------------------------------------------------------------------
-%% Tags & Fields Config Trans
-
-to_config(Lines, Precision) ->
-    to_config(Lines, [], Precision).
-
-to_config([], Acc, _Precision) ->
-    lists:reverse(Acc);
-to_config([Item0 | Rest], Acc, Precision) ->
-    Ts0 = maps:get(timestamp, Item0, undefined),
-    {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
-    Item = #{
-        measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)),
-        timestamp => Ts,
-        precision => {FromPrecision, ToPrecision},
-        tags => to_kv_config(maps:get(tags, Item0)),
-        fields => to_kv_config(maps:get(fields, Item0))
-    },
-    to_config(Rest, [Item | Acc], Precision).
-
-%% pre-process the timestamp template
-%% returns a tuple of three elements:
-%% 1. The timestamp template itself.
-%% 2. The source timestamp precision (ms if the template ${timestamp} is used).
-%% 3. The target timestamp precision (configured for the client).
-preproc_tmpl_timestamp(undefined, Precision) ->
-    %% not configured, we default it to the message timestamp
-    preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision);
-preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
-    %% a const value is used which is very much unusual, but we have to add a special handling
-    {Ts, Precision, Precision};
-preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
-    preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
-preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
-    {emqx_placeholder:preproc_tmpl(Ts), ms, Precision};
-preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
-    %% a placehold is in use. e.g. ${payload.my_timestamp}
-    %% we can only hope it the value will be of the same precision in the configs
-    {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}.
-
-to_kv_config(KVfields) ->
-    maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
-
-to_maps_config(K, V, Res) ->
-    NK = emqx_placeholder:preproc_tmpl(bin(K)),
-    Res#{NK => preproc_quoted(V)}.
-
-preproc_quoted({quoted, V}) ->
-    {quoted, emqx_placeholder:preproc_tmpl(bin(V))};
-preproc_quoted(V) ->
-    emqx_placeholder:preproc_tmpl(bin(V)).
-
-proc_quoted({quoted, V}, Data, TransOpts) ->
-    {quoted, emqx_placeholder:proc_tmpl(V, Data, TransOpts)};
-proc_quoted(V, Data, TransOpts) ->
-    emqx_placeholder:proc_tmpl(V, Data, TransOpts).
-
-%% -------------------------------------------------------------------------------------------------
-%% Tags & Fields Data Trans
-parse_batch_data(InstId, BatchData, SyntaxLines) ->
-    {Points, Errors} = lists:foldl(
-        fun({_, Data}, {ListOfPoints, ErrAccIn}) ->
-            case data_to_points(Data, SyntaxLines) of
-                {ok, Points} ->
-                    {[Points | ListOfPoints], ErrAccIn};
-                {error, ErrorPoints} ->
-                    log_error_points(InstId, ErrorPoints),
-                    {ListOfPoints, ErrAccIn + 1}
-            end
-        end,
-        {[], 0},
-        BatchData
-    ),
-    case Errors of
-        0 ->
-            {ok, lists:flatten(Points)};
-        _ ->
-            ?SLOG(error, #{
-                msg => "datalayers_trans_point_failed",
-                error_count => Errors,
-                connector => InstId,
-                reason => points_trans_failed
-            }),
-            {error, points_trans_failed}
-    end.
-
--spec data_to_points(map(), [
-    #{
-        fields := [{binary(), binary()}],
-        measurement := binary(),
-        tags := [{binary(), binary()}],
-        timestamp := emqx_placeholder:tmpl_token() | integer(),
-        precision := {From :: ts_precision(), To :: ts_precision()}
-    }
-]) -> {ok, [map()]} | {error, term()}.
-data_to_points(Data, SyntaxLines) ->
-    lines_to_points(Data, SyntaxLines, [], []).
-
-%% When converting multiple rows data into InfluxDB Line Protocol, they are considered to be strongly correlated.
-%% And once a row fails to convert, all of them are considered to have failed.
-lines_to_points(_, [], Points, ErrorPoints) ->
-    case ErrorPoints of
-        [] ->
-            {ok, Points};
-        _ ->
-            %% ignore trans succeeded points
-            {error, ErrorPoints}
-    end;
-lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
-    is_list(Ts)
-->
-    TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
-    case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of
-        {ok, TsInt} ->
-            Item1 = Item#{timestamp => TsInt},
-            continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
-        {error, BadTs} ->
-            lines_to_points(Data, Rest, ResultPointsAcc, [
-                {error, {bad_timestamp, BadTs}} | ErrorPointsAcc
-            ])
-    end;
-lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
-    is_integer(Ts)
-->
-    continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
-
-parse_timestamp([TsInt]) when is_integer(TsInt) ->
-    {ok, TsInt};
-parse_timestamp([TsBin]) ->
-    try
-        {ok, binary_to_integer(TsBin)}
-    catch
-        _:_ ->
-            {error, TsBin}
-    end.
-
-continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
-    case line_to_point(Data, Item) of
-        #{fields := Fields} when map_size(Fields) =:= 0 ->
-            %% influxdb client doesn't like empty field maps...
-            ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc],
-            lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1);
-        Point ->
-            lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
-    end.
-
-line_to_point(
-    Data,
-    #{
-        measurement := Measurement,
-        tags := Tags,
-        fields := Fields,
-        timestamp := Ts,
-        precision := Precision
-    } = Item
-) ->
-    {_, EncodedTags, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_tag}, Tags),
-    {_, EncodedFields, _} = maps:fold(fun maps_config_to_data/3, {Data, #{}, ?set_field}, Fields),
-    maps:without([precision], Item#{
-        measurement => emqx_placeholder:proc_tmpl(Measurement, Data),
-        tags => EncodedTags,
-        fields => EncodedFields,
-        timestamp => maybe_convert_time_unit(Ts, Precision)
-    }).
-
-maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) ->
-    erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)).
-
-time_unit(s) -> second;
-time_unit(ms) -> millisecond;
-time_unit(us) -> microsecond;
-time_unit(ns) -> nanosecond.
-
-maps_config_to_data(K, V, {Data, Res, SetType}) ->
-    KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
-    VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
-    NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
-    NV = proc_quoted(V, Data, VTransOptions),
-    case {NK, NV} of
-        {[undefined], _} ->
-            {Data, Res, SetType};
-        %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
-        {_, [undefined | _]} ->
-            {Data, Res, SetType};
-        {_, {quoted, [undefined | _]}} ->
-            {Data, Res, SetType};
-        _ ->
-            NRes = Res#{
-                list_to_binary(NK) => value_type(NV, #{
-                    tmpl_type => tmpl_type(V), set_type => SetType
-                })
-            },
-            {Data, NRes, SetType}
-    end.
-
-value_type([Number], #{set_type := ?set_tag}) when is_number(Number) ->
-    %% all `tag` values are treated as string
-    %% See also: https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set
-    emqx_utils_conv:bin(Number);
-value_type([Str], #{set_type := ?set_tag}) when is_binary(Str) ->
-    Str;
-value_type({quoted, ValList}, _) ->
-    {string_list, ValList};
-value_type([Int, <<"i">>], #{tmpl_type := mixed}) when is_integer(Int) ->
-    {int, Int};
-value_type([UInt, <<"u">>], #{tmpl_type := mixed}) when is_integer(UInt) ->
-    {uint, UInt};
-%% write `1`, `1.0`, `-1.0` all as float
-%% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float
-value_type([Number], #{set_type := ?set_field}) when is_number(Number) ->
-    {float, Number};
-value_type([<<"t">>], _) ->
-    't';
-value_type([<<"T">>], _) ->
-    'T';
-value_type([true], _) ->
-    'true';
-value_type([<<"TRUE">>], _) ->
-    'TRUE';
-value_type([<<"True">>], _) ->
-    'True';
-value_type([<<"f">>], _) ->
-    'f';
-value_type([<<"F">>], _) ->
-    'F';
-value_type([false], _) ->
-    'false';
-value_type([<<"FALSE">>], _) ->
-    'FALSE';
-value_type([<<"False">>], _) ->
-    'False';
-value_type([Str], #{tmpl_type := variable}) when is_binary(Str) ->
-    Str;
-value_type([Str], #{tmpl_type := literal, set_type := ?set_field}) when is_binary(Str) ->
-    %% if Str is a literal string suffixed with `i` or `u`, we should convert it to int/uint.
-    %% otherwise, we should convert it to float.
-    NumStr = binary:part(Str, 0, byte_size(Str) - 1),
-    case binary:part(Str, byte_size(Str), -1) of
-        <<"i">> ->
-            maybe_convert_to_integer(NumStr, Str, int);
-        <<"u">> ->
-            maybe_convert_to_integer(NumStr, Str, uint);
-        _ ->
-            maybe_convert_to_float_str(Str)
-    end;
-value_type(Str, _) ->
-    Str.
-
-tmpl_type([{str, _}]) ->
-    literal;
-tmpl_type([{var, _}]) ->
-    variable;
-tmpl_type(_) ->
-    mixed.
-
-maybe_convert_to_integer(NumStr, String, Type) ->
-    try
-        Int = binary_to_integer(NumStr),
-        {Type, Int}
-    catch
-        error:badarg ->
-            maybe_convert_to_integer_f(NumStr, String, Type)
-    end.
-
-maybe_convert_to_integer_f(NumStr, String, Type) ->
-    try
-        Float = binary_to_float(NumStr),
-        {Type, erlang:floor(Float)}
-    catch
-        error:badarg ->
-            String
-    end.
-
-maybe_convert_to_float_str(NumStr) ->
-    try
-        _ = binary_to_float(NumStr),
-        %% NOTE: return a {float, String} to avoid precision loss when converting to float
-        {float, NumStr}
-    catch
-        error:badarg ->
-            maybe_convert_to_float_str_i(NumStr)
-    end.
-
-maybe_convert_to_float_str_i(NumStr) ->
-    try
-        _ = binary_to_integer(NumStr),
-        {float, NumStr}
-    catch
-        error:badarg ->
-            NumStr
-    end.
-
-key_filter(undefined) -> undefined;
-key_filter(Value) -> bin(Value).
-
-data_filter(undefined) -> undefined;
-data_filter(Int) when is_integer(Int) -> Int;
-data_filter(Number) when is_number(Number) -> Number;
-data_filter(Bool) when is_boolean(Bool) -> Bool;
-data_filter(Data) -> bin(Data).
-
-bin(Data) -> emqx_utils_conv:bin(Data).
-
-%% helper funcs
-log_error_points(InstId, Errs) ->
-    lists:foreach(
-        fun({error, Reason}) ->
-            ?SLOG(error, #{
-                msg => "datalayers_trans_point_failed",
-                connector => InstId,
-                reason => Reason
-            })
-        end,
-        Errs
-    ).
-
 convert_server(<<"http://", Server/binary>>, HoconOpts) ->
     convert_server(Server, HoconOpts);
 convert_server(<<"https://", Server/binary>>, HoconOpts) ->
@@ -820,22 +210,6 @@ convert_server(Server0, HoconOpts) ->
     Server = string:trim(Server0, trailing, "/"),
     emqx_schema:convert_servers(Server, HoconOpts).
 
-str(A) when is_atom(A) ->
-    atom_to_list(A);
-str(B) when is_binary(B) ->
-    binary_to_list(B);
-str(S) when is_list(S) ->
-    S.
-
-is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
-    true;
-is_unrecoverable_error({error, {Code, _}}) when ?IS_HTTP_ERROR(Code) ->
-    true;
-is_unrecoverable_error({error, {Code, _, _Body}}) when ?IS_HTTP_ERROR(Code) ->
-    true;
-is_unrecoverable_error(_) ->
-    false.
-
 %%===================================================================
 %% eunit tests
 %%===================================================================
@@ -843,13 +217,6 @@ is_unrecoverable_error(_) ->
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 
-is_auth_key_test_() ->
-    [
-        ?_assert(is_auth_key(<<"Authorization">>)),
-        ?_assertNot(is_auth_key(<<"Something">>)),
-        ?_assertNot(is_auth_key(89))
-    ].
-
 %% for coverage
 desc_test_() ->
     [

+ 22 - 21
apps/emqx_bridge_datalayers/test/emqx_bridge_datalayers_SUITE.erl

@@ -206,6 +206,7 @@ datalayers_connector_config(Name, DatalayersHost, DatalayersPort, Config) ->
             "  enable = true\n"
             "  server = \"~s:~b\"\n"
             "  parameters {\n"
+            "    driver_type = influxdb_v1\n"
             "    database = mqtt\n"
             "    username = admin\n"
             "    password = public\n"
@@ -451,7 +452,7 @@ t_start_ok(Config) ->
             ok
         end,
         fun(Trace0) ->
-            Trace = ?of_kind(datalayers_connector_send_query, Trace0),
+            Trace = ?of_kind(influxdb_connector_send_query, Trace0),
             ?assertMatch([#{points := [_]}], Trace),
             [#{points := [Point]}] = Trace,
             ct:pal("sent point: ~p", [Point]),
@@ -474,7 +475,7 @@ t_start_ok(Config) ->
     ok.
 
 t_start_stop(Config) ->
-    ok = emqx_bridge_v2_testlib:t_start_stop(Config, datalayers_client_stopped),
+    ok = emqx_bridge_v2_testlib:t_start_stop(Config, influxdb_client_stopped),
     ok.
 
 t_start_already_started(Config) ->
@@ -494,7 +495,7 @@ t_start_already_started(Config) ->
         emqx_bridge_datalayers_connector:on_start(ConnectorId, ConnConfigMap),
         fun(Result, Trace) ->
             ?assertMatch({ok, _}, Result),
-            ?assertMatch([_], ?of_kind(datalayers_connector_start_already_started, Trace)),
+            ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
             ok
         end
     ),
@@ -853,7 +854,7 @@ t_bad_timestamp(Config) ->
     ?check_trace(
         ?wait_async_action(
             send_message(Config1, SentData),
-            #{?snk_kind := datalayers_connector_send_query_error},
+            #{?snk_kind := influxdb_connector_send_query_error},
             10_000
         ),
         fun(Result, Trace) ->
@@ -865,7 +866,7 @@ t_bad_timestamp(Config) ->
                     ?assertEqual(ok, Return),
                     ?assertMatch(
                         [#{error := points_trans_failed}],
-                        ?of_kind(datalayers_connector_send_query_error, Trace)
+                        ?of_kind(influxdb_connector_send_query_error, Trace)
                     );
                 {async, false} ->
                     ?assertEqual(ok, Return),
@@ -877,7 +878,7 @@ t_bad_timestamp(Config) ->
                                 ]
                             }
                         ],
-                        ?of_kind(datalayers_connector_send_query_error, Trace)
+                        ?of_kind(influxdb_connector_send_query_error, Trace)
                     );
                 {sync, false} ->
                     ?assertEqual(
@@ -915,14 +916,14 @@ t_create_disconnected(Config) ->
             ?assertMatch({ok, _}, create_bridge(Config))
         end),
         fun(Trace) ->
-            [#{error := datalayers_client_not_alive, reason := Reason}] =
-                ?of_kind(datalayers_connector_start_failed, Trace),
+            [#{error := influxdb_client_not_alive, reason := Reason}] =
+                ?of_kind(influxdb_connector_start_failed, Trace),
             case Reason of
                 econnrefused -> ok;
                 closed -> ok;
                 {closed, _} -> ok;
                 {shutdown, closed} -> ok;
-                _ -> ct:fail("datalayers_client_not_alive with wrong reason: ~p", [Reason])
+                _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
             end,
             ok
         end
@@ -939,7 +940,7 @@ t_start_error(Config) ->
             fun() ->
                 ?wait_async_action(
                     ?assertMatch({ok, _}, create_bridge(Config)),
-                    #{?snk_kind := datalayers_connector_start_failed},
+                    #{?snk_kind := influxdb_connector_start_failed},
                     10_000
                 )
             end
@@ -947,7 +948,7 @@ t_start_error(Config) ->
         fun(Trace) ->
             ?assertMatch(
                 [#{error := some_error}],
-                ?of_kind(datalayers_connector_start_failed, Trace)
+                ?of_kind(influxdb_connector_start_failed, Trace)
             ),
             ok
         end
@@ -964,7 +965,7 @@ t_start_exception(Config) ->
             fun() ->
                 ?wait_async_action(
                     ?assertMatch({ok, _}, create_bridge(Config)),
-                    #{?snk_kind := datalayers_connector_start_exception},
+                    #{?snk_kind := influxdb_connector_start_exception},
                     10_000
                 )
             end
@@ -972,7 +973,7 @@ t_start_exception(Config) ->
         fun(Trace) ->
             ?assertMatch(
                 [#{error := {error, boom}}],
-                ?of_kind(datalayers_connector_start_exception, Trace)
+                ?of_kind(influxdb_connector_start_exception, Trace)
             ),
             ok
         end
@@ -1026,7 +1027,7 @@ t_write_failure(Config) ->
                     ?assertMatch([_ | _], Trace),
                     [#{result := Result} | _] = Trace,
                     ?assert(
-                        not emqx_bridge_datalayers_connector:is_unrecoverable_error(Result),
+                        not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
                         #{got => Result}
                     );
                 async ->
@@ -1034,7 +1035,7 @@ t_write_failure(Config) ->
                     ?assertMatch([#{action := nack} | _], Trace),
                     [#{result := Result} | _] = Trace,
                     ?assert(
-                        not emqx_bridge_datalayers_connector:is_unrecoverable_error(Result),
+                        not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
                         #{got => Result}
                     )
             end,
@@ -1073,7 +1074,7 @@ t_missing_field(Config) ->
             {ok, _} =
                 snabbkaffe:block_until(
                     ?match_n_events(NEvents, #{
-                        ?snk_kind := datalayers_connector_send_query_error
+                        ?snk_kind := influxdb_connector_send_query_error
                     }),
                     _Timeout1 = 10_000
                 ),
@@ -1086,12 +1087,12 @@ t_missing_field(Config) ->
                 true ->
                     ?assertMatch(
                         [#{error := points_trans_failed} | _],
-                        ?of_kind(datalayers_connector_send_query_error, Trace)
+                        ?of_kind(influxdb_connector_send_query_error, Trace)
                     );
                 false ->
                     ?assertMatch(
                         [#{error := [{error, no_fields}]} | _],
-                        ?of_kind(datalayers_connector_send_query_error, Trace)
+                        ?of_kind(influxdb_connector_send_query_error, Trace)
                     )
             end,
             %% nothing should have been persisted
@@ -1113,14 +1114,14 @@ t_authentication_error(Config0) ->
         begin
             ?wait_async_action(
                 create_bridge(Config),
-                #{?snk_kind := datalayers_connector_start_failed},
+                #{?snk_kind := influxdb_connector_start_failed},
                 10_000
             )
         end,
         fun(Trace) ->
             ?assertMatch(
                 [#{error := auth_error} | _],
-                ?of_kind(datalayers_connector_start_failed, Trace)
+                ?of_kind(influxdb_connector_start_failed, Trace)
             ),
             ok
         end
@@ -1217,7 +1218,7 @@ t_authentication_error_on_send_message(Config0) ->
                 fun(Trace) ->
                     ?assertMatch(
                         [#{error := <<"authorization failure">>} | _],
-                        ?of_kind(datalayers_connector_do_query_failure, Trace)
+                        ?of_kind(influxdb_connector_do_query_failure, Trace)
                     ),
                     ok
                 end

+ 1 - 0
apps/emqx_bridge_datalayers/test/emqx_bridge_datalayers_connector_SUITE.erl

@@ -236,6 +236,7 @@ datalayers_connector_config(Host, Port, SslEnabled, Verify) ->
     Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
     ConnectorConf = #{
         <<"parameters">> => #{
+            <<"driver_type">> => <<"influxdb_v1">>,
             <<"database">> => <<"mqtt">>,
             <<"username">> => <<"admin">>,
             <<"password">> => <<"public">>

+ 1 - 0
changes/ee/fix-13755.md

@@ -0,0 +1 @@
+Refactoring the Datalayers data integration implementation to increase code reuse.

+ 6 - 0
rel/i18n/emqx_bridge_datalayers_connector.hocon

@@ -38,6 +38,12 @@ username.desc:
 username.label:
 """Username"""
 
+driver_type.label:
+"""Driver Type"""
+
+driver_type.desc:
+"""The Driver used to communicate with Datalayers service."""
+
 datalayers_parameters.label:
 """Datalayers Specific Parameters"""