Просмотр исходного кода

feat: handle escaped characters in InfluxDB data bridge write_syntax

Closes: EMQX-7834
Serge Tupchii 2 лет назад
Родитель
Сommit
3a46681dde

+ 2 - 0
changes/ee/feat-10165.en.md

@@ -0,0 +1,2 @@
+Support escaped special characters in InfluxDB data bridge write_syntax.
+This update allows to use escaped special characters in string elements in accordance with InfluxDB line protocol.

+ 141 - 43
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -3,6 +3,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_ee_bridge_influxdb).
 
+-include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_bridge/include/emqx_bridge.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
@@ -169,53 +170,150 @@ write_syntax(_) ->
     undefined.
 
 to_influx_lines(RawLines) ->
-    Lines = string:tokens(str(RawLines), "\n"),
-    lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)).
-
-converter_influx_line(Line, AccIn) ->
-    case string:tokens(str(Line), " ") of
-        [MeasurementAndTags, Fields, Timestamp] ->
-            append_influx_item(MeasurementAndTags, Fields, Timestamp, AccIn);
-        [MeasurementAndTags, Fields] ->
-            append_influx_item(MeasurementAndTags, Fields, undefined, AccIn);
-        _ ->
-            throw("Bad InfluxDB Line Protocol schema")
+    try
+        influx_lines(str(RawLines), [])
+    catch
+        _:Reason:Stacktrace ->
+            Msg = lists:flatten(
+                io_lib:format("Unable to parse InfluxDB line protocol: ~p", [RawLines])
+            ),
+            ?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}),
+            throw(Msg)
     end.
 
-append_influx_item(MeasurementAndTags, Fields, Timestamp, Acc) ->
-    {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
-    [
-        #{
-            measurement => Measurement,
-            tags => kv_pairs(Tags),
-            fields => kv_pairs(string:tokens(Fields, ",")),
-            timestamp => Timestamp
-        }
-        | Acc
-    ].
+-define(MEASUREMENT_ESC_CHARS, [$,, $\s]).
+-define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]).
+-define(FIELD_VAL_ESC_CHARS, [$", $\\]).
+% Common separator for both tags and fields
+-define(SEP, $\s).
+-define(MEASUREMENT_TAG_SEP, $,).
+-define(KEY_SEP, $=).
+-define(VAL_SEP, $,).
+-define(NON_EMPTY, [_ | _]).
 
-split_measurement_and_tags(Subject) ->
-    case string:tokens(Subject, ",") of
-        [] ->
-            throw("Bad Measurement schema");
-        [Measurement] ->
-            {Measurement, []};
-        [Measurement | Tags] ->
-            {Measurement, Tags}
-    end.
+influx_lines([] = _RawLines, Acc) ->
+    ?NON_EMPTY = lists:reverse(Acc);
+influx_lines(RawLines, Acc) ->
+    {Acc1, RawLines1} = influx_line(string:trim(RawLines, leading, "\s\n"), Acc),
+    influx_lines(RawLines1, Acc1).
 
-kv_pairs(Pairs) ->
-    kv_pairs(Pairs, []).
-kv_pairs([], Acc) ->
-    lists:reverse(Acc);
-kv_pairs([Pair | Rest], Acc) ->
-    case string:tokens(Pair, "=") of
-        [K, V] ->
-            %% Reduplicated keys will be overwritten. Follows InfluxDB Line Protocol.
-            kv_pairs(Rest, [{K, V} | Acc]);
-        _ ->
-            throw(io_lib:format("Bad InfluxDB Line Protocol Key Value pair: ~p", Pair))
-    end.
+influx_line([], Acc) ->
+    {Acc, []};
+influx_line(Line, Acc) ->
+    {?NON_EMPTY = Measurement, Line1} = measurement(Line),
+    {Tags, Line2} = tags(Line1),
+    {?NON_EMPTY = Fields, Line3} = influx_fields(Line2),
+    {Timestamp, Line4} = timestamp(Line3),
+    {
+        [
+            #{
+                measurement => Measurement,
+                tags => Tags,
+                fields => Fields,
+                timestamp => Timestamp
+            }
+            | Acc
+        ],
+        Line4
+    }.
+
+measurement(Line) ->
+    unescape(?MEASUREMENT_ESC_CHARS, [?MEASUREMENT_TAG_SEP, ?SEP], Line, []).
+
+tags([?MEASUREMENT_TAG_SEP | Line]) ->
+    tags1(Line, []);
+tags(Line) ->
+    {[], Line}.
+
+%% Empty line is invalid as fields are required after tags,
+%% need to break recursion here and fail later on parsing fields
+tags1([] = Line, Acc) ->
+    {lists:reverse(Acc), Line};
+%% Matching non empty Acc treats lines like "m, field=field_val" invalid
+tags1([?SEP | _] = Line, ?NON_EMPTY = Acc) ->
+    {lists:reverse(Acc), Line};
+tags1(Line, Acc) ->
+    {Tag, Line1} = tag(Line),
+    tags1(Line1, [Tag | Acc]).
+
+tag(Line) ->
+    {?NON_EMPTY = Key, Line1} = key(Line),
+    {?NON_EMPTY = Val, Line2} = tag_val(Line1),
+    {{Key, Val}, Line2}.
+
+tag_val(Line) ->
+    {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Line, []),
+    {Val, strip_l(Line1, ?VAL_SEP)}.
+
+influx_fields([?SEP | Line]) ->
+    fields1(string:trim(Line, leading, "\s"), []).
+
+%% Timestamp is optional, so fields may be at the very end of the line
+fields1([Ch | _] = Line, Acc) when Ch =:= ?SEP; Ch =:= $\n ->
+    {lists:reverse(Acc), Line};
+fields1([] = Line, Acc) ->
+    {lists:reverse(Acc), Line};
+fields1(Line, Acc) ->
+    {Field, Line1} = field(Line),
+    fields1(Line1, [Field | Acc]).
+
+field(Line) ->
+    {?NON_EMPTY = Key, Line1} = key(Line),
+    {Val, Line2} = field_val(Line1),
+    {{Key, Val}, Line2}.
+
+field_val([$" | Line]) ->
+    {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []),
+    %% Quoted val can be empty
+    {Val, strip_l(Line1, ?VAL_SEP)};
+field_val(Line) ->
+    %% Unquoted value should not be un-escaped according to InfluxDB protocol,
+    %% as it can only hold float, integer, uinteger or boolean value.
+    %% However, as templates are possible, un-escaping is applied here,
+    %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}"
+    {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Line, []),
+    {?NON_EMPTY = Val, strip_l(Line1, ?VAL_SEP)}.
+
+timestamp([?SEP | Line]) ->
+    Line1 = string:trim(Line, leading, "\s"),
+    %% Similarly to unquoted field value, un-escape a timestamp to validate and handle
+    %% potentially escaped characters in a template
+    {T, Line2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Line1, []),
+    {timestamp1(T), Line2};
+timestamp(Line) ->
+    {undefined, Line}.
+
+timestamp1(?NON_EMPTY = Ts) -> Ts;
+timestamp1(_Ts) -> undefined.
+
+%% Common for both tag and field keys
+key(Line) ->
+    {Key, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Line, []),
+    {Key, strip_l(Line1, ?KEY_SEP)}.
+
+%% Only strip a character between pairs, don't strip it(and let it fail)
+%% if the char to be stripped is at the end, e.g.: m,tag=val, field=val
+strip_l([Ch, Ch1 | Str], Ch) when Ch1 =/= ?SEP ->
+    [Ch1 | Str];
+strip_l(Str, _Ch) ->
+    Str.
+
+unescape(EscapeChars, SepChars, [$\\, Char | T], Acc) ->
+    ShouldEscapeBackslash = lists:member($\\, EscapeChars),
+    Acc1 =
+        case lists:member(Char, EscapeChars) of
+            true -> [Char | Acc];
+            false when not ShouldEscapeBackslash -> [Char, $\\ | Acc]
+        end,
+    unescape(EscapeChars, SepChars, T, Acc1);
+unescape(EscapeChars, SepChars, [Char | T] = L, Acc) ->
+    IsEscapeChar = lists:member(Char, EscapeChars),
+    case lists:member(Char, SepChars) of
+        true -> {lists:reverse(Acc), L};
+        false when not IsEscapeChar -> unescape(EscapeChars, SepChars, T, [Char | Acc])
+    end;
+unescape(_EscapeChars, _SepChars, [] = L, Acc) ->
+    {lists:reverse(Acc), L}.
 
 str(A) when is_atom(A) ->
     atom_to_list(A);

+ 328 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_tests.erl

@@ -0,0 +1,328 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_influxdb_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-import(emqx_ee_bridge_influxdb, [to_influx_lines/1]).
+
+-define(INVALID_LINES, [
+    "   ",
+    " \n",
+    "  \n\n\n  ",
+    "\n",
+    "  \n\n   \n  \n",
+    "measurement",
+    "measurement ",
+    "measurement,tag",
+    "measurement field",
+    "measurement,tag field",
+    "measurement,tag field ${timestamp}",
+    "measurement,tag=",
+    "measurement,tag=tag1",
+    "measurement,tag =",
+    "measurement field=",
+    "measurement field= ",
+    "measurement field = ",
+    "measurement, tag = field = ",
+    "measurement, tag = field = ",
+    "measurement, tag = tag_val field = field_val",
+    "measurement, tag = tag_val field = field_val ${timestamp}",
+    "measurement,= = ${timestamp}",
+    "measurement,t=a, f=a, ${timestamp}",
+    "measurement,t=a,t1=b, f=a,f1=b, ${timestamp}",
+    "measurement,t=a,t1=b, f=a,f1=b,",
+    "measurement,t=a, t1=b, f=a,f1=b,",
+    "measurement,t=a,,t1=b, f=a,f1=b,",
+    "measurement,t=a,,t1=b f=a,,f1=b",
+    "measurement,t=a,,t1=b f=a,f1=b ${timestamp}",
+    "measurement, f=a,f1=b",
+    "measurement, f=a,f1=b ${timestamp}",
+    "measurement,, f=a,f1=b ${timestamp}",
+    "measurement,, f=a,f1=b",
+    "measurement,, f=a,f1=b,, ${timestamp}",
+    "measurement f=a,f1=b,, ${timestamp}",
+    "measurement,t=a f=a,f1=b,, ${timestamp}",
+    "measurement,t=a f=a,f1=b,, ",
+    "measurement,t=a f=a,f1=b,,",
+    "measurement, t=a  f=a,f1=b",
+    "measurement,t=a f=a, f1=b",
+    "measurement,t=a f=a, f1=b ${timestamp}",
+    "measurement, t=a  f=a, f1=b ${timestamp}",
+    "measurement,t= a f=a,f1=b ${timestamp}",
+    "measurement,t= a f=a,f1 =b ${timestamp}",
+    "measurement, t = a f = a,f1 = b ${timestamp}",
+    "measurement,t=a f=a,f1=b \n ${timestamp}",
+    "measurement,t=a \n f=a,f1=b \n ${timestamp}",
+    "measurement,t=a \n f=a,f1=b \n ",
+    "\n measurement,t=a \n f=a,f1=b \n ${timestamp}",
+    "\n measurement,t=a \n f=a,f1=b \n",
+    %% not escaped backslash in a quoted field value is invalid
+    "measurement,tag=1 field=\"val\\1\""
+]).
+
+-define(VALID_LINE_PARSED_PAIRS, [
+    {"m1,tag=tag1 field=field1 ${timestamp1}", #{
+        measurement => "m1",
+        tags => [{"tag", "tag1"}],
+        fields => [{"field", "field1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {"m2,tag=tag2 field=field2", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field2"}],
+        timestamp => undefined
+    }},
+    {"m3 field=field3 ${timestamp3}", #{
+        measurement => "m3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${timestamp3}"
+    }},
+    {"m4 field=field4", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "field4"}],
+        timestamp => undefined
+    }},
+    {"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}",
+        #{
+            measurement => "m5",
+            tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
+            fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
+            timestamp => "${timestamp5}"
+        }},
+    {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{
+        measurement => "m6",
+        tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+        fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+        timestamp => undefined
+    }},
+    {"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"",
+        #{
+            measurement => "m7",
+            tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}],
+            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}],
+            timestamp => undefined
+        }},
+    {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}",
+        #{
+            measurement => "m8",
+            tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
+            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}],
+            timestamp => "${timestamp8}"
+        }},
+    {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
+        #{
+            measurement => "m9",
+            tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
+            fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            timestamp => "${timestamp9}"
+        }},
+    {"m10 field=\"\" ${timestamp10}", #{
+        measurement => "m10",
+        tags => [],
+        fields => [{"field", ""}],
+        timestamp => "${timestamp10}"
+    }}
+]).
+
+-define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [
+    {"\n  m1,tag=tag1  field=field1  ${timestamp1} \n", #{
+        measurement => "m1",
+        tags => [{"tag", "tag1"}],
+        fields => [{"field", "field1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {"  m2,tag=tag2  field=field2  ", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field2"}],
+        timestamp => undefined
+    }},
+    {" m3  field=field3   ${timestamp3}  ", #{
+        measurement => "m3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${timestamp3}"
+    }},
+    {" \n m4  field=field4\n ", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "field4"}],
+        timestamp => undefined
+    }},
+    {" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b   field=field5,field_a=field5a,field_b=field5b    ${timestamp5}  \n",
+        #{
+            measurement => "m5",
+            tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
+            fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
+            timestamp => "${timestamp5}"
+        }},
+    {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b  field=field6,field_a=field6a,field_b=field6b\n  ", #{
+        measurement => "m6",
+        tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+        fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+        timestamp => undefined
+    }}
+]).
+
+-define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [
+    {"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{
+        measurement => "m =1,",
+        tags => [{",tag =", "=tag 1,"}],
+        fields => [{",fie ld ", " field,1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field \"2\",\n"}],
+        timestamp => undefined
+    }},
+    {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{
+        measurement => "m 3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${payload.timestamp 3}"
+    }},
+    {"m4 field=\"\\\"field\\\\4\\\"\"", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "\"field\\4\""}],
+        timestamp => undefined
+    }},
+    {"m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
+        #{
+            measurement => "m5,mA",
+            tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
+            fields => [
+                {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
+            ],
+            timestamp => "${timestamp5}"
+        }},
+    {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
+        #{
+            measurement => "m6",
+            tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            timestamp => undefined
+        }},
+    {"\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\\\\\n\"",
+        #{
+            measurement => "  m7  ",
+            tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
+            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
+            timestamp => undefined
+        }},
+    {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
+        #{
+            measurement => "m8",
+            tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
+            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
+            timestamp => "${timestamp8}"
+        }},
+    {"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
+        #{
+            measurement => "m\\9",
+            tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
+            fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            timestamp => "${timestamp9}"
+        }},
+    {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{
+        measurement => "m,10",
+        tags => [],
+        %% backslash should not be un-escaped in tag key
+        fields => [{"\"field\\\\\"", ""}],
+        timestamp => "${timestamp10}"
+    }}
+]).
+
+-define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [
+    {" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\,   \\,fie\\ ld\\ =\\ field\\,1  ${timestamp1}  ", #{
+        measurement => "m =1,",
+        tags => [{",tag =", "=tag 1,"}],
+        fields => [{",fie ld ", " field,1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {" m2,tag=tag2   field=\"field \\\"2\\\",\n\"  ", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field \"2\",\n"}],
+        timestamp => undefined
+    }},
+    {"  m\\ 3   field=\"field3\"   ${payload.timestamp\\ 3}  ", #{
+        measurement => "m 3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${payload.timestamp 3}"
+    }},
+    {"   m4       field=\"\\\"field\\\\4\\\"\"    ", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "\"field\\4\""}],
+        timestamp => undefined
+    }},
+    {" m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b   \\ field\\ =field5,field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b   ${timestamp5}    ",
+        #{
+            measurement => "m5,mA",
+            tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
+            fields => [
+                {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
+            ],
+            timestamp => "${timestamp5}"
+        }},
+    {"  m6,tag=tag6,tag_a=tag6a,tag_b=tag6b   field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"  ",
+        #{
+            measurement => "m6",
+            tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            timestamp => undefined
+        }}
+]).
+
+invalid_write_syntax_line_test_() ->
+    [?_assertThrow(_, to_influx_lines(L)) || L <- ?INVALID_LINES].
+
+invalid_write_syntax_multiline_test_() ->
+    LinesList = [
+        join("\n", ?INVALID_LINES),
+        join("\n\n\n", ?INVALID_LINES),
+        join("\n\n", lists:reverse(?INVALID_LINES))
+    ],
+    [?_assertThrow(_, to_influx_lines(Lines)) || Lines <- LinesList].
+
+valid_write_syntax_test_() ->
+    test_pairs(?VALID_LINE_PARSED_PAIRS).
+
+valid_write_syntax_with_extra_spaces_test_() ->
+    test_pairs(?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS).
+
+valid_write_syntax_escaped_chars_test_() ->
+    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS).
+
+valid_write_syntax_escaped_chars_with_extra_spaces_test_() ->
+    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS).
+
+test_pairs(PairsList) ->
+    {Lines, AllExpected} = lists:unzip(PairsList),
+    JoinedLines = join("\n", Lines),
+    JoinedLines1 = join("\n\n\n", Lines),
+    JoinedLines2 = join("\n\n", lists:reverse(Lines)),
+    SingleLineTests =
+        [
+            ?_assertEqual([Expected], to_influx_lines(Line))
+         || {Line, Expected} <- PairsList
+        ],
+    JoinedLinesTests =
+        [
+            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines)),
+            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines1)),
+            ?_assertEqual(lists:reverse(AllExpected), to_influx_lines(JoinedLines2))
+        ],
+    SingleLineTests ++ JoinedLinesTests.
+
+join(Sep, LinesList) ->
+    lists:flatten(lists:join(Sep, LinesList)).