Kaynağa Gözat

feat: refactor hstreamdb connector to to avoid resources leaking

JimMoen 2 yıl önce
ebeveyn
işleme
4ee44972b2

+ 22 - 11
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl

@@ -33,6 +33,9 @@
     desc/1
 ]).
 
+%% Allocatable resources
+-define(hstreamdb_client, hstreamdb_client).
+
 -define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)).
 -define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
 
@@ -43,17 +46,22 @@ callback_mode() -> always_sync.
 on_start(InstId, Config) ->
     start_client(InstId, Config).
 
-on_stop(InstId, #{client := Client, producer := Producer}) ->
-    StopClientRes = hstreamdb:stop_client(Client),
-    StopProducerRes = hstreamdb:stop_producer(Producer),
-    ?SLOG(info, #{
-        msg => "stop hstreamdb connector",
-        connector => InstId,
-        client => Client,
-        producer => Producer,
-        stop_client => StopClientRes,
-        stop_producer => StopProducerRes
-    }).
+on_stop(InstId, _State) ->
+    case emqx_resource:get_allocated_resources(InstId) of
+        #{client := Client, producer := Producer} ->
+            StopClientRes = hstreamdb:stop_client(Client),
+            StopProducerRes = hstreamdb:stop_producer(Producer),
+            ?SLOG(info, #{
+                msg => "stop hstreamdb connector",
+                connector => InstId,
+                client => Client,
+                producer => Producer,
+                stop_client => StopClientRes,
+                stop_producer => StopProducerRes
+            });
+        _ ->
+            ok
+    end.
 
 -define(FAILED_TO_APPLY_HRECORD_TEMPLATE,
     {error, {unrecoverable_error, failed_to_apply_hrecord_template}}
@@ -237,6 +245,9 @@ start_producer(
                 ),
                 record_template => record_template(Options)
             },
+            ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{
+                client => Client, producer => Producer
+            }),
             {ok, State};
         {error, {already_started, Pid}} ->
             ?SLOG(info, #{