Procházet zdrojové kódy

Merge pull request #11334 from lafirest/feature/greptimedb-bridge

Feature/greptimedb bridge
lafirest před 2 roky
rodič
revize
1874cd1223

+ 22 - 0
.ci/docker-compose-file/docker-compose-greptimedb.yaml

@@ -0,0 +1,22 @@
+version: '3.9'
+
+services:
+  greptimedb:
+    container_name: greptimedb
+    hostname: greptimedb
+    image: greptime/greptimedb:0.3.2
+    expose:
+      - "4000"
+      - "4001"
+    # uncomment for local testing
+    # ports:
+    #   - "4000:4000"
+    #   - "4001:4001"
+    restart: always
+    networks:
+      - emqx_bridge
+    command:
+      standalone start
+      --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
+      --http-addr="0.0.0.0:4000"
+      --rpc-addr="0.0.0.0:4001"

+ 3 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -51,6 +51,9 @@ services:
       - 15670:5670
       # Kinesis
       - 4566:4566
+      # GreptimeDB
+      - 4000:4000
+      - 4001:4001
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 12 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -162,6 +162,18 @@
     "upstream": "hstreamdb:6570",
     "enabled": true
   },
+  {
+    "name": "greptimedb_http",
+    "listen": "0.0.0.0:4000",
+    "upstream": "greptimedb:4000",
+    "enabled": true
+  },
+  {
+    "name": "greptimedb_grpc",
+    "listen": "0.0.0.0:4001",
+    "upstream": "greptimedb:4001",
+    "enabled": true
+  },
   {
     "name": "kinesis",
     "listen": "0.0.0.0:4566",

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -2,7 +2,7 @@
 {application, emqx, [
     {id, "emqx"},
     {description, "EMQX Core"},
-    {vsn, "5.1.2"},
+    {vsn, "5.1.3"},
     {modules, []},
     {registered, []},
     {applications, [

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.23"},
+    {vsn, "0.1.24"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -89,7 +89,8 @@
     T == pulsar_producer;
     T == oracle;
     T == iotdb;
-    T == kinesis_producer
+    T == kinesis_producer;
+    T == greptimedb
 ).
 
 -define(ROOT_KEY, bridges).

+ 23 - 4
apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl

@@ -49,7 +49,8 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_oracle, <<"oracle">>, Method),
         api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method),
         api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method),
-        api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer")
+        api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"),
+        api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_grpc_v1")
     ].
 
 schema_modules() ->
@@ -75,7 +76,8 @@ schema_modules() ->
         emqx_bridge_oracle,
         emqx_bridge_iotdb,
         emqx_bridge_rabbitmq,
-        emqx_bridge_kinesis
+        emqx_bridge_kinesis,
+        emqx_bridge_greptimedb
     ].
 
 examples(Method) ->
@@ -121,7 +123,8 @@ resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
 resource_type(oracle) -> emqx_oracle;
 resource_type(iotdb) -> emqx_bridge_iotdb_impl;
 resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
-resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer.
+resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer;
+resource_type(greptimedb) -> emqx_bridge_greptimedb_connector.
 
 fields(bridges) ->
     [
@@ -202,7 +205,8 @@ fields(bridges) ->
         influxdb_structs() ++
         redis_structs() ++
         pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++
-        kinesis_structs().
+        kinesis_structs() ++
+        greptimedb_structs().
 
 mongodb_structs() ->
     [
@@ -287,6 +291,21 @@ influxdb_structs() ->
         ]
     ].
 
