Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import base64
- import re
- import time
- from datetime import datetime, timedelta
- from json import JSONDecodeError
- import requests
- from django.conf import settings
- from apps.core.utils.connections_errors_handlers import catch_timeout_error, \
- catch_server_errors, raise_for_status_server_error
- from apps.core.utils.filtered_time_entities import filtered_time_entities
- from apps.core.utils.timekeeper_api_integration import \
- get_time_point_from_timekeeper
- def get_data_from_toggl_api(access_key, user, timezone):
- """
- This method helps us to get data from Toggl API and render it into
- required format
- """
- headers, payload = toggl_authorization(access_key)
- # Get start date from TimeKeeper server
- tk_error_message, start_date = get_time_point_from_timekeeper(
- user)
- # Set end date to current time
- end_date = datetime.now().astimezone(timezone) \
- .isoformat().replace(':', '%3A') \
- .replace('+', '%2B')
- status_toggl = 0
- toggl_error_message = None
- if start_date is not None:
- if isinstance(start_date, int):
- start_date = datetime.fromtimestamp(start_date) \
- .astimezone(timezone).isoformat()
- start = start_date.replace(':', '%3A').replace('+', '%2B')
- url = '{}?start_date={}&end_date={}'.format(settings.TOGGL_API_URL,
- start,
- end_date)
- else:
- url = '{}?end_date={}'.format(settings.TOGGL_API_URL, end_date)
- # Retrieve all information about time entities from Toggl API related
- # to current user in JSON format
- request_url = url.split('time_entries')
- try:
- settings.LOGGER.info('Send "GET" request to Toggle API')
- request_get = requests.request("GET", url,
- headers=headers, data=payload,
- verify=False)
- status_toggl = raise_for_status_server_error(request_get)
- response = request_get.json()
- settings.LOGGER.info('Get data from Toggle API. Status code {}'.format(
- request_get.status_code))
- settings.LOGGER.debug(
- 'Request get data from "{}" / "time_entries{}" endpoint /'
- ' status code "{}". Response: {}'.format(request_url[0],
- request_url[1],
- request_get.status_code,
- response))
- except JSONDecodeError as json_error:
- response = []
- settings.LOGGER.debug(
- 'Request get data from "{}" / "time_entries{}" endpoint.'
- ' Toggl token incorrectly specified. JSONDecodeError: '
- '{}'.format(request_url[0], request_url[1], json_error))
- settings.LOGGER.error(
- 'No data received from Toggle API. Due to Toggl token '
- 'incorrectly specified. '
- 'JSONDecodeError: {}'.format(json_error))
- except requests.exceptions.Timeout:
- response = []
- toggl_error_message = catch_timeout_error('time_entries',
- request_url, "Toggl")
- except requests.exceptions.HTTPError:
- response = []
- toggl_error_message = catch_server_errors(
- 'time_entries',
- request_url,
- status_toggl, "Toggl")
- # time_entities variables to store modified data from JSON
- time_entities = []
- time_entities_without_id = []
- time_entities_no_id_at_the_end = []
- for item in response:
- # Parameters for storing information about structure of description, and
- # correctness and availability id task id an the end of description
- wrong_sequence = False
- no_task_id_at_the_end = False
- task_id_as_digit_at_end = False
- different_task_ids_provided = False
- # Retrieve date and time from time entity in special data format
- updated_at = datetime.strptime(
- item["at"],
- "%Y-%m-%dT%H:%M:%S%z").astimezone(timezone).strftime(
- "%Y-%m-%d %H:%M")
- started_at = datetime.strptime(
- item["start"],
- "%Y-%m-%dT%H:%M:%S%z").astimezone(timezone).strftime(
- "%H:%M")
- started_at_day = datetime.strptime(
- item["start"],
- "%Y-%m-%dT%H:%M:%S%z").astimezone(timezone).strftime(
- "%Y-%m-%d %H:%M")
- # Verification if time entity has stop parameter
- if "stop" in item:
- stopped_at = datetime.strptime(
- item["stop"],
- "%Y-%m-%dT%H:%M:%S%z").astimezone(timezone).strftime(
- "%H:%M")
- else:
- stopped_at = ''
- # Verification if time entity has positive duration parameter
- item_duration = item['duration']
- if item_duration > 0:
- duration, duration_minutes = _rounding_seconds_into_minutes(
- item_duration)
- # Verification if time entity long more than one day
- days = timedelta(seconds=item_duration).days
- if days != 0:
- # convert days into hours
- hours = int(duration.split(':')[0]) + days * 24
- minutes = duration.split(':')[1]
- # regenerate duration representation
- duration = "{}:{}".format(hours, minutes)
- else:
- duration = ''
- duration_minutes = 0
- # Verification if description is provided
- try:
- # Parse needed information from description field by using regex
- # and search method
- item_description = (item['description']).replace('“', '"').replace(
- '”', '"').strip()
- keywords = ["ACT=", "COMMENT=", "URL=", "ID="]
- wrong_sequence = _is_wrong_sequence_param(item_description,
- keywords,
- wrong_sequence)
- # Search keywords in description by using regex
- keywords_in_description = re.search(
- '^[a-zA-z0-9](.*?)(ACT|COMMENT|URL|ID)=',
- item_description, re.M | re.I)
- # This part of program will execute if keyword found in description
- if keywords_in_description:
- if re.search('^(([a-zA-z]*[0-9]*)[-]([0-9]*)) ',
- item_description,
- re.M | re.I):
- description = keywords_in_description.group(0)
- else:
- description = keywords_in_description.group(0)
- if re.search('^([A-Z]{3,})', description):
- description = ''
- wrong_sequence = True
- for key in keywords:
- occurrence_count = description.count(key)
- replace_keyword = "{}".format(key)
- if occurrence_count >= 2:
- repeat_replace(description, replace_keyword, '',
- (occurrence_count - 1))
- else:
- description = description.replace(replace_keyword, '')
- task_id_from_search = search_field(item_description, 'ID=')
- task_id_as_digit_at_end = _is_id_digit_at_the_end(
- task_id_as_digit_at_end, task_id_from_search)
- no_task_id_at_the_end = is_id_at_the_end(
- task_id_from_search)
- # Search task id regex will find ID keyword
- # at the end of string
- search_task_id_regex = re.search(
- '(^[a-zA-z]*[0-9]*)(?<!^)[-]([0-9]*)$',
- task_id_from_search, re.M | re.I)
- if task_id_from_search != '' and search_task_id_regex:
- task_id_at_the_end = search_task_id_regex.group(0)
- else:
- task_id_at_the_end = ''
- activity = search_field(item_description, 'ACT=')
- url = search_field(item_description, 'URL=')
- comment = search_field(item_description, 'COMMENT=')
- else:
- description = item_description
- activity, url, comment, task_id_at_the_end = '', '', '', ''
- no_task_id_at_the_end = True
- settings.LOGGER.warning(
- 'Unrecognizable task id when no keywords are provided. '
- 'Description "{}"'.format(
- item_description))
- # By using this regex we parse task id from the following types of
- # strings: BAI-4 To do smth ACT="Documentation"...
- # - BAI-4 - ID of task
- # - To do smth - description of the task
- # - ACT="Documentation" - option of the task
- if re.search('^(([a-zA-z]*[0-9]*)[-]([0-9]*)) ', description,
- re.M | re.I):
- task_id = re.search(
- '^(([a-zA-z]*[0-9]*)[-]([0-9]*)) ',
- description, re.M | re.I).group(0).strip()
- # Match special symbols in fist part of description after
- # task id. This verification used for define if description
- # added to time entity. Regex match all symbols except equal
- # symbol
- if not re.search(
- '^(([a-zA-z]*[0-9]*)[-]([0-9]*)) [^=]* ',
- description,
- re.M | re.I):
- wrong_sequence = True
- else:
- task_id = ''
- settings.LOGGER.warning(
- 'Unrecognizable task id. Description "{}"'.format(
- item_description))
- if task_id_at_the_end != task_id and not no_task_id_at_the_end \
- and task_id != '':
- different_task_ids_provided = True
- wrong_sequence = False
- else:
- different_task_ids_provided = False
- except KeyError:
- description, task_id, activity, url, comment, task_id_at_the_end = '', '', '', '', '', ''
- wrong_sequence = True
- try:
- stop = item['stop']
- except KeyError:
- stop = ''
- try:
- tags = item['tags']
- except KeyError:
- tags = []
- try:
- tid = item['tid']
- except KeyError:
- tid = 0
- try:
- pid = item['pid']
- except KeyError:
- pid = 0
- # Creating dictionary with retrieved information
- dictionary = {
- 'id': item['id'],
- 'started_at': started_at,
- 'stopped_at': stopped_at,
- 'updated_at': updated_at,
- 'duration': duration,
- 'duration_minutes': duration_minutes,
- 'description': description,
- 'activity': activity,
- 'url': url,
- 'comment': comment,
- 'task_id': task_id,
- 'start': item['start'],
- 'stop': stop,
- 'duration_seconds': item_duration,
- 'tags': tags,
- 'wid': item['wid'],
- 'pid': pid,
- 'tid': tid,
- 'billable': item['billable'],
- 'at': item['at'],
- 'started_at_day': started_at_day,
- 'task_id_at_the_end': task_id_at_the_end,
- 'wrong_sequence': wrong_sequence,
- 'no_task_id_at_the_end': no_task_id_at_the_end,
- 'task_id_as_digit_at_end': task_id_as_digit_at_end,
- 'different_task_ids_provided': different_task_ids_provided
- }
- if task_id == '':
- time_entities_without_id.append(dictionary)
- if task_id_at_the_end == '':
- time_entities_no_id_at_the_end.append(dictionary)
- # Appending time entities with new information
- time_entities.append(dictionary)
- time_entities_filtered = filtered_time_entities(time_entities,
- 'started_at_day')
- defining_gaps_and_overlaps(time_entities, timezone, user)
- defining_gap_for_last_time_entry(time_entities_filtered, user)
- for dates, values in time_entities_filtered.items():
- for time_entity in values:
- if 'overlapped' not in time_entity:
- time_entity['overlapped'] = False
- if 'has_gap' not in time_entity:
- time_entity['has_gap'] = False
- if 'gapped' not in time_entity:
- time_entity['gapped'] = False
- if 'gap_eod' not in time_entity:
- time_entity['gap_eod'] = False
- if 'has_overlap' not in time_entity:
- time_entity['has_overlap'] = False
- return toggl_error_message, tk_error_message, \
- time_entities, time_entities_filtered, \
- time_entities_without_id, time_entities_no_id_at_the_end
- def is_id_at_the_end(task_id_from_search):
- """
- This function uses for verification if no task id at the
- end
- :param task_id_from_search: task id searched by using regex
- end contains only digits
- :type task_id_from_search: str
- :return task_id_from_search == '': True if task id is not present in
- description at the end
- """
- return task_id_from_search == ''
- def _is_id_digit_at_the_end(task_id_as_digit_at_end,
- task_id_from_search):
- """
- This function uses for searching and verification if task id at the
- end contains only digits
- :param task_id_from_search: task id searched by using regex
- :param task_id_as_digit_at_end: True or False related if task id at the
- end contains only digits
- :type task_id_from_search: str
- :type task_id_as_digit_at_end: bool
- :return task_id_as_digit_at_end: True if task id contains only digits
- """
- task_id_as_digit = re.search(
- '^[0-9]*',
- task_id_from_search, re.M | re.I)
- if task_id_as_digit:
- if task_id_as_digit.group(0) != '':
- task_id_as_digit_at_end = True
- else:
- task_id_as_digit_at_end = False
- return task_id_as_digit_at_end
- def _is_wrong_sequence_param(item_description, keywords, wrong_sequence):
- """
- This function uses for defining structure of provided description. If
- description sequence is correct this function return False else return True
- :param item_description: time entry description
- :param keywords: ACT, COMMENT, URL, ID keywords
- :param wrong_sequence: True or False related if the position of keywords
- is correct or incorrect
- :type item_description: str
- :type keywords: list
- :type wrong_sequence: bool
- :return wrong_sequence
- """
- for key in range(len(keywords) - 1):
- try:
- position_first_element = item_description.find(
- keywords[key])
- position_next_element = item_description.find(
- keywords[key + 1])
- if keywords[key] != 'COMMENT=' or keywords[key] != 'ACT=' or \
- keywords[key] != 'URL=':
- if position_first_element == -1 and keywords[key] != 'COMMENT=' \
- and keywords[key] != 'ACT=' and \
- keywords[key] != 'URL=':
- wrong_sequence = True
- break
- elif position_next_element < position_first_element and position_next_element != -1:
- wrong_sequence = True
- break
- except IndexError:
- wrong_sequence = True
- return wrong_sequence
- def _rounding_seconds_into_minutes(item_duration_seconds):
- """
- This method helps to convert and round duration.
- Rounding of seconds:
- - if duration of time entry is 0:00:11 or 0:00:34 or 0:00:47 =>
- round to 0:01:00 (one minute)
- - if duration of time entry is 0:01:29 => round to 0:01:00
- - if duration of time entry is 0:01:31 => round to 0:02:00
- :param item_duration_seconds: get item_duration in seconds
- :return: rounded duration and duration_minutes
- :type item_duration_seconds: int
- :type duration: str
- :type duration_minutes: float
- """
- # Convert duration into list
- convert_duration_into_list_of_str = "{}".format(str(
- timedelta(seconds=item_duration_seconds))).split(':')
- # Get separate hours, minutes, seconds
- hours = int(convert_duration_into_list_of_str[0])
- minutes = int(convert_duration_into_list_of_str[1])
- seconds = int(convert_duration_into_list_of_str[2])
- # Convert duration into minutes
- duration_minutes = round(item_duration_seconds / 60, 0)
- # Verification for rounding seconds into minutes
- if hours == 0 and minutes == 0 and seconds > 0:
- duration = "00:01"
- duration_minutes = 1
- elif minutes >= 0 and seconds >= 30:
- duration = "{}:{}".format(
- time.strftime('%H', time.gmtime(item_duration_seconds)),
- str(minutes + 1).zfill(2))
- else:
- duration = time.strftime(
- '%H:%M',
- time.gmtime(item_duration_seconds))
- return duration, duration_minutes
- def toggl_authorization(access_key):
- payload = {}
- user_password = '{}:{}'.format(access_key, settings.TOGGL_USER_PASSWORD)
- user_password_encoded = base64.b64encode(user_password.encode("utf-8"))
- headers = {
- 'Authorization': 'Basic {}'.format(
- user_password_encoded.decode("utf-8"))
- }
- return headers, payload
- def defining_gaps_and_overlaps(time_entities, timezone, user):
- """
- In this loop we define the time between ending and beginning of time
- entity. We take end time of current time entity and subtract from ending of
- next time entity. Results we store in time entities dictionary.
- Also we store the result representation, gap start and gap stop
- information which we will use in our template
- """
- hour_from = user.working_hour_from.isoformat()
- hour_till = user.working_hour_till.isoformat()
- for idx in range(len(time_entities) - 1):
- try:
- current_stop = datetime.strptime(
- time_entities[idx]['stop'],
- "%Y-%m-%dT%H:%M:%S%z").astimezone(timezone).replace(
- tzinfo=None).isoformat()
- future_stop = datetime.strptime(
- time_entities[idx + 1]['stop'],
- "%Y-%m-%dT%H:%M:%S%z").astimezone(timezone).replace(
- tzinfo=None).isoformat()
- future_start = datetime.strptime(
- time_entities[idx + 1]['start'],
- "%Y-%m-%dT%H:%M:%S%z").astimezone(timezone).replace(
- tzinfo=None).isoformat()
- current_stop_slit = current_stop.split('T')
- future_stop_split = future_stop.split('T')
- future_start_split = future_start.split('T')
- if current_stop_slit[0] == future_stop_split[0]:
- if time_entities[idx]['stopped_at'] != '':
- stopped_at_task = time.mktime(
- datetime.strptime(time_entities[idx]['stopped_at'],
- '%H:%M').timetuple())
- started_at_task = time.mktime(
- datetime.strptime(
- time_entities[idx + 1]['started_at'],
- '%H:%M').timetuple())
- result = stopped_at_task - started_at_task
- time_entities[idx]['result'] = result
- if hour_from <= current_stop_slit[1] <= hour_till and \
- hour_from <= future_start_split[1] <= hour_till:
- time_entities[idx]['gap_start'] = \
- time_entities[idx]['stopped_at']
- time_entities[idx]['gap_stop'] = \
- time_entities[idx + 1]['started_at']
- else:
- if result < 0:
- time_entities[idx]['result'] = 0
- if current_stop < future_stop:
- time_entities[idx][
- 'result_representation'] = time.strftime(
- '%H:%M', time.gmtime(abs(result)))
- time_entities[idx]['overlap_start'] = \
- time_entities[idx + 1]['started_at']
- time_entities[idx]['overlap_stop'] = \
- time_entities[idx]['stopped_at']
- set_overlap_or_gap_param_for_time_entry(
- idx,
- time_entities)
- else:
- time_entities[idx]['result_representation'] = \
- time_entities[idx + 1]['duration']
- time_entities[idx]['overlap_start'] = \
- time_entities[idx + 1]['started_at']
- time_entities[idx]['overlap_stop'] = \
- time_entities[idx + 1]['stopped_at']
- set_overlap_or_gap_param_for_time_entry(
- idx,
- time_entities)
- except ValueError:
- time_entities[idx]['result'] = ''
- def defining_gap_for_last_time_entry(time_entities_filtered, user):
- """
- In this loop, we define the time between the ending of time entity and
- hour when working day ends. We loop by filtered time entities in reverse
- order and define stop time point. If we have returned only one
- time entity we took time point from that time entity
- """
- hour_from = user.working_hour_from.isoformat()
- hour_till = user.working_hour_till.isoformat()
- hour_till_timestamp = time.mktime(
- datetime.strptime(hour_till,
- '%H:%M:%S').timetuple())
- for date, time_entities in time_entities_filtered.items():
- if len(time_entities) > 1:
- for idx in reversed(range(len(time_entities) - 1)):
- try:
- if hour_from <= time_entities[idx][
- 'stopped_at'] <= hour_till \
- and hour_from <= time_entities[idx][
- 'started_at'] <= hour_till:
- if time_entities[idx]['stop'].split('T')[1] < \
- time_entities[idx + 1]['stop'].split('T')[1]:
- _gap_eod_calculation(hour_till_timestamp,
- time_entities[idx + 1],
- hour_till)
- break
- elif time_entities[idx]['stop'].split('T')[1] > \
- time_entities[idx + 1]['stop'].split('T')[1]:
- _gap_eod_calculation(hour_till_timestamp,
- time_entities[idx], hour_till)
- break
- except IndexError:
- pass
- else:
- if hour_from <= time_entities[0]['stopped_at'] <= hour_till:
- _gap_eod_calculation(hour_till_timestamp,
- time_entities[0], hour_till)
- def _gap_eod_calculation(hour_till_timestamp, element, hour_till):
- """
- This function calculates the gap between the last time entity and the end
- of the day which defined in the settings page by the user.
- We verify if the result more than the gap defined in the settings file,
- if yes we set for gap_eod param True, parse the result in param gap_eod_result
- """
- stopped_at_task = time.mktime(
- datetime.strptime(
- element['stopped_at'],
- '%H:%M').timetuple())
- if element['stopped_at'] <= hour_till:
- result = abs(stopped_at_task - hour_till_timestamp)
- if result > settings.GAP:
- element['gap_eod'] = True
- hours, minutes = divmod(result / 60, 60)
- if hours != 0:
- element['gap_eod_result'] = \
- "%dh %02dm" % (hours, minutes)
- else:
- element['gap_eod_result'] = \
- "%02dm" % minutes
- def set_overlap_or_gap_param_for_time_entry(idx, time_entities):
- """
- This function sets param into time entry dict if overlap or gap present
- :param idx: index
- :param time_entities: list of all time entities
- :type idx: int
- :type time_entities: list
- """
- if time_entities[idx]['result'] > 0 and \
- time_entities[idx]['result'] > settings.OVERLAP:
- time_entities[idx]['overlapped'] = True
- current_start = time_entities[idx]['started_at_day'].split(' ')[0]
- future_start = time_entities[idx + 1]['started_at_day'].split(' ')[0]
- if current_start == future_start:
- time_entities[idx + 1]['has_overlap'] = True
- if time_entities[idx]['stopped_at'] < time_entities[idx + 1][
- 'stopped_at']:
- result = int(time_entities[idx]['result'])
- time_entities[idx + 1]['overlap_time'] = '{}-{}'.format(
- time_entities[idx + 1]['started_at'],
- time_entities[idx]['stopped_at'])
- else:
- started_at = time.mktime(
- datetime.strptime(time_entities[idx + 1]['started_at'],
- '%H:%M').timetuple())
- stopped_at = time.mktime(
- datetime.strptime(time_entities[idx + 1]['stopped_at'],
- '%H:%M').timetuple())
- result = int(stopped_at - started_at)
- time_entities[idx + 1]['overlap_time'] = '{}-{}'.format(
- time_entities[idx + 1]['started_at'],
- time_entities[idx + 1]['stopped_at'])
- hours, minutes = divmod(result / 60, 60)
- if hours != 0:
- time_entities[idx + 1]['overlap_result'] = \
- "%d hour %02d minutes" % (hours, minutes)
- if minutes <= 1 < hours:
- time_entities[idx + 1]['overlap_result'] = \
- "%d hours %02d minute" % (hours, minutes)
- if minutes == 0:
- time_entities[idx + 1]['overlap_result'] = \
- "%d hours" % (hours)
- if minutes <= 1 and hours == 1:
- time_entities[idx + 1]['overlap_result'] = \
- "%d hour %02d minute" % (hours, minutes)
- if minutes == 0:
- time_entities[idx + 1]['overlap_result'] = \
- "%d hour" % (hours)
- if hours > 1 and minutes > 1:
- time_entities[idx + 1]['overlap_result'] = \
- "%d hours %02d minutes" % (hours, minutes)
- else:
- time_entities[idx + 1]['overlap_result'] = \
- "%02d minutes" % minutes
- if minutes < 1:
- time_entities[idx + 1]['overlap_result'] = \
- "%02d minute" % minutes
- elif time_entities[idx]['result'] < 0 and abs(
- time_entities[idx]['result']) > settings.GAP:
- time_entities[idx]['gapped'] = True
- time_entities[idx + 1]['has_gap'] = True
- hours, minutes = divmod(abs(int(
- time_entities[idx]['result'])) / 60, 60)
- if hours != 0:
- time_entities[idx + 1]['gap_result'] = \
- "%dh %02dm" % (hours, minutes)
- else:
- time_entities[idx + 1]['gap_result'] = \
- "%02dm" % minutes
- def search_field(field, keywords):
- """
- This method helps to parse field with special keyword by using regex.
- We split string to several groups and take information in double quote.
- """
- # Find key word in description by using regex. Example which will be found
- # ID="TEST-123".
- search_item = re.search(
- r'{}((?<![\\])["])((?:.(?!(?<![\\])\1))*.?)\1'.format(
- keywords), field, re.M | re.I)
- if search_item:
- item = search_item.group(2)
- else:
- item = ''
- return item
- def repeat_replace(s, old, new, occurrence):
- string = s.rsplit(old, occurrence)
- return new.join(string)
Add Comment
Please, Sign In to add comment