// Main.java (3.8 KB)
  1. import java.io.*;
  2. import java.util.*;
  3. import com.erlport.erlang.term.*;
  4. import com.erlport.*;
  /**
   * Per-connection state object used by the callbacks in {@code Main}.
   *
   * <p>It holds a single counter that starts at zero and is advanced by one on
   * every call to {@link #incr()}. The class is {@link Serializable}; the counter
   * is kept as a boxed {@code Integer}.
   */
  class State implements Serializable {

      /** Number of times {@link #incr()} has been called so far. */
      Integer times;

      /** Creates a state whose counter is zero. */
      public State() {
          this.times = 0;
      }

      /**
       * Advances the counter by one.
       *
       * @return the counter value after the increment
       */
      public Integer incr() {
          final Integer next = this.times + 1;
          this.times = next;
          return next;
      }

      /**
       * Renders the state for log output.
       *
       * @return a string of the form {@code State(times: N)}
       */
      @Override
      public String toString() {
          final String pattern = "State(times: %d)";
          return String.format(pattern, this.times);
      }
  }
  19. public class Main {
  20. static Integer OK = 0;
  21. static Integer ERROR = 0;
  22. //-------------------
  23. // Connection level
  24. public static Object init(Object conn, Object connInfo) {
  25. System.err.printf("[java] established a conn=%s, connInfo=%s\n", conn, connInfo);
  26. // set an instance to be the connection state
  27. // it just a example structure to record the callback total times
  28. Object state = new State();
  29. // subscribe the topic `t/dn` with qos0
  30. subscribe(conn, new Binary("t/dn"), 0);
  31. // return the initial conn's state
  32. return Tuple.two(OK, state);
  33. }
  34. public static Object received(Object conn, Object data, Object state) {
  35. System.err.printf("[java] received data conn=%s, data=%s, state=%s\n", conn, data, state);
  36. // echo the conn's data
  37. send(conn, data);
  38. // return the new conn's state
  39. State nstate = (State) state;
  40. nstate.incr();
  41. return Tuple.two(OK, nstate);
  42. }
  43. public static void terminated(Object conn, Object reason, Object state) {
  44. System.err.printf("[java] terminated conn=%s, reason=%s, state=%s\n", conn, reason, state);
  45. return;
  46. }
  47. //-----------------------
  48. // Protocol/Session level
  49. public static Object deliver(Object conn, Object msgs0, Object state) {
  50. System.err.printf("[java] received messages conn=%s, msgs=%s, state=%s\n", conn, msgs0, state);
  51. List<Object> msgs = (List<Object>) msgs0;
  52. for(Object msg: msgs) {
  53. publish(conn, msg);
  54. }
  55. // return the new conn's state
  56. State nstate = (State) state;
  57. nstate.incr();
  58. return Tuple.two(OK, nstate);
  59. }
  60. //-----------------------
  61. // APIs
  62. public static void send(Object conn, Object data) {
  63. try {
  64. Erlang.call("emqx_exproto", "send", new Object[]{conn, data}, 5000);
  65. } catch (Exception e) {
  66. System.err.printf("[java] send data error: %s\n", e);
  67. }
  68. return;
  69. }
  70. public static void close(Object conn) {
  71. try {
  72. Erlang.call("emqx_exproto", "close", new Object[]{conn}, 5000);
  73. } catch (Exception e) {
  74. System.err.printf("[java] send data error: %s\n", e);
  75. }
  76. return;
  77. }
  78. public static void register(Object conn, Object clientInfo) {
  79. try {
  80. Erlang.call("emqx_exproto", "register", new Object[]{conn, clientInfo}, 5000);
  81. } catch (Exception e) {
  82. System.err.printf("[java] send data error: %s\n", e);
  83. }
  84. return;
  85. }
  86. public static void publish(Object conn, Object message) {
  87. try {
  88. Erlang.call("emqx_exproto", "publish", new Object[]{conn, message}, 5000);
  89. } catch (Exception e) {
  90. System.err.printf("[java] send data error: %s\n", e);
  91. }
  92. return;
  93. }
  94. public static void subscribe(Object conn, Object topic, Object qos) {
  95. try {
  96. Erlang.call("emqx_exproto", "subscribe", new Object[]{conn, topic, qos}, 5000);
  97. } catch (Exception e) {
  98. System.err.printf("[java] send data error: %s\n", e);
  99. }
  100. return;
  101. }
  102. public static void unsubscribe(Object conn, Object topic) {
  103. try {
  104. Erlang.call("emqx_exproto", "unsubscribe", new Object[]{conn, topic}, 5000);
  105. } catch (Exception e) {
  106. System.err.printf("[java] send data error: %s\n", e);
  107. }
  108. return;
  109. }
  110. }