瀏覽代碼

Merge remote-tracking branch 'origin/release-51' into sync-r51-20230712

Thales Macedo Garitezi 2 年之前
父節點
當前提交
06010f7ca9
共有 94 個文件被更改,包括 855 次插入249 次删除
  1. 1 1
      .github/workflows/release.yaml
  2. 1 1
      Makefile
  3. 1 1
      apps/emqx/include/emqx_release.hrl
  4. 1 0
      apps/emqx/priv/bpapi.versions
  5. 1 1
      apps/emqx/src/emqx_ws_connection.erl
  6. 8 0
      apps/emqx/test/emqx_ws_connection_SUITE.erl
  7. 2 1
      apps/emqx_authn/rebar.config
  8. 3 2
      apps/emqx_authn/src/emqx_authn.app.src
  9. 3 3
      apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
  10. 2 1
      apps/emqx_authz/rebar.config
  11. 2 1
      apps/emqx_authz/src/emqx_authz.app.src
  12. 1 1
      apps/emqx_authz/src/emqx_authz_api_schema.erl
  13. 2 2
      apps/emqx_authz/src/emqx_authz_http.erl
  14. 2 0
      apps/emqx_authz/src/emqx_authz_schema.erl
  15. 4 4
      apps/emqx_bridge/src/emqx_bridge_resource.erl
  16. 2 2
      apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
  17. 2 1
      apps/emqx_bridge_gcp_pubsub/rebar.config
  18. 1 1
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src
  19. 2 2
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl
  20. 9 9
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl
  21. 36 0
      apps/emqx_bridge_http/README.md
  22. 10 0
      apps/emqx_bridge_http/rebar.config
  23. 9 0
      apps/emqx_bridge_http/src/emqx_bridge_http.app.src
  24. 1 1
      apps/emqx_connector/src/emqx_connector_http.erl
  25. 2 2
      apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl
  26. 7 7
      apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
  27. 1 1
      apps/emqx_connector/test/emqx_connector_web_hook_server.erl
  28. 3 3
      apps/emqx_connector/test/emqx_connector_http_tests.erl
  29. 2 1
      apps/emqx_bridge_iotdb/rebar.config
  30. 1 1
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
  31. 1 1
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
  32. 5 5
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
  33. 2 1
      apps/emqx_dashboard/rebar.config
  34. 1 1
      apps/emqx_dashboard/src/emqx_dashboard.app.src
  35. 3 2
      apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl
  36. 1 1
      apps/emqx_enterprise/src/emqx_enterprise.app.src
  37. 2 2
      apps/emqx_enterprise/src/emqx_enterprise_schema.erl
  38. 1 0
      apps/emqx_machine/rebar.config
  39. 1 1
      apps/emqx_machine/src/emqx_machine_boot.erl
  40. 2 1
      apps/emqx_management/rebar.config
  41. 1 1
      apps/emqx_management/src/emqx_management.app.src
  42. 47 8
      apps/emqx_management/src/emqx_mgmt_api.erl
  43. 3 1
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  44. 87 1
      apps/emqx_management/src/emqx_mgmt_api_cluster.erl
  45. 38 0
      apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v2.erl
  46. 102 0
      apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl
  47. 1 1
      apps/emqx_management/test/emqx_mgmt_api_test_util.erl
  48. 1 1
      apps/emqx_rule_engine/src/emqx_rule_funcs.erl
  49. 2 1
      apps/emqx_s3/rebar.config
  50. 2 1
      apps/emqx_s3/src/emqx_s3.app.src
  51. 3 2
      apps/emqx_s3/src/emqx_s3_schema.erl
  52. 1 1
      apps/emqx_s3/test/emqx_s3_SUITE.erl
  53. 94 0
      apps/emqx_schema_registry/BSL.txt
  54. 15 0
      lib-ee/emqx_ee_schema_registry/README.md
  55. 0 0
      apps/emqx_schema_registry/etc/emqx_schema_registry.conf
  56. 3 2
      lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl
  57. 0 0
      apps/emqx_schema_registry/priv/LICENSE
  58. 0 0
      apps/emqx_schema_registry/priv/sparkplug_b.proto
  59. 4 4
      lib-ee/emqx_ee_schema_registry/rebar.config
  60. 3 3
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src
  61. 3 3
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl
  62. 6 6
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl
  63. 18 18
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl
  64. 2 2
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl
  65. 5 5
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl
  66. 3 4
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_sup.erl
  67. 41 27
      lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl
  68. 8 8
      lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl
  69. 17 17
      lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl
  70. 20 9
      apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
  71. 1 1
      build
  72. 1 0
      changes/ce/feat-11249.en.md
  73. 3 0
      changes/ce/feat-11251.en.md
  74. 1 0
      changes/ce/feat-11253.en.md
  75. 2 0
      changes/ce/fix-11250.en.md
  76. 1 0
      changes/ce/perf-11236.en.md
  77. 1 0
      changes/ee/feat-11241.en.md
  78. 2 2
      deploy/charts/emqx-enterprise/Chart.yaml
  79. 0 19
      lib-ee/emqx_ee_schema_registry/.gitignore
  80. 14 1
      lib-ee/emqx_license/src/emqx_license.erl
  81. 55 9
      lib-ee/emqx_license/src/emqx_license_http_api.erl
  82. 4 6
      lib-ee/emqx_license/src/emqx_license_schema.erl
  83. 51 2
      lib-ee/emqx_license/test/emqx_license_http_api_SUITE.erl
  84. 5 2
      mix.exs
  85. 4 1
      rebar.config.erl
  86. 1 1
      rel/i18n/emqx_connector_http.hocon
  87. 1 1
      rel/i18n/emqx_bridge_webhook_schema.hocon
  88. 6 0
      rel/i18n/emqx_license_http_api.hocon
  89. 14 2
      rel/i18n/emqx_license_schema.hocon
  90. 5 0
      rel/i18n/emqx_mgmt_api_cluster.hocon
  91. 1 1
      rel/i18n/emqx_ee_schema_registry_http_api.hocon
  92. 1 1
      rel/i18n/emqx_ee_schema_registry_schema.hocon
  93. 1 0
      scripts/macos-sign-binaries.sh
  94. 9 9
      scripts/rerun-apps-version-check.py

+ 1 - 1
.github/workflows/release.yaml

@@ -111,7 +111,7 @@ jobs:
       - upload
     permissions:
       pull-requests: read
-      checks: read
+      checks: write
       actions: write
     steps:
       - uses: actions/checkout@v3

+ 1 - 1
Makefile

@@ -16,7 +16,7 @@ endif
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
 export EMQX_DASHBOARD_VERSION ?= v1.3.1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.1
+export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.3
 
 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
 # In make 4.4+, for backward-compatibility the value from the original environment is used.

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.1.1").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.1.0").
+-define(EMQX_RELEASE_EE, "5.1.1-alpha.1").
 
 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -32,6 +32,7 @@
 {emqx_mgmt_api_plugins,1}.
 {emqx_mgmt_api_plugins,2}.
 {emqx_mgmt_cluster,1}.
+{emqx_mgmt_cluster,2}.
 {emqx_mgmt_trace,1}.
 {emqx_mgmt_trace,2}.
 {emqx_node_rebalance,1}.

+ 1 - 1
apps/emqx/src/emqx_ws_connection.erl