+greptimedb_structs() ->
+    [
+        {Protocol,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_greptimedb, Protocol)),
+                #{
+                    desc => <<"GreptimeDB Bridge Config">>,
+                    required => false
+                }
+            )}
+     || Protocol <- [
+            greptimedb
+        ]
+    ].
+
 redis_structs() ->
     [
         {Type,

+ 19 - 0
apps/emqx_bridge_greptimedb/.gitignore

@@ -0,0 +1,19 @@
+.rebar3
+_*
+.eunit
+*.o
+*.beam
+*.plt
+*.swp
+*.swo
+.erlang.cookie
+ebin
+ log
+erl_crash.dump
+.rebar
+logs
+_build
+.idea
+*.iml
+rebar3.crashdump
+*~

+ 94 - 0
apps/emqx_bridge_greptimedb/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 27 - 0
apps/emqx_bridge_greptimedb/README.md

@@ -0,0 +1,27 @@
+# emqx_bridge_greptimedb
+This application houses the GreptimeDB data integration to EMQX.
+It provides the means to connect to GreptimeDB and publish messages to it.
+
+It implements connection management and interaction without the need for a
+ separate connector app, since it's not used for authentication and authorization
+ applications.
+
+## Docs
+
+For more information about GreptimeDB, please refer to [official
+ document](https://docs.greptime.com/).
+
+## Configurations
+
+Just like the InfluxDB data bridge but have some different parameters. Below are several important parameters:
+  - `server`: The IPv4 or IPv6 address or the hostname to connect to.
+  - `dbname`: The GreptimeDB database name.
+  - `write_syntax`: Like the `write_syntax` in `InfluxDB` conf, it's the conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported.
+
+
+# Contributing - [Mandatory]
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+See [BSL](./BSL.txt).

+ 2 - 0
apps/emqx_bridge_greptimedb/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+greptimedb

+ 12 - 0
apps/emqx_bridge_greptimedb/rebar.config

@@ -0,0 +1,12 @@
+{erl_opts, [
+    debug_info
+]}.
+
+{deps, [
+       {emqx_connector, {path, "../../apps/emqx_connector"}},
+       {emqx_resource, {path, "../../apps/emqx_resource"}},
+       {emqx_bridge, {path, "../../apps/emqx_bridge"}},
+       {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.2"}}}
+]}.
+{plugins, [rebar3_path_deps]}.
+{project_plugins, [erlfmt]}.

+ 14 - 0
apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src

@@ -0,0 +1,14 @@
+{application, emqx_bridge_greptimedb, [
+    {description, "EMQX GreptimeDB Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        emqx_resource,
+        greptimedb
+    ]},
+    {env, []},
+    {modules, []},
+    {links, []}
+]}.

+ 299 - 0
apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl

@@ -0,0 +1,299 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_greptimedb).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-type write_syntax() :: list().
+-reflect_type([write_syntax/0]).
+-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}).
+-export([to_influx_lines/1]).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"greptimedb">> => #{
+                summary => <<"Greptimedb HTTP API V2 Bridge">>,
+                value => values("greptimedb", Method)
+            }
+        }
+    ].
+
+values(Protocol, get) ->
+    values(Protocol, post);
+values("greptimedb", post) ->
+    SupportUint = <<"uint_value=${payload.uint_key}u,">>,
+    TypeOpts = #{
+        bucket => <<"example_bucket">>,
+        org => <<"examlpe_org">>,
+        token => <<"example_token">>,
+        server => <<"127.0.0.1:4001">>
+    },
+    values(common, "greptimedb", SupportUint, TypeOpts);
+values(Protocol, put) ->
+    values(Protocol, post).
+
+values(common, Protocol, SupportUint, TypeOpts) ->
+    CommonConfigs = #{
+        type => list_to_atom(Protocol),
+        name => <<"demo">>,
+        enable => true,
+        local_topic => <<"local/topic/#">>,
+        write_syntax =>
+            <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
+                "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
+                "bool=${payload.bool}">>,
+        precision => ms,
+        resource_opts => #{
+            batch_size => 100,
+            batch_time => <<"20ms">>
+        },
+        server => <<"127.0.0.1:4001">>,
+        ssl => #{enable => false}
+    },
+    maps:merge(TypeOpts, CommonConfigs).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge_greptimedb".
+
+roots() -> [].
+
+fields("post_grpc_v1") ->
+    method_fields(post, greptimedb);
+fields("put_grpc_v1") ->
+    method_fields(put, greptimedb);
+fields("get_grpc_v1") ->
+    method_fields(get, greptimedb);
+fields(Type) when
+    Type == greptimedb
+->
+    greptimedb_bridge_common_fields() ++
+        connector_fields(Type).
+
+method_fields(post, ConnectorType) ->
+    greptimedb_bridge_common_fields() ++
+        connector_fields(ConnectorType) ++
+        type_name_fields(ConnectorType);
+method_fields(get, ConnectorType) ->
+    greptimedb_bridge_common_fields() ++
+        connector_fields(ConnectorType) ++
+        type_name_fields(ConnectorType) ++
+        emqx_bridge_schema:status_fields();
+method_fields(put, ConnectorType) ->
+    greptimedb_bridge_common_fields() ++
+        connector_fields(ConnectorType).
+
+greptimedb_bridge_common_fields() ->
+    emqx_bridge_schema:common_bridge_fields() ++
+        [
+            {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
+            {write_syntax, fun write_syntax/1}
+        ] ++
+        emqx_resource_schema:fields("resource_opts").
+
+connector_fields(Type) ->
+    emqx_bridge_greptimedb_connector:fields(Type).
+
+type_name_fields(Type) ->
+    [
+        {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})},
+        {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
+    ].
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for Greptimedb using `", string:to_upper(Method), "` method."];
+desc(greptimedb) ->
+    ?DESC(emqx_bridge_greptimedb_connector, "greptimedb");
+desc(_) ->
+    undefined.
+
+write_syntax(type) ->
+    ?MODULE:write_syntax();
+write_syntax(required) ->
+    true;
+write_syntax(validator) ->
+    [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")];
+write_syntax(converter) ->
+    fun to_influx_lines/1;
+write_syntax(desc) ->
+    ?DESC("write_syntax");
+write_syntax(format) ->
+    <<"sql">>;
+write_syntax(_) ->
+    undefined.
+
+to_influx_lines(RawLines) ->
+    try
+        influx_lines(str(RawLines), [])
+    catch
+        _:Reason:Stacktrace ->
+            Msg = lists:flatten(
+                io_lib:format("Unable to parse Greptimedb line protocol: ~p", [RawLines])
+            ),
+            ?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}),
+            throw(Msg)
+    end.
+
+-define(MEASUREMENT_ESC_CHARS, [$,, $\s]).
+-define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]).
+-define(FIELD_VAL_ESC_CHARS, [$", $\\]).
+% Common separator for both tags and fields
+-define(SEP, $\s).
+-define(MEASUREMENT_TAG_SEP, $,).
+-define(KEY_SEP, $=).
+-define(VAL_SEP, $,).
+-define(NON_EMPTY, [_ | _]).
+
+influx_lines([] = _RawLines, Acc) ->
+    ?NON_EMPTY = lists:reverse(Acc);
+influx_lines(RawLines, Acc) ->
+    {Acc1, RawLines1} = influx_line(string:trim(RawLines, leading, "\s\n"), Acc),
+    influx_lines(RawLines1, Acc1).
+
+influx_line([], Acc) ->
+    {Acc, []};
+influx_line(Line, Acc) ->
+    {?NON_EMPTY = Measurement, Line1} = measurement(Line),
+    {Tags, Line2} = tags(Line1),
+    {?NON_EMPTY = Fields, Line3} = influx_fields(Line2),
+    {Timestamp, Line4} = timestamp(Line3),
+    {
+        [
+            #{
+                measurement => Measurement,
+                tags => Tags,
+                fields => Fields,
+                timestamp => Timestamp
+            }
+            | Acc
+        ],
+        Line4
+    }.
+
+measurement(Line) ->
+    unescape(?MEASUREMENT_ESC_CHARS, [?MEASUREMENT_TAG_SEP, ?SEP], Line, []).
+
+tags([?MEASUREMENT_TAG_SEP | Line]) ->
+    tags1(Line, []);
+tags(Line) ->
+    {[], Line}.
+
+%% Empty line is invalid as fields are required after tags,
+%% need to break recursion here and fail later on parsing fields
+tags1([] = Line, Acc) ->
+    {lists:reverse(Acc), Line};
+%% Matching non empty Acc treats lines like "m, field=field_val" invalid
+tags1([?SEP | _] = Line, ?NON_EMPTY = Acc) ->
+    {lists:reverse(Acc), Line};
+tags1(Line, Acc) ->
+    {Tag, Line1} = tag(Line),
+    tags1(Line1, [Tag | Acc]).
+
+tag(Line) ->
+    {?NON_EMPTY = Key, Line1} = key(Line),
+    {?NON_EMPTY = Val, Line2} = tag_val(Line1),
+    {{Key, Val}, Line2}.
+
+tag_val(Line) ->
+    {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Line, []),
+    {Val, strip_l(Line1, ?VAL_SEP)}.
+
+influx_fields([?SEP | Line]) ->
+    fields1(string:trim(Line, leading, "\s"), []).
+
+%% Timestamp is optional, so fields may be at the very end of the line
+fields1([Ch | _] = Line, Acc) when Ch =:= ?SEP; Ch =:= $\n ->
+    {lists:reverse(Acc), Line};
+fields1([] = Line, Acc) ->
+    {lists:reverse(Acc), Line};
+fields1(Line, Acc) ->
+    {Field, Line1} = field(Line),
+    fields1(Line1, [Field | Acc]).
+
+field(Line) ->
+    {?NON_EMPTY = Key, Line1} = key(Line),
+    {Val, Line2} = field_val(Line1),
+    {{Key, Val}, Line2}.
+
+field_val([$" | Line]) ->
+    {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []),
+    %% Quoted val can be empty
+    {Val, strip_l(Line1, ?VAL_SEP)};
+field_val(Line) ->
+    %% Unquoted value should not be un-escaped according to Greptimedb protocol,
+    %% as it can only hold float, integer, uinteger or boolean value.
+    %% However, as templates are possible, un-escaping is applied here,
+    %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}"
+    {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Line, []),
+    {?NON_EMPTY = Val, strip_l(Line1, ?VAL_SEP)}.
+
+timestamp([?SEP | Line]) ->
+    Line1 = string:trim(Line, leading, "\s"),
+    %% Similarly to unquoted field value, un-escape a timestamp to validate and handle
+    %% potentially escaped characters in a template
+    {T, Line2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Line1, []),
+    {timestamp1(T), Line2};
+timestamp(Line) ->
+    {undefined, Line}.
+
+timestamp1(?NON_EMPTY = Ts) -> Ts;
+timestamp1(_Ts) -> undefined.
+
+%% Common for both tag and field keys
+key(Line) ->
+    {Key, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Line, []),
+    {Key, strip_l(Line1, ?KEY_SEP)}.
+
+%% Only strip a character between pairs, don't strip it(and let it fail)
+%% if the char to be stripped is at the end, e.g.: m,tag=val, field=val
+strip_l([Ch, Ch1 | Str], Ch) when Ch1 =/= ?SEP ->
+    [Ch1 | Str];
+strip_l(Str, _Ch) ->
+    Str.
+
+unescape(EscapeChars, SepChars, [$\\, Char | T], Acc) ->
+    ShouldEscapeBackslash = lists:member($\\, EscapeChars),
+    Acc1 =
+        case lists:member(Char, EscapeChars) of
+            true -> [Char | Acc];
+            false when not ShouldEscapeBackslash -> [Char, $\\ | Acc]
+        end,
+    unescape(EscapeChars, SepChars, T, Acc1);
+unescape(EscapeChars, SepChars, [Char | T] = L, Acc) ->
+    IsEscapeChar = lists:member(Char, EscapeChars),
+    case lists:member(Char, SepChars) of
+        true -> {lists:reverse(Acc), L};
+        false when not IsEscapeChar -> unescape(EscapeChars, SepChars, T, [Char | Acc])
+    end;
+unescape(_EscapeChars, _SepChars, [] = L, Acc) ->
+    {lists:reverse(Acc), L}.
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.

+ 636 - 0
apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl

@@ -0,0 +1,636 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_greptimedb_connector).
+
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2
+]).
+
+-export([
+    roots/0,
+    namespace/0,
+    fields/1,
+    desc/1
+]).
+
+%% only for test
+-ifdef(TEST).
+-export([is_unrecoverable_error/1]).
+-endif.
+
+-type ts_precision() :: ns | us | ms | s.
+
+%% Allocatable resources
+-define(greptime_client, greptime_client).
+
+-define(GREPTIMEDB_DEFAULT_PORT, 4001).
+
+-define(DEFAULT_DB, <<"public">>).
+
+-define(GREPTIMEDB_HOST_OPTIONS, #{
+    default_port => ?GREPTIMEDB_DEFAULT_PORT
+}).
+
+-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
+
+-define(AUTO_RECONNECT_S, 1).
+
+%% -------------------------------------------------------------------------------------------------
+%% resource callback
+callback_mode() -> always_sync.
+
+on_start(InstId, Config) ->
+    %% InstID as pool would be handled by greptimedb client
+    %% so there is no need to allocate pool_name here
+    %% See: greptimedb:start_client/1
+    start_client(InstId, Config).
+
+on_stop(InstId, _State) ->
+    case emqx_resource:get_allocated_resources(InstId) of
+        #{?greptime_client := Client} ->
+            greptimedb:stop_client(Client);
+        _ ->
+            ok
+    end.
+
+on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
+    case data_to_points(Data, SyntaxLines) of
+        {ok, Points} ->
+            ?tp(
+                greptimedb_connector_send_query,
+                #{points => Points, batch => false, mode => sync}
+            ),
+            do_query(InstId, Client, Points);
+        {error, ErrorPoints} ->
+            ?tp(
+                greptimedb_connector_send_query_error,
+                #{batch => false, mode => sync, error => ErrorPoints}
+            ),
+            log_error_points(InstId, ErrorPoints),
+            {error, ErrorPoints}
+    end.
+
+%% Once a Batched Data trans to points failed.
+%% This batch query failed
+on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
+    case parse_batch_data(InstId, BatchData, SyntaxLines) of
+        {ok, Points} ->
+            ?tp(
+                greptimedb_connector_send_query,
+                #{points => Points, batch => true, mode => sync}
+            ),
+            do_query(InstId, Client, Points);
+        {error, Reason} ->
+            ?tp(
+                greptimedb_connector_send_query_error,
+                #{batch => true, mode => sync, error => Reason}
+            ),
+            {error, {unrecoverable_error, Reason}}
+    end.
+
+on_get_status(_InstId, #{client := Client}) ->
+    case greptimedb:is_alive(Client) of
+        true ->
+            connected;
+        false ->
+            disconnected
+    end.
+
+%% -------------------------------------------------------------------------------------------------
+%% schema
+namespace() -> connector_greptimedb.
+
+roots() ->
+    [
+        {config, #{
+            type => hoconsc:union(
+                [
+                    hoconsc:ref(?MODULE, greptimedb)
+                ]
+            )
+        }}
+    ].
+
+fields(common) ->
+    [
+        {server, server()},
+        {precision,
+            %% The greptimedb only supports these 4 precision
+            mk(enum([ns, us, ms, s]), #{
+                required => false, default => ms, desc => ?DESC("precision")
+            })}
+    ];
+fields(greptimedb) ->
+    fields(common) ++
+        [
+            {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})},
+            {username, mk(binary(), #{desc => ?DESC("username")})},
+            {password,
+                mk(binary(), #{
+                    desc => ?DESC("password"),
+                    format => <<"password">>,
+                    sensitive => true,
+                    converter => fun emqx_schema:password_converter/2
+                })}
+        ] ++ emqx_connector_schema_lib:ssl_fields().
+
+server() ->
+    Meta = #{
+        required => false,
+        default => <<"127.0.0.1:4001">>,
+        desc => ?DESC("server"),
+        converter => fun convert_server/2
+    },
+    emqx_schema:servers_sc(Meta, ?GREPTIMEDB_HOST_OPTIONS).
+
+desc(common) ->
+    ?DESC("common");
+desc(greptimedb) ->
+    ?DESC("greptimedb").
+
+%% -------------------------------------------------------------------------------------------------
+%% internal functions
+
+start_client(InstId, Config) ->
+    ClientConfig = client_config(InstId, Config),
+    ?SLOG(info, #{
+        msg => "starting greptimedb connector",
+        connector => InstId,
+        config => emqx_utils:redact(Config),
+        client_config => emqx_utils:redact(ClientConfig)
+    }),
+    try do_start_client(InstId, ClientConfig, Config) of
+        Res = {ok, #{client := Client}} ->
+            ok = emqx_resource:allocate_resource(InstId, ?greptime_client, Client),
+            Res;
+        {error, Reason} ->
+            {error, Reason}
+    catch
+        E:R:S ->
+            ?tp(greptimedb_connector_start_exception, #{error => {E, R}}),
+            ?SLOG(warning, #{
+                msg => "start greptimedb connector error",
+                connector => InstId,
+                error => E,
+                reason => emqx_utils:redact(R),
+                stack => emqx_utils:redact(S)
+            }),
+            {error, R}
+    end.
+
+do_start_client(
+    InstId,
+    ClientConfig,
+    Config = #{write_syntax := Lines}
+) ->
+    Precision = maps:get(precision, Config, ms),
+    case greptimedb:start_client(ClientConfig) of
+        {ok, Client} ->
+            case greptimedb:is_alive(Client, true) of
+                true ->
+                    State = #{
+                        client => Client,
+                        dbname => proplists:get_value(dbname, ClientConfig, ?DEFAULT_DB),
+                        write_syntax => to_config(Lines, Precision)
+                    },
+                    ?SLOG(info, #{
+                        msg => "starting greptimedb connector success",
+                        connector => InstId,
+                        client => redact_auth(Client),
+                        state => redact_auth(State)
+                    }),
+                    {ok, State};
+                {false, Reason} ->
+                    ?tp(greptimedb_connector_start_failed, #{
+                        error => greptimedb_client_not_alive, reason => Reason
+                    }),
+                    ?SLOG(warning, #{
+                        msg => "failed_to_start_greptimedb_connector",
+                        connector => InstId,
+                        client => redact_auth(Client),
+                        reason => Reason
+                    }),
+                    %% no leak
+                    _ = greptimedb:stop_client(Client),
+                    {error, greptimedb_client_not_alive}
+            end;
+        {error, {already_started, Client0}} ->
+            ?tp(greptimedb_connector_start_already_started, #{}),
+            ?SLOG(info, #{
+                msg => "restarting greptimedb connector, found already started client",
+                connector => InstId,
+                old_client => redact_auth(Client0)
+            }),
+            _ = greptimedb:stop_client(Client0),
+            do_start_client(InstId, ClientConfig, Config);
+        {error, Reason} ->
+            ?tp(greptimedb_connector_start_failed, #{error => Reason}),
+            ?SLOG(warning, #{
+                msg => "failed_to_start_greptimedb_connector",
+                connector => InstId,
+                reason => Reason
+            }),
+            {error, Reason}
+    end.
+
+client_config(
+    InstId,
+    Config = #{
+        server := Server
+    }
+) ->
+    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?GREPTIMEDB_HOST_OPTIONS),
+    [
+        {endpoints, [{http, str(Host), Port}]},
+        {pool_size, erlang:system_info(schedulers)},
+        {pool, InstId},
+        {pool_type, random},
+        {auto_reconnect, ?AUTO_RECONNECT_S},
+        {timeunit, maps:get(precision, Config, ms)}
+    ] ++ protocol_config(Config).
+
+protocol_config(
+    #{
+        dbname := DbName,
+        ssl := SSL
+    } = Config
+) ->
+    [
+        {dbname, str(DbName)}
+    ] ++ auth(Config) ++
+        ssl_config(SSL).
+
+ssl_config(#{enable := false}) ->
+    [
+        {https_enabled, false}
+    ];
+ssl_config(SSL = #{enable := true}) ->
+    [
+        {https_enabled, true},
+        {transport, ssl},
+        {transport_opts, emqx_tls_lib:to_client_opts(SSL)}
+    ].
+
+auth(#{username := Username, password := Password}) ->
+    [
+        {auth, {basic, #{username => str(Username), password => str(Password)}}}
+    ];
+auth(_) ->
+    [].
+
+redact_auth(Term) ->
+    emqx_utils:redact(Term, fun is_auth_key/1).
+
+is_auth_key(Key) when is_binary(Key) ->
+    string:equal("authorization", Key, true);
+is_auth_key(_) ->
+    false.
+
+%% -------------------------------------------------------------------------------------------------
+%% Query
+do_query(InstId, Client, Points) ->
+    case greptimedb:write_batch(Client, Points) of
+        {ok, #{response := {affected_rows, #{value := Rows}}}} ->
+            ?SLOG(debug, #{
+                msg => "greptimedb write point success",
+                connector => InstId,
+                points => Points
+            }),
+            {ok, {affected_rows, Rows}};
+        {error, {unauth, _, _}} ->
+            ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
+            ?SLOG(error, #{
+                msg => "greptimedb_authorization_failed",
+                client => redact_auth(Client),
+                connector => InstId
+            }),
+            {error, {unrecoverable_error, <<"authorization failure">>}};
+        {error, Reason} = Err ->
+            ?tp(greptimedb_connector_do_query_failure, #{error => Reason}),
+            ?SLOG(error, #{
+                msg => "greptimedb write point failed",
+                connector => InstId,
+                reason => Reason
+            }),
+            case is_unrecoverable_error(Err) of
+                true ->
+                    {error, {unrecoverable_error, Reason}};
+                false ->
+                    {error, {recoverable_error, Reason}}
+            end
+    end.
+
+%% -------------------------------------------------------------------------------------------------
+%% Tags & Fields Config Trans
+
+to_config(Lines, Precision) ->
+    to_config(Lines, [], Precision).
+
+to_config([], Acc, _Precision) ->
+    lists:reverse(Acc);
+to_config([Item0 | Rest], Acc, Precision) ->
+    Ts0 = maps:get(timestamp, Item0, ?DEFAULT_TIMESTAMP_TMPL),
+    {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
+    Item = #{
+        measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)),
+        timestamp => Ts,
+        precision => {FromPrecision, ToPrecision},
+        tags => to_kv_config(maps:get(tags, Item0)),
+        fields => to_kv_config(maps:get(fields, Item0))
+    },
+    to_config(Rest, [Item | Acc], Precision).
+
+%% pre-process the timestamp template
+%% returns a tuple of three elements:
+%% 1. The timestamp template itself.
+%% 2. The source timestamp precision (ms if the template ${timestamp} is used).
+%% 3. The target timestamp precision (configured for the client).
+preproc_tmpl_timestamp(undefined, Precision) ->
+    %% not configured, we default it to the message timestamp
+    preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision);
+preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
+    %% a const value is used which is very much unusual, but we have to add a special handling
+    {Ts, Precision, Precision};
+preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
+    preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
+preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
+    {emqx_placeholder:preproc_tmpl(Ts), ms, Precision};
+preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
+    %% a placehold is in use. e.g. ${payload.my_timestamp}
+    %% we can only hope it the value will be of the same precision in the configs
+    {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}.
+
+to_kv_config(KVfields) ->
+    lists:foldl(
+        fun({K, V}, Acc) -> to_maps_config(K, V, Acc) end,
+        #{},
+        KVfields
+    ).
+
+to_maps_config(K, V, Res) ->
+    NK = emqx_placeholder:preproc_tmpl(bin(K)),
+    NV = emqx_placeholder:preproc_tmpl(bin(V)),
+    Res#{NK => NV}.
+
+%% -------------------------------------------------------------------------------------------------
+%% Tags & Fields Data Trans
+parse_batch_data(InstId, BatchData, SyntaxLines) ->
+    {Points, Errors} = lists:foldl(
+        fun({send_message, Data}, {ListOfPoints, ErrAccIn}) ->
+            case data_to_points(Data, SyntaxLines) of
+                {ok, Points} ->
+                    {[Points | ListOfPoints], ErrAccIn};
+                {error, ErrorPoints} ->
+                    log_error_points(InstId, ErrorPoints),
+                    {ListOfPoints, ErrAccIn + 1}
+            end
+        end,
+        {[], 0},
+        BatchData
+    ),
+    case Errors of
+        0 ->
+            {ok, lists:flatten(Points)};
+        _ ->
+            ?SLOG(error, #{
+                msg => io_lib:format("Greptimedb trans point failed, count: ~p", [Errors]),
+                connector => InstId,
+                reason => points_trans_failed
+            }),
+            {error, points_trans_failed}
+    end.
+
+-spec data_to_points(map(), [
+    #{
+        fields := [{binary(), binary()}],
+        measurement := binary(),
+        tags := [{binary(), binary()}],
+        timestamp := emqx_placeholder:tmpl_token() | integer(),
+        precision := {From :: ts_precision(), To :: ts_precision()}
+    }
+]) -> {ok, [map()]} | {error, term()}.
+data_to_points(Data, SyntaxLines) ->
+    lines_to_points(Data, SyntaxLines, [], []).
+
+%% When converting multiple rows data into Greptimedb Line Protocol, they are considered to be strongly correlated.
+%% And once a row fails to convert, all of them are considered to have failed.
+lines_to_points(_, [], Points, ErrorPoints) ->
+    case ErrorPoints of
+        [] ->
+            {ok, Points};
+        _ ->
+            %% ignore trans succeeded points
+            {error, ErrorPoints}
+    end;
+lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
+    is_list(Ts)
+->
+    TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
+    case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of
+        {ok, TsInt} ->
+            Item1 = Item#{timestamp => TsInt},
+            continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
+        {error, BadTs} ->
+            lines_to_points(Data, Rest, ResultPointsAcc, [
+                {error, {bad_timestamp, BadTs}} | ErrorPointsAcc
+            ])
+    end;
+lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
+    is_integer(Ts)
+->
+    continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
+
+parse_timestamp([TsInt]) when is_integer(TsInt) ->
+    {ok, TsInt};
+parse_timestamp([TsBin]) ->
+    try
+        {ok, binary_to_integer(TsBin)}
+    catch
+        _:_ ->
+            {error, TsBin}
+    end.
+
+continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
+    case line_to_point(Data, Item) of
+        {_, [#{fields := Fields}]} when map_size(Fields) =:= 0 ->
+            %% greptimedb client doesn't like empty field maps...
+            ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc],
+            lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1);
+        Point ->
+            lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
+    end.
+
+line_to_point(
+    Data,
+    #{
+        measurement := Measurement,
+        tags := Tags,
+        fields := Fields,
+        timestamp := Ts,
+        precision := Precision
+    } = Item
+) ->
+    {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
+    {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
+    TableName = emqx_placeholder:proc_tmpl(Measurement, Data),
+    {TableName, [
+        maps:without([precision, measurement], Item#{
+            tags => EncodedTags,
+            fields => EncodedFields,
+            timestamp => maybe_convert_time_unit(Ts, Precision)
+        })
+    ]}.
+
+maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) ->
+    erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)).
+
+time_unit(s) -> second;
+time_unit(ms) -> millisecond;
+time_unit(us) -> microsecond;
+time_unit(ns) -> nanosecond.
+
+maps_config_to_data(K, V, {Data, Res}) ->
+    KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
+    VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
+    NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
+    NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
+    case {NK0, NV} of
+        {[undefined], _} ->
+            {Data, Res};
+        %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
+        {_, [undefined | _]} ->
+            {Data, Res};
+        _ ->
+            NK = list_to_binary(NK0),
+            {Data, Res#{NK => value_type(NV)}}
+    end.
+
+value_type([Int, <<"i">>]) when
+    is_integer(Int)
+->
+    greptimedb_values:int64_value(Int);
+value_type([UInt, <<"u">>]) when
+    is_integer(UInt)
+->
+    greptimedb_values:uint64_value(UInt);
+value_type([<<"t">>]) ->
+    greptimedb_values:boolean_value(true);
+value_type([<<"T">>]) ->
+    greptimedb_values:boolean_value(true);
+value_type([true]) ->
+    greptimedb_values:boolean_value(true);
+value_type([<<"TRUE">>]) ->
+    greptimedb_values:boolean_value(true);
+value_type([<<"True">>]) ->
+    greptimedb_values:boolean_value(true);
+value_type([<<"f">>]) ->
+    greptimedb_values:boolean_value(false);
+value_type([<<"F">>]) ->
+    greptimedb_values:boolean_value(false);
+value_type([false]) ->
+    greptimedb_values:boolean_value(false);
+value_type([<<"FALSE">>]) ->
+    greptimedb_values:boolean_value(false);
+value_type([<<"False">>]) ->
+    greptimedb_values:boolean_value(false);
+value_type([Float]) when is_float(Float) ->
+    Float;
+value_type(Val) ->
+    #{values => #{string_values => Val}, datatype => 'STRING'}.
+
+key_filter(undefined) -> undefined;
+key_filter(Value) -> emqx_utils_conv:bin(Value).
+
+data_filter(undefined) -> undefined;
+data_filter(Int) when is_integer(Int) -> Int;
+data_filter(Number) when is_number(Number) -> Number;
+data_filter(Bool) when is_boolean(Bool) -> Bool;
+data_filter(Data) -> bin(Data).
+
+bin(Data) -> emqx_utils_conv:bin(Data).
+
+%% helper funcs
+log_error_points(InstId, Errs) ->
+    lists:foreach(
+        fun({error, Reason}) ->
+            ?SLOG(error, #{
+                msg => "greptimedb trans point failed",
+                connector => InstId,
+                reason => Reason
+            })
+        end,
+        Errs
+    ).
+
+convert_server(<<"http://", Server/binary>>, HoconOpts) ->
+    convert_server(Server, HoconOpts);
+convert_server(<<"https://", Server/binary>>, HoconOpts) ->
+    convert_server(Server, HoconOpts);
+convert_server(Server, HoconOpts) ->
+    emqx_schema:convert_servers(Server, HoconOpts).
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.
+
+is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
+    true;
+is_unrecoverable_error(_) ->
+    false.
+
+%%===================================================================
+%% eunit tests
+%%===================================================================
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+is_auth_key_test_() ->
+    [
+        ?_assert(is_auth_key(<<"Authorization">>)),
+        ?_assertNot(is_auth_key(<<"Something">>)),
+        ?_assertNot(is_auth_key(89))
+    ].
+
+%% for coverage
+desc_test_() ->
+    [
+        ?_assertMatch(
+            {desc, _, _},
+            desc(common)
+        ),
+        ?_assertMatch(
+            {desc, _, _},
+            desc(greptimedb)
+        ),
+        ?_assertMatch(
+            {desc, _, _},
+            hocon_schema:field_schema(server(), desc)
+        ),
+        ?_assertMatch(
+            connector_greptimedb,
+            namespace()
+        )
+    ].
+-endif.

