|
|
@@ -12,10 +12,6 @@
|
|
|
|
|
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
|
|
|
|
|
--export([
|
|
|
- write_syntax_type/0
|
|
|
-]).
|
|
|
-
|
|
|
-export([
|
|
|
namespace/0,
|
|
|
roots/0,
|
|
|
@@ -30,20 +26,9 @@
|
|
|
connector_examples/1
|
|
|
]).
|
|
|
|
|
|
--type write_syntax() :: list().
|
|
|
--reflect_type([write_syntax/0]).
|
|
|
--typerefl_from_string({write_syntax/0, ?MODULE, to_datalayers_lines}).
|
|
|
--export([to_datalayers_lines/1]).
|
|
|
-
|
|
|
-define(CONNECTOR_TYPE, datalayers).
|
|
|
-define(ACTION_TYPE, datalayers).
|
|
|
|
|
|
-%% -------------------------------------------------------------------------------------------------
|
|
|
-%% api
|
|
|
-
|
|
|
-write_syntax_type() ->
|
|
|
- typerefl:alias("template", write_syntax()).
|
|
|
-
|
|
|
%% Examples
|
|
|
conn_bridge_examples(Method) ->
|
|
|
[
|
|
|
@@ -118,7 +103,6 @@ values(common, Protocol, SupportUint, TypeOpts) ->
|
|
|
type => list_to_atom(Protocol),
|
|
|
name => <<"demo">>,
|
|
|
enable => true,
|
|
|
- local_topic => <<"local/topic/#">>,
|
|
|
write_syntax =>
|
|
|
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
|
|
|
"${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
|
|
|
@@ -143,12 +127,6 @@ fields("config_connector") ->
|
|
|
emqx_connector_schema:common_fields() ++
|
|
|
emqx_bridge_datalayers_connector:fields("connector") ++
|
|
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
|
|
-fields("post_api") ->
|
|
|
- method_fields(post, datalayers);
|
|
|
-fields("put_api") ->
|
|
|
- method_fields(put, datalayers);
|
|
|
-fields("get_api") ->
|
|
|
- method_fields(get, datalayers);
|
|
|
fields(action) ->
|
|
|
{datalayers,
|
|
|
mk(
|
|
|
@@ -182,39 +160,7 @@ fields(Field) when
|
|
|
Field == "post_bridge_v2";
|
|
|
Field == "put_bridge_v2"
|
|
|
->
|
|
|
- emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(datalayers_action));
|
|
|
-fields(datalayers_api) ->
|
|
|
- datalayers_bridge_common_fields() ++ connector_fields(datalayers).
|
|
|
-
|
|
|
-method_fields(post, ConnectorType) ->
|
|
|
- datalayers_bridge_common_fields() ++
|
|
|
- connector_fields(ConnectorType) ++
|
|
|
- type_name_fields(ConnectorType);
|
|
|
-method_fields(get, ConnectorType) ->
|
|
|
- datalayers_bridge_common_fields() ++
|
|
|
- connector_fields(ConnectorType) ++
|
|
|
- type_name_fields(ConnectorType) ++
|
|
|
- emqx_bridge_schema:status_fields();
|
|
|
-method_fields(put, ConnectorType) ->
|
|
|
- datalayers_bridge_common_fields() ++
|
|
|
- connector_fields(ConnectorType).
|
|
|
-
|
|
|
-datalayers_bridge_common_fields() ->
|
|
|
- emqx_bridge_schema:common_bridge_fields() ++
|
|
|
- [
|
|
|
- {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
|
|
- {write_syntax, fun write_syntax/1}
|
|
|
- ] ++
|
|
|
- emqx_resource_schema:fields("resource_opts").
|
|
|
-
|
|
|
-connector_fields(Type) ->
|
|
|
- emqx_bridge_datalayers_connector:fields(Type).
|
|
|
-
|
|
|
-type_name_fields(Type) ->
|
|
|
- [
|
|
|
- {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})},
|
|
|
- {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
|
|
|
- ].
|
|
|
+ emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(datalayers_action)).
|
|
|
|
|
|
desc("config") ->
|
|
|
?DESC("desc_config");
|
|
|
@@ -234,172 +180,16 @@ desc(_) ->
|
|
|
undefined.
|
|
|
|
|
|
write_syntax(type) ->
|
|
|
- write_syntax_type();
|
|
|
+ emqx_bridge_influxdb:write_syntax_type();
|
|
|
write_syntax(required) ->
|
|
|
true;
|
|
|
write_syntax(validator) ->
|
|
|
[?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")];
|
|
|
write_syntax(converter) ->
|
|
|
- fun to_datalayers_lines/1;
|
|
|
+ fun emqx_bridge_influxdb:to_influx_lines/1;
|
|
|
write_syntax(desc) ->
|
|
|
?DESC("write_syntax");
|
|
|
write_syntax(format) ->
|
|
|
<<"sql">>;
|
|
|
write_syntax(_) ->
|
|
|
undefined.
|
|
|
-
|
|
|
-to_datalayers_lines(Lines = [#{} | _]) ->
|
|
|
- %% already parsed/converted (e.g.: bridge_probe, after hocon_tconf:check_plain)
|
|
|
- Lines;
|
|
|
-to_datalayers_lines(RawLines) ->
|
|
|
- try
|
|
|
- datalayers_lines(str(RawLines), [])
|
|
|
- catch
|
|
|
- _:Reason:Stacktrace ->
|
|
|
- Msg = lists:flatten(
|
|
|
- io_lib:format("Unable to parse Datalayers line protocol: ~p", [RawLines])
|
|
|
- ),
|
|
|
- ?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}),
|
|
|
- throw(Msg)
|
|
|
- end.
|
|
|
-
|
|
|
--define(MEASUREMENT_ESC_CHARS, [$,, $\s]).
|
|
|
--define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]).
|
|
|
--define(FIELD_VAL_ESC_CHARS, [$", $\\]).
|
|
|
-% Common separator for both tags and fields
|
|
|
--define(SEP, $\s).
|
|
|
--define(MEASUREMENT_TAG_SEP, $,).
|
|
|
--define(KEY_SEP, $=).
|
|
|
--define(VAL_SEP, $,).
|
|
|
--define(NON_EMPTY, [_ | _]).
|
|
|
-
|
|
|
-datalayers_lines([] = _RawLines, Acc) ->
|
|
|
- ?NON_EMPTY = lists:reverse(Acc);
|
|
|
-datalayers_lines(RawLines, Acc) ->
|
|
|
- {Acc1, RawLines1} = datalayers_line(string:trim(RawLines, leading, "\s\n"), Acc),
|
|
|
- datalayers_lines(RawLines1, Acc1).
|
|
|
-
|
|
|
-datalayers_line([], Acc) ->
|
|
|
- {Acc, []};
|
|
|
-datalayers_line(Line, Acc) ->
|
|
|
- {?NON_EMPTY = Measurement, Line1} = measurement(Line),
|
|
|
- {Tags, Line2} = tags(Line1),
|
|
|
- {?NON_EMPTY = Fields, Line3} = datalayers_fields(Line2),
|
|
|
- {Timestamp, Line4} = timestamp(Line3),
|
|
|
- {
|
|
|
- [
|
|
|
- #{
|
|
|
- measurement => Measurement,
|
|
|
- tags => Tags,
|
|
|
- fields => Fields,
|
|
|
- timestamp => Timestamp
|
|
|
- }
|
|
|
- | Acc
|
|
|
- ],
|
|
|
- Line4
|
|
|
- }.
|
|
|
-
|
|
|
-measurement(Line) ->
|
|
|
- unescape(?MEASUREMENT_ESC_CHARS, [?MEASUREMENT_TAG_SEP, ?SEP], Line, []).
|
|
|
-
|
|
|
-tags([?MEASUREMENT_TAG_SEP | Line]) ->
|
|
|
- tags1(Line, []);
|
|
|
-tags(Line) ->
|
|
|
- {[], Line}.
|
|
|
-
|
|
|
-%% Empty line is invalid as fields are required after tags,
|
|
|
-%% need to break recursion here and fail later on parsing fields
|
|
|
-tags1([] = Line, Acc) ->
|
|
|
- {lists:reverse(Acc), Line};
|
|
|
-%% Matching non empty Acc treats lines like "m, field=field_val" invalid
|
|
|
-tags1([?SEP | _] = Line, ?NON_EMPTY = Acc) ->
|
|
|
- {lists:reverse(Acc), Line};
|
|
|
-tags1(Line, Acc) ->
|
|
|
- {Tag, Line1} = tag(Line),
|
|
|
- tags1(Line1, [Tag | Acc]).
|
|
|
-
|
|
|
-tag(Line) ->
|
|
|
- {?NON_EMPTY = Key, Line1} = key(Line),
|
|
|
- {?NON_EMPTY = Val, Line2} = tag_val(Line1),
|
|
|
- {{Key, Val}, Line2}.
|
|
|
-
|
|
|
-tag_val(Line) ->
|
|
|
- {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Line, []),
|
|
|
- {Val, strip_l(Line1, ?VAL_SEP)}.
|
|
|
-
|
|
|
-datalayers_fields([?SEP | Line]) ->
|
|
|
- fields1(string:trim(Line, leading, "\s"), []).
|
|
|
-
|
|
|
-%% Timestamp is optional, so fields may be at the very end of the line
|
|
|
-fields1([Ch | _] = Line, Acc) when Ch =:= ?SEP; Ch =:= $\n ->
|
|
|
- {lists:reverse(Acc), Line};
|
|
|
-fields1([] = Line, Acc) ->
|
|
|
- {lists:reverse(Acc), Line};
|
|
|
-fields1(Line, Acc) ->
|
|
|
- {Field, Line1} = field(Line),
|
|
|
- fields1(Line1, [Field | Acc]).
|
|
|
-
|
|
|
-field(Line) ->
|
|
|
- {?NON_EMPTY = Key, Line1} = key(Line),
|
|
|
- {Val, Line2} = field_val(Line1),
|
|
|
- {{Key, Val}, Line2}.
|
|
|
-
|
|
|
-field_val([$" | Line]) ->
|
|
|
- {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []),
|
|
|
- %% Quoted val can be empty
|
|
|
- {{quoted, Val}, strip_l(Line1, ?VAL_SEP)};
|
|
|
-field_val(Line) ->
|
|
|
- %% Unquoted value should not be un-escaped according to InfluxDB protocol,
|
|
|
- %% as it can only hold float, integer, uinteger or boolean value.
|
|
|
- %% However, as templates are possible, un-escaping is applied here,
|
|
|
- %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}"
|
|
|
- {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Line, []),
|
|
|
- {?NON_EMPTY = Val, strip_l(Line1, ?VAL_SEP)}.
|
|
|
-
|
|
|
-timestamp([?SEP | Line]) ->
|
|
|
- Line1 = string:trim(Line, leading, "\s"),
|
|
|
- %% Similarly to unquoted field value, un-escape a timestamp to validate and handle
|
|
|
- %% potentially escaped characters in a template
|
|
|
- {T, Line2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Line1, []),
|
|
|
- {timestamp1(T), Line2};
|
|
|
-timestamp(Line) ->
|
|
|
- {undefined, Line}.
|
|
|
-
|
|
|
-timestamp1(?NON_EMPTY = Ts) -> Ts;
|
|
|
-timestamp1(_Ts) -> undefined.
|
|
|
-
|
|
|
-%% Common for both tag and field keys
|
|
|
-key(Line) ->
|
|
|
- {Key, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Line, []),
|
|
|
- {Key, strip_l(Line1, ?KEY_SEP)}.
|
|
|
-
|
|
|
-%% Only strip a character between pairs, don't strip it(and let it fail)
|
|
|
-%% if the char to be stripped is at the end, e.g.: m,tag=val, field=val
|
|
|
-strip_l([Ch, Ch1 | Str], Ch) when Ch1 =/= ?SEP ->
|
|
|
- [Ch1 | Str];
|
|
|
-strip_l(Str, _Ch) ->
|
|
|
- Str.
|
|
|
-
|
|
|
-unescape(EscapeChars, SepChars, [$\\, Char | T], Acc) ->
|
|
|
- ShouldEscapeBackslash = lists:member($\\, EscapeChars),
|
|
|
- Acc1 =
|
|
|
- case lists:member(Char, EscapeChars) of
|
|
|
- true -> [Char | Acc];
|
|
|
- false when not ShouldEscapeBackslash -> [Char, $\\ | Acc]
|
|
|
- end,
|
|
|
- unescape(EscapeChars, SepChars, T, Acc1);
|
|
|
-unescape(EscapeChars, SepChars, [Char | T] = L, Acc) ->
|
|
|
- IsEscapeChar = lists:member(Char, EscapeChars),
|
|
|
- case lists:member(Char, SepChars) of
|
|
|
- true -> {lists:reverse(Acc), L};
|
|
|
- false when not IsEscapeChar -> unescape(EscapeChars, SepChars, T, [Char | Acc])
|
|
|
- end;
|
|
|
-unescape(_EscapeChars, _SepChars, [] = L, Acc) ->
|
|
|
- {lists:reverse(Acc), L}.
|
|
|
-
|
|
|
-str(A) when is_atom(A) ->
|
|
|
- atom_to_list(A);
|
|
|
-str(B) when is_binary(B) ->
|
|
|
- binary_to_list(B);
|
|
|
-str(S) when is_list(S) ->
|
|
|
- S.
|