Просмотр исходного кода

Merge pull request #9890 from zmstone/0202-fix-kafka-status-check

0202 fix kafka status check
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
5dbee76af4

+ 2 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -264,7 +264,8 @@ query(ResId, Request, Opts) ->
             case {IsBufferSupported, QM} of
                 {true, _} ->
                     %% only Kafka so far
-                    emqx_resource_buffer_worker:simple_async_query(ResId, Request);
+                    Opts1 = Opts#{is_buffer_supported => true},
+                    emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
                 {false, sync} ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
                 {false, async} ->

+ 23 - 13
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -38,7 +38,7 @@
 
 -export([
     simple_sync_query/2,
-    simple_async_query/2
+    simple_async_query/3
 ]).
 
 -export([
@@ -130,10 +130,10 @@ simple_sync_query(Id, Request) ->
     Result.
 
 %% simple async-query the resource without batching and queuing.
--spec simple_async_query(id(), request()) -> term().
-simple_async_query(Id, Request) ->
+-spec simple_async_query(id(), request(), query_opts()) -> term().
+simple_async_query(Id, Request, QueryOpts0) ->
     Index = undefined,
-    QueryOpts = simple_query_opts(),
+    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
     Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
@@ -851,23 +851,33 @@ handle_async_worker_down(Data0, Pid) ->
 call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
     ?tp(call_query_enter, #{id => Id, query => Query}),
     case emqx_resource_manager:ets_lookup(Id) of
-        {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
+        {ok, _Group, #{status := stopped}} ->
+            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
+        {ok, _Group, Resource} ->
             QM =
                 case QM0 =:= configured of
-                    true -> maps:get(query_mode, Data);
+                    true -> maps:get(query_mode, Resource);
                     false -> QM0
                 end,
-            CBM = maps:get(callback_mode, Data),
-            CallMode = call_mode(QM, CBM),
-            apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
-        {ok, _Group, #{status := stopped}} ->
-            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
-        {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
-            ?RESOURCE_ERROR(not_connected, "resource not connected");
+            do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
     end.
 
+do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) ->
+    %% The connector supprots buffer, send even in disconnected state
+    #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
+    CallMode = call_mode(QM, CBM),
+    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
+do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
+    %% when calling from the buffer worker or other simple queries,
+    %% only apply the query fun when it's at connected status
+    #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
+    CallMode = call_mode(QM, CBM),
+    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
+do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
+    ?RESOURCE_ERROR(not_connected, "resource not connected").
+
 -define(APPLY_RESOURCE(NAME, EXPR, REQ),
     try
         %% if the callback module (connector) wants to return an error that

+ 1 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,5 +1,5 @@
 {erl_opts, [debug_info]}.
-{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.4"}}}
+{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
        , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}}

+ 30 - 2
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -91,6 +91,7 @@ on_start(InstId, Config) ->
             {ok, #{
                 message_template => compile_message_template(MessageTemplate),
                 client_id => ClientId,
+                kafka_topic => KafkaTopic,
                 producers => Producers,
                 resource_id => ResourceID
             }};
@@ -234,8 +235,35 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
     %% do not apply the callback (which is basically to bump success or fail counter)
     ok.
 
-on_get_status(_InstId, _State) ->
-    connected.
+on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
+    case wolff_client_sup:find_client(ClientId) of
+        {ok, Pid} ->
+            do_get_status(Pid, KafkaTopic);
+        {error, _Reason} ->
+            disconnected
+    end.
+
+do_get_status(Client, KafkaTopic) ->
+    %% TODO: add a wolff_producers:check_connectivity
+    case wolff_client:get_leader_connections(Client, KafkaTopic) of
+        {ok, Leaders} ->
+            %% Kafka is considered healthy as long as any of the partition leader is reachable
+            case
+                lists:any(
+                    fun({_Partition, Pid}) ->
+                        is_pid(Pid) andalso erlang:is_process_alive(Pid)
+                    end,
+                    Leaders
+                )
+            of
+                true ->
+                    connected;
+                false ->
+                    disconnected
+            end;
+        {error, _} ->
+            disconnected
+    end.
 
 %% Parse comma separated host:port list into a [{Host,Port}] list
 hosts(Hosts) when is_binary(Hosts) ->

+ 1 - 1
mix.exs

@@ -130,7 +130,7 @@ defmodule EMQXUmbrella.MixProject do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.7", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
       {:brod, github: "kafka4beam/brod", tag: "3.16.7"},