Advertisement
Guest User

Untitled

a guest
Sep 28th, 2016
143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 36.45 KB | None | 0 0
  1. #!/usr/bin/python -OO
  2. # Copyright 2008-2015 The SABnzbd-Team <team@sabnzbd.org>
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; either version 2
  7. # of the License, or (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  17.  
  18. """
  19. sabnzbd.downloader - download engine
  20. """
  21.  
  22. import time
  23. import select
  24. import logging
  25. from threading import Thread, RLock
  26. from nntplib import NNTPPermanentError
  27. import socket
  28. import random
  29. import sys
  30.  
  31. import sabnzbd
  32. from sabnzbd.decorators import synchronized, synchronized_CV, CV
  33. from sabnzbd.decoder import Decoder
  34. from sabnzbd.utils.sslinfo import ssl_protocols
  35. from sabnzbd.newswrapper import NewsWrapper, request_server_info
  36. from sabnzbd.articlecache import ArticleCache
  37. import sabnzbd.notifier as notifier
  38. import sabnzbd.config as config
  39. import sabnzbd.cfg as cfg
  40. from sabnzbd.bpsmeter import BPSMeter
  41. import sabnzbd.scheduler
  42. from sabnzbd.misc import from_units
  43. from sabnzbd.utils.happyeyeballs import happyeyeballs
  44.  
  45.  
# Timeout penalty in minutes for each cause.
# These values are handed to Downloader.plan_server(), which converts
# its 'interval' argument from minutes to seconds.
_PENALTY_UNKNOWN = 3  # Unknown cause
_PENALTY_502 = 5  # Unknown 502
_PENALTY_TIMEOUT = 10  # Server doesn't give an answer (multiple times)
_PENALTY_SHARE = 10  # Account sharing detected
_PENALTY_TOOMANY = 10  # Too many connections
_PENALTY_PERM = 10  # Permanent error, like bad username/password
_PENALTY_SHORT = 1  # Minimal penalty when no_penalties is set
_PENALTY_VERYSHORT = 0.1  # Error 400 without cause clues


# Lock used by the @synchronized(TIMER_LOCK) methods that touch Downloader._timers
TIMER_LOCK = RLock()
  58.  
  59.  
  60. class Server(object):
  61.  
  62. def __init__(self, id, displayname, host, port, timeout, threads, priority, ssl, ssl_type, send_group, username=None,
  63. password=None, optional=False, retention=0, categories=None):
  64.  
  65. self.id = id
  66. self.newid = None
  67. self.restart = False
  68. self.displayname = displayname
  69. self.host = host
  70. self.port = port
  71. self.timeout = timeout
  72. self.threads = threads
  73. self.priority = priority
  74. self.ssl = ssl
  75. self.ssl_type = None
  76. self.optional = optional
  77. self.retention = retention
  78. self.send_group = send_group
  79.  
  80. self.username = username
  81. self.password = password
  82.  
  83. self.categories = categories
  84.  
  85. self.busy_threads = []
  86. self.idle_threads = []
  87. self.active = True
  88. self.bad_cons = 0
  89. self.errormsg = ''
  90. self.warning = ''
  91. self.info = None # Will hold getaddrinfo() list
  92. self.request = False # True if a getaddrinfo() request is pending
  93. self.have_body = 'free.xsusenet.com' not in host
  94. self.have_stat = True # Assume server has "STAT", until proven otherwise
  95.  
  96. if ssl:
  97. # When the user has set a supported protocol, use it
  98. if ssl_type and ssl_type in ssl_protocols():
  99. self.ssl_type = ssl_type
  100.  
  101. for i in range(threads):
  102. self.idle_threads.append(NewsWrapper(self, i + 1))
  103.  
  104. @property
  105. def hostip(self):
  106. """ based on value of load_balancing() and self.info:
  107. 0: return the host name itself (so: do nothing)
  108. 1 and self.info has more than 1 entry (read: IP address): Return a random entry from the possible IPs
  109. 2 and self.info has more than 1 entry (read: IP address): Return the quickest IP based on the happyeyeballs algorithm
  110. In case of problems: return the host name itself
  111. """
  112. if cfg.load_balancing() == 1 and self.info and len(self.info) > 1:
  113. # Return a random entry from the possible IPs
  114. rnd = random.randint(0, len(self.info) - 1)
  115. ip = self.info[rnd][4][0]
  116. logging.debug('For server %s, using IP %s', self.host, ip)
  117. elif cfg.load_balancing() == 2 and self.info and len(self.info) > 1:
  118. # RFC6555 / Happy Eyeballs:
  119. ip = happyeyeballs(self.host, port=self.port, ssl=self.ssl)
  120. if ip:
  121. logging.debug('For server %s, using IP %s as server', self.host, ip)
  122. else:
  123. # nothing returned, so there was a connection problem
  124. ip = self.host
  125. logging.debug('For server %s, no successful IP connection possible', self.host)
  126. else:
  127. ip = self.host
  128. return ip
  129.  
  130. def stop(self, readers, writers):
  131. for nw in self.idle_threads:
  132. try:
  133. fno = nw.nntp.sock.fileno()
  134. except:
  135. fno = None
  136. if fno and fno in readers:
  137. readers.pop(fno)
  138. if fno and fno in writers:
  139. writers.pop(fno)
  140. nw.terminate(quit=True)
  141. self.idle_threads = []
  142.  
  143. def __repr__(self):
  144. return "%s:%s" % (self.host, self.port)
  145.  
  146.  
  147. class Downloader(Thread):
  148. """ Singleton Downloader Thread """
  149. do = None
  150.  
  151. def __init__(self, paused=False):
  152. Thread.__init__(self)
  153.  
  154. logging.debug("Initializing downloader/decoder")
  155.  
  156. # Used for scheduled pausing
  157. self.paused = paused
  158.  
  159. # used for throttling bandwidth and scheduling bandwidth changes
  160. cfg.bandwidth_perc.callback(self.speed_set)
  161. cfg.bandwidth_max.callback(self.speed_set)
  162. self.speed_set()
  163.  
  164. # Used for reducing speed
  165. self.delayed = False
  166.  
  167. self.postproc = False
  168.  
  169. self.shutdown = False
  170.  
  171. # A user might change server parms again before server restart is ready.
  172. # Keep a counter to prevent multiple restarts
  173. self.__restart = 0
  174.  
  175. self.force_disconnect = False
  176.  
  177. self.read_fds = {}
  178. self.write_fds = {}
  179.  
  180. self.servers = []
  181. self._timers = {}
  182.  
  183. for server in config.get_servers():
  184. self.init_server(None, server)
  185.  
  186. self.decoder = Decoder(self.servers)
  187. Downloader.do = self
  188.  
  189. def init_server(self, oldserver, newserver):
  190. """ Setup or re-setup single server
  191. When oldserver is defined and in use, delay startup.
  192. Note that the server names are "host:port" strings!
  193. """
  194.  
  195. create = False
  196.  
  197. servers = config.get_servers()
  198. if newserver in servers:
  199. srv = servers[newserver]
  200. enabled = srv.enable()
  201. displayname = srv.displayname()
  202. host = srv.host()
  203. port = srv.port()
  204. timeout = srv.timeout()
  205. threads = srv.connections()
  206. priority = srv.priority()
  207. ssl = srv.ssl() and sabnzbd.newswrapper.HAVE_SSL
  208. ssl_type = srv.ssl_type()
  209. username = srv.username()
  210. password = srv.password()
  211. optional = srv.optional()
  212. categories = srv.categories()
  213. retention = float(srv.retention() * 24 * 3600) # days ==> seconds
  214. send_group = srv.send_group()
  215. create = True
  216.  
  217. if oldserver:
  218. for n in xrange(len(self.servers)):
  219. if self.servers[n].id == oldserver:
  220. # Server exists, do re-init later
  221. create = False
  222. self.servers[n].newid = newserver
  223. self.servers[n].restart = True
  224. self.__restart += 1
  225. break
  226.  
  227. if create and enabled and host and port and threads:
  228. self.servers.append(Server(newserver, displayname, host, port, timeout, threads, priority, ssl,
  229. ssl_type, send_group,
  230. username, password, optional, retention, categories=categories))
  231.  
  232. return
  233.  
  234. @synchronized_CV
  235. def set_paused_state(self, state):
  236. """ Set downloader to specified paused state """
  237. self.paused = state
  238.  
  239. @synchronized_CV
  240. def resume(self):
  241. # Do not notify when SABnzbd is still starting
  242. if self.paused and sabnzbd.WEB_DIR:
  243. logging.info("Resuming")
  244. notifier.send_notification("SABnzbd", T('Resuming'), 'download')
  245. self.paused = False
  246.  
  247. @synchronized_CV
  248. def pause(self, save=True):
  249. """ Pause the downloader, optionally saving admin """
  250. if not self.paused:
  251. self.paused = True
  252. logging.info("Pausing")
  253. notifier.send_notification("SABnzbd", T('Paused'), 'download')
  254. if self.is_paused():
  255. BPSMeter.do.reset()
  256. if cfg.autodisconnect():
  257. self.disconnect()
  258. if save:
  259. ArticleCache.do.flush_articles()
  260.  
  261. @synchronized_CV
  262. def delay(self):
  263. logging.debug("Delaying")
  264. self.delayed = True
  265.  
  266. @synchronized_CV
  267. def undelay(self):
  268. logging.debug("Undelaying")
  269. self.delayed = False
  270.  
  271. @synchronized_CV
  272. def wait_for_postproc(self):
  273. logging.info("Waiting for post-processing to finish")
  274. self.postproc = True
  275.  
  276. @synchronized_CV
  277. def resume_from_postproc(self):
  278. logging.info("Post-processing finished, resuming download")
  279. self.postproc = False
  280.  
  281. def disconnect(self):
  282. self.force_disconnect = True
  283.  
  284. @synchronized_CV
  285. def limit_speed(self, value):
  286. ''' Set the actual download speed in Bytes/sec
  287. When 'value' ends with a '%' sign or is within 1-100, it is interpreted as a pecentage of the maximum bandwidth
  288. When no '%' is found, it is interpreted as an absolute speed (including KMGT notation).
  289. '''
  290. if value:
  291. mx = cfg.bandwidth_max.get_int()
  292. if '%' in str(value) or (from_units(value) > 0 and from_units(value) < 101):
  293. limit = value.strip(' %')
  294. self.bandwidth_perc = from_units(limit)
  295. if mx:
  296. self.bandwidth_limit = mx * self.bandwidth_perc / 100
  297. else:
  298. logging.warning(T('You must set a maximum bandwidth before you can set a bandwidth limit'))
  299. else:
  300. self.bandwidth_limit = from_units(value)
  301. if mx:
  302. self.bandwidth_perc = self.bandwidth_limit / mx * 100
  303. else:
  304. self.bandwidth_perc = 100
  305. else:
  306. self.speed_set()
  307. logging.info("Speed limit set to %s B/s", self.bandwidth_limit)
  308.  
  309. def get_limit(self):
  310. return self.bandwidth_perc
  311.  
  312. def get_limit_abs(self):
  313. return self.bandwidth_limit
  314.  
  315. def speed_set(self):
  316. limit = cfg.bandwidth_max.get_int()
  317. perc = cfg.bandwidth_perc()
  318. if limit and perc:
  319. self.bandwidth_perc = perc
  320. self.bandwidth_limit = limit * perc / 100
  321. else:
  322. self.bandwidth_perc = 0
  323. self.bandwidth_limit = 0
  324.  
  325. def is_paused(self):
  326. from sabnzbd.nzbqueue import NzbQueue
  327. if not self.paused:
  328. return False
  329. else:
  330. if NzbQueue.do.has_forced_items():
  331. return False
  332. else:
  333. return True
  334.  
  335. def highest_server(self, me):
  336. """ Return True when this server has the highest priority of the active ones
  337. 0 is the highest priority
  338. """
  339.  
  340. for server in self.servers:
  341. if server is not me and server.active and server.priority < me.priority:
  342. return False
  343. return True
  344.  
  345. def nzo_servers(self, nzo):
  346. return filter(nzo.server_in_try_list, self.servers)
  347.  
  348. def maybe_block_server(self, server):
  349. from sabnzbd.nzbqueue import NzbQueue
  350. if server.optional and server.active and (server.bad_cons / server.threads) > 3:
  351. # Optional and active server had too many problems,
  352. # disable it now and send a re-enable plan to the scheduler
  353. server.bad_cons = 0
  354. server.active = False
  355. server.errormsg = T('Server %s will be ignored for %s minutes') % ('', _PENALTY_TIMEOUT)
  356. logging.warning(T('Server %s will be ignored for %s minutes'), server.id, _PENALTY_TIMEOUT)
  357. self.plan_server(server.id, _PENALTY_TIMEOUT)
  358.  
  359. # Remove all connections to server
  360. for nw in server.idle_threads + server.busy_threads:
  361. self.__reset_nw(nw, "forcing disconnect", warn=False, wait=False, quit=False)
  362. # Make sure server address resolution is refreshed
  363. server.info = None
  364.  
  365. NzbQueue.do.reset_all_try_lists()
  366.  
  367. def run(self):
  368. from sabnzbd.nzbqueue import NzbQueue
  369. self.decoder.start()
  370.  
  371. # Kick BPS-Meter to check quota
  372. BPSMeter.do.update()
  373.  
  374. while 1:
  375. for server in self.servers:
  376. if 0: assert isinstance(server, Server) # Assert only for debug purposes
  377. for nw in server.busy_threads[:]:
  378. if (nw.nntp and nw.nntp.error_msg) or (nw.timeout and time.time() > nw.timeout):
  379. if nw.nntp and nw.nntp.error_msg:
  380. self.__reset_nw(nw, "", warn=False)
  381. else:
  382. self.__reset_nw(nw, "timed out")
  383. server.bad_cons += 1
  384. self.maybe_block_server(server)
  385. if server.restart:
  386. if not server.busy_threads:
  387. newid = server.newid
  388. server.stop(self.read_fds, self.write_fds)
  389. self.servers.remove(server)
  390. if newid:
  391. self.init_server(None, newid)
  392. self.__restart -= 1
  393. NzbQueue.do.reset_all_try_lists()
  394. # Have to leave this loop, because we removed element
  395. break
  396. else:
  397. # Restart pending, don't add new articles
  398. continue
  399.  
  400. if 0: assert isinstance(server, Server) # Assert only for debug purposes
  401. if not server.idle_threads or server.restart or self.is_paused() or self.shutdown or self.delayed or self.postproc:
  402. continue
  403.  
  404. if not (server.active and NzbQueue.do.has_articles_for(server)):
  405. continue
  406.  
  407. for nw in server.idle_threads[:]:
  408. if 0: assert isinstance(nw, NewsWrapper) # Assert only for debug purposes
  409. if nw.timeout:
  410. if time.time() < nw.timeout:
  411. continue
  412. else:
  413. nw.timeout = None
  414.  
  415. if not server.active:
  416. break
  417.  
  418. if server.info is None:
  419. self.maybe_block_server(server)
  420. request_server_info(server)
  421. break
  422.  
  423. article = NzbQueue.do.get_article(server, self.servers)
  424.  
  425. if not article:
  426. break
  427.  
  428. if server.retention and article.nzf.nzo.avg_stamp < time.time() - server.retention:
  429. # Article too old for the server, treat as missing
  430. if sabnzbd.LOG_ALL:
  431. logging.debug('Article %s too old for %s', article.article, server.id)
  432. self.decoder.decode(article, None)
  433. break
  434.  
  435. server.idle_threads.remove(nw)
  436. server.busy_threads.append(nw)
  437.  
  438. nw.article = article
  439.  
  440. if nw.connected:
  441. self.__request_article(nw)
  442. else:
  443. try:
  444. logging.info("%s@%s: Initiating connection",
  445. nw.thrdnum, server.id)
  446. nw.init_connect(self.write_fds)
  447. except:
  448. logging.error(T('Failed to initialize %s@%s with reason: %s'), nw.thrdnum, server.id, sys.exc_info()[1])
  449. self.__reset_nw(nw, "failed to initialize")
  450.  
  451. # Exit-point
  452. if self.shutdown:
  453. empty = True
  454. for server in self.servers:
  455. if server.busy_threads:
  456. empty = False
  457. break
  458.  
  459. if empty:
  460. self.decoder.stop()
  461. self.decoder.join()
  462.  
  463. for server in self.servers:
  464. server.stop(self.read_fds, self.write_fds)
  465.  
  466. logging.info("Shutting down")
  467. break
  468.  
  469. if self.force_disconnect:
  470. for server in self.servers:
  471. for nw in server.idle_threads + server.busy_threads:
  472. quit = nw.connected and server.active
  473. self.__reset_nw(nw, "forcing disconnect", warn=False, wait=False, quit=quit)
  474. # Make sure server address resolution is refreshed
  475. server.info = None
  476.  
  477. self.force_disconnect = False
  478.  
  479. # => Select
  480. readkeys = self.read_fds.keys()
  481. writekeys = self.write_fds.keys()
  482.  
  483. if readkeys or writekeys:
  484. read, write, error = select.select(readkeys, writekeys, (), 1.0)
  485.  
  486. # Why check so often when so few things happend?
  487. if len(readkeys) >= 15 and len(read) <= 2:
  488. time.sleep(0.05)
  489.  
  490. else:
  491. read, write, error = ([], [], [])
  492.  
  493. BPSMeter.do.reset()
  494.  
  495. time.sleep(1.0)
  496.  
  497. CV.acquire()
  498. while (NzbQueue.do.is_empty() or self.is_paused() or self.delayed or self.postproc) and not \
  499. self.shutdown and not self.__restart:
  500. CV.wait()
  501. CV.release()
  502.  
  503. self.force_disconnect = False
  504.  
  505. for selected in write:
  506. nw = self.write_fds[selected]
  507.  
  508. fileno = nw.nntp.sock.fileno()
  509.  
  510. if fileno not in self.read_fds:
  511. self.read_fds[fileno] = nw
  512.  
  513. if fileno in self.write_fds:
  514. self.write_fds.pop(fileno)
  515.  
  516. if not read:
  517. BPSMeter.do.update()
  518. continue
  519.  
  520. for selected in read:
  521. nw = self.read_fds[selected]
  522. article = nw.article
  523. server = nw.server
  524.  
  525. if article:
  526. nzo = article.nzf.nzo
  527.  
  528. try:
  529. bytes, done, skip = nw.recv_chunk()
  530. except:
  531. bytes, done, skip = (0, False, False)
  532.  
  533. if skip:
  534. BPSMeter.do.update()
  535. continue
  536.  
  537. if bytes < 1:
  538. self.__reset_nw(nw, "server closed connection", warn=False, wait=False)
  539. continue
  540.  
  541. else:
  542. if self.bandwidth_limit:
  543. bps = BPSMeter.do.get_bps()
  544. bps += bytes
  545. limit = self.bandwidth_limit
  546. if bps > limit:
  547. while BPSMeter.do.get_bps() > limit:
  548. time.sleep(0.05)
  549. BPSMeter.do.update()
  550. BPSMeter.do.update(server.id, bytes)
  551.  
  552. if nzo:
  553. nzo.update_download_stats(BPSMeter.do.get_bps(), server.id, bytes)
  554.  
  555. if len(nw.lines) == 1:
  556. code = nw.lines[0][:3]
  557. if not nw.connected or code == '480':
  558. done = False
  559.  
  560. try:
  561. nw.finish_connect(code)
  562. if sabnzbd.LOG_ALL:
  563. logging.debug("%s@%s last message -> %s", nw.thrdnum, nw.server.id, nw.lines[0])
  564. nw.lines = []
  565. nw.data = ''
  566. except NNTPPermanentError, error:
  567. # Handle login problems
  568. block = False
  569. penalty = 0
  570. msg = error.response
  571. ecode = msg[:3]
  572. display_msg = ' [%s]' % msg
  573. logging.debug('Server login problem: %s, %s', ecode, msg)
  574. if ecode in ('502', '400', '481', '482') and clues_too_many(msg):
  575. # Too many connections: remove this thread and reduce thread-setting for server
  576. # Plan to go back to the full number after a penalty timeout
  577. if server.active:
  578. errormsg = T('Too many connections to server %s') % display_msg
  579. if server.errormsg != errormsg:
  580. server.errormsg = errormsg
  581. logging.warning(T('Too many connections to server %s'), server.id)
  582. self.__reset_nw(nw, None, warn=False, destroy=True, quit=True)
  583. self.plan_server(server.id, _PENALTY_TOOMANY)
  584. server.threads -= 1
  585. elif ecode in ('502', '481', '482') and clues_too_many_ip(msg):
  586. # Account sharing?
  587. if server.active:
  588. errormsg = T('Probable account sharing') + display_msg
  589. if server.errormsg != errormsg:
  590. server.errormsg = errormsg
  591. name = ' (%s)' % server.id
  592. logging.warning(T('Probable account sharing') + name)
  593. penalty = _PENALTY_SHARE
  594. elif ecode in ('481', '482', '381') or (ecode == '502' and clues_login(msg)):
  595. # Cannot login, block this server
  596. if server.active:
  597. errormsg = T('Failed login for server %s') % display_msg
  598. if server.errormsg != errormsg:
  599. server.errormsg = errormsg
  600. logging.error(T('Failed login for server %s'), server.id)
  601. penalty = _PENALTY_PERM
  602. block = True
  603. elif ecode == '502':
  604. # Cannot connect (other reasons), block this server
  605. if server.active:
  606. errormsg = T('Cannot connect to server %s [%s]') % ('', display_msg)
  607. if server.errormsg != errormsg:
  608. server.errormsg = errormsg
  609. logging.warning(T('Cannot connect to server %s [%s]'), server.id, msg)
  610. if clues_pay(msg):
  611. penalty = _PENALTY_PERM
  612. else:
  613. penalty = _PENALTY_502
  614. block = True
  615. elif ecode == '400':
  616. # Temp connection problem?
  617. if server.active:
  618. logging.debug('Unspecified error 400 from server %s', server.id)
  619. penalty = _PENALTY_VERYSHORT
  620. block = True
  621. else:
  622. # Unknown error, just keep trying
  623. if server.active:
  624. errormsg = T('Cannot connect to server %s [%s]') % ('', display_msg)
  625. if server.errormsg != errormsg:
  626. server.errormsg = errormsg
  627. logging.warning(T('Cannot connect to server %s [%s]'), server.id, msg)
  628. penalty = _PENALTY_UNKNOWN
  629. if block or (penalty and server.optional):
  630. if server.active:
  631. server.active = False
  632. if (not server.optional) and cfg.no_penalties():
  633. penalty = _PENALTY_SHORT
  634. if penalty and (block or server.optional):
  635. logging.info('Server %s ignored for %s minutes', server.id, penalty)
  636. self.plan_server(server.id, penalty)
  637. NzbQueue.do.reset_all_try_lists()
  638. self.__reset_nw(nw, None, warn=False, quit=True)
  639. continue
  640. except:
  641. logging.error(T('Connecting %s@%s failed, message=%s'),
  642. nw.thrdnum, nw.server.id, nw.lines[0])
  643. # No reset-warning needed, above logging is sufficient
  644. self.__reset_nw(nw, None, warn=False)
  645.  
  646. if nw.connected:
  647. logging.info("Connecting %s@%s finished", nw.thrdnum, nw.server.id)
  648. self.__request_article(nw)
  649.  
  650. elif code == '223':
  651. done = True
  652. logging.debug('Article <%s> is present', article.article)
  653. self.decoder.decode(article, nw.lines)
  654.  
  655. elif code == '211':
  656. done = False
  657.  
  658. logging.debug("group command ok -> %s",
  659. nw.lines)
  660. nw.group = nw.article.nzf.nzo.group
  661. nw.lines = []
  662. nw.data = ''
  663. self.__request_article(nw)
  664.  
  665. elif code in ('411', '423', '430'):
  666. done = True
  667. nw.lines = None
  668.  
  669. logging.info('Thread %s@%s: Article ' +
  670. '%s missing (error=%s)',
  671. nw.thrdnum, nw.server.id, article.article, code)
  672.  
  673. elif code == '480':
  674. if server.active:
  675. server.active = False
  676. server.errormsg = T('Server %s requires user/password') % ''
  677. self.plan_server(server.id, 0)
  678. NzbQueue.do.reset_all_try_lists()
  679. msg = T('Server %s requires user/password') % nw.server.id
  680. self.__reset_nw(nw, msg, quit=True)
  681.  
  682. elif code == '500':
  683. if nzo.precheck:
  684. # Assume "STAT" command is not supported
  685. server.have_stat = False
  686. logging.debug('Server %s does not support STAT', server.id)
  687. else:
  688. # Assume "BODY" command is not supported
  689. server.have_body = False
  690. logging.debug('Server %s does not support BODY', server.id)
  691. nw.lines = []
  692. nw.data = ''
  693. self.__request_article(nw)
  694.  
  695. if done:
  696. server.bad_cons = 0 # Succesful data, clear "bad" counter
  697. server.errormsg = server.warning = ''
  698. if sabnzbd.LOG_ALL:
  699. logging.debug('Thread %s@%s: %s done', nw.thrdnum, server.id, article.article)
  700. self.decoder.decode(article, nw.lines)
  701.  
  702. nw.soft_reset()
  703. server.busy_threads.remove(nw)
  704. server.idle_threads.append(nw)
  705.  
  706. def __lookup_nw(self, nw):
  707. """ Find the fileno matching the nw, needed for closed connections """
  708. for f in self.read_fds:
  709. if self.read_fds[f] == nw:
  710. return f
  711. for f in self.write_fds:
  712. if self.read_fds[f] == nw:
  713. return f
  714. return None
  715.  
  716. def __reset_nw(self, nw, errormsg, warn=True, wait=True, destroy=False, quit=False):
  717. from sabnzbd.nzbqueue import NzbQueue
  718. server = nw.server
  719. article = nw.article
  720. fileno = None
  721.  
  722. if nw.nntp:
  723. try:
  724. fileno = nw.nntp.sock.fileno()
  725. except:
  726. fileno = self.__lookup_nw(nw)
  727. destroy = True
  728. nw.nntp.error_msg = None
  729.  
  730. if warn and errormsg:
  731. server.warning = errormsg
  732. logging.info('Thread %s@%s: ' + errormsg, nw.thrdnum, server.id)
  733. elif errormsg:
  734. logging.info('Thread %s@%s: ' + errormsg, nw.thrdnum, server.id)
  735.  
  736. if nw in server.busy_threads:
  737. server.busy_threads.remove(nw)
  738. if not (destroy or nw in server.idle_threads):
  739. server.idle_threads.append(nw)
  740.  
  741. if fileno and fileno in self.write_fds:
  742. self.write_fds.pop(fileno)
  743. if fileno and fileno in self.read_fds:
  744. self.read_fds.pop(fileno)
  745.  
  746. if article:
  747. if article.tries > cfg.max_art_tries() and (article.fetcher.optional or not cfg.max_art_opt()):
  748. # Too many tries on this server, consider article missing
  749. self.decoder.decode(article, None)
  750. else:
  751. # Remove this server from try_list
  752. article.fetcher = None
  753.  
  754. nzf = article.nzf
  755. nzo = nzf.nzo
  756.  
  757. # Allow all servers to iterate over each nzo/nzf again ##
  758. NzbQueue.do.reset_try_lists(nzf, nzo)
  759.  
  760. if destroy:
  761. nw.terminate(quit=quit)
  762. else:
  763. nw.hard_reset(wait, quit=quit)
  764.  
  765. def __request_article(self, nw):
  766. try:
  767. nzo = nw.article.nzf.nzo
  768. if nw.server.send_group and nzo.group != nw.group:
  769. group = nzo.group
  770. if sabnzbd.LOG_ALL:
  771. logging.debug('Thread %s@%s: GROUP <%s>', nw.thrdnum, nw.server.id, group)
  772. nw.send_group(group)
  773. else:
  774. if sabnzbd.LOG_ALL:
  775. logging.debug('Thread %s@%s: BODY %s', nw.thrdnum, nw.server.id, nw.article.article)
  776. nw.body(nzo.precheck)
  777.  
  778. fileno = nw.nntp.sock.fileno()
  779. if fileno not in self.read_fds:
  780. self.read_fds[fileno] = nw
  781. except socket.error, err:
  782. logging.info('Looks like server closed connection: %s', err)
  783. self.__reset_nw(nw, "server broke off connection", quit=False)
  784. except:
  785. logging.error(T('Suspect error in downloader'))
  786. logging.info("Traceback: ", exc_info=True)
  787. self.__reset_nw(nw, "server broke off connection", quit=False)
  788.  
  789. #------------------------------------------------------------------------------
  790. # Timed restart of servers admin.
  791. # For each server all planned events are kept in a list.
  792. # When the first timer of a server fires, all other existing timers
  793. # are neutralized.
  794. # Each server has a dictionary entry, consisting of a list of timestamps.
  795.  
  796. @synchronized(TIMER_LOCK)
  797. def plan_server(self, server_id, interval):
  798. """ Plan the restart of a server in 'interval' minutes """
  799. logging.debug('Set planned server resume %s in %s mins', server_id, interval)
  800. if server_id not in self._timers:
  801. self._timers[server_id] = []
  802. stamp = time.time() + 60.0 * interval
  803. self._timers[server_id].append(stamp)
  804. if interval:
  805. sabnzbd.scheduler.plan_server(self.trigger_server, [server_id, stamp], interval)
  806.  
  807. @synchronized(TIMER_LOCK)
  808. def trigger_server(self, server_id, timestamp):
  809. """ Called by scheduler, start server if timer still valid """
  810. logging.debug('Trigger planned server resume %s', server_id)
  811. if server_id in self._timers:
  812. if timestamp in self._timers[server_id]:
  813. del self._timers[server_id]
  814. self.init_server(server_id, server_id)
  815.  
  816. @synchronized_CV
  817. @synchronized(TIMER_LOCK)
  818. def unblock(self, server_id):
  819. # Remove timer
  820. try:
  821. del self._timers[server_id]
  822. except KeyError:
  823. pass
  824. # Activate server if it was inactive
  825. for server in self.servers:
  826. if server.id == server_id and not server.active:
  827. logging.debug('Unblock server %s', server_id)
  828. self.init_server(server_id, server_id)
  829. break
  830.  
  831. def unblock_all(self):
  832. for server_id in self._timers.keys():
  833. self.unblock(server_id)
  834.  
  835. @synchronized_CV
  836. @synchronized(TIMER_LOCK)
  837. def check_timers(self):
  838. """ Make sure every server without a non-expired timer is active """
  839. # Clean expired timers
  840. now = time.time()
  841. kicked = []
  842. for server_id in self._timers.keys():
  843. if not [stamp for stamp in self._timers[server_id] if stamp >= now]:
  844. logging.debug('Forcing re-evaluation of server %s', server_id)
  845. del self._timers[server_id]
  846. self.init_server(server_id, server_id)
  847. kicked.append(server_id)
  848. # Activate every inactive server without an active timer
  849. for server in self.servers:
  850. if server.id not in self._timers:
  851. if server.id not in kicked and not server.active:
  852. logging.debug('Forcing activation of server %s', server.id)
  853. self.init_server(server.id, server.id)
  854.  
  855. @synchronized_CV
  856. def update_server(self, oldserver, newserver):
  857. self.init_server(oldserver, newserver)
  858.  
  859. @synchronized_CV
  860. def wakeup(self):
  861. """ Just rattle the semaphore """
  862. pass
  863.  
  864. def stop(self):
  865. self.shutdown = True
  866. notifier.send_notification("SABnzbd", T('Shutting down'), 'startup')
  867.  
  868.  
  869. def stop():
  870. CV.acquire()
  871. try:
  872. Downloader.do.stop()
  873. finally:
  874. CV.notifyAll()
  875. CV.release()
  876. try:
  877. Downloader.do.join()
  878. except:
  879. pass
  880.  
  881.  
  882. def clues_login(text):
  883. """ Check for any "failed login" clues in the response code """
  884. text = text.lower()
  885. for clue in ('username', 'password', 'invalid', 'authen', 'access denied'):
  886. if clue in text:
  887. return True
  888. return False
  889.  
  890.  
  891. def clues_too_many(text):
  892. """ Check for any "too many connections" clues in the response code """
  893. text = text.lower()
  894. for clue in ('exceed', 'connections', 'too many', 'threads', 'limit'):
  895. if clue in text:
  896. return True
  897. return False
  898.  
  899.  
  900. def clues_too_many_ip(text):
  901. """ Check for any "account sharing" clues in the response code """
  902. text = text.lower()
  903. for clue in ('simultaneous ip', 'multiple ip'):
  904. if clue in text:
  905. return True
  906. return False
  907.  
  908.  
  909. def clues_pay(text):
  910. """ Check for messages about payments """
  911. text = text.lower()
  912. for clue in ('credits', 'paym', 'expired'):
  913. if clue in text:
  914. return True
  915. return False
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement