|
@@ -134,8 +134,11 @@ on_query(
|
|
|
#{
|
|
#{
|
|
|
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
|
|
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
|
|
|
} = maps:get(ChannelID, Channels),
|
|
} = maps:get(ChannelID, Channels),
|
|
|
- try to_record(PartitionKey, HRecordTemplate, Data) of
|
|
|
|
|
- Record -> append_record(InstId, Producer, Record, false)
|
|
|
|
|
|
|
+ try
|
|
|
|
|
+ KeyAndRawRecord = to_key_and_raw_record(PartitionKey, HRecordTemplate, Data),
|
|
|
|
|
+ emqx_trace:rendered_action_template(ChannelID, #{record => KeyAndRawRecord}),
|
|
|
|
|
+ Record = to_record(KeyAndRawRecord),
|
|
|
|
|
+ append_record(InstId, Producer, Record, false)
|
|
|
catch
|
|
catch
|
|
|
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
|
|
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
|
|
|
end.
|
|
end.
|
|
@@ -148,8 +151,13 @@ on_batch_query(
|
|
|
#{
|
|
#{
|
|
|
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
|
|
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
|
|
|
} = maps:get(ChannelID, Channels),
|
|
} = maps:get(ChannelID, Channels),
|
|
|
- try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
|
|
|
|
|
- Records -> append_record(InstId, Producer, Records, true)
|
|
|
|
|
|
|
+ try
|
|
|
|
|
+ KeyAndRawRecordList = to_multi_part_key_and_partition_key(
|
|
|
|
|
+ PartitionKey, HRecordTemplate, BatchList
|
|
|
|
|
+ ),
|
|
|
|
|
+ emqx_trace:rendered_action_template(ChannelID, #{records => KeyAndRawRecordList}),
|
|
|
|
|
+ Records = [to_record(Item) || Item <- KeyAndRawRecordList],
|
|
|
|
|
+ append_record(InstId, Producer, Records, true)
|
|
|
catch
|
|
catch
|
|
|
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
|
|
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
|
|
|
end.
|
|
end.
|
|
@@ -348,20 +356,20 @@ ensure_start_producer(ProducerName, ProducerOptions) ->
|
|
|
produce_name(ActionId) ->
|
|
produce_name(ActionId) ->
|
|
|
list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)).
|
|
list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)).
|
|
|
|
|
|
|
|
-to_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
|
|
|
|
|
|
|
+to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
|
|
|
PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data),
|
|
PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data),
|
|
|
RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data),
|
|
RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data),
|
|
|
- to_record(PartitionKey, RawRecord).
|
|
|
|
|
|
|
+ #{partition_key => PartitionKey, raw_record => RawRecord}.
|
|
|
|
|
|
|
|
-to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) ->
|
|
|
|
|
- to_record(binary_to_list(PartitionKey), RawRecord);
|
|
|
|
|
-to_record(PartitionKey, RawRecord) ->
|
|
|
|
|
|
|
+to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) when is_binary(PartitionKey) ->
|
|
|
|
|
+ to_record(#{partition_key => binary_to_list(PartitionKey), raw_record => RawRecord});
|
|
|
|
|
+to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) ->
|
|
|
hstreamdb:to_record(PartitionKey, raw, RawRecord).
|
|
hstreamdb:to_record(PartitionKey, raw, RawRecord).
|
|
|
|
|
|
|
|
-to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
|
|
|
|
|
|
|
+to_multi_part_key_and_partition_key(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
|
|
|
lists:map(
|
|
lists:map(
|
|
|
fun({_, Data}) ->
|
|
fun({_, Data}) ->
|
|
|
- to_record(PartitionKeyTmpl, HRecordTmpl, Data)
|
|
|
|
|
|
|
+ to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data)
|
|
|
end,
|
|
end,
|
|
|
BatchList
|
|
BatchList
|
|
|
).
|
|
).
|