zouguihou пре 2 година
родитељ
комит
d1977630b3

+ 5 - 0
pom.xml

@@ -99,6 +99,11 @@
             <artifactId>fastjson</artifactId>
             <version>1.2.33</version>
         </dependency>
+<!--        mqtt-->
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
     </dependencies>
 
     <repositories>  <!-- 配置阿里云镜像仓库 -->

+ 59 - 0
src/main/java/com/welampiot/configuration/MqttConfig.java

@@ -0,0 +1,59 @@
+package com.welampiot.configuration;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Component
+@ConfigurationProperties(prefix = "welampiot.mqtt")
+public class MqttConfig {
+    private String url;
+    private String clientId;
+    private String topics;
+    private String username;
+    private String password;
+    private int timeout;
+    private int keepalive;
+
+    public String getUrl() {
+        return url;
+    }
+    public void setUrl(String url) {
+        this.url = url;
+    }
+    public String getUsername() {
+        return username;
+    }
+    public void setUsername(String username) {
+        this.username = username;
+    }
+    public String getPassword() {
+        return password;
+    }
+    public void setPassword(String password) {
+        this.password = password;
+    }
+    public String getClientId() {
+        return clientId;
+    }
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+    public String getTopics() {
+        return topics;
+    }
+    public void setTopics(String topics) {
+        this.topics = topics;
+    }
+    public int getTimeout() {
+        return timeout;
+    }
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+    public int getKeepalive() {
+        return keepalive;
+    }
+    public void setKeepalive(int keepalive) {
+        this.keepalive = keepalive;
+    }
+}

+ 4 - 4
src/main/java/com/welampiot/controller/PolicyController.java

@@ -18,10 +18,10 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import javax.servlet.http.HttpServletRequest;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-import static com.sun.xml.internal.ws.policy.sourcemodel.wspolicy.XmlToken.Policy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 @RestController
 @CrossOrigin

+ 47 - 0
src/main/java/com/welampiot/handle/MqttHandler.java

@@ -0,0 +1,47 @@
+package com.welampiot.handle;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+@Component
+@Scope("prototype")
+public class MqttHandler implements MqttCallback{
+    private String topic;
+    private String res;
+
+    public String getRes() {
+        return res;
+    }
+
+    public void setRes(String res) {
+        this.res = res;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public void connectionLost(Throwable throwable) {
+        System.out.println("connectionLost---------");
+    }
+
+    @Override
+    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+        if (topic != null && topic.equals(s)){
+            res = new String(mqttMessage.getPayload());
+        }
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+
+    }
+}

+ 224 - 1
src/main/java/com/welampiot/utils/ToolUtils.java

@@ -1,13 +1,19 @@
 package com.welampiot.utils;
 
 import com.welampiot.common.BaseResult;
+import com.welampiot.configuration.MqttConfig;
 import com.welampiot.configuration.ResponseConfig;
 import com.welampiot.dto.GlobalLocationDTO;
 import com.welampiot.dto.SectionDTO;
 import com.welampiot.dto.UserDTO;
