Parcourir la source

feat(ft): add file transfer app and bootstrap replicated ft data structure

Ilya Averyanov il y a 3 ans
Parent
commit
aaaef30be6

+ 7 - 0
apps/emqx/src/emqx_channel.erl

@@ -641,6 +641,13 @@ process_connect(
 %%--------------------------------------------------------------------
 
 process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
+    ?SLOG(
+        warning,
+        #{
+            packet => Packet,
+            packet_id => PacketId
+        }
+    ),
     case
         pipeline(
             [

+ 9 - 0
apps/emqx_ft/README.md

@@ -0,0 +1,9 @@
+emqx_ft
+=====
+
+EMQX file transfer over MQTT
+
+Build
+-----
+
+    $ rebar3 compile

+ 17 - 0
apps/emqx_ft/include/emqx_ft.hrl

@@ -0,0 +1,17 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-define(FT_TAB, emqx_ft).

+ 11 - 0
apps/emqx_ft/rebar.config

@@ -0,0 +1,11 @@
+%% -*- mode: erlang -*-
+
+{erl_opts, [debug_info]}.
+{deps, [{emqx, {path, "../emqx"}}]}.
+
+{shell, [
+    % {config, "config/sys.config"},
+    {apps, [emqx_ft]}
+]}.
+
+{project_plugins, [erlfmt]}.

+ 12 - 0
apps/emqx_ft/src/emqx_ft.app.src

@@ -0,0 +1,12 @@
+{application, emqx_ft, [
+    {description, "EMQX file transfer over MQTT"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {mod, {emqx_ft_app, []}},
+    {applications, [
+        kernel,
+        stdlib
+    ]},
+    {env, []},
+    {modules, []}
+]}.

+ 103 - 0
apps/emqx_ft/src/emqx_ft.erl

@@ -0,0 +1,103 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft).
+
+-include("emqx_ft.hrl").
+
+-export([
+    create_tab/0,
+    hook/0,
+    unhook/0
+]).
+
+-export([
+    on_channel_unregistered/1,
+    on_channel_takeover/3,
+    on_channel_takeovered/3
+]).
+
+-type ft_data() :: #{
+    nodes := list(node())
+}.
+
+-record(emqx_ft, {
+    chan_pid :: pid(),
+    ft_data :: ft_data()
+}).
+
+%%--------------------------------------------------------------------
+%% API for app
+%%--------------------------------------------------------------------
+
+create_tab() ->
+    _Tab = ets:new(?FT_TAB, [
+        set,
+        public,
+        named_table,
+        {keypos, #emqx_ft.chan_pid}
+    ]),
+    ok.
+
+hook() ->
+    % ok = emqx_hooks:put('channel.registered', {?MODULE, on_channel_registered, []}),
+    ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}),
+    ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}),
+    ok = emqx_hooks:put('channel.takeovered', {?MODULE, on_channel_takeovered, []}).
+
+unhook() ->
+    % ok = emqx_hooks:del('channel.registered', {?MODULE, on_channel_registered}),
+    ok = emqx_hooks:del('channel.unregistered', {?MODULE, on_channel_unregistered}),
+    ok = emqx_hooks:del('channel.takeover', {?MODULE, on_channel_takeover}),
+    ok = emqx_hooks:del('channel.takeovered', {?MODULE, on_channel_takeovered}).
+
+%%--------------------------------------------------------------------
+%% Hooks
+%%--------------------------------------------------------------------
+
+on_channel_unregistered(ChanPid) ->
+    ok = delete_ft_data(ChanPid).
+
+on_channel_takeover(_ConnMod, ChanPid, TakeoverData) ->
+    case get_ft_data(ChanPid) of
+        {ok, FTData} ->
+            {ok, TakeoverData#{ft_data => FTData}};
+        none ->
+            ok
+    end.
+
+on_channel_takeovered(_ConnMod, ChanPid, #{ft_data := FTData}) ->
+    ok = put_ft_data(ChanPid, FTData);
+on_channel_takeovered(_ConnMod, _ChanPid, _) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Private funs
+%%--------------------------------------------------------------------
+
+get_ft_data(ChanPid) ->
+    case ets:lookup(?FT_TAB, ChanPid) of
+        [#emqx_ft{ft_data = FTData}] -> {ok, FTData};
+        [] -> none
+    end.
+
+delete_ft_data(ChanPid) ->
+    true = ets:delete(?FT_TAB, ChanPid),
+    ok.
+
+put_ft_data(ChanPid, FTData) ->
+    true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}),
+    ok.

+ 30 - 0
apps/emqx_ft/src/emqx_ft_app.erl

@@ -0,0 +1,30 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+    {ok, Sup} = emqx_ft_sup:start_link(),
+    ok = emqx_ft:hook(),
+    {ok, Sup}.
+
+stop(_State) ->
+    ok = emqx_ft:unhook(),
+    ok.

+ 49 - 0
apps/emqx_ft/src/emqx_ft_sup.erl

@@ -0,0 +1,49 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%% sup_flags() = #{strategy => strategy(),         % optional
+%%                 intensity => non_neg_integer(), % optional
+%%                 period => pos_integer()}        % optional
+%% child_spec() = #{id => child_id(),       % mandatory
+%%                  start => mfargs(),      % mandatory
+%%                  restart => restart(),   % optional
+%%                  shutdown => shutdown(), % optional
+%%                  type => worker(),       % optional
+%%                  modules => modules()}   % optional
+init([]) ->
+    ok = emqx_ft:create_tab(),
+    SupFlags = #{
+        strategy => one_for_all,
+        intensity => 100,
+        period => 10
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
+
+%% internal functions