Ver código fonte

Merge pull request #10409 from thalesmg/protobuf-schema-v50

feat(schema_registry): add support for protobuf schemas
Thales Macedo Garitezi 2 anos atrás
pai
commit
586cd54aa2

+ 1 - 4
build

@@ -124,10 +124,7 @@ make_docs() {
 }
 
 assert_no_compile_time_only_deps() {
-    if [ "$("$FIND" "_build/$PROFILE/rel/emqx/lib/" -maxdepth 1 -name 'gpb-*' -type d)" != "" ]; then
-        echo "gpb should not be included in the release"
-        exit 1
-    fi
+    :
 }
 
 make_rel() {

+ 1 - 0
changes/ee/feat-10409.en.md

@@ -0,0 +1 @@
+Add support for [Protocol Buffers](https://protobuf.dev/) schemas in Schema Registry.

+ 19 - 2
lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl

@@ -10,14 +10,19 @@
 
 -define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
 -define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
+-define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).
 
 -type schema_name() :: binary().
 -type schema_source() :: binary().
 
 -type encoded_data() :: iodata().
 -type decoded_data() :: map().
--type serializer() :: fun((decoded_data()) -> encoded_data()).
--type deserializer() :: fun((encoded_data()) -> decoded_data()).
+-type serializer() ::  % turns a decoded map into encoded data; arity depends on the serde
+    fun((decoded_data()) -> encoded_data())
+    | fun((decoded_data(), term()) -> encoded_data()).  % extra arg, e.g. protobuf message name
+-type deserializer() ::  % inverse of serializer(): encoded data back to a decoded map
+    fun((encoded_data()) -> decoded_data())
+    | fun((encoded_data(), term()) -> decoded_data()).  % extra arg, e.g. protobuf message name
 -type destructor() :: fun(() -> ok).
 -type serde_type() :: avro.
 -type serde_opts() :: map().
@@ -29,6 +34,18 @@
     destructor :: destructor()
 }).
 -type serde() :: #serde{}.
+
+-record(protobuf_cache, {  % one row of ?PROTOBUF_CACHE_TAB: a compiled protobuf codec module
+    fingerprint,  % table key: md5 digest computed when the schema is compiled
+    module,  % name of the generated codec module
+    module_binary  % object code of `module', as handed to code:load_binary/3
+}).
+-type protobuf_cache() :: #protobuf_cache{
+    fingerprint :: binary(),
+    module :: module(),
+    module_binary :: binary()
+}.
+
 -type serde_map() :: #{
     name := schema_name(),
     serializer := serializer(),

+ 2 - 1
lib-ee/emqx_ee_schema_registry/rebar.config

@@ -4,7 +4,8 @@
 {deps, [
   {emqx, {path, "../../apps/emqx"}},
   {emqx_utils, {path, "../../apps/emqx_utils"}},
-  {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}
+  {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
+  {gpb, "4.19.7"}
 ]}.
 
 {shell, [

+ 2 - 1
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src

@@ -6,7 +6,8 @@
     {applications, [
         kernel,
         stdlib,
-        erlavro
+        erlavro,
+        gpb
     ]},
     {env, []},
     {modules, []},

+ 8 - 1
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl

@@ -176,7 +176,14 @@ create_tables() ->
         {record_name, serde},
         {attributes, record_info(fields, serde)}
     ]),
-    ok = mria:wait_for_tables([?SERDE_TAB]),
+    ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
+        {type, set},
+        {rlog_shard, ?SCHEMA_REGISTRY_SHARD},
+        {storage, disc_only_copies},
+        {record_name, protobuf_cache},
+        {attributes, record_info(fields, protobuf_cache)}
+    ]),
+    ok = mria:wait_for_tables([?SERDE_TAB, ?PROTOBUF_CACHE_TAB]),
     ok.
 
 do_build_serdes(Schemas) ->

+ 18 - 2
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl

@@ -53,10 +53,20 @@ fields(avro) ->
             mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
         {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
     ];
+fields(protobuf) ->
+    [
+        {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})},
+        {source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})},
+        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+    ];
 fields("get_avro") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
+fields("get_protobuf") ->
+    [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)];
 fields("put_avro") ->
     fields(avro);
+fields("put_protobuf") ->
+    fields(protobuf);
 fields("post_" ++ Type) ->
     fields("get_" ++ Type).
 
@@ -64,6 +74,8 @@ desc(?CONF_KEY_ROOT) ->
     ?DESC("schema_registry_root");
 desc(avro) ->
     ?DESC("avro_type");
+desc(protobuf) ->
+    ?DESC("protobuf_type");
 desc(_) ->
     undefined.
 
@@ -96,7 +108,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta).
 ref(Name) -> hoconsc:ref(?MODULE, Name).
 
 supported_serde_types() ->
-    [avro].
+    [avro, protobuf].
 
 refs() ->
     [ref(Type) || Type <- supported_serde_types()].
@@ -105,6 +117,8 @@ refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
     refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
 refs(#{<<"type">> := <<"avro">>}) ->
     [ref(avro)];
+refs(#{<<"type">> := <<"protobuf">>}) ->
+    [ref(protobuf)];
 refs(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{
@@ -113,12 +127,14 @@ refs(_) ->
     }).
 
 refs_get_api() ->
-    [ref("get_avro")].
+    [ref("get_avro"), ref("get_protobuf")].
 
 refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
     refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
 refs_get_api(#{<<"type">> := <<"avro">>}) ->
     [ref("get_avro")];
+refs_get_api(#{<<"type">> := <<"protobuf">>}) ->
+    [ref("get_protobuf")];
 refs_get_api(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{

+ 132 - 0
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl

@@ -4,8 +4,11 @@
 -module(emqx_ee_schema_registry_serde).
 
 -include("emqx_ee_schema_registry.hrl").
+-include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_ee_schema_registry_serde]}}]).
+
 %% API
 -export([
     decode/2,
@@ -55,6 +58,27 @@ make_serde(avro, Name, Source0) ->
         ?tp(serde_destroyed, #{type => avro, name => Name}),
         ok
     end,
+    {Serializer, Deserializer, Destructor};
+make_serde(protobuf, Name, Source) ->
+    SerdeMod = make_protobuf_serde_mod(Name, Source),
+    Serializer =
+        fun(DecodedData0, MessageName0) ->
+            DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
+            MessageName = binary_to_existing_atom(MessageName0, utf8),
+            SerdeMod:encode_msg(DecodedData, MessageName)
+        end,
+    Deserializer =
+        fun(EncodedData, MessageName0) ->
+            MessageName = binary_to_existing_atom(MessageName0, utf8),
+            Decoded = SerdeMod:decode_msg(EncodedData, MessageName),
+            emqx_utils_maps:binary_key_map(Decoded)
+        end,
+    Destructor =
+        fun() ->
+            unload_code(SerdeMod),
+            ?tp(serde_destroyed, #{type => protobuf, name => Name}),
+            ok
+        end,
     {Serializer, Deserializer, Destructor}.
 
 %%------------------------------------------------------------------------------
@@ -68,3 +92,111 @@ inject_avro_name(Name, Source0) ->
     Schema0 = emqx_utils_json:decode(Source0, [return_maps]),
     Schema = Schema0#{<<"name">> => Name},
     emqx_utils_json:encode(Schema).
+
+-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
+make_protobuf_serde_mod(Name, Source) ->  % build or fetch the codec for Source, load it, return it
+    {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
+    case lazy_generate_protobuf_code(SerdeMod0, Source) of
+        {ok, SerdeMod, ModBinary} ->  % SerdeMod is the module name reported by the cache/compiler
+            load_code(SerdeMod, SerdeModFileName, ModBinary),
+            SerdeMod;
+        {error, #{error := Error, warnings := Warnings}} ->
+            ?SLOG(
+                warning,
+                #{
+                    msg => "error_generating_protobuf_code",
+                    error => Error,
+                    warnings => Warnings
+                }
+            ),
+            error({invalid_protobuf_schema, Error})  % raises; the warnings are only logged
+    end.
+
+-spec protobuf_serde_mod_name(schema_name()) -> {module(), string()}.
+protobuf_serde_mod_name(Name) ->  % derive {ModuleAtom, PseudoFileName} from the schema name
+    %% Kept as a string (list): it is fed to `list_to_atom/1' and `++' below.
+    SerdeModName = "$schema_parser_" ++ binary_to_list(Name),
+    SerdeMod = list_to_atom(SerdeModName),  % NOTE(review): new atom per name; assumes trusted names
+    %% Pseudo file name (the "path") that is passed to `code:load_binary/3'.
+    SerdeModFileName = SerdeModName ++ ".memory",
+    {SerdeMod, SerdeModFileName}.
+
+-spec lazy_generate_protobuf_code(module(), schema_source()) ->
+    {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
+lazy_generate_protobuf_code(SerdeMod0, Source) ->
+    %% The lookup/compile runs inside a locking transaction so that the
+    %% compile does not run on all nodes: only one gets the lock and compiles
+    %% the schema; the other nodes simply read the final result.
+    {atomic, Res} = mria:transaction(  % any result other than {atomic, _} fails this match
+        ?SCHEMA_REGISTRY_SHARD,
+        fun lazy_generate_protobuf_code_trans/2,
+        [SerdeMod0, Source]
+    ),
+    Res.
+
+-spec lazy_generate_protobuf_code_trans(module(), schema_source()) ->
+    {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
+lazy_generate_protobuf_code_trans(SerdeMod0, Source) ->  % transaction body: cached build or compile
+    Fingerprint = erlang:md5([atom_to_binary(SerdeMod0, utf8), 0, Source]),  % key = module + source:
+    _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),  % schemas with equal source
+    case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of  % must not share a module (one unloads it)
+        [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
+            ?tp(schema_registry_protobuf_cache_hit, #{}),  % already compiled: reuse the stored build
+            {ok, SerdeMod, ModBinary};
+        [] ->
+            ?tp(schema_registry_protobuf_cache_miss, #{}),  % first caller for this key: compile now
+            case generate_protobuf_code(SerdeMod0, Source) of
+                {ok, SerdeMod, ModBinary} ->
+                    CacheEntry = #protobuf_cache{
+                        fingerprint = Fingerprint,
+                        module = SerdeMod,
+                        module_binary = ModBinary
+                    },
+                    ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
+                    {ok, SerdeMod, ModBinary};
+                {ok, SerdeMod, ModBinary, _Warnings} ->  % compiled with warnings: cached all the same
+                    CacheEntry = #protobuf_cache{
+                        fingerprint = Fingerprint,
+                        module = SerdeMod,
+                        module_binary = ModBinary
+                    },
+                    ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
+                    {ok, SerdeMod, ModBinary};
+                error ->  % failures are never cached; each shape is normalised to the error map
+                    {error, #{error => undefined, warnings => []}};
+                {error, Error} ->
+                    {error, #{error => Error, warnings => []}};
+                {error, Error, Warnings} ->
+                    {error, #{error => Error, warnings => Warnings}}
+            end
+    end.
+
+generate_protobuf_code(SerdeMod, Source) ->  % compile the schema text Source into module SerdeMod
+    gpb_compile:string(
+        SerdeMod,
+        Source,
+        [
+            binary,  % presumably: return the object code instead of writing files (see gpb docs)
+            strings_as_binaries,
+            {maps, true},
+            %% Fixme: currently, some bug in `gpb' prevents this
+            %% option from working with `oneof' types...  We're then
+            %% forced to use atom key maps.
+            %% {maps_key_type, binary},
+            {maps_oneof, flat},
+            {verify, always},
+            {maps_unset_optional, omitted}
+        ]
+    ).
+
+-spec load_code(module(), string(), binary()) -> ok.
+load_code(SerdeMod, SerdeModFileName, ModBinary) ->  % (re)load the module; badmatch on failure
+    _ = code:purge(SerdeMod),  % drop any old version first; the result is deliberately ignored
+    {module, SerdeMod} = code:load_binary(SerdeMod, SerdeModFileName, ModBinary),
+    ok.
+
+-spec unload_code(module()) -> ok.
+unload_code(SerdeMod) ->  % best effort: both results are ignored and `ok' is always returned
+    _ = code:purge(SerdeMod),  % remove old code, if any
+    _ = code:delete(SerdeMod),  % turn the current code into old code (gone after the next purge)
+    ok.

+ 323 - 67
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -21,11 +21,19 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    [{group, avro}].
+    [{group, avro}, {group, protobuf}].
 
 groups() ->
-    TCs = emqx_common_test_helpers:all(?MODULE),
-    [{avro, TCs}].
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    ProtobufOnlyTCs = protobuf_only_tcs(),
+    TCs = AllTCs -- ProtobufOnlyTCs,
+    [{avro, TCs}, {protobuf, AllTCs}].
+
+protobuf_only_tcs() ->  % test cases that only make sense for the protobuf serde type
+    [
+        t_protobuf_union_encode,
+        t_protobuf_union_decode
+    ].
 
 init_per_suite(Config) ->
     emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
@@ -38,6 +46,8 @@ end_per_suite(_Config) ->
 
 init_per_group(avro, Config) ->
     [{serde_type, avro} | Config];
+init_per_group(protobuf, Config) ->
+    [{serde_type, protobuf} | Config];
 init_per_group(_Group, Config) ->
     Config.
 
@@ -95,8 +105,12 @@ create_rule_http(RuleParams) ->
     Path = emqx_mgmt_api_test_util:api_path(["rules"]),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
     case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
-        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
-        Error -> Error
+        {ok, Res0} ->
+            Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            {ok, Res};
+        Error ->
+            Error
     end.
 
 schema_params(avro) ->
@@ -108,35 +122,174 @@ schema_params(avro) ->
         ]
     },
     SourceBin = emqx_utils_json:encode(Source),
-    #{type => avro, source => SourceBin}.
+    #{type => avro, source => SourceBin};
+schema_params(protobuf) ->
+    SourceBin =
+        <<
+            "message Person {\n"
+            "     required string name = 1;\n"
+            "     required int32 id = 2;\n"
+            "     optional string email = 3;\n"
+            "  }\n"
+            "message UnionValue {\n"
+            "    oneof u {\n"
+            "        int32  a = 1;\n"
+            "        string b = 2;\n"
+            "    }\n"
+            "}\n"
+        >>,
+    #{type => protobuf, source => SourceBin}.
 
 create_serde(SerdeType, SerdeName) ->
     Schema = schema_params(SerdeType),
     ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema),
     ok.
 
-sql_for(avro, encode_decode1) ->
-    <<
-        "select\n"
-        "         schema_decode('my_serde',\n"
-        "           schema_encode('my_serde', json_decode(payload))) as decoded,\n"
-        "         decoded.i as decoded_int,\n"
-        "         decoded.s as decoded_string\n"
-        "       from t"
-    >>;
-sql_for(avro, encode1) ->
-    <<
-        "select\n"
-        "         schema_encode('my_serde', json_decode(payload)) as encoded\n"
-        "       from t"
-    >>;
-sql_for(avro, decode1) ->
-    <<
-        "select\n"
-        "         schema_decode('my_serde', payload) as decoded\n"
-        "       from t"
-    >>;
-sql_for(Type, Name) ->
+test_params_for(avro, encode_decode1) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_decode('my_serde',\n"
+            "    schema_encode('my_serde', json_decode(payload))) as decoded,\n\n"
+            "  decoded.i as decoded_int,\n"
+            "  decoded.s as decoded_string\n"
+            "from t\n"
+        >>,
+    Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    ExpectedRuleOutput =
+        #{
+            <<"decoded">> => #{<<"i">> => 10, <<"s">> => <<"text">>},
+            <<"decoded_int">> => 10,
+            <<"decoded_string">> => <<"text">>
+        },
+    ExtraArgs = [],
+    #{
+        sql => SQL,
+        payload => Payload,
+        expected_rule_output => ExpectedRuleOutput,
+        extra_args => ExtraArgs
+    };
+test_params_for(avro, encode1) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_encode('my_serde', json_decode(payload)) as encoded\n"
+            "from t\n"
+        >>,
+    Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    ExtraArgs = [],
+    #{
+        sql => SQL,
+        payload => Payload,
+        extra_args => ExtraArgs
+    };
+test_params_for(avro, decode1) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_decode('my_serde', payload) as decoded\n"
+            "from t\n"
+        >>,
+    Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    ExtraArgs = [],
+    #{
+        sql => SQL,
+        payload => Payload,
+        extra_args => ExtraArgs
+    };
+test_params_for(protobuf, encode_decode1) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_decode('my_serde',\n"
+            "    schema_encode('my_serde', json_decode(payload), 'Person'),\n"
+            "      'Person') as decoded,\n"
+            "  decoded.name as decoded_name,\n"
+            "  decoded.email as decoded_email,\n"
+            "  decoded.id as decoded_id\n"
+            "from t\n"
+        >>,
+    Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+    ExpectedRuleOutput =
+        #{
+            <<"decoded">> =>
+                #{
+                    <<"email">> => <<"emqx@emqx.io">>,
+                    <<"id">> => 10,
+                    <<"name">> => <<"some name">>
+                },
+            <<"decoded_email">> => <<"emqx@emqx.io">>,
+            <<"decoded_id">> => 10,
+            <<"decoded_name">> => <<"some name">>
+        },
+    ExtraArgs = [<<"Person">>],
+    #{
+        sql => SQL,
+        payload => Payload,
+        extra_args => ExtraArgs,
+        expected_rule_output => ExpectedRuleOutput
+    };
+test_params_for(protobuf, decode1) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_decode('my_serde', payload, 'Person') as decoded\n"
+            "from t\n"
+        >>,
+    Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+    ExtraArgs = [<<"Person">>],
+    #{
+        sql => SQL,
+        payload => Payload,
+        extra_args => ExtraArgs
+    };
+test_params_for(protobuf, encode1) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_encode('my_serde', json_decode(payload), 'Person') as encoded\n"
+            "from t\n"
+        >>,
+    Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+    ExtraArgs = [<<"Person">>],
+    #{
+        sql => SQL,
+        payload => Payload,
+        extra_args => ExtraArgs
+    };
+test_params_for(protobuf, union1) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_decode('my_serde', payload, 'UnionValue') as decoded,\n"
+            "  decoded.a as decoded_a,\n"
+            "  decoded.b as decoded_b\n"
+            "from t\n"
+        >>,
+    PayloadA = #{<<"a">> => 10},
+    PayloadB = #{<<"b">> => <<"string">>},
+    ExtraArgs = [<<"UnionValue">>],
+    #{
+        sql => SQL,
+        payload => #{a => PayloadA, b => PayloadB},
+        extra_args => ExtraArgs
+    };
+test_params_for(protobuf, union2) ->
+    SQL =
+        <<
+            "select\n"
+            "  schema_encode('my_serde', json_decode(payload), 'UnionValue') as encoded\n"
+            "from t\n"
+        >>,
+    PayloadA = #{<<"a">> => 10},
+    PayloadB = #{<<"b">> => <<"string">>},
+    ExtraArgs = [<<"UnionValue">>],
+    #{
+        sql => SQL,
+        payload => #{a => PayloadA, b => PayloadB},
+        extra_args => ExtraArgs
+    };
+test_params_for(Type, Name) ->
     ct:fail("unimplemented: ~p", [{Type, Name}]).
 
 clear_schemas() ->
@@ -238,6 +391,40 @@ wait_for_cluster_rpc(Node) ->
         true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
     ).
 
+serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) ->  % trace check spec
+    ?assert(
+        ?strict_causality(  % relates schema deletions to destructor runs of this serde type
+            #{?snk_kind := will_delete_schema},
+            #{?snk_kind := serde_destroyed, type := SerdeType},
+            Trace
+        )
+    ),
+    ok.
+
+protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->  % trace check spec
+    #{nodes := Nodes} = Res,
+    CacheEvents = ?of_kind(  % keep only the cache hit/miss trace points
+        [
+            schema_registry_protobuf_cache_hit,
+            schema_registry_protobuf_cache_miss
+        ],
+        Trace
+    ),
+    ?assertMatch(  % exactly one hit and one miss; NOTE(review): assumes two nodes - confirm
+        [
+            schema_registry_protobuf_cache_hit,
+            schema_registry_protobuf_cache_miss
+        ],
+        lists:sort(?projection(?snk_kind, CacheEvents))
+    ),
+    ?assertEqual(  % the events must come from exactly the nodes listed in Res
+        lists:usort(Nodes),
+        lists:usort([N || #{?snk_meta := #{node := N}} <- CacheEvents])
+    ),
+    ok;
+protobuf_unique_cache_hit_spec(_Res, _Trace) ->  % other serde types: nothing to check
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -259,27 +446,16 @@ t_encode_decode(Config) ->
     SerdeType = ?config(serde_type, Config),
     SerdeName = my_serde,
     ok = create_serde(SerdeType, SerdeName),
-    {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode_decode1)}),
-    on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    #{
+        sql := SQL,
+        payload := Payload,
+        expected_rule_output := ExpectedRuleOutput
+    } = test_params_for(SerdeType, encode_decode1),
+    {ok, _} = create_rule_http(#{sql => SQL}),
     PayloadBin = emqx_utils_json:encode(Payload),
     emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
     Res = receive_action_results(),
-    ?assertMatch(
-        #{
-            data :=
-                #{
-                    <<"decoded">> :=
-                        #{
-                            <<"i">> := 10,
-                            <<"s">> := <<"text">>
-                        },
-                    <<"decoded_int">> := 10,
-                    <<"decoded_string">> := <<"text">>
-                }
-        },
-        Res
-    ),
+    ?assertMatch(#{data := ExpectedRuleOutput}, Res),
     ok.
 
 t_delete_serde(Config) ->
@@ -308,9 +484,12 @@ t_encode(Config) ->
     SerdeType = ?config(serde_type, Config),
     SerdeName = my_serde,
     ok = create_serde(SerdeType, SerdeName),
-    {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode1)}),
-    on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    #{
+        sql := SQL,
+        payload := Payload,
+        extra_args := ExtraArgs
+    } = test_params_for(SerdeType, encode1),
+    {ok, _} = create_rule_http(#{sql => SQL}),
     PayloadBin = emqx_utils_json:encode(Payload),
     emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
     Published = receive_published(?LINE),
@@ -320,18 +499,21 @@ t_encode(Config) ->
     ),
     #{payload := #{<<"encoded">> := Encoded}} = Published,
     {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
-    ?assertEqual(Payload, Deserializer(Encoded)),
+    ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
     ok.
 
 t_decode(Config) ->
     SerdeType = ?config(serde_type, Config),
     SerdeName = my_serde,
     ok = create_serde(SerdeType, SerdeName),
-    {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, decode1)}),
-    on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    #{
+        sql := SQL,
+        payload := Payload,
+        extra_args := ExtraArgs
+    } = test_params_for(SerdeType, decode1),
+    {ok, _} = create_rule_http(#{sql => SQL}),
     {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
-    EncodedBin = Serializer(Payload),
+    EncodedBin = apply(Serializer, [Payload | ExtraArgs]),
     emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
     Published = receive_published(?LINE),
     ?assertMatch(
@@ -342,6 +524,76 @@ t_decode(Config) ->
     ?assertEqual(Payload, Decoded),
     ok.
 
+t_protobuf_union_encode(Config) ->  % oneof round trip: encode locally, decode through the rule
+    SerdeType = ?config(serde_type, Config),
+    ?assertEqual(protobuf, SerdeType),
+    SerdeName = my_serde,
+    ok = create_serde(SerdeType, SerdeName),
+    #{
+        sql := SQL,
+        payload := #{a := PayloadA, b := PayloadB},
+        extra_args := ExtraArgs
+    } = test_params_for(SerdeType, union1),  % NOTE(review): union1 SQL calls schema_decode
+    {ok, _} = create_rule_http(#{sql => SQL}),  % so the rule decodes, despite this test's name
+    {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
+
+    EncodedBinA = apply(Serializer, [PayloadA | ExtraArgs]),  % variant `a' of the oneof
+    emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
+    PublishedA = receive_published(?LINE),
+    ?assertMatch(
+        #{payload := #{<<"decoded">> := _}},
+        PublishedA
+    ),
+    #{payload := #{<<"decoded">> := DecodedA}} = PublishedA,
+    ?assertEqual(PayloadA, DecodedA),
+
+    EncodedBinB = apply(Serializer, [PayloadB | ExtraArgs]),  % variant `b' of the oneof
+    emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
+    PublishedB = receive_published(?LINE),
+    ?assertMatch(
+        #{payload := #{<<"decoded">> := _}},
+        PublishedB
+    ),
+    #{payload := #{<<"decoded">> := DecodedB}} = PublishedB,
+    ?assertEqual(PayloadB, DecodedB),
+
+    ok.
+
+t_protobuf_union_decode(Config) ->  % oneof round trip: encode through the rule, decode locally
+    SerdeType = ?config(serde_type, Config),
+    ?assertEqual(protobuf, SerdeType),
+    SerdeName = my_serde,
+    ok = create_serde(SerdeType, SerdeName),
+    #{
+        sql := SQL,
+        payload := #{a := PayloadA, b := PayloadB},
+        extra_args := ExtraArgs
+    } = test_params_for(SerdeType, union2),  % NOTE(review): union2 SQL calls schema_encode
+    {ok, _} = create_rule_http(#{sql => SQL}),  % so the rule encodes, despite this test's name
+    {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
+
+    EncodedBinA = emqx_utils_json:encode(PayloadA),  % JSON input; the rule json_decodes it
+    emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
+    PublishedA = receive_published(?LINE),
+    ?assertMatch(
+        #{payload := #{<<"encoded">> := _}},
+        PublishedA
+    ),
+    #{payload := #{<<"encoded">> := EncodedA}} = PublishedA,
+    ?assertEqual(PayloadA, apply(Deserializer, [EncodedA | ExtraArgs])),
+
+    EncodedBinB = emqx_utils_json:encode(PayloadB),  % same flow for variant `b' of the oneof
+    emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
+    PublishedB = receive_published(?LINE),
+    ?assertMatch(
+        #{payload := #{<<"encoded">> := _}},
+        PublishedB
+    ),
+    #{payload := #{<<"encoded">> := EncodedB}} = PublishedB,
+    ?assertEqual(PayloadB, apply(Deserializer, [EncodedB | ExtraArgs])),
+
+    ok.
+
 t_fail_rollback(Config) ->
     SerdeType = ?config(serde_type, Config),
     OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
@@ -369,6 +621,10 @@ t_cluster_serde_build(Config) ->
     Cluster = cluster(Config),
     SerdeName = my_serde,
     Schema = schema_params(SerdeType),
+    #{
+        payload := Payload,
+        extra_args := ExtraArgs
+    } = test_params_for(SerdeType, encode_decode1),
     ?check_trace(
         begin
             Nodes = [N1, N2 | _] = start_cluster(Cluster),
@@ -385,8 +641,14 @@ t_cluster_serde_build(Config) ->
                         Res0 = emqx_ee_schema_registry:get_serde(SerdeName),
                         ?assertMatch({ok, #{}}, Res0, #{node => N}),
                         {ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0,
-                        Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
-                        ?assertEqual(Payload, Deserializer(Serializer(Payload)), #{node => N}),
+                        ?assertEqual(
+                            Payload,
+                            apply(
+                                Deserializer,
+                                [apply(Serializer, [Payload | ExtraArgs]) | ExtraArgs]
+                            ),
+                            #{node => N}
+                        ),
                         ok
                     end)
                 end,
@@ -417,17 +679,11 @@ t_cluster_serde_build(Config) ->
                 end,
                 Nodes
             ),
-            ok
+            #{serde_type => SerdeType, nodes => Nodes}
         end,
-        fun(Trace) ->
-            ?assert(
-                ?strict_causality(
-                    #{?snk_kind := will_delete_schema},
-                    #{?snk_kind := serde_destroyed, type := SerdeType},
-                    Trace
-                )
-            ),
-            ok
-        end
+        [
+            {"destructor is always called", fun ?MODULE:serde_deletion_calls_destructor_spec/2},
+            {"protobuf is only built on one node", fun ?MODULE:protobuf_unique_cache_hit_spec/2}
+        ]
     ),
     ok.

+ 68 - 20
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl

@@ -19,7 +19,17 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        {group, avro},
+        {group, protobuf}
+    ].
+
+groups() ->  % every test case of this suite runs once per serde type
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {avro, AllTCs},
+        {protobuf, AllTCs}
+    ].
 
 init_per_suite(Config) ->
     emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
@@ -30,6 +40,48 @@ end_per_suite(_Config) ->
     emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
     ok.
 
+init_per_group(avro, Config) ->  % adds serde_type plus a valid and an invalid schema source
+    Source = #{
+        type => record,
+        fields => [
+            #{name => <<"i">>, type => <<"int">>},
+            #{name => <<"s">>, type => <<"string">>}
+        ]
+    },
+    SourceBin = emqx_utils_json:encode(Source),
+    InvalidSourceBin = <<"{}">>,  % used by the test cases as the rejected source
+    [
+        {serde_type, avro},
+        {schema_source, SourceBin},
+        {invalid_schema_source, InvalidSourceBin}
+        | Config
+    ];
+init_per_group(protobuf, Config) ->  % same three config keys, with protobuf schema text
+    SourceBin =
+        <<
+            "message Person {\n"
+            "     required string name = 1;\n"
+            "     required int32 id = 2;\n"
+            "     optional string email = 3;\n"
+            "  }\n"
+            "message UnionValue {\n"
+            "    oneof u {\n"
+            "        int32  a = 1;\n"
+            "        string b = 2;\n"
+            "    }\n"
+            "}\n"
+        >>,
+    InvalidSourceBin = <<"xxxx">>,  % used by the test cases as the rejected source
+    [
+        {serde_type, protobuf},
+        {schema_source, SourceBin},
+        {invalid_schema_source, InvalidSourceBin}
+        | Config
+    ].
+
+end_per_group(_Group, _Config) ->  % nothing to undo: init_per_group/2 only extends Config
+    ok.
+
 init_per_testcase(_TestCase, Config) ->
     clear_schemas(),
     ok = snabbkaffe:start_trace(),
@@ -93,18 +145,14 @@ clear_schemas() ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_crud(_Config) ->
-    SchemaName = <<"my_avro_schema">>,
-    Source = #{
-        type => record,
-        fields => [
-            #{name => <<"i">>, type => <<"int">>},
-            #{name => <<"s">>, type => <<"string">>}
-        ]
-    },
-    SourceBin = emqx_utils_json:encode(Source),
+t_crud(Config) ->
+    SerdeType = ?config(serde_type, Config),
+    SourceBin = ?config(schema_source, Config),
+    InvalidSourceBin = ?config(invalid_schema_source, Config),
+    SerdeTypeBin = atom_to_binary(SerdeType),
+    SchemaName = <<"my_schema">>,
     Params = #{
-        <<"type">> => <<"avro">>,
+        <<"type">> => SerdeTypeBin,
         <<"source">> => SourceBin,
         <<"name">> => SchemaName,
         <<"description">> => <<"My schema">>
@@ -138,7 +186,7 @@ t_crud(_Config) ->
     %% create a schema
     ?assertMatch(
         {ok, 201, #{
-            <<"type">> := <<"avro">>,
+            <<"type">> := SerdeTypeBin,
             <<"source">> := SourceBin,
             <<"name">> := SchemaName,
             <<"description">> := <<"My schema">>
@@ -147,7 +195,7 @@ t_crud(_Config) ->
     ),
     ?assertMatch(
         {ok, 200, #{
-            <<"type">> := <<"avro">>,
+            <<"type">> := SerdeTypeBin,
             <<"source">> := SourceBin,
             <<"name">> := SchemaName,
             <<"description">> := <<"My schema">>
@@ -157,7 +205,7 @@ t_crud(_Config) ->
     ?assertMatch(
         {ok, 200, [
             #{
-                <<"type">> := <<"avro">>,
+                <<"type">> := SerdeTypeBin,
                 <<"source">> := SourceBin,
                 <<"name">> := SchemaName,
                 <<"description">> := <<"My schema">>
@@ -168,7 +216,7 @@ t_crud(_Config) ->
     UpdateParams1 = UpdateParams#{<<"description">> := <<"My new schema">>},
     ?assertMatch(
         {ok, 200, #{
-            <<"type">> := <<"avro">>,
+            <<"type">> := SerdeTypeBin,
             <<"source">> := SourceBin,
             <<"name">> := SchemaName,
             <<"description">> := <<"My new schema">>
@@ -188,9 +236,9 @@ t_crud(_Config) ->
         {ok, 400, #{
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
-                <<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
+                <<"{post_config_update,emqx_ee_schema_registry,", _/binary>>
         }},
-        request({put, SchemaName, UpdateParams#{<<"source">> := <<"{}">>}})
+        request({put, SchemaName, UpdateParams#{<<"source">> := InvalidSourceBin}})
     ),
 
     ?assertMatch(
@@ -229,9 +277,9 @@ t_crud(_Config) ->
         {ok, 400, #{
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
-                <<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
+                <<"{post_config_update,emqx_ee_schema_registry,", _/binary>>
         }},
-        request({post, Params#{<<"source">> := <<"{}">>}})
+        request({post, Params#{<<"source">> := InvalidSourceBin}})
     ),
 
     %% unknown serde type

+ 52 - 1
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl

@@ -60,7 +60,25 @@ schema_params(avro) ->
         ]
     },
     SourceBin = emqx_utils_json:encode(Source),
-    #{type => avro, source => SourceBin}.
+    #{type => avro, source => SourceBin};
+schema_params(protobuf) ->  % protobuf clause: returns a map with `type' and `source' keys
+    SourceBin =  % inline proto source declaring two messages, `Person' and `UnionValue'
+        <<
+            "\n"
+            "           message Person {\n"
+            "                required string name = 1;\n"
+            "                required int32 id = 2;\n"
+            "                optional string email = 3;\n"
+            "             }\n"
+            "           message UnionValue {\n"
+            "               oneof u {\n"
+            "                   int32  a = 1;\n"
+            "                   string b = 2;\n"
+            "               }\n"
+            "           }\n"
+            "          "
+        >>,
+    #{type => protobuf, source => SourceBin}.
 
 assert_roundtrip(SerdeName, Original) ->
     Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original),
@@ -119,3 +137,36 @@ t_serde_not_found(_Config) ->
         emqx_ee_schema_registry_serde:decode(NonexistentSerde, Original)
     ),
     ok.
+
+t_roundtrip_protobuf(_Config) ->  % registers the protobuf schema, then runs roundtrip assertions per message type
+    SerdeName = my_serde,
+    Params = schema_params(protobuf),
+    ok = emqx_ee_schema_registry:add_schema(SerdeName, Params),  % asserts registration succeeds
+    ExtraArgsPerson = [<<"Person">>],  % name of the `Person' message declared in the schema source
+
+    Original0 = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+    % NOTE(review): assert_roundtrip/4 is not visible here; presumably args 3 and 4 are the
+    % extra arguments for encoding and decoding respectively -- confirm against its definition
+    assert_roundtrip(SerdeName, Original0, ExtraArgsPerson, ExtraArgsPerson),
+
+    %% removing optional field
+    Original1 = #{<<"name">> => <<"some name">>, <<"id">> => 10},  % same as Original0 minus `email'
+    assert_roundtrip(SerdeName, Original1, ExtraArgsPerson, ExtraArgsPerson),
+
+    %% `oneof' fields
+    ExtraArgsUnion = [<<"UnionValue">>],  % name of the `UnionValue' message declared in the schema source
+    Original2 = #{<<"a">> => 1},  % int32 alternative of the oneof
+    assert_roundtrip(SerdeName, Original2, ExtraArgsUnion, ExtraArgsUnion),
+
+    Original3 = #{<<"b">> => <<"string">>},  % string alternative of the oneof
+    assert_roundtrip(SerdeName, Original3, ExtraArgsUnion, ExtraArgsUnion),
+
+    ok.
+
+t_protobuf_invalid_schema(_Config) ->  % add_schema/2 with a bogus protobuf source must return an error tuple
+    SerdeName = my_serde,
+    Params = schema_params(protobuf),
+    WrongParams = Params#{source := <<"xxxx">>},  % `:=' only updates: `source' must already be in Params
+    ?assertMatch(
+        {error, {post_config_update, _, {invalid_protobuf_schema, _}}},  % module and error details are wildcarded
+        emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
+    ),
+    ok.

+ 1 - 1
mix.exs

@@ -94,7 +94,7 @@ defmodule EMQXUmbrella.MixProject do
       {:ranch,
        github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true},
       # in conflict by grpc and eetcd
-      {:gpb, "4.19.5", override: true, runtime: false},
+      {:gpb, "4.19.7", override: true, runtime: false},
       {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}
     ] ++
       emqx_apps(profile_info, version) ++

+ 1 - 1
rebar.config

@@ -53,7 +53,7 @@
     [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
     , {redbug, "2.0.8"}
     , {covertool, {git, "https://github.com/zmstone/covertool", {tag, "2.0.4.1"}}}
-    , {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
+    , {gpb, "4.19.7"}
     , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
     , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
     , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}}

+ 6 - 0
rel/i18n/emqx_ee_schema_registry_schema.hocon

@@ -6,6 +6,12 @@ avro_type.desc:
 avro_type.label:
 """Apache Avro"""
 
+protobuf_type.desc:
+"""[Protocol Buffers](https://protobuf.dev/) serialization format."""
+
+protobuf_type.label:
+"""Protocol Buffers"""
+
 schema_description.desc:
 """A description for this schema."""
 

+ 6 - 0
rel/i18n/zh/emqx_ee_schema_registry_schema.hocon

@@ -6,6 +6,12 @@ avro_type.desc:
 avro_type.label:
 """阿帕奇-阿夫罗"""
 
+protobuf_type.desc:
+"""[协议缓冲器](https://protobuf.dev/) 序列化格式。"""
+
+protobuf_type.label:
+"""协议缓冲器"""
+
 schema_description.desc:
 """对该模式的描述。"""