Advertisement
Guest User

Untitled

a guest
Feb 29th, 2016
118
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.65 KB | None | 0 0
  1. package utils;
  2.  
  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import java.sql.Timestamp;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  import org.eclipse.paho.client.mqttv3.MqttCallback;
  import org.eclipse.paho.client.mqttv3.MqttClient;
  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  import org.eclipse.paho.client.mqttv3.MqttException;
  import org.eclipse.paho.client.mqttv3.MqttMessage;
  import org.eclipse.paho.client.mqttv3.MqttSecurityException;
  import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
  14.  
  15. public class CnMqClientCluster1 implements MqttCallback {
  16.  
  17. public static CnMqClientCluster1 init() {
  18. String protocol = "tcp://";
  19. String broker = "localhost";
  20. int port = 1883;
  21. String clientId = "testClient002";
  22. boolean cleanSession = true;
  23. boolean quietMode = false;
  24. String userName = "username";
  25. String password = "password";
  26. String url = protocol + broker + ":" + port;
  27. int qos = 0;
  28. String topic = "hello/world";
  29.  
  30. try {
  31. // Create an instance of this class
  32. CnMqClientCluster1 sampleClient = new CnMqClientCluster1(url, clientId, cleanSession, quietMode,userName,password);
  33. sampleClient.connect();
  34. sampleClient.subscribe(topic,qos);
  35.  
  36. return sampleClient;
  37. } catch(MqttException me) {
  38. // Display full details of any exception that occurs
  39. System.out.println("reason "+me.getReasonCode());
  40. System.out.println("msg "+me.getMessage());
  41. System.out.println("loc "+me.getLocalizedMessage());
  42. System.out.println("cause "+me.getCause());
  43. System.out.println("excep "+me);
  44. me.printStackTrace();
  45.  
  46. return null;
  47. }
  48. }
  49.  
  50. // Private instance variables
  51. private MqttClient client;
  52. private String brokerUrl;
  53. private boolean quietMode;
  54. private MqttConnectOptions conOpt;
  55. private boolean clean;
  56. private String password;
  57. private String userName;
  58.  
  59. public String getClientId() {
  60. return client.getClientId();
  61. }
  62.  
  63. /**
  64. * Constructs an instance of the sample client wrapper
  65. * @param brokerUrl the url of the server to connect to
  66. * @param clientId the client id to connect with
  67. * @param cleanSession clear state at end of connection or not (durable or non-durable subscriptions)
  68. * @param quietMode whether debug should be printed to standard out
  69. * @param userName the username to connect with
  70. * @param password the password for the user
  71. * @throws MqttException
  72. */
  73. public CnMqClientCluster1(String brokerUrl, String clientId, boolean cleanSession, boolean quietMode, String userName, String password) throws MqttException {
  74. this.brokerUrl = brokerUrl;
  75. this.quietMode = quietMode;
  76. this.clean = cleanSession;
  77. this.password = password;
  78. this.userName = userName;
  79. //This sample stores in a temporary directory... where messages temporarily
  80. // stored until the message has been delivered to the server.
  81. //..a real application ought to store them somewhere
  82. // where they are not likely to get deleted or tampered with
  83. String tmpDir = System.getProperty("java.io.tmpdir");
  84. MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
  85.  
  86. try {
  87. // Construct the connection options object that contains connection parameters
  88. // such as cleanSession and LWT
  89. conOpt = new MqttConnectOptions();
  90. conOpt.setCleanSession(clean);
  91. if(password != null ) {
  92. conOpt.setPassword(this.password.toCharArray());
  93. }
  94. if(userName != null) {
  95. conOpt.setUserName(this.userName);
  96. }
  97.  
  98. // Construct an MQTT blocking mode client
  99. client = new MqttClient(this.brokerUrl,clientId, dataStore);
  100.  
  101. // Set this wrapper as the callback handler
  102. client.setCallback(this);
  103.  
  104. } catch (MqttException e) {
  105. e.printStackTrace();
  106. log("Unable to set up client: "+e.toString());
  107. System.exit(1);
  108. }
  109. }
  110.  
  111. public void publish(String topicName, int qos, byte[] payload) throws MqttException {
  112. String time = new Timestamp(System.currentTimeMillis()).toString();
  113. log("Publishing at: "+time+ " to topic \""+topicName+"\" qos "+qos);
  114.  
  115. // Create and configure a message
  116. MqttMessage message = new MqttMessage(payload);
  117. message.setQos(qos);
  118.  
  119. // Send the message to the server, control is not returned until
  120. // it has been delivered to the server meeting the specified
  121. // quality of service.
  122. // this.connect();
  123. client.publish(topicName, message);
  124. // this.disconnect();
  125. }
  126.  
  127. public void connect() {
  128. int retry = 0;
  129. while (retry < 20) {
  130. try {
  131. client.connect(conOpt);
  132. retry = 21;
  133. log("Connected to "+brokerUrl+" with client ID "+client.getClientId());
  134. } catch (MqttException e) {
  135. System.out.println("failed to reconnect");
  136. retry++;
  137. }
  138. }
  139. }
  140.  
  141. public void subscribe(String topicName, int qos) throws MqttException {
  142. // System.out.println("topicName : "+topicName);
  143. // // Connect to the MQTT server
  144. // client.connect(conOpt);
  145. // log("Connected to "+brokerUrl+" with client ID "+client.getClientId());
  146.  
  147. // Subscribe to the requested topic
  148. // The QoS specified is the maximum level that messages will be sent to the client at.
  149. // For instance if QoS 1 is specified, any messages originally published at QoS 2 will
  150. // be downgraded to 1 when delivering to the client but messages published at 1 and 0
  151. // will be received at the same level they were published at.
  152. log(client.getClientId() + " is subscribing to topic \""+topicName+"\" qos "+qos);
  153. client.subscribe(topicName, qos);
  154. }
  155.  
  156. public void addNewTopic(String topicName, int qos) throws MqttException {
  157. log(client.getClientId() + " is Subscribing to topic \""+topicName+"\" qos "+qos);
  158. client.subscribe(topicName, 0);
  159. }
  160.  
  161. public void unsubscribeTopic(String topicName) throws MqttException {
  162. log("Unsubscribing to topic \""+topicName+"\"");
  163. client.unsubscribe(topicName);
  164. }
  165.  
  166. public void disconnect() throws MqttException {
  167. // Disconnect the client from the server
  168. client.disconnect();
  169. log("Disconnected");
  170. }
  171.  
  172. /**
  173. * Utility method to handle logging. If 'quietMode' is set, this method does nothing
  174. * @param message the message to log
  175. */
  176. private void log(String message) {
  177. if (!quietMode) {
  178. System.out.println(message);
  179. }
  180. }
  181.  
  182. /****************************************************************/
  183. /* Methods to implement the MqttCallback interface */
  184. /****************************************************************/
  185.  
  186. /**
  187. * @see MqttCallback#connectionLost(Throwable)
  188. */
  189. public void connectionLost(Throwable cause) {
  190. // Called when the connection to the server has been lost.
  191. // An application may choose to implement reconnection
  192. // logic at this point. This sample simply exits.
  193. log("Connection to " + brokerUrl + " lost!" + cause);
  194. try {
  195. this.connect();
  196. this.subscribe("hello/world", 0);
  197. } catch (MqttException e) {
  198. log("failed to reconnect");
  199. }
  200. //System.exit(1);
  201. }
  202.  
  203. /**
  204. * @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
  205. */
  206. public void deliveryComplete(IMqttDeliveryToken token) {
  207. // Called when a message has been delivered to the
  208. // server. The token passed in here is the same one
  209. // that was passed to or returned from the original call to publish.
  210. // This allows applications to perform asynchronous
  211. // delivery without blocking until delivery completes.
  212. //
  213. // This sample demonstrates asynchronous deliver and
  214. // uses the token.waitForCompletion() call in the main thread which
  215. // blocks until the delivery has completed.
  216. // Additionally the deliveryComplete method will be called if
  217. // the callback is set on the client
  218. //
  219. // If the connection to the server breaks before delivery has completed
  220. // delivery of a message will complete after the client has re-connected.
  221. // The getPendingTokens method will provide tokens for any messages
  222. // that are still to be delivered.
  223. }
  224.  
  225. /**
  226. * @see MqttCallback#messageArrived(String, MqttMessage)
  227. */
  228. public void messageArrived(String topic, MqttMessage message) throws MqttException {
  229. // Called when a message arrives from the server that matches any
  230. // subscription made by the client
  231. String time = new Timestamp(System.currentTimeMillis()).toString();
  232. System.out.println(client.getClientId() + ":\t" + "Time:\t" +time +
  233. " Topic:\t" + topic +
  234. " Message:\t" + new String(message.getPayload()) +
  235. " QoS:\t" + message.getQos());
  236. }
  237.  
  238. /****************************************************************/
  239. /* End of MqttCallback methods */
  240. /****************************************************************/
  241.  
  242. static void printHelp() {
  243. System.out.println(
  244. "Syntax:\n\n" +
  245. " Sample [-h] [-a publish|subscribe] [-t <topic>] [-m <message text>]\n" +
  246. " [-s 0|1|2] -b <hostname|IP address>] [-p <brokerport>] [-i <clientID>]\n\n" +
  247. " -h Print this help text and quit\n" +
  248. " -q Quiet mode (default is false)\n" +
  249. " -a Perform the relevant action (default is publish)\n" +
  250. " -t Publish/subscribe to <topic> instead of the default\n" +
  251. " (publish: \"Sample/Java/v3\", subscribe: \"Sample/#\")\n" +
  252. " -m Use <message text> instead of the default\n" +
  253. " (\"Message from MQTTv3 Java client\")\n" +
  254. " -s Use this QoS instead of the default (2)\n" +
  255. " -b Use this name/IP address instead of the default (m2m.eclipse.org)\n" +
  256. " -p Use this port instead of the default (1883)\n\n" +
  257. " -i Use this client ID instead of SampleJavaV3_<action>\n" +
  258. " -c Connect to the server with a clean session (default is false)\n" +
  259. " \n\n Security Options \n" +
  260. " -u Username \n" +
  261. " -z Password \n" +
  262. " \n\n SSL Options \n" +
  263. " -v SSL enabled; true - (default is false) " +
  264. " -k Use this JKS format key store to verify the client\n" +
  265. " -w Passpharse to verify certificates in the keys store\n" +
  266. " -r Use this JKS format keystore to verify the server\n" +
  267. " If javax.net.ssl properties have been set only the -v flag needs to be set\n" +
  268. "Delimit strings containing spaces with \"\"\n\n" +
  269. "Publishers transmit a single message then disconnect from the server.\n" +
  270. "Subscribers remain connected to the server and receive appropriate\n" +
  271. "messages until <enter> is pressed.\n\n"
  272. );
  273. }
  274.  
  275. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement