Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class CustomerCsvFile(models.Model):
    '''
    Database record for an uploaded customer CSV file.

    Besides the stored columns, the model exposes read-only helpers that
    inspect the file on disk (first data row, best populated row, row count)
    and decode the saved column mapping.
    '''
    source = models.CharField(max_length=100, blank=True)
    csv_file = models.FileField(upload_to='csvimport/customer/', max_length=255)
    uploaded_by = models.ForeignKey(UserProfile)
    date_uploaded = models.DateField(auto_now_add=True)
    date_imported = models.DateField(blank=True, null=True)
    # JSON-encoded list of dicts, each carrying a 'field_name' key; the
    # position in the list is the CSV column index (see _get_csv_fields).
    field_order = models.TextField(blank=True)
    has_headings = models.BooleanField(default=True, verbose_name='First line is headings')
    reimport_filename = models.CharField(max_length=255, blank=True)
    # Optional link to another CustomerCsvFile; its field_order is exposed
    # through parent_field_order.
    parent = models.ForeignKey('self', blank=True, null=True)
    # NOTE(review): presumably the id of the Celery task handling this file
    # -- nothing in this class reads it; confirm against the import code.
    celery_task_id = models.CharField(max_length=255, blank=True)

    def __unicode__(self):
        '''Display the record as the bare name of its CSV file.'''
        return self.filename

    def _get_filename(self):
        '''Return the final path component of the stored CSV file.'''
        return Path(self.csv_file.path).name
    filename = property(_get_filename)

    def _file_exists(self):
        '''Return True when the CSV file is present on disk.'''
        return Path(self.csv_file.path).exists()
    file_exists = property(_file_exists)

    @staticmethod
    def _is_blank(value):
        '''
        Tell whether a CSV value counts as empty.

        Some exports carry a literal backslash-N in otherwise empty fields,
        so that marker is removed before the whitespace check.
        '''
        return value.replace('\\N', '').strip() == ''

    def _data_reader(self, handle):
        '''
        Build a csv reader over ``handle`` positioned on the first data row.

        The heading row is consumed when ``has_headings`` is set. ``next`` is
        given a default so an empty file does not raise StopIteration.
        '''
        reader = csv.reader(handle)
        if self.has_headings:
            next(reader, None)
        return reader

    def _get_first_line(self):
        '''
        Return the first data row of the file as a list of strings.

        Returns False when the file does not exist and an empty list when
        the file holds no data row.
        '''
        if not self.file_exists:
            return False
        # Binary mode is kept from the original code (Python 2 csv usage).
        with open(self.csv_file.path, 'rb') as handle:
            return next(self._data_reader(handle), [])
    first_line = property(_get_first_line)

    def get_lines(self, num_lines):
        '''
        Return up to ``num_lines`` data rows from the top of the file.

        Reading stops early at the end of the file or at the first empty
        row, which is treated as the end of the data. Returns False when the
        file does not exist.
        '''
        if not self.file_exists:
            return False
        rows = []
        # The handle that is read is the handle that gets closed; the
        # csv_file_object property would open a fresh one on every access.
        with open(self.csv_file.path, 'rb') as handle:
            reader = self._data_reader(handle)
            for _ in range(num_lines):
                row = next(reader, None)
                if row is None or not any(row):
                    break
                rows.append(row)
        return rows

    def _get_file(self):
        '''
        Return a newly opened binary handle on the CSV file, or False when
        the file does not exist.

        Every access opens another handle, so the caller must keep the
        returned object and close it.
        '''
        if Path(self.csv_file.path).exists():
            return open(self.csv_file.path, 'rb')
        return False
    csv_file_object = property(_get_file)

    def _get_best_row(self):
        '''
        Grab the row which has the most columns completed to allow for the
        most comprehensive initial data assignation.

        Blank values and the backslash-N placeholder are not counted. The
        earliest row wins a tie. Returns an empty list when the file is
        missing or no row has any filled column.
        '''
        if not self.file_exists:
            return []
        best_row = []
        best_count = 0
        with open(self.csv_file.path, 'rb') as handle:
            for row in self._data_reader(handle):
                filled = sum(1 for value in row if not self._is_blank(value))
                if filled > best_count:
                    best_count = filled
                    best_row = row
        return best_row
    best_row = property(_get_best_row)

    def _get_row_count(self):
        '''
        Count the data rows that contain at least one non-empty value.

        Returns 0 when the file does not exist.
        '''
        if not self.file_exists:
            return 0
        with open(self.csv_file.path, 'rb') as handle:
            return sum(1 for row in self._data_reader(handle) if any(row))
    row_count = property(_get_row_count)

    def _get_csv_fields(self):
        '''
        Decode ``field_order`` into a dict of field name -> column index.

        Entries named 'unknown' and entries whose name is blank (or only the
        backslash-N placeholder) are skipped. Returns an empty dict when no
        field order has been saved.
        '''
        if self.field_order == '':
            return {}
        columns = {}
        for index, item in enumerate(json.loads(self.field_order)):
            name = item['field_name']
            # Compare by value: the original used an identity test against
            # '' here, which only works by accident of string interning.
            if name != 'unknown' and not self._is_blank(name):
                columns[name] = index
        return columns
    csv_fields = property(_get_csv_fields)

    def _get_parent_field_order(self):
        '''Return the parent's field_order, or False when there is no parent.'''
        if self.parent is None:
            return False
        return self.parent.field_order
    parent_field_order = property(_get_parent_field_order)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement