Просмотр исходного кода

test: add README to influxdb test script

Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
24f476e35f

+ 50 - 2
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl

@@ -663,6 +663,54 @@ t_start_ok_no_subject_tags_write_syntax(Config) ->
     ),
     ok.
 
+t_const_timestamp(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    Const = erlang:system_time(nanosecond),
+    ConstBin = integer_to_binary(Const),
+    TsStr = iolist_to_binary(
+        calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
+    ),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(
+            Config,
+            #{
+                <<"write_syntax">> =>
+                    <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>>
+            }
+        )
+    ),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{<<"foo">> => 123},
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"payload">> => Payload
+    },
+    ?assertEqual(ok, send_message(Config, SentData)),
+    case QueryMode of
+        async -> ct:sleep(500);
+        sync -> ok
+    end,
+    PersistedData = query_by_clientid(ClientId, Config),
+    Expected = #{foo => <<"123">>},
+    assert_persisted_data(ClientId, Expected, PersistedData),
+    TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)),
+    TimeReturned = pad_zero(TimeReturned0),
+    ?assertEqual(TsStr, TimeReturned).
+
+%% influxdb returns timestamps without trailing zeros such as
+%% "2023-02-28T17:21:51.63678163Z"
+%% while the standard should be
+%% "2023-02-28T17:21:51.636781630Z"
+pad_zero(BinTs) ->
+    StrTs = binary_to_list(BinTs),
+    [Nano | Rest] = lists:reverse(string:tokens(StrTs, ".")),
+    [$Z | NanoNum] = lists:reverse(Nano),
+    Padding = lists:duplicate(10 - length(Nano), $0),
+    NewNano = lists:reverse(NanoNum) ++ Padding ++ "Z",
+    iolist_to_binary(string:join(lists:reverse([NewNano | Rest]), ".")).
+
 t_boolean_variants(Config) ->
     QueryMode = ?config(query_mode, Config),
     ?assertMatch(
@@ -783,7 +831,7 @@ t_bad_timestamp(Config) ->
                         [
                             #{
                                 error := [
-                                    {error, {bad_timestamp, [<<"bad_timestamp">>]}}
+                                    {error, {bad_timestamp, <<"bad_timestamp">>}}
                                 ]
                             }
                         ],
@@ -793,7 +841,7 @@ t_bad_timestamp(Config) ->
                     ?assertEqual(
                         {error,
                             {unrecoverable_error, [
-                                {error, {bad_timestamp, [<<"bad_timestamp">>]}}
+                                {error, {bad_timestamp, <<"bad_timestamp">>}}
                             ]}},
                         Return
                     );

+ 13 - 3
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -490,11 +490,11 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
     is_list(Ts)
 ->
     TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
-    case emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions) of
-        [TsInt] when is_integer(TsInt) ->
+    case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of
+        {ok, TsInt} ->
             Item1 = Item#{timestamp => TsInt},
             continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
-        BadTs ->
+        {error, BadTs} ->
             lines_to_points(Data, Rest, ResultPointsAcc, [
                 {error, {bad_timestamp, BadTs}} | ErrorPointsAcc
             ])
@@ -504,6 +504,16 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
 ->
     continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
 
+parse_timestamp([TsInt]) when is_integer(TsInt) ->
+    {ok, TsInt};
+parse_timestamp([TsBin]) ->
+    try
+        {ok, binary_to_integer(TsBin)}
+    catch
+        _:_ ->
+            {error, TsBin}
+    end.
+
 continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
     case line_to_point(Data, Item) of
         #{fields := Fields} when map_size(Fields) =:= 0 ->

+ 25 - 0
scripts/test/influx/README.md

@@ -0,0 +1,25 @@
+# Test influxdb integration
+
+This script starts two EMQX nodes and a influxdb server in docker container.
+The bootstraping rule engine and data bridge config is provided in influx-bridge.conf
+which got included in the bootstraping config bundle emqx.conf.
+
+## Start the cluster
+
+./start.sh
+
+## How to run tests
+
+The rule and bridge are configured to pipe data from MQTT topic `t/#` to the 'myvalues' measurement in the 'mqtt' bucket.
+
+### Manual verification steps
+
+* Start the cluster
+* Send mqtt messages to topic `/t/a` with a JSON object as MQTT paylaod like `{"value": 1}`
+* Observe data in influxdb `curl -k -H 'Authorization: Token abcdefg' -G 'https://localhost:8086/query?pretty=true' --data-urlencode "db=mqtt" --data-urlencode "q=SELECT * from myvalues"`
+
+Example output the curl query against influxdb:
+
+```
+{"results":[{"statement_id":0,"series":[{"name":"myvalues","columns":["time","clientid","value"],"values":[["2023-02-28T11:13:29.039Z","a1",123]]}]}]
+```

+ 1 - 1
scripts/test/influx/influx-bridge.conf

@@ -30,7 +30,7 @@ bridges {
         versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
       }
       token = "abcdefg"
-      write_syntax = "mqtt,clientid=${clientid} value=${payload.value}"
+      write_syntax = "myvalues,clientid=${clientid} value=${payload.value}i"
     }
   }
 }