+ 939 - 0
apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl

@@ -0,0 +1,939 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_greptimedb_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, with_batch},
+        {group, without_batch}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {with_batch, [
+            {group, sync_query}
+        ]},
+        {without_batch, [
+            {group, sync_query}
+        ]},
+        {sync_query, [
+            {group, grpcv1_tcp}
+            %% uncomment tls when we are ready
+            %% {group, grpcv1_tls}
+        ]},
+        {grpcv1_tcp, TCs}
+        %%{grpcv1_tls, TCs}
+    ].
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    delete_all_bridges(),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_connector_test_helpers:stop_apps([
+        emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine
+    ]),
+    _ = application:stop(emqx_connector),
+    ok.
+
+init_per_group(GreptimedbType, Config0) when
+    GreptimedbType =:= grpcv1_tcp;
+    GreptimedbType =:= grpcv1_tls
+->
+    #{
+        host := GreptimedbHost,
+        port := GreptimedbPort,
+        http_port := GreptimedbHttpPort,
+        use_tls := UseTLS,
+        proxy_name := ProxyName
+    } =
+        case GreptimedbType of
+            grpcv1_tcp ->
+                #{
+                    host => os:getenv("GREPTIMEDB_GRPCV1_TCP_HOST", "toxiproxy"),
+                    port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TCP_PORT", "4001")),
+                    http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")),
+                    use_tls => false,
+                    proxy_name => "greptimedb_grpc"
+                };
+            grpcv1_tls ->
+                #{
+                    host => os:getenv("GREPTIMEDB_GRPCV1_TLS_HOST", "toxiproxy"),
+                    port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TLS_PORT", "4001")),
+                    http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")),
+                    use_tls => true,
+                    proxy_name => "greptimedb_tls"
+                }
+        end,
+    case emqx_common_test_helpers:is_tcp_server_available(GreptimedbHost, GreptimedbHttpPort) of
+        true ->
+            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            ok = start_apps(),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            {ok, _} = application:ensure_all_started(greptimedb),
+            emqx_mgmt_api_test_util:init_suite(),
+            Config = [{use_tls, UseTLS} | Config0],
+            {Name, ConfigString, GreptimedbConfig} = greptimedb_config(
+                grpcv1, GreptimedbHost, GreptimedbPort, Config
+            ),
+            EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_http">>,
+            EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin),
+            {EHttpcTransport, EHttpcTransportOpts} =
+                case UseTLS of
+                    true -> {tls, [{verify, verify_none}]};
+                    false -> {tcp, []}
+                end,
+            EHttpcPoolOpts = [
+                {host, GreptimedbHost},
+                {port, GreptimedbHttpPort},
+                {pool_size, 1},
+                {transport, EHttpcTransport},
+                {transport_opts, EHttpcTransportOpts}
+            ],
+            {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts),
+            [
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
+                {proxy_name, ProxyName},
+                {greptimedb_host, GreptimedbHost},
+                {greptimedb_port, GreptimedbPort},
+                {greptimedb_http_port, GreptimedbHttpPort},
+                {greptimedb_type, grpcv1},
+                {greptimedb_config, GreptimedbConfig},
+                {greptimedb_config_string, ConfigString},
+                {ehttpc_pool_name, EHttpcPoolName},
+                {greptimedb_name, Name}
+                | Config
+            ];
+        false ->
+            {skip, no_greptimedb}
+    end;
+init_per_group(sync_query, Config) ->
+    [{query_mode, sync} | Config];
+init_per_group(with_batch, Config) ->
+    [{batch_size, 100} | Config];
+init_per_group(without_batch, Config) ->
+    [{batch_size, 1} | Config];
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when
+    Group =:= grpcv1_tcp;
+    Group =:= grpcv1_tls
+->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    EHttpcPoolName = ?config(ehttpc_pool_name, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ehttpc_sup:stop_pool(EHttpcPoolName),
+    delete_bridge(Config),
+    _ = application:stop(greptimedb),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    delete_all_rules(),
+    delete_all_bridges(),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ok = snabbkaffe:stop(),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    delete_all_rules(),
+    delete_all_bridges(),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+start_apps() ->
+    %% some configs in emqx_conf app are mandatory
+    %% we want to make sure they are loaded before
+    %% ekka start in emqx_common_test_helpers:start_apps/1
+    emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
+    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]).
+
+example_write_syntax() ->
+    %% N.B.: this single space character is relevant
+    <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
+        "${clientid}_int_value=${payload.int_key}i,",
+        "uint_value=${payload.uint_key}u,"
+        "float_value=${payload.float_key},", "undef_value=${payload.undef},",
+        "${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>.
+
+greptimedb_config(grpcv1 = Type, GreptimedbHost, GreptimedbPort, Config) ->
+    BatchSize = proplists:get_value(batch_size, Config, 100),
+    QueryMode = proplists:get_value(query_mode, Config, sync),
+    UseTLS = proplists:get_value(use_tls, Config, false),
+    Name = atom_to_binary(?MODULE),
+    WriteSyntax = example_write_syntax(),
+    ConfigString =
+        io_lib:format(
+            "bridges.greptimedb.~s {\n"
+            "  enable = true\n"
+            "  server = \"~p:~b\"\n"
+            "  dbname = public\n"
+            "  username = greptime_user\n"
+            "  password = greptime_pwd\n"
+            "  precision = ns\n"
+            "  write_syntax = \"~s\"\n"
+            "  resource_opts = {\n"
+            "    request_ttl = 1s\n"
+            "    query_mode = ~s\n"
+            "    batch_size = ~b\n"
+            "  }\n"
+            "  ssl {\n"
+            "    enable = ~p\n"
+            "    verify = verify_none\n"
+            "  }\n"
+            "}\n",
+            [
+                Name,
+                GreptimedbHost,
+                GreptimedbPort,
+                WriteSyntax,
+                QueryMode,
+                BatchSize,
+                UseTLS
+            ]
+        ),
+    {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}.
+
+parse_and_check(ConfigString, Type, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    TypeBin = greptimedb_type_bin(Type),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+    Config.
+
+greptimedb_type_bin(grpcv1) ->
+    <<"greptimedb">>.
+
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+    Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
+    Name = ?config(greptimedb_name, Config),
+    GreptimedbConfig0 = ?config(greptimedb_config, Config),
+    GreptimedbConfig = emqx_utils_maps:deep_merge(GreptimedbConfig0, Overrides),
+    emqx_bridge:create(Type, Name, GreptimedbConfig).
+
+delete_bridge(Config) ->
+    Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
+    Name = ?config(greptimedb_name, Config),
+    emqx_bridge:remove(Type, Name).
+
+delete_all_bridges() ->
+    lists:foreach(
+        fun(#{name := Name, type := Type}) ->
+            emqx_bridge:remove(Type, Name)
+        end,
+        emqx_bridge:list()
+    ).
+
+delete_all_rules() ->
+    lists:foreach(
+        fun(#{id := RuleId}) ->
+            ok = emqx_rule_engine:delete_rule(RuleId)
+        end,
+        emqx_rule_engine:get_rules()
+    ).
+
+create_rule_and_action_http(Config) ->
+    create_rule_and_action_http(Config, _Overrides = #{}).
+
+create_rule_and_action_http(Config, Overrides) ->
+    GreptimedbName = ?config(greptimedb_name, Config),
+    Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, GreptimedbName),
+    Params0 = #{
+        enable => true,
+        sql => <<"SELECT * FROM \"t/topic\"">>,
+        actions => [BridgeId]
+    },
+    Params = emqx_utils_maps:deep_merge(Params0, Overrides),
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+send_message(Config, Payload) ->
+    Name = ?config(greptimedb_name, Config),
+    Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    Resp = emqx_bridge:send_message(BridgeId, Payload),
+    Resp.
+
+query_by_clientid(Topic, ClientId, Config) ->
+    GreptimedbHost = ?config(greptimedb_host, Config),
+    GreptimedbPort = ?config(greptimedb_http_port, Config),
+    EHttpcPoolName = ?config(ehttpc_pool_name, Config),
+    UseTLS = ?config(use_tls, Config),
+    Path = <<"/v1/sql?db=public">>,
+    Scheme =
+        case UseTLS of
+            true -> <<"https://">>;
+            false -> <<"http://">>
+        end,
+    URI = iolist_to_binary([
+        Scheme,
+        list_to_binary(GreptimedbHost),
+        ":",
+        integer_to_binary(GreptimedbPort),
+        Path
+    ]),
+    Headers = [
+        {"Authorization", "Basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="},
+        {"Content-Type", "application/x-www-form-urlencoded"}
+    ],
+    Body = <<"sql=select * from \"", Topic/binary, "\" where clientid='", ClientId/binary, "'">>,
+    {ok, 200, _Headers, RawBody0} =
+        ehttpc:request(
+            EHttpcPoolName,
+            post,
+            {URI, Headers, Body},
+            _Timeout = 10_000,
+            _Retry = 0
+        ),
+
+    case emqx_utils_json:decode(RawBody0, [return_maps]) of
+        #{
+            <<"code">> := 0,
+            <<"output">> := [
+                #{
+                    <<"records">> := #{
+                        <<"rows">> := Rows,
+                        <<"schema">> := Schema
+                    }
+                }
+            ]
+        } ->
+            make_row(Schema, Rows);
+        #{
+            <<"code">> := Code,
+            <<"error">> := Error
+        } ->
+            GreptimedbName = ?config(greptimedb_name, Config),
+            Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
+            BridgeId = emqx_bridge_resource:bridge_id(Type, GreptimedbName),
+
+            ?SLOG(error, #{
+                msg => io_lib:format("Failed to query: ~p, ~p", [Code, Error]),
+                connector => BridgeId,
+                reason => Error
+            }),
+            %% TODO(dennis): check the error by code
+            case binary:match(Error, <<"Table not found">>) of
+                nomatch ->
+                    {error, Error};
+                _ ->
+                    %% Table not found
+                    #{}
+            end
+    end.
+
+make_row(null, _Rows) ->
+    #{};
+make_row(_Schema, []) ->
+    #{};
+make_row(#{<<"column_schemas">> := ColumnsSchemas}, [Row]) ->
+    Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas),
+    maps:from_list(lists:zip(Columns, Row)).
+
+assert_persisted_data(ClientId, Expected, PersistedData) ->
+    ClientIdIntKey = <<ClientId/binary, "_int_value">>,
+    maps:foreach(
+        fun
+            (int_value, ExpectedValue) ->
+                ?assertMatch(
+                    ExpectedValue,
+                    maps:get(ClientIdIntKey, PersistedData)
+                );
+            (Key, ExpectedValue) ->
+                ?assertMatch(
+                    ExpectedValue,
+                    maps:get(atom_to_binary(Key), PersistedData),
+                    #{expected => ExpectedValue}
+                )
+        end,
+        Expected
+    ),
+    ok.
+
+resource_id(Config) ->
+    Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
+    Name = ?config(greptimedb_name, Config),
+    emqx_bridge_resource:resource_id(Type, Name).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_ok(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => true,
+        float_key => 24.5,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"payload">> => Payload,
+        <<"timestamp">> => erlang:system_time(millisecond)
+    },
+    ?check_trace(
+        begin
+            case QueryMode of
+                sync ->
+                    ?assertMatch({ok, _}, send_message(Config, SentData))
+            end,
+            PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
+            Expected = #{
+                bool => true,
+                int_value => -123,
+                uint_value => 123,
+                float_value => 24.5,
+                payload => emqx_utils_json:encode(Payload)
+            },
+            assert_persisted_data(ClientId, Expected, PersistedData),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(greptimedb_connector_send_query, Trace0),
+            ?assertMatch([#{points := [_]}], Trace),
+            [#{points := [Point0]}] = Trace,
+            {Measurement, [Point]} = Point0,
+            ct:pal("sent point: ~p", [Point]),
+            ?assertMatch(
+                <<_/binary>>,
+                Measurement
+            ),
+            ?assertMatch(
+                #{
+                    fields := #{},
+                    tags := #{},
+                    timestamp := TS
+                } when is_integer(TS),
+                Point
+            ),
+            #{fields := Fields} = Point,
+            ?assert(lists:all(fun is_binary/1, maps:keys(Fields))),
+            ?assertNot(maps:is_key(<<"undefined">>, Fields)),
+            ?assertNot(maps:is_key(<<"undef_value">>, Fields)),
+            ok
+        end
+    ),
+    ok.
+
+t_start_already_started(Config) ->
+    Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
+    Name = ?config(greptimedb_name, Config),
+    GreptimedbConfigString = ?config(greptimedb_config_string, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    ResourceId = resource_id(Config),
+    TypeAtom = binary_to_atom(Type),
+    NameAtom = binary_to_atom(Name),
+    {ok, #{bridges := #{TypeAtom := #{NameAtom := GreptimedbConfigMap}}}} = emqx_hocon:check(
+        emqx_bridge_schema, GreptimedbConfigString
+    ),
+    ?check_trace(
+        emqx_bridge_greptimedb_connector:on_start(ResourceId, GreptimedbConfigMap),
+        fun(Result, Trace) ->
+            ?assertMatch({ok, _}, Result),
+            ?assertMatch([_], ?of_kind(greptimedb_connector_start_already_started, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_start_ok_timestamp_write_syntax(Config) ->
+    GreptimedbType = ?config(greptimedb_type, Config),
+    GreptimedbName = ?config(greptimedb_name, Config),
+    GreptimedbConfigString0 = ?config(greptimedb_config_string, Config),
+    GreptimedbTypeCfg =
+        case GreptimedbType of
+            grpcv1 -> "greptimedb"
+        end,
+    WriteSyntax =
+        %% N.B.: this single space characters are relevant
+        <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
+            "${clientid}_int_value=${payload.int_key}i,",
+            "uint_value=${payload.uint_key}u,"
+            "bool=${payload.bool}", " ", "${timestamp}">>,
+    %% append this to override the config
+    GreptimedbConfigString1 =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  write_syntax = \"~s\"\n"
+            "}\n",
+            [GreptimedbTypeCfg, GreptimedbName, WriteSyntax]
+        ),
+    GreptimedbConfig1 = parse_and_check(
+        GreptimedbConfigString0 ++ GreptimedbConfigString1,
+        GreptimedbType,
+        GreptimedbName
+    ),
+    Config1 = [{greptimedb_config, GreptimedbConfig1} | Config],
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config1)
+    ),
+    ok.
+
+t_start_ok_no_subject_tags_write_syntax(Config) ->
+    GreptimedbType = ?config(greptimedb_type, Config),
+    GreptimedbName = ?config(greptimedb_name, Config),
+    GreptimedbConfigString0 = ?config(greptimedb_config_string, Config),
+    GreptimedbTypeCfg =
+        case GreptimedbType of
+            grpcv1 -> "greptimedb"
+        end,
+    WriteSyntax =
+        %% N.B.: this single space characters are relevant
+        <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
+            "uint_value=${payload.uint_key}u,"
+            "bool=${payload.bool}", " ", "${timestamp}">>,
+    %% append this to override the config
+    GreptimedbConfigString1 =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  write_syntax = \"~s\"\n"
+            "}\n",
+            [GreptimedbTypeCfg, GreptimedbName, WriteSyntax]
+        ),
+    GreptimedbConfig1 = parse_and_check(
+        GreptimedbConfigString0 ++ GreptimedbConfigString1,
+        GreptimedbType,
+        GreptimedbName
+    ),
+    Config1 = [{greptimedb_config, GreptimedbConfig1} | Config],
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config1)
+    ),
+    ok.
+
+t_const_timestamp(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    Const = erlang:system_time(nanosecond),
+    ConstBin = integer_to_binary(Const),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(
+            Config,
+            #{
+                <<"write_syntax">> =>
+                    <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>>
+            }
+        )
+    ),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{<<"foo">> => 123},
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"payload">> => Payload,
+        <<"timestamp">> => erlang:system_time(millisecond)
+    },
+    case QueryMode of
+        sync ->
+            ?assertMatch({ok, _}, send_message(Config, SentData))
+    end,
+    PersistedData = query_by_clientid(<<"mqtt">>, ClientId, Config),
+    Expected = #{foo => 123},
+    assert_persisted_data(ClientId, Expected, PersistedData),
+    TimeReturned = maps:get(<<"greptime_timestamp">>, PersistedData),
+    ?assertEqual(Const, TimeReturned).
+
+t_boolean_variants(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    BoolVariants = #{
+        true => true,
+        false => false,
+        <<"t">> => true,
+        <<"f">> => false,
+        <<"T">> => true,
+        <<"F">> => false,
+        <<"TRUE">> => true,
+        <<"FALSE">> => false,
+        <<"True">> => true,
+        <<"False">> => false
+    },
+    maps:foreach(
+        fun(BoolVariant, Translation) ->
+            ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Payload = #{
+                int_key => -123,
+                bool => BoolVariant,
+                uint_key => 123
+            },
+            SentData = #{
+                <<"clientid">> => ClientId,
+                <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+                <<"timestamp">> => erlang:system_time(millisecond),
+                <<"payload">> => Payload
+            },
+            case QueryMode of
+                sync ->
+                    ?assertMatch({ok, _}, send_message(Config, SentData))
+            end,
+            case QueryMode of
+                sync -> ok
+            end,
+            PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
+            Expected = #{
+                bool => Translation,
+                int_value => -123,
+                uint_value => 123,
+                payload => emqx_utils_json:encode(Payload)
+            },
+            assert_persisted_data(ClientId, Expected, PersistedData),
+            ok
+        end,
+        BoolVariants
+    ),
+    ok.
+
+t_bad_timestamp(Config) ->
+    GreptimedbType = ?config(greptimedb_type, Config),
+    GreptimedbName = ?config(greptimedb_name, Config),
+    QueryMode = ?config(query_mode, Config),
+    BatchSize = ?config(batch_size, Config),
+    GreptimedbConfigString0 = ?config(greptimedb_config_string, Config),
+    GreptimedbTypeCfg =
+        case GreptimedbType of
+            grpcv1 -> "greptimedb"
+        end,
+    WriteSyntax =
+        %% N.B.: this single space characters are relevant
+        <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
+            "uint_value=${payload.uint_key}u,"
+            "bool=${payload.bool}", " ", "bad_timestamp">>,
+    %% append this to override the config
+    GreptimedbConfigString1 =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  write_syntax = \"~s\"\n"
+            "}\n",
+            [GreptimedbTypeCfg, GreptimedbName, WriteSyntax]
+        ),
+    GreptimedbConfig1 = parse_and_check(
+        GreptimedbConfigString0 ++ GreptimedbConfigString1,
+        GreptimedbType,
+        GreptimedbName
+    ),
+    Config1 = [{greptimedb_config, GreptimedbConfig1} | Config],
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config1)
+    ),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => false,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"timestamp">> => erlang:system_time(millisecond),
+        <<"payload">> => Payload
+    },
+    ?check_trace(
+        ?wait_async_action(
+            send_message(Config1, SentData),
+            #{?snk_kind := greptimedb_connector_send_query_error},
+            10_000
+        ),
+        fun(Result, _Trace) ->
+            ?assertMatch({_, {ok, _}}, Result),
+            {Return, {ok, _}} = Result,
+            IsBatch = BatchSize > 1,
+            case {QueryMode, IsBatch} of
+                {sync, false} ->
+                    ?assertEqual(
+                        {error, [
+                            {error, {bad_timestamp, <<"bad_timestamp">>}}
+                        ]},
+                        Return
+                    );
+                {sync, true} ->
+                    ?assertEqual({error, {unrecoverable_error, points_trans_failed}}, Return)
+            end,
+            ok
+        end
+    ),
+    ok.
+
+t_get_status(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    {ok, _} = create_bridge(Config),
+    ResourceId = resource_id(Config),
+    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+    end),
+    ok.
+
+t_create_disconnected(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ?check_trace(
+        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            ?assertMatch({ok, _}, create_bridge(Config))
+        end),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := greptimedb_client_not_alive, reason := _SomeReason}],
+                ?of_kind(greptimedb_connector_start_failed, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_start_error(Config) ->
+    %% simulate client start error
+    ?check_trace(
+        emqx_common_test_helpers:with_mock(
+            greptimedb,
+            start_client,
+            fun(_Config) -> {error, some_error} end,
+            fun() ->
+                ?wait_async_action(
+                    ?assertMatch({ok, _}, create_bridge(Config)),
+                    #{?snk_kind := greptimedb_connector_start_failed},
+                    10_000
+                )
+            end
+        ),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := some_error}],
+                ?of_kind(greptimedb_connector_start_failed, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_start_exception(Config) ->
+    %% simulate client start exception
+    ?check_trace(
+        emqx_common_test_helpers:with_mock(
+            greptimedb,
+            start_client,
+            fun(_Config) -> error(boom) end,
+            fun() ->
+                ?wait_async_action(
+                    ?assertMatch({ok, _}, create_bridge(Config)),
+                    #{?snk_kind := greptimedb_connector_start_exception},
+                    10_000
+                )
+            end
+        ),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := {error, boom}}],
+                ?of_kind(greptimedb_connector_start_exception, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_write_failure(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    QueryMode = ?config(query_mode, Config),
+    {ok, _} = create_bridge(Config),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => true,
+        float_key => 24.5,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"timestamp">> => erlang:system_time(millisecond),
+        <<"payload">> => Payload
+    },
+    ?check_trace(
+        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            case QueryMode of
+                sync ->
+                    ?wait_async_action(
+                        ?assertMatch(
+                            {error, {resource_error, #{reason := timeout}}},
+                            send_message(Config, SentData)
+                        ),
+                        #{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
+                        16_000
+                    )
+            end
+        end),
+        fun(Trace) ->
+            case QueryMode of
+                sync ->
+                    ?assertMatch(
+                        [#{error := _} | _],
+                        ?of_kind(greptimedb_connector_do_query_failure, Trace)
+                    )
+            end,
+            ok
+        end
+    ),
+    ok.
+
+t_missing_field(Config) ->
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
+    {ok, _} =
+        create_bridge(
+            Config,
+            #{
+                <<"resource_opts">> => #{<<"worker_pool_size">> => 1},
+                <<"write_syntax">> => <<"${clientid} foo=${foo}i">>
+            }
+        ),
+    %% note: we don't select foo here, but we interpolate it in the
+    %% fields, so it'll become undefined.
+    {ok, _} = create_rule_and_action_http(Config, #{sql => <<"select * from \"t/topic\"">>}),
+    ClientId0 = emqx_guid:to_hexstr(emqx_guid:gen()),
+    ClientId1 = emqx_guid:to_hexstr(emqx_guid:gen()),
+    %% Message with the field that we "forgot" to select in the rule
+    Msg0 = emqx_message:make(ClientId0, <<"t/topic">>, emqx_utils_json:encode(#{foo => 123})),
+    %% Message without any fields
+    Msg1 = emqx_message:make(ClientId1, <<"t/topic">>, emqx_utils_json:encode(#{})),
+    ?check_trace(
+        begin
+            emqx:publish(Msg0),
+            emqx:publish(Msg1),
+            NEvents = 1,
+            {ok, _} =
+                snabbkaffe:block_until(
+                    ?match_n_events(NEvents, #{
+                        ?snk_kind := greptimedb_connector_send_query_error
+                    }),
+                    _Timeout1 = 16_000
+                ),
+            ok
+        end,
+        fun(Trace) ->
+            PersistedData0 = query_by_clientid(ClientId0, ClientId0, Config),
+            PersistedData1 = query_by_clientid(ClientId1, ClientId1, Config),
+            case IsBatch of
+                true ->
+                    ?assertMatch(
+                        [#{error := points_trans_failed} | _],
+                        ?of_kind(greptimedb_connector_send_query_error, Trace)
+                    );
+                false ->
+                    ?assertMatch(
+                        [#{error := [{error, no_fields}]} | _],
+                        ?of_kind(greptimedb_connector_send_query_error, Trace)
+                    )
+            end,
+            %% nothing should have been persisted
+            ?assertEqual(#{}, PersistedData0),
+            ?assertEqual(#{}, PersistedData1),
+            ok
+        end
+    ),
+    ok.
+
+t_authentication_error_on_send_message(Config0) ->
+    ResourceId = resource_id(Config0),
+    QueryMode = proplists:get_value(query_mode, Config0, sync),
+    GreptimedbType = ?config(greptimedb_type, Config0),
+    GreptimeConfig0 = proplists:get_value(greptimedb_config, Config0),
+    GreptimeConfig =
+        case GreptimedbType of
+            grpcv1 -> GreptimeConfig0#{<<"password">> => <<"wrong_password">>}
+        end,
+    Config = lists:keyreplace(greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig}),
+
+    % Fake initialization to simulate credential update after bridge was created.
+    emqx_common_test_helpers:with_mock(
+        greptimedb,
+        check_auth,
+        fun(_) ->
+            ok
+        end,
+        fun() ->
+            {ok, _} = create_bridge(Config),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 10,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            )
+        end
+    ),
+
+    % Now back to wrong credentials
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => true,
+        float_key => 24.5,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"timestamp">> => erlang:system_time(millisecond),
+        <<"payload">> => Payload
+    },
+    case QueryMode of
+        sync ->
+            ?assertMatch(
+                {error, {unrecoverable_error, <<"authorization failure">>}},
+                send_message(Config, SentData)
+            )
+    end,
+    ok.

+ 155 - 0
apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl

@@ -0,0 +1,155 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_greptimedb_connector_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(GREPTIMEDB_RESOURCE_MOD, emqx_bridge_greptimedb_connector).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+    [].
+
+init_per_suite(Config) ->
+    GreptimedbTCPHost = os:getenv("GREPTIMEDB_GRPCV1_TCP_HOST", "toxiproxy"),
+    GreptimedbTCPPort = list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TCP_PORT", "4001")),
+    Servers = [{GreptimedbTCPHost, GreptimedbTCPPort}],
+    case emqx_common_test_helpers:is_all_tcp_servers_available(Servers) of
+        true ->
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            {ok, _} = application:ensure_all_started(greptimedb),
+            [
+                {greptimedb_tcp_host, GreptimedbTCPHost},
+                {greptimedb_tcp_port, GreptimedbTCPPort}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_greptimedb);
+                _ ->
+                    {skip, no_greptimedb}
+            end
+    end.
+
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
+    _ = application:stop(emqx_connector),
+    _ = application:stop(greptimedb),
+    ok.
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok.
+
+% %%------------------------------------------------------------------------------
+% %% Testcases
+% %%------------------------------------------------------------------------------
+
+t_lifecycle(Config) ->
+    Host = ?config(greptimedb_tcp_host, Config),
+    Port = ?config(greptimedb_tcp_port, Config),
+    perform_lifecycle_check(
+        <<"emqx_bridge_greptimedb_connector_SUITE">>,
+        greptimedb_config(Host, Port)
+    ).
+
+perform_lifecycle_check(PoolName, InitialConfig) ->
+    {ok, #{config := CheckedConfig}} =
+        emqx_resource:check_config(?GREPTIMEDB_RESOURCE_MOD, InitialConfig),
+    % We need to add a write_syntax to the config since the connector
+    % expects this
+    FullConfig = CheckedConfig#{write_syntax => greptimedb_write_syntax()},
+    {ok, #{
+        state := #{client := #{pool := ReturnedPoolName}} = State,
+        status := InitialStatus
+    }} = emqx_resource:create_local(
+        PoolName,
+        ?CONNECTOR_RESOURCE_GROUP,
+        ?GREPTIMEDB_RESOURCE_MOD,
+        FullConfig,
+        #{}
+    ),
+    ?assertEqual(InitialStatus, connected),
+    % Instance should match the state and status of the just started resource
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+        state := State,
+        status := InitialStatus
+    }} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+    % % Perform query as further check that the resource is working as expected
+    ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query())),
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Resource will be listed still, but state will be changed and healthcheck will fail
+    % as the worker no longer exists.
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+        state := State,
+        status := StoppedStatus
+    }} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual(stopped, StoppedStatus),
+    ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
+    % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Can call stop/1 again on an already stopped instance
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Make sure it can be restarted and the healthchecks and queries work properly
+    ?assertEqual(ok, emqx_resource:restart(PoolName)),
+    % async restart, need to wait resource
+    timer:sleep(500),
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+    ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query())),
+    % Stop and remove the resource in one go.
+    ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Should not even be able to get the resource data out of ets now unlike just stopping.
+    ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
+
+% %%------------------------------------------------------------------------------
+% %% Helpers
+% %%------------------------------------------------------------------------------
+
+greptimedb_config(Host, Port) ->
+    Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
+    ResourceConfig = #{
+        <<"dbname">> => <<"public">>,
+        <<"server">> => Server,
+        <<"username">> => <<"greptime_user">>,
+        <<"password">> => <<"greptime_pwd">>
+    },
+    #{<<"config">> => ResourceConfig}.
+
+greptimedb_write_syntax() ->
+    [
+        #{
+            measurement => "${topic}",
+            tags => [{"clientid", "${clientid}"}],
+            fields => [{"payload", "${payload}"}],
+            timestamp => undefined
+        }
+    ].
+
+test_query() ->
+    {send_message, #{
+        <<"clientid">> => <<"something">>,
+        <<"payload">> => #{bool => true},
+        <<"topic">> => <<"connector_test">>,
+        <<"timestamp">> => 1678220316257
+    }}.

