Kaynağa Gözat

refactor(emqx_coap): rename ps to pubsub

Shawn 4 yıl önce
ebeveyn
işleme
583382b8ce

+ 1 - 1
apps/emqx_coap/TODO

@@ -2,7 +2,7 @@
     - Enhance all test case
     - Enhance all test case
 
 
 2. Remove the mqtt adaptor
 2. Remove the mqtt adaptor
-3. Remove the emqx_coap_ps_topics.erl
+3. Remove the emqx_coap_pubsub_topics.erl
 
 
 
 
 ### Problems
 ### Problems

+ 3 - 3
apps/emqx_coap/src/emqx_coap_app.erl

@@ -29,12 +29,12 @@
 start(_Type, _Args) ->
 start(_Type, _Args) ->
     {ok, Sup} = emqx_coap_sup:start_link(),
     {ok, Sup} = emqx_coap_sup:start_link(),
     coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
     coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
-    coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined),
-    _ = emqx_coap_ps_topics:start_link(),
+    coap_server_registry:add_handler([<<"ps">>], emqx_coap_pubsub_resource, undefined),
+    _ = emqx_coap_pubsub_topics:start_link(),
     emqx_coap_server:start(application:get_all_env(?APP)),
     emqx_coap_server:start(application:get_all_env(?APP)),
     {ok,Sup}.
     {ok,Sup}.
 
 
 stop(_State) ->
 stop(_State) ->
     coap_server_registry:remove_handler([<<"mqtt">>], emqx_coap_resource, undefined),
     coap_server_registry:remove_handler([<<"mqtt">>], emqx_coap_resource, undefined),
-    coap_server_registry:remove_handler([<<"ps">>], emqx_coap_ps_resource, undefined),
+    coap_server_registry:remove_handler([<<"ps">>], emqx_coap_pubsub_resource, undefined),
     emqx_coap_server:stop(application:get_all_env(?APP)).
     emqx_coap_server:stop(application:get_all_env(?APP)).

+ 14 - 14
apps/emqx_coap/src/emqx_coap_ps_resource.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
--module(emqx_coap_ps_resource).
+-module(emqx_coap_pubsub_resource).
 
 
 -behaviour(coap_resource).
 -behaviour(coap_resource).
 
 
@@ -113,7 +113,7 @@ coap_observe(ChId, ?PS_PREFIX, TopicPath, Ack, Content) when TopicPath =/= [] ->
     ?LOG(debug, "observe Topic=~p, Ack=~p,Content=~p", [Topic, Ack, Content]),
     ?LOG(debug, "observe Topic=~p, Ack=~p,Content=~p", [Topic, Ack, Content]),
     Pid = get(mqtt_client_pid),
     Pid = get(mqtt_client_pid),
     emqx_coap_mqtt_adapter:subscribe(Pid, Topic),
     emqx_coap_mqtt_adapter:subscribe(Pid, Topic),
-    Code = case emqx_coap_ps_topics:is_topic_timeout(Topic) of
+    Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of
                true  ->
                true  ->
                    nocontent;
                    nocontent;
                false->
                false->
@@ -137,7 +137,7 @@ coap_unobserve({state, ChId, Prefix, TopicPath}) ->
 
 
 handle_info({dispatch, Topic, Payload}, State) ->
 handle_info({dispatch, Topic, Payload}, State) ->
     ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
     ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
