Advertisement
Guest User

Untitled

a guest
Apr 18th, 2019
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.13 KB | None | 0 0
  1. package com.dikahastanto.test;
  2.  
  3.  
  4. import android.os.Bundle;
  5. import android.os.Handler;
  6. import android.os.Message;
  7. import android.os.StrictMode;
  8. import android.provider.Settings;
  9. import android.util.Log;
  10.  
  11. import com.rabbitmq.client.*;
  12. import com.rabbitmq.client.QueueingConsumer;
  13.  
  14. import java.io.IOException;
  15. import java.net.URISyntaxException;
  16. import java.nio.charset.StandardCharsets;
  17. import java.security.KeyManagementException;
  18. import java.security.NoSuchAlgorithmException;
  19. import java.util.concurrent.TimeoutException;
  20.  
  21. public class RMQ {
  22.  
  23. // Load Global Configuration
  24. Settings settings = new Settings();
  25.  
  26. // Setup RMQ Configuration (Ganti dengan setting akun RMQ nya)
  27. String user = "dettacare_event";
  28. String pass = "dettacare_event";
  29. String host = "http://192.168.3.102";
  30. String vhost = "vhost";
  31. String exchanges_name = "amq.topic";
  32. String queue_name_publish = "dettacare_event";
  33. String queue_name_subscribe = "dettacare_event";
  34. String routingKey = "dettacare_event";
  35.  
  36. // Define new Connection Factory
  37. ConnectionFactory factory = new ConnectionFactory();
  38.  
  39. /**
  40. * Setup Connection Factory
  41. * Load Setting Connection RMQ Sebagai Parameter Koneksi
  42. */
  43. public void setupConnectionFactory() {
  44. try {
  45. factory.setAutomaticRecoveryEnabled(false);
  46. factory.setUri("amqp://"+user+":"+pass+"@"+host);
  47. factory.setVirtualHost(vhost);
  48. } catch (KeyManagementException | NoSuchAlgorithmException | URISyntaxException e1) {
  49. e1.printStackTrace();
  50. }
  51. }
  52.  
  53. /**
  54. * Publish data lewat RMQ
  55. * @param message
  56. */
  57. public void publish(String message) {
  58. try {
  59. StrictMode.ThreadPolicy policy = new StrictMode.ThreadPolicy.Builder().permitAll().build();
  60. StrictMode.setThreadPolicy(policy);
  61.  
  62. Connection connection = factory.newConnection();
  63. Channel channel = connection.createChannel();
  64.  
  65. String messageOn = message ;
  66. channel.basicPublish("", queue_name_publish,null,messageOn.getBytes());
  67.  
  68. } catch (IOException e) {
  69. Log.d("Publish Error", e.getMessage());
  70. } catch (TimeoutException e) {
  71. Log.d("Publish Error", e.getMessage());
  72. } catch (Exception e) {
  73. Log.d("Publish Error", e.getMessage());
  74. }
  75.  
  76. }
  77.  
  78. /**
  79. * Optional, Send Speed for threading speed
  80. * @throws InterruptedException
  81. */
  82. public void SendSpeed() throws InterruptedException {
  83. Thread.sleep(500); //0.5 sec
  84. }
  85.  
  86. /**
  87. * Fungsi untuk subscribe data RMQ
  88. * @param handler
  89. * @param subscribeThread
  90. */
  91. public void subscribe(final Handler handler, Thread subscribeThread)
  92. {
  93. subscribeThread = new Thread(new Runnable() {
  94. @Override
  95. public void run() {
  96. while(true) {
  97. try {
  98. Connection connection = factory.newConnection();
  99. Channel channel = connection.createChannel();
  100. channel.basicQos(0);
  101. channel.queueBind(queue_name_subscribe, exchanges_name, routingKey);
  102. QueueingConsumer consumer = new QueueingConsumer(channel);
  103. channel.basicConsume(queue_name_subscribe, true, consumer);
  104. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  105.  
  106. if (delivery != null){
  107.  
  108. try{
  109.  
  110. String message = null;
  111. if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.KITKAT) {
  112. message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  113. }
  114. Log.d("ConsumeDataRMQ", "MessageConsumed" + message);
  115.  
  116. Message msg = handler.obtainMessage();
  117. Bundle bundle = new Bundle();
  118.  
  119. bundle.putString("msg", message);
  120. msg.setData(bundle);
  121. handler.sendMessage(msg);
  122.  
  123. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  124.  
  125. }catch (Exception e){
  126. channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
  127. }
  128. }
  129. } catch (InterruptedException e) {
  130. break;
  131. } catch (Exception e1) {
  132. Log.d("", "Connection broken: " + e1.getClass().getName());
  133. try {
  134. Thread.sleep(4000); //sleep and then try again
  135. } catch (InterruptedException e) {
  136. break;
  137. }
  138. }
  139. }
  140. }
  141. });
  142. subscribeThread.start();
  143. }
  144. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement