Просмотр исходного кода

feat(bridges): use union member selector function for better error messages

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
5d5c16a56d

+ 31 - 3
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -51,11 +51,39 @@ post_request() ->
 
 api_schema(Method) ->
     Broker = [
-        ref(Mod, Method)
-     || Mod <- [emqx_bridge_webhook_schema, emqx_bridge_mqtt_schema]
+        {Type, ref(Mod, Method)}
+     || {Type, Mod} <- [
+            {<<"webhook">>, emqx_bridge_webhook_schema},
+            {<<"mqtt">>, emqx_bridge_mqtt_schema}
+        ]
     ],
     EE = ee_api_schemas(Method),
-    hoconsc:union(Broker ++ EE).
+    hoconsc:union(bridge_api_union(Broker ++ EE)).
+
+bridge_api_union(Refs) ->
+    Index = maps:from_list(Refs),
+    fun
+        (all_union_members) ->
+            maps:values(Index);
+        ({value, V}) ->
+            case V of
+                #{<<"type">> := T} ->
+                    case maps:get(T, Index, undefined) of
+                        undefined ->
+                            throw(#{
+                                field_name => type,
+                                reason => <<"unknown bridge type">>
+                            });
+                        Ref ->
+                            [Ref]
+                    end;
+                _ ->
+                    throw(#{
+                        field_name => type,
+                        reason => <<"unknown bridge type">>
+                    })
+            end
+    end.
 
 -if(?EMQX_RELEASE_EDITION == ee).
 ee_api_schemas(Method) ->

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ee_bridge, [
     {description, "EMQX Enterprise data bridges"},
-    {vsn, "0.1.12"},
+    {vsn, "0.1.13"},
     {registered, []},
     {applications, [
         kernel,

+ 34 - 27
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -14,33 +14,37 @@
 
 api_schemas(Method) ->
     [
-        ref(emqx_bridge_gcp_pubsub, Method),
-        ref(emqx_bridge_kafka, Method ++ "_consumer"),
-        ref(emqx_bridge_kafka, Method ++ "_producer"),
-        ref(emqx_bridge_cassandra, Method),
-        ref(emqx_ee_bridge_mysql, Method),
-        ref(emqx_bridge_pgsql, Method),
-        ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
-        ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
-        ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
-        ref(emqx_ee_bridge_hstreamdb, Method),
-        ref(emqx_bridge_influxdb, Method ++ "_api_v1"),
-        ref(emqx_bridge_influxdb, Method ++ "_api_v2"),
-        ref(emqx_ee_bridge_redis, Method ++ "_single"),
-        ref(emqx_ee_bridge_redis, Method ++ "_sentinel"),
-        ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
-        ref(emqx_bridge_timescale, Method),
-        ref(emqx_bridge_matrix, Method),
-        ref(emqx_bridge_tdengine, Method),
-        ref(emqx_ee_bridge_clickhouse, Method),
-        ref(emqx_bridge_dynamo, Method),
-        ref(emqx_bridge_rocketmq, Method),
-        ref(emqx_bridge_sqlserver, Method),
-        ref(emqx_bridge_opents, Method),
-        ref(emqx_bridge_pulsar, Method ++ "_producer"),
-        ref(emqx_bridge_oracle, Method),
-        ref(emqx_bridge_iotdb, Method),
-        ref(emqx_bridge_rabbitmq, Method)
+        %% We need to map the `type' field of a request (binary) to a
+        %% bridge schema module.
+        api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method),
+        api_ref(emqx_bridge_kafka, <<"kafka_consumer">>, Method ++ "_consumer"),
+        %% TODO: rename this to `kafka_producer' after alias support is added
+        %% to hocon; keeping this as just `kafka' for backwards compatibility.
+        api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_producer"),
+        api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method),
+        api_ref(emqx_ee_bridge_mysql, <<"mysql">>, Method),
+        api_ref(emqx_bridge_pgsql, <<"pgsql">>, Method),
+        api_ref(emqx_ee_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"),
+        api_ref(emqx_ee_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"),
+        api_ref(emqx_ee_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"),
+        api_ref(emqx_ee_bridge_hstreamdb, <<"hstreamdb">>, Method),
+        api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"),
+        api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"),
+        api_ref(emqx_ee_bridge_redis, <<"redis_single">>, Method ++ "_single"),
+        api_ref(emqx_ee_bridge_redis, <<"redis_sentinel">>, Method ++ "_sentinel"),
+        api_ref(emqx_ee_bridge_redis, <<"redis_cluster">>, Method ++ "_cluster"),
+        api_ref(emqx_bridge_timescale, <<"timescale">>, Method),
+        api_ref(emqx_bridge_matrix, <<"matrix">>, Method),
+        api_ref(emqx_bridge_tdengine, <<"tdengine">>, Method),
+        api_ref(emqx_ee_bridge_clickhouse, <<"clickhouse">>, Method),
+        api_ref(emqx_bridge_dynamo, <<"dynamo">>, Method),
+        api_ref(emqx_bridge_rocketmq, <<"rocketmq">>, Method),
+        api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method),
+        api_ref(emqx_bridge_opents, <<"opents">>, Method),
+        api_ref(emqx_bridge_pulsar, <<"pulsar_producer">>, Method ++ "_producer"),
+        api_ref(emqx_bridge_oracle, <<"oracle">>, Method),
+        api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method),
+        api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method)
     ].
 
 schema_modules() ->
@@ -338,3 +342,6 @@ rabbitmq_structs() ->
                 }
             )}
     ].
+
+api_ref(Module, Type, Method) ->
+    {Type, ref(Module, Method)}.