Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
# Logging setup passed to logging.config.dictConfig(): one stream handler on
# the root logger, DEBUG and above, with the process id in every record.
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'simple': {
            'format': '%(asctime)s %(levelname)s %(process)d %(message)s',
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
    },
    'loggers': {
        # The empty name addresses the root logger.
        '': {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
    },
}
- import logging.config
- logging.config.dictConfig(LOGGING_CONFIG)
- from logging import debug
- from cStringIO import StringIO
- import time
- import pycurl
- from pyev import EV_READ, EV_WRITE, EV_IO
- import pyev
- import os
# Lookup table used by curl_socket_cb: the poll action libcurl reports for a
# socket -> the pyev event mask the socket's watcher is registered with.
event_map = {
    pycurl.POLL_NONE: 0,
    pycurl.POLL_IN: EV_READ,
    pycurl.POLL_OUT: EV_WRITE,
    pycurl.POLL_INOUT: EV_IO,
}
def process_curl_complete_handles():
    """Drain the multi handle's message queue and notify finished fetchers.

    Repeatedly calls ``multi.info_read()``; every handle it reports is
    detached from the multi handle and its ``fetcher`` is told whether the
    transfer succeeded or failed. Stops once ``info_read()`` reports that no
    further messages are queued.
    """
    while True:
        remaining, succeeded, failed = multi.info_read()
        debug("process_curl_complete_handles: %s, %s, %s" % (remaining, succeeded, failed))

        for handle in succeeded:
            multi.remove_handle(handle)
            debug("success c.fetcher: %s" % repr(handle.fetcher))
            # XXX
            handle.fetcher.success()

        for handle, errnum, errmsg in failed:
            debug("c.fetcher.fail")
            multi.remove_handle(handle)
            # XXX
            handle.fetcher.fail(errnum, errmsg)
            # Hard-exit the process on the first failed transfer.
            # NOTE(review): the paste lost its indentation; this placement
            # (inside the failure loop) is reconstructed - confirm.
            os._exit(1)

        if remaining == 0:
            break
def ev_event_cb(watcher, events):
    """pyev I/O callback: report socket readiness to libcurl.

    Args:
        watcher: the pyev watcher that fired; its ``fd`` is the socket handed
            to ``multi.socket_action``.
        events: pyev event bitmask (``EV_READ`` / ``EV_WRITE``) that fired.

    After libcurl has processed the socket, any transfers that completed are
    dispatched via ``process_curl_complete_handles()``.
    """
    debug("libev_eventc_cb: fd:%s, ev_event:%s" % (watcher.fd, events))

    # Translate the pyev event mask into libcurl's CSELECT_* bitmask.
    curl_action = 0
    if events & EV_READ:
        curl_action |= pycurl.CSELECT_IN
    if events & EV_WRITE:
        curl_action |= pycurl.CSELECT_OUT

    # Keep calling into libcurl for as long as it answers
    # E_CALL_MULTI_PERFORM; any other result (including the code carried by a
    # raised pycurl.error) ends the loop.
    while True:
        debug("processing curl fd:%s" % watcher.fd)
        try:
            ret, _ = multi.socket_action(watcher.fd, curl_action)
        except pycurl.error as e:
            ret = e.args[0]
        if ret != pycurl.E_CALL_MULTI_PERFORM:
            break

    process_curl_complete_handles()
def curl_socket_cb(event, fd, multi, data):
    """Socket callback registered with the multi handle (M_SOCKETFUNCTION).

    Keeps the module-level ``watchers`` dict (fd -> pyev I/O watcher) in step
    with what libcurl asks for:

    * ``pycurl.POLL_REMOVE``: stop and forget the watcher for ``fd``.
    * any other action: translate it through ``event_map`` and either create
      a watcher for ``fd`` or re-arm the existing one with the new mask.

    ``multi`` and ``data`` are only logged.
    """
    debug("curl_socket_cb: event: %s, fd:%s, multi:%s data:%s" % (event, fd, multi, data))
    debug("watchers: %s" % repr(watchers))

    if event == pycurl.POLL_REMOVE:
        # curl wants to stop poll on fd
        debug("removing polling for fd:%s" % fd)
        watchers[fd].stop()
        del watchers[fd]
        return

    ev_event = event_map[event]

    if fd in watchers:
        # The watcher is stopped before its event mask is changed and
        # started again afterwards.
        debug("updating watcher")
        io_watcher = watchers[fd]
        io_watcher.stop()
        io_watcher.set(fd, ev_event)
        io_watcher.start()
        return

    # create new watcher for socket
    debug("creating new watcher")
    io_watcher = loop.io(fd, ev_event, ev_event_cb)
    io_watcher.start()
    watchers[fd] = io_watcher
def fetch(addr, url, headers):
    """Start a transfer of ``url`` and register it in ``fetchers``.

    Args:
        addr: short label for the request; passed to ``Fetcher``.
        url: URL to request.
        headers: dict of extra request headers passed to ``Fetcher``.
    """
    # Constructing the Fetcher already attaches its curl handle to ``multi``.
    new_fetcher = Fetcher(addr, url, headers)
    fetchers.add(new_fetcher)

    # Poke libcurl with a timeout action, then handle anything that has
    # already completed.
    multi.socket_action(pycurl.SOCKET_TIMEOUT, 0)
    process_curl_complete_handles()