@@ -699,7 +699,7 @@ check_oom(State = #state{channel = Channel}) ->
 %%--------------------------------------------------------------------
 
 parse_incoming(<<>>, Packets, State) ->
-    {Packets, State};
+    {lists:reverse(Packets), State};
 parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
     try emqx_frame:parse(Data, ParseState) of
         {more, NParseState} ->

+ 8 - 0
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -530,6 +530,14 @@ t_parse_incoming(_) ->
     Packet = ?PUBLISH_PACKET(?QOS_0, <<"t">>, undefined, <<>>),
     ?assertMatch([{incoming, Packet}], Packets1).
 
+t_parse_incoming_order(_) ->
+    Packet1 = ?PUBLISH_PACKET(?QOS_0, <<"t1">>, undefined, <<>>),
+    Packet2 = ?PUBLISH_PACKET(?QOS_0, <<"t2">>, undefined, <<>>),
+    Bin1 = emqx_frame:serialize(Packet1),
+    Bin2 = emqx_frame:serialize(Packet2),
+    {Packets1, _} = ?ws_conn:parse_incoming(erlang:iolist_to_binary([Bin1, Bin2]), [], st()),
+    ?assertMatch([{incoming, Packet1}, {incoming, Packet2}], Packets1).
+
 t_parse_incoming_frame_error(_) ->
     {Packets, _St} = ?ws_conn:parse_incoming(<<3, 2, 1, 0>>, [], st()),
     FrameError = {frame_error, malformed_packet},

+ 2 - 1
apps/emqx_authn/rebar.config

@@ -6,7 +6,8 @@
     {emqx_connector, {path, "../emqx_connector"}},
     {emqx_mongodb, {path, "../emqx_mongodb"}},
     {emqx_redis, {path, "../emqx_redis"}},
-    {emqx_mysql, {path, "../emqx_mysql"}}
+    {emqx_mysql, {path, "../emqx_mysql"}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 
 {edoc_opts, [{preprocess, true}]}.

+ 3 - 2
apps/emqx_authn/src/emqx_authn.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authn, [
     {description, "EMQX Authentication"},
-    {vsn, "0.1.22"},
+    {vsn, "0.1.23"},
     {modules, []},
     {registered, [emqx_authn_sup, emqx_authn_registry]},
     {applications, [
@@ -15,7 +15,8 @@
         jose,
         emqx_mongodb,
         emqx_redis,
-        emqx_mysql
+        emqx_mysql,
+        emqx_bridge_http
     ]},
     {mod, {emqx_authn_app, []}},
     {env, []},

+ 3 - 3
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -102,7 +102,7 @@ common_fields() ->
                 [
                     pool_type
                 ],
-                maps:from_list(emqx_connector_http:fields(config))
+                maps:from_list(emqx_bridge_http_connector:fields(config))
             )
         ).
 
@@ -185,14 +185,14 @@ create(Config0) ->
     {Config, State} = parse_config(Config0),
     {ok, _Data} = emqx_authn_utils:create_resource(
         ResourceId,
-        emqx_connector_http,
+        emqx_bridge_http_connector,
         Config
     ),
     {ok, State#{resource_id => ResourceId}}.
 
 update(Config0, #{resource_id := ResourceId} = _State) ->
     {Config, NState} = parse_config(Config0),
-    case emqx_authn_utils:update_resource(emqx_connector_http, Config, ResourceId) of
+    case emqx_authn_utils:update_resource(emqx_bridge_http_connector, Config, ResourceId) of
         {error, Reason} ->
             error({load_config_error, Reason});
         {ok, _} ->

+ 2 - 1
apps/emqx_authz/rebar.config

@@ -7,7 +7,8 @@
     {emqx_connector, {path, "../emqx_connector"}},
     {emqx_mongodb, {path, "../emqx_mongodb"}},
     {emqx_redis, {path, "../emqx_redis"}},
-    {emqx_mysql, {path, "../emqx_mysql"}}
+    {emqx_mysql, {path, "../emqx_mysql"}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 
 {shell, [

+ 2 - 1
apps/emqx_authz/src/emqx_authz.app.src

@@ -12,7 +12,8 @@
         emqx_connector,
         emqx_mongodb,
         emqx_redis,
-        emqx_mysql
+        emqx_mysql,
+        emqx_bridge_http
     ]},
     {env, []},
     {modules, []},

+ 1 - 1
apps/emqx_authz/src/emqx_authz_api_schema.erl

@@ -118,7 +118,7 @@ authz_http_common_fields() ->
                 [
                     pool_type
                 ],
-                maps:from_list(emqx_connector_http:fields(config))
+                maps:from_list(emqx_bridge_http_connector:fields(config))
             )
         ).
 

+ 2 - 2
apps/emqx_authz/src/emqx_authz_http.erl

@@ -62,12 +62,12 @@ description() ->
 create(Config) ->
     NConfig = parse_config(Config),
     ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
-    {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_connector_http, NConfig),
+    {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_bridge_http_connector, NConfig),
     NConfig#{annotations => #{id => ResourceId}}.
 
 update(Config) ->
     NConfig = parse_config(Config),
-    case emqx_authz_utils:update_resource(emqx_connector_http, NConfig) of
+    case emqx_authz_utils:update_resource(emqx_bridge_http_connector, NConfig) of
         {error, Reason} -> error({load_config_error, Reason});
         {ok, Id} -> NConfig#{annotations => #{id => Id}}
     end.

+ 2 - 0
apps/emqx_authz/src/emqx_authz_schema.erl

@@ -391,6 +391,8 @@ connector_fields(DB) ->
     connector_fields(DB, config).
 connector_fields(DB, Fields) when DB =:= redis; DB =:= mysql ->
     connector_fields(DB, Fields, emqx);
+connector_fields(DB, Fields) when DB =:= http ->
+    connector_fields(bridge_http_connector, Fields, emqx);
 connector_fields(DB, Fields) ->
     connector_fields(DB, Fields, emqx_connector).
 

+ 4 - 4
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -62,14 +62,14 @@
 -if(?EMQX_RELEASE_EDITION == ee).
 bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
 bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
-bridge_to_resource_type(webhook) -> emqx_connector_http;
+bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
+bridge_to_resource_type(webhook) -> emqx_bridge_http_connector;
 bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType).
 -else.
 bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
 bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
-bridge_to_resource_type(webhook) -> emqx_connector_http.
+bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
+bridge_to_resource_type(webhook) -> emqx_bridge_http_connector.
 -endif.
 
 resource_id(BridgeId) when is_binary(BridgeId) ->

+ 2 - 2
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -53,7 +53,7 @@ api_schema(Method) ->
     Broker = [
         {Type, ref(Mod, Method)}
      || {Type, Mod} <- [
-            {<<"webhook">>, emqx_bridge_webhook_schema},
+            {<<"webhook">>, emqx_bridge_http_schema},
             {<<"mqtt">>, emqx_bridge_mqtt_schema}
         ]
     ],
@@ -158,7 +158,7 @@ fields(bridges) ->
     [
         {webhook,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")),
+                hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
                 #{
                     desc => ?DESC("bridges_webhook"),
                     required => false,

+ 2 - 1
apps/emqx_bridge_gcp_pubsub/rebar.config

@@ -10,7 +10,8 @@
 {deps, [
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
-    {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+    {emqx_bridge, {path, "../../apps/emqx_bridge"}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 
 {xref_checks, [

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src

@@ -6,7 +6,7 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
+        emqx_bridge_http,
         ehttpc
     ]},
     {env, []},

+ 2 - 2
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -243,7 +243,7 @@ handle_result(
 ) ->
     ?SLOG(error, #{
         msg => "gcp_pubsub_error_response",
-        request => emqx_connector_http:redact_request(Request),
+        request => emqx_bridge_http_connector:redact_request(Request),
         connector => ResourceId,
         status_code => StatusCode,
         resp_body => RespBody
@@ -252,7 +252,7 @@ handle_result(
 handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) ->
     ?SLOG(error, #{
         msg => "gcp_pubsub_error_response",
-        request => emqx_connector_http:redact_request(Request),
+        request => emqx_bridge_http_connector:redact_request(Request),
         connector => ResourceId,
         status_code => StatusCode
     }),

+ 9 - 9
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -246,10 +246,10 @@ start_echo_http_server() ->
             {versions, ['tlsv1.2', 'tlsv1.3']},
             {ciphers, ["ECDHE-RSA-AES256-GCM-SHA384", "TLS_CHACHA20_POLY1305_SHA256"]}
         ] ++ certs(),
-    {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
+    {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
         random, HTTPPath, ServerSSLOpts
     ),
-    ok = emqx_connector_web_hook_server:set_handler(success_http_handler()),
+    ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
     HTTPHost = "localhost",
     HostPort = HTTPHost ++ ":" ++ integer_to_list(HTTPPort),
     true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
@@ -261,7 +261,7 @@ start_echo_http_server() ->
 
 stop_echo_http_server() ->
     os:unsetenv("PUBSUB_EMULATOR_HOST"),
-    ok = emqx_connector_web_hook_server:stop().
+    ok = emqx_bridge_http_connector_test_server:stop().
 
 certs() ->
     CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
@@ -983,7 +983,7 @@ t_publish_econnrefused(Config) ->
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
     assert_empty_metrics(ResourceId),
-    ok = emqx_connector_web_hook_server:stop(),
+    ok = emqx_bridge_http_connector_test_server:stop(),
     do_econnrefused_or_timeout_test(Config, econnrefused).
 
 t_publish_timeout(Config) ->
@@ -1019,7 +1019,7 @@ t_publish_timeout(Config) ->
             ),
             {ok, Rep, State}
         end,
-    ok = emqx_connector_web_hook_server:set_handler(TimeoutHandler),
+    ok = emqx_bridge_http_connector_test_server:set_handler(TimeoutHandler),
     do_econnrefused_or_timeout_test(Config, timeout).
 
 do_econnrefused_or_timeout_test(Config, Error) ->
@@ -1149,7 +1149,7 @@ t_success_no_body(Config) ->
             ),
             {ok, Rep, State}
         end,
-    ok = emqx_connector_web_hook_server:set_handler(SuccessNoBodyHandler),
+    ok = emqx_bridge_http_connector_test_server:set_handler(SuccessNoBodyHandler),
     Topic = <<"t/topic">>,
     {ok, _} = create_bridge(Config),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@@ -1187,7 +1187,7 @@ t_failure_with_body(Config) ->
             ),
             {ok, Rep, State}
         end,
-    ok = emqx_connector_web_hook_server:set_handler(FailureWithBodyHandler),
+    ok = emqx_bridge_http_connector_test_server:set_handler(FailureWithBodyHandler),
     Topic = <<"t/topic">>,
     {ok, _} = create_bridge(Config),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@@ -1225,7 +1225,7 @@ t_failure_no_body(Config) ->
             ),
             {ok, Rep, State}
         end,
-    ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler),
+    ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
     Topic = <<"t/topic">>,
     {ok, _} = create_bridge(Config),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@@ -1271,7 +1271,7 @@ t_unrecoverable_error(Config) ->
             ),
             {ok, Rep, State}
         end,
-    ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler),
+    ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
     Topic = <<"t/topic">>,
     {ok, _} = create_bridge(Config),
     assert_empty_metrics(ResourceId),

+ 36 - 0
apps/emqx_bridge_http/README.md

@@ -0,0 +1,36 @@
+# EMQX HTTP Broker Bridge
+
+This application enables EMQX to connect to any HTTP API, conforming to the
+HTTP standard. The connection is established via the [HTTP][1] bridge abstraction,
+which facilitates the unidirectional flow of data from EMQX to the HTTP API
+(egress).
+
+Users can define a rule and efficiently transfer data to a remote HTTP API
+utilizing [EMQX Rules][2].
+
+# Documentation
+
+- For instructions on how to use the EMQX dashboard to set up an egress bridge,
+  refer to [Bridge Data into HTTP API][3].
+
+- To understand the EMQX rules engine, please refer to [EMQX Rules][2].
+
+# HTTP APIs
+
+We provide a range of APIs for bridge management. For more detailed
+information, refer to [API Docs -Bridges][4].
+
+# Contributing
+
+For those interested in contributing, please consult our
+[contributing guide](../../CONTRIBUTING.md).
+
+# License
+
+This software is under the Apache License 2.0. For more details, see
+[LICENSE](../../APL.txt).
+
+[1]: https://tools.ietf.org/html/rfc2616
+[2]: https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html
+[3]: https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-webhook.html
+[4]: https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges

+ 10 - 0
apps/emqx_bridge_http/rebar.config

@@ -0,0 +1,10 @@
+%% -*- mode: erlang; -*-
+{erl_opts, [debug_info]}.
+{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
+
+{shell, [
+    {apps, [emqx_bridge_http]}
+]}.

+ 9 - 0
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -0,0 +1,9 @@
+{application, emqx_bridge_http, [
+    {description, "EMQX HTTP Bridge and Connector Application"},
+    {vsn, "0.1.1"},
+    {registered, []},
+    {applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, ehttpc]},
+    {env, []},
+    {modules, []},
+    {links, []}
+]}.

+ 1 - 1
apps/emqx_connector/src/emqx_connector_http.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_connector_http).
+-module(emqx_bridge_http_connector).
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").

+ 2 - 2
apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl

@@ -13,7 +13,7 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
--module(emqx_bridge_webhook_schema).
+-module(emqx_bridge_http_schema).
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -68,7 +68,7 @@ basic_config() ->
             )}
     ] ++ webhook_creation_opts() ++
         proplists:delete(
-            max_retries, emqx_connector_http:fields(config)
+            max_retries, emqx_bridge_http_connector:fields(config)
         ).
 
 request_config() ->

+ 7 - 7
apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl

@@ -13,7 +13,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_webhook_SUITE).
+-module(emqx_bridge_http_SUITE).
 
 %% This suite should contains testcases that are specific for the webhook
 %% bridge. There are also some test cases that implicitly tests the webhook
@@ -64,18 +64,18 @@ init_per_testcase(t_send_async_connection_timeout, Config) ->
 init_per_testcase(t_path_not_found, Config) ->
     HTTPPath = <<"/nonexisting/path">>,
     ServerSSLOpts = false,
-    {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
+    {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
         _Port = random, HTTPPath, ServerSSLOpts
     ),
-    ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()),
+    ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()),
     [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
 init_per_testcase(t_too_many_requests, Config) ->
     HTTPPath = <<"/path">>,
     ServerSSLOpts = false,
-    {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
+    {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
         _Port = random, HTTPPath, ServerSSLOpts
     ),
-    ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()),
+    ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
     [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
 init_per_testcase(_TestCase, Config) ->
     Server = start_http_server(#{response_delay_ms => 0}),
@@ -85,7 +85,7 @@ end_per_testcase(TestCase, _Config) when
     TestCase =:= t_path_not_found;
     TestCase =:= t_too_many_requests
 ->
-    ok = emqx_connector_web_hook_server:stop(),
+    ok = emqx_bridge_http_connector_test_server:stop(),
     persistent_term:erase({?MODULE, times_called}),
     emqx_bridge_testlib:delete_all_bridges(),
     emqx_common_test_helpers:call_janitor(),
@@ -552,7 +552,7 @@ do_t_async_retries(TestContext, Error, Fn) ->
         Attempts + 1
     end,
     emqx_common_test_helpers:with_mock(
-        emqx_connector_http,
+        emqx_bridge_http_connector,
         reply_delegator,
         fun(Context, ReplyFunAndArgs, Result) ->
             Attempts = GetAndBump(),

+ 1 - 1
apps/emqx_connector/test/emqx_connector_web_hook_server.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_connector_web_hook_server).
+-module(emqx_bridge_http_connector_test_server).
 
 -compile([nowarn_export_all, export_all]).
 

+ 3 - 3
apps/emqx_connector/test/emqx_connector_http_tests.erl

@@ -13,7 +13,7 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
--module(emqx_connector_http_tests).
+-module(emqx_bridge_http_connector_tests).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -47,10 +47,10 @@ wrap_auth_headers_test_() ->
                     headers => auth_headers()
                 }
             },
-            {ok, #{request := #{headers := Headers}} = State} = emqx_connector_http:on_start(
+            {ok, #{request := #{headers := Headers}} = State} = emqx_bridge_http_connector:on_start(
                 <<"test">>, Config
             ),
-            {ok, 200, Req} = emqx_connector_http:on_query(foo, {send_message, #{}}, State),
+            {ok, 200, Req} = emqx_bridge_http_connector:on_query(foo, {send_message, #{}}, State),
             Tests =
                 [
                     ?_assert(is_wrapped(V))

+ 2 - 1
apps/emqx_bridge_iotdb/rebar.config

@@ -8,7 +8,8 @@
         {emqx, {path, "../../apps/emqx"}},
         {emqx_connector, {path, "../../apps/emqx_connector"}},
         {emqx_resource, {path, "../../apps/emqx_resource"}},
-        {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+        {emqx_bridge, {path, "../../apps/emqx_bridge"}},
+        {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 {plugins, [rebar3_path_deps]}.
 {project_plugins, [erlfmt]}.

+ 1 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -11,7 +11,7 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
+        emqx_bridge_http,
         %% for module emqx_connector_http
         emqx_connector
     ]},

+ 1 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -118,7 +118,7 @@ basic_config() ->
     ] ++ resource_creation_opts() ++
         proplists_without(
             [max_retries, base_url, request],
-            emqx_connector_http:fields(config)
+            emqx_bridge_http_connector:fields(config)
         ).
 
 proplists_without(Keys, List) ->

+ 5 - 5
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl

@@ -65,7 +65,7 @@ callback_mode() -> async_if_possible.
 on_start(InstanceId, Config) ->
     %% [FIXME] The configuration passed in here is pre-processed and transformed
     %% in emqx_bridge_resource:parse_confs/2.
-    case emqx_connector_http:on_start(InstanceId, Config) of
+    case emqx_bridge_http_connector:on_start(InstanceId, Config) of
         {ok, State} ->
             ?SLOG(info, #{
                 msg => "iotdb_bridge_started",
@@ -90,14 +90,14 @@ on_stop(InstanceId, State) ->
         msg => "stopping_iotdb_bridge",
         connector => InstanceId
     }),
-    Res = emqx_connector_http:on_stop(InstanceId, State),
+    Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
     ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
     Res.
 
 -spec on_get_status(manager_id(), state()) ->
     {connected, state()} | {disconnected, state(), term()}.
 on_get_status(InstanceId, State) ->
-    emqx_connector_http:on_get_status(InstanceId, State).
+    emqx_bridge_http_connector:on_get_status(InstanceId, State).
 
 -spec on_query(manager_id(), {send_message, map()}, state()) ->
     {ok, pos_integer(), [term()], term()}
@@ -114,7 +114,7 @@ on_query(InstanceId, {send_message, Message}, State) ->
     case make_iotdb_insert_request(Message, State) of
         {ok, IoTDBPayload} ->
             handle_response(
-                emqx_connector_http:on_query(
+                emqx_bridge_http_connector:on_query(
                     InstanceId, {send_message, IoTDBPayload}, State
                 )
             );
@@ -142,7 +142,7 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
                     end,
                     []
                 },
-            emqx_connector_http:on_query_async(
+            emqx_bridge_http_connector:on_query_async(
                 InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
             );
         Error ->

+ 2 - 1
apps/emqx_dashboard/rebar.config

@@ -2,7 +2,8 @@
 
 {deps, [
     {emqx, {path, "../emqx"}},
-    {emqx_utils, {path, "../emqx_utils"}}
+    {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 
 {edoc_opts, [{preprocess, true}]}.

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -5,7 +5,7 @@
     {vsn, "5.0.25"},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
-    {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl]},
+    {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},
     {mod, {emqx_dashboard_app, []}},
     {env, []},
     {licenses, ["Apache-2.0"]},

+ 3 - 2
apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl

@@ -647,8 +647,9 @@ schema("/ref/complex_type") ->
                     {no_neg_integer, hoconsc:mk(non_neg_integer(), #{})},
                     {url, hoconsc:mk(url(), #{})},
                     {server, hoconsc:mk(emqx_schema:ip_port(), #{})},
-                    {connect_timeout, hoconsc:mk(emqx_connector_http:connect_timeout(), #{})},
-                    {pool_type, hoconsc:mk(emqx_connector_http:pool_type(), #{})},
+                    {connect_timeout,
+                        hoconsc:mk(emqx_bridge_http_connector:connect_timeout(), #{})},
+                    {pool_type, hoconsc:mk(emqx_bridge_http_connector:pool_type(), #{})},
                     {timeout, hoconsc:mk(timeout(), #{})},
                     {bytesize, hoconsc:mk(emqx_schema:bytesize(), #{})},
                     {wordsize, hoconsc:mk(emqx_schema:wordsize(), #{})},

+ 1 - 1
apps/emqx_enterprise/src/emqx_enterprise.app.src

@@ -1,6 +1,6 @@
 {application, emqx_enterprise, [
     {description, "EMQX Enterprise Edition"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 2
apps/emqx_enterprise/src/emqx_enterprise_schema.erl

@@ -10,7 +10,7 @@
 
 -define(EE_SCHEMA_MODULES, [
     emqx_license_schema,
-    emqx_ee_schema_registry_schema,
+    emqx_schema_registry_schema,
     emqx_ft_schema
 ]).
 
@@ -35,7 +35,7 @@ desc(Name) ->
     ee_delegate(desc, ?EE_SCHEMA_MODULES, Name).
 
 validations() ->
-    emqx_conf_schema:validations().
+    emqx_conf_schema:validations() ++ emqx_license_schema:validations().
 
 %%------------------------------------------------------------------------------
 %% helpers

+ 1 - 0
apps/emqx_machine/rebar.config

@@ -2,6 +2,7 @@
 
 {deps, [
     {emqx, {path, "../emqx"}},
+    {emqx_dashboard, {path, "../emqx_dashboard"}},
     {emqx_utils, {path, "../emqx_utils"}}
 ]}.
 

+ 1 - 1
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -159,7 +159,7 @@ basic_reboot_apps_edition(ee) ->
         emqx_ft,
         emqx_eviction_agent,
         emqx_node_rebalance,
-        emqx_ee_schema_registry
+        emqx_schema_registry
     ];
 %% unexcepted edition, should not happen
 basic_reboot_apps_edition(_) ->

+ 2 - 1
apps/emqx_management/rebar.config

@@ -2,7 +2,8 @@
 
 {deps, [
     {emqx, {path, "../emqx"}},
-    {emqx_utils, {path, "../emqx_utils"}}
+    {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 
 {edoc_opts, [{preprocess, true}]}.

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -5,7 +5,7 @@
     {vsn, "5.0.26"},
     {modules, []},
     {registered, [emqx_management_sup]},
-    {applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl]},
+    {applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl, emqx_bridge_http]},
     {mod, {emqx_mgmt_app, []}},
     {env, []},
     {licenses, ["Apache-2.0"]},

+ 47 - 8
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -29,7 +29,9 @@
 %% first_next query APIs
 -export([
     node_query/6,
+    node_query/7,
     cluster_query/5,
+    cluster_query/6,
     b2i/1
 ]).
 
@@ -57,6 +59,18 @@
     fun((node(), term()) -> term())
     | fun((term()) -> term()).
 
+-type query_options() :: #{
+    %% Whether to use `ets:info/2` to get the total number of rows when the query conditions are
+    %% empty. It can significantly improves the speed of the query when the table stored large
+    %% amounts of data.
+    %%
+    %% However, it may cause the total number of rows to be inaccurate when the table stored in
+    %% multiple schemas of data, i.e: Built-in Authorization
+    %%
+    %% Default: false
+    fast_total_counting => boolean()
+}.
+
 -type query_return() :: #{meta := map(), data := [term()]}.
 
 -export([do_query/2, apply_total_query/1]).
@@ -114,13 +128,25 @@ limit(Params) when is_map(Params) ->
     format_result_fun()
 ) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
 node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) ->
+    node_query(Node, Tab, QString, QSchema, MsFun, FmtFun, #{}).
+
+-spec node_query(
+    node(),
+    atom(),
+    query_params(),
+    query_schema(),
+    query_to_match_spec_fun(),
+    format_result_fun(),
+    query_options()
+) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
+node_query(Node, Tab, QString, QSchema, MsFun, FmtFun, Options) ->
     case parse_pager_params(QString) of
         false ->
             {error, page_limit_invalid};
         Meta ->
             {_CodCnt, NQString} = parse_qstring(QString, QSchema),
             ResultAcc = init_query_result(),
-            QueryState = init_query_state(Tab, NQString, MsFun, Meta),
+            QueryState = init_query_state(Tab, NQString, MsFun, Meta, Options),
             NResultAcc = do_node_query(
                 Node, QueryState, ResultAcc
             ),
@@ -158,6 +184,17 @@ do_node_query(
     format_result_fun()
 ) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
 cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
+    cluster_query(Tab, QString, QSchema, MsFun, FmtFun, #{}).
+
+-spec cluster_query(
+    atom(),
+    query_params(),
+    query_schema(),
+    query_to_match_spec_fun(),
+    format_result_fun(),
+    query_options()
+) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
+cluster_query(Tab, QString, QSchema, MsFun, FmtFun, Options) ->
     case parse_pager_params(QString) of
         false ->
             {error, page_limit_invalid};
@@ -165,7 +202,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
             {_CodCnt, NQString} = parse_qstring(QString, QSchema),
             Nodes = emqx:running_nodes(),
             ResultAcc = init_query_result(),
-            QueryState = init_query_state(Tab, NQString, MsFun, Meta),
+            QueryState = init_query_state(Tab, NQString, MsFun, Meta, Options),
             NResultAcc = do_cluster_query(
                 Nodes, QueryState, ResultAcc
             ),
@@ -231,9 +268,10 @@ collect_total_from_tail_nodes(Nodes, QueryState = #{total := TotalAcc}) ->
 %%    table := atom(),
 %%    qs := {Qs, Fuzzy},  %% parsed query params
 %%    msfun := query_to_match_spec_fun(),
-%%    complete := boolean()
+%%    complete := boolean(),
+%%    options := query_options()
 %%    }
-init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) ->
+init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}, Options) ->
     #{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
     %% assert FuzzyFun type
     _ =
@@ -252,7 +290,8 @@ init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) -
         msfun => MsFun,
         match_spec => Ms,
         fuzzy_fun => FuzzyFun,
-        complete => false
+        complete => false,
+        options => Options
     },
     case counting_total_fun(QueryState) of
         false ->
@@ -355,6 +394,8 @@ apply_total_query(QueryState = #{table := Tab}) ->
             Fun(Tab)
     end.
 
+counting_total_fun(_QueryState = #{qs := {[], []}, options := #{fast_total_counting := true}}) ->
+    fun(Tab) -> ets:info(Tab, size) end;
 counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
     %% XXX: Calculating the total number of data that match a certain
     %% condition under a large table is very expensive because the
@@ -373,9 +414,7 @@ counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
 counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined ->
     %% XXX: Calculating the total number for a fuzzy searching is very very expensive
     %% so it is not supported now
-    false;
-counting_total_fun(_QueryState = #{qs := {[], []}}) ->
-    fun(Tab) -> ets:info(Tab, size) end.
+    false.
 
 %% ResultAcc :: #{count := integer(),
 %%                cursor := integer(),

+ 3 - 1
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -661,12 +661,14 @@ list_clients(QString) ->
     Result =
         case maps:get(<<"node">>, QString, undefined) of
             undefined ->
+                Options = #{fast_total_counting => true},
                 emqx_mgmt_api:cluster_query(
                     ?CHAN_INFO_TAB,
                     QString,
                     ?CLIENT_QSCHEMA,
                     fun ?MODULE:qs2ms/2,
-                    fun ?MODULE:format_channel_info/2
+                    fun ?MODULE:format_channel_info/2,
+                    Options
                 );
             Node0 ->
                 case emqx_utils:safe_to_existing_atom(Node0) of

+ 87 - 1
apps/emqx_management/src/emqx_mgmt_api_cluster.erl

@@ -19,9 +19,17 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -export([api_spec/0, fields/1, paths/0, schema/1, namespace/0]).
--export([cluster_info/2, invite_node/2, force_leave/2, join/1]).
+-export([
+    cluster_info/2,
+    cluster_topology/2,
+    invite_node/2,
+    force_leave/2,
+    join/1,
+    connected_replicants/0
+]).
 
 namespace() -> "cluster".
 
@@ -31,6 +39,7 @@ api_spec() ->
 paths() ->
     [
         "/cluster",
+        "/cluster/topology",
         "/cluster/:node/invite",
         "/cluster/:node/force_leave"
     ].
@@ -50,6 +59,17 @@ schema("/cluster") ->
             }
         }
     };
+schema("/cluster/topology") ->
+    #{
+        'operationId' => cluster_topology,
+        get => #{
+            desc => ?DESC(get_cluster_topology),
+            tags => [<<"Cluster">>],
+            responses => #{
+                200 => ?HOCON(?ARRAY(?REF(core_replicants)), #{desc => <<"Cluster topology">>})
+            }
+        }
+    };
 schema("/cluster/:node/invite") ->
     #{
         'operationId' => invite_node,
@@ -89,6 +109,28 @@ fields(node) ->
                     validator => fun validate_node/1
                 }
             )}
+    ];
+fields(replicant_info) ->
+    [
+        {node,
+            ?HOCON(
+                atom(),
+                #{desc => <<"Replicant node name">>, example => <<"emqx-replicant@127.0.0.2">>}
+            )},
+        {streams,
+            ?HOCON(
+                non_neg_integer(),
+                #{desc => <<"The number of RLOG (replicated log) streams">>, example => <<"10">>}
+            )}
+    ];
+fields(core_replicants) ->
+    [
+        {core_node,
+            ?HOCON(
+                atom(),
+                #{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>}
+            )},
+        {replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))}
     ].
 
 validate_node(Node) ->
@@ -106,6 +148,46 @@ cluster_info(get, _) ->
     },
     {200, Info}.
 
+cluster_topology(get, _) ->
+    RunningCores = running_cores(),
+    {Replicants, BadNodes} = emqx_mgmt_cluster_proto_v2:connected_replicants(RunningCores),
+    CoreReplicants = lists:zip(
+        lists:filter(
+            fun(N) -> not lists:member(N, BadNodes) end,
+            RunningCores
+        ),
+        Replicants
+    ),
+    Topology = lists:map(
+        fun
+            ({Core, {badrpc, Reason}}) ->
+                ?SLOG(error, #{
+                    msg => "failed_to_get_replicant_nodes",
+                    core_node => Core,
+                    reason => Reason
+                }),
+                #{core_node => Core, replicant_nodes => []};
+            ({Core, Repls}) ->
+                #{core_node => Core, replicant_nodes => format_replicants(Repls)}
+        end,
+        CoreReplicants
+    ),
+    BadNodes =/= [] andalso ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}),
+    {200, Topology}.
+
+format_replicants(Replicants) ->
+    maps:fold(
+        fun(K, V, Acc) ->
+            [#{node => K, streams => length(V)} | Acc]
+        end,
+        [],
+        maps:groups_from_list(fun({_, N, _}) -> N end, Replicants)
+    ).
+
+running_cores() ->
+    Running = emqx:running_nodes(),
+    lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)).
+
 invite_node(put, #{bindings := #{node := Node0}}) ->
     Node = ekka_node:parse_name(binary_to_list(Node0)),
     case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of
@@ -134,5 +216,9 @@ force_leave(delete, #{bindings := #{node := Node0}}) ->
 join(Node) ->
     ekka:join(Node).
 
+-spec connected_replicants() -> [{atom(), node(), pid()}].
+connected_replicants() ->
+    mria_status:agents().
+
 error_message(Msg) ->
     iolist_to_binary(io_lib:format("~p", [Msg])).

+ 38 - 0
apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v2.erl

@@ -0,0 +1,38 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_cluster_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    invite_node/2,
+    connected_replicants/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.1.1".
+
+-spec invite_node(node(), node()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc().
+invite_node(Node, Self) ->
+    rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], 5000).
+
+-spec connected_replicants([node()]) -> emqx_rpc:multicall_result().
+connected_replicants(Nodes) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_cluster, connected_replicants, [], 30_000).

+ 102 - 0
apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl

@@ -0,0 +1,102 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_cluster_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(APPS, [emqx_conf, emqx_management]).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_) ->
+    ok.
+
+init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) ->
+    Config = [{tc_name, TC} | Config0],
+    [{cluster, cluster(Config)} | setup(Config)];
+init_per_testcase(_TC, Config) ->
+    emqx_mgmt_api_test_util:init_suite(?APPS),
+    Config.
+
+end_per_testcase(t_cluster_topology_api_replicants, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config)),
+    cleanup(Config);
+end_per_testcase(_TC, _Config) ->
+    emqx_mgmt_api_test_util:end_suite(?APPS).
+
+t_cluster_topology_api_empty_resp(_) ->
+    ClusterTopologyPath = emqx_mgmt_api_test_util:api_path(["cluster", "topology"]),
+    {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, ClusterTopologyPath),
+    ?assertEqual(
+        [#{<<"core_node">> => atom_to_binary(node()), <<"replicant_nodes">> => []}],
+        emqx_utils_json:decode(Resp, [return_maps])
+    ).
+
+t_cluster_topology_api_replicants(Config) ->
+    %% some time to stabilize
+    timer:sleep(3000),
+    [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config),
+    {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    {200, Core2Resp} = rpc:call(Core2, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    {200, ReplResp} = rpc:call(Replicant, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    [
+        ?assertMatch(
+            [
+                #{
+                    core_node := Core1,
+                    replicant_nodes :=
+                        [#{node := Replicant, streams := _}]
+                },
+                #{
+                    core_node := Core2,
+                    replicant_nodes :=
+                        [#{node := Replicant, streams := _}]
+                }
+            ],
+            Resp
+        )
+     || Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]]
+    ].
+
+cluster(Config) ->
+    Nodes = emqx_cth_cluster:start(
+        [
+            {data_backup_core1, #{role => core, apps => ?APPS}},
+            {data_backup_core2, #{role => core, apps => ?APPS}},
+            {data_backup_replicant, #{role => replicant, apps => ?APPS}}
+        ],
+        #{work_dir => work_dir(Config)}
+    ),
+    Nodes.
+
+setup(Config) ->
+    WorkDir = filename:join(work_dir(Config), local),
+    Started = emqx_cth_suite:start(?APPS, #{work_dir => WorkDir}),
+    [{suite_apps, Started} | Config].
+
+cleanup(Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)).
+
+work_dir(Config) ->
+    filename:join(?config(priv_dir, Config), ?config(tc_name, Config)).

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -153,7 +153,7 @@ api_path_without_base_path(Parts) ->
 join_http_path([]) ->
     [];
 join_http_path([Part | Rest]) ->
-    lists:foldl(fun(P, Acc) -> emqx_connector_http:join_paths(Acc, P) end, Part, Rest).
+    lists:foldl(fun(P, Acc) -> emqx_bridge_http_connector:join_paths(Acc, P) end, Part, Rest).
 
 %% Usage:
 %% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>,

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -1148,7 +1148,7 @@ timezone_to_offset_seconds(TimeZone) ->
 '$handle_undefined_function'(sprintf, [Format | Args]) ->
     erlang:apply(fun sprintf_s/2, [Format, Args]);
 %% This is for functions that should be handled in another module
-%% (currently this module is emqx_ee_schema_registry_serde in the case of EE but
+%% (currently this module is emqx_schema_registry_serde in the case of EE but
 %% could be changed to another module in the future).
 '$handle_undefined_function'(FunctionName, Args) ->
     case emqx_rule_engine:extra_functions_module() of

+ 2 - 1
apps/emqx_s3/rebar.config

@@ -1,6 +1,7 @@
 {deps, [
     {emqx, {path, "../../apps/emqx"}},
-    {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}}
+    {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 
 {project_plugins, [erlfmt]}.

+ 2 - 1
apps/emqx_s3/src/emqx_s3.app.src

@@ -8,7 +8,8 @@
         stdlib,
         gproc,
         erlcloud,
-        ehttpc
+        ehttpc,
+        emqx_bridge_http
     ]},
     {mod, {emqx_s3_app, []}}
 ]}.

+ 3 - 2
apps/emqx_s3/src/emqx_s3_schema.erl

@@ -136,10 +136,11 @@ fields(transport_options) ->
             )}
     ] ++
         props_without(
-            [base_url, max_retries, retry_interval, request], emqx_connector_http:fields(config)
+            [base_url, max_retries, retry_interval, request],
+            emqx_bridge_http_connector:fields(config)
         ) ++
         props_with(
-            [headers, max_retries, request_timeout], emqx_connector_http:fields("request")
+            [headers, max_retries, request_timeout], emqx_bridge_http_connector:fields("request")
         ).
 
 desc(s3) ->

+ 1 - 1
apps/emqx_s3/test/emqx_s3_SUITE.erl

@@ -14,7 +14,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    {ok, _} = application:ensure_all_started(emqx_s3),
+    emqx_common_test_helpers:start_apps([emqx_s3]),
     Config.
 
 end_per_suite(_Config) ->

+ 94 - 0
apps/emqx_schema_registry/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 15 - 0
lib-ee/emqx_ee_schema_registry/README.md

@@ -71,3 +71,18 @@ WHERE
 - Register schema instance: adds a new instance of a schema of a
   certain type.  For example, when the user may have several Avro or
   Protobuf schemas that they wish to use with different data flows.
+
+# Documentation
+
+- Refer to [Introduction to Schema Registry](https://docs.emqx.com/en/enterprise/v5.1/data-integration/schema-registry.html) for how to use the EMQX dashboard for configuring schemas.
+
+- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
+  for the EMQX rules engine introduction.
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

lib-ee/emqx_ee_schema_registry/etc/emqx_ee_schema_registry.conf → apps/emqx_schema_registry/etc/emqx_schema_registry.conf


+ 3 - 2
lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl

@@ -2,12 +2,13 @@
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--ifndef(EMQX_EE_SCHEMA_REGISTRY_HRL).
--define(EMQX_EE_SCHEMA_REGISTRY_HRL, true).
+-ifndef(EMQX_SCHEMA_REGISTRY_HRL).
+-define(EMQX_SCHEMA_REGISTRY_HRL, true).
 
 -define(CONF_KEY_ROOT, schema_registry).
 -define(CONF_KEY_PATH, [?CONF_KEY_ROOT]).
 
+%% Note: this has the `_ee_' segment for backwards compatibility.
 -define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
 -define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
 -define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).

lib-ee/emqx_ee_schema_registry/priv/LICENSE → apps/emqx_schema_registry/priv/LICENSE


lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto → apps/emqx_schema_registry/priv/sparkplug_b.proto


+ 4 - 4
lib-ee/emqx_ee_schema_registry/rebar.config

@@ -2,14 +2,14 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-  {emqx, {path, "../../apps/emqx"}},
-  {emqx_utils, {path, "../../apps/emqx_utils"}},
-  {emqx_rule_engine, {path, "../../apps/emqx_rule_engine"}},
+  {emqx, {path, "../emqx"}},
+  {emqx_utils, {path, "../emqx_utils"}},
+  {emqx_rule_engine, {path, "../emqx_rule_engine"}},
   {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
   {gpb, "4.19.7"}
 ]}.
 
 {shell, [
   % {config, "config/sys.config"},
-    {apps, [emqx_ee_schema_registry]}
+    {apps, [emqx_schema_registry]}
 ]}.

+ 3 - 3
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src

@@ -1,8 +1,8 @@
-{application, emqx_ee_schema_registry, [
+{application, emqx_schema_registry, [
     {description, "EMQX Schema Registry"},
     {vsn, "0.1.5"},
-    {registered, [emqx_ee_schema_registry_sup]},
-    {mod, {emqx_ee_schema_registry_app, []}},
+    {registered, [emqx_schema_registry_sup]},
+    {mod, {emqx_schema_registry_app, []}},
     {included_applications, [
         emqx_rule_engine
     ]},

+ 3 - 3
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl

@@ -1,13 +1,13 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry).
+-module(emqx_schema_registry).
 
 -behaviour(gen_server).
 -behaviour(emqx_config_handler).
 -behaviour(emqx_config_backup).
 
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -291,7 +291,7 @@ do_build_serde(Name, Serde) when not is_binary(Name) ->
 do_build_serde(Name, #{type := Type, source := Source}) ->
     try
         {Serializer, Deserializer, Destructor} =
-            emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
+            emqx_schema_registry_serde:make_serde(Type, Name, Source),
         Serde = #serde{
             name = Name,
             serializer = Serializer,

+ 6 - 6
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl

@@ -1,24 +1,24 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry_app).
+-module(emqx_schema_registry_app).
 
 -behaviour(application).
 
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 
 -export([start/2, stop/1]).
 
 start(_StartType, _StartArgs) ->
     %% Register rule engine extra functions module so that we can handle decode
     %% and encode functions called from the rule engine SQL like language
-    ok = emqx_rule_engine:set_extra_functions_module(emqx_ee_schema_registry_serde),
+    ok = emqx_rule_engine:set_extra_functions_module(emqx_schema_registry_serde),
     ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
     %% HTTP API handler
-    emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),
+    emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_schema_registry),
     %% Conf load / data import handler
-    emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
-    emqx_ee_schema_registry_sup:start_link().
+    emqx_conf:add_handler(?CONF_KEY_PATH, emqx_schema_registry),
+    emqx_schema_registry_sup:start_link().
 
 stop(_State) ->
     emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']),

+ 18 - 18
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl

@@ -1,11 +1,11 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry_http_api).
+-module(emqx_schema_registry_http_api).
 
 -behaviour(minirest_api).
 
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_utils/include/emqx_utils_api.hrl").
@@ -48,7 +48,7 @@ schema("/schema_registry") ->
                 #{
                     200 =>
                         emqx_dashboard_swagger:schema_with_examples(
-                            hoconsc:array(emqx_ee_schema_registry_schema:api_schema("get")),
+                            hoconsc:array(emqx_schema_registry_schema:api_schema("get")),
                             #{
                                 sample =>
                                     #{value => sample_list_schemas_response()}
@@ -61,14 +61,14 @@ schema("/schema_registry") ->
             summary => <<"Register a new schema">>,
             description => ?DESC("desc_schema_registry_api_post"),
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_ee_schema_registry_schema:api_schema("post"),
+                emqx_schema_registry_schema:api_schema("post"),
                 post_examples()
             ),
             responses =>
                 #{
                     201 =>
                         emqx_dashboard_swagger:schema_with_examples(
-                            emqx_ee_schema_registry_schema:api_schema("post"),
+                            emqx_schema_registry_schema:api_schema("post"),
                             post_examples()
                         ),
                     400 => error_schema('ALREADY_EXISTS', "Schema already exists")
@@ -87,7 +87,7 @@ schema("/schema_registry/:name") ->
                 #{
                     200 =>
                         emqx_dashboard_swagger:schema_with_examples(
-                            emqx_ee_schema_registry_schema:api_schema("get"),
+                            emqx_schema_registry_schema:api_schema("get"),
                             get_examples()
                         ),
                     404 => error_schema('NOT_FOUND', "Schema not found")
@@ -99,14 +99,14 @@ schema("/schema_registry/:name") ->
             description => ?DESC("desc_schema_registry_api_put"),
             parameters => [param_path_schema_name()],
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_ee_schema_registry_schema:api_schema("put"),
+                emqx_schema_registry_schema:api_schema("put"),
                 post_examples()
             ),
             responses =>
                 #{
                     200 =>
                         emqx_dashboard_swagger:schema_with_examples(
-                            emqx_ee_schema_registry_schema:api_schema("put"),
+                            emqx_schema_registry_schema:api_schema("put"),
                             put_examples()
                         ),
                     404 => error_schema('NOT_FOUND', "Schema not found")
@@ -130,7 +130,7 @@ schema("/schema_registry/:name") ->
 %%-------------------------------------------------------------------------------------------------
 
 '/schema_registry'(get, _Params) ->
-    Schemas = emqx_ee_schema_registry:list_schemas(),
+    Schemas = emqx_schema_registry:list_schemas(),
     Response =
         lists:map(
             fun({Name, Params}) ->
@@ -141,11 +141,11 @@ schema("/schema_registry/:name") ->
     ?OK(Response);
 '/schema_registry'(post, #{body := Params0 = #{<<"name">> := Name}}) ->
     Params = maps:without([<<"name">>], Params0),
-    case emqx_ee_schema_registry:get_schema(Name) of
+    case emqx_schema_registry:get_schema(Name) of
         {error, not_found} ->
-            case emqx_ee_schema_registry:add_schema(Name, Params) of
+            case emqx_schema_registry:add_schema(Name, Params) of
                 ok ->
-                    {ok, Res} = emqx_ee_schema_registry:get_schema(Name),
+                    {ok, Res} = emqx_schema_registry:get_schema(Name),
                     {201, Res#{name => Name}};
                 {error, Error} ->
                     ?BAD_REQUEST(Error)
@@ -155,31 +155,31 @@ schema("/schema_registry/:name") ->
     end.
 
 '/schema_registry/:name'(get, #{bindings := #{name := Name}}) ->
-    case emqx_ee_schema_registry:get_schema(Name) of
+    case emqx_schema_registry:get_schema(Name) of
         {error, not_found} ->
             ?NOT_FOUND(<<"Schema not found">>);
         {ok, Schema} ->
             ?OK(Schema#{name => Name})
     end;
 '/schema_registry/:name'(put, #{bindings := #{name := Name}, body := Params}) ->
-    case emqx_ee_schema_registry:get_schema(Name) of
+    case emqx_schema_registry:get_schema(Name) of
         {error, not_found} ->
             ?NOT_FOUND(<<"Schema not found">>);
         {ok, _} ->
-            case emqx_ee_schema_registry:add_schema(Name, Params) of
+            case emqx_schema_registry:add_schema(Name, Params) of
                 ok ->
-                    {ok, Res} = emqx_ee_schema_registry:get_schema(Name),
+                    {ok, Res} = emqx_schema_registry:get_schema(Name),
                     ?OK(Res#{name => Name});
                 {error, Error} ->
                     ?BAD_REQUEST(Error)
             end
     end;
 '/schema_registry/:name'(delete, #{bindings := #{name := Name}}) ->
-    case emqx_ee_schema_registry:get_schema(Name) of
+    case emqx_schema_registry:get_schema(Name) of
         {error, not_found} ->
             ?NOT_FOUND(<<"Schema not found">>);
         {ok, _} ->
-            case emqx_ee_schema_registry:delete_schema(Name) of
+            case emqx_schema_registry:delete_schema(Name) of
                 ok ->
                     ?NO_CONTENT;
                 {error, Error} ->

+ 2 - 2
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl

@@ -2,11 +2,11 @@
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_ee_schema_registry_schema).
+-module(emqx_schema_registry_schema).
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 
 %% `hocon_schema' API
 -export([

+ 5 - 5
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl

@@ -1,15 +1,15 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry_serde).
+-module(emqx_schema_registry_serde).
 
 -behaviour(emqx_rule_funcs).
 
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_ee_schema_registry_serde]}}]).
+-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_schema_registry_serde]}}]).
 
 %% API
 -export([
@@ -67,7 +67,7 @@ decode(SerdeName, RawData) ->
 
 -spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
 decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
-    case emqx_ee_schema_registry:get_serde(SerdeName) of
+    case emqx_schema_registry:get_serde(SerdeName) of
         {error, not_found} ->
             error({serde_not_found, SerdeName});
         {ok, #{deserializer := Deserializer}} ->
@@ -80,7 +80,7 @@ encode(SerdeName, RawData) ->
 
 -spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
 encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
-    case emqx_ee_schema_registry:get_serde(SerdeName) of
+    case emqx_schema_registry:get_serde(SerdeName) of
         {error, not_found} ->
             error({serde_not_found, SerdeName});
         {ok, #{serializer := Serializer}} ->

+ 3 - 4
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_sup.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry_sup).
+-module(emqx_schema_registry_sup).
 
 -behaviour(supervisor).
 
@@ -29,7 +29,7 @@ init([]) ->
         intensity => 10,
         period => 100
     },
-    ChildSpecs = [child_spec(emqx_ee_schema_registry)],
+    ChildSpecs = [child_spec(emqx_schema_registry)],
     {ok, {SupFlags, ChildSpecs}}.
 
 child_spec(Mod) ->
@@ -38,6 +38,5 @@ child_spec(Mod) ->
         start => {Mod, start_link, []},
         restart => permanent,
         shutdown => 5_000,
-        type => worker,
-        modules => [Mod]
+        type => worker
     }.

+ 41 - 27
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry_SUITE).
+-module(emqx_schema_registry_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -10,11 +10,11 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
--define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]).
+-define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -46,7 +46,7 @@ sparkplug_tests() ->
     ].
 
 init_per_suite(Config) ->
