emqx_plugins_serde.erl 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins_serde).
  17. -include("emqx_plugins.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. %% API
  20. -export([
  21. start_link/0,
  22. lookup_serde/1,
  23. add_schema/2,
  24. get_schema/1,
  25. delete_schema/1
  26. ]).
  27. %% `gen_server' API
  28. -export([
  29. init/1,
  30. handle_call/3,
  31. handle_cast/2,
  32. terminate/2
  33. ]).
  34. -export([
  35. decode/2,
  36. encode/2
  37. ]).
  38. %%-------------------------------------------------------------------------------------------------
  39. %% API
  40. %%-------------------------------------------------------------------------------------------------
  41. start_link() ->
  42. gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  43. -spec lookup_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}.
  44. lookup_serde(SchemaName) ->
  45. case ets:lookup(?PLUGIN_SERDE_TAB, to_bin(SchemaName)) of
  46. [] ->
  47. {error, not_found};
  48. [Serde] ->
  49. {ok, Serde}
  50. end.
  51. -spec add_schema(schema_name(), avsc_path()) -> ok | {error, term()}.
  52. add_schema(NameVsn, Path) ->
  53. case lookup_serde(NameVsn) of
  54. {ok, _Serde} ->
  55. ?SLOG(warning, #{msg => "plugin_schema_already_exists", plugin => NameVsn}),
  56. {error, already_exists};
  57. {error, not_found} ->
  58. case gen_server:call(?MODULE, {build_serdes, to_bin(NameVsn), Path}, infinity) of
  59. ok ->
  60. ?SLOG(debug, #{msg => "plugin_schema_added", plugin => NameVsn}),
  61. ok;
  62. {error, Reason} = E ->
  63. ?SLOG(error, #{
  64. msg => "plugin_schema_add_failed",
  65. plugin => NameVsn,
  66. reason => emqx_utils:readable_error_msg(Reason)
  67. }),
  68. E
  69. end
  70. end.
  71. get_schema(NameVsn) ->
  72. Path = emqx_plugins:avsc_file_path(NameVsn),
  73. case read_avsc_file(Path) of
  74. {ok, Avsc} ->
  75. {ok, Avsc};
  76. {error, Reason} ->
  77. ?SLOG(warning, Reason),
  78. {error, Reason}
  79. end.
  80. -spec delete_schema(schema_name()) -> ok | {error, term()}.
  81. delete_schema(NameVsn) ->
  82. case lookup_serde(NameVsn) of
  83. {ok, _Serde} ->
  84. async_delete_serdes([NameVsn]),
  85. ok;
  86. {error, not_found} ->
  87. {error, not_found}
  88. end.
  89. -spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}.
  90. decode(SerdeName, RawData) ->
  91. with_serde(
  92. ?FUNCTION_NAME,
  93. SerdeName,
  94. [RawData]
  95. ).
  96. -spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}.
  97. encode(SerdeName, Data) ->
  98. with_serde(
  99. ?FUNCTION_NAME,
  100. SerdeName,
  101. [Data]
  102. ).
  103. %%-------------------------------------------------------------------------------------------------
  104. %% `gen_server' API
  105. %%-------------------------------------------------------------------------------------------------
  106. init(_) ->
  107. process_flag(trap_exit, true),
  108. ok = emqx_utils_ets:new(?PLUGIN_SERDE_TAB, [
  109. public, ordered_set, {keypos, #plugin_schema_serde.name}
  110. ]),
  111. State = #{},
  112. AvscPaths = get_plugin_avscs(),
  113. %% force build all schemas at startup
  114. %% otherwise plugin schema may not be available when needed
  115. _ = build_serdes(AvscPaths),
  116. {ok, State}.
  117. handle_call({build_serdes, NameVsn, AvscPath}, _From, State) ->
  118. BuildRes = do_build_serde({NameVsn, AvscPath}),
  119. {reply, BuildRes, State};
  120. handle_call(_Call, _From, State) ->
  121. {reply, {error, unknown_call}, State}.
  122. handle_cast({delete_serdes, Names}, State) ->
  123. lists:foreach(fun ensure_serde_absent/1, Names),
  124. {noreply, State};
  125. handle_cast(_Cast, State) ->
  126. {noreply, State}.
  127. terminate(_Reason, _State) ->
  128. ok.
  129. %%-------------------------------------------------------------------------------------------------
  130. %% Internal fns
  131. %%-------------------------------------------------------------------------------------------------
  132. -spec get_plugin_avscs() -> [{string(), string()}].
  133. get_plugin_avscs() ->
  134. Pattern = filename:join([emqx_plugins:install_dir(), "*", "*", "priv", "config_schema.avsc"]),
  135. lists:foldl(
  136. fun(AvscPath, AccIn) ->
  137. [_, _, _, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
  138. [{to_bin(NameVsn), AvscPath} | AccIn]
  139. end,
  140. _Acc0 = [],
  141. filelib:wildcard(Pattern)
  142. ).
  143. build_serdes(AvscPaths) ->
  144. ok = lists:foreach(fun do_build_serde/1, AvscPaths).
  145. do_build_serde({NameVsn, AvscPath}) ->
  146. try
  147. Serde = make_serde(NameVsn, AvscPath),
  148. true = ets:insert(?PLUGIN_SERDE_TAB, Serde),
  149. ok
  150. catch
  151. Kind:Error:Stacktrace ->
  152. ?SLOG(
  153. error,
  154. #{
  155. msg => "error_building_plugin_schema_serde",
  156. name => NameVsn,
  157. kind => Kind,
  158. error => Error,
  159. stacktrace => Stacktrace
  160. }
  161. ),
  162. {error, Error}
  163. end.
  164. make_serde(NameVsn, AvscPath) when not is_binary(NameVsn) ->
  165. make_serde(to_bin(NameVsn), AvscPath);
  166. make_serde(NameVsn, AvscPath) ->
  167. {ok, AvscBin} = read_avsc_file(AvscPath),
  168. Store0 = avro_schema_store:new([map]),
  169. %% import the schema into the map store with an assigned name
  170. %% if it's a named schema (e.g. struct), then Name is added as alias
  171. Store = avro_schema_store:import_schema_json(NameVsn, AvscBin, Store0),
  172. #plugin_schema_serde{
  173. name = NameVsn,
  174. eval_context = Store
  175. }.
  176. ensure_serde_absent(Name) when not is_binary(Name) ->
  177. ensure_serde_absent(to_bin(Name));
  178. ensure_serde_absent(Name) ->
  179. case lookup_serde(Name) of
  180. {ok, _Serde} ->
  181. _ = ets:delete(?PLUGIN_SERDE_TAB, Name),
  182. ok;
  183. {error, not_found} ->
  184. ok
  185. end.
  186. async_delete_serdes(Names) ->
  187. gen_server:cast(?MODULE, {delete_serdes, Names}).
  188. with_serde(Op, SerdeName, Args) ->
  189. WhichOp = which_op(Op),
  190. ErrMsg = error_msg(Op),
  191. try
  192. eval_serde(Op, ErrMsg, SerdeName, Args)
  193. catch
  194. throw:Reason ->
  195. ?SLOG(error, Reason#{
  196. which_op => WhichOp,
  197. reason => emqx_utils:readable_error_msg(Reason)
  198. }),
  199. {error, Reason};
  200. error:Reason:Stacktrace ->
  201. %% unexpected errors, log stacktrace
  202. ?SLOG(warning, #{
  203. msg => "plugin_schema_op_failed",
  204. which_op => WhichOp,
  205. exception => Reason,
  206. stacktrace => Stacktrace
  207. }),
  208. {error, #{
  209. which_op => WhichOp,
  210. reason => Reason
  211. }}
  212. end.
  213. eval_serde(Op, ErrMsg, SerdeName, Args) ->
  214. case lookup_serde(SerdeName) of
  215. {ok, Serde} ->
  216. eval_serde(Op, Serde, Args);
  217. {error, not_found} ->
  218. throw(#{
  219. error_msg => ErrMsg,
  220. reason => plugin_serde_not_found,
  221. serde_name => SerdeName
  222. })
  223. end.
  224. eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
  225. Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}, {encoding, avro_json}]),
  226. {ok, avro_json_decoder:decode_value(Data, Name, Store, Opts)};
  227. eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
  228. {ok, avro_json_encoder:encode(Store, Name, Data)};
  229. eval_serde(_, _, _) ->
  230. throw(#{error_msg => "unexpected_plugin_avro_op"}).
  231. which_op(Op) ->
  232. atom_to_list(Op) ++ "_avro_json".
  233. error_msg(Op) ->
  234. atom_to_list(Op) ++ "_avro_data".
  235. read_avsc_file(Path) ->
  236. case file:read_file(Path) of
  237. {ok, Bin} ->
  238. {ok, Bin};
  239. {error, _} ->
  240. {error, #{
  241. error => "failed_to_read_plugin_schema",
  242. path => Path
  243. }}
  244. end.
  245. to_bin(A) when is_atom(A) -> atom_to_binary(A);
  246. to_bin(L) when is_list(L) -> iolist_to_binary(L);
  247. to_bin(B) when is_binary(B) -> B.