- # This file is part of Miro Community.
- # Copyright (C) 2010 Participatory Culture Foundation
- #
- # Miro Community is free software: you can redistribute it and/or modify it
- # under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or (at your
- # option) any later version.
- #
- # Miro Community is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with Miro Community. If not, see <http://www.gnu.org/licenses/>.
- import datetime
- import os
- import logging
try:
    from xapian import DatabaseLockError
except ImportError:
    class DatabaseLockError(Exception):
        """
        Stand-in used when xapian is not installed; nothing raises it.
        """
else:
    # Only the DatabaseLockError retry path uses randomness, so the module
    # is imported solely when the real exception class is available.
    import random
- from celery.exceptions import MaxRetriesExceededError
- from celery.task import task
- from django.conf import settings
- from django.db.models.loading import get_model
- from django.contrib.auth.models import User
- from haystack import site
- from haystack.query import SearchQuerySet
- from localtv import utils
- from localtv.exceptions import CannotOpenImageUrl
- from localtv.models import Video, Feed, SiteLocation, SavedSearch, Category
- from localtv.tiers import Tier
# Database alias that Celery-originated calls use to mean "the default
# database"; configurable through the LOCALTV_CELERY_USING setting.
CELERY_USING = getattr(settings, 'LOCALTV_CELERY_USING', 'default')


def _copy_function_metadata(wrapper, func):
    """
    Copy the name, docstring and argument defaults of ``func`` onto
    ``wrapper`` and return ``wrapper``.
    """
    wrapper.func_name = func.func_name
    wrapper.func_doc = func.func_doc
    wrapper.func_defaults = func.func_defaults
    return wrapper


def _restore_environ(snapshot):
    """
    Bring ``os.environ`` back to the contents of ``snapshot`` in place.

    The mapping object itself is kept (rather than rebinding ``os.environ``
    to a plain dict) so that the process environment stays in sync and code
    holding a reference to ``os.environ`` sees the restored values.
    """
    for key in list(os.environ.keys()):
        if key not in snapshot:
            del os.environ[key]
    for key, value in snapshot.items():
        if os.environ.get(key) != value:
            os.environ[key] = value


if hasattr(settings.DATABASES, 'module'):
    def patch_settings(func):
        """
        Decorator which runs ``func`` under the settings of the site named
        by its ``using`` keyword argument.

        Calls whose ``using`` is missing, ``'default'`` or ``CELERY_USING``
        run unpatched with ``using='default'``. For any other value the
        ``DJANGO_SETTINGS_MODULE`` environment variable and the wrapped
        Django settings object are swapped for the duration of the call and
        restored afterwards, even if the call fails.

        Only a ``using`` passed as a keyword argument is inspected.
        """
        def wrapper(*args, **kwargs):
            using = kwargs.get('using', None)
            if using in (None, 'default', CELERY_USING):
                logging.info('running %s(*%s, **%s) on default',
                             func, args, kwargs)
                kwargs['using'] = 'default'
                return func(*args, **kwargs)
            logging.info('running %s(*%s, **%s) on %s',
                         func, args, kwargs, using)
            environ = os.environ.copy()
            wrapped = settings._wrapped
            try:
                # Patching happens inside the try block so that a failure
                # while building the new settings is also rolled back.
                os.environ['DJANGO_SETTINGS_MODULE'] = '%s.settings' % using
                new_settings = settings.DATABASES.module(using)
                new_settings.DATABASES = settings.DATABASES
                settings._wrapped = new_settings
                return func(*args, **kwargs)
            finally:
                settings._wrapped = wrapped
                _restore_environ(environ)
        return _copy_function_metadata(wrapper, func)
else:
    def patch_settings(func):
        """
        Decorator which maps a ``using`` keyword argument equal to
        ``CELERY_USING`` onto ``'default'`` before calling ``func``.

        Only a ``using`` passed as a keyword argument is inspected.
        """
        def wrapper(*args, **kwargs):
            using = kwargs.get('using', None)
            if using == CELERY_USING:
                kwargs['using'] = 'default'
            return func(*args, **kwargs)
        return _copy_function_metadata(wrapper, func)
@task(ignore_result=True)
@patch_settings
def update_sources(using='default'):
    """
    Queue an update task for every active, auto-updating feed and for every
    auto-updating saved search found in the ``using`` database.
    """
    # Each entry pairs a (lazy) queryset of sources with the task that
    # refreshes one source of that kind; feeds are processed first.
    scheduled = (
        (Feed.objects.using(using).filter(status=Feed.ACTIVE,
                                          auto_update=True),
         feed_update),
        (SavedSearch.objects.using(using).filter(auto_update=True),
         search_update),
    )
    for sources, update_task in scheduled:
        for source_pk in sources.values_list('pk', flat=True):
            update_task.delay(source_pk, using=using)
@task(ignore_result=True)
@patch_settings
def feed_update(feed_id, using='default'):
    """
    Update the feed with primary key ``feed_id`` from the ``using``
    database, clearing rejected videos. A missing feed is logged and
    otherwise ignored.
    """
    try:
        feed = Feed.objects.using(using).get(pk=feed_id)
    except Feed.DoesNotExist:
        logging.warning('feed_update(%s, using=%r) could not find feed',
                        feed_id, using)
    else:
        feed.update(using=using, clear_rejected=True)
@task(ignore_result=True)
@patch_settings
def search_update(search_id, using='default'):
    """
    Update the saved search with primary key ``search_id`` from the
    ``using`` database, clearing rejected videos. A missing search is
    logged and otherwise ignored.
    """
    try:
        search = SavedSearch.objects.using(using).get(pk=search_id)
    except SavedSearch.DoesNotExist:
        logging.warning('search_update(%s, using=%r) could not find search',
                        search_id, using)
    else:
        search.update(using=using, clear_rejected=True)
def _partition_pending_videos(source_import, using):
    """
    Split the pending videos of ``source_import`` into the set to activate
    and the set to mark unapproved.

    Returns an ``(active_set, unapproved_set)`` pair of querysets; either
    element may be ``None`` when no video falls into that group.
    """
    pending = source_import.get_videos(using).filter(status=Video.PENDING)
    if not source_import.auto_approve:
        return None, pending
    if not SiteLocation.enforce_tiers(using=using):
        return pending, None

    active_count = Video.objects.using(using).filter(
        status=Video.ACTIVE).count()
    remaining_videos = Tier.get().videos_limit() - active_count
    if remaining_videos > source_import.videos_imported:
        return pending, None

    # Only approve `remaining_videos` videos, oldest submissions first.
    pending = pending.order_by('when_submitted')
    # NOTE(review): this index lookup fails when `remaining_videos` is
    # negative or not smaller than the number of pending videos - confirm
    # whether callers can reach that state.
    when_submitted = pending[remaining_videos].when_submitted
    return (pending.filter(when_submitted__lt=when_submitted),
            pending.filter(when_submitted__gte=when_submitted))


@task(ignore_result=True)
@patch_settings
def mark_import_pending(import_app_label, import_model, import_pk,
                        using='default'):
    """
    Checks whether an import's first stage is complete.

    The import is looked up by model (``import_app_label``,
    ``import_model``) and ``import_pk`` and must still be in the STARTED
    state; otherwise nothing happens. Its activity timestamp and video
    counters are refreshed. Once every expected video has been imported or
    skipped, the pending videos are activated or marked unapproved, an
    index update is queued for each activated video, and the import moves
    to PENDING (or straight to COMPLETE when nothing is activated).
    """
    import_class = get_model(import_app_label, import_model)
    try:
        source_import = import_class._default_manager.using(using).get(
            pk=import_pk,
            status=import_class.STARTED)
    except import_class.DoesNotExist:
        return

    source_import.last_activity = datetime.datetime.now()
    if source_import.total_videos is None:
        source_import.save()
        return

    # get the correct counts from the database, rather than the
    # race-condition prone count fields
    source_import.videos_imported = source_import.indexes.count()
    source_import.videos_skipped = source_import.errors.filter(
        is_skip=True).count()

    if (source_import.videos_imported + source_import.videos_skipped
            >= source_import.total_videos):
        active_set, unapproved_set = _partition_pending_videos(
            source_import, using)
        if unapproved_set is not None:
            unapproved_set.update(status=Video.UNAPPROVED)
        if active_set is None:
            source_import.status = import_class.COMPLETE
        else:
            source_import.status = import_class.PENDING
            # The queryset is filtered on status=PENDING, so the primary
            # keys must be collected *before* the status update; afterwards
            # the same query no longer matches the activated videos.
            active_pks = list(active_set.values_list('pk', flat=True))
            active_set.update(status=Video.ACTIVE)
            opts = Video._meta
            for pk in active_pks:
                haystack_update_index.delay(opts.app_label, opts.module_name,
                                            pk, is_removal=False,
                                            using=using,
                                            import_app_label=import_app_label,
                                            import_model=import_model,
                                            import_pk=import_pk)

    source_import.save()
@task(ignore_result=True)
@patch_settings
def mark_import_complete(import_app_label, import_model, import_pk,
                         using='default'):
    """
    Checks whether an import's second stage is complete.

    Does nothing unless the import exists and is PENDING. The import is
    marked COMPLETE once the search index holds at least as many of its
    active videos as the database does; for a ``localtv`` ``feedimport``
    the import's source is switched to ACTIVE at the same time. The
    import's activity timestamp is refreshed either way.
    """
    import_class = get_model(import_app_label, import_model)
    candidates = import_class._default_manager.using(using)
    try:
        source_import = candidates.get(pk=import_pk,
                                       status=import_class.PENDING)
    except import_class.DoesNotExist:
        return

    active_videos = source_import.get_videos(using).filter(
        status=Video.ACTIVE)
    video_pks = list(active_videos.values_list('pk', flat=True))
    indexed_videos = SearchQuerySet().models(Video).filter(pk__in=video_pks)

    if indexed_videos.count() >= len(video_pks):
        source_import.status = import_class.COMPLETE
        if (import_app_label, import_model) == ('localtv', 'feedimport'):
            feed = source_import.source
            feed.status = feed.ACTIVE
            feed.save()

    source_import.last_activity = datetime.datetime.now()
    source_import.save()
def _report_import_skip(source_import, message, using, with_exception=False):
    """
    Record ``message`` as a skipped video on ``source_import``, or log it
    when there is no import (``source_import`` is ``None``).

    ``with_exception`` must only be set while an exception is being
    handled; it asks for the active exception to be recorded as well.
    """
    if source_import is None:
        if with_exception:
            logging.exception(message)
        else:
            logging.debug(message)
    elif with_exception:
        source_import.handle_error(message, is_skip=True, using=using,
                                   with_exception=True)
    else:
        source_import.handle_error(message, is_skip=True, using=using)


@task(ignore_result=True)
@patch_settings
def video_from_vidscraper_video(vidscraper_video, site_pk,
                                import_app_label=None, import_model=None,
                                import_pk=None, status=None, author_pks=None,
                                category_pks=None, clear_rejected=False,
                                using='default'):
    """
    Create a ``Video`` for ``site_pk`` from ``vidscraper_video``.

    If ``import_app_label``, ``import_model`` and ``import_pk`` are all
    given, the matching import must exist in the STARTED state (otherwise
    the video is silently dropped) and every skip or failure is recorded on
    it. Without them, skips and failures are only logged.

    A video is skipped when its data cannot be loaded, it has no title, it
    has neither a usable file URL nor an embed code, or another video of
    the site already has the same guid or link. With ``clear_rejected``,
    rejected duplicates are deleted before the duplicate check.

    Authors come from ``author_pks`` when given; otherwise a user is looked
    up or created from the scraped user name. ``category_pks`` of ``None``
    means no categories. A thumbnail task is queued when the new video has
    a thumbnail URL. Unexpected errors are reported and re-raised.
    """
    if import_app_label is None or import_model is None or import_pk is None:
        # No import to report to; problems are logged instead.
        source_import = None
    else:
        import_class = get_model(import_app_label, import_model)
        try:
            source_import = import_class.objects.using(using).get(
                pk=import_pk,
                status=import_class.STARTED)
        except import_class.DoesNotExist:
            logging.debug('Skipping %r: expected import instance missing.',
                          vidscraper_video.url)
            return

    try:
        try:
            vidscraper_video.load()
        except Exception:
            _report_import_skip(
                source_import,
                'Skipped %r: Could not load video data.'
                % vidscraper_video.url,
                using, with_exception=True)
            return

        if not vidscraper_video.title:
            _report_import_skip(
                source_import,
                'Skipped %r: Failed to scrape basic data.'
                % vidscraper_video.url,
                using)
            return

        if ((vidscraper_video.file_url_expires or
             not vidscraper_video.file_url)
                and not vidscraper_video.embed_code):
            _report_import_skip(
                source_import,
                'Skipping %r: no file or embed code.' % vidscraper_video.url,
                using)
            return

        site_videos = Video.objects.using(using).filter(site=site_pk)

        if vidscraper_video.guid:
            guid_videos = site_videos.filter(guid=vidscraper_video.guid)
            if clear_rejected:
                guid_videos.rejected().delete()
            if guid_videos.exists():
                _report_import_skip(
                    source_import,
                    'Skipping %r: duplicate guid.' % vidscraper_video.url,
                    using)
                return

        if vidscraper_video.link:
            videos_with_link = site_videos.filter(
                website_url=vidscraper_video.link)
            if clear_rejected:
                videos_with_link.rejected().delete()
            if videos_with_link.exists():
                _report_import_skip(
                    source_import,
                    'Skipping %r: duplicate link.' % vidscraper_video.url,
                    using)
                return

        # The default of None means "no categories"; an `in` lookup needs
        # an iterable.
        if category_pks is None:
            category_pks = []
        categories = Category.objects.using(using).filter(pk__in=category_pks)

        if author_pks:
            authors = User.objects.using(using).filter(pk__in=author_pks)
        elif vidscraper_video.user:
            name = vidscraper_video.user
            if ' ' in name:
                first, last = name.split(' ', 1)
            else:
                first, last = name, ''
            # Name parts are cut to 30 characters before being stored.
            author, created = User.objects.db_manager(using).get_or_create(
                username=name[:30],
                defaults={'first_name': first[:30],
                          'last_name': last[:30]})
            if created:
                author.set_unusable_password()
                author.save()
                utils.get_profile_model().objects.db_manager(using).create(
                    user=author,
                    website=vidscraper_video.user_url or '')
            authors = [author]
        else:
            authors = []

        # Since we check above whether the vidscraper_video is valid, we
        # don't catch InvalidVideo here, since it would be unexpected.
        video = Video.from_vidscraper_video(vidscraper_video, status=status,
                                            using=using,
                                            source_import=source_import,
                                            authors=authors,
                                            categories=categories,
                                            site_pk=site_pk)
        logging.debug('Made video %i: %r', video.pk, video.name)
        if video.thumbnail_url:
            video_save_thumbnail.delay(video.pk, using=using)
    except Exception:
        _report_import_skip(
            source_import,
            'Unknown error during import of %r' % vidscraper_video.url,
            using, with_exception=True)
        raise  # so it shows up in the Celery log
@task(ignore_result=True)
@patch_settings
def video_save_thumbnail(video_pk, using='default'):
    """
    Save the thumbnail of the video with primary key ``video_pk`` from the
    ``using`` database.

    A missing video is logged and ignored. If the image URL cannot be
    opened the task is retried; running out of retries is logged.
    """
    try:
        video = Video.objects.using(using).get(pk=video_pk)
    except Video.DoesNotExist:
        logging.warning(
            'video_save_thumbnail(%s, using=%r) could not find video',
            video_pk, using)
        return

    try:
        video.save_thumbnail()
    except CannotOpenImageUrl:
        try:
            return video_save_thumbnail.retry()
        except MaxRetriesExceededError:
            logging.warning(
                'video_save_thumbnail(%s, using=%r) exceeded max retries',
                video_pk, using)
@task(ignore_result=True,
      max_retries=None)
@patch_settings
def haystack_update_index(app_label, model_name, pk, is_removal,
                          import_app_label=None, import_model=None,
                          import_pk=None, using='default', backoff=0):
    """
    Updates a haystack index for the given model (specified by ``app_label``
    and ``model_name``). If ``is_removal`` is ``True``, a fake instance is
    constructed with the given ``pk`` and passed to the index's
    :meth:`remove_object` method. Otherwise, the latest version of the
    instance is fetched from the database and passed to the index's
    :meth:`update_object` method; an instance that no longer exists is
    silently ignored.

    If the index is locked (``DatabaseLockError``), the task is retried
    after a random delay whose upper bound doubles with every attempt;
    ``backoff`` counts those attempts.

    If an import_app_label, import_model, and import_pk are provided, this
    task will spawn ``mark_import_complete`` once the index operation
    succeeds, including when it only succeeds on a retry.
    """
    model_class = get_model(app_label, model_name)
    search_index = site.get_index(model_class)
    try:
        if is_removal:
            instance = model_class(pk=pk)
            search_index.remove_object(instance)
        else:
            try:
                instance = search_index.read_queryset().using(using).get(pk=pk)
            except model_class.DoesNotExist:
                pass
            else:
                search_index.update_object(instance)
    except DatabaseLockError:
        # Imported here so the retry path does not rely on a module-level
        # import that only happens when xapian is installed.
        import random
        backoff += 1
        countdown = random.random() * (2 ** backoff - 1)
        # The import identifiers travel with the retry; without them the
        # retried task could never spawn `mark_import_complete`.
        haystack_update_index.retry(
            args=(app_label, model_name, pk, is_removal),
            kwargs={'using': using,
                    'backoff': backoff,
                    'import_app_label': import_app_label,
                    'import_model': import_model,
                    'import_pk': import_pk},
            countdown=countdown)
    else:
        if (import_app_label is not None and import_model is not None and
                import_pk is not None):
            # `using` is passed by keyword because `patch_settings` only
            # inspects keyword arguments.
            mark_import_complete.delay(import_app_label, import_model,
                                       import_pk, using=using)