Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Jul 1st, 2012  |  syntax: None  |  size: 15.94 KB  |  hits: 14  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. # This file is part of Miro Community.
  2. # Copyright (C) 2010 Participatory Culture Foundation
  3. #
  4. # Miro Community is free software: you can redistribute it and/or modify it
  5. # under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or (at your
  7. # option) any later version.
  8. #
  9. # Miro Community is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with Miro Community.  If not, see <http://www.gnu.org/licenses/>.
  16.  
  17. import datetime
  18. import os
  19. import logging
  20.  
  21. try:
  22.    from xapian import DatabaseLockError
  23. except ImportError:
  24.     class DatabaseLockError(Exception):
  25.         """
  26.         Dummy exception; nothing raises me.
  27.         """
  28. else:
  29.     import random # don't need this otherwise
  30.  
  31. from celery.exceptions import MaxRetriesExceededError
  32. from celery.task import task
  33. from django.conf import settings
  34. from django.db.models.loading import get_model
  35. from django.contrib.auth.models import User
  36. from haystack import site
  37. from haystack.query import SearchQuerySet
  38.  
  39. from localtv import utils
  40. from localtv.exceptions import CannotOpenImageUrl
  41. from localtv.models import Video, Feed, SiteLocation, SavedSearch, Category
  42. from localtv.tiers import Tier
  43.  
  44.  
  45. CELERY_USING = getattr(settings, 'LOCALTV_CELERY_USING', 'default')
  46.  
  47.  
  48. if hasattr(settings.DATABASES, 'module'):
  49.     def patch_settings(func):
  50.         def wrapper(*args, **kwargs):
  51.             using = kwargs.get('using', None)
  52.             if using in (None, 'default', CELERY_USING):
  53.                 logging.info('running %s(*%s, **%s) on default',
  54.                              func, args, kwargs)
  55.                 kwargs['using'] = 'default'
  56.                 return func(*args, **kwargs)
  57.             logging.info('running %s(*%s, **%s) on %s',
  58.                          func, args, kwargs, using)
  59.             environ = os.environ.copy()
  60.             wrapped = settings._wrapped
  61.             os.environ['DJANGO_SETTINGS_MODULE'] = '%s.settings' % using
  62.             new_settings = settings.DATABASES.module(using)
  63.             new_settings.DATABASES = settings.DATABASES
  64.             settings._wrapped = new_settings
  65.             try:
  66.                 return func(*args, **kwargs)
  67.             finally:
  68.                 settings._wrapped = wrapped
  69.                 os.environ = environ
  70.         wrapper.func_name = func.func_name
  71.         wrapper.func_doc = func.func_doc
  72.         wrapper.func_defaults = func.func_defaults
  73.         return wrapper
  74. else:
  75.     def patch_settings(func):
  76.         def wrapper(*args, **kwargs):
  77.             using = kwargs.get('using', None)
  78.             if using == CELERY_USING:
  79.                 kwargs['using'] = 'default'
  80.             return func(*args, **kwargs)
  81.         wrapper.func_name = func.func_name
  82.         wrapper.func_doc = func.func_doc
  83.         wrapper.func_defaults = func.func_defaults
  84.         return wrapper
  85.  
  86. @task(ignore_result=True)
  87. @patch_settings
  88. def update_sources(using='default'):
  89.     feeds = Feed.objects.using(using).filter(status=Feed.ACTIVE,
  90.                                              auto_update=True)
  91.     for feed_pk in feeds.values_list('pk', flat=True):
  92.         feed_update.delay(feed_pk, using=using)
  93.  
  94.     searches = SavedSearch.objects.using(using).filter(auto_update=True)
  95.     for search_pk in searches.values_list('pk', flat=True):
  96.         search_update.delay(search_pk, using=using)
  97.  
  98. @task(ignore_result=True)
  99. @patch_settings
  100. def feed_update(feed_id, using='default'):
  101.     try:
  102.         feed = Feed.objects.using(using).get(pk=feed_id)
  103.     except Feed.DoesNotExist:
  104.         logging.warn('feed_update(%s, using=%r) could not find feed',
  105.                      feed_id, using)
  106.         return
  107.  
  108.     feed.update(using=using, clear_rejected=True)
  109.  
  110. @task(ignore_result=True)
  111. @patch_settings
  112. def search_update(search_id, using='default'):
  113.     try:
  114.         search = SavedSearch.objects.using(using).get(pk=search_id)
  115.     except SavedSearch.DoesNotExist:
  116.         logging.warn('search_update(%s, using=%r) could not find search',
  117.                      search_id, using)
  118.         return
  119.     search.update(using=using, clear_rejected=True)
  120.  
  121.  
  122. @task(ignore_result=True)
  123. @patch_settings
  124. def mark_import_pending(import_app_label, import_model, import_pk,
  125.                         using='default'):
  126.     """
  127.     Checks whether an import's first stage is complete.
  128.  
  129.     """
  130.     import_class = get_model(import_app_label, import_model)
  131.     try:
  132.         source_import = import_class._default_manager.using(using).get(
  133.                                                     pk=import_pk,
  134.                                                     status=import_class.STARTED)
  135.     except import_class.DoesNotExist:
  136.         return
  137.     source_import.last_activity = datetime.datetime.now()
  138.     if source_import.total_videos is None:
  139.         source_import.save()
  140.         return
  141.     # get the correct counts from the database, rather than the race-condition
  142.     # prone count fields
  143.     import_count = source_import.indexes.count()
  144.     skipped_count = source_import.errors.filter(is_skip=True).count()
  145.     if import_count != source_import.videos_imported:
  146.         source_import.videos_imported = import_count
  147.     if skipped_count != source_import.videos_skipped:
  148.         source_import.videos_skipped = skipped_count
  149.     if (source_import.videos_imported + source_import.videos_skipped
  150.         >= source_import.total_videos):
  151.         active_set = None
  152.         unapproved_set = source_import.get_videos(using).filter(
  153.             status=Video.PENDING)
  154.         if source_import.auto_approve:
  155.             if not SiteLocation.enforce_tiers(using=using):
  156.                 active_set = unapproved_set
  157.                 unapproved_set = None
  158.             else:
  159.                 remaining_videos = (Tier.get().videos_limit()
  160.                                     - Video.objects.using(using
  161.                                         ).filter(status=Video.ACTIVE
  162.                                         ).count())
  163.                 if remaining_videos > source_import.videos_imported:
  164.                     active_set = unapproved_set
  165.                     unapproved_set = None
  166.                 else:
  167.                     unapproved_set = unapproved_set.order_by('when_submitted')
  168.                     # only approve `remaining_videos` videos
  169.                     when_submitted = unapproved_set[
  170.                         remaining_videos].when_submitted
  171.                     active_set = unapproved_set.filter(
  172.                         when_submitted__lt=when_submitted)
  173.                     unapproved_set = unapproved_set.filter(
  174.                         when_submitted__gte=when_submitted)
  175.         if unapproved_set is not None:
  176.             unapproved_set.update(status=Video.UNAPPROVED)
  177.         if active_set is None:
  178.             source_import.status = import_class.COMPLETE
  179.         else:
  180.             source_import.status = import_class.PENDING
  181.             active_set.update(status=Video.ACTIVE)
  182.             opts = Video._meta
  183.             for pk in active_set.values_list('pk', flat=True):
  184.                 haystack_update_index.delay(opts.app_label, opts.module_name,
  185.                                             pk, is_removal=False,
  186.                                             using=using,
  187.                                             import_app_label=import_app_label,
  188.                                             import_model=import_model,
  189.                                             import_pk=import_pk)
  190.  
  191.     source_import.save()
  192.  
  193.  
  194. @task(ignore_result=True)
  195. @patch_settings
  196. def mark_import_complete(import_app_label, import_model, import_pk,
  197.                          using='default'):
  198.     """
  199.     Checks whether an import's second stage is complete.
  200.  
  201.     """
  202.     import_class = get_model(import_app_label, import_model)
  203.     try:
  204.         source_import = import_class._default_manager.using(using).get(
  205.                                                     pk=import_pk,
  206.                                                     status=import_class.PENDING)
  207.     except import_class.DoesNotExist:
  208.         return
  209.  
  210.     video_pks = list(source_import.get_videos(using).filter(
  211.                             status=Video.ACTIVE).values_list('pk', flat=True))
  212.     haystack_count = SearchQuerySet().models(Video).filter(
  213.                                                     pk__in=video_pks).count()
  214.     if haystack_count >= len(video_pks):
  215.         source_import.status = import_class.COMPLETE
  216.         if import_app_label == 'localtv' and import_model == 'feedimport':
  217.             source_import.source.status = source_import.source.ACTIVE
  218.             source_import.source.save()
  219.  
  220.     source_import.last_activity = datetime.datetime.now()
  221.     source_import.save()
  222.  
  223.  
  224. @task(ignore_result=True)
  225. @patch_settings
  226. def video_from_vidscraper_video(vidscraper_video, site_pk,
  227.                                 import_app_label=None, import_model=None,
  228.                                 import_pk=None, status=None, author_pks=None,
  229.                                 category_pks=None, clear_rejected=False,
  230.                                 using='default'):
  231.     if import_app_label is None or import_model is None or import_pk is None:
  232.         # XXX what is this for?
  233.         source_import = None
  234.     else:
  235.         import_class = get_model(import_app_label, import_model)
  236.         try:
  237.             source_import = import_class.objects.using(using).get(
  238.                pk=import_pk,
  239.                status=import_class.STARTED)
  240.         except import_class.DoesNotExist:
  241.             logging.debug('Skipping %r: expected import instance missing.',
  242.                           vidscraper_video.url)
  243.             return
  244.     try:
  245.         try:
  246.             vidscraper_video.load()
  247.         except Exception:
  248.             source_import.handle_error(
  249.                 ('Skipped %r: Could not load video data.'
  250.                  % vidscraper_video.url),
  251.                 using=using, is_skip=True,
  252.                 with_exception=True)
  253.             return
  254.  
  255.         if not vidscraper_video.title:
  256.             source_import.handle_error(
  257.                 ('Skipped %r: Failed to scrape basic data.'
  258.                  % vidscraper_video.url),
  259.                 is_skip=True, using=using)
  260.             return
  261.  
  262.         if ((vidscraper_video.file_url_expires or
  263.              not vidscraper_video.file_url)
  264.             and not vidscraper_video.embed_code):
  265.             source_import.handle_error(('Skipping %r: no file or embed code.'
  266.                                         % vidscraper_video.url),
  267.                                        is_skip=True, using=using)
  268.             return
  269.  
  270.         site_videos = Video.objects.using(using).filter(site=site_pk)
  271.  
  272.         if vidscraper_video.guid:
  273.             guid_videos = site_videos.filter(guid=vidscraper_video.guid)
  274.             if clear_rejected:
  275.                 guid_videos.rejected().delete()
  276.             if guid_videos.exists():
  277.                 source_import.handle_error(('Skipping %r: duplicate guid.'
  278.                                             % vidscraper_video.url),
  279.                                            is_skip=True, using=using)
  280.                 return
  281.  
  282.         if vidscraper_video.link:
  283.             videos_with_link = site_videos.filter(
  284.                 website_url=vidscraper_video.link)
  285.             if clear_rejected:
  286.                 videos_with_link.rejected().delete()
  287.             if videos_with_link.exists():
  288.                 source_import.handle_error(('Skipping %r: duplicate link.'
  289.                                             % vidscraper_video.url),
  290.                                            is_skip=True, using=using)
  291.                 return
  292.  
  293.         categories = Category.objects.using(using).filter(pk__in=category_pks)
  294.  
  295.         if author_pks:
  296.             authors = User.objects.using(using).filter(pk__in=author_pks)
  297.         else:
  298.             if vidscraper_video.user:
  299.                 name = vidscraper_video.user
  300.                 if ' ' in name:
  301.                     first, last = name.split(' ', 1)
  302.                 else:
  303.                     first, last = name, ''
  304.                 author, created = User.objects.db_manager(using).get_or_create(
  305.                     username=name[:30],
  306.                     defaults={'first_name': first[:30],
  307.                               'last_name': last[:30]})
  308.                 if created:
  309.                     author.set_unusable_password()
  310.                     author.save()
  311.                     utils.get_profile_model().objects.db_manager(using).create(
  312.                        user=author,
  313.                        website=vidscraper_video.user_url or '')
  314.                 authors = [author]
  315.             else:
  316.                 authors = []
  317.  
  318.         # Since we check above whether the vidscraper_video is valid, we don't
  319.         # catch InvalidVideo here, since it would be unexpected.
  320.         video = Video.from_vidscraper_video(vidscraper_video, status=status,
  321.                                             using=using,
  322.                                             source_import=source_import,
  323.                                             authors=authors,
  324.                                             categories=categories,
  325.                                             site_pk=site_pk)
  326.         logging.debug('Made video %i: %r', video.pk, video.name)
  327.         if video.thumbnail_url:
  328.             video_save_thumbnail.delay(video.pk, using=using)
  329.     except Exception:
  330.         source_import.handle_error(('Unknown error during import of %r'
  331.                                     % vidscraper_video.url),
  332.                                    is_skip=True, using=using,
  333.                                    with_exception=True)
  334.         raise # so it shows up in the Celery log
  335.  
  336. @task(ignore_result=True)
  337. @patch_settings
  338. def video_save_thumbnail(video_pk, using='default'):
  339.     try:
  340.         v = Video.objects.using(using).get(pk=video_pk)
  341.     except Video.DoesNotExist:
  342.         logging.warn(
  343.             'video_save_thumbnail(%s, using=%r) could not find video',
  344.             video_pk, using)
  345.         return
  346.     try:
  347.         v.save_thumbnail()
  348.     except CannotOpenImageUrl:
  349.         try:
  350.             return video_save_thumbnail.retry()
  351.         except MaxRetriesExceededError:
  352.             logging.warn(
  353.                 'video_save_thumbnail(%s, using=%r) exceeded max retries',
  354.                 video_pk, using
  355.             )
  356.        
  357.  
  358. @task(ignore_result=True,
  359.       max_retries=None)
  360. @patch_settings
  361. def haystack_update_index(app_label, model_name, pk, is_removal,
  362.                           import_app_label=None, import_model=None,
  363.                           import_pk=None, using='default', backoff=0):
  364.     """
  365.     Updates a haystack index for the given model (specified by ``app_label``
  366.     and ``model_name``). If ``is_removal`` is ``True``, a fake instance is
  367.     constructed with the given ``pk`` and passed to the index's
  368.     :meth:`remove_object` method. Otherwise, the latest version of the instance
  369.     is fetched from the database and passed to the index's
  370.     :meth:`update_object` method.
  371.  
  372.     If an import_app_label, import_model, and import_pk are provided, this task
  373.     will spawn ``mark_import_complete``.
  374.  
  375.     """
  376.     model_class = get_model(app_label, model_name)
  377.     search_index = site.get_index(model_class)
  378.     try:
  379.         if is_removal:
  380.             instance = model_class(pk=pk)
  381.             search_index.remove_object(instance)
  382.         else:
  383.             try:
  384.                 instance = search_index.read_queryset().using(using).get(pk=pk)
  385.             except model_class.DoesNotExist:
  386.                 pass
  387.             else:
  388.                 search_index.update_object(instance)
  389.     except DatabaseLockError:
  390.         backoff += 1
  391.         countdown = random.random() * (2 ** backoff - 1)
  392.         haystack_update_index.retry(
  393.             args=(app_label, model_name, pk, is_removal),
  394.             kwargs={'using': using, 'backoff': backoff},
  395.             countdown=countdown)
  396.     else:
  397.         if (import_app_label is not None and import_model is not None and
  398.             import_pk is not None):
  399.             mark_import_complete.delay(import_app_label, import_model,
  400.                                        import_pk, using)