Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # file: core/management/commands/bench_po.py
- from timeit import default_timer as timer
- from memory_profiler import profile
- from django.core.management.base import BaseCommand
- from core.models import CoreData
class Command(BaseCommand):
    """Benchmark strategies for walking every ``CoreData`` row.

    The command takes one positional integer argument, ``batch_size``, and
    runs one iteration strategy under ``memory_profiler``'s ``@profile``
    decorator while timing it with ``timeit.default_timer``. Each strategy
    prints ``Found <n>`` with the number of rows it visited.
    """

    def add_arguments(self, parser):
        """Register the positional ``batch_size`` integer argument."""
        parser.add_argument('batch_size', type=int)

    @profile
    def iterate_all(self):
        """Visit every row via ``QuerySet.iterator`` using ``self.batch_size``.

        Prints ``Found <n>`` where ``n`` is the number of rows visited.
        """
        qs = CoreData.objects.all()
        # Stays 0 when the table is empty and the loop body never runs.
        counter = 0
        # start=1 makes the final value the row count rather than the last
        # zero-based index (which would under-report by one).
        rows = qs.iterator(chunk_size=self.batch_size)
        for counter, _ in enumerate(rows, start=1):
            pass
        print(f'Found {counter}')

    @profile
    def get_all_batch(self):
        """Visit every row by iterating the queryset directly.

        ``self.batch_size`` is not used by this strategy. Prints
        ``Found <n>`` where ``n`` is the number of rows visited.
        """
        qs = CoreData.objects.all()
        # Stays 0 when the table is empty and the loop body never runs.
        counter = 0
        # start=1 so the printed figure is a count, consistent with
        # get_chanked_batch, instead of the last zero-based index.
        for counter, _ in enumerate(qs, start=1):
            pass
        print(f'Found {counter}')

    @profile
    def get_chanked_batch(self):
        """Visit every row through consecutive queryset slices.

        The table is counted once, then read as slices of at most
        ``self.batch_size`` rows. Prints ``Found <n>`` where ``n`` is the
        number of rows visited.
        """
        qs = CoreData.objects.all()
        counter = 0

        def batch_qs(qs, batch_size=1000):
            """Yield ``qs[start:end]`` slices covering ``qs.count()`` rows."""
            total = qs.count()
            for start in range(0, total, batch_size):
                end = min(start + batch_size, total)
                yield qs[start:end]

        # NOTE(review): the queryset has no explicit ordering here, so the
        # slices may not partition the table consistently on every backend;
        # consider an explicit order_by() if exact coverage matters.
        for batch in batch_qs(qs, batch_size=self.batch_size):
            for _ in batch:
                counter += 1
        print(f'Found {counter}')

    def handle(self, *args, **options):
        """Run the selected benchmark and print the elapsed timer value."""
        # Stored on the instance because the benchmark methods take no
        # arguments and read the batch size from ``self``.
        self.batch_size = options['batch_size']
        start = timer()
        self.iterate_all()
        print(timer() - start)
        # Alternative strategies; swap in for ``iterate_all`` above to
        # benchmark them instead:
        # self.get_all_batch()
        # self.get_chanked_batch()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement