12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158 |
- # -*- coding: utf-8 -*-
- # Part of Odoo. See LICENSE file for full copyright and licensing details.
- from collections import defaultdict
- import itertools
- import math
- from datetime import datetime, time, timedelta
- from dateutil.relativedelta import relativedelta
- from dateutil.rrule import rrule, DAILY, WEEKLY
- from functools import partial
- from itertools import chain
- from pytz import timezone, utc
- from odoo import api, fields, models, _
- from odoo.addons.base.models.res_partner import _tz_get
- from odoo.exceptions import ValidationError
- from odoo.osv import expression
- from odoo.tools.float_utils import float_round
- from odoo.tools import date_utils, float_utils
- from .resource_mixin import timezone_datetime
# Default number of working hours per day. It should only be used as a
# fallback when the value from the calendar is not available.
HOURS_PER_DAY = 8
# Granularity used when converting hours into days: a day count is rounded
# to a multiple of 1/ROUNDING_FACTOR, i.e. to sixteenths of a day.
ROUNDING_FACTOR = 16
def make_aware(dt):
    """ Make ``dt`` timezone-aware and provide the matching converter.

    An aware ``dt`` is returned untouched; a naive one gets UTC as timezone.
    The second element of the returned pair is a function converting an
    aware datetime to the flavour of the original ``dt``: the same timezone
    when ``dt`` was aware, a naive datetime expressed in UTC otherwise.
    """
    if not dt.tzinfo:
        def to_naive_utc(val):
            return val.astimezone(utc).replace(tzinfo=None)
        return dt.replace(tzinfo=utc), to_naive_utc

    def to_original_tz(val):
        return val.astimezone(dt.tzinfo)
    return dt, to_original_tz
def string_to_datetime(value):
    """ Parse ``value`` with ``fields.Datetime.from_string`` and localize the result in UTC. """
    naive_dt = fields.Datetime.from_string(value)
    return utc.localize(naive_dt)
def datetime_to_string(dt):
    """ Express ``dt`` in UTC and format it with ``fields.Datetime.to_string``. """
    utc_dt = dt.astimezone(utc)
    return fields.Datetime.to_string(utc_dt)
def float_to_time(hours):
    """ Convert a number of hours into a time object.

    The fractional part of ``hours`` is rounded to the nearest minute.
    Exactly 24 hours, or a value so close to it that its minutes round up
    to midnight, gives ``time.max``.
    """
    if hours == 24.0:
        return time.max
    fractional, integral = math.modf(hours)
    hour = int(integral)
    minute = int(float_round(60 * fractional, precision_digits=0))
    if minute == 60:
        # e.g. 7.9999 hours: the minutes round up to a full hour, which
        # ``time`` rejects (minute must be in 0..59); carry it over instead
        hour, minute = hour + 1, 0
        if hour == 24:
            return time.max
    return time(hour, minute, 0)
- def _boundaries(intervals, opening, closing):
- """ Iterate on the boundaries of intervals. """
- for start, stop, recs in intervals:
- if start < stop:
- yield (start, opening, recs)
- yield (stop, closing, recs)
class Intervals(object):
    """ Ordered collection of disjoint intervals carrying records.

        Every item is a triple ``(start, stop, records)`` where ``records``
        is a recordset. Overlapping or contiguous input intervals are merged
        at construction time, and their records are united.
    """
    def __init__(self, intervals=()):
        self._items = []
        if not intervals:
            return
        # normalize: sweep over the sorted boundaries and emit one interval
        # each time every currently opened interval has been closed
        merged = self._items
        open_starts = []
        open_records = []
        for moment, kind, records in sorted(_boundaries(intervals, 'start', 'stop')):
            if kind == 'start':
                open_starts.append(moment)
                open_records.append(records)
                continue
            first_start = open_starts.pop()
            if open_starts:
                # some interval is still open: keep accumulating
                continue
            merged.append((first_start, moment, open_records[0].union(*open_records)))
            open_records.clear()

    def __bool__(self):
        return bool(self._items)

    def __len__(self):
        return len(self._items)

    def __iter__(self):
        return iter(self._items)

    def __reversed__(self):
        return reversed(self._items)

    def __or__(self, other):
        """ Union of both sets of intervals. """
        return Intervals(chain(self._items, other._items))

    def __and__(self, other):
        """ Intersection of both sets of intervals. """
        return self._merge(other, False)

    def __sub__(self, other):
        """ Intervals of ``self`` minus the ones of ``other``. """
        return self._merge(other, True)

    def _merge(self, other, difference):
        """ Compute the difference (``difference`` true) or the intersection
            (``difference`` false) of ``self`` and ``other``.
        """
        merged = Intervals()
        emit = merged._items.append

        # iterating on 'self' and 'other' (not on raw lists) relies on
        # their normalized representation
        own_bounds = _boundaries(self, 'start', 'stop')
        other_bounds = _boundaries(other, 'switch', 'switch')

        current_start = None        # set on 'start', reset on 'stop'
        current_records = None      # records of the interval opened by 'start'
        active = difference         # toggled by every 'switch'

        for moment, kind, records in sorted(chain(own_bounds, other_bounds)):
            if kind == 'start':
                current_start = moment
                current_records = records
            elif kind == 'stop':
                if active and current_start < moment:
                    emit((current_start, moment, current_records))
                current_start = None
            else:
                if current_start is not None:
                    if not active:
                        # output becomes active from this boundary on
                        current_start = moment
                    elif current_start < moment:
                        # output stops here: flush what has been covered
                        emit((current_start, moment, current_records))
                active = not active

        return merged
def sum_intervals(intervals):
    """ Return the total duration, in hours, of the given intervals. """
    total = 0
    for start, stop, _meta in intervals:
        total += (stop - start).total_seconds() / 3600
    return total
- class ResourceCalendar(models.Model):
- """ Calendar model for a resource. It has
- - attendance_ids: list of resource.calendar.attendance that are a working
- interval in a given weekday.
- - leave_ids: list of leaves linked to this calendar. A leave can be general
- or linked to a specific resource, depending on its resource_id.
    Most methods in this class use intervals. An interval is a tuple holding
    (begin_datetime, end_datetime, records); intervals of work or leaves are
    gathered in ``Intervals`` collections. Only the "unavailable intervals"
    methods return plain lists of (begin_datetime, end_datetime) tuples. """
- _name = "resource.calendar"
- _description = "Resource Working Time"
@api.model
def default_get(self, fields):
    """ Extend the defaults with a name and a set of attendances.

    The name is derived from the default company when none is provided.
    When ``attendance_ids`` is requested and has no default, the attendances
    of the company calendar are copied, unless that calendar is in two-weeks
    mode or has no attendance; a Monday-to-Friday 8-12 / 13-17 schedule is
    used in that case.
    """
    res = super(ResourceCalendar, self).default_get(fields)
    if not res.get('name') and res.get('company_id'):
        res['name'] = _('Working Hours of %s', self.env['res.company'].browse(res['company_id']).name)

    if 'attendance_ids' not in fields or res.get('attendance_ids'):
        return res

    company = self.env['res.company'].browse(res.get('company_id', self.env.company.id))
    company_calendar = company.resource_calendar_id
    company_attendances = company_calendar.attendance_ids
    if company_attendances and not company_calendar.two_weeks_calendar:
        res['attendance_ids'] = [
            (0, 0, {
                'name': attendance.name,
                'dayofweek': attendance.dayofweek,
                'hour_from': attendance.hour_from,
                'hour_to': attendance.hour_to,
                'day_period': attendance.day_period,
            })
            for attendance in company_attendances
        ]
        return res

    # (name, dayofweek, hour_from, hour_to, day_period)
    default_slots = [
        (_('Monday Morning'), '0', 8, 12, 'morning'),
        (_('Monday Afternoon'), '0', 13, 17, 'afternoon'),
        (_('Tuesday Morning'), '1', 8, 12, 'morning'),
        (_('Tuesday Afternoon'), '1', 13, 17, 'afternoon'),
        (_('Wednesday Morning'), '2', 8, 12, 'morning'),
        (_('Wednesday Afternoon'), '2', 13, 17, 'afternoon'),
        (_('Thursday Morning'), '3', 8, 12, 'morning'),
        (_('Thursday Afternoon'), '3', 13, 17, 'afternoon'),
        (_('Friday Morning'), '4', 8, 12, 'morning'),
        (_('Friday Afternoon'), '4', 13, 17, 'afternoon'),
    ]
    res['attendance_ids'] = [
        (0, 0, {
            'name': name,
            'dayofweek': dayofweek,
            'hour_from': hour_from,
            'hour_to': hour_to,
            'day_period': day_period,
        })
        for name, dayofweek, hour_from, hour_to, day_period in default_slots
    ]
    return res
# Display name of the working schedule (mandatory).
name = fields.Char(required=True)
active = fields.Boolean("Active", default=True,
    help="If the active field is set to false, it will allow you to hide the Working Time without removing it.")
# Defaults to the current company of the environment.
company_id = fields.Many2one(
    'res.company', 'Company',
    default=lambda self: self.env.company)
# Stored computed field (see _compute_attendance_ids) that remains editable
# thanks to readonly=False; the attendances are duplicated with the calendar.
attendance_ids = fields.One2many(
    'resource.calendar.attendance', 'calendar_id', 'Working Time',
    compute='_compute_attendance_ids', store=True, readonly=False, copy=True)
# All the leaves linked to this calendar.
leave_ids = fields.One2many(
    'resource.calendar.leaves', 'calendar_id', 'Time Off')
# Same relation as leave_ids, restricted by its domain to the leaves without
# resource; stored, computed by _compute_global_leave_ids, yet editable.
global_leave_ids = fields.One2many(
    'resource.calendar.leaves', 'calendar_id', 'Global Time Off',
    compute='_compute_global_leave_ids', store=True, readonly=False,
    domain=[('resource_id', '=', False)], copy=True,
)
hours_per_day = fields.Float("Average Hour per Day", default=HOURS_PER_DAY,
    help="Average hours per day a resource is supposed to work with this calendar.")
# Default timezone, by priority: the 'tz' context key, the timezone of the
# current user, the one of the 'base.user_admin' record, then 'UTC'.
tz = fields.Selection(
    _tz_get, string='Timezone', required=True,
    default=lambda self: self._context.get('tz') or self.env.user.tz or self.env.ref('base.user_admin').tz or 'UTC',
    help="This field is used in order to define in which timezone the resources will work.")
# Current UTC offset of `tz`, formatted with '%z' (see _compute_tz_offset).
tz_offset = fields.Char(compute='_compute_tz_offset', string='Timezone offset', invisible=True)
two_weeks_calendar = fields.Boolean(string="Calendar in 2 weeks mode")
# Sentence telling whether the current week is the first or the second one.
two_weeks_explanation = fields.Char('Explanation', compute="_compute_two_weeks_explanation")
@api.depends('company_id')
def _compute_attendance_ids(self):
    """ Align new calendars, and calendars whose company changed, on the
    calendar of their company: mode, hours per day, timezone and attendances
    (the ones without resource only).
    """
    def company_changed(calendar):
        return not calendar._origin or calendar._origin.company_id != calendar.company_id

    for calendar in self.filtered(company_changed):
        template = calendar.company_id.resource_calendar_id
        # drop the current attendances, then copy the ones of the template
        attendance_commands = [(5, 0, 0)]
        attendance_commands.extend(
            (0, 0, attendance._copy_attendance_vals())
            for attendance in template.attendance_ids
            if not attendance.resource_id
        )
        calendar.update({
            'two_weeks_calendar': template.two_weeks_calendar,
            'hours_per_day': template.hours_per_day,
            'tz': template.tz,
            'attendance_ids': attendance_commands,
        })
@api.depends('company_id')
def _compute_global_leave_ids(self):
    """ Replace the global leaves of new calendars, and of calendars whose
    company changed, by copies of the global leaves of the company calendar.
    """
    def company_changed(calendar):
        return not calendar._origin or calendar._origin.company_id != calendar.company_id

    for calendar in self.filtered(company_changed):
        company_leaves = calendar.company_id.resource_calendar_id.global_leave_ids
        leave_commands = [(5, 0, 0)]
        leave_commands.extend((0, 0, leave._copy_leave_vals()) for leave in company_leaves)
        calendar.update({'global_leave_ids': leave_commands})
@api.depends('tz')
def _compute_tz_offset(self):
    """ Store the current UTC offset ('%z' format) of the calendar timezone,
    'GMT' being used when no timezone is set.
    """
    for calendar in self:
        calendar_tz = timezone(calendar.tz or 'GMT')
        now = datetime.now(calendar_tz)
        calendar.tz_offset = now.strftime('%z')
@api.returns('self', lambda value: value.id)
def copy(self, default=None):
    """ Duplicate the calendar, naming the copy "<name> (copy)" unless a
    name is given in ``default``.

    :param default: optional dict of values overriding the copied ones;
        it is left untouched
    :return: the new calendar
    """
    self.ensure_one()
    # work on a copy: the dict given by the caller must not be modified
    default = dict(default or {})
    if not default.get('name'):
        default['name'] = _('%s (copy)', self.name)
    return super(ResourceCalendar, self).copy(default)
@api.constrains('attendance_ids')
def _check_attendance_ids(self):
    """ In two-weeks mode, as soon as the calendar has a section line, its
    first line (by sequence) must have a display type, so that no period
    comes before the sections.
    """
    for resource in self:
        if not resource.two_weeks_calendar:
            continue
        attendances = resource.attendance_ids
        if not attendances.filtered(lambda a: a.display_type == 'line_section'):
            continue
        first_line = attendances.sorted('sequence')[0]
        if not first_line.display_type:
            raise ValidationError(_("In a calendar with 2 weeks mode, all periods need to be in the sections."))
@api.depends('two_weeks_calendar')
def _compute_two_weeks_explanation(self):
    """ Build the sentence telling which week (first or second) the current
    week is, together with its first and last days.
    """
    today = fields.Date.today()
    if self.env['resource.calendar.attendance'].get_week_type(today):
        week_type_str = _("second")
    else:
        week_type_str = _("first")
    week_start = date_utils.start_of(today, 'week')
    week_end = date_utils.end_of(today, 'week')
    self.two_weeks_explanation = _(
        "The current week (from %s to %s) correspond to the %s one.",
        week_start, week_end, week_type_str)
def _get_global_attendances(self):
    """ Return the attendances having no start date, no end date, no
    resource and no display type.
    """
    def is_global(attendance):
        return not (
            attendance.date_from
            or attendance.date_to
            or attendance.resource_id
            or attendance.display_type
        )
    return self.attendance_ids.filtered(is_global)
- def _compute_hours_per_day(self, attendances):
- if not attendances:
- return 0
- hour_count = 0.0
- for attendance in attendances:
- hour_count += attendance.hour_to - attendance.hour_from
- if self.two_weeks_calendar:
- number_of_days = len(set(attendances.filtered(lambda cal: cal.week_type == '1').mapped('dayofweek')))
- number_of_days += len(set(attendances.filtered(lambda cal: cal.week_type == '0').mapped('dayofweek')))
- else:
- number_of_days = len(set(attendances.mapped('dayofweek')))
- return float_round(hour_count / float(number_of_days), precision_digits=2)
@api.onchange('attendance_ids', 'two_weeks_calendar')
def _onchange_hours_per_day(self):
    """ Refresh the average hours per day from the global attendances. """
    self.hours_per_day = self._compute_hours_per_day(self._get_global_attendances())
def switch_calendar_type(self):
    """ Toggle the calendar between the one-week and the two-weeks modes.

    In both directions the existing attendances are deleted and replaced by
    the default ones; the average hours per day is recomputed afterwards.
    """
    if not self.two_weeks_calendar:
        self.attendance_ids.unlink()
        # one section line per week; the periods of a week are the lines
        # following its section
        self.attendance_ids = [
            (0, 0, {
                'name': 'First week',
                'dayofweek': '0',
                'sequence': '0',
                'hour_from': 0,
                'day_period': 'morning',
                'week_type': '0',
                'hour_to': 0,
                'display_type':
                'line_section'}),
            (0, 0, {
                'name': 'Second week',
                'dayofweek': '0',
                'sequence': '25',
                'hour_from': 0,
                'day_period': 'morning',
                'week_type': '1',
                'hour_to': 0,
                'display_type': 'line_section'}),
        ]

        self.two_weeks_calendar = True
        # NOTE(review): default_get() receives a string here rather than a
        # list of field names; its `'attendance_ids' in fields` test still
        # succeeds on a string - confirm this is intended
        default_attendance = self.default_get('attendance_ids')['attendance_ids']
        # first week: sequences 1.. place the lines after the first section
        for idx, att in enumerate(default_attendance):
            att[2]["week_type"] = '0'
            att[2]["sequence"] = idx + 1
        self.attendance_ids = default_attendance
        # second week: the same commands are reused with updated values,
        # sequences 26.. place the lines after the second section (25)
        for idx, att in enumerate(default_attendance):
            att[2]["week_type"] = '1'
            att[2]["sequence"] = idx + 26
        self.attendance_ids = default_attendance
    else:
        self.two_weeks_calendar = False
        self.attendance_ids.unlink()
        self.attendance_ids = self.default_get('attendance_ids')['attendance_ids']
    self._onchange_hours_per_day()
@api.onchange('attendance_ids')
def _onchange_attendance_ids(self):
    """ In two-weeks mode, assign each period to a week according to its
    position (sequence) relative to the two section lines.

    :raise ValidationError: when there is not exactly one section per week
    """
    if not self.two_weeks_calendar:
        return

    def section_of(week_type):
        return self.attendance_ids.filtered(
            lambda att: att.display_type == 'line_section' and att.week_type == week_type)

    even_section = section_of('0')
    odd_section = section_of('1')
    if len(even_section) != 1 or len(odd_section) != 1:
        raise ValidationError(_("You can't delete section between weeks."))

    even_seq = even_section.sequence
    odd_seq = odd_section.sequence
    even_section_is_last = even_seq > odd_seq

    for line in self.attendance_ids.filtered(lambda att: att.display_type is False):
        if even_section_is_last:
            line.week_type = '1' if even_seq > line.sequence else '0'
        else:
            line.week_type = '0' if odd_seq > line.sequence else '1'
def _check_overlap(self, attendance_ids):
    """ Check that the given attendances of one week do not overlap.

    Only the attendances without start and end dates are considered. Each
    one is turned into an interval expressed in hours since the beginning
    of the week.

    :raise ValidationError: when two attendances overlap
    """
    undated = attendance_ids.filtered(lambda att: not att.date_from and not att.date_to)
    # Intervals merges intervals where one stops exactly when the other
    # starts; shifting every start by 0.000001 keeps contiguous attendances
    # apart, so that only real overlaps reduce the number of intervals.
    spans = [
        (24 * int(att.dayofweek) + att.hour_from + 0.000001, 24 * int(att.dayofweek) + att.hour_to, att)
        for att in undated
    ]
    if len(Intervals(spans)) != len(spans):
        raise ValidationError(_("Attendances can't overlap."))
@api.constrains('attendance_ids')
def _check_attendance(self):
    """ Forbid overlapping attendances, ignoring the resource-specific ones
    and the section lines. In two-weeks mode, each week is checked apart.
    """
    for calendar in self:
        candidates = calendar.attendance_ids.filtered(
            lambda attendance: not attendance.resource_id and attendance.display_type is False)
        if not calendar.two_weeks_calendar:
            calendar._check_overlap(candidates)
            continue
        for week_type in ('0', '1'):
            calendar._check_overlap(
                candidates.filtered(lambda attendance: attendance.week_type == week_type))