class Fetcher(object):
    """A single curl transfer attached to the module-level multi handle.

    Attributes:
        addr: label used in log output.
        read_buf: buffer receiving the response body.
        headers_buf: buffer receiving the response headers.
        c: the underlying ``pycurl.Curl`` handle; ``c.fetcher`` points back
            at this object so completion handling can find it.
    """

    def __init__(self, addr, url, extra_headers):
        """Configure a curl handle for ``url`` and add it to ``multi``.

        Args:
            addr: label used in log output.
            url: URL to request.
            extra_headers: dict of header name -> value; entries override
                the built-in defaults with the same name.
        """
        c = pycurl.Curl()
        debug("just created curl: %s in fetcher: %s for %s" % (c, self, addr))
        self.addr = addr

        # Route libcurl's verbose trace through this object's debug() method.
        c.setopt(pycurl.VERBOSE, 1)
        c.setopt(pycurl.DEBUGFUNCTION, self.debug)

        self.read_buf = StringIO()
        self.headers_buf = StringIO()

        setopt = c.setopt
        setopt(pycurl.CONNECTTIMEOUT, 1)
        setopt(pycurl.TIMEOUT, 5)
        setopt(pycurl.URL, url)
        setopt(c.WRITEFUNCTION, self.read_buf.write)
        setopt(c.HEADERFUNCTION, self.headers_buf.write)

        headers = {
            "Connection": "Keep-Alive",
            "Accept-Encoding": "gzip",
            "X-Uniq": "0001092D567400BF",
            "User": "fooNova",
            "X-Real-Ip": "127.0.0.1",
            # NOTE(review): empty values presumably suppress libcurl's
            # default headers - confirm against CURLOPT_HTTPHEADER docs.
            'User-Agent': '',
            'Accept': '',
        }
        headers.update(extra_headers)
        # .items() instead of the Python-2-only .iteritems(); the resulting
        # header list is the same.
        c.setopt(pycurl.HTTPHEADER, ['%s: %s' % (k, v) for k, v in headers.items()])

        c.fetcher = self
        self.c = c
        multi.add_handle(c)

    def debug(self, debug_type, msg):
        """DEBUGFUNCTION callback: log libcurl trace output for this fetcher."""
        # Inside the method body the bare name ``debug`` resolves to the
        # module-level logging function, not to this method.
        debug("%s debug: %s %s" % (self.addr, repr(debug_type), repr(msg)))

    def success(self):
        """Log the successful transfer and drop this fetcher from ``fetchers``."""
        debug("fetcher %s success" % (self,))
        fetchers.remove(self)

    def fail(self, *args):
        """Log the failed transfer and drop this fetcher from ``fetchers``.

        ``args`` (the error number and message supplied by the caller) are
        accepted but not used.
        """
        debug("fetcher %s fail" % (self,))
        fetchers.remove(self)
# main
# Module-level state shared by the callbacks and by fetch().
watchers = {}    # socket fd -> pyev I/O watcher, maintained by curl_socket_cb
fetchers = set()    # Fetcher objects whose transfers are still in flight

loop = pyev.default_loop()

multi = pycurl.CurlMulti()
multi.setopt(pycurl.M_MAXCONNECTS, 30)
multi.setopt(pycurl.M_SOCKETFUNCTION, curl_socket_cb)

# Timestamp of the most recent my_fetch() round; read by the driver loop.
last_fetch = time.time()
def my_fetch():
    """Start one round of requests: one fetch() per fixed backend, in order."""
    browser_ua = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"

    # (label, url, extra request headers)
    targets = (
        (
            'spellcheck',
            'http://beta21.foo.ru/spellcheck/vxml?query=%D0%BA%D1%83%D0%BF%D0%B8%D1%82%D1%8C%20%D0%B0%D0%B2%D1%82%D0%BE',
            {},
        ),
        (
            'xmlsearch',
            'http://xmlsearch.bar.ru/bar_url',
            {"Cookie": "spravka=dD0xMzM0ODQ1MTc4O2k9MTI3LjAuMC4xO3U9MTMzNDg0NTE3ODUwMDYyMDM0MztoPWIzYzU1NmE5ZmJiYWYxNzkwZTc0NWUwZjhiMGFhMDI2"},
        ),
        (
            'yabs',
            'http://yabs.bar.ru/code/yabs.bar_url',
            {
                "User-Agent": browser_ua,
                "Referer": "http://nova.foo.ru/serp?query=%D0%BA%D1%83%D0%BF%D0%B8%D1%82%D1%8C%20%D0%B0%D0%B2%D1%82%D0%BE",
            },
        ),
        (
            'r0',
            'http://localhost/rl0/xml/search.xml?id=search',
            {},
        ),
        (
            'baz',
            'http://autocontext.baz.ru/baz_url',
            {
                "User-Agent": browser_ua,
                "Cookie": "ruid=0000001D4F7D452C7D2C470D00000201",
            },
        ),
    )

    for label, url, extra_headers in targets:
        fetch(label, url, extra_headers)
# Start the first round immediately, then run the event loop one iteration at
# a time and start another round whenever more than 130 seconds have passed
# since the previous one.
my_fetch()
while True:
    loop.start(pyev.EVRUN_ONCE)
    if time.time() - last_fetch <= 130:
        continue
    last_fetch = time.time()
    my_fetch()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement