Просмотр исходного кода

Merge pull request #12581 from zmstone/0223-add-json-schema

feat: schema registry supports json schema
Zaiming (Stone) Shi 1 год назад
Родитель
Сommit
e9a5670cce

+ 1 - 1
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -497,7 +497,7 @@ do_t_session_expiration(_Config, Opts) ->
     ok.
     ok.
 
 
 t_session_gc(Config) ->
 t_session_gc(Config) ->
-    [Node1, Node2, _Node3] = Nodes = ?config(nodes, Config),
+    [Node1, _Node2, _Node3] = Nodes = ?config(nodes, Config),
     [
     [
         Port1,
         Port1,
         Port2,
         Port2,

+ 1 - 1
apps/emqx_gateway_ocpp/rebar.config

@@ -1,7 +1,7 @@
 %% -*- mode: erlang; -*-
 %% -*- mode: erlang; -*-
 
 
 {deps, [
 {deps, [
-    {jesse, "1.7.0"},
+    {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
     {emqx, {path, "../../apps/emqx"}},
     {emqx, {path, "../../apps/emqx"}},
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_gateway, {path, "../../apps/emqx_gateway"}}
     {emqx_gateway, {path, "../../apps/emqx_gateway"}}

+ 4 - 2
apps/emqx_schema_registry/include/emqx_schema_registry.hrl

@@ -26,13 +26,15 @@
 -type encoded_data() :: iodata().
 -type encoded_data() :: iodata().
 -type decoded_data() :: map().
 -type decoded_data() :: map().
 
 
--type serde_type() :: avro | protobuf.
+-type serde_type() :: avro | protobuf | json.
 -type serde_opts() :: map().
 -type serde_opts() :: map().
 
 
 -record(serde, {
 -record(serde, {
     name :: schema_name(),
     name :: schema_name(),
     type :: serde_type(),
     type :: serde_type(),
-    eval_context :: term()
+    eval_context :: term(),
+    %% for future use
+    extra = []
 }).
 }).
 -type serde() :: #serde{}.
 -type serde() :: #serde{}.
 
 

+ 1 - 0
apps/emqx_schema_registry/rebar.config

@@ -6,6 +6,7 @@
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_rule_engine, {path, "../emqx_rule_engine"}},
     {emqx_rule_engine, {path, "../emqx_rule_engine"}},
     {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
     {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
+    {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
     {gpb, "4.19.9"}
     {gpb, "4.19.9"}
 ]}.
 ]}.
 
 

+ 2 - 1
apps/emqx_schema_registry/src/emqx_schema_registry.app.src

@@ -10,7 +10,8 @@
         kernel,
         kernel,
         stdlib,
         stdlib,
         erlavro,
         erlavro,
-        gpb
+        gpb,
+        jesse
     ]},
     ]},
     {env, []},
     {env, []},
     {modules, []},
     {modules, []},

+ 6 - 2
apps/emqx_schema_registry/src/emqx_schema_registry.erl

@@ -218,7 +218,10 @@ terminate(_Reason, _State) ->
 %%-------------------------------------------------------------------------------------------------
 %%-------------------------------------------------------------------------------------------------
 
 
 create_tables() ->
 create_tables() ->
