|
|
@@ -53,7 +53,6 @@
|
|
|
|
|
|
%% Internal exports used to execute code with ecpool worker
|
|
|
-export([
|
|
|
- check_database_status/1,
|
|
|
execute_sql_in_clickhouse_server_using_connection/2
|
|
|
]).
|
|
|
|
|
|
@@ -102,6 +101,14 @@ fields(config) ->
|
|
|
end,
|
|
|
desc => ?DESC("base_url")
|
|
|
}
|
|
|
+ )},
|
|
|
+ {connect_timeout,
|
|
|
+ hoconsc:mk(
|
|
|
+ emqx_schema:duration_ms(),
|
|
|
+ #{
|
|
|
+ default => <<"15s">>,
|
|
|
+ desc => ?DESC("connect_timeout")
|
|
|
+ }
|
|
|
)}
|
|
|
] ++ emqx_connector_schema_lib:relational_db_fields().
|
|
|
|
|
|
@@ -137,7 +144,8 @@ on_start(
|
|
|
#{
|
|
|
url := URL,
|
|
|
database := DB,
|
|
|
- pool_size := PoolSize
|
|
|
+ pool_size := PoolSize,
|
|
|
+ connect_timeout := ConnectTimeout
|
|
|
} = Config
|
|
|
) ->
|
|
|
?SLOG(info, #{
|
|
|
@@ -155,7 +163,10 @@ on_start(
|
|
|
{pool_size, PoolSize},
|
|
|
{pool, PoolName}
|
|
|
],
|
|
|
- InitState = #{poolname => PoolName},
|
|
|
+ InitState = #{
|
|
|
+ poolname => PoolName,
|
|
|
+ connect_timeout => ConnectTimeout
|
|
|
+ },
|
|
|
try
|
|
|
Templates = prepare_sql_templates(Config),
|
|
|
State = maps:merge(InitState, #{templates => Templates}),
|
|
|
@@ -282,18 +293,52 @@ on_stop(ResourceID, #{poolname := PoolName}) ->
|
|
|
%% on_get_status emqx_resouce callback and related functions
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
-on_get_status(_ResourceID, #{poolname := Pool} = _State) ->
|
|
|
- case
|
|
|
- emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:check_database_status/1)
|
|
|
- of
|
|
|
- true ->
|
|
|
- connected;
|
|
|
- false ->
|
|
|
- connecting
|
|
|
+on_get_status(
|
|
|
+ _InstId,
|
|
|
+ #{
|
|
|
+ poolname := PoolName,
|
|
|
+ connect_timeout := Timeout
|
|
|
+ } = State
|
|
|
+) ->
|
|
|
+ case do_get_status(PoolName, Timeout) of
|
|
|
+ ok ->
|
|
|
+ {connected, State};
|
|
|
+ {error, Reason} ->
|
|
|
+ {disconnected, State, Reason}
|
|
|
end.
|
|
|
|
|
|
-check_database_status(Connection) ->
|
|
|
- clickhouse:status(Connection).
|
|
|
+do_get_status(PoolName, Timeout) ->
|
|
|
+ Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
|
|
+ DoPerWorker =
|
|
|
+ fun(Worker) ->
|
|
|
+ case ecpool_worker:exec(Worker, fun clickhouse:detailed_status/1, Timeout) of
|
|
|
+ ok ->
|
|
|
+ ok;
|
|
|
+ {error, Reason} = Error ->
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => "clickhouse_connector_get_status_failed",
|
|
|
+ reason => Reason,
|
|
|
+ worker => Worker
|
|
|
+ }),
|
|
|
+ Error
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
|
|
|
+ Results ->
|
|
|
+ case [E || {error, _} = E <- Results] of
|
|
|
+ [] ->
|
|
|
+ ok;
|
|
|
+ Errors ->
|
|
|
+ hd(Errors)
|
|
|
+ end
|
|
|
+ catch
|
|
|
+ exit:timeout ->
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => "clickhouse_connector_pmap_failed",
|
|
|
+ reason => timeout
|
|
|
+ }),
|
|
|
+ {error, timeout}
|
|
|
+ end.
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
%% on_query emqx_resouce callback and related functions
|