- # --------------------------------------------------
- # Computation API
- # --------------------------------------------------
def _attendance_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None):
    """ Return the attendance intervals in the given datetime range.

    :param start_dt: timezone-aware start of the range
    :param end_dt: timezone-aware end of the range
    :param resources: optional resources; the empty resource is always
        handled in addition to them
    :param domain: optional extra domain applied to the attendances
    :param tz: optional timezone in which the intervals are expressed;
        otherwise the timezone of each resource is used, and the one of
        the calendar for the empty resource
    :return: dict mapping each resource id (the id of the empty resource
        included) to the ``Intervals`` of its attendances, clipped to the
        requested range
    """
    assert start_dt.tzinfo and end_dt.tzinfo
    self.ensure_one()

    if not resources:
        resources = self.env['resource.resource']
        resources_list = [resources]
    else:
        # the empty resource stands for "no specific resource"
        resources_list = list(resources) + [self.env['resource.resource']]
    resource_ids = [r.id for r in resources_list]
    domain = domain if domain is not None else []
    # attendances of this calendar, for the requested resources, that are
    # not display lines (sections)
    domain = expression.AND([domain, [
        ('calendar_id', '=', self.id),
        ('resource_id', 'in', resource_ids),
        ('display_type', '=', False),
    ]])

    attendances = self.env['resource.calendar.attendance'].search(domain)
    # Since we only have one calendar to take in account
    # Group resources per tz they will all have the same result
    resources_per_tz = defaultdict(list)
    for resource in resources_list:
        resources_per_tz[tz or timezone((resource or self).tz)].append(resource)
    # Resource specific attendances
    # (filled below, but not read again in this method)
    attendance_per_resource = defaultdict(lambda: self.env['resource.calendar.attendance'])
    # Calendar attendances per day of the week
    # * 7 days per week * 2 for two week calendars
    attendances_per_day = [self.env['resource.calendar.attendance']] * 7 * 2
    weekdays = set()
    for attendance in attendances:
        if attendance.resource_id:
            attendance_per_resource[attendance.resource_id] |= attendance
        weekday = int(attendance.dayofweek)
        weekdays.add(weekday)
        if self.two_weeks_calendar:
            weektype = int(attendance.week_type)
            attendances_per_day[weekday + 7 * weektype] |= attendance
        else:
            # one-week calendar: the same attendances apply to both weeks
            attendances_per_day[weekday] |= attendance
            attendances_per_day[weekday + 7] |= attendance

    start = start_dt.astimezone(utc)
    end = end_dt.astimezone(utc)
    bounds_per_tz = {
        tz: (start_dt.astimezone(tz), end_dt.astimezone(tz))
        for tz in resources_per_tz.keys()
    }
    # Use the outer bounds from the requested timezones
    # (from here on, the `tz` argument is rebound by the loops)
    for tz, bounds in bounds_per_tz.items():
        start = min(start, bounds[0].replace(tzinfo=utc))
        end = max(end, bounds[1].replace(tzinfo=utc))
    # Generate once with utc as timezone
    days = rrule(DAILY, start.date(), until=end.date(), byweekday=weekdays)
    ResourceCalendarAttendance = self.env['resource.calendar.attendance']
    base_result = []
    per_resource_result = defaultdict(list)
    for day in days:
        week_type = ResourceCalendarAttendance.get_week_type(day)
        attendances = attendances_per_day[day.weekday() + 7 * week_type]
        for attendance in attendances:
            # skip the attendances whose validity period excludes this day
            if (attendance.date_from and day.date() < attendance.date_from) or\
               (attendance.date_to and attendance.date_to < day.date()):
                continue
            # naive datetimes, localized per timezone further below
            day_from = datetime.combine(day, float_to_time(attendance.hour_from))
            day_to = datetime.combine(day, float_to_time(attendance.hour_to))
            if attendance.resource_id:
                per_resource_result[attendance.resource_id].append((day_from, day_to, attendance))
            else:
                base_result.append((day_from, day_to, attendance))

    # Copy the result localized once per necessary timezone
    # Strictly speaking comparing start_dt < time or start_dt.astimezone(tz) < time
    # should always yield the same result. however while working with dates it is easier
    # if all dates have the same format
    result_per_tz = {
        tz: [(max(bounds_per_tz[tz][0], tz.localize(val[0])),
              min(bounds_per_tz[tz][1], tz.localize(val[1])),
              val[2])
             for val in base_result]
        for tz in resources_per_tz.keys()
    }
    result_per_resource_id = dict()
    for tz, resources in resources_per_tz.items():
        res = result_per_tz[tz]
        # shared by all the resources of this timezone that have no
        # specific attendance
        res_intervals = Intervals(res)
        for resource in resources:
            if resource in per_resource_result:
                resource_specific_result = [(max(bounds_per_tz[tz][0], tz.localize(val[0])), min(bounds_per_tz[tz][1], tz.localize(val[1])), val[2])
                                            for val in per_resource_result[resource]]
                result_per_resource_id[resource.id] = Intervals(itertools.chain(res, resource_specific_result))
            else:
                result_per_resource_id[resource.id] = res_intervals
    return result_per_resource_id
def _leave_intervals(self, start_dt, end_dt, resource=None, domain=None, tz=None):
    """ Return the leave intervals of a single resource (the empty resource
    by default) in the given datetime range.
    """
    if resource is None:
        resource = self.env['resource.resource']
    intervals_per_resource = self._leave_intervals_batch(
        start_dt, end_dt, resources=resource, domain=domain, tz=tz)
    return intervals_per_resource[resource.id]
def _leave_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None, any_calendar=False):
    """ Return the leave intervals in the given datetime range.
    The returned intervals are expressed in specified tz or, for each
    resource, in its own timezone (the calendar's timezone for the empty
    resource).

    :param start_dt: timezone-aware start of the range
    :param end_dt: timezone-aware end of the range
    :param resources: optional resources; the empty resource is always
        handled in addition to them
    :param domain: domain selecting the leaves, ``[('time_type', '=',
        'leave')]`` by default
    :param tz: optional timezone in which all the intervals are expressed
    :param any_calendar: when false, only the leaves without calendar or
        linked to this calendar are considered
    :return: dict mapping each resource id (the id of the empty resource
        included) to the ``Intervals`` of its leaves, clipped to the range
    """
    assert start_dt.tzinfo and end_dt.tzinfo
    self.ensure_one()

    if not resources:
        resources = self.env['resource.resource']
        resources_list = [resources]
    else:
        resources_list = list(resources) + [self.env['resource.resource']]
    resource_ids = [r.id for r in resources_list]
    if domain is None:
        domain = [('time_type', '=', 'leave')]
    if not any_calendar:
        domain = domain + [('calendar_id', 'in', [False, self.id])]
    # for the computation, express all datetimes in UTC
    domain = domain + [
        ('resource_id', 'in', resource_ids),
        ('date_from', '<=', datetime_to_string(end_dt)),
        ('date_to', '>=', datetime_to_string(start_dt)),
    ]

    # retrieve leave intervals in (start_dt, end_dt)
    result = defaultdict(list)
    # range bounds converted once per timezone
    bounds_per_tz = {}
    for leave in self.env['resource.calendar.leaves'].search(domain):
        for resource in resources_list:
            # a leave without resource applies to every resource
            if leave.resource_id.id not in [False, resource.id]:
                continue
            # do not rebind `tz`: the timezone of one resource must not
            # leak to the following resources and leaves
            resource_tz = tz if tz else timezone((resource or self).tz)
            if resource_tz not in bounds_per_tz:
                bounds_per_tz[resource_tz] = (
                    start_dt.astimezone(resource_tz),
                    end_dt.astimezone(resource_tz),
                )
            start, end = bounds_per_tz[resource_tz]
            dt0 = string_to_datetime(leave.date_from).astimezone(resource_tz)
            dt1 = string_to_datetime(leave.date_to).astimezone(resource_tz)
            result[resource.id].append((max(start, dt0), min(end, dt1), leave))

    return {r.id: Intervals(result[r.id]) for r in resources_list}
def _work_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None, compute_leaves=True):
    """ Return, per resource id, the effective work intervals between the
    given datetimes: the attendances, minus the leaves when
    ``compute_leaves`` is set. ``domain`` only applies to the leaves.
    """
    if not resources:
        resources = self.env['resource.resource']
        resources_list = [resources]
    else:
        resources_list = list(resources) + [self.env['resource.resource']]

    attendance_intervals = self._attendance_intervals_batch(start_dt, end_dt, resources, tz=tz)
    if not compute_leaves:
        return {
            resource.id: attendance_intervals[resource.id]
            for resource in resources_list
        }

    leave_intervals = self._leave_intervals_batch(start_dt, end_dt, resources, domain, tz=tz)
    return {
        resource.id: (attendance_intervals[resource.id] - leave_intervals[resource.id])
        for resource in resources_list
    }
def _unavailable_intervals(self, start_dt, end_dt, resource=None, domain=None, tz=None):
    """ Return the unavailable intervals of a single resource (the empty
    resource by default) between the given datetimes.
    """
    if resource is None:
        resource = self.env['resource.resource']
    unavailable_per_resource = self._unavailable_intervals_batch(
        start_dt, end_dt, resources=resource, domain=domain, tz=tz)
    return unavailable_per_resource[resource.id]
def _unavailable_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None):
    """ Return, per resource id, the list of ``(start, stop)`` pairs, in
    UTC, during which the resource does not work between the given
    datetimes.
    """
    if not resources:
        resources = self.env['resource.resource']
        resources_list = [resources]
    else:
        resources_list = list(resources)

    resources_work_intervals = self._work_intervals_batch(start_dt, end_dt, resources, domain, tz)
    result = {}
    for resource in resources_list:
        # range start, then every work boundary in order, then range end
        moments = [start_dt]
        for start, stop, _meta in resources_work_intervals[resource.id]:
            moments.append(start)
            moments.append(stop)
        moments.append(end_dt)
        # put everything back to UTC
        moments = [moment.astimezone(utc) for moment in moments]
        # consecutive pairs are the gaps between the work intervals
        result[resource.id] = list(zip(moments[0::2], moments[1::2]))
    return result
- # --------------------------------------------------
- # Private Methods / Helpers
- # --------------------------------------------------
- def _get_days_data(self, intervals, day_total):
- """
- helper function to compute duration of `intervals`
- expressed in days and hours.
- `day_total` is a dict {date: n_hours} with the number of hours for each day.
- """
- day_hours = defaultdict(float)
- for start, stop, meta in intervals:
- day_hours[start.date()] += (stop - start).total_seconds() / 3600
- # compute number of days as quarters
- days = sum(
- float_utils.round(ROUNDING_FACTOR * day_hours[day] / day_total[day]) / ROUNDING_FACTOR if day_total[day] else 0
- for day in day_hours
- )
- return {
- 'days': days,
- 'hours': sum(day_hours.values()),
- }
def _get_resources_day_total(self, from_datetime, to_datetime, resources=None):
    """
        @return dict mapping each resource id to a dict {date: hours of
                attendance} for the days between `from_datetime` and
                `to_datetime`
    """
    self.ensure_one()
    if not resources:
        resources = self.env['resource.resource']
        resources_list = [resources]
    else:
        resources_list = list(resources) + [self.env['resource.resource']]

    # attendances are retrieved with one extra day of margin on each side,
    # in order to get the full totals of the first and last days
    one_day = timedelta(days=1)
    intervals = self._attendance_intervals_batch(
        from_datetime - one_day, to_datetime + one_day, resources=resources)

    result = defaultdict(lambda: defaultdict(float))
    for resource in resources_list:
        hours_per_date = result[resource.id]
        for start, stop, _meta in intervals[resource.id]:
            hours_per_date[start.date()] += (stop - start).total_seconds() / 3600
    return result
def _get_closest_work_time(self, dt, match_end=False, resource=None, search_range=None, compute_leaves=True):
    """Return the work interval boundary closest to `dt` within the search range.

    Only the starts of the intervals are considered, unless `match_end` is
    True: only their ends are considered then.

    :param dt: reference datetime, timezone-aware
    :param match_end: whether to look for the end of an interval instead of its beginning
    :param resource: optional resource whose work intervals and timezone are used
    :param search_range: pair of timezone-aware datetimes delimiting the search;
        defaults to the entire day of `dt`
    :param compute_leaves: whether leaves are subtracted from the work intervals
    :raise ValueError: when `dt` or a bound of `search_range` is naive
    :rtype: datetime | None
    """
    boundary_index = 1 if match_end else 0

    tz = resource.tz if resource else self.tz
    if resource is None:
        resource = self.env['resource.resource']

    range_is_naive = search_range and not (search_range[0].tzinfo and search_range[1].tzinfo)
    if not dt.tzinfo or range_is_naive:
        raise ValueError('Provided datetimes needs to be timezoned')
    dt = dt.astimezone(timezone(tz))

    if search_range:
        range_start, range_end = search_range
    else:
        range_start = dt + relativedelta(hour=0, minute=0, second=0)
        range_end = dt + relativedelta(days=1, hour=0, minute=0, second=0)

    if not range_start <= dt <= range_end:
        return None

    candidates = self._work_intervals_batch(
        range_start, range_end, resource, compute_leaves=compute_leaves)[resource.id]
    by_distance = sorted(
        candidates,
        key=lambda interval: abs(interval[boundary_index] - dt),
    )
    if not by_distance:
        return None
    return by_distance[0][boundary_index]
- def _get_unusual_days(self, start_dt, end_dt):
- if not self:
- return {}
- self.ensure_one()
- if not start_dt.tzinfo:
- start_dt = start_dt.replace(tzinfo=utc)
- if not end_dt.tzinfo:
- end_dt = end_dt.replace(tzinfo=utc)
- works = {d[0].date() for d in self._work_intervals_batch(start_dt, end_dt)[False]}
- return {fields.Date.to_string(day.date()): (day.date() not in works) for day in rrule(DAILY, start_dt, until=end_dt)}
- # --------------------------------------------------
- # External API
- # --------------------------------------------------
def get_work_hours_count(self, start_dt, end_dt, compute_leaves=True, domain=None):
    """
        Count the number of work hours between two datetimes.

        `compute_leaves` controls whether or not this method is taking into
        account the global leaves.

        `domain` controls the way leaves are recognized.
        None means default value ('time_type', '=', 'leave')
    """
    self.ensure_one()
    # naive datetimes are considered as expressed in UTC
    if not start_dt.tzinfo:
        start_dt = start_dt.replace(tzinfo=utc)
    if not end_dt.tzinfo:
        end_dt = end_dt.replace(tzinfo=utc)

    if compute_leaves:
        intervals_per_resource = self._work_intervals_batch(start_dt, end_dt, domain=domain)
    else:
        intervals_per_resource = self._attendance_intervals_batch(start_dt, end_dt)

    return sum_intervals(intervals_per_resource[False])
def get_work_duration_data(self, from_datetime, to_datetime, compute_leaves=True, domain=None):
    """
        Return the working duration of a period as a dict
        {'days': n, 'hours': h}, based on the current calendar only: no
        resource is involved in the computation.

        `domain` is used in order to recognise the leaves to take,
        None means default value ('time_type', '=', 'leave')
    """
    # naive datetimes are made explicit in UTC
    from_datetime = make_aware(from_datetime)[0]
    to_datetime = make_aware(to_datetime)[0]

    # total hours of each day, the reference to convert hours into days
    day_total = self._get_resources_day_total(from_datetime, to_datetime)[False]

    # actual hours per day
    if compute_leaves:
        intervals_per_resource = self._work_intervals_batch(from_datetime, to_datetime, domain=domain)
    else:
        intervals_per_resource = self._attendance_intervals_batch(from_datetime, to_datetime, domain=domain)

    return self._get_days_data(intervals_per_resource[False], day_total)
def plan_hours(self, hours, day_dt, compute_leaves=False, domain=None, resource=None):
    """
        Return the datetime reached after having planned `hours` of work
        from `day_dt`, forwards when `hours` is positive and backwards when
        it is negative, or False when no such datetime is found within 100
        windows of 14 days.

        `compute_leaves` controls whether or not this method is taking into
        account the global leaves.

        `domain` controls the way leaves are recognized.
        None means default value ('time_type', '=', 'leave')
    """
    day_dt, revert = make_aware(day_dt)

    if resource is None:
        resource = self.env['resource.resource']

    # which method to use for retrieving intervals
    if compute_leaves:
        get_intervals = partial(self._work_intervals_batch, domain=domain, resources=resource)
        resource_id = resource.id
    else:
        get_intervals = self._attendance_intervals_batch
        resource_id = False

    forward = hours >= 0
    remaining = hours if forward else abs(hours)
    window = timedelta(days=14)

    for step in range(100):
        if forward:
            window_start = day_dt + window * step
            intervals = get_intervals(window_start, window_start + window)[resource_id]
        else:
            window_end = day_dt - window * step
            intervals = reversed(get_intervals(window_end - window, window_end)[resource_id])

        for start, stop, _meta in intervals:
            interval_hours = (stop - start).total_seconds() / 3600
            if remaining <= interval_hours:
                offset = timedelta(hours=remaining)
                return revert(start + offset if forward else stop - offset)
            remaining -= interval_hours

    return False
def plan_days(self, days, day_dt, compute_leaves=False, domain=None):
    """Return the datetime of a scheduling expressed in days.

    Distinct interval start dates are counted from ``day_dt`` (forward
    for a positive ``days``, backward for a negative one) in windows of
    14 days, at most 100 windows.

    :param days: number of days to plan; ``0`` gives back ``day_dt``
    :param day_dt: datetime the planning starts from
    :param compute_leaves: controls whether or not the leaves are taken
        into account (``_work_intervals_batch`` is used instead of
        ``_attendance_intervals_batch``)
    :param domain: controls the way leaves are recognized; ``None`` means
        the default value ``('time_type', '=', 'leave')``
    :return: going forward, the end of the interval on which the target
        day count is reached; going backward, the start of that interval;
        ``False`` when the count is not reached in the scanned windows.
        Datetimes go through the ``revert`` helper given by ``make_aware``.
    """
    day_dt, revert = make_aware(day_dt)

    # Which method to use for retrieving intervals.
    fetch = (
        partial(self._work_intervals_batch, domain=domain) if compute_leaves
        else self._attendance_intervals_batch
    )
    window = timedelta(days=14)

    if days > 0:
        seen_dates = set()
        for step in range(100):
            lower = day_dt + window * step
            for begin, finish, _meta in fetch(lower, lower + window)[False]:
                seen_dates.add(begin.date())
                if len(seen_dates) == days:
                    return revert(finish)
        return False

    if days < 0:
        target = abs(days)
        seen_dates = set()
        for step in range(100):
            upper = day_dt - window * step
            for begin, finish, _meta in reversed(fetch(upper - window, upper)[False]):
                seen_dates.add(begin.date())
                if len(seen_dates) == target:
                    return revert(begin)
        return False

    # Nothing to plan.
    return revert(day_dt)
def _get_max_number_of_hours(self, start, end):
    """Return the highest total of attendance hours found on a single day.

    Attendance durations (``hour_to - hour_from``) are summed per
    ``(week_type, dayofweek)`` pair and the largest sum is returned.
    An attendance is taken into account when it lacks ``date_from`` or
    ``date_to``, or when its date range overlaps ``[start, end]``.

    :param start: beginning of the period; ``.date()`` is called on it
    :param end: end of the period; ``.date()`` is called on it
    :return: the largest per-day sum, or ``0`` when the calendar has no
        attendance or when no attendance applies to the period
    """
    self.ensure_one()
    if not self.attendance_ids:
        return 0

    def applies_to_period(attendance):
        # Attendances missing one of their bounds are always kept.
        if not attendance.date_from or not attendance.date_to:
            return True
        return attendance.date_from <= end.date() and attendance.date_to >= start.date()

    hours_per_day = defaultdict(int)
    for attendance in self.attendance_ids.filtered(applies_to_period):
        day_key = (attendance.week_type, attendance.dayofweek)
        hours_per_day[day_key] += attendance.hour_to - attendance.hour_from

    # `default` avoids a ValueError when every attendance is dated outside
    # the requested period, which leaves nothing to take the maximum of.
    return max(hours_per_day.values(), default=0)
class ResourceCalendarAttendance(models.Model):
    # One line ("Work Detail") of a resource calendar: a working period
    # on a given day of the week, optionally bounded by dates.
    _name = "resource.calendar.attendance"
    _description = "Work Detail"
    _order = 'week_type, dayofweek, hour_from'

    name = fields.Char(required=True)
    dayofweek = fields.Selection(
        [
            ('0', 'Monday'),
            ('1', 'Tuesday'),
            ('2', 'Wednesday'),
            ('3', 'Thursday'),
            ('4', 'Friday'),
            ('5', 'Saturday'),
            ('6', 'Sunday')
        ],
        'Day of Week', required=True, index=True, default='0',
    )
    date_from = fields.Date(string='Starting Date')
    date_to = fields.Date(string='End Date')
    hour_from = fields.Float(
        string='Work from', required=True, index=True,
        help="Start and End time of working.\n"
             "A specific value of 24:00 is interpreted as 23:59:59.999999.",
    )
    hour_to = fields.Float(string='Work to', required=True)
    calendar_id = fields.Many2one(
        "resource.calendar", string="Resource's Calendar",
        required=True, ondelete='cascade',
    )
    day_period = fields.Selection(
        [('morning', 'Morning'), ('afternoon', 'Afternoon')],
        required=True, default='morning',
    )
    resource_id = fields.Many2one('resource.resource', 'Resource')
    week_type = fields.Selection(
        [
            ('1', 'Second'),
            ('0', 'First')
        ],
        'Week Number', default=False,
    )
    two_weeks_calendar = fields.Boolean(
        "Calendar in 2 weeks mode", related='calendar_id.two_weeks_calendar',
    )
    display_type = fields.Selection(
        [('line_section', "Section")],
        default=False, help="Technical field for UX purpose.",
    )
    sequence = fields.Integer(
        default=10,
        help="Gives the sequence of this line when displaying the resource calendar.",
    )

    @api.onchange('hour_from', 'hour_to')
    def _onchange_hours(self):
        """Keep the hours within the day and in chronological order."""
        # Avoid negative or after midnight values.
        clamped_from = max(min(self.hour_from, 23.99), 0.0)
        clamped_to = max(min(self.hour_to, 24), 0.0)
        self.hour_from = clamped_from
        # Avoid wrong order: the end is never before the beginning.
        self.hour_to = max(clamped_to, clamped_from)

    @api.model
    def get_week_type(self, date):
        """Return the parity (0 or 1) of the week containing ``date``.

        The week number is obtained by counting the days from January 1
        of year 1 (extrapolated to dates prior to the first adoption of
        the Gregorian calendar) and converting that count to weeks. This
        ensures that an even week number always follows an odd one,
        whereas with classical week numbers some years have 53 weeks and
        two odd week numbers then follow each other (53 --> 1).
        """
        elapsed_days = date.toordinal() - 1
        week_number = math.floor(elapsed_days / 7)
        return int(week_number % 2)

    def _compute_display_name(self):
        """Name the section lines after their week and whether it is the current one."""
        super()._compute_display_name()
        this_week_type = str(self.get_week_type(fields.Date.context_today(self)))
        section_names = {'0': _('First week'), '1': _('Second week')}
        section_info = {True: _('this week'), False: _('other week')}
        for record in self:
            # Only section lines get a specific display name.
            if record.display_type != 'line_section':
                continue
            is_current_week = this_week_type == record.week_type
            record.display_name = "%s (%s)" % (
                section_names[record.week_type],
                section_info[is_current_week],
            )

    def _copy_attendance_vals(self):
        """Return the values used to duplicate this attendance line."""
        self.ensure_one()
        copied_fields = (
            'name',
            'dayofweek',
            'date_from',
            'date_to',
            'hour_from',
            'hour_to',
            'day_period',
            'week_type',
            'display_type',
            'sequence',
        )
        return {fname: self[fname] for fname in copied_fields}
class ResourceResource(models.Model):
    # A resource (human or material) attached to a working-time calendar
    # and a timezone.
    _name = "resource.resource"
    _description = "Resources"
    _order = "name"

    @api.model
    def default_get(self, fields):
        """Default the calendar to the one of the default company."""
        defaults = super().default_get(fields)
        company_id = defaults.get('company_id')
        if company_id and not defaults.get('calendar_id'):
            company = self.env['res.company'].browse(company_id)
            defaults['calendar_id'] = company.resource_calendar_id.id
        return defaults

    name = fields.Char(required=True)
    active = fields.Boolean(
        'Active', default=True,
        help="If the active field is set to False, it will allow you to hide the resource record without removing it.",
    )
    company_id = fields.Many2one(
        'res.company', string='Company',
        default=lambda self: self.env.company,
    )
    resource_type = fields.Selection(
        [
            ('user', 'Human'),
            ('material', 'Material'),
        ],
        string='Type', default='user', required=True,
    )
    user_id = fields.Many2one(
        'res.users', string='User',
        help='Related user name for the resource to manage its access.',
    )
    time_efficiency = fields.Float(
        'Efficiency Factor', default=100, required=True,
        help="This field is used to calculate the expected duration of a work order at this work center. For example, if a work order takes one hour and the efficiency factor is 100%, then the expected duration will be one hour. If the efficiency factor is 200%, however the expected duration will be 30 minutes.",
    )
    calendar_id = fields.Many2one(
        "resource.calendar", string='Working Time',
        default=lambda self: self.env.company.resource_calendar_id,
        required=True, domain="[('company_id', '=', company_id)]",
    )
    tz = fields.Selection(
        _tz_get, string='Timezone', required=True,
        default=lambda self: self._context.get('tz') or self.env.user.tz or 'UTC',
    )

    _sql_constraints = [
        ('check_time_efficiency', 'CHECK(time_efficiency>0)', 'Time efficiency must be strictly positive'),
    ]

    @api.model_create_multi
    def create(self, vals_list):
        """Fill in the missing calendar and timezone before creating."""
        for values in vals_list:
            company_id = values.get('company_id')
            if company_id and not values.get('calendar_id'):
                company = self.env['res.company'].browse(company_id)
                values['calendar_id'] = company.resource_calendar_id.id
            if values.get('tz'):
                continue
            # Retrieve the timezone on the user first, then on the calendar.
            user = self.env['res.users'].browse(values.get('user_id'))
            tz = user.tz or self.env['resource.calendar'].browse(values.get('calendar_id')).tz
            if tz:
                values['tz'] = tz
        return super().create(vals_list)

    @api.returns('self', lambda value: value.id)
    def copy(self, default=None):
        """Duplicate the resource, suffixing its name unless one is given."""
        self.ensure_one()
        if default is None:
            default = {}
        if not default.get('name'):
            default['name'] = _('%s (copy)') % (self.name)
        return super().copy(default)

    def write(self, values):
        """Write ``values``; under the ``check_idempotence`` context key,
        values already set on a single record are dropped first.
        """
        if self.env.context.get('check_idempotence') and len(self) == 1:
            changed = {}
            for fname, value in values.items():
                current = self._fields[fname].convert_to_write(self[fname], self)
                if current != value:
                    changed[fname] = value
            values = changed
        if not values:
            return True
        return super().write(values)

    @api.onchange('company_id')
    def _onchange_company_id(self):
        """Follow the calendar of the selected company."""
        if self.company_id:
            self.calendar_id = self.company_id.resource_calendar_id.id

    @api.onchange('user_id')
    def _onchange_user_id(self):
        """Follow the timezone of the selected user."""
        if self.user_id:
            self.tz = self.user_id.tz

    def _get_work_interval(self, start, end):
        # Deprecated method. Use `_adjust_to_calendar` instead
        return self._adjust_to_calendar(start, end)

    def _adjust_to_calendar(self, start, end, compute_leaves=True):
        """Adjust the given start and end datetimes to the closest effective hours encoded
        in the resource calendar. Only attendances in the same day as `start` and `end` are
        considered (respectively). If no attendance is found during that day, the closest hour
        is None.
        e.g. simplified example:
             given two attendances: 8am-1pm and 2pm-5pm, given start=9am and end=6pm
             resource._adjust_to_calendar(start, end)
             >>> {resource: (8am, 5pm)}
        :return: Closest matching start and end of working periods for each resource
        :rtype: dict(resource, tuple(datetime | None, datetime | None))
        """
        start, revert_start_tz = make_aware(start)
        end, revert_end_tz = make_aware(end)
        adjusted = {}
        for resource in self:
            calendar = resource.calendar_id
            resource_tz = timezone(resource.tz)
            local_start = start.astimezone(resource_tz)
            local_end = end.astimezone(resource_tz)
            # From the beginning of the start day to the end of the end day.
            search_range = [
                local_start + relativedelta(hour=0, minute=0, second=0),
                local_end + relativedelta(days=1, hour=0, minute=0, second=0),
            ]
            calendar_start = calendar._get_closest_work_time(
                local_start, resource=resource, search_range=search_range,
                compute_leaves=compute_leaves)
            # The end is searched from the start onwards, in the same list.
            search_range[0] = local_start
            end_target = local_end if local_end > local_start else local_start
            calendar_end = calendar._get_closest_work_time(
                end_target, match_end=True,
                resource=resource, search_range=search_range,
                compute_leaves=compute_leaves)
            adjusted[resource] = (
                calendar_start and revert_start_tz(calendar_start),
                calendar_end and revert_end_tz(calendar_end),
            )
        return adjusted

    def _get_unavailable_intervals(self, start, end):
        """ Compute the intervals during which employee is unavailable with hour granularity between start and end
            Note: this method is used in enterprise (forecast and planning)
        """
        start_datetime = timezone_datetime(start)
        end_datetime = timezone_datetime(end)

        # Group the resources per calendar to query each calendar once.
        resources_per_calendar = defaultdict(lambda: self.env['resource.resource'])
        for resource in self:
            resources_per_calendar[resource.calendar_id] |= resource

        unavailable_per_resource = {}
        for calendar, resources in resources_per_calendar.items():
            unavailable_per_resource.update(
                calendar._unavailable_intervals_batch(
                    start_datetime, end_datetime, resources, tz=timezone(calendar.tz))
            )
        return unavailable_per_resource

    def _get_calendars_validity_within_period(self, start, end, default_company=None):
        """ Gets a dict of dict with resource's id as first key and resource's calendar as secondary key
            The value is the validity interval of the calendar for the given resource.
            Here the validity interval for each calendar is the whole interval but it's meant to be overridden in further modules
            handling resource's employee contracts.
        """
        assert start.tzinfo and end.tzinfo
        # keys are [resource id:integer][calendar:self.env['resource.calendar']]
        validity = defaultdict(lambda: defaultdict(Intervals))
        default_calendar = (
            (default_company and default_company.resource_calendar_id)
            or self.env.company.resource_calendar_id
        )
        if not self:
            # if no resource, add the company resource calendar.
            validity[False][default_calendar] = Intervals(
                [(start, end, self.env['resource.calendar.attendance'])])
        for resource in self:
            calendar = (
                resource.calendar_id
                or resource.company_id.resource_calendar_id
                or default_calendar
            )
            validity[resource.id][calendar] = Intervals(
                [(start, end, self.env['resource.calendar.attendance'])])
        return validity

    def _get_valid_work_intervals(self, start, end, calendars=None):
        """ Gets the valid work intervals of the resource following their calendars between ``start`` and ``end``
            This methods handle the eventuality of a resource having multiple resource calendars, see _get_calendars_validity_within_period method
            for further explanation.

            :return: a pair ``(resource_work_intervals, calendar_work_intervals)``, the first
                keyed by resource id, the second keyed by calendar id
        """
        assert start.tzinfo and end.tzinfo
        calendar_resources = defaultdict(lambda: self.env['resource.resource'])
        resource_work_intervals = defaultdict(Intervals)
        calendar_work_intervals = {}

        validity = self.sudo()._get_calendars_validity_within_period(start, end)
        # For each resource, retrieve its calendars and their validity intervals
        for resource in self:
            for calendar in validity[resource.id]:
                calendar_resources[calendar] |= resource
        # Explicitly requested calendars get an entry even without resource.
        for calendar in (calendars or []):
            calendar_resources[calendar] |= self.env['resource.resource']

        for calendar, resources in calendar_resources.items():
            # For each calendar used by the resources, retrieve the work intervals for every resources using it
            batch = calendar._work_intervals_batch(start, end, resources=resources)
            for resource in resources:
                # Make the conjunction between work intervals and calendar validity
                resource_work_intervals[resource.id] |= batch[resource.id] & validity[resource.id][calendar]
            calendar_work_intervals[calendar.id] = batch[False]
        return resource_work_intervals, calendar_work_intervals
class ResourceCalendarLeaves(models.Model):
    # A time off period, either generic or tied to a single resource.
    _name = "resource.calendar.leaves"
    _description = "Resource Time Off Detail"
    _order = "date_from"

    def default_get(self, fields_list):
        """Default the period to the current day, narrowed to the work
        intervals of the current company's calendar when there are any.
        """
        res = super().default_get(fields_list)
        both_requested = 'date_from' in fields_list and 'date_to' in fields_list
        if not both_requested or res.get('date_from') or res.get('date_to'):
            return res

        # Take the current day and search the begin and end hours for this
        # day in the resource.calendar of the current company.
        today = fields.Datetime.now()
        user_tz = timezone(
            self.env.user.tz
            or self._context.get('tz')
            or self.company_id.resource_calendar_id.tz
            or 'UTC'
        )
        date_from = user_tz.localize(datetime.combine(today, time.min))
        date_to = user_tz.localize(datetime.combine(today, time.max))
        company_calendar = self.env.company.resource_calendar_id
        intervals = company_calendar._work_intervals_batch(
            date_from.replace(tzinfo=utc), date_to.replace(tzinfo=utc))[False]
        if intervals:
            # Keep the start of the first interval and the end of the last one.
            bounds = [(begin, finish) for begin, finish, _records in intervals]
            date_from = bounds[0][0]
            date_to = bounds[-1][1]
        # Stored values are naive datetimes expressed in UTC.
        res.update(
            date_from=date_from.astimezone(utc).replace(tzinfo=None),
            date_to=date_to.astimezone(utc).replace(tzinfo=None)
        )
        return res

    name = fields.Char('Reason')
    company_id = fields.Many2one(
        'res.company', string="Company", readonly=True, store=True,
        default=lambda self: self.env.company, compute='_compute_company_id',
    )
    calendar_id = fields.Many2one(
        'resource.calendar', 'Working Hours',
        domain="[('company_id', 'in', [company_id, False])]",
        check_company=True, index=True,
    )
    date_from = fields.Datetime('Start Date', required=True)
    date_to = fields.Datetime('End Date', required=True)
    resource_id = fields.Many2one(
        "resource.resource", 'Resource', index=True,
        help="If empty, this is a generic time off for the company. If a resource is set, the time off is only for this resource",
    )
    time_type = fields.Selection(
        [('leave', 'Time Off'), ('other', 'Other')], default='leave',
        help="Whether this should be computed as a time off or as work time (eg: formation)",
    )

    @api.depends('calendar_id')
    def _compute_company_id(self):
        """Take the company of the calendar, else keep the current value,
        else fall back on the environment company.
        """
        for leave in self:
            leave.company_id = (
                leave.calendar_id.company_id
                or leave.company_id
                or self.env.company
            )

    @api.constrains('date_from', 'date_to')
    def check_dates(self):
        """Reject the leaves starting after their end."""
        reversed_leaves = self.filtered(lambda leave: leave.date_from > leave.date_to)
        if reversed_leaves:
            raise ValidationError(_('The start date of the time off must be earlier than the end date.'))

    @api.onchange('resource_id')
    def onchange_resource(self):
        """Follow the calendar of the selected resource."""
        if self.resource_id:
            self.calendar_id = self.resource_id.calendar_id

    def _copy_leave_vals(self):
        """Return the values used to duplicate this leave."""
        self.ensure_one()
        copied_fields = ('name', 'date_from', 'date_to', 'time_type')
        return {fname: self[fname] for fname in copied_fields}
|