Browse Source

fix(ds): Fix static checks

ieQu1 2 years ago
parent
commit
8e5dda40be

+ 1 - 1
Makefile

@@ -85,7 +85,7 @@ $(REL_PROFILES:%=%-compile): $(REBAR) merge-config
 
 
 .PHONY: ct
 .PHONY: ct
 ct: $(REBAR) merge-config
 ct: $(REBAR) merge-config
-	ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct
+	@ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct
 
 
 ## only check bpapi for enterprise profile because it's a super-set.
 ## only check bpapi for enterprise profile because it's a super-set.
 .PHONY: static_checks
 .PHONY: static_checks

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -18,6 +18,7 @@
 {emqx_dashboard,1}.
 {emqx_dashboard,1}.
 {emqx_delayed,1}.
 {emqx_delayed,1}.
 {emqx_delayed,2}.
 {emqx_delayed,2}.
+{emqx_ds,1}.
 {emqx_eviction_agent,1}.
 {emqx_eviction_agent,1}.
 {emqx_eviction_agent,2}.
 {emqx_eviction_agent,2}.
 {emqx_exhook,1}.
 {emqx_exhook,1}.

+ 37 - 7
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -61,6 +61,13 @@
 %% session table operations
 %% session table operations
 -export([create_tables/0]).
 -export([create_tables/0]).
 
 
+%% Remove me later (satisfy checks for an unused BPAPI)
+-export([
+    do_open_iterator/3,
+    do_ensure_iterator_closed/1,
+    do_ensure_all_iterators_closed/1
+]).
+
 -ifdef(TEST).
 -ifdef(TEST).
 -export([session_open/1]).
 -export([session_open/1]).
 -endif.
 -endif.
@@ -268,13 +275,17 @@ get_subscription(TopicFilter, #{iterators := Iters}) ->
     {ok, emqx_types:publish_result(), replies(), session()}
     {ok, emqx_types:publish_result(), replies(), session()}
     | {error, emqx_types:reason_code()}.
     | {error, emqx_types:reason_code()}.
 publish(_PacketId, Msg, Session) ->
 publish(_PacketId, Msg, Session) ->
-    ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]),
-    {ok, persisted, [], Session}.
+    %% TODO:
+    Result = emqx_broker:publish(Msg),
+    {ok, Result, [], Session}.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBACK
 %% Client -> Broker: PUBACK
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+%% FIXME: parts of the commit offset function are mocked
+-dialyzer({nowarn_function, puback/3}).
+
 -spec puback(clientinfo(), emqx_types:packet_id(), session()) ->
 -spec puback(clientinfo(), emqx_types:packet_id(), session()) ->
     {ok, emqx_types:message(), replies(), session()}
     {ok, emqx_types:message(), replies(), session()}
     | {error, emqx_types:reason_code()}.
     | {error, emqx_types:reason_code()}.
@@ -323,17 +334,16 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
 -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
-    {ok, emqx_types:message(), replies(), session()}.
+    {ok, replies(), session()}.
 deliver(_ClientInfo, _Delivers, Session) ->
 deliver(_ClientInfo, _Delivers, Session) ->
-    %% This may be triggered for the system messages. FIXME.
+    %% TODO: QoS0 and system messages end up here.
     {ok, [], Session}.
     {ok, [], Session}.
 
 
--spec handle_timeout(clientinfo(), emqx_session:common_timer_name(), session()) ->
+-spec handle_timeout(clientinfo(), _Timeout, session()) ->
     {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
     {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
 handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) ->
 handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) ->
     WindowSize = 100,
     WindowSize = 100,
     {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize),
     {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize),
-    %%logger:warning("Inflight: ~p", [Inflight]),
     ensure_timer(pull),
     ensure_timer(pull),
     {ok, Publishes, Session#{inflight => Inflight}};
     {ok, Publishes, Session#{inflight => Inflight}};
 handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) ->
 handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) ->
@@ -601,6 +611,26 @@ new_subscription_id(DSSessionId, TopicFilter) ->
     DSSubId = {DSSessionId, TopicFilter},
     DSSubId = {DSSessionId, TopicFilter},
     {DSSubId, NowMS}.
     {DSSubId, NowMS}.
 
 
+%%--------------------------------------------------------------------
+%% RPC targets (v1)
+%%--------------------------------------------------------------------
+
+%% RPC target.
+-spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}.
+do_open_iterator(_TopicFilter, _StartMS, _IteratorID) ->
+    {error, not_implemented}.
+
+%% RPC target.
+-spec do_ensure_iterator_closed(emqx_ds:iterator_id()) -> ok.
+do_ensure_iterator_closed(_IteratorID) ->
+    ok.
+
+%% RPC target.
+-spec do_ensure_all_iterators_closed(id()) -> ok.
+do_ensure_all_iterators_closed(_DSSessionID) ->
+    ok.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Reading batches
 %% Reading batches
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -677,5 +707,5 @@ export_record(_, _, [], Acc) ->
 
 
 -spec ensure_timer(pull | get_streams) -> ok.
 -spec ensure_timer(pull | get_streams) -> ok.
 ensure_timer(Type) ->
 ensure_timer(Type) ->
-    emqx_utils:start_timer(100, {emqx_session, Type}),
+    _ = emqx_utils:start_timer(100, {emqx_session, Type}),
     ok.
     ok.

+ 4 - 1
apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl

@@ -20,6 +20,7 @@
 
 
 -export([
 -export([
     introduced_in/0,
     introduced_in/0,
+    deprecated_since/0,
 
 
     open_iterator/4,
     open_iterator/4,
     close_iterator/2,
     close_iterator/2,
@@ -31,9 +32,11 @@
 -define(TIMEOUT, 30_000).
 -define(TIMEOUT, 30_000).
 
 
 introduced_in() ->
 introduced_in() ->
-    %% FIXME
     "5.3.0".
     "5.3.0".
 
 
+deprecated_since() ->
+    "5.4.0".
+
 -spec open_iterator(
 -spec open_iterator(
     [node()],
     [node()],
     emqx_types:words(),
     emqx_types:words(),

+ 0 - 13
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -30,9 +30,6 @@
 %% Message replay API:
 %% Message replay API:
 -export([get_streams/3, make_iterator/3, next/2]).
 -export([get_streams/3, make_iterator/3, next/2]).
 
 
-%% Iterator storage API:
--export([save_iterator/3, get_iterator/2]).
-
 %% Misc. API:
 %% Misc. API:
 -export([]).
 -export([]).
 
 
@@ -101,8 +98,6 @@
 
 
 -type message_id() :: emqx_ds_replication_layer:message_id().
 -type message_id() :: emqx_ds_replication_layer:message_id().
 
 
--type iterator_id() :: term().
-
 -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
 -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
 
 
 %%================================================================================
 %%================================================================================
@@ -182,14 +177,6 @@ make_iterator(Stream, TopicFilter, StartTime) ->
 next(Iter, BatchSize) ->
 next(Iter, BatchSize) ->
     emqx_ds_replication_layer:next(Iter, BatchSize).
     emqx_ds_replication_layer:next(Iter, BatchSize).
 
 
--spec save_iterator(db(), iterator_id(), iterator()) -> ok.
-save_iterator(DB, ITRef, Iterator) ->
-    emqx_ds_replication_layer:save_iterator(DB, ITRef, Iterator).
-
--spec get_iterator(db(), iterator_id()) -> get_iterator_result(iterator()).
-get_iterator(DB, ITRef) ->
-    emqx_ds_replication_layer:get_iterator(DB, ITRef).
-
 %%================================================================================
 %%================================================================================
 %% Internal exports
 %% Internal exports
 %%================================================================================
 %%================================================================================

+ 6 - 13
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -25,9 +25,7 @@
     store_batch/3,
     store_batch/3,
     get_streams/3,
     get_streams/3,
     make_iterator/3,
     make_iterator/3,
-    next/2,
-    save_iterator/3,
-    get_iterator/2
+    next/2
 ]).
 ]).
 
 
 %% internal exports:
 %% internal exports:
@@ -169,14 +167,6 @@ next(Iter0, BatchSize) ->
             Other
             Other
     end.
     end.
 
 
--spec save_iterator(db(), emqx_ds:iterator_id(), iterator()) -> ok.
-save_iterator(_DB, _ITRef, _Iterator) ->
-    error(todo).
-
--spec get_iterator(db(), emqx_ds:iterator_id()) -> emqx_ds:get_iterator_result(iterator()).
-get_iterator(_DB, _ITRef) ->
-    error(todo).
-
 %%================================================================================
 %%================================================================================
 %% behavior callbacks
 %% behavior callbacks
 %%================================================================================
 %%================================================================================
@@ -198,12 +188,15 @@ do_drop_shard_v1(Shard) ->
 do_get_streams_v1(Shard, TopicFilter, StartTime) ->
 do_get_streams_v1(Shard, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
     emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
 
 
--spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) ->
+-spec do_make_iterator_v1(
+    shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time()
+) ->
     {ok, iterator()} | {error, _}.
     {ok, iterator()} | {error, _}.
 do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
 do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).
     emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).
 
 
--spec do_next_v1(shard_id(), Iter, pos_integer()) -> emqx_ds:next_result(Iter).
+-spec do_next_v1(shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()) ->
+    emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
 do_next_v1(Shard, Iter, BatchSize) ->
 do_next_v1(Shard, Iter, BatchSize) ->
     emqx_ds_storage_layer:next(Shard, Iter, BatchSize).
     emqx_ds_storage_layer:next(Shard, Iter, BatchSize).
 
 

+ 10 - 6
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -28,25 +28,29 @@
 %% API funcions
 %% API funcions
 %%================================================================================
 %%================================================================================
 
 
--spec open_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) ->
+-spec open_shard(node(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
     ok.
     ok.
 open_shard(Node, Shard, Opts) ->
 open_shard(Node, Shard, Opts) ->
     erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]).
     erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]).
 
 
--spec drop_shard(node(), emqx_ds_replication_layer:shard()) ->
+-spec drop_shard(node(), emqx_ds_replication_layer:shard_id()) ->
     ok.
     ok.
 drop_shard(Node, Shard) ->
 drop_shard(Node, Shard) ->
     erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]).
     erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]).
 
 
 -spec get_streams(
 -spec get_streams(
-    node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()
+    node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
 ) ->
 ) ->
     [{integer(), emqx_ds_replication_layer:stream()}].
     [{integer(), emqx_ds_replication_layer:stream()}].
 get_streams(Node, Shard, TopicFilter, Time) ->
 get_streams(Node, Shard, TopicFilter, Time) ->
     erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
     erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
 
 
 -spec make_iterator(
 -spec make_iterator(
-    node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
+    node(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
 ) ->
 ) ->
     {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
     {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
 make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
 make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
@@ -55,9 +59,9 @@ make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
     ]).
     ]).
 
 
 -spec next(
 -spec next(
-    node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()
+    node(), emqx_ds_replication_layer:shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()
 ) ->
 ) ->
-    {ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]}
+    {ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]}
     | {ok, end_of_stream}
     | {ok, end_of_stream}
     | {error, _}.
     | {error, _}.
 next(Node, Shard, Iter, BatchSize) ->
 next(Node, Shard, Iter, BatchSize) ->

BIN
topic_match_test.png