|
|
@@ -444,19 +444,22 @@ proc_data(
|
|
|
DataType = list_to_binary(
|
|
|
string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
|
|
|
),
|
|
|
- case proc_value(DataType, ValueTkn, Msg) of
|
|
|
- {ok, Value} ->
|
|
|
- proc_data(T, Msg, Nows, [
|
|
|
- #{
|
|
|
- timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
|
|
|
- measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
|
|
|
- data_type => DataType,
|
|
|
- value => Value
|
|
|
- }
|
|
|
- | Acc
|
|
|
- ]);
|
|
|
- Error ->
|
|
|
- Error
|
|
|
+ try
|
|
|
+ proc_data(T, Msg, Nows, [
|
|
|
+ #{
|
|
|
+ timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
|
|
|
+ measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
|
|
|
+ data_type => DataType,
|
|
|
+ value => proc_value(DataType, ValueTkn, Msg)
|
|
|
+ }
|
|
|
+ | Acc
|
|
|
+ ])
|
|
|
+ catch
|
|
|
+ throw:Reason ->
|
|
|
+ {error, Reason};
|
|
|
+ Error:Reason:Stacktrace ->
|
|
|
+ ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
|
|
|
+ {error, invalid_data}
|
|
|
end;
|
|
|
proc_data([], _Msg, _Nows, Acc) ->
|
|
|
{ok, lists:reverse(Acc)}.
|
|
|
@@ -478,19 +481,18 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
|
|
|
binary_to_integer(Timestamp).
|
|
|
|
|
|
proc_value(<<"TEXT">>, ValueTkn, Msg) ->
|
|
|
- {ok,
|
|
|
- case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
|
|
|
- <<"undefined">> -> null;
|
|
|
- Val -> Val
|
|
|
- end};
|
|
|
+ case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
|
|
|
+ <<"undefined">> -> null;
|
|
|
+ Val -> Val
|
|
|
+ end;
|
|
|
proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
|
|
|
- {ok, convert_bool(replace_var(ValueTkn, Msg))};
|
|
|
+ convert_bool(replace_var(ValueTkn, Msg));
|
|
|
proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
|
|
|
- {ok, convert_int(replace_var(ValueTkn, Msg))};
|
|
|
+ convert_int(replace_var(ValueTkn, Msg));
|
|
|
proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
|
|
|
- {ok, convert_float(replace_var(ValueTkn, Msg))};
|
|
|
+ convert_float(replace_var(ValueTkn, Msg));
|
|
|
proc_value(Type, _, _) ->
|
|
|
- {error, {invalid_type, Type}}.
|
|
|
+ throw(#{reason => invalid_type, type => Type}).
|
|
|
|
|
|
replace_var(Tokens, Data) when is_list(Tokens) ->
|
|
|
[Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
|