Преглед изворни кода

fix: add influxdb udp api_v1 api_v2 connector

DDDHuang пре 3 година
родитељ
комит
4c7ca2217c

+ 3 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -70,9 +70,9 @@ fields("get") ->
 field(connector) ->
     ConnectorConfigRef =
         [
-            ref(emqx_ee_connector_influxdb, udp),
-            ref(emqx_ee_connector_influxdb, api_v1),
-            ref(emqx_ee_connector_influxdb, api_v2)
+            ref(emqx_ee_connector_influxdb, influxdb_udp),
+            ref(emqx_ee_connector_influxdb, influxdb_api_v1),
+            ref(emqx_ee_connector_influxdb, influxdb_api_v2)
         ],
     mk(
         hoconsc:union([binary() | ConnectorConfigRef]),

+ 2 - 1
lib-ee/emqx_ee_connector/rebar.config

@@ -1,6 +1,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-  {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}
+  {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
+  {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.2"}}}
 ]}.
 
 {shell, [

+ 8 - 5
lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl

@@ -26,14 +26,17 @@ fields(connectors) ->
                 hoconsc:map(name, ref(emqx_ee_connector_hstream, config)),
                 #{desc => <<"EMQX Enterprise Config">>}
             )}
-    ] ++ fields(influxdb);
+    ];
+% ] ++ fields(influxdb);
 fields(influxdb) ->
     [
-        {Protocol,
-            mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{
+        {
+            influxdb,
+            mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, influxdb_udp)), #{
                 desc => <<"EMQX Enterprise Config">>
-            })}
-     || Protocol <- [udp, api_v1, api_v2]
+            })
+        }
+        %  || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2]
     ].
 
 connector_examples(Method) ->

+ 84 - 42
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -32,37 +32,41 @@
 on_start(InstId, Config) ->
     start_client(InstId, Config).
 
-on_stop(_InstId, _State) ->
-    ok.
+on_stop(_InstId, #{client := Client}) ->
+    influxdb:stop_client(Client).
 
 on_query(_InstId, {send_message, _Data}, _AfterQuery, _State) ->
     ok.
 
-on_get_status(_InstId, _State) ->
-    % connected;
-    disconnected.
+on_get_status(_InstId, #{client := Client}) ->
+    case influxdb:is_alive(Client) of
+        true ->
+            connected;
+        false ->
+            disconnected
+    end.
 
 %% -------------------------------------------------------------------------------------------------
 %% schema
 
 fields("put_udp") ->
-    lists:filter(?PUT_FIELDS_FILTER, fields(udp));
+    lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_udp));
 fields("put_api_v1") ->
-    lists:filter(?PUT_FIELDS_FILTER, fields(api_v1));
+    lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v1));
 fields("put_api_v2") ->
-    lists:filter(?PUT_FIELDS_FILTER, fields(api_v2));
+    lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v2));
 fields("get_udp") ->
-    fields(udp);
+    fields(influxdb_udp);
 fields("get_api_v1") ->
-    fields(api_v1);
+    fields(influxdb_api_v1);
 fields("get_api_v2") ->
-    fields(api_v2);
+    fields(influxdb_api_v2);
 fields("post_udp") ->
-    fields(udp);
+    fields(influxdb_udp);
 fields("post_api_v1") ->
-    fields(api_v1);
+    fields(influxdb_api_v1);
 fields("post_api_v2") ->
-    fields(api_v2);
+    fields(influxdb_api_v2);
 fields(basic) ->
     [
         {host,
@@ -73,23 +77,22 @@ fields(basic) ->
                 required => false, default => ms, desc => ?DESC("precision")
             })},
         {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})},
-        {type, mk(enum([influxdb]), #{required => true, desc => ?DESC("type")})},
         {name, mk(binary(), #{required => true, desc => ?DESC("name")})}
     ];
-fields(udp) ->
+fields(influxdb_udp) ->
     [
-        {protocol, mk(enum([udp]), #{required => true, desc => ?DESC("protocol_udp")})}
+        {type, mk(influxdb_udp, #{required => true, desc => ?DESC("type")})}
     ] ++ fields(basic);
-fields(api_v1) ->
+fields(influxdb_api_v1) ->
     [
-        {protocol, mk(enum([api_v1]), #{required => true, desc => ?DESC("protocol_api_v1")})},
+        {type, mk(influxdb_api_v1, #{required => true, desc => ?DESC("type")})},
         {database, mk(binary(), #{required => true, desc => ?DESC("database")})},
         {username, mk(binary(), #{required => true, desc => ?DESC("username")})},
         {password, mk(binary(), #{required => true, desc => ?DESC("password")})}
     ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic);
-fields(api_v2) ->
+fields(influxdb_api_v2) ->
     [
-        {protocol, mk(enum([api_v2]), #{required => true, desc => ?DESC("protocol_api_v2")})},
+        {type, mk(influxdb_api_v2, #{required => true, desc => ?DESC("type")})},
         {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
         {org, mk(binary(), #{required => true, desc => ?DESC("org")})},
         {token, mk(binary(), #{required => true, desc => ?DESC("token")})}
@@ -120,11 +123,11 @@ connector_examples(Method) ->
 values(Protocol, get) ->
     values(Protocol, post);
 values(Protocol, post) ->
+    Type = list_to_atom(io_lib:format("influxdb_~p", [Protocol])),
     ConnectorName = list_to_binary(io_lib:format("~p_connector", [Protocol])),
-    maps:merge(values(Protocol, put), #{type => influxdb, name => ConnectorName});
+    maps:merge(values(Protocol, put), #{type => Type, name => ConnectorName});
 values(udp, put) ->
     #{
-        protocol => udp,
         host => <<"127.0.0.1">>,
         port => 8089,
         precision => ms,
@@ -132,7 +135,6 @@ values(udp, put) ->
     };
 values(api_v1, put) ->
     #{
-        protocol => api_v1,
         host => <<"127.0.0.1">>,
         port => 8086,
         precision => ms,
@@ -144,7 +146,6 @@ values(api_v1, put) ->
     };
 values(api_v2, put) ->
     #{
-        protocol => api_v2,
         host => <<"127.0.0.1">>,
         port => 8086,
         precision => ms,
@@ -158,23 +159,64 @@ values(api_v2, put) ->
 %% internal functions
 
 start_client(InstId, Config) ->
-    io:format("InstId ~p~n", [InstId]),
-    client_config(InstId, Config).
-
-% ClientConfig = client_config(InstId, Config),
-% case influxdb:start_client(ClientConfig) of
-%     {ok, Client} ->
-%         true = influxdb:is_alive(Client),
-%         maybe_pool_size(Client, Params);
-%     {error, {already_started, Client0}} ->
-%         _ = influxdb:stop_client(Client0),
-%         {ok, Client} = influxdb:start_client(Options),
-%         true = influxdb:is_alive(Client),
-%         maybe_pool_size(Client, Params);
-%     {error, Reason} ->
-%         logger:log(error, "Initiate influxdb failed ~0p", [Reason]),
-%         error({start_pool_failed, ResId})
-% end.
+    ClientConfig = client_config(InstId, Config),
+    ?SLOG(info, #{
+        msg => "starting influxdb connector",
+        connector => InstId,
+        config => Config,
+        client_config => ClientConfig
+    }),
+    try
+        do_start_client(InstId, ClientConfig, Config)
+    catch
+        E:R:S ->
+            ?SLOG(error, #{
+                msg => "start influxdb connector error",
+                connector => InstId,
+                error => E,
+                reason => R,
+                stack => S
+            }),
+            {error, R}
+    end.
+
+do_start_client(InstId, ClientConfig, Config = #{egress := #{payload := PayloadBin}}) ->
+    case influxdb:start_client(ClientConfig) of
+        {ok, Client} ->
+            case influxdb:is_alive(Client) of
+                true ->
+                    Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin),
+                    ?SLOG(info, #{
+                        msg => "starting influxdb connector success",
+                        connector => InstId,
+                        client => Client
+                    }),
+                    #{client => Client, payload => Payload};
+                false ->
+                    ?SLOG(error, #{
+                        msg => "starting influxdb connector failed",
+                        connector => InstId,
+                        client => Client,
+                        reason => "client is not alive"
+                    }),
+                    {error, influxdb_client_not_alive}
+            end;
+        {error, {already_started, Client0}} ->
+            ?SLOG(info, #{
+                msg => "starting influxdb connector,find already started client",
+                connector => InstId,
+                old_client => Client0
+            }),
+            _ = influxdb:stop_client(Client0),
+            do_start_client(InstId, ClientConfig, Config);
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "starting influxdb connector failed",
+                connector => InstId,
+                reason => Reason
+            }),
+            {error, Reason}
+    end.
 
 client_config(
     _InstId,

+ 6 - 0
mix.exs

@@ -89,9 +89,15 @@ defmodule EMQXUmbrella.MixProject do
        github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true},
       # in conflict by grpc and eetcd
       {:gpb, "4.11.2", override: true, runtime: false},
+<<<<<<< HEAD
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}
     ] ++
       umbrella_apps() ++ enterprise_apps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()
+=======
+      {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
+      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.2"}
+    ] ++ umbrella_apps() ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()
+>>>>>>> fix: add influxdb udp api_v1 api_v2 connector
   end
 
   defp umbrella_apps() do