-    emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
+    emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema),
     emqx_mgmt_api_test_util:init_suite(?APPS),
     Config.
 
@@ -156,7 +156,7 @@ schema_params(protobuf) ->
 
 create_serde(SerdeType, SerdeName) ->
     Schema = schema_params(SerdeType),
-    ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema),
+    ok = emqx_schema_registry:add_schema(SerdeName, Schema),
     ok.
 
 test_params_for(avro, encode_decode1) ->
@@ -313,9 +313,9 @@ test_params_for(Type, Name) ->
 clear_schemas() ->
     maps:foreach(
         fun(Name, _Schema) ->
-            ok = emqx_ee_schema_registry:delete_schema(Name)
+            ok = emqx_schema_registry:delete_schema(Name)
         end,
-        emqx_ee_schema_registry:list_schemas()
+        emqx_schema_registry:list_schemas()
     ).
 
 receive_action_results() ->
@@ -367,7 +367,7 @@ cluster(Config) ->
             %% need to restart schema registry app in the tests so
             %% that it re-registers the config handler that is lost
             %% when emqx_conf restarts during join.
-            {env, [{emqx_machine, applications, [emqx_ee_schema_registry]}]},
+            {env, [{emqx_machine, applications, [emqx_schema_registry]}]},
             {load_apps, [emqx_machine | ?APPS]},
             {env_handler, fun
                 (emqx) ->
@@ -453,11 +453,11 @@ protobuf_unique_cache_hit_spec(_Res, _Trace) ->
 %%------------------------------------------------------------------------------
 
 t_unknown_calls(_Config) ->
-    Ref = monitor(process, emqx_ee_schema_registry),
+    Ref = monitor(process, emqx_schema_registry),
     %% for coverage
-    emqx_ee_schema_registry ! unknown,
-    gen_server:cast(emqx_ee_schema_registry, unknown),
-    ?assertEqual({error, unknown_call}, gen_server:call(emqx_ee_schema_registry, unknown)),
+    emqx_schema_registry ! unknown,
+    gen_server:cast(emqx_schema_registry, unknown),
+    ?assertEqual({error, unknown_call}, gen_server:call(emqx_schema_registry, unknown)),
     receive
         {'DOWN', Ref, process, _, _} ->
             ct:fail("registry shouldn't have died")
@@ -489,7 +489,7 @@ t_delete_serde(Config) ->
             ok = create_serde(SerdeType, SerdeName),
             {ok, {ok, _}} =
                 ?wait_async_action(
-                    emqx_ee_schema_registry:delete_schema(SerdeName),
+                    emqx_schema_registry:delete_schema(SerdeName),
                     #{?snk_kind := schema_registry_serdes_deleted},
                     1_000
                 ),
@@ -522,7 +522,7 @@ t_encode(Config) ->
         Published
     ),
     #{payload := Encoded} = Published,
-    {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
+    {ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName),
     ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
     ok.
 
@@ -536,7 +536,7 @@ t_decode(Config) ->
         extra_args := ExtraArgs
     } = test_params_for(SerdeType, decode1),
     {ok, _} = create_rule_http(#{sql => SQL}),
-    {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
+    {ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName),
     EncodedBin = apply(Serializer, [Payload | ExtraArgs]),
     emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
     Published = receive_published(?LINE),
@@ -559,7 +559,7 @@ t_protobuf_union_encode(Config) ->
         extra_args := ExtraArgs
     } = test_params_for(SerdeType, union1),
     {ok, _} = create_rule_http(#{sql => SQL}),
-    {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
+    {ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName),
 
     EncodedBinA = apply(Serializer, [PayloadA | ExtraArgs]),
     emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
@@ -594,7 +594,7 @@ t_protobuf_union_decode(Config) ->
         extra_args := ExtraArgs
     } = test_params_for(SerdeType, union2),
     {ok, _} = create_rule_http(#{sql => SQL}),
-    {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
+    {ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName),
 
     EncodedBinA = emqx_utils_json:encode(PayloadA),
     emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
@@ -639,9 +639,9 @@ t_fail_rollback(Config) ->
             #{}
         )
     ),
-    ?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)),
+    ?assertMatch({ok, #{name := <<"a">>}}, emqx_schema_registry:get_serde(<<"a">>)),
     %% no z serdes should be in the table
-    ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)),
+    ?assertEqual({error, not_found}, emqx_schema_registry:get_serde(<<"z">>)),
     ok.
 
 t_cluster_serde_build(Config) ->
@@ -660,13 +660,13 @@ t_cluster_serde_build(Config) ->
             wait_for_cluster_rpc(N2),
             ?assertEqual(
                 ok,
-                erpc:call(N2, emqx_ee_schema_registry, add_schema, [SerdeName, Schema])
+                erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema])
             ),
             %% check that we can serialize/deserialize in all nodes
             lists:foreach(
                 fun(N) ->
                     erpc:call(N, fun() ->
-                        Res0 = emqx_ee_schema_registry:get_serde(SerdeName),
+                        Res0 = emqx_schema_registry:get_serde(SerdeName),
                         ?assertMatch({ok, #{}}, Res0, #{node => N}),
                         {ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0,
                         ?assertEqual(
@@ -691,7 +691,7 @@ t_cluster_serde_build(Config) ->
             ),
             ?assertEqual(
                 ok,
-                erpc:call(N1, emqx_ee_schema_registry, delete_schema, [SerdeName])
+                erpc:call(N1, emqx_schema_registry, delete_schema, [SerdeName])
             ),
             {ok, _} = snabbkaffe:receive_events(SRef1),
             lists:foreach(
@@ -699,7 +699,7 @@ t_cluster_serde_build(Config) ->
                     erpc:call(N, fun() ->
                         ?assertMatch(
                             {error, not_found},
-                            emqx_ee_schema_registry:get_serde(SerdeName),
+                            emqx_schema_registry:get_serde(SerdeName),
                             #{node => N}
                         ),
                         ok
@@ -726,7 +726,15 @@ t_import_config(_Config) ->
                             #{
                                 <<"description">> => <<"My Avro Schema">>,
                                 <<"source">> =>
-                                    <<"{\"type\":\"record\",\"fields\":[{\"type\":\"int\",\"name\":\"i\"},{\"type\":\"string\",\"name\":\"s\"}]}">>,
+                                    emqx_utils_json:encode(
+                                        #{
+                                            type => <<"record">>,
+                                            fields => [
+                                                #{type => <<"int">>, name => <<"i">>},
+                                                #{type => <<"string">>, name => <<"s">>}
+                                            ]
+                                        }
+                                    ),
                                 <<"type">> => <<"avro">>
                             }
                     }
@@ -740,15 +748,21 @@ t_import_config(_Config) ->
     Path = [schema_registry, schemas, <<"my_avro_schema">>],
     ?assertEqual(
         {ok, #{root_key => schema_registry, changed => []}},
-        emqx_ee_schema_registry:import_config(RawConf)
+        emqx_schema_registry:import_config(RawConf)
     ),
     ?assertEqual(
         {ok, #{root_key => schema_registry, changed => [Path]}},
-        emqx_ee_schema_registry:import_config(RawConf1)
+        emqx_schema_registry:import_config(RawConf1)
     ).
 
 sparkplug_example_data_base64() ->
-    <<"CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgDEikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA==">>.
+    <<
+        "CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgD"
+        "EikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3Vu"
+        "dGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3Jv"
+        "dXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50"
+        "ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA=="
+    >>.
 
 sparkplug_example_data() ->
     #{

+ 8 - 8
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry_http_api_SUITE).
+-module(emqx_schema_registry_http_api_SUITE).
 
 -compile(nowarn_export_all).
 -compile(export_all).
@@ -12,9 +12,9 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 
--define(APPS, [emqx_conf, emqx_ee_schema_registry]).
+-define(APPS, [emqx_conf, emqx_schema_registry]).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -34,7 +34,7 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
-    emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
+    emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema),
     emqx_mgmt_api_test_util:init_suite(?APPS),
     Config.
 
@@ -138,9 +138,9 @@ try_decode_error_message(Res) ->
 clear_schemas() ->
     maps:foreach(
         fun(Name, _Schema) ->
-            ok = emqx_ee_schema_registry:delete_schema(Name)
+            ok = emqx_schema_registry:delete_schema(Name)
         end,
-        emqx_ee_schema_registry:list_schemas()
+        emqx_schema_registry:list_schemas()
     ).
 
 %%------------------------------------------------------------------------------
@@ -249,7 +249,7 @@ t_crud(Config) ->
         {ok, 400, #{
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
-                <<"{post_config_update,emqx_ee_schema_registry,", _/binary>>
+                <<"{post_config_update,emqx_schema_registry,", _/binary>>
         }},
         request({put, SchemaName, UpdateParams#{<<"source">> := InvalidSourceBin}})
     ),
@@ -290,7 +290,7 @@ t_crud(Config) ->
         {ok, 400, #{
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> :=
-                <<"{post_config_update,emqx_ee_schema_registry,", _/binary>>
+                <<"{post_config_update,emqx_schema_registry,", _/binary>>
         }},
         request({post, Params#{<<"source">> := InvalidSourceBin}})
     ),

+ 17 - 17
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_schema_registry_serde_SUITE).
+-module(emqx_schema_registry_serde_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -10,11 +10,11 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--include("emqx_ee_schema_registry.hrl").
+-include("emqx_schema_registry.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
--define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]).
+-define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -24,7 +24,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
+    emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema),
     emqx_mgmt_api_test_util:init_suite(?APPS),
     Config.
 
@@ -46,9 +46,9 @@ end_per_testcase(_TestCase, _Config) ->
 clear_schemas() ->
     maps:foreach(
         fun(Name, _Schema) ->
-            ok = emqx_ee_schema_registry:delete_schema(Name)
+            ok = emqx_schema_registry:delete_schema(Name)
         end,
-        emqx_ee_schema_registry:list_schemas()
+        emqx_schema_registry:list_schemas()
     ).
 
 schema_params(avro) ->
@@ -81,13 +81,13 @@ schema_params(protobuf) ->
     #{type => protobuf, source => SourceBin}.
 
 assert_roundtrip(SerdeName, Original) ->
-    Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original),
-    Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded),
+    Encoded = emqx_schema_registry_serde:encode(SerdeName, Original),
+    Decoded = emqx_schema_registry_serde:decode(SerdeName, Encoded),
     ?assertEqual(Original, Decoded, #{original => Original}).
 
 assert_roundtrip(SerdeName, Original, ArgsSerialize, ArgsDeserialize) ->
-    Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original, ArgsSerialize),
-    Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded, ArgsDeserialize),
+    Encoded = emqx_schema_registry_serde:encode(SerdeName, Original, ArgsSerialize),
+    Decoded = emqx_schema_registry_serde:decode(SerdeName, Encoded, ArgsDeserialize),
     ?assertEqual(Original, Decoded, #{original => Original}).
 
 %%------------------------------------------------------------------------------
@@ -97,7 +97,7 @@ assert_roundtrip(SerdeName, Original, ArgsSerialize, ArgsDeserialize) ->
 t_roundtrip_avro(_Config) ->
     SerdeName = my_serde,
     Params = schema_params(avro),
-    ok = emqx_ee_schema_registry:add_schema(SerdeName, Params),
+    ok = emqx_schema_registry:add_schema(SerdeName, Params),
     Original = #{<<"i">> => 10, <<"s">> => <<"hi">>},
     %% for coverage
     assert_roundtrip(SerdeName, Original, _ArgsSerialize = [], _ArgsDeserialize = []),
@@ -110,7 +110,7 @@ t_avro_invalid_json_schema(_Config) ->
     WrongParams = Params#{source := <<"{">>},
     ?assertMatch(
         {error, #{reason := #{expected := _}}},
-        emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
+        emqx_schema_registry:add_schema(SerdeName, WrongParams)
     ),
     ok.
 
@@ -120,7 +120,7 @@ t_avro_invalid_schema(_Config) ->
     WrongParams = Params#{source := <<"{}">>},
     ?assertMatch(
         {error, {post_config_update, _, {not_found, <<"type">>}}},
-        emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
+        emqx_schema_registry:add_schema(SerdeName, WrongParams)
     ),
     ok.
 
@@ -130,18 +130,18 @@ t_serde_not_found(_Config) ->
     Original = #{},
     ?assertError(
         {serde_not_found, NonexistentSerde},
-        emqx_ee_schema_registry_serde:encode(NonexistentSerde, Original)
+        emqx_schema_registry_serde:encode(NonexistentSerde, Original)
     ),
     ?assertError(
         {serde_not_found, NonexistentSerde},
-        emqx_ee_schema_registry_serde:decode(NonexistentSerde, Original)
+        emqx_schema_registry_serde:decode(NonexistentSerde, Original)
     ),
     ok.
 
 t_roundtrip_protobuf(_Config) ->
     SerdeName = my_serde,
     Params = schema_params(protobuf),
-    ok = emqx_ee_schema_registry:add_schema(SerdeName, Params),
+    ok = emqx_schema_registry:add_schema(SerdeName, Params),
     ExtraArgsPerson = [<<"Person">>],
 
     Original0 = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
@@ -167,6 +167,6 @@ t_protobuf_invalid_schema(_Config) ->
     WrongParams = Params#{source := <<"xxxx">>},
     ?assertMatch(
         {error, {post_config_update, _, {invalid_protobuf_schema, _}}},
-        emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
+        emqx_schema_registry:add_schema(SerdeName, WrongParams)
     ),
     ok.

+ 20 - 9
apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl

@@ -101,7 +101,7 @@ init_per_testcase(t_get_telemetry, Config) ->
         "test/emqx_gateway_SUITE_data"
     ),
     ok = emqx_gateway_SUITE:setup_fake_usage_data(Lwm2mDataDir),
-    {ok, _} = application:ensure_all_started(emqx_gateway),
+    emqx_common_test_helpers:start_apps([emqx_gateway]),
     Config;
 init_per_testcase(t_advanced_mqtt_features, Config) ->
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
@@ -129,8 +129,7 @@ init_per_testcase(t_send_after_enable, Config) ->
 init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
     mock_httpc(),
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    {ok, _} = application:ensure_all_started(emqx_rule_engine),
-    ok = application:start(emqx_bridge),
+    emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_bridge]),
     ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
     ok = setup_fake_rule_engine_data(),
     Config;
@@ -154,7 +153,7 @@ init_per_testcase(t_exhook_info, Config) ->
     {ok, Sock} = gen_tcp:connect("localhost", 9000, [], 3000),
     _ = gen_tcp:close(Sock),
     ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, ExhookConf),
-    {ok, _} = application:ensure_all_started(emqx_exhook),
+    emqx_common_test_helpers:start_apps([emqx_exhook]),
     Config;
 init_per_testcase(t_cluster_uuid, Config) ->
     Node = start_slave(n1),
@@ -630,11 +629,23 @@ bin(B) when is_binary(B) ->
 mock_httpc() ->
     TestPID = self(),
     ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
-    ok = meck:expect(httpc, request, fun(
-        Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts
-    ) ->
-        TestPID ! {request, Method, URL, Headers, Body}
-    end).
+    ok = meck:expect(
+        httpc,
+        request,
+        fun
+            (Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts) ->
+                TestPID ! {request, Method, URL, Headers, Body};
+            (Method, {URL, Headers}, _HTTPOpts, _Opts) ->
+                TestPID ! {request, Method, URL, Headers, undefined}
+        end
+    ),
+    ok = meck:expect(
+        httpc,
+        request,
+        fun(Method, {URL, Headers}, _Opts) ->
+            TestPID ! {request, Method, URL, Headers, undefined}
+        end
+    ).
 
 mock_advanced_mqtt_features() ->
     Context = undefined,

+ 1 - 1
build

@@ -320,7 +320,7 @@ make_tgz() {
             if [ "${APPLE_SIGN_BINARIES:-0}" = 1 ]; then
                 # notarize the package
                 # if fails, check what went wrong with this command:
-                    # xcrun notarytool log --apple-id <apple id> \
+                    # xcrun notarytool log \
                     #   --apple-id <apple id> \
                     #   --password <apple id password>
                     #   --team-id <apple team id> <submission-id>

+ 1 - 0
changes/ce/feat-11249.en.md

@@ -0,0 +1 @@
+Support HTTP API for setting alarm watermark of license.

+ 3 - 0
changes/ce/feat-11251.en.md

@@ -0,0 +1,3 @@
+Add `/cluster/topology` HTTP API endpoint
+
+`GET` request to the endpoint returns the cluster topology: connections between RLOG core and replicant nodes.

+ 1 - 0
changes/ce/feat-11253.en.md

@@ -0,0 +1 @@
+The Webhook/HTTP bridge has been refactored to its own Erlang application. This allows for more flexibility in the future, and also allows for the bridge to be run as a standalone application.

+ 2 - 0
changes/ce/fix-11250.en.md

@@ -0,0 +1,2 @@
+Fix while a WebSocket packet contains more than one MQTT packet, the order of MQTT packets will be reversed.
+

+ 1 - 0
changes/ce/perf-11236.en.md

@@ -0,0 +1 @@
+Improve the speed of clients querying in HTTP API `/clients` endpoint with default parameters

+ 1 - 0
changes/ee/feat-11241.en.md

@@ -0,0 +1 @@
+Schema Registry has been refactored to its own Erlang application. This allows for more flexibility in the future.

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.1.0
+version: 5.1.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.1.0
+appVersion: 5.1.1

+ 0 - 19
lib-ee/emqx_ee_schema_registry/.gitignore

@@ -1,19 +0,0 @@
-.rebar3
-_*
-.eunit
-*.o
-*.beam
-*.plt
-*.swp
-*.swo
-.erlang.cookie
-ebin
-log
-erl_crash.dump
-.rebar
-logs
-_build
-.idea
-*.iml
-rebar3.crashdump
-*~

+ 14 - 1
lib-ee/emqx_license/src/emqx_license.erl

@@ -22,7 +22,8 @@
     unload/0,
     read_license/0,
     read_license/1,
-    update_key/1
+    update_key/1,
+    update_setting/1
 ]).
 
 -define(CONF_KEY_PATH, [license]).
@@ -64,6 +65,14 @@ update_key(Value) when is_binary(Value); is_list(Value) ->
     ),
     handle_config_update_result(Result).
 
+update_setting(Setting) when is_map(Setting) ->
+    Result = emqx_conf:update(
+        ?CONF_KEY_PATH,
+        {setting, Setting},
+        #{rawconf_with_defaults => true, override_to => cluster}
+    ),
+    handle_config_update_result(Result).
+
 %%------------------------------------------------------------------------------
 %% emqx_hooks
 %%------------------------------------------------------------------------------
@@ -96,6 +105,8 @@ check(_ConnInfo, AckProps) ->
 pre_config_update(_, Cmd, Conf) ->
     {ok, do_update(Cmd, Conf)}.
 
+post_config_update(_Path, {setting, _}, NewConf, _Old, _AppEnvs) ->
+    {ok, NewConf};
 post_config_update(_Path, _Cmd, NewConf, _Old, _AppEnvs) ->
     case read_license(NewConf) of
         {ok, License} ->
@@ -122,6 +133,8 @@ do_update({key, Content}, Conf) when is_binary(Content); is_list(Content) ->
         {error, Reason} ->
             erlang:throw(Reason)
     end;
+do_update({setting, Setting}, Conf) ->
+    maps:merge(Conf, Setting);
 do_update(NewConf, _PrevConf) ->
     #{<<"key">> := NewKey} = NewConf,
     do_update({key, NewKey}, NewConf).

+ 55 - 9
lib-ee/emqx_license/src/emqx_license_http_api.erl

@@ -13,11 +13,14 @@
     namespace/0,
     api_spec/0,
     paths/0,
-    schema/1
+    schema/1,
+    fields/1
 ]).
+-define(LICENSE_TAGS, [<<"License">>]).
 
 -export([
-    '/license'/2
+    '/license'/2,
+    '/license/setting'/2
 ]).
 
 -define(BAD_REQUEST, 'BAD_REQUEST').
@@ -29,14 +32,15 @@ api_spec() ->
 
 paths() ->
     [
-        "/license"
+        "/license",
+        "/license/setting"
     ].
 
 schema("/license") ->
     #{
         'operationId' => '/license',
         get => #{
-            tags => [<<"license">>],
+            tags => ?LICENSE_TAGS,
             summary => <<"Get license info">>,
             description => ?DESC("desc_license_info_api"),
             responses => #{
@@ -50,19 +54,18 @@ schema("/license") ->
                 )
             }
         },
+        %% TODO(5.x): It's a update action, should use PUT instead
         post => #{
-            tags => [<<"license">>],
+            tags => ?LICENSE_TAGS,
             summary => <<"Update license key">>,
             description => ?DESC("desc_license_key_api"),
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_license_schema:key_license(),
+                hoconsc:ref(?MODULE, key_license),
                 #{
                     license_key => #{
                         summary => <<"License key string">>,
                         value => #{
-                            <<"key">> => <<"xxx">>,
-                            <<"connection_low_watermark">> => "75%",
-                            <<"connection_high_watermark">> => "80%"
+                            <<"key">> => <<"xxx">>
                         }
                     }
                 }
@@ -79,6 +82,28 @@ schema("/license") ->
                 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad license key">>)
             }
         }
+    };
+schema("/license/setting") ->
+    #{
+        'operationId' => '/license/setting',
+        get => #{
+            tags => ?LICENSE_TAGS,
+            summary => <<"Get license setting">>,
+            description => ?DESC("desc_license_setting_api"),
+            responses => #{
+                200 => setting()
+            }
+        },
+        put => #{
+            tags => ?LICENSE_TAGS,
+            summary => <<"Update license setting">>,
+            description => ?DESC("desc_license_setting_api"),
+            'requestBody' => setting(),
+            responses => #{
+                200 => setting(),
+                400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad setting value">>)
+            }
+        }
     }.
 
 sample_license_info_response() ->
@@ -117,3 +142,24 @@ error_msg(Code, Msg) ->
     end;
 '/license'(post, _Params) ->
     {400, error_msg(?BAD_REQUEST, <<"Invalid request params">>)}.
+
+'/license/setting'(get, _Params) ->
+    {200, maps:remove(<<"key">>, emqx_config:get_raw([license]))};
+'/license/setting'(put, #{body := Setting}) ->
+    case emqx_license:update_setting(Setting) of
+        {error, Error} ->
+            ?SLOG(error, #{
+                msg => "bad_license_setting",
+                reason => Error
+            }),
+            {400, error_msg(?BAD_REQUEST, <<"Bad license setting">>)};
+        {ok, _} ->
+            ?SLOG(info, #{msg => "updated_license_setting"}),
+            '/license/setting'(get, undefined)
+    end.
+
+fields(key_license) ->
+    [lists:keyfind(key, 1, emqx_license_schema:fields(key_license))].
+
+setting() ->
+    lists:keydelete(key, 1, emqx_license_schema:fields(key_license)).

+ 4 - 6
lib-ee/emqx_license/src/emqx_license_schema.erl

@@ -16,15 +16,14 @@
 -export([roots/0, fields/1, validations/0, desc/1, tags/0]).
 
 -export([
-    default_license/0,
-    key_license/0
+    default_license/0
 ]).
 
 roots() ->
     [
         {license,
             hoconsc:mk(
-                key_license(),
+                hoconsc:ref(?MODULE, key_license),
                 #{
                     desc => ?DESC(license_root)
                 }
@@ -47,11 +46,13 @@ fields(key_license) ->
         {connection_low_watermark, #{
             type => emqx_schema:percent(),
             default => <<"75%">>,
+            example => <<"75%">>,
             desc => ?DESC(connection_low_watermark_field)
         }},
         {connection_high_watermark, #{
             type => emqx_schema:percent(),
             default => <<"80%">>,
+            example => <<"80%">>,
             desc => ?DESC(connection_high_watermark_field)
         }}
     ].
@@ -64,9 +65,6 @@ desc(_) ->
 validations() ->
     [{check_license_watermark, fun check_license_watermark/1}].
 
