yr — 2 years ago
commit
ea0cb45d59

+ 23 - 0
.gitignore

@@ -0,0 +1,23 @@
+.eunit
+deps
+*.o
+*.beam
+*.plt
+erl_crash.dump
+ebin
+rel/example_project
+.concrete/DEV_MODE
+.rebar
+.erlang.mk/
+data/
+emqx_plugin_template.d
+.DS_Store
+erlang.mk
+_build/
+rebar.lock
+test/ct.cover.spec
+.rebar3
+/.idea/
+rebar3.crashdump
+rebar3
+/*.iml

+ 201 - 0
LICENSE

@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

+ 51 - 0
Makefile

@@ -0,0 +1,51 @@
+## shallow clone for speed
+
+BUILD_WITHOUT_QUIC ?= true
+export BUILD_WITHOUT_QUIC
+BUILD_WITHOUT_ROCKSDB ?= true
+export BUILD_WITHOUT_ROCKSDB
+
+REBAR ?= $(or $(shell which rebar3 2>/dev/null),$(CURDIR)/rebar3)
+REBAR_VERSION ?= 3.19.0-emqx-1
+
+.PHONY: all
+all: compile
+
+.PHONY: get-rebar3
+get-rebar3:
+	@$(CURDIR)/get-rebar3 $(REBAR_VERSION)
+
+$(REBAR):
+	$(MAKE) get-rebar3
+
+.PHONY: compile
+compile: $(REBAR)
+	$(REBAR) compile
+
+.PHONY: ct
+ct: $(REBAR)
+	$(REBAR) as test ct -v
+
+.PHONY: eunit
+eunit: $(REBAR)
+	$(REBAR) as test eunit
+
+.PHONY: xref
+xref: $(REBAR)
+	$(REBAR) xref
+
+.PHONY: cover
+cover: $(REBAR)
+	$(REBAR) cover
+
+.PHONY: clean
+clean: distclean
+
+.PHONY: distclean
+distclean:
+	@rm -rf _build
+	@rm -f data/app.*.config data/vm.*.args rebar.lock
+
+.PHONY: rel
+rel: $(REBAR)
+	$(REBAR) emqx_plugrel tar

+ 133 - 0
README.md

@@ -0,0 +1,133 @@
+# emqx_plugin_kafka
+
+Kafka plugin for EMQX >= V5.4.0
+
+## Usage
+
+### Release
+
+```shell
+> git clone https://github.com/jostar-y/emqx_plugin_kafka.git
+> cd emqx_plugin_kafka
+> make rel
+_build/default/emqx_plugrel/emqx_plugin_kafka-<vsn>.tar.gz
+```
+
+### Config
+
+#### Explain
+
+```shell
+> cat priv/emqx_plugin_kafka.hocon
+plugin_kafka {
+  // required
+  connection {
+    // Kafka client id.
+    // optional   default:client
+    client_id = "kafka_client"
+    // Kafka address.
+    // required
+    bootstrap_hosts = ["10.3.64.223:9192", "10.3.64.223:9292", "10.3.64.223:9392"]
+
+    // Reference type: kpro_connection:config().
+    // https://github.com/kafka4beam/kafka_protocol/blob/master/src/kpro_connection.erl
+    // optional   default:5s
+    connect_timeout = 5s
+    // enum: per_partition | per_broker
+    // optional   default:per_partition
+    connection_strategy = per_partition
+    // optional   default:5s
+    min_metadata_refresh_interval = 5s
+    // optional   default:true
+    query_api_versions = true
+    // optional   default:3s
+    request_timeout = 3s
+    sasl {
+      // enum:  plain | scram_sha_256 | scram_sha_512
+      mechanism = plain
+      username = "username"
+      password = "password"
+    }
+    ssl {
+      enable = false
+    }
+
+    //Emqx resource opts.
+    // optional   default:32s
+    health_check_interval = 32s
+  }
+
+  // optional
+  producer {
+    // Most number of bytes to collect into a produce request.
+    // optional   default:896KB
+    max_batch_bytes = 896KB
+    // enum:  no_compression | snappy | gzip
+    // optional   default:no_compression
+    compression = no_compression
+    // enum:  random | roundrobin | first_key_dispatch
+    // optional   default:random
+    partition_strategy = random
+
+    // Encode kafka value.
+    // enum:  plain | base64
+    // optional   default:plain
+    encode_payload_type = plain
+  }
+
+  // required
+  hooks = [
+    {
+      // Hook point.
+      // required
+      endpoint = message.publish
+      // Emqx topic pattern.
+      // 1. Cannot match the system message;
+      // 2. Cannot use filters that start with '+' or '#'.
+      // required only for message.* endpoints (see Hook Point table); not applicable to client/session hooks
+      filter = "test/#"
+      // Kafka topic, must be created in advance in Kafka.
+      // required
+      kafka_topic = emqx_test
+      // Message template; "${.key}" is substituted from the event, and "${.}" expands to the entire event object
+      // optional default:{timestamp = "${.timestamp}", value = "${.}",key = "${.clientid}"}
+      kafka_message = {
+        timestamp = "${.timestamp}"
+        value = "${.}"
+        key = "${.clientid}"
+      }
+    }
+  ]
+}
+```
+
+Some examples in the directory `priv/example/`.
+
+#### Hook Point
+
+|          endpoint           |  filter  |
+| :-------------------------: | :------: |
+|       client.connect        |    /     |
+|       client.connack        |    /     |
+|      client.connected       |    /     |
+|     client.disconnected     |    /     |
+|     client.authenticate     |    /     |
+|      client.authorize       |    /     |
+| client.check_authz_complete |    /     |
+|       session.created       |    /     |
+|     session.subscribed      |    /     |
+|    session.unsubscribed     |    /     |
+|       session.resumed       |    /     |
+|      session.discarded      |    /     |
+|      session.takenover      |    /     |
+|     session.terminated      |    /     |
+|       message.publish       | required |
+|      message.delivered      | required |
+|        message.acked        | required |
+|       message.dropped       | required |
+
+#### Path
+
+- Default path: `emqx/etc/emqx_plugin_kafka.hocon`
+- Override path: set the system environment variable `export EMQX_PLUGIN_KAFKA_CONF="absolute_path"`

+ 35 - 0
erlang_ls.config

@@ -0,0 +1,35 @@
+apps_dirs:
+  - "src/*"
+deps_dirs:
+  - "_build/default/lib/*"
+include_dirs:
+  - "_build/default/lib/*/include"
+exclude_unused_includes:
+  - "typerefl/include/types.hrl"
+  - "logger.hrl"
+diagnostics:
+  enabled:
+    - bound_var_in_pattern
+    - elvis
+    - unused_includes
+    - unused_macros
+    - crossref
+    # - dialyzer
+    - compiler
+  disabled:
+    - dialyzer
+    # - crossref
+    # - compiler
+lenses:
+  disabled:
+    # - show-behaviour-usages
+    # - ct-run-test
+    - server-info
+  enabled:
+    - show-behaviour-usages
+    - ct-run-test
+macros:
+  - name: EMQX_RELEASE_EDITION
+    value: ce
+code_reload:
+  node: emqx@127.0.0.1

+ 31 - 0
get-rebar3

@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+VERSION="$1"
+
+# ensure dir
+cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/"
+
+DOWNLOAD_URL='https://github.com/emqx/rebar3/releases/download'
+
+download() {
+    local url="${DOWNLOAD_URL}/${VERSION}/rebar3"
+
+    echo "Downloading rebar3 from '${url}' ..."
+    curl --silent --show-error -f -L "${url}" -o ./rebar3
+}
+
+# get the version number from the second line of the escript
+# because command `rebar3 -v` tries to load rebar.config
+# which is slow and may print some logs
+version() {
+    head -n 2 ./rebar3 | tail -n 1 | tr ' ' '\n' | grep -E '^.+-emqx-.+'
+}
+
+if [ -f 'rebar3' ] && [ "$(version)" = "$VERSION" ]; then
+    exit 0
+fi
+
+download
+chmod +x ./rebar3

+ 9 - 0
include/emqx_plugin_kafka.hrl

@@ -0,0 +1,9 @@
+-ifndef(EMQX_PLUGIN_KAFKA_HRL).
+-define(EMQX_PLUGIN_KAFKA_HRL, true).
+
+-define(EMQX_PLUGIN_KAFKA_APP, emqx_plugin_kafka).
+-define(EMQX_PLUGIN_KAFKA_CHANNELS, channels).
+
+-define(PLUGIN_KAFKA_RESOURCE_GROUP, <<"emqx_plugin_kafka">>).
+
+-endif.

+ 32 - 0
priv/emqx_plugin_kafka.hocon

@@ -0,0 +1,32 @@
+plugin_kafka {
+  connection {
+    bootstrap_hosts = ["10.3.64.223:9192", "10.3.64.223:9292", "10.3.64.223:9392"]
+    sasl {
+      mechanism = plain
+      username = "username"
+      password = "password"
+    }
+  }
+
+  hooks = [
+    {endpoint = client.connect}
+    , {endpoint = client.connack}
+    , {endpoint = client.connected}
+    , {endpoint = client.disconnected}
+    , {endpoint = client.authenticate}
+    , {endpoint = client.authorize}
+    , {endpoint = client.check_authz_complete}
+    , {endpoint = session.created}
+    , {endpoint = session.subscribed}
+    , {endpoint = session.unsubscribed}
+    , {endpoint = session.resumed}
+    , {endpoint = session.discarded}
+    , {endpoint = session.takenover}
+    , {endpoint = session.terminated}
+    , {endpoint = message.publish, filter = "test/#"}
+    , {endpoint = message.delivered, filter = "test/#"}
+    , {endpoint = message.acked, filter = "test/#"}
+    , {endpoint = message.dropped, filter = "test/#"}
+  ]
+}

+ 61 - 0
priv/example/exmaple_1.hocon

@@ -0,0 +1,61 @@
+plugin_kafka {
+  connection {
+    client_id = "kafka_client"
+    bootstrap_hosts = ["10.3.64.223:9192", "10.3.64.223:9292", "10.3.64.223:9392"]
+
+    connect_timeout = 5s
+    connection_strategy = per_partition
+    min_metadata_refresh_interval = 5s
+    query_api_versions = true
+    request_timeout = 3s
+    sasl {
+      mechanism = plain
+      username = "username"
+      password = "password"
+    }
+    ssl {
+      enable = false
+    }
+
+    health_check_interval = 32s
+  }
+
+  producer {
+    max_batch_bytes = 896KB
+    compression = no_compression
+    partition_strategy = random
+
+    encode_payload_type = plain
+  }
+
+  hooks = [
+    {
+      endpoint = message.publish
+      filter = "test/#"
+      kafka_topic = emqx_msg_publish
+      kafka_message = {
+        timestamp = "${.timestamp}"
+        value = "${.}"
+        key = "${.clientid}"
+      }
+    }
+    , {
+      endpoint = client.connected
+      kafka_topic = emqx_client_connected
+      kafka_message = {
+        timestamp = "${.timestamp}"
+        value = "${.}"
+        key = "${.proto_name}"
+      }
+    }
+    , {
+      endpoint = client.disconnected
+      kafka_topic = emqx_client_disconnected
+      kafka_message = {
+        timestamp = "${.timestamp}"
+        value = "${.reason}"
+        key = "${.clientid}"
+      }
+    }
+  ]
+}

+ 14 - 0
priv/example/exmaple_2.hocon

@@ -0,0 +1,14 @@
+plugin_kafka {
+  connection {
+    bootstrap_hosts = ["10.3.64.223:9192"]
+    sasl {
+      mechanism = plain
+      username = "username"
+      password = "password"
+    }
+  }
+
+  hooks = [
+    {endpoint = client.connect}
+  ]
+}

+ 49 - 0
rebar.config

@@ -0,0 +1,49 @@
+%% -*- mode: erlang -*-
+{deps,
+    [
+        {emqx, {git_subdir, "https://github.com/emqx/emqx.git", {tag, "v5.4.0"}, "apps/emqx"}}
+        , {emqx_ctl, {git_subdir, "https://github.com/emqx/emqx.git", {tag, "v5.4.0"}, "apps/emqx_ctl"}}
+        , {emqx_utils, {git_subdir, "https://github.com/emqx/emqx.git", {tag, "v5.4.0"}, "apps/emqx_utils"}}
+        , {emqx_durable_storage, {git_subdir, "https://github.com/emqx/emqx.git", {tag, "v5.4.0"}, "apps/emqx_durable_storage"}}
+        , {emqx_resource, {git_subdir, "https://github.com/emqx/emqx.git", {tag, "v5.4.0"}, "apps/emqx_resource"}}
+        , {wolff, "1.9.1"}
+    ]}.
+
+{plugins, [
+    {emqx_plugrel, {git, "https://github.com/jostar-y/emqx_plugrel.git", {branch, "master"}}}
+]}.
+
+{erl_opts, []}.
+
+{relx, [{release, {emqx_plugin_kafka, "1.0.0"},
+    [
+        emqx_plugin_kafka
+        , wolff
+        , kafka_protocol
+        , replayq
+        , telemetry
+        , lc
+        , crc32cer
+    ]}
+    , {dev_mode, false}
+    , {include_erts, false}
+]}.
+
+{emqx_plugrel,
+    [{authors, ["Jostar"]}
+        , {builder,
+        [{name, ""}
+            , {contact, ""}
+            , {website, ""}
+        ]}
+        , {repo, "https://github.com/jostar-y/emqx_plugin_kafka"}
+        , {functionality, ["Demo"]}
+        , {compatibility,
+        [{emqx, "~> v5.4.0"}
+        ]}
+        , {description, "Kafka plugin for EMQX >= V5.4.0"}
+    ]
+}.
+
+{xref_checks, [undefined_function_calls, undefined_functions, locals_not_used,
+    deprecated_function_calls, warnings_as_errors, deprecated_functions]}.

+ 11 - 0
src/emqx_plugin_kafka.app.src

@@ -0,0 +1,11 @@
+{application, emqx_plugin_kafka,
+    [{description, "EMQX kafka plugin."},
+        {vsn, "0.1.0"},
+        {modules, []},
+        {registered, [emqx_plugin_kafka_sup]},
+        {applications, [kernel, stdlib, wolff]},
+        {mod, {emqx_plugin_kafka_app, []}},
+        {env, []},
+        {licenses, ["Apache-2.0"]},
+        {maintainers, [""]}
+    ]}.

+ 83 - 0
src/emqx_plugin_kafka.erl

@@ -0,0 +1,83 @@
+-module(emqx_plugin_kafka).
+
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_plugin_kafka.hrl").
+
+-export([
+    load/0
+    , unload/0
+]).
+
+load() ->
+    load(read_config()).
+
+load(Conf = #{connection := _, producer := _, hooks := _}) ->
+    emqx_plugin_kafka_util:check_crc32cer_nif(),
+    {ok, _} = start_resource(Conf),
+    hooks(Conf);
+load(_) ->
+    {error, "config_error"}.
+
+read_config() ->
+    case hocon:load(kafka_config_file()) of
+        {ok, RawConf} ->
+            case emqx_config:check_config(emqx_plugin_kafka_schema, RawConf) of
+                {_, #{plugin_kafka := Conf}} ->
+                    ?SLOG(info, #{
+                        msg => "emqx_plugin_kafka config",
+                        config => Conf
+                    }),
+                    Conf;
+                _ ->
+                    ?SLOG(error, #{
+                        msg => "bad_hocon_file",
+                        file => kafka_config_file()
+                    }),
+                    {error, bad_hocon_file}
+
+            end;
+        {error, Error} ->
+            ?SLOG(error, #{
+                msg => "bad_hocon_file",
+                file => kafka_config_file(),
+                reason => Error
+            }),
+            {error, bad_hocon_file}
+    end.
+
+kafka_config_file() ->
+    Env = os:getenv("EMQX_PLUGIN_KAFKA_CONF"),
+    case Env =:= "" orelse Env =:= false of
+        true -> "etc/emqx_plugin_kafka.hocon";
+        false -> Env
+    end.
+
+start_resource(Conf = #{connection := #{health_check_interval := HealthCheckInterval}}) ->
+    ResId = emqx_plugin_kafka_util:resource_id(),
+    ok = emqx_resource:create_metrics(ResId),
+    Result = emqx_resource:create_local(
+        ResId,
+        ?PLUGIN_KAFKA_RESOURCE_GROUP,
+        emqx_plugin_kafka_producer,
+        Conf,
+        #{health_check_interval => HealthCheckInterval}),
+    start_resource_if_enabled(Result).
+
+start_resource_if_enabled({ok, _Result = #{error := undefined, id := ResId}}) ->
+    {ok, ResId};
+start_resource_if_enabled({ok, #{error := Error, id := ResId}}) ->
+    ?SLOG(error, #{
+        msg => "start resource error",
+        error => Error,
+        resource_id => ResId
+    }),
+    emqx_resource:stop(ResId),
+    error.
+
+hooks(#{producer := Producer, hooks := Hooks}) ->
+    emqx_plugin_kafka_hook:hooks(Hooks, Producer, []).
+
+unload() ->
+    emqx_plugin_kafka_hook:unhook(),
+    ResId = emqx_plugin_kafka_util:resource_id(),
+    emqx_resource:remove_local(ResId).

+ 19 - 0
src/emqx_plugin_kafka_app.erl

@@ -0,0 +1,19 @@
+-module(emqx_plugin_kafka_app).
+
+-behaviour(application).
+
+-emqx_plugin(?MODULE).
+
+-export([
+    start/2
+    , stop/1
+]).
+
+start(_StartType, _StartArgs) ->
+    {ok, Sup} = emqx_plugin_kafka_sup:start_link(),
+    emqx_plugin_kafka:load(),
+    {ok, Sup}.
+
+stop(_State) ->
+    emqx_plugin_kafka:unload(),
+    ok.

+ 472 - 0
src/emqx_plugin_kafka_evt.erl

@@ -0,0 +1,472 @@
+-module(emqx_plugin_kafka_evt).
+
+-include_lib("emqx/include/emqx.hrl").
+
+-export([
+    eventmsg_connect/1
+    , eventmsg_connack/2
+    , eventmsg_connected/2
+    , eventmsg_disconnected/3
+    , eventmsg_authenticate/2
+    , eventmsg_authorize/4
+    , eventmsg_check_authz_complete/5
+
+    , eventmsg_session_created/3
+    , eventmsg_sub_or_unsub/4
+    , eventmsg_session/2
+    , eventmsg_session_terminated/2
+
+    , eventmsg_publish/1
+    , eventmsg_dropped/3
+    , eventmsg_delivered/2
+    , eventmsg_acked/2
+]).
+
+%% Build the event map for 'message.publish'.
+%% Username, peerhost and properties come from optional message headers,
+%% so they may be 'undefined' / empty for internally generated messages.
+eventmsg_publish(
+    Message = #message{
+        id = Id,
+        from = ClientId,
+        qos = QoS,
+        flags = Flags,
+        topic = Topic,
+        payload = Payload,
+        timestamp = Timestamp
+    }
+) ->
+    with_basic_columns(
+        'message.publish',
+        #{
+            id => emqx_guid:to_hexstr(Id),
+            clientid => ClientId,
+            username => emqx_message:get_header(username, Message, undefined),
+            payload => Payload,
+            peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            topic => Topic,
+            qos => QoS,
+            flags => Flags,
+            pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
+            publish_received_at => Timestamp
+        }
+    ).
+
+eventmsg_connect(
+    ConnInfo = #{
+        peername := PeerName,
+        sockname := SockName,
+        clean_start := CleanStart,
+        proto_name := ProtoName,
+        proto_ver := ProtoVer
+    }
+) ->
+    Keepalive = maps:get(keepalive, ConnInfo, 0),
+    ConnProps = maps:get(conn_props, ConnInfo, #{}),
+    RcvMax = maps:get(receive_maximum, ConnInfo, 0),
+    ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0),
+    with_basic_columns(
+        'client.connect',
+        #{
+            peername => ntoa(PeerName),
+            sockname => ntoa(SockName),
+            proto_name => ProtoName,
+            proto_ver => ProtoVer,
+            keepalive => Keepalive,
+            clean_start => CleanStart,
+            receive_maximum => RcvMax,
+            expiry_interval => ExpiryInterval div 1000,
+            conn_props => printable_maps(ConnProps)
+        }
+    ).
+
+eventmsg_connected(
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        is_bridge := IsBridge,
+        mountpoint := Mountpoint
+    },
+    ConnInfo = #{
+        peername := PeerName,
+        sockname := SockName,
+        clean_start := CleanStart,
+        proto_name := ProtoName,
+        proto_ver := ProtoVer,
+        connected_at := ConnectedAt
+    }
+) ->
+    Keepalive = maps:get(keepalive, ConnInfo, 0),
+    ConnProps = maps:get(conn_props, ConnInfo, #{}),
+    RcvMax = maps:get(receive_maximum, ConnInfo, 0),
+    ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0),
+    with_basic_columns(
+        'client.connected',
+        #{
+            clientid => ClientId,
+            username => Username,
+            mountpoint => Mountpoint,
+            peername => ntoa(PeerName),
+            sockname => ntoa(SockName),
+            proto_name => ProtoName,
+            proto_ver => ProtoVer,
+            keepalive => Keepalive,
+            clean_start => CleanStart,
+            receive_maximum => RcvMax,
+            expiry_interval => ExpiryInterval div 1000,
+            is_bridge => IsBridge,
+            conn_props => printable_maps(ConnProps),
+            connected_at => ConnectedAt
+        }
+    ).
+
+eventmsg_disconnected(
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username
+    },
+    ConnInfo = #{
+        peername := PeerName,
+        sockname := SockName,
+        proto_name := ProtoName,
+        proto_ver := ProtoVer,
+        disconnected_at := DisconnectedAt
+    },
+    Reason
+) ->
+    with_basic_columns(
+        'client.disconnected',
+        #{
+            reason => reason(Reason),
+            clientid => ClientId,
+            username => Username,
+            peername => ntoa(PeerName),
+            sockname => ntoa(SockName),
+            proto_name => ProtoName,
+            proto_ver => ProtoVer,
+            disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
+            disconnected_at => DisconnectedAt
+        }
+    ).
+
+eventmsg_connack(
+    ConnInfo = #{
+        clientid := ClientId,
+        clean_start := CleanStart,
+        username := Username,
+        peername := PeerName,
+        sockname := SockName,
+        proto_name := ProtoName,
+        proto_ver := ProtoVer
+    },
+    Reason
+) ->
+    Keepalive = maps:get(keepalive, ConnInfo, 0),
+    ConnProps = maps:get(conn_props, ConnInfo, #{}),
+    ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0),
+    with_basic_columns(
+        'client.connack',
+        #{
+            reason_code => reason(Reason),
+            clientid => ClientId,
+            clean_start => CleanStart,
+            username => Username,
+            peername => ntoa(PeerName),
+            sockname => ntoa(SockName),
+            proto_name => ProtoName,
+            proto_ver => ProtoVer,
+            keepalive => Keepalive,
+            expiry_interval => ExpiryInterval,
+            conn_props => printable_maps(ConnProps)
+        }
+    ).
+
+%% Build the event map for 'client.authenticate'.
+eventmsg_authenticate(
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        peerhost := PeerHost
+    },
+    Result
+) ->
+    Columns = #{
+        clientid => ClientId,
+        username => Username,
+        peerhost => ntoa(PeerHost),
+        result => Result
+    },
+    with_basic_columns('client.authenticate', Columns).
+
+eventmsg_authorize(
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        peerhost := PeerHost
+    },
+    PubSub,
+    Topic,
+    Result
+) ->
+    with_basic_columns(
+        'client.authorize',
+        #{
+            clientid => ClientId,
+            username => Username,
+            peerhost => ntoa(PeerHost),
+            topic => Topic,
+            action => PubSub,
+            result => Result
+        }
+    ).
+
+eventmsg_check_authz_complete(
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        peerhost := PeerHost
+    },
+    PubSub,
+    Topic,
+    Result,
+    AuthzSource
+) ->
+    with_basic_columns(
+        'client.check_authz_complete',
+        #{
+            clientid => ClientId,
+            username => Username,
+            peerhost => ntoa(PeerHost),
+            topic => Topic,
+            action => PubSub,
+            authz_source => AuthzSource,
+            result => Result
+        }
+    ).
+
+eventmsg_session_created(
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        peerhost := PeerHost
+    },
+    SessionId,
+    CreatedAt
+) ->
+    with_basic_columns(
+        'session.created',
+        #{
+            clientid => ClientId,
+            username => Username,
+            peerhost => ntoa(PeerHost),
+            session_id => SessionId,
+            created_at => CreatedAt
+        }
+    ).
+
+eventmsg_sub_or_unsub(
+    Event,
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        peerhost := PeerHost
+    },
+    Topic,
+    SubOpts = #{qos := QoS}
+) ->
+    PropKey = sub_unsub_prop_key(Event),
+    with_basic_columns(
+        Event,
+        #{
+            clientid => ClientId,
+            username => Username,
+            peerhost => ntoa(PeerHost),
+            PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
+            topic => Topic,
+            qos => QoS
+        }
+    ).
+
+%% Build the event map for plain session events that carry no extra
+%% payload (resumed / discarded / takenover).
+eventmsg_session(
+    Evt,
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        peerhost := PeerHost
+    }
+) ->
+    Columns = #{
+        clientid => ClientId,
+        username => Username,
+        peerhost => ntoa(PeerHost)
+    },
+    with_basic_columns(Evt, Columns).
+
+eventmsg_session_terminated(
+    _ClientInfo = #{
+        clientid := ClientId,
+        username := Username,
+        peerhost := PeerHost
+    },
+    Reason
+) ->
+    with_basic_columns(
+        'session.terminated',
+        #{
+            clientid => ClientId,
+            username => Username,
+            peerhost => ntoa(PeerHost),
+            reason => reason(Reason)
+        }
+    ).
+
+eventmsg_dropped(
+    Message = #message{
+        id = Id,
+        from = ClientId,
+        qos = QoS,
+        flags = Flags,
+        topic = Topic,
+        payload = Payload,
+        timestamp = Timestamp
+    },
+    ByNode,
+    Reason
+) ->
+    with_basic_columns(
+        'message.dropped',
+        #{
+            id => emqx_guid:to_hexstr(Id),
+            by_node => ByNode,
+            reason => Reason,
+            clientid => ClientId,
+            username => emqx_message:get_header(username, Message, undefined),
+            payload => Payload,
+            peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            topic => Topic,
+            qos => QoS,
+            flags => Flags,
+            pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
+            publish_received_at => Timestamp
+        }
+    ).
+
+eventmsg_delivered(
+    _ClientInfo = #{
+        peerhost := PeerHost,
+        clientid := ReceiverCId,
+        username := ReceiverUsername
+    },
+    Message = #message{
+        id = Id,
+        from = ClientId,
+        qos = QoS,
+        flags = Flags,
+        topic = Topic,
+        payload = Payload,
+        timestamp = Timestamp
+    }
+) ->
+    with_basic_columns(
+        'message.delivered',
+        #{
+            id => emqx_guid:to_hexstr(Id),
+            from_clientid => ClientId,
+            from_username => emqx_message:get_header(username, Message, undefined),
+            clientid => ReceiverCId,
+            username => ReceiverUsername,
+            payload => Payload,
+            peerhost => ntoa(PeerHost),
+            topic => Topic,
+            qos => QoS,
+            flags => Flags,
+            pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
+            publish_received_at => Timestamp
+        }
+    ).
+
+eventmsg_acked(
+    _ClientInfo = #{
+        peerhost := PeerHost,
+        clientid := ReceiverCId,
+        username := ReceiverUsername
+    },
+    Message = #message{
+        id = Id,
+        from = ClientId,
+        qos = QoS,
+        flags = Flags,
+        topic = Topic,
+        payload = Payload,
+        timestamp = Timestamp
+    }
+) ->
+    with_basic_columns(
+        'message.acked',
+        #{
+            id => emqx_guid:to_hexstr(Id),
+            from_clientid => ClientId,
+            from_username => emqx_message:get_header(username, Message, undefined),
+            clientid => ReceiverCId,
+            username => ReceiverUsername,
+            payload => Payload,
+            peerhost => ntoa(PeerHost),
+            topic => Topic,
+            qos => QoS,
+            flags => Flags,
+            pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
+            puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
+            publish_received_at => Timestamp
+        }
+    ).
+
+%% Stamp an event map with the columns shared by all events: the event
+%% name, the wall-clock timestamp (ms) and the emitting node.
+with_basic_columns(EventName, Columns) when is_map(Columns) ->
+    Base = #{
+        event => EventName,
+        timestamp => erlang:system_time(millisecond),
+        node => node()
+    },
+    maps:merge(Columns, Base).
+
+%% Normalize a disconnect/terminate reason to a single atom.
+%% Clause order matters: {shutdown, Atom} must unwrap before the generic
+%% {Error, _} clause (which would otherwise return 'shutdown' itself).
+reason(Reason) when is_atom(Reason) -> Reason;
+reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
+reason({Error, _}) when is_atom(Error) -> Error;
+reason(_) -> internal_error.
+
+%% Render an IP address (optionally with port) as a binary; passes
+%% 'undefined' through untouched.
+ntoa(undefined) ->
+    undefined;
+ntoa({IpAddr, Port}) ->
+    Addr = inet:ntoa(IpAddr),
+    iolist_to_binary([Addr, ":", integer_to_list(Port)]);
+ntoa(IpAddr) ->
+    iolist_to_binary(inet:ntoa(IpAddr)).
+
+%% Property-map key to extract from SubOpts for (un)subscribe events.
+sub_unsub_prop_key('session.subscribed') -> sub_props;
+sub_unsub_prop_key('session.unsubscribed') -> unsub_props.
+
+%% Make an MQTT properties/headers map serialization-friendly:
+%%   * address-like values (peerhost/peername/sockname) go through ntoa/1,
+%%   * 'User-Property' pairs are exposed both as a map and as a k/v list,
+%%   * any other tuple-valued (internal) header is dropped.
+%% The result always contains a 'User-Property' key (seeded accumulator).
+printable_maps(undefined) ->
+    #{};
+printable_maps(Headers) ->
+    maps:fold(
+        fun
+            (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
+                AccIn#{K => ntoa(V0)};
+            ('User-Property', V0, AccIn) when is_list(V0) ->
+                AccIn#{
+                    %% The 'User-Property' field is for the convenience of querying properties
+                    %% using the '.' syntax, e.g. "SELECT 'User-Property'.foo as foo"
+                    %% However, this does not allow duplicate property keys. To allow
+                    %% duplicate keys, we have to use the 'User-Property-Pairs' field instead.
+                    'User-Property' => maps:from_list(V0),
+                    'User-Property-Pairs' => [
+                        #{
+                            key => Key,
+                            value => Value
+                        }
+                        || {Key, Value} <- V0
+                    ]
+                };
+            (_K, V, AccIn) when is_tuple(V) ->
+                %% internal headers
+                AccIn;
+            (K, V, AccIn) ->
+                AccIn#{K => V}
+        end,
+        #{'User-Property' => #{}},
+        Headers
+    ).

+ 246 - 0
src/emqx_plugin_kafka_hook.erl

@@ -0,0 +1,246 @@
+-module(emqx_plugin_kafka_hook).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_plugin_kafka.hrl").
+
+-export([
+    hooks/3
+    , unhook/0
+]).
+
+-export([
+    endpoint_func/1
+]).
+
+-export([
+    on_client_connect/3
+    , on_client_connack/4
+    , on_client_connected/3
+    , on_client_disconnected/4
+    , on_client_authenticate/3
+    , on_client_authorize/5
+    , on_client_check_authz_complete/6
+]).
+
+-export([
+    on_session_created/3
+    , on_session_subscribed/4
+    , on_session_unsubscribed/4
+    , on_session_resumed/3
+    , on_session_discarded/3
+    , on_session_takenover/3
+    , on_session_terminated/4
+]).
+
+-export([
+    on_message_publish/2
+    , on_message_delivered/3
+    , on_message_acked/3
+    , on_message_dropped/4
+]).
+
+-define(evt_mod, emqx_plugin_kafka_evt).
+
+%% Register each configured hook as a resource channel + EMQX hook
+%% callback, then publish the accumulated channel list in persistent_term
+%% so emqx_plugin_kafka_producer:on_get_channels/1 can find it.
+hooks([Hook | T], Producer, Acc) ->
+    Ret = hook(emqx_plugin_kafka_util:resource_id(), Hook#{producer => Producer}),
+    hooks(T, Producer, [Ret | Acc]);
+hooks([], _, Acc) ->
+    persistent_term:put({?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS}, Acc).
+
+%% Attach one hook: create a channel on the resource and register the
+%% matching callback with emqx_hooks. The endpoint arrives as a binary
+%% and is converted with safe_to_existing_atom (no atom-table growth).
+hook(ResId, Hook = #{endpoint := Endpoint0, filter := Filter}) ->
+    {ok, Endpoint} = emqx_utils:safe_to_existing_atom(Endpoint0),
+    ChannelId = emqx_plugin_kafka_util:channel_id(Endpoint),
+    emqx_resource_manager:add_channel(ResId, ChannelId, Hook),
+    Opts = #{
+        channel_id => ChannelId,
+        filter => Filter
+    },
+    trigger_hook(Endpoint, endpoint_func(Endpoint), Opts),
+    {ChannelId, Hook}.
+
+%% Register the callback with emqx_hooks, unless the endpoint is unknown
+%% (endpoint_func/1 returned 'undefined').
+trigger_hook(_, undefined, _) ->
+    ok;
+trigger_hook(Endpoint, Func, Opts) ->
+    emqx_hooks:add(Endpoint, {?MODULE, Func, [Opts]}, _Property = ?HP_HIGHEST).
+
+%% Map a hook endpoint name to the callback function exported by this
+%% module; 'undefined' means the endpoint is not handled and no hook is
+%% registered (see trigger_hook/3).
+%% Fix: removed a duplicate 'client.authenticate' clause that shadowed
+%% nothing but was dead code (and a compiler warning).
+endpoint_func('client.connect') -> on_client_connect;
+endpoint_func('client.connack') -> on_client_connack;
+endpoint_func('client.connected') -> on_client_connected;
+endpoint_func('client.disconnected') -> on_client_disconnected;
+endpoint_func('client.authenticate') -> on_client_authenticate;
+endpoint_func('client.authorize') -> on_client_authorize;
+endpoint_func('client.check_authz_complete') -> on_client_check_authz_complete;
+endpoint_func('session.created') -> on_session_created;
+endpoint_func('session.subscribed') -> on_session_subscribed;
+endpoint_func('session.unsubscribed') -> on_session_unsubscribed;
+endpoint_func('session.resumed') -> on_session_resumed;
+endpoint_func('session.discarded') -> on_session_discarded;
+endpoint_func('session.takenover') -> on_session_takenover;
+endpoint_func('session.terminated') -> on_session_terminated;
+endpoint_func('message.publish') -> on_message_publish;
+endpoint_func('message.delivered') -> on_message_delivered;
+endpoint_func('message.acked') -> on_message_acked;
+endpoint_func('message.dropped') -> on_message_dropped;
+endpoint_func(_) -> undefined.
+
+%% Remove every hook callback this module may have registered. Driven by
+%% a data table so the endpoint/function pairing is stated exactly once.
+unhook() ->
+    Endpoints = [
+        {'client.connect', on_client_connect},
+        {'client.connack', on_client_connack},
+        {'client.connected', on_client_connected},
+        {'client.disconnected', on_client_disconnected},
+        {'client.authenticate', on_client_authenticate},
+        {'client.authorize', on_client_authorize},
+        {'client.check_authz_complete', on_client_check_authz_complete},
+        {'session.created', on_session_created},
+        {'session.subscribed', on_session_subscribed},
+        {'session.unsubscribed', on_session_unsubscribed},
+        {'session.resumed', on_session_resumed},
+        {'session.discarded', on_session_discarded},
+        {'session.takenover', on_session_takenover},
+        {'session.terminated', on_session_terminated},
+        {'message.publish', on_message_publish},
+        {'message.delivered', on_message_delivered},
+        {'message.acked', on_message_acked},
+        {'message.dropped', on_message_dropped}
+    ],
+    lists:foreach(
+        fun({Endpoint, Func}) -> unhook(Endpoint, {?MODULE, Func}) end,
+        Endpoints
+    ).
+
+%% Remove a single hook callback from emqx_hooks.
+unhook(Endpoint, MFA) ->
+    emqx_hooks:del(Endpoint, MFA).
+
+%%--------------------------------------------------------------------
+%% Client Lifecycle Hooks
+%%--------------------------------------------------------------------
+
+on_client_connect(ConnInfo, Props, Opts) ->
+    query(?evt_mod:eventmsg_connect(ConnInfo), Opts),
+    {ok, Props}.
+
+on_client_connack(ConnInfo, Rc, Props, Opts) ->
+    query(?evt_mod:eventmsg_connack(ConnInfo, Rc), Opts),
+    {ok, Props}.
+
+on_client_connected(ClientInfo, ConnInfo, Opts) ->
+    query(?evt_mod:eventmsg_connected(ClientInfo, ConnInfo), Opts),
+    ok.
+
+on_client_disconnected(ClientInfo, ReasonCode, ConnInfo, Opts) ->
+    query(?evt_mod:eventmsg_disconnected(ClientInfo, ConnInfo, ReasonCode), Opts),
+    ok.
+
+on_client_authenticate(ClientInfo, Result, Opts) ->
+    query(?evt_mod:eventmsg_authenticate(ClientInfo, Result), Opts),
+    {ok, Result}.
+
+on_client_authorize(ClientInfo, PubSub, Topic, Result, Opts) ->
+    query(?evt_mod:eventmsg_authorize(ClientInfo, PubSub, Topic, Result), Opts),
+    {ok, Result}.
+
+on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Opts) ->
+    query(?evt_mod:eventmsg_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource), Opts),
+    {ok, Result}.
+
+%%--------------------------------------------------------------------
+%% Session Lifecycle Hooks
+%%--------------------------------------------------------------------
+
+on_session_created(ClientInfo, #{id := SessionId, created_at := CreatedAt}, Opts) ->
+    query(?evt_mod:eventmsg_session_created(ClientInfo, SessionId, CreatedAt), Opts),
+    ok.
+
+on_session_subscribed(ClientInfo, Topic, SubOpts, Opts) ->
+    query(?evt_mod:eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts), Opts),
+    ok.
+
+on_session_unsubscribed(ClientInfo, Topic, SubOpts, Opts) ->
+    query(?evt_mod:eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts), Opts),
+    ok.
+
+on_session_resumed(ClientInfo, _SessInfo, Opts) ->
+    query(?evt_mod:eventmsg_session('session.resumed', ClientInfo), Opts),
+    ok.
+
+on_session_discarded(ClientInfo, _SessInfo, Opts) ->
+    query(?evt_mod:eventmsg_session('session.discarded', ClientInfo), Opts),
+    ok.
+
+on_session_takenover(ClientInfo, _SessInfo, Opts) ->
+    query(?evt_mod:eventmsg_session('session.takenover', ClientInfo), Opts),
+    ok.
+
+on_session_terminated(ClientInfo, Reason, _SessInfo, Opts) ->
+    query(?evt_mod:eventmsg_session_terminated(ClientInfo, Reason), Opts),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Message PubSub Hooks
+%%--------------------------------------------------------------------
+
+%% Forward a published message to Kafka when its topic passes the
+%% configured filter; the message itself always continues unchanged.
+on_message_publish(Message, Opts = #{filter := Filter}) ->
+    case match_topic(Message, Filter) of
+        true ->
+            query(?evt_mod:eventmsg_publish(Message), Opts);
+        false ->
+            ok
+    end,
+    {ok, Message}.
+
+%% Report a dropped message (with the dropping node and reason).
+on_message_dropped(Message, #{node := ByNode}, Reason, Opts = #{filter := Filter}) ->
+    case match_topic(Message, Filter) of
+        true ->
+            query(?evt_mod:eventmsg_dropped(Message, ByNode, Reason), Opts);
+        false ->
+            ok
+    end,
+    ok.
+
+%% Report a message delivered to a subscriber.
+on_message_delivered(ClientInfo, Message, Opts = #{filter := Filter}) ->
+    case match_topic(Message, Filter) of
+        true ->
+            query(?evt_mod:eventmsg_delivered(ClientInfo, Message), Opts);
+        false ->
+            ok
+    end,
+    {ok, Message}.
+
+%% Report a message acknowledged by a subscriber.
+on_message_acked(ClientInfo, Message, Opts = #{filter := Filter}) ->
+    case match_topic(Message, Filter) of
+        true ->
+            query(?evt_mod:eventmsg_acked(ClientInfo, Message), Opts);
+        false ->
+            ok
+    end,
+    ok.
+
+%%%===================================================================
+%%% External functions
+%%%===================================================================
+
+%% Decide whether Message's topic passes the configured topic Filter.
+%% $SYS topics are always excluded, as are filters that start with a
+%% wildcard ('#' or '+'); anything else goes through emqx_topic:match/2.
+%% Fix: the $SYS exclusion previously matched only the literal 5-byte
+%% topic <<"$SYS/">>; a prefix pattern is needed to catch real $SYS
+%% topics such as <<"$SYS/brokers/...">>.
+match_topic(#message{topic = <<"$SYS/", _/binary>>}, _) ->
+    false;
+match_topic(_, <<$#, _/binary>>) ->
+    false;
+match_topic(_, <<$+, _/binary>>) ->
+    false;
+match_topic(#message{topic = Topic}, Filter) ->
+    emqx_topic:match(Topic, Filter);
+match_topic(_, _) ->
+    false.
+
+%% Send one event message through the resource channel; failures are
+%% logged by query_ret/2 rather than propagated.
+query(
+    EvtMsg,
+    #{channel_id := ChannelId}
+) ->
+    Ret = emqx_resource:query(emqx_plugin_kafka_util:resource_id(), {ChannelId, EvtMsg}),
+    query_ret(Ret, EvtMsg).
+
+%% Treat {_, {ok, _}} as success; anything else is logged and swallowed —
+%% hook callbacks must not crash the broker on a delivery failure.
+query_ret({_, {ok, _}}, _) ->
+    ok;
+query_ret(Ret, EvtMsg) ->
+    ?SLOG(error,
+        #{
+            msg => "failed_to_query_kafka_resource",
+            ret => Ret,
+            evt_msg => EvtMsg
+        }).

+ 318 - 0
src/emqx_plugin_kafka_producer.erl

@@ -0,0 +1,318 @@
+-module(emqx_plugin_kafka_producer).
+
+-behaviour(emqx_resource).
+
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_plugin_kafka.hrl").
+
+-export([
+    query_mode/1
+    , callback_mode/0
+    , on_start/2
+    , on_get_status/2
+    , on_stop/2
+    , on_add_channel/4
+    , on_get_channels/1
+    , on_get_channel_status/3
+    , on_remove_channel/3
+    , on_query_async/4
+]).
+
+-include_lib("emqx/include/logger.hrl").
+
+query_mode(_) ->
+    simple_async_internal_buffer.
+
+callback_mode() ->
+    async_if_possible.
+
+%% emqx_resource callback: start (or reuse) the supervised wolff client
+%% for the configured bootstrap hosts and verify broker connectivity
+%% before declaring the resource started. Channels start out empty.
+on_start(
+    _InstId,
+    #{connection := Connection}
+) ->
+    %% C/1 fetches one key from the connection config (with validation).
+    C = fun(Key) -> emqx_plugin_kafka_util:check_config(Key, Connection) end,
+    Hosts = C(bootstrap_hosts),
+    ClientId = C(client_id),
+    ClientConfig = #{
+        connect_timeout => C(connect_timeout),
+        connection_strategy => C(connection_strategy),
+        min_metadata_refresh_interval => C(min_metadata_refresh_interval),
+        query_api_versions => C(query_api_versions),
+        request_timeout => C(request_timeout),
+        sasl => C(sasl),
+        ssl => C(ssl)
+    },
+    ok = ensure_client(ClientId, Hosts, ClientConfig),
+    case check_client_connectivity(ClientId) of
+        ok ->
+            {ok, #{
+                client_id => ClientId,
+                channels => #{}
+            }};
+        {error, {find_client, Reason}} ->
+            %% Race condition?  Crash?  We just checked it with `ensure_client'...
+            {error, Reason};
+        {error, {connectivity, Reason}} ->
+            {error, Reason}
+    end.
+
+on_get_status(
+    _InstId,
+    #{client_id := ClientId} = State
+) ->
+    case check_client_connectivity(ClientId) of
+        ok ->
+            ?status_connected;
+        {error, {find_client, _Error}} ->
+            ?status_connecting;
+        {error, {connectivity, Error}} ->
+            {?status_connecting, State, Error}
+    end.
+
+on_stop(_InstId, #{client_id := ClientId, channels := Channels}) ->
+    ?SLOG(info, #{
+        msg => "kafka_client_on_stop",
+        client_id => ClientId
+    }),
+    maps:foreach(fun(_, ChannelState) -> remove_producers(ClientId, ChannelState) end, Channels),
+    deallocate_client(ClientId),
+    ok.
+
+on_add_channel(
+    InstId,
+    #{
+        client_id := ClientId,
+        channels := Channels
+    } = OldState,
+    ChannelId,
+    ChannelConfig
+) ->
+    {ok, ChannelState} = start_producers(InstId, ChannelId, ClientId, ChannelConfig),
+    NChannels = maps:put(ChannelId, ChannelState, Channels),
+    NewState = OldState#{channels => NChannels},
+    {ok, NewState}.
+
+on_get_channels(_InstId) ->
+    persistent_term:get({?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS}, []).
+
+on_get_channel_status(
+    _InstId,
+    _ChannelId,
+    _State
+) ->
+    ?status_connected.
+
+on_remove_channel(
+    _InstId,
+    #{
+        client_id := ClientId,
+        channels := Channels
+    } = OldState,
+    ChannelId
+) ->
+    case maps:take(ChannelId, Channels) of
+        {ChannelState, NChannels} ->
+            remove_producers(ClientId, ChannelState),
+            NewState = OldState#{channels => NChannels},
+            {ok, NewState};
+        error ->
+            {ok, OldState}
+    end.
+
+%% emqx_resource callback: render one event into a Kafka message and
+%% hand it to the wolff producers registered for this channel.
+%% Any exception during rendering/sending is logged and converted into
+%% an {error, {Class, Reason}} tuple.
+on_query_async(
+    InstId,
+    {ChannelId, Message},
+    _,
+    #{channels := Channels} = _ConnectorState
+) ->
+    #{
+        message_template := Template,
+        producers := Producers,
+        encode_payload_type := EncodePayloadType
+    } = maps:get(ChannelId, Channels),
+    try
+        do_send_msg(render_message(Template, Message, EncodePayloadType), Producers)
+    catch
+        Class:Why:Stacktrace ->
+            ?SLOG(error, #{
+                msg => "emqx_plugin_kafka_producer on_query_async error",
+                error => Class,
+                instId => InstId,
+                reason => Why,
+                stack => Stacktrace
+            }),
+            {error, {Class, Why}}
+    end.
+
+%%%===================================================================
+%%% External functions
+%%%===================================================================
+
+%% Probe the wolff client: 'ok' when it exists and can reach the
+%% brokers; otherwise a tagged error telling whether the lookup or the
+%% connectivity check failed.
+check_client_connectivity(ClientId) ->
+    case wolff_client_sup:find_client(ClientId) of
+        {error, Reason} ->
+            {error, {find_client, Reason}};
+        {ok, ClientPid} ->
+            case wolff_client:check_connectivity(ClientPid) of
+                ok -> ok;
+                {error, Reason} -> {error, {connectivity, Reason}}
+            end
+    end.
+
+%% Make sure a supervised wolff client with this id exists; throws
+%% (caught and reported by emqx_resource) when the client cannot be
+%% started or found for another reason.
+ensure_client(ClientId, Hosts, ClientConfig) ->
+    case wolff_client_sup:find_client(ClientId) of
+        {ok, _Pid} ->
+            ok;
+        {error, no_such_client} ->
+            case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
+                {ok, _} ->
+                    ok;
+                {error, Reason} ->
+                    ?SLOG(error, #{
+                        msg => failed_to_start_kafka_client,
+                        client_id => ClientId,
+                        kafka_hosts => Hosts,
+                        reason => Reason
+                    }),
+                    throw(failed_to_start_kafka_client)
+            end;
+        {error, Reason} ->
+            %% Unexpected lookup failure: clean up any partial client state.
+            deallocate_client(ClientId),
+            throw({failed_to_find_created_client, Reason})
+    end.
+
+deallocate_client(ClientId) ->
+    _ = with_log_at_error(
+        fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
+        #{
+            msg => "failed_to_delete_kafka_client",
+            client_id => ClientId
+        }
+    ),
+    ok.
+
+start_producers(
+    InstId,
+    ChannelId,
+    ClientId,
+    #{
+        kafka_topic := KafkaTopic,
+        kafka_message := MessageTemplate,
+        producer := Producer
+    }
+) ->
+    #{encode_payload_type := EncodePayloadType} = Producer,
+    WolffProducerConfig = producers_config(Producer),
+    case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
+        {ok, Producers} ->
+            {ok, #{
+                message_template => compile_message_template(MessageTemplate),
+                kafka_topic => KafkaTopic,
+                producers => Producers,
+                encode_payload_type => EncodePayloadType
+            }};
+        {error, Reason2} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_producer",
+                instance_id => InstId,
+                channel_id => ChannelId,
+                kafka_topic => KafkaTopic,
+                reason => Reason2
+            }),
+            throw(
+                "Failed to start Kafka client. Please check the logs for errors and check"
+                " the connection parameters."
+            )
+    end.
+
+%% Translate the plugin's producer settings into a wolff producer
+%% config. Replayq is disabled, so messages are never buffered on disk.
+producers_config(#{
+    max_batch_bytes := MaxBytes,
+    compression := Compression,
+    partition_strategy := Strategy
+}) ->
+    #{
+        partitioner => Strategy,
+        replayq_dir => false,
+        replayq_offload_mode => false,
+        max_batch_bytes => MaxBytes,
+        compression => Compression
+    }.
+
+remove_producers(ClientId, #{producers := Producers}) ->
+    deallocate_producers(ClientId, Producers),
+    ok.
+
+deallocate_producers(ClientId, Producers) ->
+    _ = with_log_at_error(
+        fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
+        #{
+            msg => "failed_to_delete_kafka_producer",
+            client_id => ClientId
+        }
+    ).
+
+%% Pre-process the key/value/timestamp placeholder templates, falling
+%% back to sensible defaults for any field missing from the config.
+compile_message_template(Template) ->
+    Defaults = [
+        {key, <<"${.clientid}">>},
+        {value, <<"${.}">>},
+        {timestamp, <<"${.timestamp}">>}
+    ],
+    maps:from_list(
+        [
+            {Field, preproc_tmpl(maps:get(Field, Template, Default))}
+            || {Field, Default} <- Defaults
+        ]
+    ).
+
+preproc_tmpl(Tmpl) ->
+    emqx_placeholder:preproc_tmpl(Tmpl).
+
+render_message(
+    #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
+    Message,
+    EncodePayloadType
+) ->
+    #{
+        key => render(KeyTemplate, Message),
+        value => encode_payload(EncodePayloadType, render(ValueTemplate, Message)),
+        ts => render_timestamp(TimestampTemplate, Message)
+    }.
+
+render(Template, Message) ->
+    Opts = #{
+        var_trans => fun
+                         (undefined) -> <<"">>;
+                         (X) -> emqx_utils_conv:bin(X)
+                     end,
+        return => full_binary
+    },
+    emqx_placeholder:proc_tmpl(Template, Message, Opts).
+
+%% Render the timestamp template to an integer; when the rendered value
+%% is not a decimal integer (template missing or unresolvable), fall
+%% back to the current wall-clock time in milliseconds.
+render_timestamp(Template, Message) ->
+    try
+        binary_to_integer(render(Template, Message))
+    catch
+        _:_ ->
+            erlang:system_time(millisecond)
+    end.
+
+%% Optionally base64-encode the rendered payload; every other encode
+%% type passes the payload through unchanged.
+encode_payload(EncodeType, Payload) ->
+    case EncodeType of
+        base64 -> base64:encode(Payload);
+        _ -> Payload
+    end.
+
+%% Fire-and-forget send through wolff; the ack callback ignores the
+%% partition/offset. Returns {ok, ProducerPid}.
+do_send_msg(KafkaMessage, Producers) ->
+    {_Partition, Pid} = wolff:send(Producers, [KafkaMessage], fun(_Partition, _BaseOffset) -> ok end),
+    {ok, Pid}.
+
+%% Run Fun, logging (not re-raising) any exception — used for
+%% best-effort cleanup where a failure must not abort the caller.
+with_log_at_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.

+ 203 - 0
src/emqx_plugin_kafka_schema.erl

@@ -0,0 +1,203 @@
+-module(emqx_plugin_kafka_schema).
+
+-include_lib("hocon/include/hoconsc.hrl").
+
+-export([
+    roots/0
+    , fields/1
+    , desc/1
+]).
+
+-import(hoconsc, [enum/1]).
+
%% @doc Schema roots: a single `plugin_kafka' configuration namespace.
roots() -> [plugin_kafka].
+
%% @doc Field definitions for every schema struct referenced from roots/0.
%%
%% Review fixes applied:
%%   * `fields(connection)' previously declared `client_id' twice with
%%     conflicting defaults ("client" vs <<"emqx_plugin_kafka_connection">>);
%%     a single definition is kept.
%%   * The `connection' and `producer' roots reused the "connect_timeout"
%%     description key by copy-paste; they now use their own keys.
fields(plugin_kafka) ->
    [
        {connection, ?HOCON(?R_REF(connection), #{desc => ?DESC("connection")})},
        {producer, ?HOCON(?R_REF(producer), #{desc => ?DESC("producer")})},
        %% NOTE(review): `required => true' is redundant given `default => []',
        %% since the default is filled in before the required check runs.
        {hooks, ?HOCON(?ARRAY(?R_REF(hook)),
            #{
                required => true,
                default => [],
                desc => ?DESC("hooks")
            })}
    ];
fields(connection) ->
    [
        {bootstrap_hosts, bootstrap_hosts()},
        {connect_timeout, ?HOCON(emqx_schema:timeout_duration_ms(),
            #{
                default => <<"5s">>,
                desc => ?DESC("connect_timeout")
            })},
        %% Kafka client id (was declared twice upstream; deduplicated).
        {client_id, ?HOCON(string(),
            #{
                default => <<"emqx_plugin_kafka_connection">>,
                desc => ?DESC("client_id")
            })},
        {connection_strategy, ?HOCON(enum([per_partition, per_broker]),
            #{
                default => per_partition,
                desc => ?DESC("connection_strategy")
            }
        )},
        {min_metadata_refresh_interval, ?HOCON(emqx_schema:timeout_duration_ms(),
            #{
                default => <<"5s">>,
                desc => ?DESC("min_metadata_refresh_interval")
            }
        )},
        {query_api_versions, ?HOCON(boolean(),
            #{
                default => true,
                desc => ?DESC("query_api_versions")
            })},
        {request_timeout, ?HOCON(emqx_schema:timeout_duration_ms(),
            #{
                default => <<"3s">>,
                desc => ?DESC("request_timeout")
            }
        )},
        {sasl, ?HOCON(?R_REF(sasl),
            #{
                desc => ?DESC("sasl")
            }
        )},
        {ssl, ?HOCON(?R_REF(ssl),
            #{
                desc => ?DESC("ssl")
            }
        )},
        {health_check_interval, ?HOCON(emqx_schema:timeout_duration_ms(),
            #{
                default => <<"32s">>,
                desc => ?DESC("health_check_interval")
            }
        )}
    ];
fields(bootstrap_host) ->
    [
        {host, ?HOCON(string(),
            #{
                validator => emqx_schema:servers_validator(
                    #{default_port => 9092}, _Required = true)
            }
        )}
    ];
fields(sasl) ->
    [
        {mechanism, ?HOCON(enum([plain, scram_sha_256, scram_sha_512]),
            #{
                default => plain,
                desc => ?DESC("sasl_mechanism"),
                required => true
            }
        )},
        {username, ?HOCON(string(),
            #{
                desc => ?DESC("sasl_username"),
                required => true
            }
        )},
        {password, ?HOCON(string(),
            #{
                desc => ?DESC("sasl_password"),
                required => true
            }
        )}
    ];
fields(ssl) ->
    %% Reuse EMQX's client TLS option schema, minus user_lookup_fun,
    %% which has no meaning for an outbound Kafka client connection.
    Schema = emqx_schema:client_ssl_opts_schema(#{}),
    lists:keydelete("user_lookup_fun", 1, Schema);
fields(producer) ->
    [
        {max_batch_bytes, ?HOCON(emqx_schema:bytesize(),
            #{
                default => "896KB",
                desc => ?DESC("max_batch_bytes")
            }
        )},
        {compression, ?HOCON(enum([no_compression, snappy, gzip]),
            #{
                default => no_compression,
                desc => ?DESC("compression")
            }
        )},
        {partition_strategy, ?HOCON(enum([random, roundrobin, first_key_dispatch]),
            #{
                default => random,
                desc => ?DESC("partition_strategy")
            }
        )},
        {encode_payload_type, ?HOCON(enum([plain, base64]),
            #{
                default => plain,
                desc => ?DESC("encode_payload_type")
            }
        )}
    ];
fields(hook) ->
    [
        {endpoint, ?HOCON(string(),
            #{
                desc => ?DESC("hook_endpoint"),
                required => true,
                validator => fun validate_endpoint/1
            })},
        {filter, ?HOCON(binary(),
            #{
                desc => ?DESC("hook_filter"),
                default => <<"#">>
            })},
        {kafka_topic, ?HOCON(string(),
            #{
                desc => ?DESC("hook_kafka_topic"),
                default => "emqx_test"
            })},
        {kafka_message, ?HOCON(?R_REF(kafka_message),
            #{
                desc => ?DESC("hook_kafka_message")
            })}
    ];
fields(kafka_message) ->
    [
        {key, ?HOCON(string(),
            #{
                default => <<"${.clientid}">>,
                desc => ?DESC(kafka_message_key)
            })},
        {value, ?HOCON(string(),
            #{
                default => <<"${.}">>,
                desc => ?DESC(kafka_message_value)
            })},
        {timestamp, ?HOCON(string(),
            #{
                default => <<"${.timestamp}">>,
                desc => ?DESC(kafka_message_timestamp)
            })}
    ].
+
%% No long-form descriptions are provided for any struct.
desc(_) -> undefined.
+
%% Schema for the Kafka bootstrap host list; hosts without an explicit
%% port default to 9092.
bootstrap_hosts() ->
    emqx_schema:servers_sc(
        #{desc => ?DESC("bootstrap_hosts")},
        #{default_port => 9092}
    ).
+
%% @doc Validator for a hook's `endpoint' field.
%%
%% Accepts the endpoint name as a string, converts it only to an
%% *existing* atom (never creating atoms from config input), then checks
%% that emqx_plugin_kafka_hook knows a callback for it: endpoint_func/1
%% returning `undefined' is rejected, anything else (a fun) is accepted
%% by the final clause.
%%
%% Fix(review): the original error text said "no matching mount point was
%% found" — copy-pasted from an unrelated validator; reworded to refer to
%% hook endpoints.
validate_endpoint(undefined) ->
    {error, "no matching hook endpoint was found"};
validate_endpoint(Endpoint0) when is_list(Endpoint0) ->
    case emqx_utils:safe_to_existing_atom(Endpoint0) of
        {ok, Endpoint} ->
            validate_endpoint(emqx_plugin_kafka_hook:endpoint_func(Endpoint));
        _ ->
            {error, "no matching hook endpoint was found"}
    end;
validate_endpoint(_Fun) ->
    ok.

+ 13 - 0
src/emqx_plugin_kafka_sup.erl

@@ -0,0 +1,13 @@
+-module(emqx_plugin_kafka_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
%% @doc Start the (currently childless) plugin supervisor, registered
%% locally under the module name.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
%% @doc Supervisor init callback: no statically declared children.
%%
%% Modernised(review) from the legacy `{one_for_all, 0, 1}' tuple to the
%% equivalent map-based supervisor flags (supported since OTP 18);
%% restart intensity 0 within 1 second is preserved.
init([]) ->
    SupFlags = #{
        strategy => one_for_all,
        intensity => 0,
        period => 1
    },
    {ok, {SupFlags, []}}.

+ 75 - 0
src/emqx_plugin_kafka_util.erl

@@ -0,0 +1,75 @@
+-module(emqx_plugin_kafka_util).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([
+    resource_id/0
+    , channel_id/1
+    , check_config/2
+]).
+
+-export([
+    check_crc32cer_nif/0
+]).
+
%% @doc Fixed id under which the Kafka producer resource is registered.
resource_id() ->
    <<"emqx_plugin:kafka_producer">>.
+
%% Derive the namespaced Kafka client id binary from the configured id
%% (binary, string, or atom).
client_id(ClientId) ->
    Suffix = bin(ClientId),
    <<"emqx_plugin:kafka_client:", Suffix/binary>>.
+
%% Derive the namespaced producer-channel id binary for a hook endpoint
%% (binary, string, or atom).
channel_id(Endpoint) ->
    Suffix = bin(Endpoint),
    <<"emqx_plugin:kafka_producer:connector:", Suffix/binary>>.
+
%% Coerce a binary, charlist, or atom to a binary; any other type raises
%% function_clause.
bin(Value) when is_binary(Value) -> Value;
bin(Value) when is_list(Value) -> list_to_binary(Value);
bin(Value) when is_atom(Value) -> atom_to_binary(Value, utf8).
+
%% @doc Fetch Key from the config map and normalise it via tr_config/2.
%% Throws a tagged map when the key is absent. (The is_map_key/2 guard is
%% kept so a non-map Config also falls through to the throw clause
%% rather than raising badmap.)
check_config(Key, Config) when is_map_key(Key, Config) ->
    #{Key := RawValue} = Config,
    tr_config(Key, RawValue);
check_config(MissingKey, _Config) ->
    throw(#{
        reason => missing_required_config,
        missing_config => MissingKey
    }).
+
%% Normalise one raw config value by key; keys without special handling
%% pass through unchanged.
tr_config(bootstrap_hosts, RawHosts) -> hosts(RawHosts);
tr_config(client_id, RawId) -> client_id(RawId);
tr_config(sasl, RawSasl) -> sasl(RawSasl);
tr_config(ssl, RawSsl) -> ssl(RawSsl);
tr_config(_OtherKey, Value) -> Value.
+
%% Normalise the bootstrap host config to a [{Host, Port}] list.
%% Accepts: a comma-separated "host:port" binary or string (parsed by
%% kpro), or a list of #{hostname, port} maps from the structured schema.
%% The comprehension deliberately skips list elements lacking the keys.
hosts(HostsBin) when is_binary(HostsBin) ->
    hosts(binary_to_list(HostsBin));
hosts([#{hostname := _, port := _} | _] = Structured) ->
    [{Host, Port} || #{hostname := Host, port := Port} <- Structured];
hosts(HostsString) when is_list(HostsString) ->
    kpro:parse_endpoints(HostsString).
+
%% Build the {Mechanism, Username, Password} SASL triple expected
%% downstream; any config without all three keys disables SASL.
sasl(#{mechanism := Mech, username := User, password := Pass}) ->
    {Mech, User, Pass};
sasl(_NoSasl) ->
    undefined.
+
%% Convert the schema SSL map to client TLS options when enabled;
%% anything else (disabled or absent) yields false.
ssl(#{enable := true} = Opts) ->
    emqx_tls_lib:to_client_opts(Opts);
ssl(_Disabled) ->
    false.
+
%% @doc Probe the crc32cer NIF with a sample input; if it is not loaded,
%% (re)load the module so later CRC calls succeed.
%% NOTE(review): this assumes the crc32cer stub raises
%% `error:{crc32cer_nif_not_loaded, _}' — confirm against the installed
%% crc32cer version; a bare-atom error reason would bypass this catch.
check_crc32cer_nif() ->
    try
        crc32cer:nif("1")
    catch
        error:{crc32cer_nif_not_loaded, _}:_ ->
            load_crc32cer_nif()
    end.
+
%% Purge any old crc32cer code and load the module from the code path;
%% loading triggers the module's on_load NIF initialisation.
load_crc32cer_nif() ->
    code:purge(crc32cer),
    code:load_file(crc32cer).