Browse Source

Merge pull request #13818 from id/20240917-sync-master

sync master
Ivan Dyachkov 1 năm trước cách đây
mục cha
commit
5fc939f1ae
40 tập tin đã thay đổi với 2102 bổ sung196 xóa
  1. 5 0
      .ci/docker-compose-file/confluent-schema-registry/jaas.conf
  2. 1 0
      .ci/docker-compose-file/confluent-schema-registry/password-file
  3. 32 0
      .ci/docker-compose-file/docker-compose-confluent-schema-registry.yaml
  4. 0 1
      .github/workflows/_push-entrypoint.yaml
  5. 3 4
      .github/workflows/run_test_cases.yaml
  6. 24 0
      apps/emqx/src/emqx_schema.erl
  7. 1 15
      apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl
  8. 1 1
      apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src
  9. 2 25
      apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl
  10. 136 0
      apps/emqx_bridge_snowflake/docs/dev-quick-ref.md
  11. 1 1
      apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_action_schema.erl
  12. 92 9
      apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl
  13. 14 1
      apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector_schema.erl
  14. 140 11
      apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl
  15. 53 0
      apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_tests.erl
  16. 8 4
      apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl
  17. 1 2
      apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl
  18. 2 0
      apps/emqx_mix_utils/lib/mix/tasks/emqx.eunit.ex
  19. 3 0
      apps/emqx_schema_registry/docker-ct
  20. 1 0
      apps/emqx_schema_registry/include/emqx_schema_registry.hrl
  21. 1 0
      apps/emqx_schema_registry/mix.exs
  22. 2 1
      apps/emqx_schema_registry/rebar.config
  23. 1 0
      apps/emqx_schema_registry/src/emqx_schema_registry.app.src
  24. 4 86
      apps/emqx_schema_registry/src/emqx_schema_registry.erl
  25. 2 6
      apps/emqx_schema_registry/src/emqx_schema_registry_app.erl
  26. 284 0
      apps/emqx_schema_registry/src/emqx_schema_registry_config.erl
  27. 373 0
      apps/emqx_schema_registry/src/emqx_schema_registry_external.erl
  28. 223 2
      apps/emqx_schema_registry/src/emqx_schema_registry_http_api.erl
  29. 71 0
      apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl
  30. 42 0
      apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl
  31. 90 17
      apps/emqx_schema_registry/src/emqx_schema_registry_sup.erl
  32. 110 6
      apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl
  33. 308 3
      apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl
  34. 3 0
      apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl
  35. 8 0
      apps/emqx_utils/src/emqx_utils_redact.erl
  36. 1 0
      changes/ee/feat-13804.en.md
  37. 3 1
      mix.exs
  38. 18 0
      rel/i18n/emqx_schema_registry_http_api.hocon
  39. 35 0
      rel/i18n/emqx_schema_registry_schema.hocon
  40. 3 0
      scripts/ct/run.sh

+ 5 - 0
.ci/docker-compose-file/confluent-schema-registry/jaas.conf

@@ -0,0 +1,5 @@
+SchemaRegistry-Props {
+  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+  file="/etc/schema-registry/password-file"
+  debug="true";
+};

+ 1 - 0
.ci/docker-compose-file/confluent-schema-registry/password-file

@@ -0,0 +1 @@
+cpsruser:mypass,user

+ 32 - 0
.ci/docker-compose-file/docker-compose-confluent-schema-registry.yaml

@@ -0,0 +1,32 @@
+version: '3.9'
+
+## Depends on Kafka; see `./docker-compose-kafka.yaml`.
+services:
+  confluent_schema_registry: &cpsr
+    image: public.ecr.aws/ews-network/confluentinc/cp-schema-registry:7.5.1
+    container_name: confluent_schema_registry
+    restart: always
+    depends_on:
+      zookeeper:
+        condition: service_started
+      kafka_1:
+        condition: service_started
+    networks:
+      emqx_bridge:
+    environment: &cpsr-env
+      SCHEMA_REGISTRY_DEBUG: true
+      SCHEMA_REGISTRY_HOST_NAME: confluent_schema_registry
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-1.emqx.net:9092"
+  confluent_schema_registry_basicauth:
+    <<: *cpsr
+    container_name: confluent_schema_registry_basicauth
+    environment:
+      <<: *cpsr-env
+      SCHEMA_REGISTRY_HOST_NAME: confluent_schema_registry_basicauth
+      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
+      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: user
+      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry-Props
+      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/etc/schema-registry/jaas.conf
+    volumes:
+      - ./confluent-schema-registry/jaas.conf:/etc/schema-registry/jaas.conf
+      - ./confluent-schema-registry/password-file:/etc/schema-registry/password-file

+ 0 - 1
.github/workflows/_push-entrypoint.yaml

@@ -125,7 +125,6 @@ jobs:
     uses: ./.github/workflows/build_slim_packages.yaml
 
   compile:
-    if: needs.prepare.outputs.release != 'true'
     runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }}
     container: ${{ needs.init.outputs.BUILDER }}
     needs:

+ 3 - 4
.github/workflows/run_test_cases.yaml

@@ -68,7 +68,7 @@ jobs:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         run: make coveralls
 
-      - run: cat rebar3.crashdump
+      - run: cat rebar3.crashdump || true
         if: failure()
 
   ct_docker:
@@ -124,7 +124,7 @@ jobs:
 
       - name: rebar3.crashdump
         if: failure()
-        run: cat rebar3.crashdump
+        run: cat rebar3.crashdump || true
 
       - name: compress logs
         if: failure()
@@ -180,7 +180,7 @@ jobs:
           ls _build/test/cover/*.coverdata || exit 0
           make coveralls
 
-      - run: cat rebar3.crashdump
+      - run: cat rebar3.crashdump || true
         if: failure()
 
       - name: compress logs
@@ -217,4 +217,3 @@ jobs:
           git-commit: ${{ github.sha }}
 
       - run: echo "All tests passed"
-          

+ 24 - 0
apps/emqx/src/emqx_schema.erl

@@ -146,6 +146,7 @@
 ]).
 
 -export([listeners/0]).
+-export([mkunion/2, mkunion/3]).
 
 -behaviour(hocon_schema).
 
@@ -3936,3 +3937,26 @@ listeners() ->
                 }
             )}
     ].
+
+%% Builds a union type whose member schema is selected by the value of `Field'.
+%% Equivalent to `mkunion/3' with `none' as the default selector.
+mkunion(Field, Schemas) ->
+    mkunion(Field, Schemas, none).

+%% `Schemas' is a map from selector value to member schema.  `Default' is the
+%% selector used when `Field' is not present in the value being checked; it is
+%% looked up in `Schemas' as-is (it is not converted to a binary).
+mkunion(Field, Schemas, Default) ->
+    hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Default, Arg) end).

+%% Union selector callback.
+%% * `all_union_members': returns every member schema.
+%% * `{value, Value}': reads `Field' (as a binary key) from `Value', converts what
+%%   it finds to a binary and returns the matching schema as a single-element list.
+%%   Throws a map with `field_name' and `expected' keys when no schema matches.
+%% NOTE(review): `Value' is assumed to be a map here -- confirm against callers.
+scunion(_Field, Schemas, _Default, all_union_members) ->
+    maps:values(Schemas);
+scunion(Field, Schemas, Default, {value, Value}) ->
+    Selector =
+        case maps:get(emqx_utils_conv:bin(Field), Value, undefined) of
+            undefined ->
+                Default;
+            X ->
+                emqx_utils_conv:bin(X)
+        end,
+    case maps:find(Selector, Schemas) of
+        {ok, Schema} ->
+            [Schema];
+        _Error ->
+            throw(#{field_name => Field, expected => maps:keys(Schemas)})
+    end.

+ 1 - 15
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl

@@ -56,7 +56,7 @@ fields(action) ->
 fields(?ACTION_TYPE) ->
     emqx_bridge_v2_schema:make_producer_action_schema(
         mk(
-            mkunion(mode, #{
+            emqx_schema:mkunion(mode, #{
                 <<"direct">> => ref(direct_parameters),
                 <<"aggregated">> => ref(aggreg_parameters)
             }),
@@ -289,20 +289,6 @@ action_example(put, direct) ->
 ref(Name) -> hoconsc:ref(?MODULE, Name).
 mk(Type, Meta) -> hoconsc:mk(Type, Meta).
 
-mkunion(Field, Schemas) ->
-    hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end).
-
-scunion(_Field, Schemas, all_union_members) ->
-    maps:values(Schemas);
-scunion(Field, Schemas, {value, Value}) ->
-    Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined),
-    case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of
-        {ok, Schema} ->
-            [Schema];
-        _Error ->
-            throw(#{field_name => Field, expected => maps:keys(Schemas)})
-    end.
-
 block_size_validator(SizeLimit) ->
     case SizeLimit =< 4_000 * 1024 * 1024 of
         true -> ok;

+ 1 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_s3, [
     {description, "EMQX Enterprise S3 Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 25
apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl

@@ -66,7 +66,7 @@ fields(action) ->
 fields(?ACTION) ->
     emqx_bridge_v2_schema:make_producer_action_schema(
         hoconsc:mk(
-            mkunion(
+            emqx_schema:mkunion(
                 mode,
                 #{
                     <<"direct">> => ?R_REF(s3_direct_upload_parameters),
@@ -121,7 +121,7 @@ fields(s3_aggregated_upload_parameters) ->
                 )},
             {container,
                 hoconsc:mk(
-                    mkunion(type, #{
+                    emqx_schema:mkunion(type, #{
                         <<"csv">> => ?REF(s3_aggregated_container_csv)
                     }),
                     #{
@@ -196,29 +196,6 @@ fields(s3_upload_resource_opts) ->
         {batch_time, #{default => ?DEFAULT_AGGREG_BATCH_TIME}}
     ]).
 
-mkunion(Field, Schemas) ->
-    mkunion(Field, Schemas, none).
-
-mkunion(Field, Schemas, Default) ->
-    hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Default, Arg) end).
-
-scunion(_Field, Schemas, _Default, all_union_members) ->
-    maps:values(Schemas);
-scunion(Field, Schemas, Default, {value, Value}) ->
-    Selector =
-        case maps:get(emqx_utils_conv:bin(Field), Value, undefined) of
-            undefined ->
-                Default;
-            X ->
-                emqx_utils_conv:bin(X)
-        end,
-    case maps:find(Selector, Schemas) of
-        {ok, Schema} ->
-            [Schema];
-        _Error ->
-            throw(#{field_name => Field, expected => maps:keys(Schemas)})
-    end.
-
 desc(s3) ->
     ?DESC(s3_upload);
 desc(Name) when

+ 136 - 0
apps/emqx_bridge_snowflake/docs/dev-quick-ref.md

@@ -1,5 +1,41 @@
+## Initialize Snowflake ODBC driver
+
+### Linux
+
+Run `scripts/install-snowflake-driver.sh` to install the Snowflake ODBC driver and configure `odbc.ini`.
+
+### macOS
+
+- Install unixODBC (e.g. `brew install unixodbc`)
+- [Download and install iODBC](https://github.com/openlink/iODBC/releases/download/v3.52.16/iODBC-SDK-3.52.16-macOS11.dmg)
+- [Download and install the Snowflake ODBC driver](https://sfc-repo.snowflakecomputing.com/odbc/macuniversal/3.3.2/snowflake_odbc_mac_64universal-3.3.2.dmg)
+- Refer to [Installing and configuring the ODBC Driver for macOS](https://docs.snowflake.com/en/developer-guide/odbc/odbc-mac) for more information.
+- Update `~/.odbc.ini` and `/opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini`:
+
+```sh
+chown $(id -u):$(id -g) /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini
+echo 'ODBCInstLib=libiodbcinst.dylib' >> /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini
+
+cat << EOF > ~/.odbc.ini
+[ODBC]
+Trace=no
+TraceFile=
+
+[ODBC Drivers]
+Snowflake = Installed
+
+[ODBC Data Sources]
+snowflake = Snowflake
+
+[Snowflake]
+Driver = /opt/snowflake/snowflakeodbc/lib/universal/libSnowflake.dylib
+EOF
+```
+
 ## Basic helper functions
 
+### Elixir
+
 ```elixir
 Application.ensure_all_started(:odbc)
 user = "your_admin_user"
@@ -17,13 +53,31 @@ dsn = "snowflake"
 query = fn conn, sql -> :odbc.sql_query(conn, sql |> to_charlist()) end
 ```
 
+### Erlang
+
+```erlang
+application:ensure_all_started(odbc).
+User = "your_admin_user".
+Pass = os:getenv("SNOWFLAKE_PASSWORD").
+OrgID = "orgid".
+Account = "accountid".
+Server = lists:flatten([OrgID, "-", Account, ".snowflakecomputing.com"]).
+DSN = "snowflake".
+{ok, Conn} = odbc:connect(["dsn=snowflake;uid=", User, ";pwd=", Pass, ";server=", Server, ";account=", Account], []).
+Query = fun(Conn, Sql) -> odbc:sql_query(Conn, Sql) end.
+```
+
 ## Create user
 
+### Shell
+
 ```sh
 openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
 openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
 ```
 
+### Elixir
+
 ```elixir
 test_user = "testuser"
 query.(conn, "create user #{test_user} password = 'TestUser99' must_change_password = false")
@@ -35,8 +89,26 @@ query.(conn, "alter user #{test_user} set rsa_public_key = '#{public_pem_content
 # {:updated, :undefined}
 ```
 
+### Erlang
+
+```erlang
+TestUser = "testuser".
+Query(Conn, ["create user ", TestUser, " password = 'TestUser99' must_change_password = false"]).
+% {updated,undefined}
+
+{ok, Bin} = file:read_file("snowflake_rsa_key.public.pem").
+Pem = binary_to_list(Bin).
+[_ | Lines] = string:split(string:trim(Pem), "\n", all).
+PublicPemContentsTrimmed = lists:join("\n", lists:droplast(Lines)).
+
+Query(Conn, ["alter user ", TestUser, " set rsa_public_key = '", PublicPemContentsTrimmed, "'"]).
+% {updated,undefined}
+```
+
 ## Create database objects
 
+### Elixir
+
 ```elixir
 database = "testdatabase"
 schema = "public"
@@ -92,3 +164,67 @@ query.(conn, "grant role #{test_role} to user #{test_user}")
 query.(conn, "alter user #{snowpipe_user} set default_role = #{snowpipe_role}")
 query.(conn, "alter user testuser set default_role = #{test_role}")
 ```
+
+### Erlang
+
+```erlang
+Database = "testdatabase",
+Schema = "public",
+Table = "test1",
+Stage = "teststage0",
+Pipe = "testpipe0",
+Warehouse = "testwarehouse",
+SnowpipeRole = "snowpipe1",
+SnowpipeUser = "snowpipeuser",
+TestRole = "testrole",
+FqnTable = [Database, ".", Schema, ".", Table],
+FqnStage = [Database, ".", Schema, ".", Stage],
+FqnPipe = [Database, ".", Schema, ".", Pipe],
+
+Query(Conn, "use role accountadmin"),
+
+% Create database, table, stage, pipe, warehouse
+Query(Conn, ["create database if not exists ", Database]),
+Query(Conn, ["create or replace table ", FqnTable, " (clientid string, topic string, payload binary, publish_received_at timestamp_ltz)"]),
+Query(Conn, ["create stage if not exists ", FqnStage, " file_format = (type = csv parse_header = true) copy_options = (on_error = continue purge = true)"]),
+Query(Conn, ["create pipe if not exists ", FqnPipe, " as copy into ", FqnTable, " from @", FqnStage, " match_by_column_name = case_insensitive"]),
+Query(Conn, ["create or replace warehouse ", Warehouse]),
+
+% Create a role for the Snowpipe privileges.
+Query(Conn, ["create or replace role ", SnowpipeRole]),
+Query(Conn, ["create or replace role ", TestRole]),
+
+% Grant the USAGE privilege on the database and schema that contain the pipe object.
+Query(Conn, ["grant usage on database ", Database, " to role ", SnowpipeRole]),
+Query(Conn, ["grant usage on database ", Database, " to role ", TestRole]),
+Query(Conn, ["grant usage on schema ", Database, ".", Schema, " to role ", SnowpipeRole]),
+Query(Conn, ["grant usage on schema ", Database, ".", Schema, " to role ", TestRole]),
+
+% Grant the INSERT and SELECT privileges on the target table.
+Query(Conn, ["grant insert, select on ", FqnTable, " to role ", SnowpipeRole]),
+% For cleaning up table after tests
+Query(Conn, ["grant insert, select, truncate, delete on ", FqnTable, " to role ", TestRole]),
+
+% Grant the USAGE privilege on the external stage.
+% Must use read/write for internal stage
+% Query(Conn, ["grant usage on stage ", FqnStage, " to role ", SnowpipeRole]),
+Query(Conn, ["grant read, write on stage ", FqnStage, " to role ", SnowpipeRole]),
+% For cleaning up table after tests
+Query(Conn, ["grant read, write on stage ", FqnStage, " to role ", TestRole]),
+
+% Grant the OPERATE and MONITOR privileges on the pipe object.
+Query(Conn, ["grant operate, monitor on pipe ", FqnPipe, " to role ", SnowpipeRole]),
+
+% Grant the role to a user
+Query(Conn, ["create user if not exists ", SnowpipeUser, " password = 'TestUser99' must_change_password = false rsa_public_key = '", PublicPemContentsTrimmed, "'"]),
+
+Query(Conn, ["grant usage on warehouse ", Warehouse, " to role ", TestRole]),
+
+Query(Conn, ["grant role ", SnowpipeRole, " to user ", SnowpipeUser]),
+Query(Conn, ["grant role ", SnowpipeRole, " to user ", TestUser]),
+Query(Conn, ["grant role ", TestRole, " to user ", TestUser]),
+
+% Set the role as the default role for the user
+Query(Conn, ["alter user ", SnowpipeUser, " set default_role = ", SnowpipeRole]),
+Query(Conn, ["alter user testuser set default_role = ", TestRole]).
+```

+ 1 - 1
apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_action_schema.erl

@@ -84,7 +84,7 @@ fields(aggreg_parameters) ->
             })},
         {pipelining, mk(pos_integer(), #{default => 100, desc => ?DESC("pipelining")})},
         {pool_size, mk(pos_integer(), #{default => 8, desc => ?DESC("pool_size")})},
-        {max_retries, mk(non_neg_integer(), #{required => false, desc => ?DESC("max_retries")})},
+        {max_retries, mk(non_neg_integer(), #{default => 3, desc => ?DESC("max_retries")})},
         {max_block_size,
             mk(
                 emqx_schema:bytesize(),

+ 92 - 9
apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl

@@ -58,7 +58,7 @@
 ]).
 
 %% Internal exports only for mocking
--export([do_insert_files_request/4]).
+-export([do_insert_files_request/4, do_insert_report_request/4]).
 
 %%------------------------------------------------------------------------------
 %% Type declarations
@@ -392,6 +392,7 @@ stage_file(ODBCPool, Filename, Database, Schema, Stage, ActionName) ->
     {ok, file:filename()} | {error, term()}.
 do_stage_file(ConnPid, Filename, Database, Schema, Stage, ActionName) ->
     SQL = stage_file_sql(Filename, Database, Schema, Stage, ActionName),
+    ?tp(debug, "snowflake_stage_file", #{sql => SQL, action => ActionName}),
     %% Should we also check if it actually succeeded by inspecting reportFiles?
     odbc:sql_query(ConnPid, SQL).
 
@@ -607,6 +608,7 @@ process_complete(TransferState0) ->
             {ok, 200, _, Body} ->
                 {ok, emqx_utils_json:decode(Body, [return_maps])};
             Res ->
+                ?tp("snowflake_insert_files_request_failed", #{response => Res}),
                 %% TODO: retry?
                 exit({insert_failed, Res})
         end
@@ -625,6 +627,7 @@ create_action(
 ) ->
     maybe
         {ok, ActionState0} ?= start_http_pool(ActionResId, ActionConfig, ConnState),
+        _ = check_snowpipe_user_permission(ActionResId, ActionState0),
         start_aggregator(ConnResId, ActionResId, ActionConfig, ActionState0)
     end.
 
@@ -879,13 +882,7 @@ insert_report_request(HTTPPool, Opts, HTTPClientConfig) ->
                 InsertReportPath0
         end,
     Req = {InsertReportPath, Headers},
-    Response = ehttpc:request(
-        HTTPPool,
-        get,
-        Req,
-        RequestTTL,
-        MaxRetries
-    ),
+    Response = ?MODULE:do_insert_report_request(HTTPPool, Req, RequestTTL, MaxRetries),
     case Response of
         {ok, 200, _Headers, Body0} ->
             Body = emqx_utils_json:decode(Body0, [return_maps]),
@@ -894,6 +891,10 @@ insert_report_request(HTTPPool, Opts, HTTPClientConfig) ->
             {error, Response}
     end.
 
+%% Internal export only for mocking
+do_insert_report_request(HTTPPool, Req, RequestTTL, MaxRetries) ->
+    ehttpc:request(HTTPPool, get, Req, RequestTTL, MaxRetries).
+
 http_headers(AuthnHeader) ->
     [
         {<<"X-Snowflake-Authorization-Token-Type">>, <<"KEYPAIR_JWT">>},
@@ -915,6 +916,8 @@ action_status(ActionResId, #{mode := aggregated} = ActionState) ->
     %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
     Timestamp = erlang:system_time(second),
     ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
+    ok = check_aggreg_upload_errors(AggregId),
+    ok = check_snowpipe_user_permission(ActionResId, ActionState),
     case http_pool_workers_healthy(ActionResId, ConnectTimeout) of
         true ->
             ?status_connected;
@@ -968,7 +971,7 @@ http_pool_workers_healthy(HTTPPool, Timeout) ->
 
 %% https://docs.snowflake.com/en/sql-reference/identifiers-syntax
 needs_quoting(Identifier) ->
-    nomatch =:= re:run(Identifier, <<"^[A-Za-z_][A-Za-z_0-9$]$">>, [{capture, none}]).
+    nomatch =:= re:run(Identifier, <<"^[A-Za-z_][A-Za-z_0-9$]*$">>, [{capture, none}]).
 
 maybe_quote(Identifier) ->
     case needs_quoting(Identifier) of
@@ -977,3 +980,83 @@ maybe_quote(Identifier) ->
         false ->
             Identifier
     end.
+
+check_aggreg_upload_errors(AggregId) ->
+    case emqx_connector_aggregator:take_error(AggregId) of
+        [Error] ->
+            ?tp("snowflake_check_aggreg_upload_error_found", #{error => Error}),
+            %% TODO
+            %% This approach means that, for example, 3 upload failures will cause
+            %% the channel to be marked as unhealthy for 3 consecutive health checks.
+            ErrorMessage = emqx_utils:format(Error),
+            throw({unhealthy_target, ErrorMessage});
+        [] ->
+            ok
+    end.
+
+%% Issues an insert report request (via `insert_report_request/3') to verify that the
+%% configured pipe user is able to use the Snowpipe REST API.
+%%
+%% Returns `ok' when the request succeeds.
+%% Throws `{unhealthy_target, Msg}' when the server answers with HTTP 401.
+%% Throws a plain binary message for every other failure, so that possibly spurious
+%% errors do not mark the target as unhealthy.
+check_snowpipe_user_permission(HTTPPool, ActionState) ->
+    #{http := HTTPClientConfig} = ActionState,
+    Opts = #{},
+    case insert_report_request(HTTPPool, Opts, HTTPClientConfig) of
+        {ok, _} ->
+            ok;
+        {error, {ok, 401, _, Body0}} ->
+            %% The body is only used for logging; fall back to the raw body when it
+            %% is not valid JSON.
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, JSON} -> JSON;
+                    {error, _} -> Body0
+                end,
+            ?SLOG(debug, #{
+                msg => "snowflake_check_snowpipe_user_permission_error",
+                body => Body
+            }),
+            Msg = <<
+                "Configured pipe user does not have permissions to operate on pipe,"
+                " or does not exist. Please check your configuration."
+            >>,
+            throw({unhealthy_target, Msg});
+        {error, {ok, StatusCode, _}} ->
+            Msg = iolist_to_binary([
+                <<"Error checking if configured snowpipe user has permissions.">>,
+                <<" HTTP Status Code:">>,
+                integer_to_binary(StatusCode)
+            ]),
+            %% Not marking it as unhealthy because it could be spurious
+            throw(Msg);
+        {error, {ok, StatusCode, _, Body}} ->
+            Msg = iolist_to_binary([
+                <<"Error checking if configured snowpipe user has permissions.">>,
+                <<" HTTP Status Code:">>,
+                integer_to_binary(StatusCode),
+                <<"; Body: ">>,
+                Body
+            ]),
+            %% Not marking it as unhealthy because it could be spurious
+            throw(Msg);
+        {error, Other} ->
+            %% `insert_report_request/3' wraps any non-200 response in `{error, _}',
+            %% which includes responses that are not `{ok, StatusCode, ...}' tuples
+            %% (presumably errors returned by the HTTP client itself).  Without this
+            %% clause those would crash with an opaque `case_clause'.
+            Msg = iolist_to_binary([
+                <<"Error checking if configured snowpipe user has permissions.">>,
+                <<" Reason: ">>,
+                emqx_utils:format(Other)
+            ]),
+            %% Not marking it as unhealthy because it could be spurious
+            throw(Msg)
+    end.
+
+%%------------------------------------------------------------------------------
+%% Tests
+%%------------------------------------------------------------------------------
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+needs_quoting_test_() ->
+    PositiveCases = [
+        <<"with spaece">>,
+        <<"1_number_in_beginning">>,
+        <<"contains_açéntõ">>,
+        <<"with-hyphen">>,
+        <<"">>
+    ],
+    NegativeCases = [
+        <<"testdatabase">>,
+        <<"TESTDATABASE">>,
+        <<"TestDatabase">>,
+        <<"with_underscore">>,
+        <<"with_underscore_10">>
+    ],
+    Positive = lists:map(fun(Id) -> {Id, ?_assert(needs_quoting(Id))} end, PositiveCases),
+    Negative = lists:map(fun(Id) -> {Id, ?_assertNot(needs_quoting(Id))} end, NegativeCases),
+    Positive ++ Negative.
+-endif.

+ 14 - 1
apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector_schema.erl

@@ -68,7 +68,12 @@ fields(connector_config) ->
                 #{required => true, desc => ?DESC("server")},
                 ?SERVER_OPTS
             )},
-        {account, mk(binary(), #{required => true, desc => ?DESC("account")})},
+        {account,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC("account"),
+                validator => fun account_id_validator/1
+            })},
         {dsn, mk(binary(), #{required => true, desc => ?DESC("dsn")})}
         | Fields
     ] ++
@@ -143,3 +148,11 @@ connector_example(put) ->
 %%------------------------------------------------------------------------------
 
 mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+%% Validates that the account identifier has the form `ORGID-ACCOUNTNAME'.
+%% The identifier is split at the first hyphen only, so the account name part may
+%% itself contain hyphens; both parts must be non-empty.
+%% Returns `ok' or `{error, Message}'.
+account_id_validator(AccountId) ->
+    case binary:split(AccountId, <<"-">>) of
+        [OrgId, AccountName] when OrgId =/= <<>>, AccountName =/= <<>> ->
+            ok;
+        _ ->
+            {error, <<"Account identifier must be of form ORGID-ACCOUNTNAME">>}
+    end.

+ 140 - 11
apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl

@@ -53,8 +53,8 @@ init_per_suite(Config) ->
     case os:getenv("SNOWFLAKE_ACCOUNT_ID", "") of
         "" ->
             Mock = true,
-            AccountId = "mocked_account_id",
-            Server = <<"mocked.snowflakecomputing.com">>,
+            AccountId = "mocked_orgid-mocked_account_id",
+            Server = <<"mocked_orgid-mocked_account_id.snowflakecomputing.com">>,
             Username = <<"mock_username">>,
             Password = <<"mock_password">>;
         AccountId ->
@@ -206,6 +206,12 @@ mock_snowflake() ->
         {selected, Headers, Rows}
     end),
     meck:expect(Mod, do_health_check_connector, fun(_ConnPid) -> true end),
+    %% Used in health checks
+    meck:expect(Mod, do_insert_report_request, fun(_HTTPPool, _Req, _RequestTTL, _MaxRetries) ->
+        Headers = [],
+        Body = emqx_utils_json:encode(#{}),
+        {ok, 200, Headers, Body}
+    end),
     meck:expect(Mod, do_insert_files_request, fun(_HTTPPool, _Req, _RequestTTL, _MaxRetries) ->
         Headers = [],
         Body = emqx_utils_json:encode(#{}),
@@ -462,26 +468,36 @@ get_begin_mark(#{mock := false}, ActionResId) ->
         emqx_bridge_snowflake_connector:insert_report(ActionResId, #{}),
     BeginMark.
 
-wait_until_processed(Config, ActionResId, BeginMark) when is_list(Config) ->
-    wait_until_processed(maps:from_list(Config), ActionResId, BeginMark);
-wait_until_processed(#{mock := true} = Config, _ActionResId, _BeginMark) ->
-    {ok, _} = ?block_until(#{?snk_kind := "mock_snowflake_insert_file_request"}),
+wait_until_processed(Config, ActionResId, BeginMark) ->
+    wait_until_processed(Config, ActionResId, BeginMark, _ExpectedNumFiles = 1).
+
+wait_until_processed(Config, ActionResId, BeginMark, ExpectedNumFiles) when is_list(Config) ->
+    wait_until_processed(maps:from_list(Config), ActionResId, BeginMark, ExpectedNumFiles);
+wait_until_processed(#{mock := true} = Config, _ActionResId, _BeginMark, ExpectedNumFiles) ->
+    snabbkaffe:block_until(
+        ?match_n_events(
+            ExpectedNumFiles,
+            #{?snk_kind := "mock_snowflake_insert_file_request"}
+        ),
+        _Timeout = infinity,
+        _BackInTIme = infinity
+    ),
     InsertRes = maps:get(mocked_insert_report, Config, #{}),
     {ok, InsertRes};
-wait_until_processed(#{mock := false} = Config, ActionResId, BeginMark) ->
+wait_until_processed(#{mock := false} = Config, ActionResId, BeginMark, ExpectedNumFiles) ->
     {ok, Res} =
         emqx_bridge_snowflake_connector:insert_report(ActionResId, #{begin_mark => BeginMark}),
     ct:pal("insert report (begin mark ~s):\n  ~p", [BeginMark, Res]),
     case Res of
         #{
-            <<"files">> := [_ | _],
+            <<"files">> := Files,
             <<"statistics">> := #{<<"activeFilesCount">> := 0}
-        } ->
+        } when length(Files) >= ExpectedNumFiles ->
             ct:pal("insertReport response:\n  ~p", [Res]),
             {ok, Res};
         _ ->
             ct:sleep(2_000),
-            wait_until_processed(Config, ActionResId, BeginMark)
+            wait_until_processed(Config, ActionResId, BeginMark, ExpectedNumFiles)
     end.
 
 bin2hex(Bin) ->
@@ -564,7 +580,8 @@ t_aggreg_upload(Config) ->
                     #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}
                 ),
             %% Check the uploaded objects.
-            wait_until_processed(Config, ActionResId, BeginMark),
+            ExpectedNumFiles = 2,
+            wait_until_processed(Config, ActionResId, BeginMark, ExpectedNumFiles),
             Rows = get_all_rows(Config),
             [
                 P1Hex,
@@ -1032,6 +1049,118 @@ t_aggreg_invalid_column_values(Config0) ->
     ),
     ok.
 
+t_aggreg_inexistent_database(init, Config) when is_list(Config) ->
+    t_aggreg_inexistent_database(init, maps:from_list(Config));
+t_aggreg_inexistent_database(init, #{mock := true} = Config) ->
+    Mod = ?CONN_MOD,
+    meck:expect(Mod, do_stage_file, fun(
+        _ConnPid, _Filename, _Database, _Schema, _Stage, _ActionName
+    ) ->
+        Msg =
+            "SQL compilation error:, Database 'INEXISTENT' does not"
+            " exist or not authorized. SQLSTATE IS: 02000",
+        {error, Msg}
+    end),
+    maps:to_list(Config);
+t_aggreg_inexistent_database(init, #{} = Config) ->
+    maps:to_list(Config).
+t_aggreg_inexistent_database(Config) ->
+    ?check_trace(
+        emqx_bridge_v2_testlib:snk_timetrap(),
+        begin
+            {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(
+                Config,
+                #{<<"parameters">> => #{<<"database">> => <<"inexistent">>}}
+            ),
+            ActionResId = emqx_bridge_v2_testlib:bridge_id(Config),
+            %% BeginMark = get_begin_mark(Config, ActionResId),
+            {ok, _Rule} =
+                emqx_bridge_v2_testlib:create_rule_and_action_http(
+                    ?ACTION_TYPE_BIN, <<"">>, Config, #{
+                        sql => sql1()
+                    }
+                ),
+            Messages1 = lists:map(fun mk_message/1, [
+                {<<"C1">>, <<"sf/a/b/c">>, <<"{\"hello\":\"world\"}">>},
+                {<<"C2">>, <<"sf/foo/bar">>, <<"baz">>},
+                {<<"C3">>, <<"sf/t/42">>, <<"">>}
+            ]),
+            ok = publish_messages(Messages1),
+            %% Wait until the insert files request fails
+            ct:pal("waiting for delivery to fail..."),
+            ?block_until(#{?snk_kind := "aggregated_buffer_delivery_failed"}),
+            %% When channel health check happens, we check aggregator for errors.
+            %% Current implementation will mark the action as unhealthy.
+            ct:pal("waiting for delivery failure to be noticed by health check..."),
+            ?block_until(#{?snk_kind := "snowflake_check_aggreg_upload_error_found"}),
+
+            ?retry(
+                _Sleep = 500,
+                _Retries = 10,
+                ?assertMatch(
+                    {200, #{
+                        <<"error">> :=
+                            <<"{unhealthy_target,", _/binary>>
+                    }},
+                    emqx_bridge_v2_testlib:simplify_result(
+                        emqx_bridge_v2_testlib:get_action_api(Config)
+                    )
+                )
+            ),
+
+            ?assertEqual(3, emqx_resource_metrics:matched_get(ActionResId)),
+            %% Currently, failure metrics are not bumped when aggregated uploads fail
+            ?assertEqual(0, emqx_resource_metrics:failed_get(ActionResId)),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+%% Checks that we detect early that the configured snowpipe user does not have the proper
+%% credentials (or does not exist) for accessing Snowpipe's REST API.
+t_wrong_snowpipe_user(init, Config) when is_list(Config) ->
+    t_wrong_snowpipe_user(init, maps:from_list(Config));
+t_wrong_snowpipe_user(init, #{mock := true} = Config) ->
+    Mod = ?CONN_MOD,
+    InsertReportResponse = #{
+        <<"code">> => <<"390144">>,
+        <<"data">> => null,
+        <<"headers">> => null,
+        <<"message">> => <<"JWT token is invalid. [92d86b2e-d652-4d2d-9780-a6ed28b38356]">>,
+        <<"success">> => false
+    },
+    meck:expect(Mod, do_insert_report_request, fun(_HTTPPool, _Req, _RequestTTL, _MaxRetries) ->
+        Headers = [],
+        Body = emqx_utils_json:encode(InsertReportResponse),
+        {ok, 401, Headers, Body}
+    end),
+    maps:to_list(Config);
+t_wrong_snowpipe_user(init, #{} = Config) ->
+    maps:to_list(Config).
+t_wrong_snowpipe_user(Config) ->
+    ?check_trace(
+        emqx_bridge_v2_testlib:snk_timetrap(),
+        begin
+            {ok, _} = emqx_bridge_v2_testlib:create_connector_api(Config),
+            ?assertMatch(
+                {ok,
+                    {{_, 201, _}, _, #{
+                        <<"status">> := <<"disconnected">>,
+                        <<"error">> := <<"{unhealthy_target,", _/binary>>
+                    }}},
+                emqx_bridge_v2_testlib:create_kind_api(
+                    Config,
+                    #{<<"parameters">> => #{<<"pipe_user">> => <<"idontexist">>}}
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %% Todo: test scenarios
 %% * User error in rule definition; e.g.:
 %%    - forgot to use `bin2hexstr' to encode the payload

+ 53 - 0
apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_tests.erl

@@ -0,0 +1,53 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_snowflake_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("src/emqx_bridge_snowflake.hrl").
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+-define(CONNECTOR_NAME, <<"my_connector">>).
+
+%% Runs `InnerConfig' through `emqx_bridge_v2_testlib:parse_and_check_connector/3' using
+%% the snowflake connector type and the fixed `?CONNECTOR_NAME'.
+parse_and_check_connector(InnerConfig) ->
+    emqx_bridge_v2_testlib:parse_and_check_connector(
+        ?CONNECTOR_TYPE_BIN,
+        ?CONNECTOR_NAME,
+        InnerConfig
+    ).
+
+%% Builds a connector config from fixed sample values (via
+%% `emqx_bridge_snowflake_SUITE:connector_config/5') and deep-merges `Overrides' on top.
+connector_config(Overrides) ->
+    SampleArgs = [
+        ?CONNECTOR_NAME,
+        <<"orgid-accountid">>,
+        <<"orgid-accountid.snowflakecomputing.com">>,
+        <<"odbcuser">>,
+        <<"odbcpass">>
+    ],
+    Defaults = erlang:apply(emqx_bridge_snowflake_SUITE, connector_config, SampleArgs),
+    emqx_utils_maps:deep_merge(Defaults, Overrides).
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+%% EUnit generator for connector config validation:
+%%   * the default config must parse into a map;
+%%   * an `account' that is not of the form ORGID-ACCOUNTNAME must be rejected with a
+%%     `validation_error' thrown by the schema check.
+validation_test_() ->
+    [
+        {"good config", ?_assertMatch(#{}, parse_and_check_connector(connector_config(#{})))},
+        {"account must contain org id and account id",
+            ?_assertThrow(
+                {_SchemaMod, [
+                    #{
+                        reason := <<"Account identifier must be of form ORGID-ACCOUNTNAME">>,
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check_connector(
+                    connector_config(#{
+                        <<"account">> => <<"onlyaccid">>
+                    })
+                )
+            )}
+    ].

+ 8 - 4
apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl

@@ -158,7 +158,7 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0})
             Delivery#delivery{transfer = Transfer};
         {error, Reason} ->
             %% Todo: handle more gracefully?  Retry?
-            error({transfer_failed, Reason})
+            exit({upload_failed, Reason})
     end.
 
 process_complete(#delivery{id = Id, empty = true}) ->
@@ -169,9 +169,13 @@ process_complete(#delivery{
 }) ->
     Trailer = emqx_connector_aggreg_csv:close(Container),
     Transfer = Mod:process_append(Trailer, Transfer0),
-    {ok, Completed} = Mod:process_complete(Transfer),
-    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
-    ok.
+    case Mod:process_complete(Transfer) of
+        {ok, Completed} ->
+            ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
+            ok;
+        {error, Error} ->
+            exit({upload_failed, Error})
+    end.
 
 %%
 

+ 1 - 2
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl

@@ -415,8 +415,7 @@ handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name
     ok = discard_buffer(Buffer),
     St;
 handle_delivery_exit(Buffer, Error, St = #st{name = Name}) ->
-    ?SLOG(error, #{
-        msg => "aggregated_buffer_delivery_failed",
+    ?tp(error, "aggregated_buffer_delivery_failed", #{
         action => Name,
         buffer => {Buffer#buffer.since, Buffer#buffer.seq},
         filename => Buffer#buffer.filename,

+ 2 - 0
apps/emqx_mix_utils/lib/mix/tasks/emqx.eunit.ex

@@ -27,6 +27,8 @@ defmodule Mix.Tasks.Emqx.Eunit do
     |> String.replace_suffix("-test", "")
     |> then(& System.put_env("PROFILE", &1))
 
+    :emqx_common_test_helpers.clear_screen()
+
     args
     |> parse_args!()
     |> discover_tests()

+ 3 - 0
apps/emqx_schema_registry/docker-ct

@@ -0,0 +1,3 @@
+kdc
+kafka
+schema-registry

+ 1 - 0
apps/emqx_schema_registry/include/emqx_schema_registry.hrl

@@ -6,6 +6,7 @@
 -define(EMQX_SCHEMA_REGISTRY_HRL, true).
 
 -define(CONF_KEY_ROOT, schema_registry).
+-define(CONF_KEY_ROOT_BIN, <<"schema_registry">>).
 -define(CONF_KEY_PATH, [?CONF_KEY_ROOT]).
 
 %% Note: this has the `_ee_' segment for backwards compatibility.

+ 1 - 0
apps/emqx_schema_registry/mix.exs

@@ -29,6 +29,7 @@ defmodule EMQXSchemaRegistry.MixProject do
       UMP.common_dep(:erlavro),
       UMP.common_dep(:jesse),
       UMP.common_dep(:gpb, runtime: true),
+      {:avlizer, github: "emqx/avlizer", tag: "0.5.1.1"},
     ]
   end
 end

+ 2 - 1
apps/emqx_schema_registry/rebar.config

@@ -7,7 +7,8 @@
     {emqx_rule_engine, {path, "../emqx_rule_engine"}},
     {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
     {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.8.0.1"}}},
-    {gpb, "4.19.9"}
+    {gpb, "4.19.9"},
+    {avlizer, {git, "https://github.com/emqx/avlizer.git", {tag, "0.5.1.1"}}}
 ]}.
 
 {shell, [

+ 1 - 0
apps/emqx_schema_registry/src/emqx_schema_registry.app.src

@@ -12,6 +12,7 @@
         erlavro,
         gpb,
         jesse,
+        avlizer,
         emqx
     ]},
     {env, []},

+ 4 - 86
apps/emqx_schema_registry/src/emqx_schema_registry.erl

@@ -4,8 +4,6 @@
 -module(emqx_schema_registry).
 
 -behaviour(gen_server).
--behaviour(emqx_config_handler).
--behaviour(emqx_config_backup).
 
 -include("emqx_schema_registry.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -31,12 +29,11 @@
     terminate/2
 ]).
 
-%% `emqx_config_handler' API
--export([post_config_update/5]).
-
-%% Data backup
+%% Internal exports for `emqx_schema_registry_config'
 -export([
-    import_config/1
+    async_delete_serdes/1,
+    ensure_serde_absent/1,
+    build_serdes/1
 ]).
 
 %% for testing
@@ -131,85 +128,6 @@ delete_schema(Name) ->
 list_schemas() ->
     emqx_config:get([?CONF_KEY_ROOT, schemas], #{}).
 
-%%-------------------------------------------------------------------------------------------------
-%% `emqx_config_handler' API
-%%-------------------------------------------------------------------------------------------------
-%% remove
-post_config_update(
-    [?CONF_KEY_ROOT, schemas, Name],
-    '$remove',
-    _NewSchemas,
-    _OldSchemas,
-    _AppEnvs
-) ->
-    async_delete_serdes([Name]),
-    ok;
-%% add or update
-post_config_update(
-    [?CONF_KEY_ROOT, schemas, NewName],
-    _Cmd,
-    NewSchema,
-    OldSchema,
-    _AppEnvs
-) ->
-    case OldSchema of
-        undefined ->
-            ok;
-        _ ->
-            ensure_serde_absent(NewName)
-    end,
-    case build_serdes([{NewName, NewSchema}]) of
-        ok ->
-            {ok, #{NewName => NewSchema}};
-        {error, Reason, SerdesToRollback} ->
-            lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
-            {error, Reason}
-    end;
-post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, OldConf, _AppEnvs) ->
-    OldSchemas = maps:get(schemas, OldConf, #{}),
-    #{
-        added := Added,
-        changed := Changed0,
-        removed := Removed
-    } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas),
-    Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
-    RemovedNames = maps:keys(Removed),
-    case RemovedNames of
-        [] ->
-            ok;
-        _ ->
-            async_delete_serdes(RemovedNames)
-    end,
-    SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
-    ok = lists:foreach(fun ensure_serde_absent/1, [N || {N, _} <- SchemasToBuild]),
-    case build_serdes(SchemasToBuild) of
-        ok ->
-            {ok, NewConf};
-        {error, Reason, SerdesToRollback} ->
-            lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
-            {error, Reason}
-    end;
-post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
-    {ok, NewConf}.
-
-%%-------------------------------------------------------------------------------------------------
-%% Data backup
-%%-------------------------------------------------------------------------------------------------
-
-import_config(#{<<"schema_registry">> := #{<<"schemas">> := Schemas} = SchemaRegConf}) ->
-    OldSchemas = emqx:get_raw_config([?CONF_KEY_ROOT, schemas], #{}),
-    SchemaRegConf1 = SchemaRegConf#{<<"schemas">> => maps:merge(OldSchemas, Schemas)},
-    case emqx_conf:update(?CONF_KEY_PATH, SchemaRegConf1, #{override_to => cluster}) of
-        {ok, #{raw_config := #{<<"schemas">> := NewRawSchemas}}} ->
-            Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawSchemas, OldSchemas)),
-            ChangedPaths = [[?CONF_KEY_ROOT, schemas, Name] || Name <- maps:keys(Changed)],
-            {ok, #{root_key => ?CONF_KEY_ROOT, changed => ChangedPaths}};
-        Error ->
-            {error, #{root_key => ?CONF_KEY_ROOT, reason => Error}}
-    end;
-import_config(_RawConf) ->
-    {ok, #{root_key => ?CONF_KEY_ROOT, changed => []}}.
-
 %%-------------------------------------------------------------------------------------------------
 %% `gen_server' API
 %%-------------------------------------------------------------------------------------------------

+ 2 - 6
apps/emqx_schema_registry/src/emqx_schema_registry_app.erl

@@ -14,13 +14,9 @@ start(_StartType, _StartArgs) ->
     %% and encode functions called from the rule engine SQL like language
     ok = emqx_rule_engine:set_extra_functions_module(emqx_schema_registry_serde),
     ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
-    %% HTTP API handler
-    emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_schema_registry),
-    %% Conf load / data import handler
-    emqx_conf:add_handler(?CONF_KEY_PATH, emqx_schema_registry),
+    ok = emqx_schema_registry_config:add_handlers(),
     emqx_schema_registry_sup:start_link().
 
 stop(_State) ->
-    emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']),
-    emqx_conf:remove_handler(?CONF_KEY_PATH),
+    ok = emqx_schema_registry_config:remove_handlers(),
     ok.

+ 284 - 0
apps/emqx_schema_registry/src/emqx_schema_registry_config.erl

@@ -0,0 +1,284 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_schema_registry_config).
+
+-feature(maybe_expr, enable).
+
+-include("emqx_schema_registry.hrl").
+
+%% API
+-export([
+    add_handlers/0,
+    remove_handlers/0,
+
+    list_external_registries/0,
+    list_external_registries_raw/0,
+    lookup_external_registry_raw/1,
+
+    upsert_external_registry/2,
+    delete_external_registry/1
+]).
+
+%% `emqx_config_handler' API
+-export([post_config_update/5]).
+
+%% `emqx_config_backup' API
+-behaviour(emqx_config_backup).
+-export([import_config/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(SCHEMA_CONF_PATH(NAME), [?CONF_KEY_ROOT, schemas, NAME]).
+-define(EXTERNAL_REGISTRY_CONF_PATH(NAME), [?CONF_KEY_ROOT, external, NAME]).
+
+-define(EXTERNAL_REGISTRIES_CONF_PATH, [?CONF_KEY_ROOT, external]).
+-define(EXTERNAL_REGISTRIES_CONF_PATH_BIN, [?CONF_KEY_ROOT, <<"external">>]).
+-define(EXTERNAL_REGISTRIES_CONF_PATH_BIN(NAME), [?CONF_KEY_ROOT, <<"external">>, NAME]).
+
+-type external_registry_name() :: binary().
+-type external_registry_raw() :: #{binary() => term()}.
+
+-type external_registry() :: external_registry_confluent().
+
+-type external_registry_confluent() :: #{type := confluent}.
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% Registers this module as the config handler for individual schemas, individual
+%% external registries and the `schema_registry' root.  Crashes if any registration
+%% does not return `ok'.
+-spec add_handlers() -> ok.
+add_handlers() ->
+    Paths = [
+        %% HTTP API handlers
+        [?CONF_KEY_ROOT, schemas, '?'],
+        [?CONF_KEY_ROOT, external, '?'],
+        %% Conf load / data import handler
+        ?CONF_KEY_PATH
+    ],
+    lists:foreach(fun(Path) -> ok = emqx_conf:add_handler(Path, ?MODULE) end, Paths).
+
+%% Unregisters the config handlers for external registries, schemas and the
+%% `schema_registry' root, in that order.  Crashes if any removal does not return `ok'.
+-spec remove_handlers() -> ok.
+remove_handlers() ->
+    Paths = [
+        [?CONF_KEY_ROOT, external, '?'],
+        [?CONF_KEY_ROOT, schemas, '?'],
+        ?CONF_KEY_PATH
+    ],
+    lists:foreach(fun(Path) -> ok = emqx_conf:remove_handler(Path) end, Paths).
+
+%% Reads the external registries section through `emqx:get_config/2', defaulting to an
+%% empty map when the path is not set.
+-spec list_external_registries() ->
+    #{atom() => external_registry()}.
+list_external_registries() ->
+    emqx:get_config(?EXTERNAL_REGISTRIES_CONF_PATH, #{}).
+
+%% Reads the external registries section through `emqx:get_raw_config/2', defaulting to
+%% an empty map when the path is not set.
+-spec list_external_registries_raw() ->
+    #{external_registry_name() => external_registry_raw()}.
+list_external_registries_raw() ->
+    emqx:get_raw_config(?EXTERNAL_REGISTRIES_CONF_PATH_BIN, #{}).
+
+%% Looks up one external registry in the raw config by name.
+%% Returns `{error, not_found}' when nothing is configured under that name.
+-spec lookup_external_registry_raw(external_registry_name()) ->
+    {ok, external_registry_raw()} | {error, not_found}.
+lookup_external_registry_raw(Name) ->
+    Path = ?EXTERNAL_REGISTRIES_CONF_PATH_BIN(Name),
+    Found = emqx:get_raw_config(Path, undefined),
+    if
+        Found =:= undefined -> {error, not_found};
+        true -> {ok, Found}
+    end.
+
+%% Creates or replaces the external registry `Name' through a cluster-wide config
+%% update.  On success, returns the raw config reported back by `emqx_conf:update/3'.
+-spec upsert_external_registry(external_registry_name(), external_registry_raw()) ->
+    {ok, external_registry_raw()} | {error, term()}.
+upsert_external_registry(Name, RegistryRaw) ->
+    Path = ?EXTERNAL_REGISTRY_CONF_PATH(Name),
+    UpdateOpts = #{override_to => cluster},
+    case emqx_conf:update(Path, RegistryRaw, UpdateOpts) of
+        {error, _} = Failure ->
+            Failure;
+        {ok, #{raw_config := Stored}} ->
+            {ok, Stored}
+    end.
+
+%% Removes the external registry `Name' from the cluster config.
+-spec delete_external_registry(external_registry_name()) ->
+    ok | {error, term()}.
+delete_external_registry(Name) ->
+    Path = ?EXTERNAL_REGISTRY_CONF_PATH(Name),
+    Outcome = emqx_conf:remove(Path, #{override_to => cluster}),
+    case Outcome of
+        {error, _} ->
+            Outcome;
+        {ok, _} ->
+            ok
+    end.
+
+%%------------------------------------------------------------------------------
+%% `emqx_config_handler' API
+%%------------------------------------------------------------------------------
+
+%% Config handler callback, dispatched on the updated config path:
+%%   * a single schema: deletes or (re)builds its serde;
+%%   * a single external registry: removes or (re)creates it;
+%%   * the whole root: reconciles schemas first, then external registries;
+%%   * anything else: accepted unchanged.
+%% Clause order matters: for each path the `'$remove'' clause must precede the generic
+%% add/update clause, which matches any command.
+%% remove schema
+post_config_update(
+    ?SCHEMA_CONF_PATH(Name),
+    '$remove',
+    _NewSchemas,
+    _OldSchemas,
+    _AppEnvs
+) ->
+    emqx_schema_registry:async_delete_serdes([Name]),
+    ok;
+%% add or update schema
+post_config_update(
+    ?SCHEMA_CONF_PATH(NewName),
+    _Cmd,
+    NewSchema,
+    OldSchema,
+    _AppEnvs
+) ->
+    %% On update (an old schema exists), drop the previous serde before rebuilding.
+    case OldSchema of
+        undefined ->
+            ok;
+        _ ->
+            emqx_schema_registry:ensure_serde_absent(NewName)
+    end,
+    case emqx_schema_registry:build_serdes([{NewName, NewSchema}]) of
+        ok ->
+            {ok, #{NewName => NewSchema}};
+        {error, Reason, SerdesToRollback} ->
+            %% Undo whatever was built before the failure.
+            lists:foreach(fun emqx_schema_registry:ensure_serde_absent/1, SerdesToRollback),
+            {error, Reason}
+    end;
+%% remove external registry
+post_config_update(
+    ?EXTERNAL_REGISTRY_CONF_PATH(Name),
+    '$remove',
+    _New,
+    _Old,
+    _AppEnvs
+) ->
+    remove_external_registry(Name),
+    ok;
+%% add or update external registry
+post_config_update(
+    ?EXTERNAL_REGISTRY_CONF_PATH(Name),
+    _Cmd,
+    NewConfig,
+    _Old,
+    _AppEnvs
+) ->
+    do_upsert_external_registry(Name, NewConfig),
+    ok;
+%% whole-root update (this is the path `import_config/1' updates)
+post_config_update([?CONF_KEY_ROOT], _Cmd, NewConf, OldConf, _AppEnvs) ->
+    Context0 = #{},
+    %% Returns `{ok, Context}' when both steps succeed; otherwise the first non-matching
+    %% result is returned as is.  `import_config/1' reads `changed_paths' back from the
+    %% update result.
+    maybe
+        {ok, Context1} ?= handle_import_schemas(Context0, NewConf, OldConf),
+        {ok, _Context2} ?= handle_import_external_registries(Context1, NewConf, OldConf)
+    end;
+post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
+    {ok, NewConf}.
+
+%%------------------------------------------------------------------------------
+%% `emqx_config_backup' API
+%%------------------------------------------------------------------------------
+
+%% Data-backup import: applies the `schema_registry' section of `RawConf' as a
+%% cluster-wide update of the whole root and reports the config paths this module's
+%% `post_config_update/5' recorded as changed.  Input without that section is a no-op.
+import_config(#{?CONF_KEY_ROOT_BIN := RawConf0}) ->
+    UpdateOpts = #{override_to => cluster, rawconf_with_defaults => true},
+    case emqx_conf:update([?CONF_KEY_ROOT], RawConf0, UpdateOpts) of
+        {ok, Res} ->
+            KeyPath = [post_config_update, ?MODULE, changed_paths],
+            ChangedPaths = emqx_utils_maps:deep_get(KeyPath, Res, []),
+            {ok, #{root_key => ?CONF_KEY_ROOT, changed => ChangedPaths}};
+        {error, Reason} ->
+            {error, #{root_key => ?CONF_KEY_ROOT, reason => Reason}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => ?CONF_KEY_ROOT, changed => []}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+%% Adds the external registry; crashes unless `emqx_schema_registry_external:add/2'
+%% returns `ok'.
+add_external_registry(Name, Config) ->
+    ok = emqx_schema_registry_external:add(Name, Config).
+
+%% Removes the external registry; crashes unless
+%% `emqx_schema_registry_external:remove/1' returns `ok'.
+remove_external_registry(Name) ->
+    ok = emqx_schema_registry_external:remove(Name).
+
+%% Replaces the registry by removing any existing instance and adding it again.
+%% Receives parsed config with atom keys.
+do_upsert_external_registry(Name, NewConfig) ->
+    ok = remove_external_registry(Name),
+    ok = add_external_registry(Name, NewConfig).
+
+%% Reconciles serdes with the `schemas' section of a whole-root config update.
+%%
+%% Removed schemas have their serdes deleted asynchronously; added and changed schemas
+%% have any existing serde dropped and are then rebuilt.  On success, the paths of the
+%% rebuilt schemas are prepended to `changed_paths' in the returned context.  On
+%% failure, the serdes reported for rollback are removed and `{error, Reason}' is
+%% returned.
+%%
+%% When the new config has no `schemas' key at all, nothing is touched and the context
+%% is returned unchanged (instead of crashing with `function_clause').
+handle_import_schemas(Context0, #{schemas := NewSchemas}, OldConf) ->
+    OldSchemas = maps:get(schemas, OldConf, #{}),
+    #{
+        added := Added,
+        changed := Changed0,
+        removed := Removed
+    } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas),
+    %% Keep only the new value of each changed entry.
+    Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
+    RemovedNames = maps:keys(Removed),
+    case RemovedNames of
+        [] ->
+            ok;
+        _ ->
+            emqx_schema_registry:async_delete_serdes(RemovedNames)
+    end,
+    SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
+    ok = lists:foreach(fun emqx_schema_registry:ensure_serde_absent/1, [
+        N
+     || {N, _} <- SchemasToBuild
+    ]),
+    case emqx_schema_registry:build_serdes(SchemasToBuild) of
+        ok ->
+            ChangedPaths = [
+                [?CONF_KEY_ROOT, schemas, N]
+             || {N, _} <- SchemasToBuild
+            ],
+            Context = maps:update_with(
+                changed_paths,
+                fun(Ps) -> ChangedPaths ++ Ps end,
+                ChangedPaths,
+                Context0
+            ),
+            {ok, Context};
+        {error, Reason, SerdesToRollback} ->
+            lists:foreach(fun emqx_schema_registry:ensure_serde_absent/1, SerdesToRollback),
+            {error, Reason}
+    end;
+handle_import_schemas(Context0, _NewConf, _OldConf) ->
+    %% No `schemas' section in the new config: leave existing serdes alone so that the
+    %% remaining import steps can still run.
+    {ok, Context0}.
+
+%% Reconciles external registries with the `external' section of a whole-root config
+%% update: registries missing from the new config are removed, added/changed ones are
+%% (re)created, and the paths of the latter are prepended to `changed_paths'.
+handle_import_external_registries(Context0, NewConf, OldConf) ->
+    Diff = emqx_utils_maps:diff_maps(
+        maps:get(external, NewConf, #{}),
+        maps:get(external, OldConf, #{})
+    ),
+    #{added := Added, changed := Changed0, removed := Removed} = Diff,
+    %% Keep only the new value of each changed entry.
+    Changed = maps:map(fun(_N, {_Prev, Next}) -> Next end, Changed0),
+    RegistriesToUpsert = maps:to_list(maps:merge(Changed, Added)),
+    lists:foreach(fun remove_external_registry/1, maps:keys(Removed)),
+    lists:foreach(
+        fun({Name, Cfg}) -> do_upsert_external_registry(Name, Cfg) end,
+        RegistriesToUpsert
+    ),
+    ChangedPaths = [[?CONF_KEY_ROOT, external, N] || {N, _} <- RegistriesToUpsert],
+    Prepend = fun(Ps) -> ChangedPaths ++ Ps end,
+    {ok, maps:update_with(changed_paths, Prepend, ChangedPaths, Context0)}.

+ 373 - 0
apps/emqx_schema_registry/src/emqx_schema_registry_external.erl

@@ -0,0 +1,373 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_schema_registry_external).
+
+-feature(maybe_expr, enable).
+
+-behaviour(gen_server).
+
+-include("emqx_schema_registry.hrl").
+
+%% API
+-export([
+    start_link/0,
+
+    add/2,
+    remove/1,
+
+    encode/4,
+    encode_with/6,
+    decode/4
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(EXTERNAL_REGISTRY_TAB, emqx_schema_registry_external).
+
+-define(NAME(RegistryName), {n, l, {?MODULE, RegistryName}}).
+-define(REF(RegistryName), {via, gproc, ?NAME(RegistryName)}).
+
+-define(bad_registry_credentials, bad_registry_credentials).
+-define(schema_not_found, schema_not_found).
+-define(bad_input, bad_input).
+-define(bad_schema_source, bad_schema_source).
+-define(external_registry_unavailable, external_registry_unavailable).
+
+-type registry_name() :: atom() | binary().
+-type encode_opts() :: #{
+    %% Default: false
+    tag => boolean()
+}.
+-type decode_opts() :: #{}.
+-type data() :: term().
+-type encoded() :: binary().
+-type arg() :: term().
+-type schema_source_hash() :: binary().
+
+%% call/cast/info events
+-record(ensure_registered, {
+    name :: registry_name(),
+    our_schema_name :: schema_name(),
+    source :: schema_source(),
+    hash :: schema_source_hash(),
+    args :: [arg()]
+}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% Starts the gen_server, registered locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Ensures a worker for the registry is started under `emqx_schema_registry_sup' and
+%% then records the registry's context.  Crashes if either step does not return `ok'.
+add(Name, Config) ->
+    Spec = worker_child_spec(Name, Config),
+    ok = emqx_schema_registry_sup:ensure_external_registry_worker_started(Spec),
+    ok = initialize_registry_context(Name, Config).
+
+%% Ensures the registry's worker is gone and then discards its recorded context.
+%% Crashes if either step does not return `ok'.
+remove(Name) ->
+    ok = emqx_schema_registry_sup:ensure_external_registry_worker_absent(Name),
+    ok = deinitialize_registry_context(Name).
+
+%% Encodes `Data' using the registry `Name'.  Returns `{error, registry_not_found}'
+%% when no context is recorded for that name.  For confluent registries the first
+%% element of `Args' is the schema id.
+-spec encode(registry_name(), data(), [arg()], encode_opts()) ->
+    {ok, encoded()} | {error, term()}.
+encode(Name, Data, Args, Opts) ->
+    with_registry(Name, fun(Context) ->
+        do_encode(Name, Context, Data, Args, Opts)
+    end).
+
+%% Encodes `Data' after making sure `Source' is registered in the external registry.
+%% The id obtained from the registration is prepended to `Args' for the encode step.
+%% Any non-`{ok, Id}' registration result is returned as is.
+-spec encode_with(registry_name(), schema_name(), schema_source(), data(), [arg()], encode_opts()) ->
+    {ok, encoded()} | {error, term()}.
+encode_with(Name, OurSchemaName, Source, Data, Args, Opts) ->
+    EncodeFn = fun(Context) ->
+        case ensure_registered(Name, Context, OurSchemaName, Source, Args) of
+            {ok, Id} ->
+                do_encode(Name, Context, Data, [Id | Args], Opts);
+            NotRegistered ->
+                NotRegistered
+        end
+    end,
+    with_registry(Name, EncodeFn).
+
+%% Decodes `Data' using the registry `Name'.  Returns `{error, registry_not_found}'
+%% when no context is recorded for that name.  For confluent registries `Args' is
+%% either `[SchemaId]' or `[]' (in which case the data itself must carry the tag).
+-spec decode(registry_name(), encoded(), [arg()], decode_opts()) ->
+    {ok, data()} | {error, term()}.
+decode(Name, Data, Args, Opts) ->
+    with_registry(Name, fun(Context) ->
+        do_decode(Name, Context, Data, Args, Opts)
+    end).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+%% Traps exits, creates the registry context table and starts with an empty state.
+init(_) ->
+    _ = process_flag(trap_exit, true),
+    create_tables(),
+    {ok, #{}}.
+
+%% On shutdown, invokes the zero-arity `on_remove' callback of every registry context
+%% that carries one; entries without such a callback are skipped.
+terminate(_Reason, _State) ->
+    RunOnRemove = fun
+        ({_Id, #{on_remove := Callback}}, Acc) when is_function(Callback, 0) ->
+            _ = Callback(),
+            Acc;
+        (_Entry, Acc) ->
+            Acc
+    end,
+    ets:foldr(RunOnRemove, ok, ?EXTERNAL_REGISTRY_TAB).
+
+%% Handles `#ensure_registered{}' requests; any other call is answered with
+%% `{error, {unknown_call, Call}}'.
+handle_call(#ensure_registered{} = Req, _From, State) ->
+    handle_ensure_registered(Req, State);
+handle_call(Call, _From, State) ->
+    {reply, {error, {unknown_call, Call}}, State}.
+
+%% No casts are expected; anything received is ignored.
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns (`gen_server')
+%%------------------------------------------------------------------------------
+
+%% Creates the public `set' table holding one `{Id, Context}' entry per registry.
+create_tables() ->
+    ok = emqx_utils_ets:new(?EXTERNAL_REGISTRY_TAB, [set, public]),
+    ok.
+
+%% Serialized registration: re-checks the schema id cache inside the server process and
+%% only registers the schema when it is still missing.
+handle_ensure_registered(Req, State) ->
+    #ensure_registered{
+        name = RegistryName,
+        our_schema_name = SchemaName,
+        source = Src,
+        hash = Hash,
+        args = Args
+    } = Req,
+    RegisterIfMissing = fun(Context) ->
+        case find_cached_schema_id(Context, SchemaName, Hash) of
+            {ok, _Id} = AlreadyRegistered ->
+                %% Race: another call already registered it
+                AlreadyRegistered;
+            error ->
+                do_register(RegistryName, Context, SchemaName, Src, Hash, Args)
+        end
+    end,
+    {reply, with_registry(RegistryName, RegisterIfMissing), State}.
+
+%% Registers `Source' under `Subject' in the external registry and stores the returned
+%% id in the registry's context.
+%%
+%% This runs inside the `?MODULE' gen_server (see `handle_ensure_registered/2'), which
+%% is where the registry context table is created.  Unsupported arguments therefore
+%% yield an error tuple instead of a `function_clause' crash that would take the server
+%% down.
+%%
+%% Returns `{ok, Id}', the error reported by `confluent_register_schema/3', or
+%% `{error, {bad_register_args, Args}}' when the args/context combination is not
+%% supported.
+do_register(Name, #{type := confluent} = Context, OurSchemaName, Source, SourceHash, [Subject]) ->
+    maybe
+        {ok, Id} ?= confluent_register_schema(Name, Subject, Source),
+        NewContext = add_cached_schema_id(Context, OurSchemaName, SourceHash, Id),
+        ets:insert(?EXTERNAL_REGISTRY_TAB, {id(Name), NewContext}),
+        {ok, Id}
+    end;
+do_register(_Name, _Context, _OurSchemaName, _Source, _SourceHash, Args) ->
+    {error, {bad_register_args, Args}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+%%===========================
+%% Confluent
+%%===========================
+
+%% Converts the registry's `auth' config into the `{Mechanism, Username, Password}'
+%% tuple handed to the worker, or `undefined' when auth is `none'.
+avlizer_auth(#{auth := none}) ->
+    undefined;
+avlizer_auth(#{auth := #{} = AuthOpts}) ->
+    #{mechanism := Mech, username := User, password := Pass} = AuthOpts,
+    {Mech, emqx_utils_conv:str(User), Pass}.
+
+%% Builds an encoder for `SchemaId', translating HTTP 404/401 failures into
+%% `schema_not_found' / `bad_registry_credentials' errors.  Other exceptions propagate.
+confluent_make_encoder(Name, CacheTable, SchemaId) ->
+    try avlizer_confluent:make_encoder2(?REF(id(Name)), CacheTable, SchemaId) of
+        Encoder ->
+            {ok, Encoder}
+    catch
+        error:{bad_http_code, 401} ->
+            {error, ?bad_registry_credentials};
+        error:{bad_http_code, 404} ->
+            {error, ?schema_not_found}
+    end.
+
+%% Encodes `Data' with `Encoder'; any exception is reported as `{error, bad_input}'.
+confluent_encode(Encoder, Data) ->
+    try avlizer_confluent:encode(Encoder, Data) of
+        Encoded ->
+            {ok, Encoded}
+    catch
+        _:_ ->
+            {error, ?bad_input}
+    end.
+
+%% Strips the tag from `Data' via `avlizer_confluent:untag_data/1'; any exception is
+%% reported as `{error, bad_input}'.
+confluent_untag(Data) ->
+    try avlizer_confluent:untag_data(Data) of
+        Untagged ->
+            {ok, Untagged}
+    catch
+        _:_ ->
+            {error, ?bad_input}
+    end.
+
+%% Builds a decoder for `SchemaId', translating HTTP 404/401 failures into
+%% `schema_not_found' / `bad_registry_credentials' errors.  Other exceptions propagate.
+confluent_make_decoder(Name, CacheTable, SchemaId) ->
+    try avlizer_confluent:make_decoder2(?REF(id(Name)), CacheTable, SchemaId) of
+        Decoder ->
+            {ok, Decoder}
+    catch
+        error:{bad_http_code, 401} ->
+            {error, ?bad_registry_credentials};
+        error:{bad_http_code, 404} ->
+            {error, ?schema_not_found}
+    end.
+
+%% Decodes `Data' with `Decoder'; any exception is reported as `{error, bad_input}'.
+%% Uses the `?bad_input' macro like the other confluent helpers in this module.
+confluent_decode(Decoder, Data) ->
+    try
+        {ok, avlizer_confluent:decode(Decoder, Data)}
+    catch
+        _:_ ->
+            {error, ?bad_input}
+    end.
+
+%% Registers `Source' under `Subject0' and maps well-known HTTP failures to this
+%% module's error atoms:
+%%   401 -> bad_registry_credentials; 409 and 422 -> bad_schema_source;
+%%   500 -> external_registry_unavailable.
+%% Any other error is returned unchanged.
+confluent_register_schema(Name, Subject0, Source) ->
+    Subject = emqx_utils_conv:str(Subject0),
+    Result = avlizer_confluent:register_schema(?REF(id(Name)), Subject, Source),
+    case Result of
+        {ok, _Id} ->
+            Result;
+        {error, {bad_http_code, 401}} ->
+            {error, ?bad_registry_credentials};
+        {error, {bad_http_code, Code}} when Code =:= 409; Code =:= 422 ->
+            {error, ?bad_schema_source};
+        {error, {bad_http_code, 500}} ->
+            {error, ?external_registry_unavailable};
+        {error, _} ->
+            Result
+    end.
+
+%%===========================
+%% Misc
+%%===========================
+
+%% Looks up the context recorded for registry `Name' and applies `Fn' to it.
+%% Returns `{error, registry_not_found}' when there is no such entry.
+with_registry(Name, Fn) ->
+    Key = id(Name),
+    case ets:lookup(?EXTERNAL_REGISTRY_TAB, Key) of
+        [] ->
+            {error, registry_not_found};
+        [{Key, Ctx}] ->
+            Fn(Ctx)
+    end.
+
+%% Builds the supervisor child spec for a confluent registry worker.  The worker is an
+%% `avlizer_confluent' process registered via gproc under the normalised registry id.
+%% NOTE(review): the child `id' is the `Name' as given, while the process is registered
+%% under `id(Name)' (a binary) -- confirm callers always use the same `Name' form when
+%% adding and removing.
+worker_child_spec(Name, #{type := confluent} = Config) ->
+    Id = id(Name),
+    Opts = external_registry_worker_config(Config),
+    #{
+        id => Name,
+        start => {avlizer_confluent, start_link, [?REF(Id), Opts]},
+        restart => permanent,
+        shutdown => 5_000,
+        type => worker
+    }.
+
+%% Builds the options map for the confluent worker: always `url' (as a string), plus
+%% `auth' only when authentication is configured.
+external_registry_worker_config(#{type := confluent} = Config) ->
+    #{url := URL} = Config,
+    Auth = avlizer_auth(Config),
+    BaseOpts = #{url => emqx_utils_conv:str(URL)},
+    HasAuth = Auth =/= undefined,
+    emqx_utils_maps:put_if(BaseOpts, auth, Auth, HasAuth).
+
+%% Normalises a registry name to the binary used as table key and process name.
+id(Name) ->
+    emqx_utils_conv:bin(Name).
+
+%% Records the context of a confluent registry: its type and the cache table obtained
+%% from its `avlizer_confluent' worker.  The entry is keyed by the normalised id and
+%% replaces any previous entry for the same registry.
+initialize_registry_context(Name, #{type := confluent = Type}) ->
+    Id = id(Name),
+    CacheTable = avlizer_confluent:get_table(?REF(Id)),
+    Context = #{type => Type, cache_table => CacheTable},
+    true = ets:insert(?EXTERNAL_REGISTRY_TAB, {Id, Context}),
+    ok.
+
+%% Removes the registry's context entry and, when the removed context carries a
+%% zero-arity `on_remove' callback, invokes it.  Always returns `ok'.
+deinitialize_registry_context(Name) ->
+    Key = id(Name),
+    %% The table is a `set', so at most one entry is taken.
+    Taken = ets:take(?EXTERNAL_REGISTRY_TAB, Key),
+    _ = [
+        Cleanup()
+     || {K, #{on_remove := Cleanup}} <- Taken,
+        K =:= Key,
+        is_function(Cleanup, 0)
+    ],
+    ok.
+
+%% Decodes confluent-encoded data.  Two argument shapes are supported:
+%%   * `[SchemaId]': the schema id is given explicitly and must be an integer,
+%%     otherwise `{error, {bad_schema_id, SchemaId}}' is returned;
+%%   * `[]': the schema id is extracted from the tagged data itself.
+%% Errors from any step short-circuit the `maybe' block and are returned as is.
+do_decode(Name, #{type := confluent} = Context, Data, [SchemaId], _Opts) ->
+    %% Tag provided
+    #{cache_table := CacheTable} = Context,
+    maybe
+        true ?= is_integer(SchemaId) orelse {error, {bad_schema_id, SchemaId}},
+        {ok, Decoder} ?= confluent_make_decoder(Name, CacheTable, SchemaId),
+        confluent_decode(Decoder, Data)
+    end;
+do_decode(Name, #{type := confluent} = Context, Data0, [], _Opts) ->
+    %% Data is tagged.
+    #{cache_table := CacheTable} = Context,
+    maybe
+        {ok, {SchemaId, Data}} ?= confluent_untag(Data0),
+        {ok, Decoder} ?= confluent_make_decoder(Name, CacheTable, SchemaId),
+        confluent_decode(Decoder, Data)
+    end.
+
+%% Encodes `Data' for a confluent registry using the schema id given as the first
+%% element of the args (any further elements are ignored here).
+%%
+%% A non-integer schema id yields `{error, {bad_schema_id, SchemaId}}', mirroring
+%% `do_decode/5', rather than crashing the caller with `badmatch'.  When the `tag'
+%% option is `true' (default: `false'), the encoded payload is tagged with the schema
+%% id.  Errors from any step short-circuit the `maybe' block and are returned as is.
+do_encode(Name, #{type := confluent} = Context, Data, [SchemaId | _MaybeSubject], Opts) ->
+    #{cache_table := CacheTable} = Context,
+    ShouldTag = maps:get(tag, Opts, false),
+    maybe
+        true ?= is_integer(SchemaId) orelse {error, {bad_schema_id, SchemaId}},
+        {ok, Encoder} ?= confluent_make_encoder(Name, CacheTable, SchemaId),
+        {ok, Encoded} ?= confluent_encode(Encoder, Data),
+        case ShouldTag of
+            true ->
+                {ok, avlizer_confluent:tag_data(SchemaId, Encoded)};
+            false ->
+                {ok, Encoded}
+        end
+    end.
+
+%% MD5 digest of the schema source; used as the cache key for registered schema ids.
+hash_source(Source) when is_binary(Source) ->
+    erlang:md5(Source).
+
+%% Looks up the id cached in `Context' for `OurSchemaName' with exactly this
+%% `SourceHash'.  Returns `{ok, Id}' on a hit and `error' otherwise.
+find_cached_schema_id(#{cache := Cache}, OurSchemaName, SourceHash) ->
+    case Cache of
+        #{OurSchemaName := #{SourceHash := Id}} ->
+            {ok, Id};
+        _ ->
+            error
+    end;
+find_cached_schema_id(_Context, _OurSchemaName, _SourceHash) ->
+    error.
+
+%% Stores `Id' for `OurSchemaName' under `SourceHash' in the context's cache.  The
+%% schema's whole cache entry is replaced, which drops any stale source hash.
+add_cached_schema_id(Context, OurSchemaName, SourceHash, Id) ->
+    CachePath = [cache, OurSchemaName],
+    emqx_utils_maps:deep_put(CachePath, Context, #{SourceHash => Id}).
+
+%% Returns the id of `Source' in the external registry, registering it if needed.
+%% The cache in `Context' is checked first; on a miss, the registration is delegated
+%% to the `?MODULE' gen_server, which is called without a timeout.
+ensure_registered(Name, Context, OurSchemaName, Source, Args) ->
+    Hash = hash_source(Source),
+    case find_cached_schema_id(Context, OurSchemaName, Hash) of
+        {ok, _Id} = Cached ->
+            Cached;
+        error ->
+            Request = #ensure_registered{
+                name = Name,
+                our_schema_name = OurSchemaName,
+                source = Source,
+                hash = Hash,
+                args = Args
+            },
+            gen_server:call(?MODULE, Request, infinity)
+    end.

+ 223 - 2
apps/emqx_schema_registry/src/emqx_schema_registry_http_api.erl

@@ -19,7 +19,9 @@
 
 -export([
     '/schema_registry'/2,
-    '/schema_registry/:name'/2
+    '/schema_registry/:name'/2,
+    '/schema_registry_external'/2,
+    '/schema_registry_external/registry/:name'/2
 ]).
 
 -define(TAGS, [<<"Schema Registry">>]).
@@ -36,7 +38,9 @@ api_spec() ->
 paths() ->
     [
         "/schema_registry",
-        "/schema_registry/:name"
+        "/schema_registry/:name",
+        "/schema_registry_external",
+        "/schema_registry_external/registry/:name"
     ].
 
 schema("/schema_registry") ->
@@ -125,6 +129,91 @@ schema("/schema_registry/:name") ->
                     404 => error_schema('NOT_FOUND', "Schema not found")
                 }
         }
+    };
+schema("/schema_registry_external") ->
+    %% API spec for the external registry collection: `get' lists all registries,
+    %% `post' creates a new one.
+    #{
+        'operationId' => '/schema_registry_external',
+        get => #{
+            tags => ?TAGS,
+            summary => <<"List external registries">>,
+            description => ?DESC("external_registry_list"),
+            responses =>
+                #{
+                    %% `external_registries_type/0' is already a `name => registry' map.
+                    %% Wrapping it in another `hoconsc:map/2' would document a map of
+                    %% maps, which matches neither the handler output nor the sample.
+                    200 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            emqx_schema_registry_schema:external_registries_type(),
+                            #{
+                                sample =>
+                                    #{value => sample_list_external_registries_response()}
+                            }
+                        )
+                }
+        },
+        post => #{
+            tags => ?TAGS,
+            summary => <<"Add a new external registry">>,
+            description => ?DESC("external_registry_create"),
+            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                hoconsc:union(fun create_external_registry_union/1),
+                post_examples()
+            ),
+            responses =>
+                #{
+                    201 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            emqx_schema_registry_schema:external_registry_type(),
+                            post_examples()
+                        ),
+                    %% NOTE(review): the handler answers duplicates through `?BAD_REQUEST';
+                    %% confirm that 'ALREADY_EXISTS' is the error code actually emitted.
+                    400 => error_schema('ALREADY_EXISTS', "External registry already exists")
+                }
+        }
+    };
+schema("/schema_registry_external/registry/:name") ->
+    %% API spec for a single external registry: lookup, update and delete by name.
+    #{
+        'operationId' => '/schema_registry_external/registry/:name',
+        get => #{
+            tags => ?TAGS,
+            summary => <<"Lookup external registry">>,
+            description => ?DESC("external_registry_lookup"),
+            parameters => [param_path_external_registry_name()],
+            responses =>
+                #{
+                    %% The handler returns the (redacted) external registry config, so
+                    %% document that type rather than the internal schema type.
+                    200 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            emqx_schema_registry_schema:external_registry_type(),
+                            #{
+                                sample =>
+                                    #{value => sample_get_external_registry_response(confluent)}
+                            }
+                        ),
+                    404 => error_schema('NOT_FOUND', "External registry not found")
+                }
+        },
+        put => #{
+            tags => ?TAGS,
+            summary => <<"Update external registry">>,
+            description => ?DESC("external_registry_update"),
+            parameters => [param_path_external_registry_name()],
+            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                emqx_schema_registry_schema:external_registry_type(),
+                create_external_registry_input_examples()
+            ),
+            responses =>
+                #{
+                    %% NOTE(review): this still documents the internal schema type and
+                    %% examples, although the handler returns an external registry;
+                    %% confirm and align with the `get' response above.
+                    200 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            emqx_schema_registry_schema:api_schema("put"),
+                            put_examples()
+                        ),
+                    404 => error_schema('NOT_FOUND', "External registry not found")
+                }
+        },
+        delete => #{
+            tags => ?TAGS,
+            summary => <<"Delete external registry">>,
+            description => ?DESC("external_registry_delete"),
+            parameters => [param_path_external_registry_name()],
+            responses => #{204 => <<"Deleted">>}
+        }
+    }.
 
 %%-------------------------------------------------------------------------------------------------
@@ -197,6 +286,66 @@ schema("/schema_registry/:name") ->
             end
     end.
 
+%% External registries
+'/schema_registry_external'(get, _Params) ->
+    %% Each registry is redacted individually before the name => registry map is returned.
+    RedactFn = fun(_Name, RawRegistry) -> emqx_utils:redact(RawRegistry) end,
+    RawRegistries = emqx_schema_registry_config:list_external_registries_raw(),
+    Redacted = maps:map(RedactFn, RawRegistries),
+    ?OK(Redacted);
+'/schema_registry_external'(post, #{body := #{<<"name">> := Name} = Body}) ->
+    %% The name selects the registry; it is stripped from the parameters passed on.
+    Params = maps:remove(<<"name">>, Body),
+    OnExisting = fun() -> ?BAD_REQUEST(<<"External registry already exists">>) end,
+    OnMissing = fun() ->
+        case emqx_schema_registry_config:upsert_external_registry(Name, Params) of
+            {error, Reason} ->
+                ?BAD_REQUEST(Reason);
+            {ok, Registry} ->
+                ?CREATED(external_registry_out(Registry))
+        end
+    end,
+    with_external_registry(Name, OnExisting, OnMissing).
+
+'/schema_registry_external/registry/:name'(get, #{bindings := #{name := Name}}) ->
+    %% Existing registry: return it redacted. Missing registry: 404.
+    Show = fun(Registry) -> ?OK(external_registry_out(Registry)) end,
+    with_external_registry(Name, Show, not_found());
+'/schema_registry_external/registry/:name'(put, #{bindings := #{name := Name}, body := Params}) ->
+    %% Only an existing registry may be updated; otherwise 404.
+    Update = fun() ->
+        case emqx_schema_registry_config:upsert_external_registry(Name, Params) of
+            {error, Reason} ->
+                ?BAD_REQUEST(Reason);
+            {ok, Registry} ->
+                ?OK(external_registry_out(Registry))
+        end
+    end,
+    with_external_registry(Name, Update, not_found());
+'/schema_registry_external/registry/:name'(delete, #{bindings := #{name := Name}}) ->
+    %% Deleting is idempotent: a missing registry also yields 204.
+    Delete = fun() ->
+        case emqx_schema_registry_config:delete_external_registry(Name) of
+            {error, Reason} ->
+                ?BAD_REQUEST(Reason);
+            ok ->
+                ?NO_CONTENT
+        end
+    end,
+    AlreadyGone = fun() -> ?NO_CONTENT end,
+    with_external_registry(Name, Delete, AlreadyGone).
+
 %%-------------------------------------------------------------------------------------------------
 %% Examples
 %%-------------------------------------------------------------------------------------------------
@@ -231,6 +380,32 @@ get_examples() ->
             }
     }.
 
+%% Sample body for the "list external registries" response: a map from registry name
+%% to a sample registry.
+sample_list_external_registries_response() ->
+    #{<<"my_registry">> => sample_get_external_registry_response(confluent)}.
+
+%% Sample Confluent registry as returned by the API; the password is shown redacted.
+sample_get_external_registry_response(confluent) ->
+    #{
+        type => <<"confluent">>,
+        url => <<"http://confluent_schema_registry:8081">>,
+        auth => #{
+            mechanism => <<"basic">>,
+            username => <<"cpsruser">>,
+            password => <<"******">>
+        }
+    }.
+
+%% Named request-body examples for external registries, keyed by registry type.
+create_external_registry_input_examples() ->
+    #{
+        <<"confluent">> =>
+            #{
+                summary => <<"Confluent">>,
+                value => external_registry_confluent_example()
+            }
+    }.
+
+%% Example request body for a Confluent external registry with basic authentication.
+%% The fields mirror `fields(confluent_schema_registry)' in
+%% `emqx_schema_registry_schema'. Previously this returned an empty map, which left the
+%% documented example blank.
+external_registry_confluent_example() ->
+    #{
+        type => <<"confluent">>,
+        url => <<"http://confluent_schema_registry:8081">>,
+        auth => #{
+            mechanism => <<"basic">>,
+            username => <<"cpsruser">>,
+            password => <<"mypass">>
+        }
+    }.
+
 %%-------------------------------------------------------------------------------------------------
 %% Schemas and hocon types
 %%-------------------------------------------------------------------------------------------------
@@ -247,6 +422,18 @@ param_path_schema_name() ->
             }
         )}.
 
+%% Path parameter spec for `:name' in the external registry endpoints: a required binary.
+param_path_external_registry_name() ->
+    {name,
+        mk(
+            binary(),
+            #{
+                in => path,
+                required => true,
+                example => <<"my_registry">>,
+                desc => ?DESC("param_path_external_registry_name")
+            }
+        )}.
+
 %%-------------------------------------------------------------------------------------------------
 %% Internal fns
 %%-------------------------------------------------------------------------------------------------
@@ -259,3 +446,37 @@ error_schema(Codes, Message) when is_list(Message) ->
     error_schema(Codes, list_to_binary(Message));
 error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
     emqx_dashboard_swagger:error_codes(Codes, Message).
+
+%% Prepares a registry config for an API response by passing it through
+%% `emqx_utils:redact/1'.
+external_registry_out(Registry) ->
+    emqx_utils:redact(Registry).
+
+%% Union selector for the "create external registry" request body. It delegates to the
+%% union function of `external_registry_type/0' and maps every returned struct reference
+%% to its `"external_registry_api_create_" ++ Name' counterpart.
+create_external_registry_union(all_union_members) ->
+    ?UNION(MembersFn) = emqx_schema_registry_schema:external_registry_type(),
+    ToCreateRef = fun(?R_REF(emqx_schema_registry_schema, Name)) ->
+        external_registry_api_create_ref(Name)
+    end,
+    lists:map(ToCreateRef, MembersFn(all_union_members));
+create_external_registry_union({value, V}) ->
+    ?UNION(MembersFn) = emqx_schema_registry_schema:external_registry_type(),
+    %% will throw if there's no match; always return single match
+    [?R_REF(emqx_schema_registry_schema, Name)] = MembersFn({value, V}),
+    [external_registry_api_create_ref(Name)].
+
+%% Builds the reference to the create-request struct for the registry struct `Name'.
+external_registry_api_create_ref(Name) ->
+    Struct = "external_registry_api_create_" ++ emqx_utils_conv:str(Name),
+    ?R_REF(emqx_schema_registry_schema, Struct).
+
+%% Returns a zero-arity callback that produces the 404 response for a missing registry.
+not_found() -> fun() -> ?NOT_FOUND(<<"External registry not found">>) end.
+
+%% Looks up the raw config of registry `Name' and dispatches on the result.
+%% `NotFoundFn/0' is called when the registry does not exist. `FoundFn' is called when it
+%% does, and may take no argument or the registry itself.
+with_external_registry(Name, FoundFn, NotFoundFn) ->
+    Lookup = emqx_schema_registry_config:lookup_external_registry_raw(Name),
+    case Lookup of
+        {error, not_found} ->
+            NotFoundFn();
+        {ok, Registry} when is_function(FoundFn, 1) ->
+            FoundFn(Registry);
+        {ok, _Registry} when is_function(FoundFn, 0) ->
+            FoundFn()
+    end.

+ 71 - 0
apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl

@@ -23,6 +23,28 @@
     api_schema/1
 ]).
 
+%% API
+-export([
+    external_registry_type/0,
+    external_registries_type/0
+]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% Union type of all supported external registries, built with `emqx_schema:mkunion/3'
+%% on the `type' field. `<<"confluent">>' is currently the only member and is also
+%% passed as the third (default) argument.
+external_registry_type() ->
+    emqx_schema:mkunion(
+        type,
+        #{
+            <<"confluent">> => ref(confluent_schema_registry)
+        },
+        <<"confluent">>
+    ).
+
+%% Map type from registry name to `external_registry_type/0'.
+external_registries_type() ->
+    hoconsc:map(name, external_registry_type()).
+
 %%------------------------------------------------------------------------------
 %% `hocon_schema' APIs
 %%------------------------------------------------------------------------------
@@ -37,6 +59,14 @@ tags() ->
 
 fields(?CONF_KEY_ROOT) ->
     [
+        {external,
+            mk(
+                external_registries_type(),
+                #{
+                    default => #{},
+                    desc => ?DESC("confluent_schema_registry")
+                }
+            )},
         {schemas,
             mk(
                 hoconsc:map(
@@ -65,6 +95,43 @@ fields(json) ->
         {type, mk(json, #{required => true, desc => ?DESC("schema_type_json")})}
         | common_fields(emqx_schema:json_binary())
     ];
+%% Confluent external registry: type tag, registry URL and optional authentication.
+fields(confluent_schema_registry) ->
+    [
+        {type,
+            mk(confluent, #{default => confluent, desc => ?DESC("schema_registry_external_type")})},
+        {url, mk(binary(), #{required => true, desc => ?DESC("confluent_schema_registry_url")})},
+        {auth,
+            mk(
+                %% Either the atom `none' or a basic-auth struct; defaults to `none'.
+                hoconsc:union([none, ref(confluent_schema_registry_auth_basic)]),
+                #{default => none, desc => ?DESC("confluent_schema_registry_auth")}
+            )}
+    ];
+%% Basic authentication for the Confluent registry. `mechanism' is a hidden field fixed
+%% to `basic'; the password is declared through `emqx_schema_secret:mk/1'.
+fields(confluent_schema_registry_auth_basic) ->
+    [
+        {mechanism,
+            mk(basic, #{
+                required => true,
+                default => basic,
+                importance => ?IMPORTANCE_HIDDEN,
+                desc => ?DESC("confluent_schema_registry_auth_basic")
+            })},
+        {username,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC("confluent_schema_registry_auth_basic_username")
+            })},
+        {password,
+            emqx_schema_secret:mk(#{
+                required => true,
+                desc => ?DESC("confluent_schema_registry_auth_basic_password")
+            })}
+    ];
+%% API-only "create" variant of a registry struct: the fields of the struct named by the
+%% suffix, plus a required `name'. The suffix must name an already existing atom.
+fields("external_registry_api_create_" ++ NameStr) ->
+    Name = list_to_existing_atom(NameStr),
+    [
+        {name, mk(binary(), #{required => true, desc => ?DESC("external_registry_name")})}
+        | fields(Name)
+    ];
 fields("get_avro") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
 fields("get_protobuf") ->
@@ -94,6 +161,10 @@ desc(protobuf) ->
     ?DESC("protobuf_type");
 desc(json) ->
     ?DESC("json_type");
+%% Struct descriptions for the external (Confluent) registry types.
+desc(confluent_schema_registry) ->
+    ?DESC("confluent_schema_registry");
+%% NOTE(review): the basic-auth struct reuses the generic "confluent_schema_registry_auth"
+%% description key; confirm this is intended.
+desc(confluent_schema_registry_auth_basic) ->
+    ?DESC("confluent_schema_registry_auth");
 desc(_) ->
     undefined.
 

+ 42 - 0
apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl

@@ -109,6 +109,34 @@ handle_rule_function(schema_encode, Args) ->
     error({args_count_error, {schema_encode, Args}});
 handle_rule_function(schema_check, [SchemaId, Data | MoreArgs]) ->
     schema_check(SchemaId, Data, MoreArgs);
+%% Rule functions backed by external registries. Each clause unwraps `{ok, Value}' to
+%% `Value' and raises `error(Reason)' on `{error, Reason}'.
+%%
+%% `avro_encode': encodes via the external registry without tagging (`tag => false').
+handle_rule_function(avro_encode, [RegistryName, Data | Args]) ->
+    case emqx_schema_registry_external:encode(RegistryName, Data, Args, #{tag => false}) of
+        {ok, Encoded} ->
+            Encoded;
+        {error, Reason} ->
+            error(Reason)
+    end;
+%% `avro_decode': decodes via the external registry with empty options.
+handle_rule_function(avro_decode, [RegistryName, Data | Args]) ->
+    case emqx_schema_registry_external:decode(RegistryName, Data, Args, _Opts = #{}) of
+        {ok, Decoded} ->
+            Decoded;
+        {error, Reason} ->
+            error(Reason)
+    end;
+%% `schema_encode_and_tag': encodes with a locally stored schema and tags the result;
+%% see `handle_schema_encode_and_tag/4'.
+handle_rule_function(schema_encode_and_tag, [OurSchemaName, RegistryName, Data | Args]) ->
+    case handle_schema_encode_and_tag(OurSchemaName, RegistryName, Data, Args) of
+        {ok, Encoded} ->
+            Encoded;
+        {error, Reason} ->
+            error(Reason)
+    end;
+%% `schema_decode_tagged': makes the same `decode/4' call as `avro_decode'.
+%% NOTE(review): presumably the two differ only in the `Args' callers pass - confirm.
+handle_rule_function(schema_decode_tagged, [RegistryName, Data | Args]) ->
+    case emqx_schema_registry_external:decode(RegistryName, Data, Args, _Opts = #{}) of
+        {ok, Decoded} ->
+            Decoded;
+        {error, Reason} ->
+            error(Reason)
+    end;
 handle_rule_function(_, _) ->
     {error, no_match_for_function}.
 
@@ -439,3 +467,17 @@ has_inner_type(_SerdeType, _EvalContext, []) ->
     true;
 has_inner_type(_SerdeType, _EvalContext, _Path) ->
     false.
+
+%% Fetches the local schema `OurSchemaName' and encodes `Data' with its source through
+%% the external registry `RegistryName', tagging the output (`tag => true').
+%% Any result of the schema lookup other than `{ok, Schema}' is returned unchanged.
+handle_schema_encode_and_tag(OurSchemaName, RegistryName, Data, Args) ->
+    case emqx_schema_registry:get_schema(OurSchemaName) of
+        {ok, Schema} ->
+            Source = maps:get(source, Schema),
+            EncodeOpts = #{tag => true},
+            emqx_schema_registry_external:encode_with(
+                RegistryName, OurSchemaName, Source, Data, Args, EncodeOpts
+            );
+        LookupFailure ->
+            LookupFailure
+    end.

+ 90 - 17
apps/emqx_schema_registry/src/emqx_schema_registry_sup.erl

@@ -5,34 +5,98 @@
 
 -behaviour(supervisor).
 
--export([start_link/0]).
+%% API
+-export([
+    start_link/0,
+    start_external_registry_sup/0,
 
+    ensure_external_registry_worker_started/1,
+    ensure_external_registry_worker_absent/1
+]).
+
+%% `supervisor' API
 -export([init/1]).
 
--define(SERVER, ?MODULE).
+%% @doc
+%%
+%% emqx_schema_registry_sup
+%% |
+%% +-- emqx_schema_registry(1)  % registry process controlling local schemas
+%% |
+%% +-- emqx_schema_registry_external_sup(1)  % supervisor for external registry workers
+%%     |
+%%     +-- emqx_schema_registry_external(1) % for managing external registry context and cache
+%%     |
+%%     +-- avlizer_confluent(0..n) % for interacting with confluent schema registry
+
+%%------------------------------------------------------------------------------
+%% Type definitions
+%%------------------------------------------------------------------------------
+
+-define(root, root).
+-define(ROOT_SUP, ?MODULE).
+
+-define(external, external).
+-define(EXTERNAL_SUP, emqx_schema_registry_external_sup).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
 
 start_link() ->
-    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
-
-%% sup_flags() = #{strategy => strategy(),         % optional
-%%                 intensity => non_neg_integer(), % optional
-%%                 period => pos_integer()}        % optional
-%% child_spec() = #{id => child_id(),       % mandatory
-%%                  start => mfargs(),      % mandatory
-%%                  restart => restart(),   % optional
-%%                  shutdown => shutdown(), % optional
-%%                  type => worker(),       % optional
-%%                  modules => modules()}   % optional
-init([]) ->
+    supervisor:start_link({local, ?ROOT_SUP}, ?MODULE, ?root).
+
+%% Starts the external registry supervisor, locally registered as `?EXTERNAL_SUP'.
+%% It shares this callback module and is initialised through `init(?external)'.
+start_external_registry_sup() ->
+    supervisor:start_link({local, ?EXTERNAL_SUP}, ?MODULE, ?external).
+
+%% Starts `ChildSpec' under `?EXTERNAL_SUP'. A child that is already running counts as
+%% success. Any other start result crashes the caller.
+ensure_external_registry_worker_started(ChildSpec) ->
+    StartResult = supervisor:start_child(?EXTERNAL_SUP, ChildSpec),
+    case StartResult of
+        {error, {already_started, _}} ->
+            ok;
+        {ok, _} ->
+            ok
+    end.
+
+%% Stops the child `WorkerId' of `?EXTERNAL_SUP' and removes its child spec.
+%% An unknown `WorkerId' counts as success.
+ensure_external_registry_worker_absent(WorkerId) ->
+    TerminateResult = supervisor:terminate_child(?EXTERNAL_SUP, WorkerId),
+    case TerminateResult of
+        {error, not_found} ->
+            ok;
+        ok ->
+            %% The outcome of removing the spec is deliberately ignored.
+            _ = supervisor:delete_child(?EXTERNAL_SUP, WorkerId),
+            ok
+    end.
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init(?root) ->
     SupFlags = #{
         strategy => one_for_one,
         intensity => 10,
         period => 100
     },
-    ChildSpecs = [child_spec(emqx_schema_registry)],
-    {ok, {SupFlags, ChildSpecs}}.
+    ChildSpecs = [
+        worker_spec(emqx_schema_registry),
+        external_sup_spec()
+    ],
+    {ok, {SupFlags, ChildSpecs}};
+init(?external) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    Children = [
+        worker_spec(emqx_schema_registry_external)
+    ],
+    {ok, {SupFlags, Children}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
 
-child_spec(Mod) ->
+worker_spec(Mod) ->
     #{
         id => Mod,
         start => {Mod, start_link, []},
@@ -40,3 +104,12 @@ child_spec(Mod) ->
         shutdown => 5_000,
         type => worker
     }.
+
+%% Child spec of the external registry supervisor: a permanent child of type
+%% `supervisor' with `shutdown => infinity', started via `start_external_registry_sup/0'.
+external_sup_spec() ->
+    #{
+        id => ?EXTERNAL_SUP,
+        start => {?MODULE, start_external_registry_sup, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor
+    }.

+ 110 - 6
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -23,14 +23,21 @@ all() ->
         {group, avro},
         {group, protobuf},
         {group, json}
-    ] ++ sparkplug_tests().
+    ] ++ sparkplug_tests() ++ only_once_tcs().
 
 groups() ->
-    AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
+    OnlyOnceTCs = only_once_tcs(),
+    AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- (sparkplug_tests() ++ OnlyOnceTCs),
     ProtobufOnlyTCs = protobuf_only_tcs(),
     TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
     [{avro, TCs}, {json, TCs}, {protobuf, AllTCsExceptSP}].
 
+%% Test cases that are excluded from the per-serde-type groups and appended once to
+%% `all/0'.
+only_once_tcs() ->
+    [
+        t_import_config,
+        t_external_registry_load_config
+    ].
+
 protobuf_only_tcs() ->
     [
         t_protobuf_union_encode,
@@ -85,6 +92,7 @@ end_per_testcase(_TestCase, _Config) ->
     ok = snabbkaffe:stop(),
     emqx_common_test_helpers:call_janitor(),
     clear_schemas(),
+    clear_external_registries(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -345,6 +353,14 @@ clear_schemas() ->
         emqx_schema_registry:list_schemas()
     ).
 
+%% Deletes every configured external registry; each deletion must return `ok'.
+clear_external_registries() ->
+    Registries = emqx_schema_registry_config:list_external_registries(),
+    DeleteFn = fun(RegistryName, _Registry) ->
+        ok = emqx_schema_registry_config:delete_external_registry(RegistryName)
+    end,
+    maps:foreach(DeleteFn, Registries).
+
 receive_action_results() ->
     receive
         {action, #{data := _} = Res} ->
@@ -753,6 +769,94 @@ t_cluster_serde_build(Config) ->
     ),
     ok.
 
+%% Verifies that importing in both `merge' and `replace' modes works with external
+%% registries when configurations are loaded through the CLI interface.
+%%
+%% Steps:
+%%   1. create a registry through the HTTP API and check that its worker exists;
+%%   2. load a config in `merge' mode that updates it and adds a second registry;
+%%   3. load a config in `replace' mode that keeps only the first registry.
+%% After each load, the raw config, the checked config and the worker processes are
+%% inspected.
+t_external_registry_load_config(_Config) ->
+    %% Existing config
+    Name0 = <<"preexisting">>,
+    Config0 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(),
+    {201, _} = emqx_schema_registry_http_api_SUITE:create_external_registry(Config0#{
+        <<"name">> => Name0
+    }),
+    [_] = emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name0),
+
+    %% Config to load
+    %% Will update existing config
+    Name1 = Name0,
+    URL1 = <<"http://new_url:8081">>,
+    Config1 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(#{
+        <<"url">> => URL1
+    }),
+    %% New config
+    Name2 = <<"new">>,
+    URL2 = <<"http://yet_another_url:8081">>,
+    Config2 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(#{
+        <<"url">> => URL2
+    }),
+    ConfigToLoad1 = #{
+        <<"schema_registry">> => #{
+            <<"external">> => #{
+                Name1 => Config1,
+                Name2 => Config2
+            }
+        }
+    },
+    %% The CLI loader takes HOCON text, so the map is pretty-printed first.
+    ConfigToLoad1Bin = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})),
+    ?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoad1Bin, #{mode => merge})),
+
+    %% The raw config is checked with binary keys, the checked config with atom keys.
+    Path = [schema_registry, external],
+    PathBin = [emqx_utils_conv:bin(PS) || PS <- Path],
+    Name1Atom = binary_to_atom(Name1),
+    Name2Atom = binary_to_atom(Name2),
+    ?assertMatch(
+        #{
+            Name1 := #{<<"url">> := URL1},
+            Name2 := #{<<"url">> := URL2}
+        },
+        emqx_config:get_raw(PathBin)
+    ),
+    ?assertMatch(
+        #{
+            Name1Atom := #{url := URL1},
+            Name2Atom := #{url := URL2}
+        },
+        emqx_config:get(Path)
+    ),
+    %% Correctly starts new processes
+    ?assertMatch([_], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name1)),
+    ?assertMatch([_], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name2)),
+
+    %% New config; will replace everything
+    Name3 = Name0,
+    URL3 = <<"http://final_url:8081">>,
+    Config3 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(#{
+        <<"url">> => URL3
+    }),
+    ConfigToLoad2 = #{
+        <<"schema_registry">> => #{
+            <<"external">> => #{
+                Name3 => Config3
+            }
+        }
+    },
+    ConfigToLoad2Bin = iolist_to_binary(hocon_pp:do(ConfigToLoad2, #{})),
+    ?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoad2Bin, #{mode => replace})),
+
+    Name3Atom = binary_to_atom(Name3),
+    ?assertMatch(
+        #{Name3 := #{<<"url">> := URL3}},
+        emqx_config:get_raw(PathBin)
+    ),
+    ?assertMatch(
+        #{Name3Atom := #{url := URL3}},
+        emqx_config:get(Path)
+    ),
+    %% Correctly stops old processes
+    ?assertMatch([_], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name3)),
+    ?assertMatch([], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name2)),
+
+    ok.
+
 t_import_config(_Config) ->
     RawConf = #{
         <<"schema_registry">> =>
@@ -784,14 +888,14 @@ t_import_config(_Config) ->
         RawConf,
         <<"Updated description">>
     ),
-    Path = [schema_registry, schemas, <<"my_avro_schema">>],
+    Path = [schema_registry, schemas, my_avro_schema],
     ?assertEqual(
-        {ok, #{root_key => schema_registry, changed => []}},
-        emqx_schema_registry:import_config(RawConf)
+        {ok, #{root_key => schema_registry, changed => [Path]}},
+        emqx_schema_registry_config:import_config(RawConf)
     ),
     ?assertEqual(
         {ok, #{root_key => schema_registry, changed => [Path]}},
-        emqx_schema_registry:import_config(RawConf1)
+        emqx_schema_registry_config:import_config(RawConf1)
     ).
 
 sparkplug_example_data_base64() ->

+ 308 - 3
apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl

@@ -15,6 +15,12 @@
 
 -include("emqx_schema_registry.hrl").
 
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(REDACTED, <<"******">>).
+
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %%------------------------------------------------------------------------------
@@ -40,7 +46,9 @@ groups() ->
 
 only_once_testcases() ->
     [
-        t_empty_sparkplug
+        t_empty_sparkplug,
+        t_external_registry_crud_confluent,
+        t_smoke_test_external_registry_confluent
     ].
 
 init_per_suite(Config) ->
@@ -124,11 +132,13 @@ end_per_group(_Group, _Config) ->
 
 init_per_testcase(_TestCase, Config) ->
     clear_schemas(),
+    clear_external_registries(),
     ok = snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_TestCase, _Config) ->
     clear_schemas(),
+    clear_external_registries(),
     ok = snabbkaffe:stop(),
     emqx_common_test_helpers:call_janitor(),
     ok.
@@ -181,6 +191,14 @@ clear_schemas() ->
         emqx_schema_registry:list_schemas()
     ).
 
+%% Deletes every configured external registry; each deletion must return `ok'.
+clear_external_registries() ->
+    Registries = emqx_schema_registry_config:list_external_registries(),
+    DeleteFn = fun(RegistryName, _Registry) ->
+        ok = emqx_schema_registry_config:delete_external_registry(RegistryName)
+    end,
+    maps:foreach(DeleteFn, Registries).
+
 dryrun_rule(SQL, Context) ->
     Params = #{
         context => Context,
@@ -203,6 +221,139 @@ simplify_result(Res) ->
             {Status, Body}
     end.
 
+%% Thin wrapper around `emqx_mgmt_api_test_util:simple_request/3'.
+simple_request(Method, Path, Params) ->
+    emqx_mgmt_api_test_util:simple_request(Method, Path, Params).
+
+%% Raw (binary-keyed) Confluent registry config without authentication.
+confluent_schema_registry_no_auth() ->
+    confluent_schema_registry_no_auth(_Overrides = #{}).
+
+%% Same as above, with `Overrides' deep-merged on top of the defaults.
+confluent_schema_registry_no_auth(#{} = Overrides) ->
+    emqx_utils_maps:deep_merge(
+        #{
+            <<"type">> => <<"confluent">>,
+            <<"url">> => confluent_url_bin(without_auth),
+            <<"auth">> => <<"none">>
+        },
+        Overrides
+    ).
+
+%% Raw (binary-keyed) Confluent registry config using basic authentication.
+confluent_schema_registry_with_basic_auth() ->
+    confluent_schema_registry_with_basic_auth(_Overrides = #{}).
+
+%% Same as above, with `Overrides' deep-merged on top of the defaults.
+confluent_schema_registry_with_basic_auth(#{} = Overrides) ->
+    emqx_utils_maps:deep_merge(
+        #{
+            <<"type">> => <<"confluent">>,
+            <<"url">> => confluent_url_bin(with_auth),
+            <<"auth">> => confluent_schema_registry_basic_auth()
+        },
+        Overrides
+    ).
+
+%% Basic-auth credentials used by the tests, in raw (binary-keyed) config form.
+confluent_schema_registry_basic_auth() ->
+    #{
+        <<"mechanism">> => <<"basic">>,
+        <<"username">> => <<"cpsruser">>,
+        <<"password">> => <<"mypass">>
+    }.
+
+%% HTTP API helpers for the external registry endpoints. Each one builds the endpoint
+%% URI and issues the request through `simple_request/3'.
+
+%% GET /schema_registry_external
+list_external_registries() ->
+    Path = uri(["schema_registry_external"]),
+    simple_request(get, Path, _Params = []).
+
+%% GET /schema_registry_external/registry/:name
+get_external_registry(Name) ->
+    Path = uri(["schema_registry_external", "registry", Name]),
+    simple_request(get, Path, _Params = []).
+
+%% POST /schema_registry_external; `Params' is the request body.
+create_external_registry(Params) ->
+    Path = uri(["schema_registry_external"]),
+    simple_request(post, Path, Params).
+
+%% PUT /schema_registry_external/registry/:name; `Params' is the request body.
+update_external_registry(Name, Params) ->
+    Path = uri(["schema_registry_external", "registry", Name]),
+    simple_request(put, Path, Params).
+
+%% DELETE /schema_registry_external/registry/:name
+delete_external_registry(Name) ->
+    Path = uri(["schema_registry_external", "registry", Name]),
+    simple_request(delete, Path, _Params = []).
+
+%% Returns the pids of the children of `emqx_schema_registry_external_sup' whose child
+%% id, converted to a binary, equals `Name'. The list is empty when there is none.
+find_external_registry_worker(Name) ->
+    [
+        Pid
+     || {N, Pid, _, _} <- supervisor:which_children(emqx_schema_registry_external_sup),
+        emqx_utils_conv:bin(N) =:= Name
+    ].
+
+%% Binary form of `confluent_url_string/1'.
+confluent_url_bin(WithOrWithoutAuth) ->
+    list_to_binary(confluent_url_string(WithOrWithoutAuth)).
+
+%% URL of the Confluent schema registry used by the tests; `with_auth' selects the
+%% basic-auth instance.
+confluent_url_string(without_auth) ->
+    "http://confluent_schema_registry:8081";
+confluent_url_string(with_auth) ->
+    "http://confluent_schema_registry_basicauth:8081".
+
+%% Starts an `avlizer_confluent' client linked to the caller and returns its server
+%% reference together with the table obtained from `avlizer_confluent:get_table/1'.
+%% Credentials are added to the client config only for `with_auth'.
+start_confluent_client(WithOrWithoutAuth) ->
+    Cfg0 = #{url => confluent_url_string(WithOrWithoutAuth)},
+    Cfg = emqx_utils_maps:put_if(
+        Cfg0,
+        auth,
+        to_avlizer_auth(confluent_schema_registry_basic_auth()),
+        WithOrWithoutAuth =:= with_auth
+    ),
+    {ok, Server} = avlizer_confluent:start_link(_ServerRef = undefined, Cfg),
+    Table = avlizer_confluent:get_table(Server),
+    #{server => Server, table => Table}.
+
+%% Converts a raw basic-auth config map into the `{basic, Username, Password}' tuple
+%% (with string values) passed as `auth' to the `avlizer_confluent' client.
+to_avlizer_auth(#{<<"mechanism">> := <<"basic">>} = Auth0) ->
+    #{<<"username">> := Username, <<"password">> := Password} = Auth0,
+    {basic, emqx_utils_conv:str(Username), emqx_utils_conv:str(Password)}.
+
+%% Registers `Schema' under `Subject' with the client's server and returns the
+%% schema id; crashes if registration fails.
+register_schema_confluent(#{server := Server}, Subject, Schema) ->
+    {ok, Id} = avlizer_confluent:register_schema(Server, Subject, Schema),
+    Id.
+
+%% Encodes `Data' with the schema `SchemaId' using a directly started client
+%% (see `start_confluent_client/1').
+confluent_encode(Data, SchemaId, RegistryClient) ->
+    #{server := Server, table := Table} = RegistryClient,
+    Encoder = avlizer_confluent:make_encoder2(Server, Table, SchemaId),
+    avlizer_confluent:encode(Encoder, Data).
+
+%% Like `confluent_encode/3', then tags the result with `avlizer_confluent:tag_data/2'.
+confluent_encode_and_tag(Data, SchemaId, RegistryClient) ->
+    avlizer_confluent:tag_data(SchemaId, confluent_encode(Data, SchemaId, RegistryClient)).
+
+%% Decodes `Data' with the decoder built for `SchemaId'; no untagging is done here.
+confluent_decode_untagged(Data, SchemaId, RegistryClient) ->
+    #{server := Server, table := Table} = RegistryClient,
+    Decoder = avlizer_confluent:make_decoder2(Server, Table, SchemaId),
+    avlizer_confluent:decode(Decoder, Data).
+
+%% Avro record type `com.example.myrecord' with a single field `f1': a map of ints.
+avro_schema2() ->
+    avro_record:type(
+        <<"myrecord">>,
+        [avro_record:define_field(f1, avro_map:type(avro_primitive:int_type()))],
+        [{namespace, 'com.example'}]
+    ).
+
+%% Renders the rule SQL `Template' against `Context' with `emqx_template' in strict
+%% mode and returns the result as a binary.
+sql(Template, Context) ->
+    Parsed = emqx_template:parse(Template),
+    iolist_to_binary(emqx_template:render_strict(Parsed, Context)).
+
+%% Builds a `message_publish' rule context whose payload is `Data' encoded as JSON
+%% (`{json, Data}') or as a hex string (`{hex, Data}').
+publish_context({json, Data}) ->
+    publish_context1(emqx_utils_json:encode(Data));
+publish_context({hex, Data}) ->
+    publish_context1(bin2hex(Data)).
+
+%% `message_publish' context with fixed client/topic fields and the given payload.
+publish_context1(Payload) ->
+    #{
+        <<"clientid">> => <<"c_emqx">>,
+        <<"event_type">> => <<"message_publish">>,
+        <<"payload">> => Payload,
+        <<"qos">> => 1,
+        <<"topic">> => <<"t">>,
+        <<"username">> => <<"u_emqx">>
+    }.
+
+%% Hex-encodes a binary through `emqx_rule_funcs:bin2hexstr/1'.
+bin2hex(Bin) ->
+    emqx_rule_funcs:bin2hexstr(Bin).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -316,7 +467,7 @@ t_crud(Config) ->
         {ok, 400, #{
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
-                <<"{post_config_update,emqx_schema_registry,", _/binary>>
+                <<"{post_config_update,emqx_schema_registry_config,", _/binary>>
         }},
         request({put, SchemaName, UpdateParams#{<<"source">> := InvalidSourceBin}})
     ),
@@ -357,7 +508,7 @@ t_crud(Config) ->
         {ok, 400, #{
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
-                <<"{post_config_update,emqx_schema_registry,", _/binary>>
+                <<"{post_config_update,emqx_schema_registry_config,", _/binary>>
         }},
         request({post, Params#{<<"source">> := InvalidSourceBin}})
     ),
@@ -437,3 +588,157 @@ t_name_too_long(Config) ->
         request({get, SchemaName})
     ),
     ok.
+
+%% Checks basic CRUD operations for dealing with external confluent registry.
+%% The assertions are order-dependent: each phase relies on the registries
+%% created, updated or deleted by the previous ones.
+t_external_registry_crud_confluent(_Config) ->
+    Name1 = <<"my_reg1">>,
+    Params1 = confluent_schema_registry_no_auth(),
+    NamedParams1 = Params1#{<<"name">> => Name1},
+
+    %% Nothing exists yet: the listing is empty, get/update answer 404, while
+    %% deleting a non-existent registry is still expected to answer 204.
+    ?assertEqual({200, #{}}, list_external_registries()),
+    ?assertMatch({404, _}, get_external_registry(Name1)),
+    ?assertMatch({404, _}, update_external_registry(Name1, Params1)),
+    ?assertMatch({204, _}, delete_external_registry(Name1)),
+
+    %% Create the first registry (no auth) and check it is visible through
+    %% get/list and that exactly one worker is found for it.
+    ?assertMatch({201, _}, create_external_registry(NamedParams1)),
+    ?assertEqual({200, Params1}, get_external_registry(Name1)),
+    ?assertMatch({200, #{Name1 := Params1}}, list_external_registries()),
+    ?assertMatch([_], find_external_registry_worker(Name1)),
+
+    %% Switch the registry to basic auth; responses are expected to carry
+    %% `?REDACTED' in place of the password.
+    Params2 = confluent_schema_registry_with_basic_auth(),
+    Expected2 = emqx_utils_maps:deep_put([<<"auth">>, <<"password">>], Params2, ?REDACTED),
+    ?assertMatch({200, Expected2}, update_external_registry(Name1, Params2)),
+    ?assertMatch({200, Expected2}, get_external_registry(Name1)),
+    ?assertMatch({200, #{Name1 := Expected2}}, list_external_registries()),
+    ?assertMatch([_], find_external_registry_worker(Name1)),
+
+    %% A second, not yet created registry behaves like the first one did
+    %% before creation, and has no worker.
+    Name2 = <<"my_reg2">>,
+    Params3 = confluent_schema_registry_no_auth(),
+    NamedParams3 = Params3#{<<"name">> => Name2},
+    ?assertMatch({404, _}, get_external_registry(Name2)),
+    ?assertMatch({404, _}, update_external_registry(Name2, Params3)),
+    ?assertMatch({204, _}, delete_external_registry(Name2)),
+    ?assertMatch([], find_external_registry_worker(Name2)),
+
+    %% After creating it, both registries are listed and each has a worker.
+    ?assertMatch({201, _}, create_external_registry(NamedParams3)),
+    ?assertMatch({200, Params3}, get_external_registry(Name2)),
+    ?assertMatch({200, #{Name1 := Expected2, Name2 := Params3}}, list_external_registries()),
+    ?assertMatch([_], find_external_registry_worker(Name1)),
+    ?assertMatch([_], find_external_registry_worker(Name2)),
+
+    %% Delete the first registry: its worker is gone and a repeated delete
+    %% still answers 204, while the second registry is left in place.
+    ?assertMatch({204, _}, delete_external_registry(Name1)),
+    ?assertMatch({404, _}, get_external_registry(Name1)),
+    ?assertMatch({404, _}, update_external_registry(Name1, Params1)),
+    ?assertMatch({204, _}, delete_external_registry(Name1)),
+    ?assertMatch({200, #{Name2 := Params3}}, list_external_registries()),
+    ?assertMatch([], find_external_registry_worker(Name1)),
+    ?assertMatch([_], find_external_registry_worker(Name2)),
+
+    %% Bad params
+    %% An empty parameter map must be rejected with 400 on create and update.
+    BadParams = #{},
+    ?assertMatch({400, _}, create_external_registry(BadParams)),
+    ?assertMatch({400, _}, update_external_registry(Name2, BadParams)),
+
+    ok.
+
+%% Happy path tests when using external registry (confluent).
+%% Creates an external registry with basic auth, registers `avro_schema2()'
+%% under a subject named after this test case, and then dry-runs four rule SQL
+%% statements: encode by id, decode by id, encode-and-tag, decode tagged.
+t_smoke_test_external_registry_confluent(_Config) ->
+    Name1 = <<"my_reg1">>,
+    Params1 = confluent_schema_registry_with_basic_auth(),
+    NamedParams1 = Params1#{<<"name">> => Name1},
+    {201, _} = create_external_registry(NamedParams1),
+
+    %% Register the schema directly through a test-side client; the resulting
+    %% id is what the rule SQL below refers to.
+    RegistryClient = start_confluent_client(with_auth),
+    Subject = atom_to_list(?FUNCTION_NAME),
+    SchemaId = register_schema_confluent(RegistryClient, Subject, avro_schema2()),
+
+    %% encode: fetch schema on the fly using id; good data
+    SQL1 = sql(
+        <<
+            "select bin2hexstr(avro_encode('${.name}', json_decode(payload), ${.schema_id})) as encoded"
+            " from \"t\" "
+        >>,
+        #{name => Name1, schema_id => SchemaId}
+    ),
+    Data1 = #{<<"f1">> => #{<<"bah">> => 123}},
+    Context1 = publish_context({json, Data1}),
+    %% The rule output must equal the hex of what the test-side client encodes.
+    Expected1 = bin2hex(confluent_encode(Data1, SchemaId, RegistryClient)),
+    ?assertMatch(
+        {200, #{<<"encoded">> := Expected1}},
+        dryrun_rule(SQL1, Context1),
+        #{expected => Expected1}
+    ),
+
+    %% decode: fetch schema on the fly using id; good data
+    SQL2 = sql(
+        <<
+            "select avro_decode('${.name}', hexstr2bin(payload), ${.schema_id}) as decoded"
+            " from \"t\" "
+        >>,
+        #{name => Name1, schema_id => SchemaId}
+    ),
+    %% The payload is sent hex-encoded and turned back into bytes by the SQL.
+    Encoded2 = confluent_encode(Data1, SchemaId, RegistryClient),
+    Context2 = publish_context({hex, Encoded2}),
+    Expected2 = Data1,
+    ?assertMatch(
+        {200, #{<<"decoded">> := Expected2}},
+        dryrun_rule(SQL2, Context2),
+        #{expected => Expected2}
+    ),
+
+    %% encode and tag using a schema registered in emqx
+    SchemaName3 = <<"my_schema">>,
+    Params3 = #{
+        <<"type">> => <<"avro">>,
+        <<"source">> => emqx_utils_json:encode(#{
+            <<"type">> => <<"record">>,
+            <<"name">> => <<"apitest">>,
+            <<"fields">> => [
+                #{<<"name">> => <<"i">>, <<"type">> => <<"int">>},
+                #{<<"name">> => <<"s">>, <<"type">> => <<"string">>}
+            ]
+        }),
+        <<"name">> => SchemaName3,
+        <<"description">> => <<"My schema">>
+    },
+    {ok, 201, _} = request({post, Params3}),
+
+    Subject3 = <<"subj3">>,
+    SQL3 = sql(
+        <<
+            "select bin2hexstr(schema_encode_and_tag("
+            "    '${.schema}', '${.registry}', json_decode(payload), '${.subject}')) as encoded"
+            " from \"t\" "
+        >>,
+        #{schema => SchemaName3, registry => Name1, subject => Subject3}
+    ),
+    Data3 = #{<<"i">> => 10, <<"s">> => <<"abc">>},
+    Context3 = publish_context({json, Data3}),
+    %% Expected3 is the untagged encoding produced by the local serde.
+    Expected3 = bin2hex(iolist_to_binary(emqx_schema_registry_serde:encode(SchemaName3, Data3))),
+    %% 40 bits => 5 * 2 hex digits
+    MagicTagHexSize = 10,
+    %% The tag prefix is matched by size only (its value is not checked); the
+    %% remainder of the output must equal the untagged encoding.
+    ?assertMatch(
+        {200, #{<<"encoded">> := <<_Tag3:MagicTagHexSize/binary, Expected3/binary>>}},
+        dryrun_rule(SQL3, Context3),
+        #{expected => Expected3}
+    ),
+
+    %% decode tagged data on the fly
+    SQL4 = sql(
+        <<
+            "select schema_decode_tagged("
+            "    '${.registry}', hexstr2bin(payload)) as decoded"
+            " from \"t\" "
+        >>,
+        #{registry => Name1}
+    ),
+    %% No schema id is passed in this SQL; only the registry name and the
+    %% tagged payload are given.
+    Encoded4 = confluent_encode_and_tag(Data1, SchemaId, RegistryClient),
+    Context4 = publish_context({hex, Encoded4}),
+    Expected4 = Data1,
+    ?assertMatch(
+        {200, #{<<"decoded">> := Expected4}},
+        dryrun_rule(SQL4, Context4),
+        #{expected => Expected4}
+    ),
+
+    ok.

+ 3 - 0
apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl

@@ -42,12 +42,15 @@ end_per_suite(Config) ->
     Apps = ?config(apps, Config),
     emqx_cth_suite:stop(Apps),
     ok.
+
 %% No per-testcase setup: the suite config is passed through unchanged.
 init_per_testcase(_TestCase, Config) ->
     Config.
 
 %% Per-testcase teardown: runs the janitor callbacks, then clears schemas
 %% while a snabbkaffe trace is active, and stops snabbkaffe afterwards.
 end_per_testcase(_TestCase, _Config) ->
     emqx_common_test_helpers:call_janitor(),
+    %% NOTE(review): presumably clear_schemas/0 depends on an active trace
+    %% (e.g. it waits on trace events) -- confirm against its definition.
+    snabbkaffe:start_trace(),
     clear_schemas(),
+    snabbkaffe:stop(),
     ok.
 
 %%------------------------------------------------------------------------------

+ 8 - 0
apps/emqx_utils/src/emqx_utils_redact.erl

@@ -23,12 +23,18 @@
 -define(IS_KEY_HEADERS(K), (K == headers orelse K == <<"headers">> orelse K == "headers")).
 
 %% NOTE: keep alphabetical order
+is_sensitive_key(account_key) -> true;
+is_sensitive_key("account_key") -> true;
+is_sensitive_key(<<"account_key">>) -> true;
 is_sensitive_key(aws_secret_access_key) -> true;
 is_sensitive_key("aws_secret_access_key") -> true;
 is_sensitive_key(<<"aws_secret_access_key">>) -> true;
 is_sensitive_key(password) -> true;
 is_sensitive_key("password") -> true;
 is_sensitive_key(<<"password">>) -> true;
+is_sensitive_key(private_key) -> true;
+is_sensitive_key("private_key") -> true;
+is_sensitive_key(<<"private_key">>) -> true;
 is_sensitive_key(secret) -> true;
 is_sensitive_key("secret") -> true;
 is_sensitive_key(<<"secret">>) -> true;
@@ -236,8 +242,10 @@ redact_test_() ->
 
     Types = [atom, string, binary],
     Keys = [
+        account_key,
         aws_secret_access_key,
         password,
+        private_key,
         secret,
         secret_key,
         secret_access_key,

+ 1 - 0
changes/ee/feat-13804.en.md

@@ -0,0 +1 @@
+Added support for using Confluent Schema Registry as an external provider in our Schema Registry.

+ 3 - 1
mix.exs

@@ -159,6 +159,8 @@ defmodule EMQXUmbrella.MixProject do
       common_dep(:ra),
       {:mimerl, "1.2.0", override: true},
       common_dep(:sasl_auth),
+      # avlizer currently uses older :erlavro version
+      common_dep(:erlavro),
       common_dep(:crc32cer)
     ]
   end
@@ -285,7 +287,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:snappyer), do: {:snappyer, "1.2.9", override: true}
   def common_dep(:crc32cer), do: {:crc32cer, "0.1.8", override: true}
   def common_dep(:jesse), do: {:jesse, github: "emqx/jesse", tag: "1.8.0.1"}
-  def common_dep(:erlavro), do: {:erlavro, github: "emqx/erlavro", tag: "2.10.0"}
+  def common_dep(:erlavro), do: {:erlavro, github: "emqx/erlavro", tag: "2.10.0", override: true}
 
   ###############################################################################################
   # BEGIN DEPRECATED FOR MIX BLOCK

+ 18 - 0
rel/i18n/emqx_schema_registry_http_api.hocon

@@ -6,6 +6,9 @@ desc_param_path_schema_name.desc:
 desc_param_path_schema_name.label:
 """Schema name"""
 
+param_path_external_registry_name.desc:
+"""External registry name"""
+
 desc_schema_registry_api_delete.desc:
 """Delete a schema"""
 
@@ -36,4 +39,19 @@ desc_schema_registry_api_put.desc:
 desc_schema_registry_api_put.label:
 """Update schema"""
 
+external_registry_list.desc:
+"""List external schema registries"""
+
+external_registry_delete.desc:
+"""Delete external schema registry"""
+
+external_registry_lookup.desc:
+"""Lookup external schema registry"""
+
+external_registry_create.desc:
+"""Create external schema registry"""
+
+external_registry_update.desc:
+"""Update external schema registry"""
+
 }

+ 35 - 0
rel/i18n/emqx_schema_registry_schema.hocon

@@ -68,4 +68,39 @@ schema_type_json.desc:
 schema_type_json.label:
 """JSON Schema"""
 
+confluent_schema_registry.label:
+"""Confluent Schema Registry"""
+confluent_schema_registry.desc:
+"""Confluent External Schema Registry configuration."""
+
+schema_registry_external_type.label:
+"""External Schema Registry Type"""
+schema_registry_external_type.desc:
+"""External Schema Registry Type"""
+
+confluent_schema_registry_url.label:
+"""Registry URL"""
+confluent_schema_registry_url.desc:
+"""URL endpoint for external registry."""
+
+confluent_schema_registry_auth.label:
+"""Authentication"""
+confluent_schema_registry_auth.desc:
+"""Authentication options for accessing external registry."""
+
+confluent_schema_registry_auth_basic.label:
+"""Basic Auth"""
+confluent_schema_registry_auth_basic.desc:
+"""Use basic authentication method for accessing external registry."""
+
+confluent_schema_registry_auth_basic_username.label:
+"""Username"""
+confluent_schema_registry_auth_basic_username.desc:
+"""Username for basic authentication method."""
+
+confluent_schema_registry_auth_basic_password.label:
+"""Password"""
+confluent_schema_registry_auth_basic_password.desc:
+"""Password for basic authentication method."""
+
 }

+ 3 - 0
scripts/ct/run.sh

@@ -272,6 +272,9 @@ for dep in ${CT_DEPS}; do
                 SNOWFLAKE_ODBC_REQUEST='yes'
             fi
             ;;
+        schema-registry)
+          FILES+=( '.ci/docker-compose-file/docker-compose-confluent-schema-registry.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1