Просмотр исходного кода

fix: use default template if timestamp is empty (undefined) in InfluxDB bridge

Closes EMQX-8926
Serge Tupchii 2 лет назад
Родитель
Сommit
97e71c54d4

+ 7 - 5
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl

@@ -527,7 +527,8 @@ t_start_ok(Config) ->
     SentData = #{
         <<"clientid">> => ClientId,
         <<"topic">> => atom_to_binary(?FUNCTION_NAME),
-        <<"payload">> => Payload
+        <<"payload">> => Payload,
+        <<"timestamp">> => erlang:system_time(millisecond)
     },
     ?check_trace(
         begin
@@ -685,7 +686,8 @@ t_const_timestamp(Config) ->
     SentData = #{
         <<"clientid">> => ClientId,
         <<"topic">> => atom_to_binary(?FUNCTION_NAME),
-        <<"payload">> => Payload
+        <<"payload">> => Payload,
+        <<"timestamp">> => erlang:system_time(millisecond)
     },
     ?assertEqual(ok, send_message(Config, SentData)),
     case QueryMode of
@@ -740,7 +742,7 @@ t_boolean_variants(Config) ->
             SentData = #{
                 <<"clientid">> => ClientId,
                 <<"topic">> => atom_to_binary(?FUNCTION_NAME),
-                <<"timestamp">> => erlang:system_time(nanosecond),
+                <<"timestamp">> => erlang:system_time(millisecond),
                 <<"payload">> => Payload
             },
             ?assertEqual(ok, send_message(Config, SentData)),
@@ -805,7 +807,7 @@ t_bad_timestamp(Config) ->
     SentData = #{
         <<"clientid">> => ClientId,
         <<"topic">> => atom_to_binary(?FUNCTION_NAME),
-        <<"timestamp">> => erlang:system_time(nanosecond),
+        <<"timestamp">> => erlang:system_time(millisecond),
         <<"payload">> => Payload
     },
     ?check_trace(
@@ -949,7 +951,7 @@ t_write_failure(Config) ->
     SentData = #{
         <<"clientid">> => ClientId,
         <<"topic">> => atom_to_binary(?FUNCTION_NAME),
-        <<"timestamp">> => erlang:system_time(nanosecond),
+        <<"timestamp">> => erlang:system_time(millisecond),
         <<"payload">> => Payload
     },
     ?check_trace(

+ 46 - 22
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -35,11 +35,15 @@
     desc/1
 ]).
 
+-type ts_precision() :: ns | us | ms | s.
+
 %% influxdb servers don't need parse
 -define(INFLUXDB_HOST_OPTIONS, #{
     default_port => ?INFLUXDB_DEFAULT_PORT
 }).
 
+-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
+
 %% -------------------------------------------------------------------------------------------------
 %% resource callback
 callback_mode() -> async_if_possible.
@@ -232,15 +236,14 @@ do_start_client(
     ClientConfig,
     Config = #{write_syntax := Lines}
 ) ->
+    Precision = maps:get(precision, Config, ms),
     case influxdb:start_client(ClientConfig) of
         {ok, Client} ->
             case influxdb:is_alive(Client) of
                 true ->
                     State = #{
                         client => Client,
-                        write_syntax => to_config(
-                            Lines, proplists:get_value(precision, ClientConfig)
-                        )
+                        write_syntax => to_config(Lines, Precision)
                     },
                     ?SLOG(info, #{
                         msg => "starting influxdb connector success",
@@ -407,27 +410,36 @@ to_config(Lines, Precision) ->
 to_config([], Acc, _Precision) ->
     lists:reverse(Acc);
 to_config([Item0 | Rest], Acc, Precision) ->
-    Ts = maps:get(timestamp, Item0, undefined),
+    Ts0 = maps:get(timestamp, Item0, undefined),
+    {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
     Item = #{
         measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)),
-        timestamp => preproc_tmpl_timestamp(Ts, Precision),
+        timestamp => Ts,
+        precision => {FromPrecision, ToPrecision},
         tags => to_kv_config(maps:get(tags, Item0)),
         fields => to_kv_config(maps:get(fields, Item0))
     },
     to_config(Rest, [Item | Acc], Precision).
 
-preproc_tmpl_timestamp(undefined, <<"ns">>) ->
-    erlang:system_time(nanosecond);
-preproc_tmpl_timestamp(undefined, <<"us">>) ->
-    erlang:system_time(microsecond);
-preproc_tmpl_timestamp(undefined, <<"ms">>) ->
-    erlang:system_time(millisecond);
-preproc_tmpl_timestamp(undefined, <<"s">>) ->
-    erlang:system_time(second);
-preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) ->
-    Ts;
-preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) ->
-    emqx_plugin_libs_rule:preproc_tmpl(Ts).
+%% pre-process the timestamp template
+%% returns a tuple of three elements:
+%% 1. The timestamp template itself.
+%% 2. The source timestamp precision (ms if the template ${timestamp} is used).
+%% 3. The target timestamp precision (configured for the client).
+preproc_tmpl_timestamp(undefined, Precision) ->
+    %% not configured, we default it to the message timestamp
+    preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision);
+preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
+    %% a const value is used which is very much unusual, but we have to add a special handling
+    {Ts, Precision, Precision};
+preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
+    preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
+preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
+    {emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision};
+preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
+    %% a placehold is in use. e.g. ${payload.my_timestamp}
+    %% we can only hope it the value will be of the same precision in the configs
+    {emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}.
 
 to_kv_config(KVfields) ->
     maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
@@ -470,7 +482,8 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
         fields := [{binary(), binary()}],
         measurement := binary(),
         tags := [{binary(), binary()}],
-        timestamp := emqx_plugin_libs_rule:tmpl_token() | integer()
+        timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(),
+        precision := {From :: ts_precision(), To :: ts_precision()}
     }
 ]) -> {ok, [map()]} | {error, term()}.
 data_to_points(Data, SyntaxLines) ->
@@ -529,16 +542,27 @@ line_to_point(
     #{
         measurement := Measurement,
         tags := Tags,
-        fields := Fields
+        fields := Fields,
+        timestamp := Ts,
+        precision := Precision
     } = Item
 ) ->
     {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
     {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
-    Item#{
+    maps:without([precision], Item#{
         measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
         tags => EncodedTags,
-        fields => EncodedFields
-    }.
+        fields => EncodedFields,
+        timestamp => maybe_convert_time_unit(Ts, Precision)
+    }).
+
+maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) ->
+    erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)).
+
+time_unit(s) -> second;
+time_unit(ms) -> millisecond;
+time_unit(us) -> microsecond;
+time_unit(ns) -> nanosecond.
 
 maps_config_to_data(K, V, {Data, Res}) ->
     KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},

+ 2 - 1
lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl

@@ -227,5 +227,6 @@ test_query() ->
     {send_message, #{
         <<"clientid">> => <<"something">>,
         <<"payload">> => #{bool => true},
-        <<"topic">> => <<"connector_test">>
+        <<"topic">> => <<"connector_test">>,
+        <<"timestamp">> => 1678220316257
     }}.