-    ok = emqx_utils_ets:new(?SERDE_TAB, [public, {keypos, #serde.name}]),
+    ok = emqx_utils_ets:new(?SERDE_TAB, [public, ordered_set, {keypos, #serde.name}]),
+    %% have to create the table for jesse_database otherwise the on-demand table will disappear
+    %% when the caller process dies
+    ok = emqx_utils_ets:new(jesse_ets, [public, ordered_set]),
     ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
     ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
         {type, set},
         {type, set},
         {rlog_shard, ?SCHEMA_REGISTRY_SHARD},
         {rlog_shard, ?SCHEMA_REGISTRY_SHARD},
@@ -312,8 +315,9 @@ ensure_serde_absent(Name) when not is_binary(Name) ->
 ensure_serde_absent(Name) ->
 ensure_serde_absent(Name) ->
     case get_serde(Name) of
     case get_serde(Name) of
         {ok, Serde} ->
         {ok, Serde} ->
+            ok = emqx_schema_registry_serde:destroy(Serde),
             _ = ets:delete(?SERDE_TAB, Name),
             _ = ets:delete(?SERDE_TAB, Name),
-            ok = emqx_schema_registry_serde:destroy(Serde);
+            ok;
         {error, not_found} ->
         {error, not_found} ->
             ok
             ok
     end.
     end.

+ 27 - 9
apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl

@@ -52,34 +52,48 @@ fields(?CONF_KEY_ROOT) ->
     ];
     ];
 fields(avro) ->
 fields(avro) ->
     [
     [
-        {type, mk(avro, #{required => true, desc => ?DESC("schema_type")})},
-        {source,
-            mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
-        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+        {type, mk(avro, #{required => true, desc => ?DESC("schema_type_avro")})}
+        | common_fields(emqx_schema:json_binary())
     ];
     ];
 fields(protobuf) ->
 fields(protobuf) ->
     [
     [
-        {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})},
-        {source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})},
-        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+        {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type_protobuf")})}
+        | common_fields(binary())
+    ];
+fields(json) ->
+    [
+        {type, mk(json, #{required => true, desc => ?DESC("schema_type_json")})}
+        | common_fields(emqx_schema:json_binary())
     ];
     ];
 fields("get_avro") ->
 fields("get_avro") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
 fields("get_protobuf") ->
 fields("get_protobuf") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)];
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)];
+fields("get_json") ->
+    [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(json)];
 fields("put_avro") ->
 fields("put_avro") ->
     fields(avro);
     fields(avro);
 fields("put_protobuf") ->
 fields("put_protobuf") ->
     fields(protobuf);
     fields(protobuf);
+fields("put_json") ->
+    fields(json);
 fields("post_" ++ Type) ->
 fields("post_" ++ Type) ->
     fields("get_" ++ Type).
     fields("get_" ++ Type).
 
 
+common_fields(SourceType) ->
+    [
+        {source, mk(SourceType, #{required => true, desc => ?DESC("schema_source")})},
+        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+    ].
+
 desc(?CONF_KEY_ROOT) ->
 desc(?CONF_KEY_ROOT) ->
     ?DESC("schema_registry_root");
     ?DESC("schema_registry_root");
 desc(avro) ->
 desc(avro) ->
     ?DESC("avro_type");
     ?DESC("avro_type");
 desc(protobuf) ->
 desc(protobuf) ->
     ?DESC("protobuf_type");
     ?DESC("protobuf_type");
+desc(json) ->
+    ?DESC("json_type");
 desc(_) ->
 desc(_) ->
     undefined.
     undefined.
 
 
@@ -121,7 +135,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta).
 ref(Name) -> hoconsc:ref(?MODULE, Name).
 ref(Name) -> hoconsc:ref(?MODULE, Name).
 
 
 supported_serde_types() ->
 supported_serde_types() ->
-    [avro, protobuf].
+    [avro, protobuf, json].
 
 
 refs() ->
 refs() ->
     [ref(Type) || Type <- supported_serde_types()].
     [ref(Type) || Type <- supported_serde_types()].
@@ -132,6 +146,8 @@ refs(#{<<"type">> := <<"avro">>}) ->
     [ref(avro)];
     [ref(avro)];
 refs(#{<<"type">> := <<"protobuf">>}) ->
 refs(#{<<"type">> := <<"protobuf">>}) ->
     [ref(protobuf)];
     [ref(protobuf)];
+refs(#{<<"type">> := <<"json">>}) ->
+    [ref(json)];
 refs(_) ->
 refs(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{
     throw(#{
@@ -140,7 +156,7 @@ refs(_) ->
     }).
     }).
 
 
 refs_get_api() ->
 refs_get_api() ->
-    [ref("get_avro"), ref("get_protobuf")].
+    [ref("get_avro"), ref("get_protobuf"), ref("get_json")].
 
 
 refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
 refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
     refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
     refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
@@ -148,6 +164,8 @@ refs_get_api(#{<<"type">> := <<"avro">>}) ->
     [ref("get_avro")];
     [ref("get_avro")];
 refs_get_api(#{<<"type">> := <<"protobuf">>}) ->
 refs_get_api(#{<<"type">> := <<"protobuf">>}) ->
     [ref("get_protobuf")];
     [ref("get_protobuf")];
+refs_get_api(#{<<"type">> := <<"json">>}) ->
+    [ref("get_json")];
 refs_get_api(_) ->
 refs_get_api(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{
     throw(#{

+ 101 - 22
apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl

@@ -11,20 +11,32 @@
 
 
 %% API
 %% API
 -export([
 -export([
-    decode/2,
-    decode/3,
-    encode/2,
-    encode/3,
     make_serde/3,
     make_serde/3,
     handle_rule_function/2,
     handle_rule_function/2,
     destroy/1
     destroy/1
 ]).
 ]).
 
 
+%% Tests
 -export([
 -export([
+    decode/2,
+    decode/3,
+    encode/2,
+    encode/3,
     eval_decode/2,
     eval_decode/2,
     eval_encode/2
     eval_encode/2
 ]).
 ]).
 
 
+-define(BOOL(SerdeName, EXPR),
+    try
+        _ = EXPR,
+        true
+    catch
+        error:Reason ->
+            ?SLOG(debug, #{msg => "schema_check_failed", schema => SerdeName, reason => Reason}),
+            false
+    end
+).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% API
 %% API
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -40,10 +52,6 @@ handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
         schema_decode,
         schema_decode,
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
     );
     );
-handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    decode(SchemaId, Data, MoreArgs);
-handle_rule_function(schema_decode, Args) ->
-    error({args_count_error, {schema_decode, Args}});
 handle_rule_function(sparkplug_encode, [Term]) ->
 handle_rule_function(sparkplug_encode, [Term]) ->
     handle_rule_function(
     handle_rule_function(
         schema_encode,
         schema_encode,
@@ -54,6 +62,10 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
         schema_encode,
         schema_encode,
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
     );
     );
+handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
+    decode(SchemaId, Data, MoreArgs);
+handle_rule_function(schema_decode, Args) ->
+    error({args_count_error, {schema_decode, Args}});
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
     %% encode outputs iolists, but when the rule actions process those
     %% encode outputs iolists, but when the rule actions process those
     %% it might wrongly encode them as JSON lists, so we force them to
     %% it might wrongly encode them as JSON lists, so we force them to
@@ -62,33 +74,56 @@ handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
     iolist_to_binary(IOList);
     iolist_to_binary(IOList);
 handle_rule_function(schema_encode, Args) ->
 handle_rule_function(schema_encode, Args) ->
     error({args_count_error, {schema_encode, Args}});
     error({args_count_error, {schema_encode, Args}});
+handle_rule_function(schema_check, [SchemaId, Data | MoreArgs]) ->
+    schema_check(SchemaId, Data, MoreArgs);
 handle_rule_function(_, _) ->
 handle_rule_function(_, _) ->
     {error, no_match_for_function}.
     {error, no_match_for_function}.
 
 
+%% Check whether `Data' conforms to the schema registered under `SerdeName'.
+%%
+%% A binary is checked by attempting to decode it with the serde; a map is
+%% checked by attempting to encode it. `VarArgs' is appended to the argument
+%% list handed to the serde's eval function.
+%%
+%% Returns `true' when the decode/encode attempt succeeds and `false' when it
+%% raises an `error'-class exception (the failure is logged at debug level by
+%% the ?BOOL macro). The serde lookup happens outside of ?BOOL, so an unknown
+%% schema name is not turned into `false': it raises
+%% `error({serde_not_found, SerdeName})'.
+-spec schema_check(schema_name(), decoded_data() | encoded_data(), [term()]) -> boolean().
+schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_binary(Data) ->
+    %% encoded input: valid if and only if it can be decoded
+    with_serde(
+        SerdeName,
+        fun(Serde) ->
+            ?BOOL(SerdeName, eval_decode(Serde, [Data | VarArgs]))
+        end
+    );
+schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_map(Data) ->
+    %% decoded input: valid if and only if it can be encoded
+    with_serde(
+        SerdeName,
+        fun(Serde) ->
+            ?BOOL(SerdeName, eval_encode(Serde, [Data | VarArgs]))
+        end
+    ).
+
 -spec decode(schema_name(), encoded_data()) -> decoded_data().
 -spec decode(schema_name(), encoded_data()) -> decoded_data().
 decode(SerdeName, RawData) ->
 decode(SerdeName, RawData) ->
     decode(SerdeName, RawData, []).
     decode(SerdeName, RawData, []).
 
 
 -spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
 -spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
 decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
 decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
-    case emqx_schema_registry:get_serde(SerdeName) of
-        {error, not_found} ->
-            error({serde_not_found, SerdeName});
-        {ok, Serde} ->
-            eval_decode(Serde, [RawData | VarArgs])
-    end.
+    with_serde(SerdeName, fun(Serde) ->
+        eval_decode(Serde, [RawData | VarArgs])
+    end).
 
 
 -spec encode(schema_name(), decoded_data()) -> encoded_data().
 -spec encode(schema_name(), decoded_data()) -> encoded_data().
 encode(SerdeName, RawData) ->
 encode(SerdeName, RawData) ->
     encode(SerdeName, RawData, []).
     encode(SerdeName, RawData, []).
 
 
 -spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
 -spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
-encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
-    case emqx_schema_registry:get_serde(SerdeName) of
-        {error, not_found} ->
-            error({serde_not_found, SerdeName});
+encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
+    with_serde(
+        SerdeName,
+        fun(Serde) ->
+            eval_encode(Serde, [Data | VarArgs])
+        end
+    ).
+
+with_serde(Name, F) ->
+    case emqx_schema_registry:get_serde(Name) of
         {ok, Serde} ->
         {ok, Serde} ->
-            eval_encode(Serde, [EncodedData | VarArgs])
+            F(Serde);
+        {error, not_found} ->
+            error({serde_not_found, Name})
     end.
     end.
 
 
 -spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().
 -spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().
@@ -108,7 +143,17 @@ make_serde(protobuf, Name, Source) ->
         name = Name,
         name = Name,
         type = protobuf,
         type = protobuf,
         eval_context = SerdeMod
         eval_context = SerdeMod
-    }.
+    };
+make_serde(json, Name, Source) ->
+    case json_decode(Source) of
+        SchemaObj when is_map(SchemaObj) ->
+            %% jesse:add_schema adds any map() without further validation
+            %% if it's not a map, then case_clause
+            ok = jesse_add_schema(Name, SchemaObj),
+            #serde{name = Name, type = json};
+        _NotMap ->
+            error({invalid_json_schema, bad_schema_object})
+    end.
 
 
 eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
 eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
     Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
     Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
@@ -116,14 +161,29 @@ eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
 eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
 eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
     MessageName = binary_to_existing_atom(MessageName0, utf8),
     MessageName = binary_to_existing_atom(MessageName0, utf8),
     Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
     Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
-    emqx_utils_maps:binary_key_map(Decoded).
+    emqx_utils_maps:binary_key_map(Decoded);
+eval_decode(#serde{type = json, name = Name}, [Data]) ->
+    true = is_binary(Data),
+    Term = json_decode(Data),
+    {ok, NewTerm} = jesse_validate(Name, Term),
+    NewTerm.
 
 
 eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
 eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
     avro_binary_encoder:encode(Store, Name, Data);
     avro_binary_encoder:encode(Store, Name, Data);
 eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
 eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
     DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
     DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
     MessageName = binary_to_existing_atom(MessageName0, utf8),
     MessageName = binary_to_existing_atom(MessageName0, utf8),
-    apply(SerdeMod, encode_msg, [DecodedData, MessageName]).
+    apply(SerdeMod, encode_msg, [DecodedData, MessageName]);
+eval_encode(#serde{type = json, name = Name}, [Map]) ->
+    %% The input Map may not be a valid JSON term for jesse
+    Data = iolist_to_binary(emqx_utils_json:encode(Map)),
+    NewMap = json_decode(Data),
+    case jesse_validate(Name, NewMap) of
+        {ok, _} ->
+            Data;
+        {error, Reason} ->
+            error(Reason)
+    end.
 
 
 destroy(#serde{type = avro, name = _Name}) ->
 destroy(#serde{type = avro, name = _Name}) ->
     ?tp(serde_destroyed, #{type => avro, name => _Name}),
     ?tp(serde_destroyed, #{type => avro, name => _Name}),
@@ -131,12 +191,31 @@ destroy(#serde{type = avro, name = _Name}) ->
 destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
 destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
     unload_code(SerdeMod),
     unload_code(SerdeMod),
     ?tp(serde_destroyed, #{type => protobuf, name => _Name}),
     ?tp(serde_destroyed, #{type => protobuf, name => _Name}),
+    ok;
+destroy(#serde{type = json, name = Name}) ->
+    ok = jesse_del_schema(Name),
+    ?tp(serde_destroyed, #{type => json, name => Name}),
     ok.
     ok.
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Internal fns
 %% Internal fns
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
+json_decode(Data) ->
+    emqx_utils_json:decode(Data, [return_maps]).
+
+jesse_add_schema(Name, Obj) ->
+    jesse:add_schema(jesse_name(Name), Obj).
+
+jesse_del_schema(Name) ->
+    jesse:del_schema(jesse_name(Name)).
+
+jesse_validate(Name, Map) ->
+    jesse:validate(jesse_name(Name), Map, []).
+
+jesse_name(Str) ->
+    unicode:characters_to_list(Str).
+
 -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
 -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
 make_protobuf_serde_mod(Name, Source) ->
 make_protobuf_serde_mod(Name, Source) ->
     {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
     {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),

+ 39 - 14
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -23,14 +23,15 @@
 all() ->
 all() ->
     [
     [
         {group, avro},
         {group, avro},
-        {group, protobuf}
+        {group, protobuf},
+        {group, json}
     ] ++ sparkplug_tests().
     ] ++ sparkplug_tests().
 
 
 groups() ->
 groups() ->
     AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
     AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
     ProtobufOnlyTCs = protobuf_only_tcs(),
     ProtobufOnlyTCs = protobuf_only_tcs(),
     TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
     TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
-    [{avro, TCs}, {protobuf, AllTCsExceptSP}].
+    [{avro, TCs}, {json, TCs}, {protobuf, AllTCsExceptSP}].
 
 
 protobuf_only_tcs() ->
 protobuf_only_tcs() ->
     [
     [
@@ -57,6 +58,8 @@ end_per_suite(_Config) ->
 
 
 init_per_group(avro, Config) ->
 init_per_group(avro, Config) ->
     [{serde_type, avro} | Config];
     [{serde_type, avro} | Config];
+init_per_group(json, Config) ->
+    [{serde_type, json} | Config];
 init_per_group(protobuf, Config) ->
 init_per_group(protobuf, Config) ->
     [{serde_type, protobuf} | Config];
     [{serde_type, protobuf} | Config];
 init_per_group(_Group, Config) ->
 init_per_group(_Group, Config) ->
@@ -140,6 +143,18 @@ schema_params(avro) ->
     },
     },
     SourceBin = emqx_utils_json:encode(Source),
     SourceBin = emqx_utils_json:encode(Source),
     #{type => avro, source => SourceBin};
     #{type => avro, source => SourceBin};
+schema_params(json) ->
+    Source =
+        #{
+            type => object,
+            properties => #{
+                i => #{type => integer},
+                s => #{type => string}
+            },
+            required => [<<"i">>, <<"s">>]
+        },
+    SourceBin = emqx_utils_json:encode(Source),
+    #{type => json, source => SourceBin};
 schema_params(protobuf) ->
 schema_params(protobuf) ->
     SourceBin =
     SourceBin =
         <<
         <<
@@ -162,7 +177,7 @@ create_serde(SerdeType, SerdeName) ->
     ok = emqx_schema_registry:add_schema(SerdeName, Schema),
     ok = emqx_schema_registry:add_schema(SerdeName, Schema),
     ok.
     ok.
 
 
-test_params_for(avro, encode_decode1) ->
+test_params_for(Type, encode_decode1) when Type =:= avro; Type =:= json ->
     SQL =
     SQL =
         <<
         <<
             "select\n"
             "select\n"
@@ -186,7 +201,7 @@ test_params_for(avro, encode_decode1) ->
         expected_rule_output => ExpectedRuleOutput,
         expected_rule_output => ExpectedRuleOutput,
         extra_args => ExtraArgs
         extra_args => ExtraArgs
     };
     };
-test_params_for(avro, encode1) ->
+test_params_for(Type, encode1) when Type =:= avro; Type =:= json ->
     SQL =
     SQL =
         <<
         <<
             "select\n"
             "select\n"
@@ -202,7 +217,7 @@ test_params_for(avro, encode1) ->
         payload_template => PayloadTemplate,
         payload_template => PayloadTemplate,
         extra_args => ExtraArgs
         extra_args => ExtraArgs
     };
     };
-test_params_for(avro, decode1) ->
+test_params_for(Type, decode1) when Type =:= avro; Type =:= json ->
     SQL =
     SQL =
         <<
         <<
             "select\n"
             "select\n"
@@ -503,13 +518,18 @@ t_encode(Config) ->
     PayloadBin = emqx_utils_json:encode(Payload),
     PayloadBin = emqx_utils_json:encode(Payload),
     emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
     emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
     Published = receive_published(?LINE),
     Published = receive_published(?LINE),
-    ?assertMatch(
-        #{payload := P} when is_binary(P),
-        Published
-    ),
-    #{payload := Encoded} = Published,
-    {ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
-    ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs])),
+    case SerdeType of
+        json ->
+            %% The published payload is a binary, but because it is
+            %% valid JSON it has already been decoded ('safe_decode')
+            %% in receive_published, so a map is matched here.
+            ?assertMatch(#{payload := #{<<"i">> := _, <<"s">> := _}}, Published);
+        _ ->
+            ?assertMatch(#{payload := B} when is_binary(B), Published),
+            #{payload := Encoded} = Published,
+            {ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
+            ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs]))
+    end,
     ok.
     ok.
 
 
 t_decode(Config) ->
 t_decode(Config) ->
@@ -607,8 +627,13 @@ t_protobuf_union_decode(Config) ->
 t_fail_rollback(Config) ->
 t_fail_rollback(Config) ->
     SerdeType = ?config(serde_type, Config),
     SerdeType = ?config(serde_type, Config),
     OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
     OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
-    BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
-
+    BrokenSchema =
+        case SerdeType of
+            json ->
+                OkSchema#{<<"source">> := <<"not a json value">>};
+            _ ->
+                OkSchema#{<<"source">> := <<"{}">>}
+        end,
     ?assertMatch(
     ?assertMatch(
         {ok, _},
         {ok, _},
         emqx_conf:update(
         emqx_conf:update(

+ 23 - 4
apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl

@@ -23,14 +23,16 @@
 all() ->
 all() ->
     [
     [
         {group, avro},
         {group, avro},
-        {group, protobuf}
+        {group, protobuf},
+        {group, json}
     ].
     ].
 
 
 groups() ->
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
     [
         {avro, AllTCs},
         {avro, AllTCs},
-        {protobuf, AllTCs}
+        {protobuf, AllTCs},
+        {json, AllTCs}
     ].
     ].
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
@@ -80,6 +82,23 @@ init_per_group(protobuf, Config) ->
         {schema_source, SourceBin},
         {schema_source, SourceBin},
         {invalid_schema_source, InvalidSourceBin}
         {invalid_schema_source, InvalidSourceBin}
         | Config
         | Config
+    ];
+init_per_group(json, Config) ->
+    Source =
+        #{
+            properties => #{
+                foo => #{},
+                bar => #{}
+            },
+            required => [<<"foo">>]
+        },
+    SourceBin = emqx_utils_json:encode(Source),
+    InvalidSourceBin = <<"\"not an object\"">>,
+    [
+        {serde_type, json},
+        {schema_source, SourceBin},
+        {invalid_schema_source, InvalidSourceBin}
+        | Config
     ].
     ].
 
 
 end_per_group(_Group, _Config) ->
 end_per_group(_Group, _Config) ->
@@ -279,7 +298,7 @@ t_crud(Config) ->
             <<"code">> := <<"BAD_REQUEST">>,
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
             <<"message">> :=
                 #{
                 #{
-                    <<"expected">> := <<"avro | protobuf">>,
+                    <<"expected">> := <<"avro | protobuf | json">>,
                     <<"field_name">> := <<"type">>
                     <<"field_name">> := <<"type">>
                 }
                 }
         }},
         }},
@@ -302,7 +321,7 @@ t_crud(Config) ->
             <<"code">> := <<"BAD_REQUEST">>,
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
             <<"message">> :=
                 #{
                 #{
-                    <<"expected">> := <<"avro | protobuf">>,
+                    <<"expected">> := <<"avro | protobuf | json">>,
                     <<"field_name">> := <<"type">>
                     <<"field_name">> := <<"type">>
                 }
                 }
         }},
         }},

+ 77 - 8
apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl

@@ -15,6 +15,10 @@
 -import(emqx_common_test_helpers, [on_exit/1]).
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 
 -define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
 -define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
+-define(INVALID_JSON, #{
+    reason := #{expected := "emqx_schema:json_binary()"},
+    kind := validation_error
+}).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %% CT boilerplate
@@ -79,7 +83,21 @@ schema_params(protobuf) ->
             "           }\n"
             "           }\n"
             "          "
             "          "
         >>,
         >>,
-    #{type => protobuf, source => SourceBin}.
+    #{type => protobuf, source => SourceBin};
+schema_params(json) ->
+    Source =
+        #{
+            <<"$schema">> => <<"http://json-schema.org/draft-06/schema#">>,
+            <<"$id">> => <<"http://json-schema.org/draft-06/schema#">>,
+            type => object,
+            properties => #{
+                foo => #{type => integer},
+                bar => #{type => integer}
+            },
+            required => [<<"foo">>]
+        },
+    SourceBin = emqx_utils_json:encode(Source),
+    #{type => json, source => SourceBin}.
 
 
 assert_roundtrip(SerdeName, Original) ->
 assert_roundtrip(SerdeName, Original) ->
     Encoded = emqx_schema_registry_serde:encode(SerdeName, Original),
     Encoded = emqx_schema_registry_serde:encode(SerdeName, Original),
@@ -109,10 +127,7 @@ t_avro_invalid_json_schema(_Config) ->
     SerdeName = my_serde,
     SerdeName = my_serde,
     Params = schema_params(avro),
     Params = schema_params(avro),
     WrongParams = Params#{source := <<"{">>},
     WrongParams = Params#{source := <<"{">>},
-    ?assertMatch(
-        {error, #{reason := #{expected := _}}},
-        emqx_schema_registry:add_schema(SerdeName, WrongParams)
-    ),
+    ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, WrongParams)),
     ok.
     ok.
 
 
 t_avro_invalid_schema(_Config) ->
 t_avro_invalid_schema(_Config) ->
@@ -128,14 +143,27 @@ t_avro_invalid_schema(_Config) ->
 t_serde_not_found(_Config) ->
 t_serde_not_found(_Config) ->
     %% for coverage
     %% for coverage
     NonexistentSerde = <<"nonexistent">>,
     NonexistentSerde = <<"nonexistent">>,
-    Original = #{},
+    EncodeData = #{},
+    DecodeData = <<"data">>,
+    ?assertError(
+        {serde_not_found, NonexistentSerde},
+        emqx_schema_registry_serde:encode(NonexistentSerde, EncodeData)
+    ),
+    ?assertError(
+        {serde_not_found, NonexistentSerde},
+        emqx_schema_registry_serde:decode(NonexistentSerde, DecodeData)
+    ),
     ?assertError(
     ?assertError(
         {serde_not_found, NonexistentSerde},
         {serde_not_found, NonexistentSerde},
-        emqx_schema_registry_serde:encode(NonexistentSerde, Original)
+        emqx_schema_registry_serde:handle_rule_function(schema_check, [
+            NonexistentSerde, EncodeData
+        ])
     ),
     ),
     ?assertError(
     ?assertError(
         {serde_not_found, NonexistentSerde},
         {serde_not_found, NonexistentSerde},
-        emqx_schema_registry_serde:decode(NonexistentSerde, Original)
+        emqx_schema_registry_serde:handle_rule_function(schema_check, [
+            NonexistentSerde, DecodeData
+        ])
     ),
     ),
     ok.
     ok.
 
 
@@ -171,3 +199,44 @@ t_protobuf_invalid_schema(_Config) ->
         emqx_schema_registry:add_schema(SerdeName, WrongParams)
         emqx_schema_registry:add_schema(SerdeName, WrongParams)
     ),
     ),
     ok.
     ok.
+
+t_json_invalid_schema(_Config) ->
+    SerdeName = invalid_json,
+    Params = schema_params(json),
+    BadParams1 = Params#{source := <<"not valid json value">>},
+    BadParams2 = Params#{source := <<"\"not an object\"">>},
+    BadParams3 = Params#{source := <<"{\"foo\": 1}">>},
+    ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, BadParams1)),
+    ?assertMatch(
+        {error, {post_config_update, _, {invalid_json_schema, bad_schema_object}}},
+        emqx_schema_registry:add_schema(SerdeName, BadParams2)
+    ),
+    ?assertMatch(
+        ok,
+        emqx_schema_registry:add_schema(SerdeName, BadParams3)
+    ),
+    ok.
+
+t_roundtrip_json(_Config) ->
+    SerdeName = my_json_schema,
+    Params = schema_params(json),
+    ok = emqx_schema_registry:add_schema(SerdeName, Params),
+    Original = #{<<"foo">> => 1, <<"bar">> => 2},
+    assert_roundtrip(SerdeName, Original),
+    ok.
+
+t_json_validation(_Config) ->
+    SerdeName = my_json_schema,
+    Params = schema_params(json),
+    ok = emqx_schema_registry:add_schema(SerdeName, Params),
+    F = fun(Fn, Data) ->
+        emqx_schema_registry_serde:handle_rule_function(Fn, [SerdeName, Data])
+    end,
+    OK = #{<<"foo">> => 1, <<"bar">> => 2},
+    NotOk = #{<<"bar">> => 2},
+    ?assert(F(schema_check, OK)),
+    ?assert(F(schema_check, <<"{\"foo\": 1, \"bar\": 2}">>)),
+    ?assertNot(F(schema_check, NotOk)),
+    ?assertNot(F(schema_check, <<"{\"bar\": 2}">>)),
+    ?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)),
+    ok.

+ 3 - 0
changes/ee/feat-12581.en.md

@@ -0,0 +1,3 @@
+Add JSON Schema support to the schema registry.
+
+The following JSON Schema versions are supported: [Draft 03](http://tools.ietf.org/html/draft-zyp-json-schema-03), [Draft 04](http://tools.ietf.org/html/draft-zyp-json-schema-04) and [Draft 06](https://datatracker.ietf.org/doc/html/draft-wright-json-schema-00).

+ 1 - 2
rebar.config.erl

@@ -50,8 +50,7 @@ deps(Config) ->
 
 
 overrides() ->
 overrides() ->
     [
     [
-        {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]},
-        {add, jesse, [{erl_opts, [nowarn_match_float_zero]}]}
+        {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]}
     ] ++ snabbkaffe_overrides().
     ] ++ snabbkaffe_overrides().
 
 
 %% Temporary workaround for a rebar3 erl_opts duplication
 %% Temporary workaround for a rebar3 erl_opts duplication

+ 24 - 4
rel/i18n/emqx_schema_registry_schema.hocon

@@ -12,6 +12,14 @@ protobuf_type.desc:
 protobuf_type.label:
 protobuf_type.label:
 """Protocol Buffers"""
 """Protocol Buffers"""
 
 
+json_type.desc: """~
+    Supports JSON Schema
+    [Draft 03](http://tools.ietf.org/html/draft-zyp-json-schema-03),
+    [Draft 04](http://tools.ietf.org/html/draft-zyp-json-schema-04) and
+    [Draft 06](https://datatracker.ietf.org/doc/html/draft-wright-json-schema-00)."""
+
+json_type.label: "JSON Schema"
+
 schema_description.desc:
 schema_description.desc:
 """A description for this schema."""
 """A description for this schema."""
 
 
@@ -42,10 +50,22 @@ schema_source.desc:
 schema_source.label:
 schema_source.label:
 """Schema source"""
 """Schema source"""
 
 
-schema_type.desc:
-"""Schema type."""
+schema_type_avro.desc:
+"""Must be `avro` for Avro schema."""
+
+schema_type_avro.label:
+"""Avro Schema"""
+
+schema_type_protobuf.desc:
+"""Must be `protobuf` for protobuf schema."""
+
+schema_type_protobuf.label:
+"""Protobuf Schema"""
+
+schema_type_json.desc:
+"""Must be `json` for JSON schema."""
 
 
-schema_type.label:
-"""Schema type"""
+schema_type_json.label:
+"""JSON Schema"""
 
 
 }
 }

+ 1 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -198,6 +198,7 @@ procs
 progname
 progname
 prometheus
 prometheus
 proto
 proto
+protobuf
 ps
 ps
 psk
 psk
 pubsub
 pubsub