-    {ok, Ret} = emqx_coap_ps_topics:reset_topic_info(Topic, Payload),
+    {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
     ?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
     ?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
     {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
     {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
 handle_info(Message, State) ->
 handle_info(Message, State) ->
@@ -166,7 +166,7 @@ get_auth([Param|T], Auth=#coap_mqtt_auth{}) ->
     get_auth(T, Auth).
     get_auth(T, Auth).
 
 
 add_topic_info(publish, Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>>  ->
 add_topic_info(publish, Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>>  ->
-    case emqx_coap_ps_topics:lookup_topic_info(Topic) of
+    case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
         [{_, StoredMaxAge, StoredCT, _, _}] ->
         [{_, StoredMaxAge, StoredCT, _, _}] ->
             ?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]),
             ?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]),
             %% check whether the ct value stored matches the ct option in this POST message
             %% check whether the ct value stored matches the ct option in this POST message
@@ -175,9 +175,9 @@ add_topic_info(publish, Topic, MaxAge, Format, Payload) when is_binary(Topic), T
                     {ok, Ret} =
                     {ok, Ret} =
                         case StoredMaxAge =:= MaxAge of
                         case StoredMaxAge =:= MaxAge of
                             true  ->
                             true  ->
-                                emqx_coap_ps_topics:reset_topic_info(Topic, Payload);
+                                emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload);
                             false ->
                             false ->
-                                emqx_coap_ps_topics:reset_topic_info(Topic, MaxAge, Payload)
+                                emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload)
                         end,
                         end,
                     {changed, Ret};
                     {changed, Ret};
                 false ->
                 false ->
@@ -186,19 +186,19 @@ add_topic_info(publish, Topic, MaxAge, Format, Payload) when is_binary(Topic), T
             end;
             end;
         [] ->
         [] ->
             ?LOG(debug, "publish topic=~p will be created", [Topic]),
             ?LOG(debug, "publish topic=~p will be created", [Topic]),
-            {ok, Ret} = emqx_coap_ps_topics:add_topic_info(Topic, MaxAge, Format, Payload),
+            {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload),
             {created, Ret}
             {created, Ret}
     end;
     end;
 
 
 add_topic_info(create, Topic, MaxAge, Format, _Payload) when is_binary(Topic), Topic =/= <<>> ->
 add_topic_info(create, Topic, MaxAge, Format, _Payload) when is_binary(Topic), Topic =/= <<>> ->
-    case emqx_coap_ps_topics:is_topic_existed(Topic) of
+    case emqx_coap_pubsub_topics:is_topic_existed(Topic) of
         true ->
         true ->
             %% Whether we should support CREATE to an existed topic is TBD!!
             %% Whether we should support CREATE to an existed topic is TBD!!
             ?LOG(debug, "create topic=~p already exists, need reset the topic info", [Topic]),
             ?LOG(debug, "create topic=~p already exists, need reset the topic info", [Topic]),
-            {ok, Ret} = emqx_coap_ps_topics:reset_topic_info(Topic, MaxAge, Format, <<>>);
+            {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Format, <<>>);
         false ->
         false ->
             ?LOG(debug, "create topic=~p will be created", [Topic]),
             ?LOG(debug, "create topic=~p will be created", [Topic]),
-            {ok, Ret} = emqx_coap_ps_topics:add_topic_info(Topic, MaxAge, Format, <<>>)
+            {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, <<>>)
     end,
     end,
     {created, Ret};
     {created, Ret};
 
 
@@ -275,7 +275,7 @@ return_resource(Topic, Payload, MaxAge, TimeStamp, Content) ->
 
 
 read_last_publish_message(false, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)->
 read_last_publish_message(false, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)->
     ?LOG(debug, "the QueryFormat=~p", [QueryFormat]),
     ?LOG(debug, "the QueryFormat=~p", [QueryFormat]),
-    case emqx_coap_ps_topics:lookup_topic_info(Topic) of
+    case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
         [] ->
         [] ->
             {error, not_found};
             {error, not_found};
         [{_, MaxAge, CT, Payload, TimeStamp}] ->
         [{_, MaxAge, CT, Payload, TimeStamp}] ->
