Bläddra i källkod

feat(emqx_mysql_connector): implement the on_query callback

Shawn 4 år sedan
förälder
incheckning
f4bb589079

+ 3 - 0
apps/emqx_connector/etc/emqx_connector.conf

@@ -0,0 +1,3 @@
+##--------------------------------------------------------------------
+## EMQ X CONNECTOR Plugin
+##--------------------------------------------------------------------

+ 2 - 0
apps/emqx_connector/priv/emqx_connector.schema

@@ -0,0 +1,2 @@
+%%-*- mode: erlang -*-
+%% emqx_connector config mapping

+ 15 - 5
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -14,6 +14,8 @@
         , on_health_check/2
         ]).
 
+-export([connect/1]).
+
 -export([do_health_check/1]).
 
 fields("config") ->
@@ -51,11 +53,16 @@ on_stop(InstId, #{poolname := PoolName}) ->
     logger:info("stopping mysql connector: ~p", [InstId]),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, Request, AfterQuery, State) ->
-    io:format("== the demo log tracer ~p received request: ~p~nstate: ~p~n",
-        [InstId, Request, State]),
-    emqx_resource:query_success(AfterQuery),
-    "this is a demo log messages...".
+on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
+    logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]),
+    case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of
+        {error, Reason} ->
+            logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]),
+            emqx_resource:query_failure(AfterQuery);
+        _ ->
+            emqx_resource:query_success(AfterQuery)
+    end,
+    Result.
 
 on_health_check(_InstId, #{poolname := PoolName} = State) ->
     emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
@@ -66,3 +73,6 @@ do_health_check(Conn) ->
 %% ===================================================================
 reconn_interval(true) -> 15;
 reconn_interval(false) -> false.
+
+connect(Options) ->
+    mysql:start_link(Options).