Advertisement
Guest User

Untitled

a guest
Feb 26th, 2017
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.76 KB | None | 0 0
  1. #!/usr/bin/python
  2.  
  3. #nice apt-get -s -o Debug::NoLocking=true upgrade | grep ^Inst
  4.  
  5. import apt_pkg
  6. import os
  7. import sys
  8. from optparse import OptionParser
  9. import gettext
  10. import subprocess
  11.  
  12. SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences"
  13. DISTRO = subprocess.Popen(["lsb_release","-c","-s"],
  14. stdout=subprocess.PIPE).communicate()[0].strip()
  15.  
  16. class OpNullProgress(object):
  17. def update(self, percent=None):
  18. pass
  19. def done(self):
  20. pass
  21.  
  22. def _(msg):
  23. return gettext.dgettext("update-notifier", msg)
  24.  
  25. def _handleException(type, value, tb):
  26. sys.stderr.write("E: "+ _("Unknown Error: '%s' (%s)") % (type,value))
  27. sys.exit(-1)
  28.  
  29. def clean(cache,depcache):
  30. " unmark (clean) all changes from the given depcache "
  31. # mvo: looping is too inefficient with the new auto-mark code
  32. #for pkg in cache.Packages:
  33. # depcache.MarkKeep(pkg)
  34. depcache.init()
  35.  
  36. def saveDistUpgrade(cache,depcache):
  37. """ this functions mimics a upgrade but will never remove anything """
  38. depcache.upgrade(True)
  39. if depcache.del_count > 0:
  40. clean(cache,depcache)
  41. depcache.upgrade()
  42.  
  43. def isSecurityUpgrade(ver):
  44. " check if the given version is a security update (or masks one) "
  45. security_pockets = [{"origin": "Ubuntu",
  46. "archive": "%s-security" % DISTRO},
  47.  
  48. {"origin": "gNewSense",
  49. "archive": "%s-security" % DISTRO},
  50.  
  51. {"origin": "Debian",
  52. "label": "Debian-Security"}]
  53.  
  54. for (file, index) in ver.file_list:
  55. for pocket in security_pockets:
  56. if all(getattr(file, key) == pocket[key] for key in pocket):
  57. return True
  58. return False
  59.  
  60. def write_package_names(outstream, cache, depcache):
  61. " write out package names that change to outstream "
  62. pkgs = filter(lambda pkg:
  63. depcache.marked_install(pkg) or depcache.marked_upgrade(pkg),
  64. cache.packages)
  65. outstream.write("\n".join(map(lambda p: p.name, pkgs)))
  66.  
  67. def write_human_readable_summary(outstream, upgrades, security_updates):
  68. " write out human summary summary to outstream "
  69. outstream.write(gettext.dngettext("update-notifier",
  70. "%i package can be updated.",
  71. "%i packages can be updated.",
  72. upgrades) % upgrades)
  73. outstream.write("\n")
  74. outstream.write(gettext.dngettext("update-notifier",
  75. "%i update is a security update.",
  76. "%i updates are security updates.",
  77. security_updates) % security_updates)
  78. outstream.write("\n")
  79. def init():
  80. " init the system, be nice "
  81. # FIXME: do a ionice here too?
  82. os.nice(19)
  83. apt_pkg.init()
  84. # force apt to build its caches in memory for now to make sure
  85. # that there is no race when the pkgcache file gets re-generated
  86. apt_pkg.config.set("Dir::Cache::pkgcache","")
  87.  
  88. def run(options=None):
  89.  
  90. # we are run in "are security updates installed automatically?"
  91. # question mode
  92. if options.security_updates_unattended:
  93. res = apt_pkg.config.find_i("APT::Periodic::Unattended-Upgrade", 0)
  94. #print res
  95. sys.exit(res)
  96.  
  97. # get caches
  98. try:
  99. cache = apt_pkg.Cache(OpNullProgress())
  100. except SystemError, e:
  101. sys.stderr.write("E: "+ _("Error: Opening the cache (%s)") % e)
  102. sys.exit(-1)
  103. depcache = apt_pkg.DepCache(cache)
  104.  
  105. # read the pin files
  106. depcache.read_pinfile()
  107. # read the synaptic pins too
  108. if os.path.exists(SYNAPTIC_PINFILE):
  109. depcache.read_pinfile(SYNAPTIC_PINFILE)
  110.  
  111. # init the depcache
  112. depcache.init()
  113.  
  114. if depcache.broken_count > 0:
  115. sys.stderr.write("E: "+ _("Error: BrokenCount > 0"))
  116. sys.exit(-1)
  117.  
  118. # do the upgrade (not dist-upgrade!)
  119. try:
  120. saveDistUpgrade(cache,depcache)
  121. except SystemError, e:
  122. sys.stderr.write("E: "+ _("Error: Marking the upgrade (%s)") % e)
  123. sys.exit(-1)
  124.  
  125. # analyze the ugprade
  126. upgrades = 0
  127. security_updates = 0
  128. for pkg in cache.packages:
  129. # skip packages that are not marked upgraded/installed
  130. if not (depcache.marked_install(pkg) or depcache.marked_upgrade(pkg)):
  131. continue
  132. # check if this is really a upgrade or a false positive
  133. # (workaround for ubuntu #7907)
  134. inst_ver = pkg.current_ver
  135. cand_ver = depcache.get_candidate_ver(pkg)
  136. if cand_ver == inst_ver:
  137. continue
  138.  
  139. # check for security upgrades
  140. upgrades = upgrades + 1
  141. if isSecurityUpgrade(cand_ver):
  142. security_updates += 1
  143. continue
  144.  
  145. # now check for security updates that are masked by a
  146. # canidate version from another repo (-proposed or -updates)
  147. for ver in pkg.version_list:
  148. if (inst_ver and apt_pkg.version_compare(ver.ver_str, inst_ver.ver_str) <= 0):
  149. #print "skipping '%s' " % ver.VerStr
  150. continue
  151. if isSecurityUpgrade(ver):
  152. security_updates += 1
  153. break
  154.  
  155. # print the number of upgrades
  156. if options and options.show_package_names:
  157. write_package_names(sys.stderr, cache, depcache)
  158. elif options and options.readable_output:
  159. write_human_readable_summary(sys.stdout, upgrades, security_updates)
  160. else:
  161. # print the number of regular upgrades and the number of
  162. # security upgrades
  163. sys.stderr.write("%s;%s" % (upgrades,security_updates))
  164.  
  165. # return the number of upgrades (if its used as a module)
  166. return(upgrades,security_updates)
  167.  
  168.  
  169. if __name__ == "__main__":
  170. # setup a exception handler to make sure that uncaught stuff goes
  171. # to the notifier
  172. sys.excepthook = _handleException
  173.  
  174. # gettext
  175. APP="update-notifier"
  176. DIR="/usr/share/locale"
  177. gettext.bindtextdomain(APP, DIR)
  178. gettext.textdomain(APP)
  179.  
  180. # check arguments
  181. parser = OptionParser()
  182. parser.add_option("-p",
  183. "--package-names",
  184. action="store_true",
  185. dest="show_package_names",
  186. help=_("Show the packages that are going to be installed/upgraded"))
  187. parser.add_option("",
  188. "--human-readable",
  189. action="store_true",
  190. dest="readable_output",
  191. help=_("Show human readable output on stdout"))
  192. parser.add_option("",
  193. "--security-updates-unattended",
  194. action="store_true",
  195. help=_("Return the time in days when security updates "
  196. "are installed unattended (0 means disabled)"))
  197. (options, args) = parser.parse_args()
  198.  
  199. # run it
  200. init()
  201. run(options)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement