|
|
@@ -36,8 +36,14 @@
|
|
|
conn_bridge_examples(Method) ->
|
|
|
[
|
|
|
#{
|
|
|
- <<"kafka">> => #{
|
|
|
- summary => <<"Kafka Bridge">>,
|
|
|
+ <<"kafka_producer">> => #{
|
|
|
+ summary => <<"Kafka Producer Bridge">>,
|
|
|
+ value => values(Method)
|
|
|
+ }
|
|
|
+ },
|
|
|
+ #{
|
|
|
+ <<"kafka_consumer">> => #{
|
|
|
+ summary => <<"Kafka Consumer Bridge">>,
|
|
|
value => values(Method)
|
|
|
}
|
|
|
}
|
|
|
@@ -60,14 +66,18 @@ host_opts() ->
|
|
|
|
|
|
namespace() -> "bridge_kafka".
|
|
|
|
|
|
-roots() -> ["config"].
|
|
|
+roots() -> ["config_consumer", "config_producer"].
|
|
|
|
|
|
-fields("post") ->
|
|
|
- [type_field(), name_field() | fields("config")];
|
|
|
-fields("put") ->
|
|
|
- fields("config");
|
|
|
-fields("get") ->
|
|
|
- emqx_bridge_schema:status_fields() ++ fields("post");
|
|
|
+fields("post_" ++ Type) ->
|
|
|
+ [type_field(), name_field() | fields("config_" ++ Type)];
|
|
|
+fields("put_" ++ Type) ->
|
|
|
+ fields("config_" ++ Type);
|
|
|
+fields("get_" ++ Type) ->
|
|
|
+ emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
|
|
|
+fields("config_producer") ->
|
|
|
+ fields(kafka_producer);
|
|
|
+fields("config_consumer") ->
|
|
|
+ fields(kafka_consumer);
|
|
|
fields(kafka_producer) ->
|
|
|
fields("config") ++ fields(producer_opts);
|
|
|
fields(kafka_consumer) ->
|
|
|
@@ -292,8 +302,12 @@ fields(consumer_kafka_opts) ->
|
|
|
|
|
|
desc("config") ->
|
|
|
?DESC("desc_config");
|
|
|
-desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
|
|
- ["Configuration for Kafka using `", string:to_upper(Method), "` method."];
|
|
|
+desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->
|
|
|
+ ["Configuration for Kafka using `GET` method."];
|
|
|
+desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->
|
|
|
+ ["Configuration for Kafka using `PUT` method."];
|
|
|
+desc("post_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->
|
|
|
+ ["Configuration for Kafka using `POST` method."];
|
|
|
desc(Name) ->
|
|
|
lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
|
|
|
?DESC(Name).
|
|
|
@@ -317,7 +331,8 @@ struct_names() ->
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% internal
|
|
|
type_field() ->
|
|
|
- {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}.
|
|
|
+ {type,
|
|
|
+ mk(enum([kafka_consumer, kafka_producer]), #{required => true, desc => ?DESC("desc_type")})}.
|
|
|
|
|
|
name_field() ->
|
|
|
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|