@@ -289,7 +289,7 @@ read_last_publish_message(false, Topic, Content=#coap_content{format = QueryForm
     end;
     end;
 
 
 read_last_publish_message(false, Topic, Content) ->
 read_last_publish_message(false, Topic, Content) ->
-    case emqx_coap_ps_topics:lookup_topic_info(Topic) of
+    case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
         [] ->
         [] ->
             {error, not_found};
             {error, not_found};
         [{_, MaxAge, _, Payload, TimeStamp}] ->
         [{_, MaxAge, _, Payload, TimeStamp}] ->
@@ -301,11 +301,11 @@ read_last_publish_message(true, Topic, _Content) ->
     {error, bad_request}.
     {error, bad_request}.
 
 
 delete_topic_info(Topic) ->
 delete_topic_info(Topic) ->
-    case emqx_coap_ps_topics:lookup_topic_info(Topic) of
+    case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
         [] ->
         [] ->
             {error, not_found};
             {error, not_found};
         [{_, _, _, _, _}] ->
         [{_, _, _, _, _}] ->
-            emqx_coap_ps_topics:delete_sub_topics(Topic)
+            emqx_coap_pubsub_topics:delete_sub_topics(Topic)
     end.
     end.
 
 
 topic(Topic) when is_binary(Topic) -> Topic;
 topic(Topic) when is_binary(Topic) -> Topic;

+ 1 - 1
apps/emqx_coap/src/emqx_coap_ps_topics.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
--module(emqx_coap_ps_topics).
+-module(emqx_coap_pubsub_topics).
 
 
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 

+ 3 - 3
apps/emqx_coap/src/emqx_coap_sup.erl

@@ -32,11 +32,11 @@ init(_Args) ->
                  shutdown => 5000,
                  shutdown => 5000,
                  type     => worker,
                  type     => worker,
                  modules  => [emqx_coap_registry]},
                  modules  => [emqx_coap_registry]},
-    PsTopics = #{id       => emqx_coap_ps_topics,
-                 start    => {emqx_coap_ps_topics, start_link, []},
+    PsTopics = #{id       => emqx_coap_pubsub_topics,
+                 start    => {emqx_coap_pubsub_topics, start_link, []},
                  restart  => permanent,
                  restart  => permanent,
                  shutdown => 5000,
                  shutdown => 5000,
                  type     => worker,
                  type     => worker,
-                 modules  => [emqx_coap_ps_topics]},
+                 modules  => [emqx_coap_pubsub_topics]},
     {ok, {{one_for_all, 10, 3600}, [Registry, PsTopics]}}.
     {ok, {{one_for_all, 10, 3600}, [Registry, PsTopics]}}.
 
 

+ 35 - 35
apps/emqx_coap/test/emqx_coap_ps_SUITE.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
--module(emqx_coap_ps_SUITE).
+-module(emqx_coap_pubsub_SUITE).
 
 
 -compile(export_all).
 -compile(export_all).
 -compile(nowarn_export_all).
 -compile(nowarn_export_all).
