Просмотр исходного кода

fix(rocketmq): fix namespace error for RocketMQ

firest 1 год назад
Родитель
Сommit
f3e8037e0f
1 измененных файлов с 23 добавлено и 14 удалено
  1. 23 14
      apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl

+ 23 - 14
apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl

@@ -112,11 +112,13 @@ on_start(
     ),
     ),
     ClientId = client_id(InstanceId),
     ClientId = client_id(InstanceId),
     ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
     ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
-    ClientCfg = namespace(#{acl_info => ACLInfo}, Config),
+    Namespace = maps:get(namespace, Config, <<>>),
+    ClientCfg = #{acl_info => ACLInfo, namespace => Namespace},
 
 
     State = #{
     State = #{
         client_id => ClientId,
         client_id => ClientId,
         acl_info => ACLInfo,
         acl_info => ACLInfo,
+        namespace => Namespace,
         installed_channels => #{}
         installed_channels => #{}
     },
     },
 
 
@@ -139,12 +141,13 @@ on_add_channel(
     _InstId,
     _InstId,
     #{
     #{
         installed_channels := InstalledChannels,
         installed_channels := InstalledChannels,
+        namespace := Namespace,
         acl_info := ACLInfo
         acl_info := ACLInfo
     } = OldState,
     } = OldState,
     ChannelId,
     ChannelId,
     ChannelConfig
     ChannelConfig
 ) ->
 ) ->
-    {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo),
+    {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace),
     NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
     NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
     %% Update state
     %% Update state
     NewState = OldState#{installed_channels => NewInstalledChannels},
     NewState = OldState#{installed_channels => NewInstalledChannels},
@@ -152,16 +155,18 @@ on_add_channel(
 
 
 create_channel_state(
 create_channel_state(
     #{parameters := Conf} = _ChannelConfig,
     #{parameters := Conf} = _ChannelConfig,
-    ACLInfo
+    ACLInfo,
+    Namespace
 ) ->
 ) ->
     #{
     #{
         topic := Topic,
         topic := Topic,
-        sync_timeout := SyncTimeout
+        sync_timeout := SyncTimeout,
+        strategy := Strategy
     } = Conf,
     } = Conf,
     TopicTks = emqx_placeholder:preproc_tmpl(Topic),
     TopicTks = emqx_placeholder:preproc_tmpl(Topic),
-    ProducerOpts = make_producer_opts(Conf, ACLInfo),
+    ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy),
     Templates = parse_template(Conf),
     Templates = parse_template(Conf),
-    DispatchStrategy = parse_dispatch_strategy(Conf),
+    DispatchStrategy = parse_dispatch_strategy(Strategy),
     State = #{
     State = #{
         topic => Topic,
         topic => Topic,
         topic_tokens => TopicTks,
         topic_tokens => TopicTks,
@@ -330,11 +335,11 @@ parse_template([], Templates) ->
     Templates.
     Templates.
 
 
 %% returns a procedure to generate the produce context
 %% returns a procedure to generate the produce context
-parse_dispatch_strategy(#{strategy := roundrobin}) ->
+parse_dispatch_strategy(roundrobin) ->
     fun(_) ->
     fun(_) ->
         #{}
         #{}
     end;
     end;
-parse_dispatch_strategy(#{strategy := Template}) ->
+parse_dispatch_strategy(Template) ->
     Tokens = emqx_placeholder:preproc_tmpl(Template),
     Tokens = emqx_placeholder:preproc_tmpl(Template),
     fun(Msg) ->
     fun(Msg) ->
         #{
         #{
@@ -400,12 +405,20 @@ make_producer_opts(
         send_buffer := SendBuff,
         send_buffer := SendBuff,
         refresh_interval := RefreshInterval
         refresh_interval := RefreshInterval
     },
     },
-    ACLInfo
+    ACLInfo,
+    Namespace,
+    Strategy
 ) ->
 ) ->
     #{
     #{
         tcp_opts => [{sndbuf, SendBuff}],
         tcp_opts => [{sndbuf, SendBuff}],
         ref_topic_route_interval => RefreshInterval,
         ref_topic_route_interval => RefreshInterval,
-        acl_info => emqx_secret:wrap(ACLInfo)
+        acl_info => emqx_secret:wrap(ACLInfo),
+        namespace => Namespace,
+        partitioner =>
+            case Strategy of
+                roundrobin -> roundrobin;
+                _ -> key_dispatch
+            end
     }.
     }.
 
 
 acl_info(<<>>, _, _) ->
 acl_info(<<>>, _, _) ->
@@ -424,10 +437,6 @@ acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) ->
 acl_info(_, _, _) ->
 acl_info(_, _, _) ->
     #{}.
     #{}.
 
 
-namespace(ClientCfg, Config) ->
-    Namespace = maps:get(namespace, Config, <<>>),
-    ClientCfg#{namespace => Namespace}.
-
 create_producers_map(ClientId) ->
 create_producers_map(ClientId) ->
     _ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
     _ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
     ok.
     ok.