|
|
@@ -470,7 +470,51 @@ t_failed_creation_then_fix(Config) ->
|
|
|
delete_all_bridges(),
|
|
|
ok.
|
|
|
|
|
|
-t_table_removed(_Config) ->
|
|
|
+t_custom_timestamp(_Config) ->
|
|
|
+ HostsString = kafka_hosts_string_sasl(),
|
|
|
+ AuthSettings = valid_sasl_plain_settings(),
|
|
|
+ Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
|
|
|
+ Type = ?BRIDGE_TYPE,
|
|
|
+ Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
|
|
|
+ ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
|
|
+ KafkaTopic = "test-topic-one-partition",
|
|
|
+ MQTTTopic = <<"t/local/kafka">>,
|
|
|
+ emqx:subscribe(MQTTTopic),
|
|
|
+ Conf0 = config(#{
|
|
|
+ "authentication" => AuthSettings,
|
|
|
+ "kafka_hosts_string" => HostsString,
|
|
|
+ "local_topic" => MQTTTopic,
|
|
|
+ "kafka_topic" => KafkaTopic,
|
|
|
+ "instance_id" => ResourceId,
|
|
|
+ "ssl" => #{}
|
|
|
+ }),
|
|
|
+ Conf = emqx_utils_maps:deep_put(
|
|
|
+ [<<"kafka">>, <<"message">>, <<"timestamp">>],
|
|
|
+ Conf0,
|
|
|
+ <<"123">>
|
|
|
+ ),
|
|
|
+ {ok, _} = emqx_bridge:create(Type, erlang:list_to_atom(Name), Conf),
|
|
|
+ {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
|
|
|
+ ct:pal("base offset before testing ~p", [Offset]),
|
|
|
+ Time = erlang:unique_integer(),
|
|
|
+ BinTime = integer_to_binary(Time),
|
|
|
+ Msg = #{
|
|
|
+ clientid => BinTime,
|
|
|
+ payload => <<"payload">>,
|
|
|
+ timestamp => Time
|
|
|
+ },
|
|
|
+ emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))),
|
|
|
+ {ok, {_, [KafkaMsg]}} =
|
|
|
+ ?retry(
|
|
|
+ _Interval = 500,
|
|
|
+ _NAttempts = 20,
|
|
|
+ {ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset)
|
|
|
+ ),
|
|
|
+ ?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg),
|
|
|
+ delete_all_bridges(),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_nonexistent_topic(_Config) ->
|
|
|
HostsString = kafka_hosts_string_sasl(),
|
|
|
AuthSettings = valid_sasl_plain_settings(),
|
|
|
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
|