+ 348 - 0
apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl

@@ -0,0 +1,348 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_greptimedb_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(INVALID_LINES, [
+    "   ",
+    " \n",
+    "  \n\n\n  ",
+    "\n",
+    "  \n\n   \n  \n",
+    "measurement",
+    "measurement ",
+    "measurement,tag",
+    "measurement field",
+    "measurement,tag field",
+    "measurement,tag field ${timestamp}",
+    "measurement,tag=",
+    "measurement,tag=tag1",
+    "measurement,tag =",
+    "measurement field=",
+    "measurement field= ",
+    "measurement field = ",
+    "measurement, tag = field = ",
+    "measurement, tag = field = ",
+    "measurement, tag = tag_val field = field_val",
+    "measurement, tag = tag_val field = field_val ${timestamp}",
+    "measurement,= = ${timestamp}",
+    "measurement,t=a, f=a, ${timestamp}",
+    "measurement,t=a,t1=b, f=a,f1=b, ${timestamp}",
+    "measurement,t=a,t1=b, f=a,f1=b,",
+    "measurement,t=a, t1=b, f=a,f1=b,",
+    "measurement,t=a,,t1=b, f=a,f1=b,",
+    "measurement,t=a,,t1=b f=a,,f1=b",
+    "measurement,t=a,,t1=b f=a,f1=b ${timestamp}",
+    "measurement, f=a,f1=b",
+    "measurement, f=a,f1=b ${timestamp}",
+    "measurement,, f=a,f1=b ${timestamp}",
+    "measurement,, f=a,f1=b",
+    "measurement,, f=a,f1=b,, ${timestamp}",
+    "measurement f=a,f1=b,, ${timestamp}",
+    "measurement,t=a f=a,f1=b,, ${timestamp}",
+    "measurement,t=a f=a,f1=b,, ",
+    "measurement,t=a f=a,f1=b,,",
+    "measurement, t=a  f=a,f1=b",
+    "measurement,t=a f=a, f1=b",
+    "measurement,t=a f=a, f1=b ${timestamp}",
+    "measurement, t=a  f=a, f1=b ${timestamp}",
+    "measurement,t= a f=a,f1=b ${timestamp}",
+    "measurement,t= a f=a,f1 =b ${timestamp}",
+    "measurement, t = a f = a,f1 = b ${timestamp}",
+    "measurement,t=a f=a,f1=b \n ${timestamp}",
+    "measurement,t=a \n f=a,f1=b \n ${timestamp}",
+    "measurement,t=a \n f=a,f1=b \n ",
+    "\n measurement,t=a \n f=a,f1=b \n ${timestamp}",
+    "\n measurement,t=a \n f=a,f1=b \n",
+    %% not escaped backslash in a quoted field value is invalid
+    "measurement,tag=1 field=\"val\\1\""
+]).
+
+-define(VALID_LINE_PARSED_PAIRS, [
+    {"m1,tag=tag1 field=field1 ${timestamp1}", #{
+        measurement => "m1",
+        tags => [{"tag", "tag1"}],
+        fields => [{"field", "field1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {"m2,tag=tag2 field=field2", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field2"}],
+        timestamp => undefined
+    }},
+    {"m3 field=field3 ${timestamp3}", #{
+        measurement => "m3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${timestamp3}"
+    }},
+    {"m4 field=field4", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "field4"}],
+        timestamp => undefined
+    }},
+    {"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}",
+        #{
+            measurement => "m5",
+            tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
+            fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
+            timestamp => "${timestamp5}"
+        }},
+    {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{
+        measurement => "m6",
+        tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+        fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+        timestamp => undefined
+    }},
+    {"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"",
+        #{
+            measurement => "m7",
+            tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}],
+            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}],
+            timestamp => undefined
+        }},
+    {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}",
+        #{
+            measurement => "m8",
+            tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
+            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}],
+            timestamp => "${timestamp8}"
+        }},
+    {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
+        #{
+            measurement => "m9",
+            tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
+            fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            timestamp => "${timestamp9}"
+        }},
+    {"m10 field=\"\" ${timestamp10}", #{
+        measurement => "m10",
+        tags => [],
+        fields => [{"field", ""}],
+        timestamp => "${timestamp10}"
+    }}
+]).
+
+-define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [
+    {"\n  m1,tag=tag1  field=field1  ${timestamp1} \n", #{
+        measurement => "m1",
+        tags => [{"tag", "tag1"}],
+        fields => [{"field", "field1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {"  m2,tag=tag2  field=field2  ", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field2"}],
+        timestamp => undefined
+    }},
+    {" m3  field=field3   ${timestamp3}  ", #{
+        measurement => "m3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${timestamp3}"
+    }},
+    {" \n m4  field=field4\n ", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "field4"}],
+        timestamp => undefined
+    }},
+    {" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b   field=field5,field_a=field5a,field_b=field5b    ${timestamp5}  \n",
+        #{
+            measurement => "m5",
+            tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}],
+            fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}],
+            timestamp => "${timestamp5}"
+        }},
+    {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b  field=field6,field_a=field6a,field_b=field6b\n  ", #{
+        measurement => "m6",
+        tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+        fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+        timestamp => undefined
+    }}
+]).
+
+-define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [
+    {"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{
+        measurement => "m =1,",
+        tags => [{",tag =", "=tag 1,"}],
+        fields => [{",fie ld ", " field,1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field \"2\",\n"}],
+        timestamp => undefined
+    }},
+    {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{
+        measurement => "m 3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${payload.timestamp 3}"
+    }},
+    {"m4 field=\"\\\"field\\\\4\\\"\"", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "\"field\\4\""}],
+        timestamp => undefined
+    }},
+    {
+        "m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
+        "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
+        #{
+            measurement => "m5,mA",
+            tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
+            fields => [
+                {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
+            ],
+            timestamp => "${timestamp5}"
+        }
+    },
+    {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
+        #{
+            measurement => "m6",
+            tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            timestamp => undefined
+        }},
+    {
+        "\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\","
+        "field_a=field7a,field_b=\"field7b\\\\\n\"",
+        #{
+            measurement => "  m7  ",
+            tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
+            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
+            timestamp => undefined
+        }
+    },
+    {
+        "m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,"
+        "field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
+        #{
+            measurement => "m8",
+            tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
+            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
+            timestamp => "${timestamp8}"
+        }
+    },
+    {"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
+        #{
+            measurement => "m\\9",
+            tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
+            fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            timestamp => "${timestamp9}"
+        }},
+    {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{
+        measurement => "m,10",
+        tags => [],
+        %% backslash should not be un-escaped in tag key
+        fields => [{"\"field\\\\\"", ""}],
+        timestamp => "${timestamp10}"
+    }}
+]).
+
+-define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [
+    {" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\,   \\,fie\\ ld\\ =\\ field\\,1  ${timestamp1}  ", #{
+        measurement => "m =1,",
+        tags => [{",tag =", "=tag 1,"}],
+        fields => [{",fie ld ", " field,1"}],
+        timestamp => "${timestamp1}"
+    }},
+    {" m2,tag=tag2   field=\"field \\\"2\\\",\n\"  ", #{
+        measurement => "m2",
+        tags => [{"tag", "tag2"}],
+        fields => [{"field", "field \"2\",\n"}],
+        timestamp => undefined
+    }},
+    {"  m\\ 3   field=\"field3\"   ${payload.timestamp\\ 3}  ", #{
+        measurement => "m 3",
+        tags => [],
+        fields => [{"field", "field3"}],
+        timestamp => "${payload.timestamp 3}"
+    }},
+    {"   m4       field=\"\\\"field\\\\4\\\"\"    ", #{
+        measurement => "m4",
+        tags => [],
+        fields => [{"field", "\"field\\4\""}],
+        timestamp => undefined
+    }},
+    {
+        " m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b   \\ field\\ =field5,"
+        "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b   ${timestamp5}    ",
+        #{
+            measurement => "m5,mA",
+            tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
+            fields => [
+                {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
+            ],
+            timestamp => "${timestamp5}"
+        }
+    },
+    {"  m6,tag=tag6,tag_a=tag6a,tag_b=tag6b   field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"  ",
+        #{
+            measurement => "m6",
+            tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
+            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            timestamp => undefined
+        }}
+]).
+
+invalid_write_syntax_line_test_() ->
+    [?_assertThrow(_, to_influx_lines(L)) || L <- ?INVALID_LINES].
+
+invalid_write_syntax_multiline_test_() ->
+    LinesList = [
+        join("\n", ?INVALID_LINES),
+        join("\n\n\n", ?INVALID_LINES),
+        join("\n\n", lists:reverse(?INVALID_LINES))
+    ],
+    [?_assertThrow(_, to_influx_lines(Lines)) || Lines <- LinesList].
+
+valid_write_syntax_test_() ->
+    test_pairs(?VALID_LINE_PARSED_PAIRS).
+
+valid_write_syntax_with_extra_spaces_test_() ->
+    test_pairs(?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS).
+
+valid_write_syntax_escaped_chars_test_() ->
+    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS).
+
+valid_write_syntax_escaped_chars_with_extra_spaces_test_() ->
+    test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS).
+
+test_pairs(PairsList) ->
+    {Lines, AllExpected} = lists:unzip(PairsList),
+    JoinedLines = join("\n", Lines),
+    JoinedLines1 = join("\n\n\n", Lines),
+    JoinedLines2 = join("\n\n", lists:reverse(Lines)),
+    SingleLineTests =
+        [
+            ?_assertEqual([Expected], to_influx_lines(Line))
+         || {Line, Expected} <- PairsList
+        ],
+    JoinedLinesTests =
+        [
+            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines)),
+            ?_assertEqual(AllExpected, to_influx_lines(JoinedLines1)),
+            ?_assertEqual(lists:reverse(AllExpected), to_influx_lines(JoinedLines2))
+        ],
+    SingleLineTests ++ JoinedLinesTests.
+
+join(Sep, LinesList) ->
+    lists:flatten(lists:join(Sep, LinesList)).
+
+to_influx_lines(RawLines) ->
+    OldLevel = emqx_logger:get_primary_log_level(),
+    try
+        %% mute error logs from this call
+        emqx_logger:set_primary_log_level(none),
+        emqx_bridge_greptimedb:to_influx_lines(RawLines)
+    after
+        emqx_logger:set_primary_log_level(OldLevel)
+    end.

