|
|
@@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
|
|
|
%% Internal Functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-preproc_data(DataList) ->
|
|
|
+make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
|
|
|
+ emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
|
|
|
+make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
|
|
|
+ lists:map(fun make_parsed_payload/1, PayloadUnparsed);
|
|
|
+make_parsed_payload(
|
|
|
+ #{
|
|
|
+ measurement := Measurement,
|
|
|
+ data_type := DataType,
|
|
|
+ value := Value
|
|
|
+ } = Data
|
|
|
+) ->
|
|
|
+ Data#{
|
|
|
+ <<"measurement">> => Measurement,
|
|
|
+ <<"data_type">> => DataType,
|
|
|
+ <<"value">> => Value
|
|
|
+ }.
|
|
|
+
|
|
|
+preproc_data(
|
|
|
+ #{
|
|
|
+ <<"measurement">> := Measurement,
|
|
|
+ <<"data_type">> := DataType,
|
|
|
+ <<"value">> := Value
|
|
|
+ } = Data
|
|
|
+) ->
|
|
|
+ #{
|
|
|
+ timestamp => emqx_plugin_libs_rule:preproc_tmpl(
|
|
|
+ maps:get(<<"timestamp">>, Data, <<"now">>)
|
|
|
+ ),
|
|
|
+ measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
|
|
+ data_type => DataType,
|
|
|
+ value => emqx_plugin_libs_rule:preproc_tmpl(Value)
|
|
|
+ }.
|
|
|
+
|
|
|
+preproc_data_list(DataList) ->
|
|
|
lists:map(
|
|
|
- fun(
|
|
|
- #{
|
|
|
- measurement := Measurement,
|
|
|
- data_type := DataType,
|
|
|
- value := Value
|
|
|
- } = Data
|
|
|
- ) ->
|
|
|
- #{
|
|
|
- timestamp => emqx_plugin_libs_rule:preproc_tmpl(
|
|
|
- maps:get(<<"timestamp">>, Data, <<"now">>)
|
|
|
- ),
|
|
|
- measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
|
|
- data_type => DataType,
|
|
|
- value => emqx_plugin_libs_rule:preproc_tmpl(Value)
|
|
|
- }
|
|
|
- end,
|
|
|
+ fun preproc_data/1,
|
|
|
DataList
|
|
|
).
|
|
|
|
|
|
@@ -258,12 +276,13 @@ convert_float(Str) when is_binary(Str) ->
|
|
|
convert_float(undefined) ->
|
|
|
null.
|
|
|
|
|
|
-make_iotdb_insert_request(Message, State) ->
|
|
|
+make_iotdb_insert_request(MessageUnparsedPayload, State) ->
|
|
|
+ Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
|
|
|
IsAligned = maps:get(is_aligned, State, false),
|
|
|
DeviceId = device_id(Message, State),
|
|
|
- IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X),
|
|
|
+ IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
|
|
|
Payload = make_list(maps:get(payload, Message)),
|
|
|
- PreProcessedData = preproc_data(Payload),
|
|
|
+ PreProcessedData = preproc_data_list(Payload),
|
|
|
DataList = proc_data(PreProcessedData, Message),
|
|
|
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
|
|
|
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
|
|
|
@@ -330,15 +349,15 @@ insert_value(1, Data, [Value | Values]) ->
|
|
|
insert_value(Index, Data, [Value | Values]) ->
|
|
|
[[null | Value] | insert_value(Index - 1, Data, Values)].
|
|
|
|
|
|
-iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
|
|
|
+iotdb_field_key(is_aligned, ?VSN_1_X) ->
|
|
|
<<"is_aligned">>;
|
|
|
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
|
|
|
<<"isAligned">>;
|
|
|
-iotdb_field_key(device_id, ?VSN_1_0_X) ->
|
|
|
+iotdb_field_key(device_id, ?VSN_1_X) ->
|
|
|
<<"device">>;
|
|
|
iotdb_field_key(device_id, ?VSN_0_13_X) ->
|
|
|
<<"deviceId">>;
|
|
|
-iotdb_field_key(data_types, ?VSN_1_0_X) ->
|
|
|
+iotdb_field_key(data_types, ?VSN_1_X) ->
|
|
|
<<"data_types">>;
|
|
|
iotdb_field_key(data_types, ?VSN_0_13_X) ->
|
|
|
<<"dataTypes">>.
|
|
|
@@ -350,6 +369,8 @@ device_id(Message, State) ->
|
|
|
case maps:get(device_id, State, undefined) of
|
|
|
undefined ->
|
|
|
case maps:get(payload, Message) of
|
|
|
+ #{<<"device_id">> := DeviceId} ->
|
|
|
+ DeviceId;
|
|
|
#{device_id := DeviceId} ->
|
|
|
DeviceId;
|
|
|
_NotFound ->
|