|
|
@@ -22,7 +22,7 @@
|
|
|
-include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl").
|
|
|
|
|
|
%% API
|
|
|
--export([new/0, init/3, update/3, reregister/3, on_close/1]).
|
|
|
+-export([new/0, init/4, update/3, reregister/3, on_close/1]).
|
|
|
|
|
|
-export([ info/1
|
|
|
, info/2
|
|
|
@@ -47,9 +47,10 @@
|
|
|
, wait_ack :: request_context() | undefined
|
|
|
, endpoint_name :: binary() | undefined
|
|
|
, location_path :: list(binary()) | undefined
|
|
|
- , headers :: map() | undefined
|
|
|
, reg_info :: map() | undefined
|
|
|
, lifetime :: non_neg_integer() | undefined
|
|
|
+ , is_cache_mode :: boolean()
|
|
|
+ , mountpoint :: binary()
|
|
|
, last_active_at :: non_neg_integer()
|
|
|
}).
|
|
|
|
|
|
@@ -61,7 +62,7 @@
|
|
|
<<"7">>, <<"9">>, <<"15">>]).
|
|
|
|
|
|
%% uplink and downlink topic configuration
|
|
|
--define(lwm2m_up_dm_topic, {<<"v1/up/dm">>, 0}).
|
|
|
+-define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}).
|
|
|
|
|
|
%% steal from emqx_session
|
|
|
-define(INFO_KEYS, [subscriptions,
|
|
|
@@ -95,41 +96,44 @@ new() ->
|
|
|
#session{ coap = emqx_coap_tm:new()
|
|
|
, queue = queue:new()
|
|
|
, last_active_at = ?NOW
|
|
|
+ , is_cache_mode = false
|
|
|
+ , mountpoint = <<>>
|
|
|
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
|
|
|
|
|
|
--spec init(emqx_coap_message(), function(), session()) -> map().
|
|
|
-init(#coap_message{options = Opts, payload = Payload} = Msg, Validator, Session) ->
|
|
|
+-spec init(emqx_coap_message(), binary(), function(), session()) -> map().
|
|
|
+init(#coap_message{options = Opts,
|
|
|
+ payload = Payload} = Msg, MountPoint, WithContext, Session) ->
|
|
|
Query = maps:get(uri_query, Opts),
|
|
|
RegInfo = append_object_list(Query, Payload),
|
|
|
- Headers = get_headers(RegInfo),
|
|
|
LifeTime = get_lifetime(RegInfo),
|
|
|
Epn = maps:get(<<"ep">>, Query),
|
|
|
Location = [?PREFIX, Epn],
|
|
|
|
|
|
- Result = return(register_init(Validator,
|
|
|
- Session#session{headers = Headers,
|
|
|
- endpoint_name = Epn,
|
|
|
- location_path = Location,
|
|
|
- reg_info = RegInfo,
|
|
|
- lifetime = LifeTime,
|
|
|
- queue = queue:new()})),
|
|
|
+ NewSession = Session#session{endpoint_name = Epn,
|
|
|
+ location_path = Location,
|
|
|
+ reg_info = RegInfo,
|
|
|
+ lifetime = LifeTime,
|
|
|
+ mountpoint = MountPoint,
|
|
|
+ is_cache_mode = is_psm(RegInfo) orelse is_qmode(RegInfo),
|
|
|
+ queue = queue:new()},
|
|
|
|
|
|
+ Result = return(register_init(WithContext, NewSession)),
|
|
|
Reply = emqx_coap_message:piggyback({ok, created}, Msg),
|
|
|
Reply2 = emqx_coap_message:set(location_path, Location, Reply),
|
|
|
reply(Reply2, Result#{lifetime => true}).
|
|
|
|
|
|
-reregister(Msg, Validator, Session) ->
|
|
|
- update(Msg, Validator, <<"register">>, Session).
|
|
|
+reregister(Msg, WithContext, Session) ->
|
|
|
+ update(Msg, WithContext, <<"register">>, Session).
|
|
|
|
|
|
-update(Msg, Validator, Session) ->
|
|
|
- update(Msg, Validator, <<"update">>, Session).
|
|
|
+update(Msg, WithContext, Session) ->
|
|
|
+ update(Msg, WithContext, <<"update">>, Session).
|
|
|
|
|
|
--spec on_close(session()) -> ok.
|
|
|
-on_close(#session{endpoint_name = Epn}) ->
|
|
|
+-spec on_close(session()) -> binary().
|
|
|
+on_close(Session) ->
|
|
|
#{topic := Topic} = downlink_topic(),
|
|
|
- MountedTopic = mount(Topic, mountpoint(Epn)),
|
|
|
+ MountedTopic = mount(Topic, Session),
|
|
|
emqx:unsubscribe(MountedTopic),
|
|
|
- ok.
|
|
|
+ MountedTopic.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Info, Stats
|
|
|
@@ -194,15 +198,15 @@ stats(Session) -> info(?STATS_KEYS, Session).
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% API
|
|
|
%%--------------------------------------------------------------------
|
|
|
-handle_coap_in(Msg, _Validator, Session) ->
|
|
|
+handle_coap_in(Msg, _WithContext, Session) ->
|
|
|
call_coap(case emqx_coap_message:is_request(Msg) of
|
|
|
true -> handle_request;
|
|
|
_ -> handle_response
|
|
|
end,
|
|
|
Msg, Session#session{last_active_at = ?NOW}).
|
|
|
|
|
|
-handle_deliver(Delivers, _Validator, Session) ->
|
|
|
- return(deliver(Delivers, Session)).
|
|
|
+handle_deliver(Delivers, WithContext, Session) ->
|
|
|
+ return(deliver(Delivers, WithContext, Session)).
|
|
|
|
|
|
timeout({transport, Msg}, _, Session) ->
|
|
|
call_coap(timeout, Msg, Session).
|
|
|
@@ -214,17 +218,17 @@ set_reply(Msg, #session{coap = Coap} = Session) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Protocol Stack
|
|
|
%%--------------------------------------------------------------------
|
|
|
-handle_protocol_in({response, CtxMsg}, Validator, Session) ->
|
|
|
- return(handle_coap_response(CtxMsg, Validator, Session));
|
|
|
+handle_protocol_in({response, CtxMsg}, WithContext, Session) ->
|
|
|
+ return(handle_coap_response(CtxMsg, WithContext, Session));
|
|
|
|
|
|
-handle_protocol_in({ack, CtxMsg}, Validator, Session) ->
|
|
|
- return(handle_ack(CtxMsg, Validator, Session));
|
|
|
+handle_protocol_in({ack, CtxMsg}, WithContext, Session) ->
|
|
|
+ return(handle_ack(CtxMsg, WithContext, Session));
|
|
|
|
|
|
-handle_protocol_in({ack_failure, CtxMsg}, Validator, Session) ->
|
|
|
- return(handle_ack_failure(CtxMsg, Validator, Session));
|
|
|
+handle_protocol_in({ack_failure, CtxMsg}, WithContext, Session) ->
|
|
|
+ return(handle_ack_failure(CtxMsg, WithContext, Session));
|
|
|
|
|
|
-handle_protocol_in({reset, CtxMsg}, Validator, Session) ->
|
|
|
- return(handle_ack_reset(CtxMsg, Validator, Session)).
|
|
|
+handle_protocol_in({reset, CtxMsg}, WithContext, Session) ->
|
|
|
+ return(handle_ack_reset(CtxMsg, WithContext, Session)).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Register
|
|
|
@@ -302,50 +306,6 @@ delink(Str) ->
|
|
|
Ltrim = binary_util:ltrim(Str, $<),
|
|
|
binary_util:rtrim(Ltrim, $>).
|
|
|
|
|
|
-get_headers(RegInfo) ->
|
|
|
- lists:foldl(fun(K, Acc) ->
|
|
|
- get_header(K, RegInfo, Acc)
|
|
|
- end,
|
|
|
- extract_module_params(RegInfo),
|
|
|
- [<<"apn">>, <<"im">>, <<"ct">>, <<"mv">>, <<"mt">>]).
|
|
|
-
|
|
|
-get_header(Key, RegInfo, Headers) ->
|
|
|
- case maps:get(Key, RegInfo, undefined) of
|
|
|
- undefined ->
|
|
|
- Headers;
|
|
|
- Val ->
|
|
|
- AtomKey = erlang:binary_to_atom(Key),
|
|
|
- Headers#{AtomKey => Val}
|
|
|
- end.
|
|
|
-
|
|
|
-extract_module_params(RegInfo) ->
|
|
|
- Keys = [<<"module">>, <<"sv">>, <<"chip">>, <<"imsi">>, <<"iccid">>],
|
|
|
- case lists:any(fun(K) -> maps:get(K, RegInfo, undefined) =:= undefined end, Keys) of
|
|
|
- true -> #{module_params => undefined};
|
|
|
- false ->
|
|
|
- Extras = [<<"rsrp">>, <<"sinr">>, <<"txpower">>, <<"cellid">>],
|
|
|
- case lists:any(fun(K) -> maps:get(K, RegInfo, undefined) =:= undefined end, Extras) of
|
|
|
- true ->
|
|
|
- #{module_params =>
|
|
|
- #{module => maps:get(<<"module">>, RegInfo),
|
|
|
- softversion => maps:get(<<"sv">>, RegInfo),
|
|
|
- chiptype => maps:get(<<"chip">>, RegInfo),
|
|
|
- imsi => maps:get(<<"imsi">>, RegInfo),
|
|
|
- iccid => maps:get(<<"iccid">>, RegInfo)}};
|
|
|
- false ->
|
|
|
- #{module_params =>
|
|
|
- #{module => maps:get(<<"module">>, RegInfo),
|
|
|
- softversion => maps:get(<<"sv">>, RegInfo),
|
|
|
- chiptype => maps:get(<<"chip">>, RegInfo),
|
|
|
- imsi => maps:get(<<"imsi">>, RegInfo),
|
|
|
- iccid => maps:get(<<"iccid">>, RegInfo),
|
|
|
- rsrp => maps:get(<<"rsrp">>, RegInfo),
|
|
|
- sinr => maps:get(<<"sinr">>, RegInfo),
|
|
|
- txpower => maps:get(<<"txpower">>, RegInfo),
|
|
|
- cellid => maps:get(<<"cellid">>, RegInfo)}}
|
|
|
- end
|
|
|
- end.
|
|
|
-
|
|
|
get_lifetime(#{<<"lt">> := LT}) ->
|
|
|
case LT of
|
|
|
0 -> emqx:get_config([gateway, lwm2m, lifetime_max]);
|
|
|
@@ -362,7 +322,7 @@ get_lifetime(_, OldRegInfo) ->
|
|
|
|
|
|
-spec update(emqx_coap_message(), function(), binary(), session()) -> map().
|
|
|
update(#coap_message{options = Opts, payload = Payload} = Msg,
|
|
|
- Validator,
|
|
|
+ WithContext,
|
|
|
CmdType,
|
|
|
#session{reg_info = OldRegInfo} = Session) ->
|
|
|
Query = maps:get(uri_query, Opts),
|
|
|
@@ -370,58 +330,51 @@ update(#coap_message{options = Opts, payload = Payload} = Msg,
|
|
|
UpdateRegInfo = maps:merge(OldRegInfo, RegInfo),
|
|
|
LifeTime = get_lifetime(UpdateRegInfo, OldRegInfo),
|
|
|
|
|
|
- Session2 = proto_subscribe(Validator,
|
|
|
- Session#session{reg_info = UpdateRegInfo,
|
|
|
- lifetime = LifeTime}),
|
|
|
+ NewSession = Session#session{reg_info = UpdateRegInfo,
|
|
|
+ is_cache_mode =
|
|
|
+ is_psm(UpdateRegInfo) orelse is_qmode(UpdateRegInfo),
|
|
|
+ lifetime = LifeTime},
|
|
|
+
|
|
|
+ Session2 = proto_subscribe(WithContext, NewSession),
|
|
|
Session3 = send_dl_msg(Session2),
|
|
|
RegPayload = #{<<"data">> => UpdateRegInfo},
|
|
|
- Session4 = send_to_mqtt(#{}, CmdType, RegPayload, Validator, Session3),
|
|
|
+ Session4 = send_to_mqtt(#{}, CmdType, RegPayload, WithContext, Session3),
|
|
|
|
|
|
Result = return(Session4),
|
|
|
|
|
|
Reply = emqx_coap_message:piggyback({ok, changed}, Msg),
|
|
|
reply(Reply, Result#{lifetime => true}).
|
|
|
|
|
|
-register_init(Validator, #session{reg_info = RegInfo,
|
|
|
- endpoint_name = Epn} = Session) ->
|
|
|
-
|
|
|
+register_init(WithContext, #session{reg_info = RegInfo} = Session) ->
|
|
|
Session2 = send_auto_observe(RegInfo, Session),
|
|
|
%% - subscribe to the downlink_topic and wait for commands
|
|
|
#{topic := Topic, qos := Qos} = downlink_topic(),
|
|
|
- MountedTopic = mount(Topic, mountpoint(Epn)),
|
|
|
- Session3 = subscribe(MountedTopic, Qos, Validator, Session2),
|
|
|
+ MountedTopic = mount(Topic, Session),
|
|
|
+ Session3 = subscribe(MountedTopic, Qos, WithContext, Session2),
|
|
|
Session4 = send_dl_msg(Session3),
|
|
|
|
|
|
%% - report the registration info
|
|
|
RegPayload = #{<<"data">> => RegInfo},
|
|
|
- send_to_mqtt(#{}, <<"register">>, RegPayload, Validator, Session4).
|
|
|
+ send_to_mqtt(#{}, <<"register">>, RegPayload, WithContext, Session4).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Subscribe
|
|
|
%%--------------------------------------------------------------------
|
|
|
-proto_subscribe(Validator, #session{endpoint_name = Epn, wait_ack = WaitAck} = Session) ->
|
|
|
+proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) ->
|
|
|
#{topic := Topic, qos := Qos} = downlink_topic(),
|
|
|
- MountedTopic = mount(Topic, mountpoint(Epn)),
|
|
|
+ MountedTopic = mount(Topic, Session),
|
|
|
Session2 = case WaitAck of
|
|
|
undefined ->
|
|
|
Session;
|
|
|
Ctx ->
|
|
|
MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>),
|
|
|
- send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, Validator, Session)
|
|
|
+ send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, WithContext, Session)
|
|
|
end,
|
|
|
- subscribe(MountedTopic, Qos, Validator, Session2).
|
|
|
-
|
|
|
-subscribe(Topic, Qos, Validator,
|
|
|
- #session{headers = Headers, endpoint_name = EndpointName} = Session) ->
|
|
|
- case Validator(subscribe, Topic) of
|
|
|
- allow ->
|
|
|
- ClientId = maps:get(device_id, Headers, undefined),
|
|
|
- Opts = get_sub_opts(Qos),
|
|
|
- ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, EndpointName]),
|
|
|
- emqx:subscribe(Topic, ClientId, Opts);
|
|
|
- _ ->
|
|
|
- ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic])
|
|
|
- end,
|
|
|
+ subscribe(MountedTopic, Qos, WithContext, Session2).
|
|
|
+
|
|
|
+subscribe(Topic, Qos, WithContext, Session) ->
|
|
|
+ Opts = get_sub_opts(Qos),
|
|
|
+ WithContext(subscribe, [Topic, Opts]),
|
|
|
Session.
|
|
|
|
|
|
send_auto_observe(RegInfo, Session) ->
|
|
|
@@ -486,7 +439,7 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType},
|
|
|
type = CoapMsgType,
|
|
|
payload = CoapMsgPayload,
|
|
|
options = CoapMsgOpts}},
|
|
|
- Validator,
|
|
|
+ WithContext,
|
|
|
Session) ->
|
|
|
MqttPayload = emqx_lwm2m_cmd:coap_to_mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ctx),
|
|
|
{ReqPath, _} = emqx_lwm2m_cmd:path_list(emqx_lwm2m_cmd:extract_path(Ctx)),
|
|
|
@@ -495,46 +448,43 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType},
|
|
|
{[<<"5">>| _], _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
|
|
|
%% this is a notification for status update during NB firmware upgrade.
|
|
|
%% need to reply to DM http callbacks
|
|
|
- send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, Validator, Session);
|
|
|
+ send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, WithContext, Session);
|
|
|
{_ReqPath, _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
|
|
|
%% this is actually a notification, correct the msgType
|
|
|
- send_to_mqtt(Ctx, <<"notify">>, MqttPayload, Validator, Session);
|
|
|
+ send_to_mqtt(Ctx, <<"notify">>, MqttPayload, WithContext, Session);
|
|
|
_ ->
|
|
|
- send_to_mqtt(Ctx, EventType, MqttPayload, Validator, Session)
|
|
|
+ send_to_mqtt(Ctx, EventType, MqttPayload, WithContext, Session)
|
|
|
end,
|
|
|
send_dl_msg(Ctx, Session2).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Ack
|
|
|
%%--------------------------------------------------------------------
|
|
|
-handle_ack({Ctx, _}, Validator, Session) ->
|
|
|
+handle_ack({Ctx, _}, WithContext, Session) ->
|
|
|
Session2 = send_dl_msg(Ctx, Session),
|
|
|
MqttPayload = emqx_lwm2m_cmd:empty_ack_to_mqtt(Ctx),
|
|
|
- send_to_mqtt(Ctx, <<"ack">>, MqttPayload, Validator, Session2).
|
|
|
+ send_to_mqtt(Ctx, <<"ack">>, MqttPayload, WithContext, Session2).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Ack Failure(Timeout/Reset)
|
|
|
%%--------------------------------------------------------------------
|
|
|
-handle_ack_failure({Ctx, _}, Validator, Session) ->
|
|
|
- handle_ack_failure(Ctx, <<"coap_timeout">>, Validator, Session).
|
|
|
+handle_ack_failure({Ctx, _}, WithContext, Session) ->
|
|
|
+ handle_ack_failure(Ctx, <<"coap_timeout">>, WithContext, Session).
|
|
|
|
|
|
-handle_ack_reset({Ctx, _}, Validator, Session) ->
|
|
|
- handle_ack_failure(Ctx, <<"coap_reset">>, Validator, Session).
|
|
|
+handle_ack_reset({Ctx, _}, WithContext, Session) ->
|
|
|
+ handle_ack_failure(Ctx, <<"coap_reset">>, WithContext, Session).
|
|
|
|
|
|
-handle_ack_failure(Ctx, MsgType, Validator, Session) ->
|
|
|
+handle_ack_failure(Ctx, MsgType, WithContext, Session) ->
|
|
|
Session2 = may_send_dl_msg(coap_timeout, Ctx, Session),
|
|
|
MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, MsgType),
|
|
|
- send_to_mqtt(Ctx, MsgType, MqttPayload, Validator, Session2).
|
|
|
+ send_to_mqtt(Ctx, MsgType, MqttPayload, WithContext, Session2).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Send To CoAP
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-may_send_dl_msg(coap_timeout, Ctx, #session{headers = Headers,
|
|
|
- reg_info = RegInfo,
|
|
|
- wait_ack = WaitAck} = Session) ->
|
|
|
- Lwm2mMode = maps:get(lwm2m_model, Headers, undefined),
|
|
|
- case is_cache_mode(Lwm2mMode, RegInfo, Session) of
|
|
|
+may_send_dl_msg(coap_timeout, Ctx, #session{wait_ack = WaitAck} = Session) ->
|
|
|
+ case is_cache_mode(Session) of
|
|
|
false -> send_dl_msg(Ctx, Session);
|
|
|
true ->
|
|
|
case WaitAck of
|
|
|
@@ -545,14 +495,11 @@ may_send_dl_msg(coap_timeout, Ctx, #session{headers = Headers,
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-is_cache_mode(Lwm2mMode, RegInfo, #session{last_active_at = LastActiveAt}) ->
|
|
|
- case Lwm2mMode =:= psm orelse is_psm(RegInfo) orelse is_qmode(RegInfo) of
|
|
|
- true ->
|
|
|
- QModeTimeWind = emqx:get_config([gateway, lwm2m, qmode_time_window]),
|
|
|
- Now = ?NOW,
|
|
|
- (Now - LastActiveAt) >= QModeTimeWind;
|
|
|
- false -> false
|
|
|
- end.
|
|
|
+is_cache_mode(#session{is_cache_mode = IsCacheMode,
|
|
|
+ last_active_at = LastActiveAt}) ->
|
|
|
+ IsCacheMode andalso
|
|
|
+ ((?NOW - LastActiveAt) >=
|
|
|
+ emqx:get_config([gateway, lwm2m, qmode_time_window])).
|
|
|
|
|
|
is_psm(#{<<"apn">> := APN}) when APN =:= <<"Ctnb">>;
|
|
|
APN =:= <<"psmA.eDRX0.ctnb">>;
|
|
|
@@ -611,54 +558,27 @@ send_msg_not_waiting_ack(Ctx, Req, Session) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Send To MQTT
|
|
|
%%--------------------------------------------------------------------
|
|
|
-send_to_mqtt(Ref, EventType, Payload, Validator, Session = #session{headers = Headers}) ->
|
|
|
+send_to_mqtt(Ref, EventType, Payload, WithContext, Session) ->
|
|
|
#{topic := Topic, qos := Qos} = uplink_topic(EventType),
|
|
|
- NHeaders = extract_ext_flags(Headers),
|
|
|
Mheaders = maps:get(mheaders, Ref, #{}),
|
|
|
- NHeaders1 = maps:merge(NHeaders, Mheaders),
|
|
|
- proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, NHeaders1, Validator, Session).
|
|
|
+ proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, Mheaders, WithContext, Session).
|
|
|
|
|
|
send_to_mqtt(Ctx, EventType, Payload, {Topic, Qos},
|
|
|
- Validator, #session{headers = Headers} = Session) ->
|
|
|
+ WithContext, Session) ->
|
|
|
Mheaders = maps:get(mheaders, Ctx, #{}),
|
|
|
- NHeaders = extract_ext_flags(Headers),
|
|
|
- NHeaders1 = maps:merge(NHeaders, Mheaders),
|
|
|
- proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, NHeaders1, Validator, Session).
|
|
|
+ proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, Mheaders, WithContext, Session).
|
|
|
|
|
|
-proto_publish(Topic, Payload, Qos, Headers, Validator,
|
|
|
+proto_publish(Topic, Payload, Qos, Headers, WithContext,
|
|
|
#session{endpoint_name = Epn} = Session) ->
|
|
|
- MountedTopic = mount(Topic, mountpoint(Epn)),
|
|
|
- _ = case Validator(publish, MountedTopic) of
|
|
|
- allow ->
|
|
|
- Msg = emqx_message:make(Epn, Qos, MountedTopic,
|
|
|
- emqx_json:encode(Payload), #{}, Headers),
|
|
|
- emqx:publish(Msg);
|
|
|
- _ ->
|
|
|
- ?LOG(error, "topic:~p not allow to publish ", [MountedTopic])
|
|
|
- end,
|
|
|
+ MountedTopic = mount(Topic, Session),
|
|
|
+ Msg = emqx_message:make(Epn, Qos, MountedTopic,
|
|
|
+ emqx_json:encode(Payload), #{}, Headers),
|
|
|
+ WithContext(publish, [MountedTopic, Msg]),
|
|
|
Session.
|
|
|
|
|
|
-mountpoint(Epn) ->
|
|
|
- Prefix = emqx:get_config([gateway, lwm2m, mountpoint]),
|
|
|
- <<Prefix/binary, "/", Epn/binary, "/">>.
|
|
|
-
|
|
|
-mount(Topic, MountPoint) when is_binary(Topic), is_binary(MountPoint) ->
|
|
|
+mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->
|
|
|
<<MountPoint/binary, Topic/binary>>.
|
|
|
|
|
|
-extract_ext_flags(Headers) ->
|
|
|
- Header0 = #{is_tr => maps:get(is_tr, Headers, true)},
|
|
|
- check(Header0, Headers, [sota_type, appId, nbgwFlag]).
|
|
|
-
|
|
|
-check(Params, _Headers, []) -> Params;
|
|
|
-check(Params, Headers, [Key | Rest]) ->
|
|
|
- case maps:get(Key, Headers, null) of
|
|
|
- V when V == undefined; V == null ->
|
|
|
- check(Params, Headers, Rest);
|
|
|
- Value ->
|
|
|
- Params1 = Params#{Key => Value},
|
|
|
- check(Params1, Headers, Rest)
|
|
|
- end.
|
|
|
-
|
|
|
downlink_topic() ->
|
|
|
emqx:get_config([gateway, lwm2m, translators, command]).
|
|
|
|
|
|
@@ -678,29 +598,30 @@ uplink_topic(_) ->
|
|
|
%% Deliver
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-deliver(Delivers, #session{headers = Headers, reg_info = RegInfo} = Session) ->
|
|
|
- Lwm2mMode = maps:get(lwm2m_model, Headers, undefined),
|
|
|
- IsCacheMode = is_cache_mode(Lwm2mMode, RegInfo, Session),
|
|
|
+deliver(Delivers, WithContext, #session{reg_info = RegInfo} = Session) ->
|
|
|
+ IsCacheMode = is_cache_mode(Session),
|
|
|
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
|
|
lists:foldl(fun({deliver, _, MQTT}, Acc) ->
|
|
|
deliver_to_coap(AlternatePath,
|
|
|
- MQTT#message.payload, MQTT, IsCacheMode, Acc)
|
|
|
+ MQTT#message.payload, MQTT, IsCacheMode, WithContext, Acc)
|
|
|
end,
|
|
|
Session,
|
|
|
Delivers).
|
|
|
|
|
|
-deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, Session) when is_binary(JsonData)->
|
|
|
+deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, WithContext, Session) when is_binary(JsonData)->
|
|
|
try
|
|
|
TermData = emqx_json:decode(JsonData, [return_maps]),
|
|
|
- deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, Session)
|
|
|
+ deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session)
|
|
|
catch
|
|
|
ExClass:Error:ST ->
|
|
|
?LOG(error, "deliver_to_coap - Invalid JSON: ~0p, Exception: ~0p, stacktrace: ~0p",
|
|
|
[JsonData, {ExClass, Error}, ST]),
|
|
|
+ WithContext(metrics, 'delivery.dropped'),
|
|
|
Session
|
|
|
end;
|
|
|
|
|
|
-deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, Session) when is_map(TermData) ->
|
|
|
+deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session) when is_map(TermData) ->
|
|
|
+ WithContext(metrics, 'messages.delivered'),
|
|
|
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
|
|
|
ExpiryTime = get_expiry_time(MQTT),
|
|
|
maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session).
|