| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_plugins_serde).
- -include("emqx_plugins.hrl").
- -include_lib("emqx/include/logger.hrl").
- %% API
- -export([
- start_link/0,
- lookup_serde/1,
- add_schema/2,
- get_schema/1,
- delete_schema/1
- ]).
- %% `gen_server' API
- -export([
- init/1,
- handle_call/3,
- handle_cast/2,
- terminate/2
- ]).
- -export([
- decode/2,
- encode/2
- ]).
- %%-------------------------------------------------------------------------------------------------
- %% API
- %%-------------------------------------------------------------------------------------------------
%% Starts the serde server linked to the calling process and registers it
%% locally under the module name. `init/1' is invoked with an empty list.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% Looks up the serde registered under `SchemaName' in the serde ETS table.
%% The name is normalised to a binary first, since that is the key form used
%% when serdes are inserted.
-spec lookup_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}.
lookup_serde(SchemaName) ->
    Key = to_bin(SchemaName),
    case ets:lookup(?PLUGIN_SERDE_TAB, Key) of
        [Found] ->
            {ok, Found};
        [] ->
            {error, not_found}
    end.
%% Registers the Avro schema found at `Path' under the plugin name `NameVsn'.
%%
%% Returns `{error, already_exists}' when a serde with that name is already
%% present. Otherwise the build is delegated to the server process through a
%% synchronous call (no timeout) and its result is logged and returned.
-spec add_schema(schema_name(), avsc_path()) -> ok | {error, term()}.
add_schema(NameVsn, Path) ->
    case lookup_serde(NameVsn) of
        {error, not_found} ->
            Request = {build_serdes, to_bin(NameVsn), Path},
            case gen_server:call(?MODULE, Request, infinity) of
                {error, Why} = Failure ->
                    ?SLOG(error, #{
                        msg => "plugin_schema_add_failed",
                        plugin => NameVsn,
                        reason => emqx_utils:readable_error_msg(Why)
                    }),
                    Failure;
                ok ->
                    ?SLOG(debug, #{msg => "plugin_schema_added", plugin => NameVsn}),
                    ok
            end;
        {ok, _Existing} ->
            ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => NameVsn}),
            {error, already_exists}
    end.
%% Reads the raw Avro schema file of plugin `NameVsn'.
%%
%% The file location is resolved by `emqx_plugins:avsc_file_path/1'.
%% Returns `{ok, Bin}' with the file contents, or `{error, Reason}' where
%% `Reason' is the error map produced by `read_avsc_file/1'. Failures are
%% logged at warning level; a `msg' key and the plugin name are added to the
%% logged map so the entry is shaped like the other structured logs in this
%% module. The returned error is left untouched.
-spec get_schema(schema_name()) -> {ok, binary()} | {error, map()}.
get_schema(NameVsn) ->
    Path = emqx_plugins:avsc_file_path(NameVsn),
    case read_avsc_file(Path) of
        {ok, _Avsc} = Ok ->
            Ok;
        {error, Reason} = Error ->
            ?SLOG(warning, Reason#{
                msg => "plugin_schema_read_failed",
                plugin => NameVsn
            }),
            Error
    end.
%% Requests removal of the serde registered for `NameVsn'.
%%
%% Returns `{error, not_found}' when no such serde exists. Otherwise the
%% deletion is handed to the server with a cast, so `ok' is returned without
%% waiting for the entry to actually be removed.
-spec delete_schema(schema_name()) -> ok | {error, term()}.
delete_schema(NameVsn) ->
    case lookup_serde(NameVsn) of
        {error, not_found} = NotFound ->
            NotFound;
        {ok, _Existing} ->
            async_delete_serdes([NameVsn]),
            ok
    end.
%% Decodes `RawData' with the serde registered under `SerdeName'.
%% Delegates to `with_serde/3' using the `decode' operation, which turns
%% lookup failures and exceptions into `{error, _}' results.
-spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}.
decode(SerdeName, RawData) ->
    with_serde(decode, SerdeName, [RawData]).
%% Encodes `Data' with the serde registered under `SerdeName'.
%% Delegates to `with_serde/3' using the `encode' operation, which turns
%% lookup failures and exceptions into `{error, _}' results.
-spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}.
encode(SerdeName, Data) ->
    with_serde(encode, SerdeName, [Data]).
- %%-------------------------------------------------------------------------------------------------
- %% `gen_server' API
- %%-------------------------------------------------------------------------------------------------
%% gen_server initialisation: traps exits, creates the public serde table
%% keyed on the serde name, and eagerly builds a serde for every plugin
%% schema file found on disk. The server state is an empty map.
init(_Args) ->
    process_flag(trap_exit, true),
    TabOpts = [public, ordered_set, {keypos, #plugin_schema_serde.name}],
    ok = emqx_utils_ets:new(?PLUGIN_SERDE_TAB, TabOpts),
    InitialState = #{},
    %% Build every schema up front; otherwise a plugin schema might not be
    %% available yet when it is first needed.
    _ = build_serdes(get_plugin_avscs()),
    {ok, InitialState}.
%% Synchronous requests.
%% `{build_serdes, NameVsn, AvscPath}' builds and stores one serde and replies
%% with the build result; any other request gets `{error, unknown_call}'.
handle_call({build_serdes, NameVsn, AvscPath}, _From, State) ->
    {reply, do_build_serde({NameVsn, AvscPath}), State};
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_call}, State}.
%% Asynchronous requests.
%% `{delete_serdes, Names}' removes each named serde from the table if it is
%% present; every other cast is ignored.
handle_cast({delete_serdes, Names}, State) ->
    ok = lists:foreach(fun(Name) -> ensure_serde_absent(Name) end, Names),
    {noreply, State};
handle_cast(_Ignored, State) ->
    {noreply, State}.
%% Shutdown callback; performs no cleanup of its own.
terminate(_Why, _ServerState) ->
    ok.
- %%-------------------------------------------------------------------------------------------------
- %% Internal fns
- %%-------------------------------------------------------------------------------------------------
%% Collects the `config_schema.avsc' files of all installed plugins.
%%
%% Files are matched by the wildcard
%% `<install_dir>/*/*/priv/config_schema.avsc'. For each match the first
%% wildcard directory (the 4th path component from the end) is taken as the
%% plugin name-version and converted to a binary, so the result is a list of
%% `{NameVsnBinary, AvscPath}' pairs. The fold prepends, so the pairs come
%% out in reverse order of the wildcard result.
-spec get_plugin_avscs() -> [{binary(), string()}].
get_plugin_avscs() ->
    Pattern = filename:join([emqx_plugins:install_dir(), "*", "*", "priv", "config_schema.avsc"]),
    lists:foldl(
        fun(AvscPath, Acc) ->
            %% Reversed components: [FileName, "priv", SubDir, NameVsn | Rest]
            [_File, _Priv, _SubDir, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
            [{to_bin(NameVsn), AvscPath} | Acc]
        end,
        [],
        filelib:wildcard(Pattern)
    ).
%% Builds and stores a serde for every `{NameVsn, AvscPath}' pair.
%% Individual build results are discarded; always returns `ok'.
build_serdes(AvscPaths) ->
    BuildOne = fun do_build_serde/1,
    ok = lists:foreach(BuildOne, AvscPaths),
    ok.
%% Builds the serde for one plugin schema and inserts it into the serde
%% table. Any exception raised while building or inserting is logged with its
%% stacktrace and converted into `{error, ExceptionValue}'.
do_build_serde({NameVsn, AvscPath}) ->
    try
        %% Both the build and the insert stay inside the protected section.
        true = ets:insert(?PLUGIN_SERDE_TAB, make_serde(NameVsn, AvscPath)),
        ok
    catch
        Class:Reason:Stack ->
            ?SLOG(
                error,
                #{
                    msg => "error_building_plugin_schema_serde",
                    name => NameVsn,
                    kind => Class,
                    error => Reason,
                    stacktrace => Stack
                }
            ),
            {error, Reason}
    end.
%% Creates a serde record for the schema file at `AvscPath'.
%%
%% The name is normalised to a binary. The schema JSON is imported into a
%% fresh map-based Avro schema store under that name, and the store becomes
%% the serde's evaluation context. Crashes with `badmatch' when the file
%% cannot be read.
make_serde(NameVsn, AvscPath) ->
    Name = to_bin(NameVsn),
    {ok, AvscBin} = read_avsc_file(AvscPath),
    EmptyStore = avro_schema_store:new([map]),
    %% Import the schema into the map store with an assigned name; if it is a
    %% named schema (e.g. struct), then the name is added as an alias.
    LoadedStore = avro_schema_store:import_schema_json(Name, AvscBin, EmptyStore),
    #plugin_schema_serde{
        name = Name,
        eval_context = LoadedStore
    }.
%% Removes the serde called `Name' from the serde table when it exists.
%% The name is normalised to a binary first. Always returns `ok'.
ensure_serde_absent(Name) ->
    Key = to_bin(Name),
    case lookup_serde(Key) of
        {error, not_found} ->
            ok;
        {ok, _Existing} ->
            _ = ets:delete(?PLUGIN_SERDE_TAB, Key),
            ok
    end.
%% Asks the server process, via a cast, to delete the serdes named in `Names'.
async_delete_serdes(Names) ->
    Request = {delete_serdes, Names},
    gen_server:cast(?MODULE, Request).
%% Runs the serde operation `Op' (`decode' or `encode') for `SerdeName' and
%% converts exceptions into error tuples.
%%
%% Returns:
%%   * whatever `eval_serde/4' returns on success;
%%   * `{error, Thrown}' when the evaluation throws; the thrown value is
%%     logged at error level;
%%   * `{error, #{which_op => _, reason => _}}' for `error'-class
%%     exceptions, which are logged with their stacktrace.
%%
%% Throws raised in this module are maps, but the Avro codec is external code.
%% A non-map throw is therefore wrapped before the map update used for
%% logging; otherwise that update would raise `badmap' inside the handler and
%% escape this function.
with_serde(Op, SerdeName, Args) ->
    WhichOp = which_op(Op),
    ErrMsg = error_msg(Op),
    try
        eval_serde(Op, ErrMsg, SerdeName, Args)
    catch
        throw:Reason ->
            LogData = throw_log_data(Reason),
            ?SLOG(error, LogData#{
                which_op => WhichOp,
                reason => emqx_utils:readable_error_msg(Reason)
            }),
            {error, Reason};
        error:Reason:Stacktrace ->
            %% unexpected errors, log stacktrace
            ?SLOG(warning, #{
                msg => "plugin_schema_op_failed",
                which_op => WhichOp,
                exception => Reason,
                stacktrace => Stacktrace
            }),
            {error, #{
                which_op => WhichOp,
                reason => Reason
            }}
    end.

%% Returns a map suitable as the base of a log entry for a thrown value:
%% maps are used as they are, anything else is wrapped.
throw_log_data(Reason) when is_map(Reason) ->
    Reason;
throw_log_data(Reason) ->
    #{msg => "plugin_schema_op_failed", thrown => Reason}.
%% Resolves `SerdeName' to its serde and evaluates `Op' on it.
%% Throws a descriptive map (carrying `ErrMsg' and the serde name) when no
%% serde is registered under that name.
eval_serde(Op, ErrMsg, SerdeName, Args) ->
    case lookup_serde(SerdeName) of
        {error, not_found} ->
            throw(#{
                error_msg => ErrMsg,
                reason => plugin_serde_not_found,
                serde_name => SerdeName
            });
        {ok, Serde} ->
            eval_serde(Op, Serde, Args)
    end.
%% Performs the actual Avro JSON operation with a resolved serde.
%%   * `decode' - decodes the single argument using the serde's schema store,
%%     with maps requested for both Avro maps and records;
%%   * `encode' - encodes the single argument using the serde's schema store.
%% Any other combination of arguments throws an "unexpected op" map.
eval_serde(decode, #plugin_schema_serde{name = SchemaName, eval_context = SchemaStore}, [Input]) ->
    DecoderOpts = avro:make_decoder_options([
        {map_type, map}, {record_type, map}, {encoding, avro_json}
    ]),
    Decoded = avro_json_decoder:decode_value(Input, SchemaName, SchemaStore, DecoderOpts),
    {ok, Decoded};
eval_serde(encode, #plugin_schema_serde{name = SchemaName, eval_context = SchemaStore}, [Input]) ->
    Encoded = avro_json_encoder:encode(SchemaStore, SchemaName, Input),
    {ok, Encoded};
eval_serde(_Op, _Serde, _Args) ->
    throw(#{error_msg => "unexpected_plugin_avro_op"}).
%% Builds the operation label used in logs and error maps,
%% e.g. `decode' becomes "decode_avro_json".
which_op(Op) ->
    OpName = atom_to_list(Op),
    lists:append(OpName, "_avro_json").
%% Builds the error message label for an operation,
%% e.g. `decode' becomes "decode_avro_data".
error_msg(Op) ->
    OpName = atom_to_list(Op),
    lists:append(OpName, "_avro_data").
%% Reads the schema file at `Path'.
%%
%% Returns `{ok, Binary}' with the file contents, or `{error, Map}' where
%% `Map' holds a fixed `error' label, the offending `path', and the `reason'
%% reported by `file:read_file/1' (e.g. `enoent', `eacces'). Keeping the
%% reason lets callers and logs tell a missing file from an unreadable one.
read_avsc_file(Path) ->
    case file:read_file(Path) of
        {ok, _Bin} = Ok ->
            Ok;
        {error, Reason} ->
            {error, #{
                error => "failed_to_read_plugin_schema",
                path => Path,
                reason => Reason
            }}
    end.
%% Normalises a name to a binary.
%% Binaries pass through unchanged, atoms are converted with UTF-8 encoding,
%% and lists are treated as iolists. Any other term fails with
%% `function_clause'.
to_bin(Bin) when is_binary(Bin) -> Bin;
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
to_bin(Chars) when is_list(Chars) -> iolist_to_binary(Chars).
|