Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from stompy import stomp
- import json
- s = stomp.Stomp(amq_ip, amq_port)
- try:
- s.connect(username=amq_user, password=amq_pass)
- s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
- except Exception as e:
- print "ActiveMQ errorn %s" % e
- while True:
- try:
- frame = s.receive_frame()
- body = json.loads(frame.body)
- # это сообщение для меня?
- if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
- print "Its for me. I receive it"
- # Это сообщение для меня. Я его приму и обработаю
- s.ack(frame)
- else:
- # Это сообщение предназначено для кого-то другого и мне не подходит
- print "Its not for me"
- except Exception as e:
- print e
- # -*- coding: utf-8 -*-
- import activemq_api
- import urllib3
- import json
- # Connection to ActiveMQ
- BROKER_NAME = "localhost"
- AMQ_API_PORT = 8161
- AMQ_API_USER = "admin"
- AMQ_API_PASS = "admin"
- AMQ_API_POSTFIX = "/api/jolokia"
- AMQ_TASK_QUEUE_NAME = "test"
- BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
- AMQ_STATUS_QUEUE = "/queue/test"
- LOGIN_EXEMPT_URLS = [
- r'admin/'
- ]
- LOGIN_URL = 'url_login'
- LOGOUT_REDIRECT_URL = 'url_login'
- if __name__ == '__main__':
- user_agent = "curl/7.49.1"
- headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
- addition = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Accept": "*/*"
- }
- try:
- headers.update(addition)
- connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
- manager = activemq_api.AMQManager(connect)
- except Exception as e:
- print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
- else:
- print(u'Соединение успешно установлено')
- try:
- id="ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
- secret="wertrtd-3fdf-4dfd-gr56-dfghdvhshtdfgdw"
- print(manager.removeMsgQueue("test", id))
- except Exception as inst:
- print inst
- #!/usr/bin/python2
- # -*- coding: utf-8 -*-
- import urllib3
- import json
- class Connection:
- def __init__(self, amq_ip, amq_port, broker, header, postfix):
- self.BROKER_NAME = broker
- self.AMQ_IP = amq_ip
- self.AMQ_PORT = amq_port
- self.HEADERS = header
- self.POSTFIX = postfix
- class AMQManager():
- def __init__(self, conn):
- self.QUEUES = {}
- self.QUEUES_COUNT = None
- self.HEAP_MEMORY_USED = None
- self.MEMORY_PERSENT_USED = None
- self.CONNECTION = conn
- self.update()
- def rmQueue(self, queue_names):
- REUQEST = {
- "type": "exec",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
- "operation": "removeQueue(java.lang.String)",
- "arguments": [queue_names]
- }
- return json.dumps(REUQEST)
- def queueList(self):
- REUQEST = {
- "type": "read",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
- "attribute":"Queues"
- }
- return json.dumps(REUQEST)
- def browseQueueSubscribers(self):
- REUQEST = {
- "type": "read",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
- "attribute": "QueueSubscribers"
- }
- return json.dumps(REUQEST)
- def memoryPersentUsed(self):
- REUQEST = {
- "type": "read",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
- "attribute": "MemoryPercentUsage"
- }
- return json.dumps(REUQEST)
- def heapMemoryUsed(self):
- REUQEST = {
- "type": "read",
- "mbean": "java.lang:type=Memory",
- "attribute":"HeapMemoryUsage",
- "path":"used"
- }
- return json.dumps(REUQEST)
- def request(self, name, param):
- http = urllib3.PoolManager()
- body = ''
- if name == "removeQueue":
- body = self.rmQueue(param["QUEUE_NAME"])
- elif name == "queueList":
- body = self.queueList()
- elif name == "browseQueueSubscribers":
- body = self.browseQueueSubscribers()
- elif name == "memoryPersentUsed":
- body = self.memoryPersentUsed()
- elif name == "heapMemoryUsed":
- body = self.heapMemoryUsed()
- url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
- r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
- return r.data
- def updateQueues(self):
- res = json.loads(self.request("queueList", {}))
- # print res
- data = []
- for queue in res["value"]:
- object = {}
- queue["objectName"] = queue["objectName"].split(":")[1]
- for key in queue["objectName"].split(","):
- object.update({key.split("=")[0]: key.split("=")[1]})
- data.append(object)
- self.QUEUES_COUNT = 0
- self.QUEUES = {}
- # print data
- for queue in data:
- self.QUEUES.update({queue["destinationName"]: Queue(queue["destinationName"], self.CONNECTION)})
- self.QUEUES_COUNT += 1
- def updateHeapMem(self):
- self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]
- def updatePersMem(self):
- self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]
- Ars, [26.01.17 14:06]
- ## EXPORTABLE
- def update(self):
- self.updateQueues()
- self.updateHeapMem()
- self.updatePersMem()
- ## EXPORTABLE
- def getQueues(self):
- self.updateQueues()
- data = []
- for queue in self.QUEUES:
- data.append(self.QUEUES[queue].getInfo())
- return {
- "queues_count": self.QUEUES_COUNT,
- "queues": data
- }
- ## EXPORTABLE
- def getQueueInfo(self, name):
- return self.QUEUES[name].getInfo()
- ## EXPORTABLE
- def browseQueue(self, name):
- return self.QUEUES[name].browse()
- ## EXPORTABLE
- def getMessage(self, name, msg_id):
- return self.QUEUES[name].message(msg_id)
- def getAllQueueMessages(self, name):
- return self.QUEUES[name].messages()
- ## EXPORTABLE
- def removeQueue(self, name):
- param = {
- "QUEUE_NAME": name
- }
- return json.loads(self.request("removeQueue", param))
- ## EXPORTABLE
- def clearQueue(self, name):
- return self.QUEUES[name].clear()
- # ARS
- def removeMsgQueue(self,nameQueue, id):
- return self.QUEUES[nameQueue].delete_msg(id)
- class Queue():
- def __init__(self, q_name, conn):
- # научите обращаться к атрибутам суперкласса!
- self.MESSAGES = []
- self.QUEUE_NAME = q_name
- self.ENQUEUE_COUNT = None
- self.DEQUEUE_COUNT = None
- self.CONSUMER_COUNT = None
- self.QUEUE_SIZE = None
- self.CONNECTION = conn
- self.updateEnCount()
- self.updateDeCount()
- self.updateCoCount()
- self.updateQuSize()
- def queueEnqueueCount(self):
- # MSG_NAMES = ['JMSMessageID="ID:localhost-39797-1466874134889-3:1:-1:1:1"']
- REUQEST = {
- "type": "read",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
- % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
- "attribute": "EnqueueCount"
- }
- return json.dumps(REUQEST)
- def queueDequeueCount(self):
- REUQEST = {
- "type": "read",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
- % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
- "attribute": "DequeueCount"
- }
- return json.dumps(REUQEST)
- def queueConsumerCount(self):
- REUQEST = {
- "type": "read",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
- % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
- "attribute": "ConsumerCount"
- }
- return json.dumps(REUQEST)
- def queueSize(self):
- REUQEST = {
- "type": "read",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
- % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
- "attribute": "QueueSize"
- }
- return json.dumps(REUQEST)
- def browseMessages(self):
- REUQEST = {
- "type": "exec",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
- % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
- "operation": "browse()",
- # "arguments": [""]
- }
- return json.dumps(REUQEST)
- Ars, [26.01.17 14:06]
- def purge(self):
- REUQEST = {
- "type": "exec",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
- % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
- "operation": "purge()"
- }
- return json.dumps(REUQEST)
- #ARS
- def deleteMsg(self, ID):
- REUQEST = {
- "type": "exec",
- "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue"
- % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
- "operation": "deleteMessage()",
- "arguments": [ID, "11111111-1111-1111-1111-111111111111"]
- }
- return json.dumps(REUQEST)
- def request(self, name, param):
- http = urllib3.PoolManager()
- if name == "queueEnqueueCount":
- body = self.queueEnqueueCount()
- elif name == "queueDequeueCount":
- body = self.queueDequeueCount()
- elif name == "queueConsumerCount":
- body = self.queueConsumerCount()
- elif name == "queueSize":
- body = self.queueSize()
- elif name == "browseMessages":
- body = self.browseMessages()
- elif name == "purge":
- body = self.purge()
- elif name == "delete_msg":
- body = self.deleteMsg(param)
- url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
- r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
- return r.data
- def updateEnCount(self):
- try:
- self.ENQUEUE_COUNT = json.loads(self.request("queueEnqueueCount", {}))["value"]
- except Exception as inst:
- self.ENQUEUE_COUNT = -1
- def updateDeCount(self):
- try:
- self.DEQUEUE_COUNT = json.loads(self.request("queueDequeueCount", {}))["value"]
- except Exception as inst:
- self.ENQUEUE_COUNT = -1
- def updateCoCount(self):
- try:
- self.CONSUMER_COUNT = json.loads(self.request("queueConsumerCount", {}))["value"]
- except Exception as inst:
- self.ENQUEUE_COUNT = -1
- def updateQuSize(self):
- try:
- self.QUEUE_SIZE = json.loads(self.request("queueSize", {}))["value"]
- except Exception as inst:
- self.ENQUEUE_COUNT = -1
- def updateMessages(self):
- self.MESSAGES = []
- res = json.loads(self.request("browseMessages", {}))["value"]
- for msg in res:
- data = {
- "id": msg["JMSMessageID"],
- "data": msg["Text"],
- "timestamp": msg["JMSTimestamp"],
- "priority": msg["JMSPriority"]
- }
- self.MESSAGES.append(data)
- def update(self):
- self.updateEnCount()
- self.updateDeCount()
- self.updateCoCount()
- self.updateQuSize()
- self.updateMessages()
- def getInfo(self):
- self.updateEnCount()
- self.updateDeCount()
- self.updateCoCount()
- self.updateQuSize()
- return {
- "queue_name": self.QUEUE_NAME,
- "enqueue_count": self.ENQUEUE_COUNT,
- "dequeue_count": self.DEQUEUE_COUNT,
- "consumer_count": self.CONSUMER_COUNT,
- "queue_size": self.QUEUE_SIZE
- }
- def browse(self):
- self.updateMessages()
- data = []
- for msg in self.MESSAGES:
- chunk = {
- "id": msg["id"],
- "timestamp": msg["timestamp"],
- "priority": msg["priority"]
- }
- data.append(chunk)
- return data
- Ars, [26.01.17 14:06]
- def message(self, msg_id):
- self.updateMessages()
- for msg in self.MESSAGES:
- if msg["id"] == msg_id:
- return msg["data"]
- # ARS
- def messages(self):
- self.updateMessages()
- return self.MESSAGES
- # ARS
- def delete_msg(self, id):
- return json.loads(self.request("delete_msg",id))
- def clear(self):
- return json.loads(self.request("purge", {}))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement