|
|
@@ -26,20 +26,28 @@
|
|
|
on_message_publish/1
|
|
|
]).
|
|
|
|
|
|
+%% Internal exports
|
|
|
+-export([run_transformation/2, trace_failure_context_to_map/1]).
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Type declarations
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-define(TRACE_TAG, "MESSAGE_TRANSFORMATION").
|
|
|
--define(CONF_ROOT, message_transformation).
|
|
|
--define(CONF_ROOT_BIN, <<"message_transformation">>).
|
|
|
--define(TRANSFORMATIONS_CONF_PATH, [?CONF_ROOT, transformations]).
|
|
|
+
|
|
|
+-record(trace_failure_context, {
|
|
|
+ transformation :: transformation(),
|
|
|
+ tag :: string(),
|
|
|
+ context :: map()
|
|
|
+}).
|
|
|
+-type trace_failure_context() :: #trace_failure_context{}.
|
|
|
|
|
|
-type transformation_name() :: binary().
|
|
|
%% TODO: make more specific typespec
|
|
|
-type transformation() :: #{atom() => term()}.
|
|
|
%% TODO: make more specific typespec
|
|
|
-type variform() :: any().
|
|
|
+-type failure_action() :: ignore | drop | disconnect.
|
|
|
-type operation() :: #{key := [binary(), ...], value := variform()}.
|
|
|
-type qos() :: 0..2.
|
|
|
-type rendered_value() :: qos() | boolean() | binary().
|
|
|
@@ -62,7 +70,8 @@
|
|
|
|
|
|
-export_type([
|
|
|
transformation/0,
|
|
|
- transformation_name/0
|
|
|
+ transformation_name/0,
|
|
|
+ failure_action/0
|
|
|
]).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -125,19 +134,50 @@ on_message_publish(Message = #message{topic = Topic}) ->
|
|
|
%% Internal exports
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
+-spec run_transformation(transformation(), emqx_types:message()) ->
|
|
|
+ {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
|
|
|
+run_transformation(Transformation, MessageIn) ->
|
|
|
+ #{
|
|
|
+ operations := Operations,
|
|
|
+ failure_action := FailureAction,
|
|
|
+ payload_decoder := PayloadDecoder
|
|
|
+ } = Transformation,
|
|
|
+ Fun = fun(Operation, Acc) ->
|
|
|
+ case eval_operation(Operation, Transformation, Acc) of
|
|
|
+ {ok, NewAcc} -> {cont, NewAcc};
|
|
|
+ {error, TraceFailureContext} -> {halt, {error, TraceFailureContext}}
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ PayloadIn = MessageIn#message.payload,
|
|
|
+ case decode(PayloadIn, PayloadDecoder, Transformation) of
|
|
|
+ {ok, InitPayload} ->
|
|
|
+ InitAcc = message_to_context(MessageIn, InitPayload, Transformation),
|
|
|
+ case emqx_utils:foldl_while(Fun, InitAcc, Operations) of
|
|
|
+ #{} = ContextOut ->
|
|
|
+ context_to_message(MessageIn, ContextOut, Transformation);
|
|
|
+ {error, TraceFailureContext} ->
|
|
|
+ {FailureAction, TraceFailureContext}
|
|
|
+ end;
|
|
|
+ {error, TraceFailureContext} ->
|
|
|
+ {FailureAction, TraceFailureContext}
|
|
|
+ end.
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
--spec eval_operation(operation(), transformation(), eval_context()) -> {ok, eval_context()} | error.
|
|
|
+-spec eval_operation(operation(), transformation(), eval_context()) ->
|
|
|
+ {ok, eval_context()} | {error, trace_failure_context()}.
|
|
|
eval_operation(Operation, Transformation, Context) ->
|
|
|
#{key := K, value := V} = Operation,
|
|
|
case eval_variform(K, V, Context) of
|
|
|
{error, Reason} ->
|
|
|
- trace_failure(Transformation, "transformation_eval_operation_failure", #{
|
|
|
- reason => Reason
|
|
|
- }),
|
|
|
- error;
|
|
|
+ FailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "transformation_eval_operation_failure",
|
|
|
+ context = #{reason => Reason}
|
|
|
+ },
|
|
|
+ {error, FailureContext};
|
|
|
{ok, Rendered} ->
|
|
|
NewContext = put_value(K, Rendered, Context),
|
|
|
{ok, NewContext}
|
|
|
@@ -233,14 +273,16 @@ do_run_transformations(Transformations, Message) ->
|
|
|
#{name := Name} = Transformation,
|
|
|
emqx_message_transformation_registry:inc_matched(Name),
|
|
|
case run_transformation(Transformation, MessageAcc) of
|
|
|
- #message{} = NewAcc ->
|
|
|
+ {ok, #message{} = NewAcc} ->
|
|
|
emqx_message_transformation_registry:inc_succeeded(Name),
|
|
|
{cont, NewAcc};
|
|
|
- ignore ->
|
|
|
+ {ignore, TraceFailureContext} ->
|
|
|
+ trace_failure_from_context(TraceFailureContext),
|
|
|
emqx_message_transformation_registry:inc_failed(Name),
|
|
|
run_message_transformation_failed_hook(Message, Transformation),
|
|
|
{cont, MessageAcc};
|
|
|
- FailureAction ->
|
|
|
+ {FailureAction, TraceFailureContext} ->
|
|
|
+ trace_failure_from_context(TraceFailureContext),
|
|
|
trace_failure(Transformation, "transformation_failed", #{
|
|
|
transformation => Name,
|
|
|
action => FailureAction
|
|
|
@@ -270,33 +312,6 @@ do_run_transformations(Transformations, Message) ->
|
|
|
FailureAction
|
|
|
end.
|
|
|
|
|
|
-run_transformation(Transformation, MessageIn) ->
|
|
|
- #{
|
|
|
- operations := Operations,
|
|
|
- failure_action := FailureAction,
|
|
|
- payload_decoder := PayloadDecoder
|
|
|
- } = Transformation,
|
|
|
- Fun = fun(Operation, Acc) ->
|
|
|
- case eval_operation(Operation, Transformation, Acc) of
|
|
|
- {ok, NewAcc} -> {cont, NewAcc};
|
|
|
- error -> {halt, FailureAction}
|
|
|
- end
|
|
|
- end,
|
|
|
- PayloadIn = MessageIn#message.payload,
|
|
|
- case decode(PayloadIn, PayloadDecoder, Transformation) of
|
|
|
- {ok, InitPayload} ->
|
|
|
- InitAcc = message_to_context(MessageIn, InitPayload, Transformation),
|
|
|
- case emqx_utils:foldl_while(Fun, InitAcc, Operations) of
|
|
|
- #{} = ContextOut ->
|
|
|
- context_to_message(MessageIn, ContextOut, Transformation);
|
|
|
- _ ->
|
|
|
- FailureAction
|
|
|
- end;
|
|
|
- error ->
|
|
|
- %% Error already logged
|
|
|
- FailureAction
|
|
|
- end.
|
|
|
-
|
|
|
-spec message_to_context(emqx_types:message(), _Payload, transformation()) -> eval_context().
|
|
|
message_to_context(#message{} = Message, Payload, Transformation) ->
|
|
|
#{
|
|
|
@@ -321,7 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
|
|
}.
|
|
|
|
|
|
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
|
|
|
- {ok, emqx_types:message()} | _TODO.
|
|
|
+ {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
|
|
|
context_to_message(Message, Context, Transformation) ->
|
|
|
#{
|
|
|
failure_action := FailureAction,
|
|
|
@@ -330,9 +345,9 @@ context_to_message(Message, Context, Transformation) ->
|
|
|
#{payload := PayloadOut} = Context,
|
|
|
case encode(PayloadOut, PayloadEncoder, Transformation) of
|
|
|
{ok, Payload} ->
|
|
|
- take_from_context(Context#{payload := Payload}, Message);
|
|
|
- error ->
|
|
|
- FailureAction
|
|
|
+ {ok, take_from_context(Context#{payload := Payload}, Message)};
|
|
|
+ {error, TraceFailureContext} ->
|
|
|
+ {FailureAction, TraceFailureContext}
|
|
|
end.
|
|
|
|
|
|
take_from_context(Context, Message) ->
|
|
|
@@ -362,31 +377,43 @@ decode(Payload, #{type := json}, Transformation) ->
|
|
|
{ok, JSON} ->
|
|
|
{ok, JSON};
|
|
|
{error, Reason} ->
|
|
|
- trace_failure(Transformation, "payload_decode_failed", #{
|
|
|
- decoder => json,
|
|
|
- reason => Reason
|
|
|
- }),
|
|
|
- error
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_decode_failed",
|
|
|
+ context = #{
|
|
|
+ decoder => json,
|
|
|
+ reason => Reason
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext}
|
|
|
end;
|
|
|
decode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
|
|
|
try
|
|
|
{ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
|
|
|
catch
|
|
|
error:{serde_not_found, _} ->
|
|
|
- trace_failure(Transformation, "payload_decode_schema_not_found", #{
|
|
|
- decoder => avro,
|
|
|
- schema_name => SerdeName
|
|
|
- }),
|
|
|
- error;
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_decode_schema_not_found",
|
|
|
+ context = #{
|
|
|
+ decoder => avro,
|
|
|
+ schema_name => SerdeName
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext};
|
|
|
Class:Error:Stacktrace ->
|
|
|
- trace_failure(Transformation, "payload_decode_schema_failure", #{
|
|
|
- decoder => avro,
|
|
|
- schema_name => SerdeName,
|
|
|
- kind => Class,
|
|
|
- reason => Error,
|
|
|
- stacktrace => Stacktrace
|
|
|
- }),
|
|
|
- error
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_decode_schema_failure",
|
|
|
+ context = #{
|
|
|
+ decoder => avro,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext}
|
|
|
end;
|
|
|
decode(
|
|
|
Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
|
|
|
@@ -395,22 +422,30 @@ decode(
|
|
|
{ok, emqx_schema_registry_serde:decode(SerdeName, Payload, [MessageType])}
|
|
|
catch
|
|
|
error:{serde_not_found, _} ->
|
|
|
- trace_failure(Transformation, "payload_decode_schema_not_found", #{
|
|
|
- decoder => protobuf,
|
|
|
- schema_name => SerdeName,
|
|
|
- message_type => MessageType
|
|
|
- }),
|
|
|
- error;
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_decode_schema_not_found",
|
|
|
+ context = #{
|
|
|
+ decoder => protobuf,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ message_type => MessageType
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext};
|
|
|
Class:Error:Stacktrace ->
|
|
|
- trace_failure(Transformation, "payload_decode_schema_failure", #{
|
|
|
- decoder => protobuf,
|
|
|
- schema_name => SerdeName,
|
|
|
- message_type => MessageType,
|
|
|
- kind => Class,
|
|
|
- reason => Error,
|
|
|
- stacktrace => Stacktrace
|
|
|
- }),
|
|
|
- error
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_decode_schema_failure",
|
|
|
+ context = #{
|
|
|
+ decoder => protobuf,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ message_type => MessageType,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext}
|
|
|
end.
|
|
|
|
|
|
encode(Payload, #{type := none}, _Transformation) ->
|
|
|
@@ -420,31 +455,43 @@ encode(Payload, #{type := json}, Transformation) ->
|
|
|
{ok, Bin} ->
|
|
|
{ok, Bin};
|
|
|
{error, Reason} ->
|
|
|
- trace_failure(Transformation, "payload_encode_failed", #{
|
|
|
- encoder => json,
|
|
|
- reason => Reason
|
|
|
- }),
|
|
|
- error
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_encode_failed",
|
|
|
+ context = #{
|
|
|
+ encoder => json,
|
|
|
+ reason => Reason
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext}
|
|
|
end;
|
|
|
encode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
|
|
|
try
|
|
|
{ok, emqx_schema_registry_serde:encode(SerdeName, Payload)}
|
|
|
catch
|
|
|
error:{serde_not_found, _} ->
|
|
|
- trace_failure(Transformation, "payload_encode_schema_not_found", #{
|
|
|
- encoder => avro,
|
|
|
- schema_name => SerdeName
|
|
|
- }),
|
|
|
- error;
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_encode_schema_not_found",
|
|
|
+ context = #{
|
|
|
+ encoder => avro,
|
|
|
+ schema_name => SerdeName
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext};
|
|
|
Class:Error:Stacktrace ->
|
|
|
- trace_failure(Transformation, "payload_encode_schema_failure", #{
|
|
|
- encoder => avro,
|
|
|
- schema_name => SerdeName,
|
|
|
- kind => Class,
|
|
|
- reason => Error,
|
|
|
- stacktrace => Stacktrace
|
|
|
- }),
|
|
|
- error
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_encode_schema_failure",
|
|
|
+ context = #{
|
|
|
+ encoder => avro,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext}
|
|
|
end;
|
|
|
encode(
|
|
|
Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
|
|
|
@@ -453,24 +500,50 @@ encode(
|
|
|
{ok, emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageType])}
|
|
|
catch
|
|
|
error:{serde_not_found, _} ->
|
|
|
- trace_failure(Transformation, "payload_encode_schema_not_found", #{
|
|
|
- encoder => protobuf,
|
|
|
- schema_name => SerdeName,
|
|
|
- message_type => MessageType
|
|
|
- }),
|
|
|
- error;
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_encode_schema_failure",
|
|
|
+ context = #{
|
|
|
+ encoder => protobuf,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ message_type => MessageType
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext};
|
|
|
Class:Error:Stacktrace ->
|
|
|
- trace_failure(Transformation, "payload_encode_schema_failure", #{
|
|
|
- encoder => protobuf,
|
|
|
- schema_name => SerdeName,
|
|
|
- message_type => MessageType,
|
|
|
- kind => Class,
|
|
|
- reason => Error,
|
|
|
- stacktrace => Stacktrace
|
|
|
- }),
|
|
|
- error
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_encode_schema_failure",
|
|
|
+ context = #{
|
|
|
+ encoder => protobuf,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ message_type => MessageType,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext}
|
|
|
end.
|
|
|
|
|
|
+trace_failure_from_context(
|
|
|
+ #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = Tag,
|
|
|
+ context = Context
|
|
|
+ }
|
|
|
+) ->
|
|
|
+ trace_failure(Transformation, Tag, Context).
|
|
|
+
|
|
|
+%% Internal export for HTTP API.
|
|
|
+trace_failure_context_to_map(
|
|
|
+ #trace_failure_context{
|
|
|
+ tag = Tag,
|
|
|
+ context = Context
|
|
|
+ }
|
|
|
+) ->
|
|
|
+ Context#{msg => list_to_binary(Tag)}.
|
|
|
+
|
|
|
trace_failure(#{log_failure := #{level := none}} = Transformation, _Msg, _Meta) ->
|
|
|
#{
|
|
|
name := _Name,
|