-key_license() ->
-    hoconsc:ref(?MODULE, key_license).
-
 check_license_watermark(Conf) ->
     case hocon_maps:get("license.connection_low_watermark", Conf) of
         undefined ->

+ 51 - 2
lib-ee/emqx_license/test/emqx_license_http_api_SUITE.erl

@@ -38,9 +38,15 @@ set_special_configs(emqx_dashboard) ->
     emqx_dashboard_api_test_helpers:set_default_config(<<"license_admin">>);
 set_special_configs(emqx_license) ->
     LicenseKey = emqx_license_test_lib:make_license(#{max_connections => "100"}),
-    Config = #{key => LicenseKey},
+    Config = #{
+        key => LicenseKey, connection_low_watermark => 0.75, connection_high_watermark => 0.8
+    },
     emqx_config:put([license], Config),
-    RawConfig = #{<<"key">> => LicenseKey},
+    RawConfig = #{
+        <<"key">> => LicenseKey,
+        <<"connection_low_watermark">> => <<"75%">>,
+        <<"connection_high_watermark">> => <<"80%">>
+    },
     emqx_config:put_raw([<<"license">>], RawConfig),
     ok = persistent_term:put(
         emqx_license_test_pubkey,
@@ -172,3 +178,46 @@ t_license_upload_key_not_json(_Config) ->
     ),
     assert_untouched_license(),
     ok.
+
+t_license_setting(_Config) ->
+    %% get
+    GetRes = request(get, uri(["license", "setting"]), []),
+    validate_setting(GetRes, <<"75%">>, <<"80%">>),
+    %% update
+    Low = <<"50%">>,
+    High = <<"55%">>,
+    UpdateRes = request(put, uri(["license", "setting"]), #{
+        <<"connection_low_watermark">> => Low,
+        <<"connection_high_watermark">> => High
+    }),
+    validate_setting(UpdateRes, Low, High),
+    ?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])),
+    ?assertEqual(0.55, emqx_config:get([license, connection_high_watermark])),
+
+    %% update bad setting low >= high
+    ?assertMatch(
+        {ok, 400, _},
+        request(put, uri(["license", "setting"]), #{
+            <<"connection_low_watermark">> => <<"50%">>,
+            <<"connection_high_watermark">> => <<"50%">>
+        })
+    ),
+    ?assertMatch(
+        {ok, 400, _},
+        request(put, uri(["license", "setting"]), #{
+            <<"connection_low_watermark">> => <<"51%">>,
+            <<"connection_high_watermark">> => <<"50%">>
+        })
+    ),
+    ok.
+
+validate_setting(Res, ExpectLow, ExpectHigh) ->
+    ?assertMatch({ok, 200, _}, Res),
+    {ok, 200, Payload} = Res,
+    ?assertEqual(
+        #{
+            <<"connection_low_watermark">> => ExpectLow,
+            <<"connection_high_watermark">> => ExpectHigh
+        },
+        emqx_utils_json:decode(Payload, [return_maps])
+    ).

+ 5 - 2
mix.exs

@@ -189,7 +189,9 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_rabbitmq,
       :emqx_bridge_clickhouse,
       :emqx_ft,
-      :emqx_s3
+      :emqx_s3,
+      :emqx_schema_registry,
+      :emqx_enterprise
     ])
   end
 
@@ -370,6 +372,7 @@ defmodule EMQXUmbrella.MixProject do
         emqx_exhook: :permanent,
         emqx_bridge: :permanent,
         emqx_bridge_mqtt: :permanent,
+        emqx_bridge_http: :permanent,
         emqx_rule_engine: :permanent,
         emqx_modules: :permanent,
         emqx_management: :permanent,
@@ -417,7 +420,7 @@ defmodule EMQXUmbrella.MixProject do
           emqx_oracle: :permanent,
           emqx_bridge_oracle: :permanent,
           emqx_bridge_rabbitmq: :permanent,
-          emqx_ee_schema_registry: :permanent,
+          emqx_schema_registry: :permanent,
           emqx_eviction_agent: :permanent,
           emqx_node_rebalance: :permanent,
           emqx_ft: :permanent

+ 4 - 1
rebar.config.erl

@@ -102,6 +102,8 @@ is_community_umbrella_app("apps/emqx_oracle") -> false;
 is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false;
 is_community_umbrella_app("apps/emqx_ft") -> false;
 is_community_umbrella_app("apps/emqx_s3") -> false;
+is_community_umbrella_app("apps/emqx_schema_registry") -> false;
+is_community_umbrella_app("apps/emqx_enterprise") -> false;
 is_community_umbrella_app(_) -> true.
 
 is_jq_supported() ->
@@ -431,6 +433,7 @@ relx_apps(ReleaseType, Edition) ->
             emqx_exhook,
             emqx_bridge,
             emqx_bridge_mqtt,
+            emqx_bridge_http,
             emqx_rule_engine,
             emqx_modules,
             emqx_management,
@@ -485,7 +488,7 @@ relx_apps_per_edition(ee) ->
         emqx_oracle,
         emqx_bridge_oracle,
         emqx_bridge_rabbitmq,
-        emqx_ee_schema_registry,
+        emqx_schema_registry,
         emqx_eviction_agent,
         emqx_node_rebalance,
         emqx_ft

+ 1 - 1
rel/i18n/emqx_connector_http.hocon

@@ -1,4 +1,4 @@
-emqx_connector_http {
+emqx_bridge_http_connector {
 
 body.desc:
 """HTTP request body."""

+ 1 - 1
rel/i18n/emqx_bridge_webhook_schema.hocon

@@ -1,4 +1,4 @@
-emqx_bridge_webhook_schema {
+emqx_bridge_http_schema {
 
 config_body.desc:
 """The body of the HTTP request.<br/>

+ 6 - 0
rel/i18n/emqx_license_http_api.hocon

@@ -12,4 +12,10 @@ desc_license_key_api.desc:
 desc_license_key_api.label:
 """Update license"""
 
+desc_license_setting_api.desc:
+"""Update license setting"""
+
+desc_license_setting_api.label:
+"""Update license setting"""
+
 }

+ 14 - 2
rel/i18n/emqx_license_schema.hocon

@@ -12,6 +12,18 @@ connection_low_watermark_field.desc:
 connection_low_watermark_field.label:
 """Connection low watermark"""
 
+connection_high_watermark_field_deprecated.desc:
+"""deprecated use /license/setting instead"""
+
+connection_high_watermark_field_deprecated.label:
+"""deprecated use /license/setting instead"""
+
+connection_low_watermark_field_deprecated.desc:
+"""deprecated use /license/setting instead"""
+
+connection_low_watermark_field_deprecated.label:
+"""deprecated use /license/setting instead"""
+
 key_field.desc:
 """License string"""
 
@@ -19,12 +31,12 @@ key_field.label:
 """License string"""
 
 license_root.desc:
-"""Defines the EMQX Enterprise license. 
+"""Defines the EMQX Enterprise license.
 
 
 The default license has 100 connections limit, it is issued on 2023-01-09 and valid for 5 years (1825 days).
 
-EMQX comes with a default trial license.  For production use, please 
+EMQX comes with a default trial license.  For production use, please
 visit https://www.emqx.com/apply-licenses/emqx to apply."""
 
 license_root.label:

+ 5 - 0
rel/i18n/emqx_mgmt_api_cluster.hocon

@@ -5,6 +5,11 @@ get_cluster_info.desc:
 get_cluster_info.label:
 """Get cluster info"""
 
+get_cluster_topology.desc:
+"""Get RLOG cluster topology: connections between core and replicant nodes."""
+get_cluster_topology.label:
+"""Get cluster topology"""
+
 invite_node.desc:
 """Invite node to cluster"""
 invite_node.label:

+ 1 - 1
rel/i18n/emqx_ee_schema_registry_http_api.hocon

@@ -1,4 +1,4 @@
-emqx_ee_schema_registry_http_api {
+emqx_schema_registry_http_api {
 
 desc_param_path_schema_name.desc:
 """The schema name"""

+ 1 - 1
rel/i18n/emqx_ee_schema_registry_schema.hocon

@@ -1,4 +1,4 @@
-emqx_ee_schema_registry_schema {
+emqx_schema_registry_schema {
 
 avro_type.desc:
 """[Apache Avro](https://avro.apache.org/) serialization format."""

+ 1 - 0
scripts/macos-sign-binaries.sh

@@ -65,6 +65,7 @@ for f in \
         crc32cer_nif.so \
         crypto.so \
         crypto_callback.so \
+        ezstd_nif.so \
         jiffy.so \
         liberocksdb.so \
         libquicer_nif.so \

+ 9 - 9
scripts/rerun-apps-version-check.py

@@ -22,13 +22,13 @@ def query(owner, repo):
     return """
 query {
   repository(owner: "%s", name: "%s") {
-    pullRequests(last: 25, states: OPEN) {
+    pullRequests(first: 25, states: OPEN, orderBy: {field:CREATED_AT, direction:DESC}) {
       nodes {
         url
         commits(last: 1) {
           nodes {
             commit {
-              checkSuites(first: 17) {
+              checkSuites(first: 25) {
                 nodes {
                   url
                   checkRuns(first: 1, filterBy: {checkName: "check_apps_version"}) {
@@ -77,7 +77,7 @@ def get_check_suite_ids(token: str, repo: str):
         if not 'data' in resp:
             print(f'Failed to fetch check runs: {r.status_code}\n{r.json()}')
             sys.exit(1)
-        ids = []
+        result = []
         for pr in resp['data']['repository']['pullRequests']['nodes']:
             if not pr['commits']['nodes']:
                 continue
@@ -85,12 +85,11 @@ def get_check_suite_ids(token: str, repo: str):
                 continue
             for node in pr['commits']['nodes'][0]['commit']['checkSuites']['nodes']:
                 if node['checkRuns']['nodes']:
-                    id = node['checkRuns']['nodes'][0]['url'].rsplit('/', 1)[-1]
                     url_parsed = urlparse(node['url'])
                     params = parse_qs(url_parsed.query)
                     check_suite_id = params['check_suite_id'][0] 
-                    ids.extend([check_suite_id])
-        return ids
+                    result.extend([(check_suite_id, pr['url'], node['checkRuns']['nodes'][0]['url'])])
+        return result
     else:
         print(f'Failed to fetch check runs: {r.status_code}\n{r.text}')
         sys.exit(1)
@@ -100,9 +99,9 @@ def rerequest_check_suite(token: str, repo: str, check_suite_id: str):
     url = f'https://api.github.com/repos/{repo}/check-suites/{check_suite_id}/rerequest'
     r = session.post(url, headers=get_headers(token))
     if r.status_code == 201:
-        print(f'Successfully triggered rerequest for check suite {check_suite_id}')
+        print(f'Successfully triggered {url}')
     else:
-        print(f'Failed to trigger rerequest for check suite {check_suite_id}: {r.status_code}\n{r.text}')
+        print(f'Failed to trigger {url}: {r.status_code}\n{r.text}')
 
 def main():
     parser = OptionParser()
@@ -115,7 +114,8 @@ def main():
     # Get github token from env var if provided, else use the one from command line.
     # The token must be exported in the env from ${{ secrets.GITHUB_TOKEN }} in the workflow.
     token = os.environ['GITHUB_TOKEN'] if 'GITHUB_TOKEN' in os.environ else options.gh_token
-    for id in get_check_suite_ids(token, options.repo):
+    for id, pr_url, check_run_url in get_check_suite_ids(token, options.repo):
+        print(f'Attempting to re-request {check_run_url} for {pr_url}')
         rerequest_check_suite(token, options.repo, id)
 
 if __name__ == '__main__':