+ 1 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -85,6 +85,7 @@
             emqx_bridge_opents,
             emqx_bridge_clickhouse,
             emqx_bridge_dynamo,
+            emqx_bridge_greptimedb,
             emqx_bridge_hstreamdb,
             emqx_bridge_influxdb,
             emqx_bridge_iotdb,

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.2.8"},
+    {vsn, "0.2.9"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl]},

+ 1 - 0
changes/ee/feat-10647.en.md

@@ -0,0 +1 @@
+Add enterprise data bridge for [GreptimeDB](https://github.com/GreptimeTeam/greptimedb).

+ 2 - 0
mix.exs

@@ -171,6 +171,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_cassandra,
       :emqx_bridge_opents,
       :emqx_bridge_dynamo,
+      :emqx_bridge_greptimedb,
       :emqx_bridge_hstreamdb,
       :emqx_bridge_influxdb,
       :emqx_bridge_iotdb,
@@ -208,6 +209,7 @@ defmodule EMQXUmbrella.MixProject do
       {:crc32cer, "0.1.8", override: true},
       {:supervisor3, "1.1.12", override: true},
       {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},
+      {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.2", override: true},
       # The following two are dependencies of rabbit_common. They are needed here to
       # make mix not complain about conflicting versions
       {:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true},

+ 50 - 0
rel/i18n/emqx_bridge_greptimedb.hocon

@@ -0,0 +1,50 @@
+emqx_bridge_greptimedb {
+
+config_enable.desc:
+"""Enable or disable this bridge."""
+
+config_enable.label:
+"""Enable Or Disable Bridge"""
+
+desc_config.desc:
+"""Configuration for an GreptimeDB bridge."""
+
+desc_config.label:
+"""GreptimeDB Bridge Configuration"""
+
+desc_name.desc:
+"""Bridge name."""
+
+desc_name.label:
+"""Bridge Name"""
+
+desc_type.desc:
+"""The Bridge Type."""
+
+desc_type.label:
+"""Bridge Type"""
+
+local_topic.desc:
+"""The MQTT topic filter to be forwarded to the GreptimeDB. All MQTT 'PUBLISH' messages with the topic
+matching the local_topic will be forwarded.</br>
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
+configured, then both the data got from the rule and the MQTT messages that match local_topic
+will be forwarded."""
+
+local_topic.label:
+"""Local Topic"""
+
+write_syntax.desc:
+"""Conf of GreptimeDB gRPC protocol to write data points.The write syntax is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported, which is the same as InfluxDB line protocol.
+See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and
+[GreptimeDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
+TLDR:</br>
+```
+<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
+```
+Please note that a placeholder for an integer value must be annotated with a suffix `i`. For example `${payload.int_value}i`."""
+
+write_syntax.label:
+"""Write Syntax"""
+
+}

+ 47 - 0
rel/i18n/emqx_bridge_greptimedb_connector.hocon

@@ -0,0 +1,47 @@
+emqx_bridge_greptimedb_connector {
+
+dbname.desc:
+"""GreptimeDB database."""
+
+dbname.label:
+"""Database"""
+
+greptimedb.desc:
+"""GreptimeDB's protocol. Support GreptimeDB v1.8 and before."""
+
+greptimedb.label:
+"""HTTP API Protocol"""
+
+password.desc:
+"""GreptimeDB password."""
+
+password.label:
+"""Password"""
+
+precision.desc:
+"""GreptimeDB time precision."""
+
+precision.label:
+"""Time Precision"""
+
+protocol.desc:
+"""GreptimeDB's protocol. gRPC API."""
+
+protocol.label:
+"""Protocol"""
+
+server.desc:
+"""The IPv4 or IPv6 address or the hostname to connect to.</br>
+A host entry has the following form: `Host[:Port]`.</br>
+The GreptimeDB default port 8086 is used if `[:Port]` is not specified."""
+
+server.label:
+"""Server Host"""
+
+username.desc:
+"""GreptimeDB username."""
+
+username.label:
+"""Username"""
+
+}

+ 3 - 0
scripts/ct/run.sh

@@ -222,6 +222,9 @@ for dep in ${CT_DEPS}; do
         kinesis)
             FILES+=( '.ci/docker-compose-file/docker-compose-kinesis.yaml' )
             ;;
+        greptimedb)
+            FILES+=( '.ci/docker-compose-file/docker-compose-greptimedb.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1

+ 1 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -29,6 +29,7 @@ EPMD
 ERL
 ETS
 FIXME
+GreptimeDB
 GCM
 HMAC
 HOCON