|
|
@@ -102,15 +102,17 @@ delete_schema(NameVsn) ->
|
|
|
-spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}.
|
|
|
decode(SerdeName, RawData) ->
|
|
|
with_serde(
|
|
|
- "decode_avro_json",
|
|
|
- eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData])
|
|
|
+ ?FUNCTION_NAME,
|
|
|
+ SerdeName,
|
|
|
+ [RawData]
|
|
|
).
|
|
|
|
|
|
-spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}.
|
|
|
encode(SerdeName, Data) ->
|
|
|
with_serde(
|
|
|
- "encode_avro_json",
|
|
|
- eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data])
|
|
|
+ ?FUNCTION_NAME,
|
|
|
+ SerdeName,
|
|
|
+ [Data]
|
|
|
).
|
|
|
|
|
|
%%-------------------------------------------------------------------------------------------------
|
|
|
@@ -209,9 +211,11 @@ ensure_serde_absent(Name) ->
|
|
|
async_delete_serdes(Names) ->
|
|
|
gen_server:cast(?MODULE, {delete_serdes, Names}).
|
|
|
|
|
|
-with_serde(WhichOp, Fun) ->
|
|
|
+with_serde(Op, SerdeName, Args) ->
|
|
|
+ WhichOp = which_op(Op),
|
|
|
+ ErrMsg = error_msg(Op),
|
|
|
try
|
|
|
- Fun()
|
|
|
+ eval_serde(Op, ErrMsg, SerdeName, Args)
|
|
|
catch
|
|
|
throw:Reason ->
|
|
|
?SLOG(error, Reason#{
|
|
|
@@ -233,18 +237,16 @@ with_serde(WhichOp, Fun) ->
|
|
|
}}
|
|
|
end.
|
|
|
|
|
|
-eval_serde_fun(Op, ErrMsg, SerdeName, Args) ->
|
|
|
- fun() ->
|
|
|
- case lookup_serde(SerdeName) of
|
|
|
- {ok, Serde} ->
|
|
|
- eval_serde(Op, Serde, Args);
|
|
|
- {error, not_found} ->
|
|
|
- throw(#{
|
|
|
- error_msg => ErrMsg,
|
|
|
- reason => plugin_serde_not_found,
|
|
|
- serde_name => SerdeName
|
|
|
- })
|
|
|
- end
|
|
|
+eval_serde(Op, ErrMsg, SerdeName, Args) ->
|
|
|
+ case lookup_serde(SerdeName) of
|
|
|
+ {ok, Serde} ->
|
|
|
+ eval_serde(Op, Serde, Args);
|
|
|
+ {error, not_found} ->
|
|
|
+ throw(#{
|
|
|
+ error_msg => ErrMsg,
|
|
|
+ reason => plugin_serde_not_found,
|
|
|
+ serde_name => SerdeName
|
|
|
+ })
|
|
|
end.
|
|
|
|
|
|
eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
|
|
|
@@ -255,6 +257,12 @@ eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Dat
|
|
|
eval_serde(_, _, _) ->
|
|
|
throw(#{error_msg => "unexpected_plugin_avro_op"}).
|
|
|
|
|
|
+which_op(Op) ->
|
|
|
+ atom_to_list(Op) ++ "_avro_json".
|
|
|
+
|
|
|
+error_msg(Op) ->
|
|
|
+ atom_to_list(Op) ++ "_avro_data".
|
|
|
+
|
|
|
read_avsc_file(Path) ->
|
|
|
case file:read_file(Path) of
|
|
|
{ok, Bin} ->
|