@@ -54,7 +54,7 @@ t_update_max_age(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(TopicInPayload),
+    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(<<"42">>, CT1),
     ?assertEqual(<<"42">>, CT1),
@@ -65,7 +65,7 @@ t_update_max_age(_Config) ->
     Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 70, format = <<"application/link-format">>, payload = Payload1}),
     Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 70, format = <<"application/link-format">>, payload = Payload1}),
     {ok,created, #coap_content{location_path = LocPath}} = Reply1,
     {ok,created, #coap_content{location_path = LocPath}} = Reply1,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{TopicInPayload, MaxAge2, CT2, _ResPayload, _TimeStamp1}] = emqx_coap_ps_topics:lookup_topic_info(TopicInPayload),
+    [{TopicInPayload, MaxAge2, CT2, _ResPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
     ?assertEqual(70, MaxAge2),
     ?assertEqual(70, MaxAge2),
     ?assertEqual(<<"50">>, CT2),
     ?assertEqual(<<"50">>, CT2),
 
 
@@ -82,7 +82,7 @@ t_create_subtopic(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(TopicInPayload),
+    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(<<"42">>, CT1),
     ?assertEqual(<<"42">>, CT1),
@@ -99,7 +99,7 @@ t_create_subtopic(_Config) ->
     ?LOGT("Reply =~p", [Reply1]),
     ?LOGT("Reply =~p", [Reply1]),
     {ok,created, #coap_content{location_path = LocPath1}} = Reply1,
     {ok,created, #coap_content{location_path = LocPath1}} = Reply1,
     ?assertEqual([<<"/ps/topic1/subtopic">>] ,LocPath1),
     ?assertEqual([<<"/ps/topic1/subtopic">>] ,LocPath1),
-    [{FullTopic, MaxAge2, CT2, _ResPayload, _}] = emqx_coap_ps_topics:lookup_topic_info(FullTopic),
+    [{FullTopic, MaxAge2, CT2, _ResPayload, _}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
     ?assertEqual(60, MaxAge2),
     ?assertEqual(60, MaxAge2),
     ?assertEqual(<<"42">>, CT2),
     ?assertEqual(<<"42">>, CT2),
 
 
@@ -114,13 +114,13 @@ t_over_max_age(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(TopicInPayload),
+    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?assertEqual(2, MaxAge1),
     ?assertEqual(2, MaxAge1),
     ?assertEqual(<<"42">>, CT1),
     ?assertEqual(<<"42">>, CT1),
 
 
     timer:sleep(3000),
     timer:sleep(3000),
-    ?assertEqual(true, emqx_coap_ps_topics:is_topic_timeout(TopicInPayload)).
+    ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(TopicInPayload)).
 
 
 t_refreash_max_age(_Config) ->
 t_refreash_max_age(_Config) ->
     TopicInPayload = <<"topic1">>,
     TopicInPayload = <<"topic1">>,
@@ -132,7 +132,7 @@ t_refreash_max_age(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(TopicInPayload),
+    TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("TimeStamp=~p", [TimeStamp]),
     ?LOGT("TimeStamp=~p", [TimeStamp]),
     ?assertEqual(5, MaxAge1),
     ?assertEqual(5, MaxAge1),
@@ -144,13 +144,13 @@ t_refreash_max_age(_Config) ->
     Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload1}),
     Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload1}),
     {ok,created, #coap_content{location_path = LocPath}} = Reply1,
     {ok,created, #coap_content{location_path = LocPath}} = Reply1,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{TopicInPayload, MaxAge2, CT2, _ResPayload, TimeStamp1}] = emqx_coap_ps_topics:lookup_topic_info(TopicInPayload),
+    [{TopicInPayload, MaxAge2, CT2, _ResPayload, TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
     ?LOGT("TimeStamp1=~p", [TimeStamp1]),
     ?LOGT("TimeStamp1=~p", [TimeStamp1]),
     ?assertEqual(5, MaxAge2),
     ?assertEqual(5, MaxAge2),
     ?assertEqual(<<"50">>, CT2),
     ?assertEqual(<<"50">>, CT2),
 
 
     timer:sleep(3000),
     timer:sleep(3000),
-    ?assertEqual(false, emqx_coap_ps_topics:is_topic_timeout(TopicInPayload)),
+    ?assertEqual(false, emqx_coap_pubsub_topics:is_topic_timeout(TopicInPayload)),
 
 
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, RealURI).
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, RealURI).
 
 
@@ -168,7 +168,7 @@ t_case01_publish_post(_Config) ->
     ?LOGT("Reply =~p", [Reply1]),
     ?LOGT("Reply =~p", [Reply1]),
     {ok,created, #coap_content{location_path = LocPath1}} = Reply1,
     {ok,created, #coap_content{location_path = LocPath1}} = Reply1,
     ?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1),
     ?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1),
-    [{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(FullTopic),
+    [{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
     ?assertEqual(60, MaxAge),
     ?assertEqual(60, MaxAge),
     ?assertEqual(<<"42">>, CT2),
     ?assertEqual(<<"42">>, CT2),
 
 
@@ -183,7 +183,7 @@ t_case01_publish_post(_Config) ->
     Reply2 = er_coap_client:request(post, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}),
     Reply2 = er_coap_client:request(post, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}),
     ?LOGT("Reply =~p", [Reply2]),
     ?LOGT("Reply =~p", [Reply2]),
     {ok,changed, _} = Reply2,
     {ok,changed, _} = Reply2,
-    TopicInfo = [{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_ps_topics:lookup_topic_info(FullTopic),
+    TopicInfo = [{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
     ?LOGT("the topic info =~p", [TopicInfo]),
     ?LOGT("the topic info =~p", [TopicInfo]),
 
 
     assert_recv(FullTopic, PubPayload),
     assert_recv(FullTopic, PubPayload),
@@ -203,7 +203,7 @@ t_case02_publish_post(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(60, MaxAge),
     ?assertEqual(60, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
@@ -214,7 +214,7 @@ t_case02_publish_post(_Config) ->
     Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}),
     Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}),
     ?LOGT("Reply =~p", [Reply1]),
     ?LOGT("Reply =~p", [Reply1]),
     {ok,changed, _} = Reply1,
     {ok,changed, _} = Reply1,
-    [{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
 
 
     assert_recv(Topic, NewPayload),
     assert_recv(Topic, NewPayload),
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
@@ -233,7 +233,7 @@ t_case03_publish_post(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(60, MaxAge),
     ?assertEqual(60, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
@@ -258,13 +258,13 @@ t_case04_publish_post(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(5, MaxAge),
     ?assertEqual(5, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
     %% after max age timeout, the topic still exists but the status is timeout
     %% after max age timeout, the topic still exists but the status is timeout
     timer:sleep(6000),
     timer:sleep(6000),
-    ?assertEqual(true, emqx_coap_ps_topics:is_topic_timeout(Topic)),
+    ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
 
 
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
 
 
@@ -281,7 +281,7 @@ t_case01_publish_put(_Config) ->
     ?LOGT("Reply =~p", [Reply1]),
     ?LOGT("Reply =~p", [Reply1]),
     {ok,created, #coap_content{location_path = LocPath1}} = Reply1,
     {ok,created, #coap_content{location_path = LocPath1}} = Reply1,
     ?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1),
     ?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1),
-    [{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(FullTopic),
+    [{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
     ?assertEqual(60, MaxAge),
     ?assertEqual(60, MaxAge),
     ?assertEqual(<<"42">>, CT2),
     ?assertEqual(<<"42">>, CT2),
 
 
@@ -296,7 +296,7 @@ t_case01_publish_put(_Config) ->
     Reply2 = er_coap_client:request(put, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}),
     Reply2 = er_coap_client:request(put, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}),
     ?LOGT("Reply =~p", [Reply2]),
     ?LOGT("Reply =~p", [Reply2]),
     {ok,changed, _} = Reply2,
     {ok,changed, _} = Reply2,
-    [{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_ps_topics:lookup_topic_info(FullTopic),
+    [{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
 
 
     assert_recv(FullTopic, PubPayload),
     assert_recv(FullTopic, PubPayload),
 
 
@@ -316,7 +316,7 @@ t_case02_publish_put(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(60, MaxAge),
     ?assertEqual(60, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
@@ -327,7 +327,7 @@ t_case02_publish_put(_Config) ->
     Reply1 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}),
     Reply1 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}),
     ?LOGT("Reply =~p", [Reply1]),
     ?LOGT("Reply =~p", [Reply1]),
     {ok,changed, _} = Reply1,
     {ok,changed, _} = Reply1,
-    [{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
 
 
     assert_recv(Topic, NewPayload),
     assert_recv(Topic, NewPayload),
 
 
@@ -347,7 +347,7 @@ t_case03_publish_put(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(60, MaxAge),
     ?assertEqual(60, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
@@ -372,7 +372,7 @@ t_case04_publish_put(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
     ?assertEqual([<<"/ps/topic1">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(5, MaxAge),
     ?assertEqual(5, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
@@ -381,7 +381,7 @@ t_case04_publish_put(_Config) ->
     % but there is one thing to do is we don't count in the publish message received from emqx(from other node).TBD!!!!!!!!!!!!!
     % but there is one thing to do is we don't count in the publish message received from emqx(from other node).TBD!!!!!!!!!!!!!
     %%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%
     timer:sleep(6000),
     timer:sleep(6000),
-    ?assertEqual(true, emqx_coap_ps_topics:is_topic_timeout(Topic)),
+    ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
 
 
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
     {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
 
 
@@ -396,7 +396,7 @@ t_case01_subscribe(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
-    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(<<"42">>, CT1),
     ?assertEqual(<<"42">>, CT1),
@@ -439,13 +439,13 @@ t_case02_subscribe(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/a/b">>] ,LocPath),
     ?assertEqual([<<"/ps/a/b">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(5, MaxAge),
     ?assertEqual(5, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
     %% Wait for the max age of the timer expires
     %% Wait for the max age of the timer expires
     timer:sleep(6000),
     timer:sleep(6000),
-    ?assertEqual(true, emqx_coap_ps_topics:is_topic_timeout(Topic)),
+    ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
 
 
     %% Subscribe to the timeout topic "a/b", still successfully,got {ok, nocontent} Method
     %% Subscribe to the timeout topic "a/b", still successfully,got {ok, nocontent} Method
     Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
     Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
@@ -458,7 +458,7 @@ t_case02_subscribe(_Config) ->
     %% put to publish to topic "a/b"
     %% put to publish to topic "a/b"
     Reply2 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
     Reply2 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
     {ok,changed, #coap_content{}} = Reply2,
     {ok,changed, #coap_content{}} = Reply2,
-    [{Topic, MaxAge1, CT, Payload, TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge1, CT, Payload, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(false, TimeStamp =:= timeout),
     ?assertEqual(false, TimeStamp =:= timeout),
@@ -505,7 +505,7 @@ t_case01_read(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
-    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(<<"42">>, CT1),
     ?assertEqual(<<"42">>, CT1),
@@ -530,7 +530,7 @@ t_case02_read(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
-    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(<<"42">>, CT1),
     ?assertEqual(<<"42">>, CT1),
@@ -565,7 +565,7 @@ t_case04_read(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     {ok,created, #coap_content{location_path = [LocPath]}} = Reply,
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
     ?assertEqual(<<"/ps/topic1">> ,LocPath),
-    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?LOGT("lookup topic info=~p", [TopicInfo]),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(60, MaxAge1),
     ?assertEqual(<<"42">>, CT1),
     ?assertEqual(<<"42">>, CT1),
@@ -591,13 +591,13 @@ t_case05_read(_Config) ->
     ?LOGT("Reply =~p", [Reply]),
     ?LOGT("Reply =~p", [Reply]),
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     {ok,created, #coap_content{location_path = LocPath}} = Reply,
     ?assertEqual([<<"/ps/a/b">>] ,LocPath),
     ?assertEqual([<<"/ps/a/b">>] ,LocPath),
-    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_ps_topics:lookup_topic_info(Topic),
+    [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
     ?assertEqual(5, MaxAge),
     ?assertEqual(5, MaxAge),
     ?assertEqual(<<"42">>, CT),
     ?assertEqual(<<"42">>, CT),
 
 
     %% Wait for the max age of the timer expires
     %% Wait for the max age of the timer expires
     timer:sleep(6000),
     timer:sleep(6000),
-    ?assertEqual(true, emqx_coap_ps_topics:is_topic_timeout(Topic)),
+    ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
 
 
     %% GET to read the expired publish message, supposed to get {ok, nocontent}, but now got {ok, content}
     %% GET to read the expired publish message, supposed to get {ok, nocontent}, but now got {ok, content}
     Reply1 = er_coap_client:request(get, URI),
     Reply1 = er_coap_client:request(get, URI),
@@ -636,8 +636,8 @@ t_case01_delete(_Config) ->
     ?LOGT("Reply=~p", [Reply1]),
     ?LOGT("Reply=~p", [Reply1]),
     {ok, deleted, #coap_content{}}= ReplyD,
     {ok, deleted, #coap_content{}}= ReplyD,
 
 
-    ?assertEqual(false, emqx_coap_ps_topics:is_topic_existed(TopicInPayload)),
-    ?assertEqual(false, emqx_coap_ps_topics:is_topic_existed(TopicInPayload1)).
+    ?assertEqual(false, emqx_coap_pubsub_topics:is_topic_existed(TopicInPayload)),
+    ?assertEqual(false, emqx_coap_pubsub_topics:is_topic_existed(TopicInPayload1)).
 
 
 t_case02_delete(_Config) ->
 t_case02_delete(_Config) ->
     TopicInPayload = <<"a/b">>,
     TopicInPayload = <<"a/b">>,