Просмотр исходного кода

feat(emqx_resource): add behaviour emqx_resource

Shawn 4 лет назад
Родитель
Сommit
2a31c43e0d

+ 191 - 0
apps/emqx_resource/LICENSE

@@ -0,0 +1,191 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   Copyright 2021, Shawn <506895667@qq.com>.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

+ 43 - 0
apps/emqx_resource/Makefile

@@ -0,0 +1,43 @@
+REBAR := rebar3
+
+.PHONY: all
+all: es
+
+.PHONY: compile
+compile:
+	$(REBAR) compile
+
+.PHONY: clean
+clean: distclean
+
+.PHONY: distclean
+distclean:
+	@rm -rf _build erl_crash.dump rebar3.crashdump
+
+.PHONY: xref
+xref:
+	$(REBAR) xref
+
+.PHONY: eunit
+eunit: compile
+	$(REBAR) eunit -v -c
+	$(REBAR) cover
+
+.PHONY: ct
+ct: compile
+	$(REBAR) as test ct -v
+
+cover:
+	$(REBAR) cover
+
+.PHONY: dialyzer
+dialyzer:
+	$(REBAR) dialyzer
+
+.PHONY: es
+es: compile
+	$(REBAR) escriptize
+
+.PHONY: elvis
+elvis:
+	./scripts/elvis-check.sh

+ 53 - 0
apps/emqx_resource/README.md

@@ -0,0 +1,53 @@
+# emqx_resource
+
+The `emqx_resource` is an application that manages configuration specs and runtime states
+for components that need to be configured and manipulated from the emqx-dashboard.
+
+It is intended to be used by resources, actions, acl, auth, backend_logics and more.
+
+It reads the configuration spec from *.spec (in HOCON format) and provide APIs for
+creating, updating and destroying resource instances among all nodes in the cluster.
+
+It handles the problem like storing the configs and runtime states for both resource
+and resource instances, and how porting them between different emqx_resource versions.
+
+It may maintain the config and data in JSON or HOCON files in data/ dir.
+
+After restarting the emqx_resource, it re-creates all the resource instances.
+
+There can be foreign references between resource instances via resource-id.
+So they may find each other via this Id.
+
+## Try it out
+
+    $ ./demo.sh
+    Eshell V11.1.8  (abort with ^G)
+    1> == the demo log tracer <<"log_tracer_clientid_shawn">> started.
+    config: #{<<"config">> =>
+                #{<<"bulk">> => <<"10KB">>,<<"cache_log_dir">> => <<"/tmp">>,
+                    <<"condition">> => #{<<"clientid">> => <<"abc">>},
+                    <<"level">> => <<"debug">>},
+            <<"id">> => <<"log_tracer_clientid_shawn">>,
+            <<"resource_type">> => <<"log_tracer">>}
+    1> emqx_resource_instance:health_check(<<"log_tracer_clientid_shawn">>).
+    == the demo log tracer <<"log_tracer_clientid_shawn">> is working well
+    state: #{health_checked => 1,logger_handler_id => abc}
+    ok
+
+    2> emqx_resource_instance:health_check(<<"log_tracer_clientid_shawn">>).
+    == the demo log tracer <<"log_tracer_clientid_shawn">> is working well
+    state: #{health_checked => 2,logger_handler_id => abc}
+    ok
+
+    3> emqx_resource_instance:query(<<"log_tracer_clientid_shawn">>, get_log).
+    == the demo log tracer <<"log_tracer_clientid_shawn">> received request: get_log
+    state: #{health_checked => 2,logger_handler_id => abc}
+    "this is a demo log messages..."
+
+    4> emqx_resource_instance:remove(<<"log_tracer_clientid_shawn">>).
+    == the demo log tracer <<"log_tracer_clientid_shawn">> stopped.
+    state: #{health_checked => 0,logger_handler_id => abc}
+    ok
+
+    5> emqx_resource_instance:query(<<"log_tracer_clientid_shawn">>, get_log).
+    ** exception error: {get_instance,{<<"log_tracer_clientid_shawn">>,not_found}}

+ 6 - 0
apps/emqx_resource/demo.sh

@@ -0,0 +1,6 @@
+#!/bin/sh
+set -e
+
+rebar3 compile
+
+erl -sname abc -pa _build/default/lib/*/ebin _build/default/lib/emqx_resource/examples -s demo

+ 14 - 0
apps/emqx_resource/elvis.config

@@ -0,0 +1,14 @@
+[{elvis, [{config, [
+
+#{dirs => ["src"],
+  filter => "*.erl",
+  %ignore => [],
+  ruleset => erl_files,
+  rules => [{elvis_style, operator_spaces, #{
+              rules => [{right, ","},
+                        {right, "|"},
+                        {left, "|"},
+                        {right, "||"},
+                        {left, "||"}]}},
+            {elvis_style, god_modules, #{limit => 100}}]}
+]}]}].

+ 13 - 0
apps/emqx_resource/examples/demo.erl

@@ -0,0 +1,13 @@
+-module(demo).
+
+-export([start/0]).
+
+start() ->
+    code:load_file(log_tracer),
+    code:load_file(log_tracer_schema),
+    {ok, _} = application:ensure_all_started(minirest),
+    {ok, _} = application:ensure_all_started(emqx_resource),
+    emqx_resource:load_instances("./_build/default/lib/emqx_resource/examples"),
+    Handlers = [{"/", minirest:handler(#{modules => [log_tracer]})}],
+    Dispatch = [{"/[...]", minirest, Handlers}],
+    minirest:start_http(?MODULE, #{socket_opts => [inet, {port, 9900}]}, Dispatch).

+ 147 - 0
apps/emqx_resource/examples/demo.md

@@ -0,0 +1,147 @@
+---
+theme: gaia
+color: #000
+colorSecondary: #333
+backgroundColor: #fff
+backgroundImage: url('https://marp.app/assets/hero-background.jpg')
+paginate: true
+marp: true
+---
+
+<!-- _class: lead -->
+
+# EMQX Resource
+
+---
+
+## What is it for
+
+The [emqx_resource](https://github.com/terry-xiaoyu/emqx_resource) for managing configurations and runtime states for dashboard components .
+
+![bg right](https://docs.emqx.cn/assets/img/rule_action_1@2x.73766093.png)
+
+---
+
+<!-- _class: lead -->
+
+# The Demo
+
+The little log tracer
+
+---
+
+- The hocon schema file (log_tracer_schema.erl):
+
+https://github.com/terry-xiaoyu/emqx_resource/blob/main/examples/log_tracer_schema.erl
+
+- The callback file (log_tracer.erl):
+
+https://github.com/terry-xiaoyu/emqx_resource/blob/main/examples/log_tracer.erl
+
+---
+
+Start the demo log tracer
+
+```
+./demo.sh
+```
+
+Load instance from config files (auto loaded)
+
+```
+## This will load all of the "*.conf" file under that directory:
+
+emqx_resource:load_instances("./_build/default/lib/emqx_resource/examples").
+```
+
+The config file is validated against the schema (`*_schema.erl`) before loaded.
+
+---
+
+# List Types and Instances
+
+- To list all the available resource types:
+
+```
+emqx_resource:list_types().
+emqx_resource:list_instances().
+```
+
+- And there's `*_verbose` versions for these `list_*` APIs:
+
+```
+emqx_resource:list_types_verbose().
+emqx_resource:list_instances_verbose().
+```
+
+---
+# Instance management
+
+- To get a resource types and instances:
+
+```
+emqx_resource:get_type(log_tracer).
+emqx_resource:get_instance("log_tracer_clientid_shawn").
+```
+
+- To create a resource instances:
+
+```
+emqx_resource:create("log_tracer2", log_tracer,
+#{bulk => <<"1KB">>,cache_log_dir => <<"/tmp">>,
+  cache_logs_in => <<"memory">>,chars_limit => 1024,
+  condition => #{<<"app">> => <<"emqx">>},
+  enable_cache => true,level => debug}).
+```
+
+---
+
+- To update a resource:
+
+```
+emqx_resource:update("log_tracer2", log_tracer, #{bulk => <<"100KB">>}, []).
+```
+
+- To delete a resource:
+
+```
+emqx_resource:remove("log_tracer2").
+```
+
+---
+
+<!-- _class: lead -->
+
+# HTTP APIs Demo
+
+---
+
+# Get a log tracer
+
+To list current log tracers:
+
+```
+curl -s -XGET 'http://localhost:9900/log_tracer' | jq .
+```
+
+---
+
+## Update or Create
+
+To update an existing log tracer or create a new one:
+
+```
+INST='{
+  "resource_type": "log_tracer",
+  "config": {
+    "condition": {
+      "app": "emqx"
+    },
+    "level": "debug",
+    "cache_log_dir": "/tmp",
+    "bulk": "10KB",
+    "chars_limit": 1024
+  }
+}'
+curl -sv -XPUT 'http://localhost:9900/log_tracer/log_tracer2' -d $INST | jq .
+```

+ 11 - 0
apps/emqx_resource/examples/log_tracer.conf

@@ -0,0 +1,11 @@
+{
+    "id": "log_tracer_clientid_shawn"
+    "resource_type": "log_tracer"
+    "config": {
+        "condition": {"app": "emqx"}
+        "level": "debug"
+        "cache_log_dir": "/tmp"
+        "bulk": "10KB"
+        "chars_limit": 1024
+    }
+}

+ 45 - 0
apps/emqx_resource/examples/log_tracer.erl

@@ -0,0 +1,45 @@
+-module(log_tracer).
+
+-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
+
+-emqx_resource_api_path("/log_tracer").
+
+%% callbacks of behaviour emqx_resource
+-export([ on_start/2
+        , on_stop/2
+        , on_query/4
+        , on_health_check/2
+        , on_api_reply_format/1
+        , on_config_merge/3
+        ]).
+
+%% callbacks for emqx_resource config schema
+-export([fields/1]).
+
+fields(ConfPath) ->
+    log_tracer_schema:fields(ConfPath).
+
+on_start(InstId, Config) ->
+    io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]),
+    {ok, #{logger_handler_id => abc, health_checked => 0}}.
+
+on_stop(InstId, State) ->
+    io:format("== the demo log tracer ~p stopped.~nstate: ~p~n", [InstId, State]),
+    ok.
+
+on_query(InstId, Request, AfterQuery, State) ->
+    io:format("== the demo log tracer ~p received request: ~p~nstate: ~p~n",
+        [InstId, Request, State]),
+    emqx_resource:query_success(AfterQuery),
+    "this is a demo log messages...".
+
+on_health_check(InstId, State = #{health_checked := Checked}) ->
+    NState = State#{health_checked => Checked + 1},
+    io:format("== the demo log tracer ~p is working well~nstate: ~p~n", [InstId, NState]),
+    {ok, NState}.
+
+on_api_reply_format(#{id := Id, status := Status, state := #{health_checked := NChecked}}) ->
+    #{id => Id, status => Status, checked_count => NChecked}.
+
+on_config_merge(OldConfig, NewConfig, _Params) ->
+    maps:merge(OldConfig, NewConfig).

+ 45 - 0
apps/emqx_resource/examples/log_tracer_schema.erl

@@ -0,0 +1,45 @@
+-module(log_tracer_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-export([fields/1]).
+
+-reflect_type([t_level/0, t_cache_logs_in/0]).
+
+-type t_level() :: debug | info | notice | warning | error | critical | alert | emergency.
+
+-type t_cache_logs_in() :: memory | file.
+
+fields("config") ->
+    [ {condition, fun condition/1}
+    , {level, fun level/1}
+    , {enable_cache, fun enable_cache/1}
+    , {cache_logs_in, fun cache_logs_in/1}
+    , {cache_log_dir, fun cache_log_dir/1}
+    , {bulk, fun bulk/1}
+    ];
+fields(_) -> [].
+
+condition(mapping) -> "config.condition";
+condition(type) -> map();
+condition(_) -> undefined.
+
+level(mapping) -> "config.level";
+level(type) -> t_level();
+level(_) -> undefined.
+
+enable_cache(mapping) -> "config.enable_cache";
+enable_cache(type) -> boolean();
+enable_cache(_) -> undefined.
+
+cache_logs_in(mapping) -> "config.cache_logs_in";
+cache_logs_in(type) -> t_cache_logs_in();
+cache_logs_in(_) -> undefined.
+
+cache_log_dir(mapping) -> "config.cache_log_dir";
+cache_log_dir(type) -> typerefl:regexp_string("^(.*)$");
+cache_log_dir(_) -> undefined.
+
+bulk(mapping) -> "config.bulk";
+bulk(type) -> typerefl:regexp_string("^[. 0-9]+(B|KB|MB|GB)$");
+bulk(_) -> undefined.

+ 19 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -0,0 +1,19 @@
+-type resource_type() :: module().
+-type instance_id() :: binary().
+-type resource_config() :: jsx:json_term().
+-type resource_spec() :: map().
+-type resource_state() :: term().
+-type resource_data() :: #{
+    id => instance_id(),
+    mod => module(),
+    config => resource_config(),
+    state => resource_state(),
+    status => started | stopped
+}.
+
+-type after_query() :: {OnSuccess :: after_query_fun(), OnFailed :: after_query_fun()} |
+    undefined.
+
+%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
+%% actions upon query failure
+-type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.

+ 3 - 0
apps/emqx_resource/include/emqx_resource_behaviour.hrl

@@ -0,0 +1,3 @@
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-behaviour(emqx_resource).
+-compile({parse_transform, emqx_resource_transform}).

+ 38 - 0
apps/emqx_resource/include/emqx_resource_utils.hrl

@@ -0,0 +1,38 @@
+-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
+
+-define(CLUSTER_CALL(Func, Args, ResParttern),
+%% ekka_mnesia:running_nodes()
+    fun() ->
+        case LocalResult = erlang:apply(?MODULE, Func, Args) of
+            ResParttern ->
+                case rpc:multicall(nodes(), ?MODULE, Func, Args, 5000) of
+                {ResL, []} ->
+                    Filter = fun
+                        (ResParttern) -> false;
+                        ({badrpc, {'EXIT', {undef, [{?MODULE, Func0, _, []}]}}})
+                            when Func0 =:= Func -> false;
+                        (_) -> true
+                    end,
+                    case lists:filter(Filter, ResL) of
+                        [] -> LocalResult;
+                        ErrL -> {error, ErrL}
+                    end;
+                {ResL, BadNodes} ->
+                    {error, {failed_on_nodes, BadNodes, ResL}}
+                end;
+            ErrorResult ->
+                {error, ErrorResult}
+        end
+    end()).
+
+-define(SAFE_CALL(_EXP_),
+        ?SAFE_CALL(_EXP_, _ = do_nothing)).
+
+-define(SAFE_CALL(_EXP_, _EXP_ON_FAIL_),
+        fun() ->
+            try (_EXP_)
+            catch _EXCLASS_:_EXCPTION_:_ST_ ->
+                _EXP_ON_FAIL_,
+                {error, {_EXCLASS_, _EXCPTION_, _ST_}}
+            end
+        end()).

+ 14 - 0
apps/emqx_resource/rebar.config

@@ -0,0 +1,14 @@
+{erl_opts, [ debug_info
+           %, {d, 'RESOURCE_DEBUG'}
+           ]}.
+
+{erl_first_files, ["src/emqx_resource_transform.erl"]}.
+
+{extra_src_dirs, ["examples"]}.
+
+{deps, [ {hocon, {git, "https://github.com/emqx/hocon", {branch, "master"}}}
+       , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
+       , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
+       , {jsx, {git, "https://github.com/talentdeficit/jsx", {tag, "v3.1.0"}}}
+       ]}.
+

+ 17 - 0
apps/emqx_resource/scripts/elvis-check.sh

@@ -0,0 +1,17 @@
+#!/bin/bash
+
+set -euo pipefail
+
+ELVIS_VERSION='1.0.0-emqx-2'
+
+elvis_version="${2:-$ELVIS_VERSION}"
+
+echo "elvis -v: $elvis_version"
+
+if [ ! -f ./elvis ] || [ "$(./elvis -v | grep -oE '[1-9]+\.[0-9]+\.[0-9]+\-emqx-[0-9]+')" != "$elvis_version" ]; then
+    curl  -fLO "https://github.com/emqx/elvis/releases/download/$elvis_version/elvis"
+    chmod +x ./elvis
+fi
+
+./elvis rock --config elvis.config
+

+ 17 - 0
apps/emqx_resource/src/emqx_resource.app.src

@@ -0,0 +1,17 @@
+{application, emqx_resource,
+ [{description, "An OTP application"},
+  {vsn, "0.1.0"},
+  {registered, []},
+  {mod, {emqx_resource_app, []}},
+  {applications,
+   [kernel,
+    stdlib,
+    gproc,
+    hocon
+   ]},
+  {env,[]},
+  {modules, []},
+
+  {licenses, ["Apache 2.0"]},
+  {links, []}
+ ]}.

+ 274 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -0,0 +1,274 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource).
+
+-include("emqx_resource.hrl").
+-include("emqx_resource_utils.hrl").
+
+%% APIs for resource types
+
+-export([ get_type/1
+        , list_types/0
+        , list_types_verbose/0
+        ]).
+
+-export([ discover_resource_mods/0
+        , is_resource_mod/1
+        , call_instance/2
+        ]).
+
+-export([ query_success/1
+        , query_failed/1
+        ]).
+
+%% APIs for instances
+
+-export([ parse_config/2
+        , resource_type_from_str/1
+        ]).
+
+%% Sync resource instances and files
+%% provisional solution: rpc:multical to all the nodes for creating/updating/removing
+%% todo: replicate operations
+-export([ create/3 %% store the config and start the instance
+        , create_dry_run/3 %% run start/2, health_check/2 and stop/1 sequentially
+        , update/4 %% update the config, stop the old instance and start the new one
+                   %% it will create a new resource when the id does not exist
+        , remove/1 %% remove the config and stop the instance
+        ]).
+
+%% Calls to the callback module with current resource state
+%% They also save the state after the call finished (except query/2,3).
+-export([ restart/1  %% restart the instance.
+        , health_check/1 %% verify if the resource is working normally
+        , stop/1   %% stop the instance
+        , query/2  %% query the instance
+        , query/3  %% query the instance with after_query()
+        ]).
+
+%% Direct calls to the callback module
+-export([ call_start/3  %% start the instance
+        , call_health_check/3 %% verify if the resource is working normally
+        , call_stop/3   %% stop the instance
+        , call_config_merge/4 %% merge the config when updating
+        ]).
+
+-export([ list_instances/0 %% list all the instances, id only.
+        , list_instances_verbose/0 %% list all the instances
+        , get_instance/1 %% return the data of the instance
+        , get_instance_by_type/1 %% return all the instances of the same resource type
+        , load_instances/1 %% load instances from config files
+        % , dependents/1
+        % , inc_counter/2 %% increment the counter of the instance
+        % , inc_counter/3 %% increment the counter by a given integer
+        ]).
+
+-define(EXT, "*.spec").
+
+-optional_callbacks([ on_query/4
+                    , on_health_check/2
+                    , on_api_reply_format/1
+                    , on_config_merge/3
+                    ]).
+
+-callback on_api_reply_format(resource_data()) -> map().
+
+-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
+
+%% when calling emqx_resource:start/1
+-callback on_start(instance_id(), resource_config()) ->
+    {ok, resource_state()} | {error, Reason :: term()}.
+
+%% when calling emqx_resource:stop/1
+-callback on_stop(instance_id(), resource_state()) -> term().
+
+%% when calling emqx_resource:query/3
+-callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term().
+
+%% when calling emqx_resource:health_check/2
+-callback on_health_check(instance_id(), resource_state()) ->
+    {ok, resource_state()} | {error, Reason:: term(), resource_state()}.
+
+%% load specs and return the loaded resources this time.
+-spec list_types_verbose() -> [resource_spec()].
+list_types_verbose() ->
+    [get_spec(Mod) || Mod <- list_types()].
+
+-spec list_types() -> [module()].
+list_types() ->
+    discover_resource_mods().
+
+-spec get_type(module()) -> {ok, resource_spec()} | {error, not_found}.
+get_type(Mod) ->
+    case is_resource_mod(Mod) of
+        true -> {ok, get_spec(Mod)};
+        false -> {error, not_found}
+    end.
+
+-spec get_spec(module()) -> resource_spec().
+get_spec(Mod) ->
+    maps:put(<<"resource_type">>, Mod, Mod:emqx_resource_schema()).
+
+-spec discover_resource_mods() -> [module()].
+discover_resource_mods() ->
+    [Mod || {Mod, _} <- code:all_loaded(), is_resource_mod(Mod)].
+
+-spec is_resource_mod(module()) -> boolean().
+is_resource_mod(Mod) ->
+    erlang:function_exported(Mod, emqx_resource_schema, 0).
+
+-spec query_success(after_query()) -> ok.
+query_success(undefined) -> ok;
+query_success({{OnSucc, Args}, _}) ->
+    safe_apply(OnSucc, Args).
+
+-spec query_failed(after_query()) -> ok.
+query_failed(undefined) -> ok;
+query_failed({_, {OnFailed, Args}}) ->
+    safe_apply(OnFailed, Args).
+
+%% =================================================================================
+%% APIs for resource instances
+%% =================================================================================
+-spec create(instance_id(), resource_type(), resource_config()) ->
+    {ok, resource_data()} | {error, Reason :: term()}.
+create(InstId, ResourceType, Config) ->
+    ?CLUSTER_CALL(call_instance, [InstId, {create, InstId, ResourceType, Config}], {ok, _}).
+
+-spec create_dry_run(instance_id(), resource_type(), resource_config()) ->
+    ok | {error, Reason :: term()}.
+create_dry_run(InstId, ResourceType, Config) ->
+    ?CLUSTER_CALL(call_instance, [InstId, {create_dry_run, InstId, ResourceType, Config}]).
+
+-spec update(instance_id(), resource_type(), resource_config(), term()) ->
+    {ok, resource_data()} | {error, Reason :: term()}.
+update(InstId, ResourceType, Config, Params) ->
+    ?CLUSTER_CALL(call_instance, [InstId, {update, InstId, ResourceType, Config, Params}], {ok, _}).
+
+-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
+remove(InstId) ->
+    ?CLUSTER_CALL(call_instance, [InstId, {remove, InstId}]).
+
+-spec query(instance_id(), Request :: term()) -> Result :: term().
+query(InstId, Request) ->
+    query(InstId, Request, undefined).
+
+%% same to above, also defines what to do when the Module:on_query success or failed
+%% it is the duty of the Moudle to apply the `after_query()` functions.
+-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
+query(InstId, Request, AfterQuery) ->
+    case get_instance(InstId) of
+        {ok, #{mod := Mod, state := ResourceState}} ->
+            %% the resource state is readonly to Moudle:on_query/4
+            %% and the `after_query()` functions should be thread safe
+            Mod:on_query(InstId, Request, AfterQuery, ResourceState);
+        {error, Reason} ->
+            error({get_instance, {InstId, Reason}})
+    end.
+
+-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
+restart(InstId) ->
+    call_instance(InstId, {restart, InstId}).
+
+-spec stop(instance_id()) -> ok | {error, Reason :: term()}.
+stop(InstId) ->
+    call_instance(InstId, {stop, InstId}).
+
+-spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
+health_check(InstId) ->
+    call_instance(InstId, {health_check, InstId}).
+
+-spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
+get_instance(InstId) ->
+    emqx_resource_instance:lookup(InstId).
+
+-spec list_instances() -> [instance_id()].
+list_instances() ->
+    [Id || #{id := Id} <- list_instances_verbose()].
+
+-spec list_instances_verbose() -> [resource_data()].
+list_instances_verbose() ->
+    emqx_resource_instance:list_all().
+
+-spec get_instance_by_type(module()) -> [resource_data()].
+get_instance_by_type(ResourceType) ->
+    emqx_resource_instance:lookup_by_type(ResourceType).
+
+-spec load_instances(Dir :: string()) -> ok.
+load_instances(Dir) ->
+    emqx_resource_instance:load(Dir).
+
+-spec call_start(instance_id(), module(), resource_config()) ->
+    {ok, resource_state()} | {error, Reason :: term()}.
+call_start(InstId, Mod, Config) ->
+    ?SAFE_CALL(Mod:on_start(InstId, Config)).
+
+-spec call_health_check(instance_id(), module(), resource_state()) ->
+    {ok, resource_state()} | {error, Reason:: term(), resource_state()}.
+call_health_check(InstId, Mod, ResourceState) ->
+    ?SAFE_CALL(Mod:on_health_check(InstId, ResourceState)).
+
+-spec call_stop(instance_id(), module(), resource_state()) -> term().
+call_stop(InstId, Mod, ResourceState) ->
+    ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
+
+-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
+    resource_config().
+call_config_merge(Mod, OldConfig, NewConfig, Params) ->
+    ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)).
+
+-spec parse_config(resource_type(), binary() | term()) ->
+    {ok, resource_config()} | {error, term()}.
+parse_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
+    case hocon:binary(RawConfig, #{format => richmap}) of
+        {ok, MapConfig} ->
+            do_parse_config(ResourceType, MapConfig);
+        Error -> Error
+    end;
+parse_config(ResourceType, RawConfigTerm) ->
+    parse_config(ResourceType, jsx:encode(#{<<"config">> => RawConfigTerm})).
+
+-spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
+do_parse_config(ResourceType, MapConfig) ->
+    case ?SAFE_CALL(hocon_schema:generate(ResourceType, MapConfig)) of
+        {error, Reason} -> {error, Reason};
+        Config ->
+            InstConf = maps:from_list(proplists:get_value(config, Config)),
+            {ok, InstConf}
+    end.
+
+%% =================================================================================
+
+-spec resource_type_from_str(string()) -> {ok, resource_type()} | {error, term()}.
+resource_type_from_str(ResourceType) ->
+    try Mod = list_to_existing_atom(str(ResourceType)),
+        case emqx_resource:is_resource_mod(Mod) of
+            true -> {ok, Mod};
+            false -> {error, {invalid_resource, Mod}}
+        end
+    catch error:badarg ->
+        {error, {not_found, ResourceType}}
+    end.
+
+call_instance(InstId, Query) ->
+    emqx_resource_instance:hash_call(InstId, Query).
+
+safe_apply(Func, Args) ->
+    ?SAFE_CALL(erlang:apply(Func, Args)).
+
+str(S) when is_binary(S) -> binary_to_list(S);
+str(S) when is_list(S) -> S.

+ 64 - 0
apps/emqx_resource/src/emqx_resource_api.erl

@@ -0,0 +1,64 @@
+-module(emqx_resource_api).
+
+-export([ get_all/3
+        , get/3
+        , put/3
+        , delete/3
+        ]).
+get_all(Mod, _Binding, _Params) ->
+    {200, #{code => 0, data =>
+        [format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}.
+
+get(Mod, #{id := Id}, _Params) ->
+    case emqx_resource:get_instance(stringnify(Id)) of
+        {ok, Data} ->
+            {200, #{code => 0, data => format_data(Mod, Data)}};
+        {error, not_found} ->
+            {404, #{code => 102, message => {resource_instance_not_found, stringnify(Id)}}}
+    end.
+
+put(Mod, #{id := Id}, Params) ->
+    ConfigParams = proplists:get_value(<<"config">>, Params),
+    ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params),
+    case emqx_resource:resource_type_from_str(ResourceTypeStr) of
+        {ok, ResourceType} ->
+            do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params);
+        {error, Reason} ->
+            {404, #{code => 102, message => stringnify(Reason)}}
+    end.
+
+do_put(Mod, Id, ConfigParams, ResourceType, Params) ->
+    case emqx_resource:parse_config(ResourceType, ConfigParams) of
+        {ok, Config} ->
+            case emqx_resource:update(Id, ResourceType, Config, Params) of
+                {ok, Data} ->
+                    {200, #{code => 0, data => format_data(Mod, Data)}};
+                {error, Reason} ->
+                    {500, #{code => 102, message => stringnify(Reason)}}
+            end;
+        {error, Reason} ->
+            {400, #{code => 108, message => stringnify(Reason)}}
+    end.
+
+delete(_Mod, #{id := Id}, _Params) ->
+    case emqx_resource:remove(stringnify(Id)) of
+        ok -> {200, #{code => 0, data => #{}}};
+        {error, Reason} ->
+            {500, #{code => 102, message => stringnify(Reason)}}
+    end.
+
+format_data(Mod, Data) ->
+    case erlang:function_exported(Mod, on_api_reply_format, 1) of
+        false ->
+            default_api_reply_format(Data);
+        true ->
+            Mod:on_api_reply_format(Data)
+    end.
+
+default_api_reply_format(#{id := Id, status := Status, config := Config}) ->
+    #{node => node(), id => Id, status => Status, config => Config}.
+
+stringnify(Bin) when is_binary(Bin) -> Bin;
+stringnify(Str) when is_list(Str) -> list_to_binary(Str);
+stringnify(Reason) ->
+    iolist_to_binary(io_lib:format("~p", [Reason])).

+ 31 - 0
apps/emqx_resource/src/emqx_resource_app.erl

@@ -0,0 +1,31 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_app).
+
+-behaviour(application).
+
+-include("emqx_resource.hrl").
+
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+    emqx_resource_sup:start_link().
+
+stop(_State) ->
+    ok.
+
+%% internal functions

+ 294 - 0
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -0,0 +1,294 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_instance).
+
+-behaviour(gen_server).
+
+-include("emqx_resource.hrl").
+-include("emqx_resource_utils.hrl").
+
+-export([start_link/2]).
+
+%% load resource instances from *.conf files
+-export([ load/1
+        , lookup/1
+        , list_all/0
+        , lookup_by_type/1
+        ]).
+
+-export([ hash_call/2
+        , hash_call/3
+        ]).
+
+%% gen_server Callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+-record(state, {worker_pool, worker_id}).
+
+-type state() :: #state{}.
+
+%%------------------------------------------------------------------------------
+%% Start the registry
+%%------------------------------------------------------------------------------
+
+start_link(Pool, Id) ->
+    gen_server:start_link({local, proc_name(?MODULE, Id)},
+                          ?MODULE, {Pool, Id}, []).
+
+%% call the worker by the hash of resource-instance-id, to make sure we always handle
+%% operations on the same instance in the same worker.
+hash_call(InstId, Request) ->
+    hash_call(InstId, Request, infinity).
+
+hash_call(InstId, Request, Timeout) ->
+    gen_server:call(pick(InstId), Request, Timeout).
+
+-spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
+lookup(InstId) ->
+    case ets:lookup(emqx_resource_instance, InstId) of
+        [] -> {error, not_found};
+        [{_, Data}] -> {ok, Data#{id => InstId}}
+    end.
+
+force_lookup(InstId) ->
+    {ok, Data} = lookup(InstId),
+    Data.
+
+-spec list_all() -> [resource_data()].
+list_all() ->
+    [Data#{id => Id} || {Id, Data} <- ets:tab2list(emqx_resource_instance)].
+
+-spec lookup_by_type(module()) -> [resource_data()].
+lookup_by_type(ResourceType) ->
+    [Data || #{mod := Mod} = Data <- list_all()
+             , Mod =:= ResourceType].
+
+-spec load(Dir :: string()) -> ok.
+load(Dir) ->
+    lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))).
+
+load_file(File) ->
+    case ?SAFE_CALL(hocon_token:read(File)) of
+        {error, Reason} ->
+            logger:error("load resource from ~p failed: ~p", [File, Reason]);
+        RawConfig ->
+            case hocon:binary(RawConfig, #{format => map}) of
+                {ok, #{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr,
+                       <<"config">> := MapConfig}} ->
+                    case emqx_resource:resource_type_from_str(ResourceTypeStr) of
+                        {ok, ResourceType} ->
+                            parse_and_load_config(Id, ResourceType, MapConfig);
+                        {error, Reason} ->
+                            logger:error("no such resource type: ~s, ~p",
+                                [ResourceTypeStr, Reason])
+                    end;
+                {error, Reason} ->
+                    logger:error("load resource from ~p failed: ~p", [File, Reason])
+            end
+    end.
+
+parse_and_load_config(InstId, ResourceType, MapConfig) ->
+    case emqx_resource:parse_config(ResourceType, MapConfig) of
+        {error, Reason} ->
+            logger:error("parse config for resource ~p of type ~p failed: ~p",
+                [InstId, ResourceType, Reason]);
+        {ok, InstConf} ->
+            create_instance_local(InstId, ResourceType, InstConf)
+    end.
+
+create_instance_local(InstId, ResourceType, InstConf) ->
+    case do_create(InstId, ResourceType, InstConf) of
+        {ok, Data} ->
+            logger:debug("created ~p resource instance: ~p from config: ~p, Data: ~p",
+                [ResourceType, InstId, InstConf, Data]);
+        {error, Reason} ->
+            logger:error("create ~p resource instance: ~p failed: ~p, config: ~p",
+                [ResourceType, InstId, Reason, InstConf])
+    end.
+
+%%------------------------------------------------------------------------------
+%% gen_server callbacks
+%%------------------------------------------------------------------------------
+
+-spec init({atom(), integer()}) ->
+    {ok, State :: state()} | {ok, State :: state(), timeout() | hibernate | {continue, term()}} |
+    {stop, Reason :: term()} | ignore.
+init({Pool, Id}) ->
+    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
+    {ok, #state{worker_pool = Pool, worker_id = Id}}.
+
+handle_call({create, InstId, ResourceType, Config}, _From, State) ->
+    {reply, do_create(InstId, ResourceType, Config), State};
+
+handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) ->
+    {reply, do_create_dry_run(InstId, ResourceType, Config), State};
+
+handle_call({update, InstId, ResourceType, Config, Params}, _From, State) ->
+    {reply, do_update(InstId, ResourceType, Config, Params), State};
+
+handle_call({remove, InstId}, _From, State) ->
+    {reply, do_remove(InstId), State};
+
+handle_call({restart, InstId}, _From, State) ->
+    {reply, do_restart(InstId), State};
+
+handle_call({stop, InstId}, _From, State) ->
+    {reply, do_stop(InstId), State};
+
+handle_call({health_check, InstId}, _From, State) ->
+    {reply, do_health_check(InstId), State};
+
+handle_call(Req, _From, State) ->
+    logger:error("Received unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #state{worker_pool = Pool, worker_id = Id}) ->
+    gproc_pool:disconnect_worker(Pool, {Pool, Id}).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%------------------------------------------------------------------------------
+
+do_update(InstId, ResourceType, NewConfig, Params) ->
+    case lookup(InstId) of
+        {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
+            Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
+                        NewConfig, Params),
+            case do_create_dry_run(InstId, ResourceType, Config) of
+                ok ->
+                    do_remove(ResourceType, InstId, ResourceState),
+                    do_create(InstId, ResourceType, Config);
+                Error ->
+                    Error
+            end;
+        {ok, #{mod := Mod}} when Mod =/= ResourceType ->
+            {error, updating_to_incorrect_resource_type};
+        {error, not_found} ->
+            do_create(InstId, ResourceType, NewConfig)
+    end.
+
+do_create(InstId, ResourceType, Config) ->
+    case lookup(InstId) of
+        {ok, _} -> {error, already_created};
+        _ ->
+            case emqx_resource:call_start(InstId, ResourceType, Config) of
+                {ok, ResourceState} ->
+                    ets:insert(emqx_resource_instance, {InstId,
+                        #{mod => ResourceType, config => Config,
+                          state => ResourceState, status => stopped}}),
+                    _ = do_health_check(InstId),
+                    {ok, force_lookup(InstId)};
+                {error, Reason} ->
+                    logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]),
+                    {error, Reason}
+            end
+    end.
+
+do_create_dry_run(InstId, ResourceType, Config) ->
+    case emqx_resource:call_start(InstId, ResourceType, Config) of
+        {ok, ResourceState0} ->
+            Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of
+                {ok, ResourceState1} -> ok;
+                {error, Reason, ResourceState1} ->
+                    {error, Reason}
+            end,
+            _ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1),
+            Return;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+do_remove(InstId) ->
+    case lookup(InstId) of
+        {ok, #{mod := Mod, state := ResourceState}} ->
+            do_remove(Mod, InstId, ResourceState);
+        Error ->
+            Error
+    end.
+
+do_remove(Mod, InstId, ResourceState) ->
+    _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
+    ets:delete(emqx_resource_instance, InstId),
+    ok.
+
+do_restart(InstId) ->
+    case lookup(InstId) of
+        {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
+            _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
+            case emqx_resource:call_start(InstId, Mod, Config) of
+                {ok, ResourceState} ->
+                    ets:insert(emqx_resource_instance,
+                        {InstId, Data#{state => ResourceState, status => started}}),
+                    ok;
+                {error, Reason} ->
+                    ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
+                    {error, Reason}
+            end;
+        Error ->
+            Error
+    end.
+
+do_stop(InstId) ->
+    case lookup(InstId) of
+        {ok, #{mod := Mod, state := ResourceState} = Data} ->
+            _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
+            ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
+            ok;
+        Error ->
+            Error
+    end.
+
+do_health_check(InstId) ->
+    case lookup(InstId) of
+        {ok, #{mod := Mod, state := ResourceState0} = Data} ->
+            case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
+                {ok, ResourceState1} ->
+                    ets:insert(emqx_resource_instance,
+                        {InstId, Data#{status => started, state => ResourceState1}}),
+                    ok;
+                {error, Reason, ResourceState1} ->
+                    logger:error("health check for ~p failed: ~p", [InstId, Reason]),
+                    ets:insert(emqx_resource_instance,
+                        {InstId, Data#{status => stopped, state => ResourceState1}}),
+                    {error, Reason}
+            end;
+        Error ->
+            Error
+    end.
+
+%%------------------------------------------------------------------------------
+%% internal functions
+%%------------------------------------------------------------------------------
+
+proc_name(Mod, Id) ->
+    list_to_atom(lists:concat([Mod, "_", Id])).
+
+pick(InstId) ->
+    gproc_pool:pick_worker(emqx_resource_instance, InstId).

+ 48 - 0
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -0,0 +1,48 @@
+%%%-------------------------------------------------------------------
+%% @doc emqx_resource top level supervisor.
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(emqx_resource_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(RESOURCE_INST_MOD, emqx_resource_instance).
+-define(POOL_SIZE, 64). %% set a very large pool size in case all the workers busy
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    TabOpts = [named_table, set, public, {read_concurrency, true}],
+    _ = ets:new(emqx_resource_instance, TabOpts),
+
+    SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
+    Pool = ?RESOURCE_INST_MOD,
+    Mod = ?RESOURCE_INST_MOD,
+    ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]),
+    {ok, {SupFlags, [
+        begin
+            ensure_pool_worker(Pool, {Pool, Idx}, Idx),
+            #{id => {Mod, Idx},
+              start => {Mod, start_link, [Pool, Idx]},
+              restart => transient,
+              shutdown => 5000, type => worker, modules => [Mod]}
+        end || Idx <- lists:seq(1, ?POOL_SIZE)]}}.
+
+%% internal functions
+ensure_pool(Pool, Type, Opts) ->
+    try gproc_pool:new(Pool, Type, Opts)
+    catch
+        error:exists -> ok
+    end.
+
+ensure_pool_worker(Pool, Name, Slot) ->
+    try gproc_pool:add_worker(Pool, Name, Slot)
+    catch
+        error:exists -> ok
+    end.

+ 99 - 0
apps/emqx_resource/src/emqx_resource_transform.erl

@@ -0,0 +1,99 @@
+-module(emqx_resource_transform).
+
+-include_lib("syntax_tools/include/merl.hrl").
+
+-export([parse_transform/2]).
+
+parse_transform(Forms, _Opts) ->
+    Mod = hd([M || {attribute, _, module, M} <- Forms]),
+    AST = trans(Mod, proplists:delete(eof, Forms)),
+    debug_print(Mod, AST),
+    AST.
+
+-ifdef(RESOURCE_DEBUG).
+
+debug_print(Mod, Ts) ->
+    {ok, Io} = file:open("./" ++ atom_to_list(Mod) ++ ".trans.erl", [write]),
+    do_debug_print(Io, Ts),
+    file:close(Io).
+
+do_debug_print(Io, Ts) when is_list(Ts) ->
+    lists:foreach(fun(T) -> do_debug_print(Io, T) end, Ts);
+do_debug_print(Io, T) ->
+    io:put_chars(Io, erl_prettypr:format(merl:tree(T))),
+    io:nl(Io).
+-else.
+debug_print(_Mod, _AST) ->
+    ok.
+-endif.
+
+trans(Mod, Forms) ->
+    forms(Mod, Forms) ++ [erl_syntax:revert(erl_syntax:eof_marker())].
+
+forms(Mod, [F0 | Fs0]) ->
+    case form(Mod, F0) of
+        {CurrForm, AppendedForms} ->
+            CurrForm ++ forms(Mod, Fs0) ++ AppendedForms;
+        {AHeadForms, CurrForm, AppendedForms} ->
+            AHeadForms ++ CurrForm ++ forms(Mod, Fs0) ++ AppendedForms
+    end;
+forms(_, []) -> [].
+
+form(Mod, Form) ->
+    case Form of
+        ?Q("-emqx_resource_api_path('@Path').") ->
+            {fix_spec_attrs() ++ fix_api_attrs(erl_syntax:concrete(Path)) ++ fix_api_exports(),
+             [],
+             fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)};
+        _ ->
+            %io:format("---other form: ~p~n", [Form]),
+            {[], [Form], []}
+    end.
+
+fix_spec_attrs() ->
+    [ ?Q("-export([emqx_resource_schema/0]).")
+    , ?Q("-export([structs/0]).")
+    , ?Q("-behaviour(hocon_schema).")
+    ].
+fix_spec_funcs(_Mod) ->
+    [ (?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>."))
+    , ?Q("structs() -> [\"config\"].")
+    ].
+
+fix_api_attrs(Path0) ->
+    BaseName = filename:basename(Path0),
+    Path = "/" ++ BaseName,
+    [erl_syntax:revert(
+        erl_syntax:attribute(?Q("rest_api"), [
+            erl_syntax:abstract(#{
+                name => list_to_atom(Name ++ "_log_tracers"),
+                method => Method,
+                path => mk_path(Path, WithId),
+                func => Func,
+                descr => Name ++ " the " ++ BaseName})]))
+       || {Name, Method, WithId, Func} <- [
+            {"list", 'GET', noid, api_get_all},
+            {"get", 'GET', id, api_get},
+            {"update", 'PUT', id, api_put},
+            {"delete", 'DELETE', id, api_delete}]].
+
+fix_api_exports() ->
+    [?Q("-export([api_get_all/2, api_get/2, api_put/2, api_delete/2]).")].
+
+fix_api_funcs(Mod) ->
+    [erl_syntax:revert(?Q(
+        "api_get_all(Binding, Params) ->
+            emqx_resource_api:get_all('@Mod@', Binding, Params).")),
+     erl_syntax:revert(?Q(
+        "api_get(Binding, Params) ->
+            emqx_resource_api:get('@Mod@', Binding, Params).")),
+     erl_syntax:revert(?Q(
+        "api_put(Binding, Params) ->
+            emqx_resource_api:put('@Mod@', Binding, Params).")),
+     erl_syntax:revert(?Q(
+        "api_delete(Binding, Params) ->
+            emqx_resource_api:delete('@Mod@', Binding, Params)."))
+    ].
+
+mk_path(Path, id) -> Path ++ "/:bin:id";
+mk_path(Path, noid) -> Path.

+ 1 - 0
apps/emqx_resource/src/emqx_resource_uitils.erl

@@ -0,0 +1 @@
+-module(emqx_resource_uitils).

+ 63 - 0
apps/emqx_resource/src/emqx_resource_validator.erl

@@ -0,0 +1,63 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_validator).
+
+-export([ min/2
+        , max/2
+        , equals/2
+        , enum/1
+        , required/1
+        ]).
+
+max(Type, Max) ->
+    limit(Type, '=<', Max).
+
+min(Type, Min) ->
+    limit(Type, '>=', Min).
+
+equals(Type, Expected) ->
+    limit(Type, '==', Expected).
+
+enum(Items) ->
+    fun(Value) ->
+        return(lists:member(Value, Items),
+            err_limit({enum, {is_member_of, Items}, {got, Value}}))
+    end.
+
+required(ErrMsg) ->
+    fun(undefined) -> {error, ErrMsg};
+       (_) -> ok
+    end.
+
+limit(Type, Op, Expected) ->
+    L = len(Type),
+    fun(Value) ->
+        Got = L(Value),
+        return(erlang:Op(Got, Expected),
+            err_limit({Type, {Op, Expected}, {got, Got}}))
+    end.
+
+len(array) -> fun erlang:length/1;
+len(string) -> fun string:length/1;
+len(_Type) -> fun(Val) -> Val end.
+
+err_limit({Type, {Op, Expected}, {got, Got}}) ->
+    io_lib:format("Expect the ~s value ~s ~p but got: ~p", [Type, Op, Expected, Got]).
+
+return(true, _) -> ok;
+return(false, Error) ->
+    {error, Error}.