+import com.welampiot.handle.MqttHandler;
 import com.welampiot.service.GlobalLocationService;
 import com.welampiot.service.SectionService;
 import com.welampiot.service.UserService;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -25,7 +31,10 @@ public class ToolUtils {
     private GlobalLocationService globalLocationService;
     @Autowired
     private ResponseConfig responseConfig;
-
+    @Autowired
+    private MqttConfig mqttConfig;
+    @Autowired
+    private MqttHandler mqttHandler;
 
     /**
      * 获取用户所有路段 id
@@ -155,4 +164,218 @@ public class ToolUtils {
         }
         return new BaseResult<>(code,msg,obj);
     }
+
+    /**
+     * 发送mqtt指令
+     * @param sendTopic  发送的topic
+     * @param cmdInfo   发送的指令内容
+     * @param resTopic  接收指令返回的topic
+     * @return
+     */
+    public String sendMqttCmd(String sendTopic,Object cmdInfo,String resTopic){
+        MqttClient client;
+        String res = "";
+
+        try {
+            try {
+                client=new MqttClient(mqttConfig.getUrl(),mqttConfig.getClientId()+"_"+(new Date()).getTime(),new MemoryPersistence());
+                MqttConnectOptions options=new MqttConnectOptions();
+                options.setCleanSession(true);
+                options.setUserName(mqttConfig.getUsername());
+                options.setPassword(mqttConfig.getPassword().toCharArray());
+                options.setConnectionTimeout(mqttConfig.getTimeout());
+                options.setKeepAliveInterval(mqttConfig.getKeepalive());
+                mqttHandler.setTopic(resTopic);
+
+                // 设置回调
+                client.setCallback(mqttHandler);
+
+                // 建立连接
+                client.connect(options);
+                //            MqttCustomerClient.setClient(client);
+                client.subscribe(resTopic);
+
+                // 消息发布所需参数
+                MqttMessage message;
+                if (cmdInfo instanceof String){
+                    message = new MqttMessage(((String) cmdInfo).getBytes());
+                }else {
+                    message = new MqttMessage((byte[]) cmdInfo);
+                }
+                message.setQos(0);
+                client.publish(sendTopic, message);
+                int in = 0;
+
+                while (true){
+                    Thread.sleep(1000);
+                    if (mqttHandler.getRes() != null && mqttHandler.getRes().length() != 0){
+                        res = mqttHandler.getRes();
+                        break;
+                    }
+                    in ++;
+                    if (in > 30) break;
+                }
+                client.disconnect();
+            }catch (Exception e){
+                e.printStackTrace();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+        return res;
+    }
+
+    /**
+     * 发送mqtt指令
+     * @param sendTopic  发送的topic
+     * @param cmdInfo   发送的指令内容
+     * @param resTopic  接收指令返回的topic
+     * @param timeout  指令超时时间
+     * @return
+     */
+    public String sendMqttCmd(String sendTopic,Object cmdInfo,String resTopic,Integer timeout){
+        MqttClient client;
+        String res = "";
+
+        try {
+            try {
+                client=new MqttClient(mqttConfig.getUrl(),mqttConfig.getClientId()+"_"+(new Date()).getTime(),new MemoryPersistence());
+                MqttConnectOptions options=new MqttConnectOptions();
+                options.setCleanSession(true);
+                options.setUserName(mqttConfig.getUsername());
+                options.setPassword(mqttConfig.getPassword().toCharArray());
+                options.setConnectionTimeout(mqttConfig.getTimeout());
+                options.setKeepAliveInterval(mqttConfig.getKeepalive());
+                mqttHandler.setTopic(resTopic);
+
+                // 设置回调
+                client.setCallback(mqttHandler);
+
+                // 建立连接
+                client.connect(options);
+                //            MqttCustomerClient.setClient(client);
+                client.subscribe(resTopic);
+
+                // 消息发布所需参数
+                MqttMessage message;
+                if (cmdInfo instanceof String){
+                    message = new MqttMessage(((String) cmdInfo).getBytes());
+                }else {
+                    message = new MqttMessage((byte[]) cmdInfo);
+                }
+
+                message.setQos(0);
+                client.publish(sendTopic, message);
+                int in = 0;
+                if (timeout > 0){
+                    while (true){
+                        Thread.sleep(1000);
+                        if (mqttHandler.getRes() != null && mqttHandler.getRes().length() != 0){
+                            res = mqttHandler.getRes();
+                            break;
+                        }
+                        in ++;
+                        if (in > timeout) break;
+                    }
+                }
+                client.disconnect();
+            }catch (Exception e){
+                e.printStackTrace();
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+        return res;
+    }
+
+    /**
+     * 字节数组转16进制字符串
+     * @param b 字节数组
+     * @return 16进制字符串
+     * @throws
+     */
+    public static String bytes2HexString(byte[] b) {
+        StringBuffer result = new StringBuffer();
+        String hex;
+        for (int i = 0; i < b.length; i++) {
+            hex = Integer.toHexString(b[i] & 0xFF);
+            if (hex.length() == 1) {
+                hex = '0' + hex;
+            }
+            result.append(hex.toUpperCase());
+        }
+        return result.toString();
+    }
+
+    /**
+     * 16进制字符串转字节数组
+     * @param src 16进制字符串
+     * @return 字节数组
+     * @throws
+     */
+    public byte[] hexString2Bytes(String src) {
+        int l = src.length() / 2;
+        byte[] ret = new byte[l];
+        for (int i = 0; i < l; i++) {
+            ret[i] = (byte) Integer
+                    .valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
+        }
+        return ret;
+    }
+
+    /**
+     * 计算modbus CRC16校验码
+     * @param bytes 需要计算的数据
+     * @return
+     */
+    public String getCRC(byte[] bytes) {
+        // CRC寄存器全为1
+        int CRC = 0x0000ffff;
+        // 多项式校验值
+        int POLYNOMIAL = 0x0000a001;
+        int i, j;
+        for (i = 0; i < bytes.length; i++) {
+            CRC ^= ((int) bytes[i] & 0x000000ff);
+            for (j = 0; j < 8; j++) {
+                if ((CRC & 0x00000001) != 0) {
+                    CRC >>= 1;
+                    CRC ^= POLYNOMIAL;
+                } else {
+                    CRC >>= 1;
+                }
+            }
+        }
+        // 结果转换为16进制
+        String result = Integer.toHexString(CRC).toUpperCase();
+        if (result.length() != 4) {
+            StringBuffer sb = new StringBuffer("0000");
+            result = sb.replace(4 - result.length(), 4, result).toString();
+        }
+        //高位在前地位在后
+        //return result.substring(2, 4) + " " + result.substring(0, 2);
+        // 交换高低位,低位在前高位在后
+        return result.substring(2, 4)+result.substring(0, 2);
+    }
+
+    public Object getRequestContent(HttpServletRequest request,String key){
+        return request.getParameter(key);
+    }
+
+    /**
+     * 获取请求参数
+     * @param request  请求对象
+     * @param key   参数值
+     * @param type  参数类型 1 int 2 string
+     * @return
+     */
+    public Object getRequestContent(HttpServletRequest request,String key,Integer type){
+        if (type == 1){ // 获取 int 数据
+            Integer keyId = request.getParameter(key) == null || request.getParameter(key).length() == 0? 0 : Integer.parseInt(request.getParameter(key));
+            return keyId;
+        }else if (type == 2){
+            String keyStr = request.getParameter(key) == null ? "" : request.getParameter(key).toString();
+            return keyStr;
+        }
+        return request.getParameter(key);
+    }
 }

+ 21 - 0
src/main/resources/prod/application.yml

@@ -21,8 +21,29 @@ spring:
       filters: config
       connect-properties:
         config.decrypt: true
+<<<<<<< HEAD:src/main/resources/prod/application.yml
 #        config.decrypt.key: MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAI5wh9Y/+kW7LK1jgUTygp2wk5uVUucBheqNDeKPAKH/orx1r1GF4mcFgfaBYO/hyVJC7Gk3xf6ypKd2MCrFRXsCAwEAAQ==
         config.decrypt.key: ${spring.datasource.druid.publickey}
+=======
+        config.decrypt.key: MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAI5wh9Y/+kW7LK1jgUTygp2wk5uVUucBheqNDeKPAKH/orx1r1GF4mcFgfaBYO/hyVJC7Gk3xf6ypKd2MCrFRXsCAwEAAQ==
+  #      config.decrypt.key: ${spring.datasource.druid.publickey}
+#  jackson:
+#    serialization:
+#      write-null-map-values: true
+#  gson:
+#    serialize-nulls: true
+
+welampiot:
+  mqtt:
+    url: tcp://139.196.213.241:1883
+    clientId: welampiot_mqtt_client
+    topics: /#
+    username: admin
+    password: admin
+    timeout: 10
+    keepalive: 20
+
+>>>>>>> 88427924a01a77a4971f067c9a1b13afdc761b62:src/main/resources/application-prod.yml
 mybatis:
   mapper-locations: classpath:/mapper/*.xml
   check-config-location: true

+ 11 - 0
src/main/resources/test/application.yml

@@ -29,6 +29,16 @@ spring:
 #      write-null-map-values: true
 #  gson:
 #    serialize-nulls: true
+
+welampiot:
+  mqtt:
+    url: tcp://139.196.213.241:1883
+    clientId: welampiot_mqtt_client
+    topics: /#
+    username: admin
+    password: admin
+    timeout: 10
+    keepalive: 20
 mybatis:
   mapper-locations: classpath:/mapper/*.xml
   check-config-location: true
@@ -43,3 +53,4 @@ logging:
   level:
     root: info
   config: classpath:logback.xml
+