Advertisement
Guest User

Untitled

a guest
Jan 26th, 2017
118
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.24 KB | None | 0 0
  1. from stompy import stomp
  2. import json
  3.  
  4.  
  5. s = stomp.Stomp(amq_ip, amq_port)
  6.  
  7. try:
  8. s.connect(username=amq_user, password=amq_pass)
  9. s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
  10. except Exception as e:
  11. print "ActiveMQ errorn %s" % e
  12.  
  13. while True:
  14. try:
  15. frame = s.receive_frame()
  16. body = json.loads(frame.body)
  17.  
  18. # это сообщение для меня?
  19. if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
  20. print "Its for me. I receive it"
  21. # Это сообщение для меня. Я его приму и обработаю
  22. s.ack(frame)
  23. else:
  24. # Это сообщение предназначено для кого-то другого и мне не подходит
  25. print "Its not for me"
  26. except Exception as e:
  27. print e
  28.  
  29. # -*- coding: utf-8 -*-
  30. import activemq_api
  31. import urllib3
  32. import json
  33.  
  34.  
  35. # Connection to ActiveMQ
  36. BROKER_NAME = "localhost"
  37. AMQ_API_PORT = 8161
  38. AMQ_API_USER = "admin"
  39. AMQ_API_PASS = "admin"
  40. AMQ_API_POSTFIX = "/api/jolokia"
  41. AMQ_TASK_QUEUE_NAME = "test"
  42. BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
  43. AMQ_STATUS_QUEUE = "/queue/test"
  44.  
  45. LOGIN_EXEMPT_URLS = [
  46. r'admin/'
  47. ]
  48.  
  49. LOGIN_URL = 'url_login'
  50.  
  51. LOGOUT_REDIRECT_URL = 'url_login'
  52.  
  53. if __name__ == '__main__':
  54. user_agent = "curl/7.49.1"
  55. headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
  56. addition = {
  57. "Content-Type": "application/x-www-form-urlencoded",
  58. "Accept": "*/*"
  59. }
  60. try:
  61. headers.update(addition)
  62. connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
  63. manager = activemq_api.AMQManager(connect)
  64. except Exception as e:
  65. print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
  66. else:
  67. print(u'Соединение успешно установлено')
  68.  
  69. try:
  70. id="ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
  71. secret="wertrtd-3fdf-4dfd-gr56-dfghdvhshtdfgdw"
  72. print(manager.removeMsgQueue("test", id))
  73. except Exception as inst:
  74. print inst
  75.  
  76.  
  77. #!/usr/bin/python2
  78. # -*- coding: utf-8 -*-
  79. import urllib3
  80. import json
  81.  
  82. class Connection:
  83. def __init__(self, amq_ip, amq_port, broker, header, postfix):
  84. self.BROKER_NAME = broker
  85. self.AMQ_IP = amq_ip
  86. self.AMQ_PORT = amq_port
  87. self.HEADERS = header
  88. self.POSTFIX = postfix
  89.  
  90. class AMQManager():
  91. def __init__(self, conn):
  92. self.QUEUES = {}
  93. self.QUEUES_COUNT = None
  94. self.HEAP_MEMORY_USED = None
  95. self.MEMORY_PERSENT_USED = None
  96. self.CONNECTION = conn
  97. self.update()
  98.  
  99. def rmQueue(self, queue_names):
  100. REUQEST = {
  101. "type": "exec",
  102. "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
  103. "operation": "removeQueue(java.lang.String)",
  104. "arguments": [queue_names]
  105. }
  106. return json.dumps(REUQEST)
  107.  
  108. def queueList(self):
  109. REUQEST = {
  110. "type": "read",
  111. "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
  112. "attribute":"Queues"
  113. }
  114. return json.dumps(REUQEST)
  115.  
  116. def browseQueueSubscribers(self):
  117. REUQEST = {
  118. "type": "read",
  119. "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
  120. "attribute": "QueueSubscribers"
  121. }
  122. return json.dumps(REUQEST)
  123.  
  124. def memoryPersentUsed(self):
  125. REUQEST = {
  126. "type": "read",
  127. "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
  128. "attribute": "MemoryPercentUsage"
  129. }
  130. return json.dumps(REUQEST)
  131.  
  132. def heapMemoryUsed(self):
  133. REUQEST = {
  134. "type": "read",
  135. "mbean": "java.lang:type=Memory",
  136. "attribute":"HeapMemoryUsage",
  137. "path":"used"
  138. }
  139. return json.dumps(REUQEST)
  140.  
  141. def request(self, name, param):
  142. http = urllib3.PoolManager()
  143. body = ''
  144. if name == "removeQueue":
  145. body = self.rmQueue(param["QUEUE_NAME"])
  146. elif name == "queueList":
  147. body = self.queueList()
  148. elif name == "browseQueueSubscribers":
  149. body = self.browseQueueSubscribers()
  150. elif name == "memoryPersentUsed":
  151. body = self.memoryPersentUsed()
  152. elif name == "heapMemoryUsed":
  153. body = self.heapMemoryUsed()
  154.  
  155. url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
  156. r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
  157. return r.data
  158.  
  159. def updateQueues(self):
  160. res = json.loads(self.request("queueList", {}))
  161. # print res
  162. data = []
  163. for queue in res["value"]:
  164. object = {}
  165. queue["objectName"] = queue["objectName"].split(":")[1]
  166. for key in queue["objectName"].split(","):
  167. object.update({key.split("=")[0]: key.split("=")[1]})
  168. data.append(object)
  169. self.QUEUES_COUNT = 0
  170. self.QUEUES = {}
  171. # print data
  172. for queue in data:
  173. self.QUEUES.update({queue["destinationName"]: Queue(queue["destinationName"], self.CONNECTION)})
  174. self.QUEUES_COUNT += 1
  175.  
  176. def updateHeapMem(self):
  177. self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]
  178.  
  179. def updatePersMem(self):
  180. self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]
  181.  
  182. Ars, [26.01.17 14:06]
  183. ## EXPORTABLE
  184. def update(self):
  185. self.updateQueues()
  186. self.updateHeapMem()
  187. self.updatePersMem()
  188. ## EXPORTABLE
  189. def getQueues(self):
  190. self.updateQueues()
  191. data = []
  192. for queue in self.QUEUES:
  193. data.append(self.QUEUES[queue].getInfo())
  194. return {
  195. "queues_count": self.QUEUES_COUNT,
  196. "queues": data
  197. }
  198. ## EXPORTABLE
  199. def getQueueInfo(self, name):
  200. return self.QUEUES[name].getInfo()
  201. ## EXPORTABLE
  202. def browseQueue(self, name):
  203. return self.QUEUES[name].browse()
  204. ## EXPORTABLE
  205. def getMessage(self, name, msg_id):
  206. return self.QUEUES[name].message(msg_id)
  207. def getAllQueueMessages(self, name):
  208. return self.QUEUES[name].messages()
  209. ## EXPORTABLE
  210. def removeQueue(self, name):
  211. param = {
  212. "QUEUE_NAME": name
  213. }
  214. return json.loads(self.request("removeQueue", param))
  215. ## EXPORTABLE
  216. def clearQueue(self, name):
  217. return self.QUEUES[name].clear()
  218. # ARS
  219. def removeMsgQueue(self,nameQueue, id):
  220. return self.QUEUES[nameQueue].delete_msg(id)
  221.  
  222.  
  223.  
  224. class Queue():
  225. def __init__(self, q_name, conn):
  226. # научите обращаться к атрибутам суперкласса!
  227. self.MESSAGES = []
  228. self.QUEUE_NAME = q_name
  229. self.ENQUEUE_COUNT = None
  230. self.DEQUEUE_COUNT = None
  231. self.CONSUMER_COUNT = None
  232. self.QUEUE_SIZE = None
  233. self.CONNECTION = conn
  234. self.updateEnCount()
  235. self.updateDeCount()
  236. self.updateCoCount()
  237. self.updateQuSize()
  238.  
  239. def queueEnqueueCount(self):
  240. # MSG_NAMES = ['JMSMessageID="ID:localhost-39797-1466874134889-3:1:-1:1:1"']
  241. REUQEST = {
  242. "type": "read",
  243. "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
  244. % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
  245. "attribute": "EnqueueCount"
  246. }
  247. return json.dumps(REUQEST)
  248.  
  249. def queueDequeueCount(self):
  250. REUQEST = {
  251. "type": "read",
  252. "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
  253. % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
  254. "attribute": "DequeueCount"
  255. }
  256. return json.dumps(REUQEST)
  257.  
  258. def queueConsumerCount(self):
  259. REUQEST = {
  260. "type": "read",
  261. "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
  262. % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
  263. "attribute": "ConsumerCount"
  264. }
  265. return json.dumps(REUQEST)
  266.  
  267. def queueSize(self):
  268. REUQEST = {
  269. "type": "read",
  270. "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
  271. % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
  272. "attribute": "QueueSize"
  273. }
  274. return json.dumps(REUQEST)
  275.  
  276. def browseMessages(self):
  277. REUQEST = {
  278. "type": "exec",
  279. "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
  280. % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
  281. "operation": "browse()",
  282. # "arguments": [""]
  283. }
  284. return json.dumps(REUQEST)
  285.  
  286. Ars, [26.01.17 14:06]
  287. def purge(self):
  288. REUQEST = {
  289. "type": "exec",
  290. "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
  291. % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
  292. "operation": "purge()"
  293. }
  294. return json.dumps(REUQEST)
  295. #ARS
  296. def deleteMsg(self, ID):
  297. REUQEST = {
  298. "type": "exec",
  299. "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
  300. % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
  301. "operation": "deleteMessage()",
  302. "arguments": [ID, "11111111-1111-1111-1111-111111111111"]
  303. }
  304. return json.dumps(REUQEST)
  305.  
  306. def request(self, name, param):
  307. http = urllib3.PoolManager()
  308.  
  309. if name == "queueEnqueueCount":
  310. body = self.queueEnqueueCount()
  311. elif name == "queueDequeueCount":
  312. body = self.queueDequeueCount()
  313. elif name == "queueConsumerCount":
  314. body = self.queueConsumerCount()
  315. elif name == "queueSize":
  316. body = self.queueSize()
  317. elif name == "browseMessages":
  318. body = self.browseMessages()
  319. elif name == "purge":
  320. body = self.purge()
  321. elif name == "delete_msg":
  322. body = self.deleteMsg(param)
  323.  
  324. url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
  325. r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
  326. return r.data
  327.  
  328. def updateEnCount(self):
  329. try:
  330. self.ENQUEUE_COUNT = json.loads(self.request("queueEnqueueCount", {}))["value"]
  331. except Exception as inst:
  332. self.ENQUEUE_COUNT = -1
  333.  
  334. def updateDeCount(self):
  335. try:
  336. self.DEQUEUE_COUNT = json.loads(self.request("queueDequeueCount", {}))["value"]
  337. except Exception as inst:
  338. self.ENQUEUE_COUNT = -1
  339.  
  340. def updateCoCount(self):
  341. try:
  342. self.CONSUMER_COUNT = json.loads(self.request("queueConsumerCount", {}))["value"]
  343. except Exception as inst:
  344. self.ENQUEUE_COUNT = -1
  345.  
  346. def updateQuSize(self):
  347. try:
  348. self.QUEUE_SIZE = json.loads(self.request("queueSize", {}))["value"]
  349. except Exception as inst:
  350. self.ENQUEUE_COUNT = -1
  351.  
  352. def updateMessages(self):
  353. self.MESSAGES = []
  354. res = json.loads(self.request("browseMessages", {}))["value"]
  355. for msg in res:
  356. data = {
  357. "id": msg["JMSMessageID"],
  358. "data": msg["Text"],
  359. "timestamp": msg["JMSTimestamp"],
  360. "priority": msg["JMSPriority"]
  361. }
  362. self.MESSAGES.append(data)
  363.  
  364. def update(self):
  365. self.updateEnCount()
  366. self.updateDeCount()
  367. self.updateCoCount()
  368. self.updateQuSize()
  369. self.updateMessages()
  370.  
  371. def getInfo(self):
  372. self.updateEnCount()
  373. self.updateDeCount()
  374. self.updateCoCount()
  375. self.updateQuSize()
  376. return {
  377. "queue_name": self.QUEUE_NAME,
  378. "enqueue_count": self.ENQUEUE_COUNT,
  379. "dequeue_count": self.DEQUEUE_COUNT,
  380. "consumer_count": self.CONSUMER_COUNT,
  381. "queue_size": self.QUEUE_SIZE
  382. }
  383.  
  384. def browse(self):
  385. self.updateMessages()
  386. data = []
  387. for msg in self.MESSAGES:
  388. chunk = {
  389. "id": msg["id"],
  390. "timestamp": msg["timestamp"],
  391. "priority": msg["priority"]
  392. }
  393. data.append(chunk)
  394. return data
  395.  
  396. Ars, [26.01.17 14:06]
  397. def message(self, msg_id):
  398. self.updateMessages()
  399. for msg in self.MESSAGES:
  400. if msg["id"] == msg_id:
  401. return msg["data"]
  402. # ARS
  403. def messages(self):
  404. self.updateMessages()
  405. return self.MESSAGES
  406.  
  407. # ARS
  408. def delete_msg(self, id):
  409. return json.loads(self.request("delete_msg",id))
  410.  
  411. def clear(self):
  412. return json.loads(self.request("purge", {}))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement