# sequence_mixin.py (14 KB)

  1. # -*- coding: utf-8 -*-
  2. from odoo import api, fields, models, _
  3. from odoo.exceptions import ValidationError
  4. from odoo.tools.misc import format_date
  5. from odoo.tools import frozendict
  6. import re
  7. from collections import defaultdict
  8. from psycopg2 import sql
  9. class SequenceMixin(models.AbstractModel):
  10. """Mechanism used to have an editable sequence number.
  11. Be careful of how you use this regarding the prefixes. More info in the
  12. docstring of _get_last_sequence.
  13. """
  14. _name = 'sequence.mixin'
  15. _description = "Automatic sequence"
  16. _sequence_field = "name"
  17. _sequence_date_field = "date"
  18. _sequence_index = False
  19. _sequence_monthly_regex = r'^(?P<prefix1>.*?)(?P<year>((?<=\D)|(?<=^))((19|20|21)\d{2}|(\d{2}(?=\D))))(?P<prefix2>\D*?)(?P<month>(0[1-9]|1[0-2]))(?P<prefix3>\D+?)(?P<seq>\d*)(?P<suffix>\D*?)$'
  20. _sequence_yearly_regex = r'^(?P<prefix1>.*?)(?P<year>((?<=\D)|(?<=^))((19|20|21)?\d{2}))(?P<prefix2>\D+?)(?P<seq>\d*)(?P<suffix>\D*?)$'
  21. _sequence_fixed_regex = r'^(?P<prefix1>.*?)(?P<seq>\d{0,9})(?P<suffix>\D*?)$'
  22. sequence_prefix = fields.Char(compute='_compute_split_sequence', store=True)
  23. sequence_number = fields.Integer(compute='_compute_split_sequence', store=True)
  24. def init(self):
  25. # Add an index to optimise the query searching for the highest sequence number
  26. if not self._abstract and self._sequence_index:
  27. index_name = self._table + '_sequence_index'
  28. self.env.cr.execute('SELECT indexname FROM pg_indexes WHERE indexname = %s', (index_name,))
  29. if not self.env.cr.fetchone():
  30. self.env.cr.execute(sql.SQL("""
  31. CREATE INDEX {index_name} ON {table} ({sequence_index}, sequence_prefix desc, sequence_number desc, {field});
  32. CREATE INDEX {index2_name} ON {table} ({sequence_index}, id desc, sequence_prefix);
  33. """).format(
  34. sequence_index=sql.Identifier(self._sequence_index),
  35. index_name=sql.Identifier(index_name),
  36. index2_name=sql.Identifier(index_name + "2"),
  37. table=sql.Identifier(self._table),
  38. field=sql.Identifier(self._sequence_field),
  39. ))
  40. def _must_check_constrains_date_sequence(self):
  41. return True
  42. def _sequence_matches_date(self):
  43. self.ensure_one()
  44. date = fields.Date.to_date(self[self._sequence_date_field])
  45. sequence = self[self._sequence_field]
  46. if not sequence or not date:
  47. return True
  48. format_values = self._get_sequence_format_param(sequence)[1]
  49. year_match = (
  50. not format_values["year"]
  51. or format_values["year"] == date.year % 10 ** len(str(format_values["year"]))
  52. )
  53. month_match = not format_values['month'] or format_values['month'] == date.month
  54. return year_match and month_match
  55. @api.constrains(lambda self: (self._sequence_field, self._sequence_date_field))
  56. def _constrains_date_sequence(self):
  57. # Make it possible to bypass the constraint to allow edition of already messed up documents.
  58. # /!\ Do not use this to completely disable the constraint as it will make this mixin unreliable.
  59. constraint_date = fields.Date.to_date(self.env['ir.config_parameter'].sudo().get_param(
  60. 'sequence.mixin.constraint_start_date',
  61. '1970-01-01'
  62. ))
  63. for record in self:
  64. if not record._must_check_constrains_date_sequence():
  65. continue
  66. date = fields.Date.to_date(record[record._sequence_date_field])
  67. sequence = record[record._sequence_field]
  68. if (
  69. sequence
  70. and date
  71. and date > constraint_date
  72. and not record._sequence_matches_date()
  73. ):
  74. raise ValidationError(_(
  75. "The %(date_field)s (%(date)s) doesn't match the sequence number of the related %(model)s (%(sequence)s)\n"
  76. "You will need to clear the %(model)s's %(sequence_field)s to proceed.\n"
  77. "In doing so, you might want to resequence your entries in order to maintain a continuous date-based sequence.",
  78. date=format_date(self.env, date),
  79. sequence=sequence,
  80. date_field=record._fields[record._sequence_date_field]._description_string(self.env),
  81. sequence_field=record._fields[record._sequence_field]._description_string(self.env),
  82. model=self.env['ir.model']._get(record._name).display_name,
  83. ))
  84. @api.depends(lambda self: [self._sequence_field])
  85. def _compute_split_sequence(self):
  86. for record in self:
  87. sequence = record[record._sequence_field] or ''
  88. regex = re.sub(r"\?P<\w+>", "?:", record._sequence_fixed_regex.replace(r"?P<seq>", "")) # make the seq the only matching group
  89. matching = re.match(regex, sequence)
  90. record.sequence_prefix = sequence[:matching.start(1)]
  91. record.sequence_number = int(matching.group(1) or 0)
  92. @api.model
  93. def _deduce_sequence_number_reset(self, name):
  94. """Detect if the used sequence resets yearly, montly or never.
  95. :param name: the sequence that is used as a reference to detect the resetting
  96. periodicity. Typically, it is the last before the one you want to give a
  97. sequence.
  98. """
  99. for regex, ret_val, requirements in [
  100. (self._sequence_monthly_regex, 'month', ['seq', 'month', 'year']),
  101. (self._sequence_yearly_regex, 'year', ['seq', 'year']),
  102. (self._sequence_fixed_regex, 'never', ['seq']),
  103. ]:
  104. match = re.match(regex, name or '')
  105. if match:
  106. groupdict = match.groupdict()
  107. if all(req in groupdict for req in requirements):
  108. return ret_val
  109. raise ValidationError(_(
  110. 'The sequence regex should at least contain the seq grouping keys. For instance:\n'
  111. '^(?P<prefix1>.*?)(?P<seq>\d*)(?P<suffix>\D*?)$'
  112. ))
  113. def _get_last_sequence_domain(self, relaxed=False):
  114. """Get the sql domain to retreive the previous sequence number.
  115. This function should be overriden by models inheriting from this mixin.
  116. :param relaxed: see _get_last_sequence.
  117. :returns: tuple(where_string, where_params): with
  118. where_string: the entire SQL WHERE clause as a string.
  119. where_params: a dictionary containing the parameters to substitute
  120. at the execution of the query.
  121. """
  122. self.ensure_one()
  123. return "", {}
  124. def _get_starting_sequence(self):
  125. """Get a default sequence number.
  126. This function should be overriden by models heriting from this mixin
  127. This number will be incremented so you probably want to start the sequence at 0.
  128. :return: string to use as the default sequence to increment
  129. """
  130. self.ensure_one()
  131. return "00000000"
  132. def _get_last_sequence(self, relaxed=False, with_prefix=None, lock=True):
  133. """Retrieve the previous sequence.
  134. This is done by taking the number with the greatest alphabetical value within
  135. the domain of _get_last_sequence_domain. This means that the prefix has a
  136. huge importance.
  137. For instance, if you have INV/2019/0001 and INV/2019/0002, when you rename the
  138. last one to FACT/2019/0001, one might expect the next number to be
  139. FACT/2019/0002 but it will be INV/2019/0002 (again) because INV > FACT.
  140. Therefore, changing the prefix might not be convenient during a period, and
  141. would only work when the numbering makes a new start (domain returns by
  142. _get_last_sequence_domain is [], i.e: a new year).
  143. :param field_name: the field that contains the sequence.
  144. :param relaxed: this should be set to True when a previous request didn't find
  145. something without. This allows to find a pattern from a previous period, and
  146. try to adapt it for the new period.
  147. :param with_prefix: The sequence prefix to restrict the search on, if any.
  148. :return: the string of the previous sequence or None if there wasn't any.
  149. """
  150. self.ensure_one()
  151. if self._sequence_field not in self._fields or not self._fields[self._sequence_field].store:
  152. raise ValidationError(_('%s is not a stored field', self._sequence_field))
  153. where_string, param = self._get_last_sequence_domain(relaxed)
  154. if self._origin.id:
  155. where_string += " AND id != %(id)s "
  156. param['id'] = self._origin.id
  157. if with_prefix is not None:
  158. where_string += " AND sequence_prefix = %(with_prefix)s "
  159. param['with_prefix'] = with_prefix
  160. query = f"""
  161. SELECT {{field}} FROM {self._table}
  162. {where_string}
  163. AND sequence_prefix = (SELECT sequence_prefix FROM {self._table} {where_string} ORDER BY id DESC LIMIT 1)
  164. ORDER BY sequence_number DESC
  165. LIMIT 1
  166. """
  167. if lock:
  168. query = f"""
  169. UPDATE {self._table} SET write_date = write_date WHERE id = (
  170. {query.format(field='id')}
  171. )
  172. RETURNING {self._sequence_field};
  173. """
  174. else:
  175. query = query.format(field=self._sequence_field)
  176. self.flush_model([self._sequence_field, 'sequence_number', 'sequence_prefix'])
  177. self.env.cr.execute(query, param)
  178. return (self.env.cr.fetchone() or [None])[0]
  179. def _get_sequence_format_param(self, previous):
  180. """Get the python format and format values for the sequence.
  181. :param previous: the sequence we want to extract the format from
  182. :return tuple(format, format_values):
  183. format is the format string on which we should call .format()
  184. format_values is the dict of values to format the `format` string
  185. ``format.format(**format_values)`` should be equal to ``previous``
  186. """
  187. sequence_number_reset = self._deduce_sequence_number_reset(previous)
  188. regex = self._sequence_fixed_regex
  189. if sequence_number_reset == 'year':
  190. regex = self._sequence_yearly_regex
  191. elif sequence_number_reset == 'month':
  192. regex = self._sequence_monthly_regex
  193. format_values = re.match(regex, previous).groupdict()
  194. format_values['seq_length'] = len(format_values['seq'])
  195. format_values['year_length'] = len(format_values.get('year', ''))
  196. if not format_values.get('seq') and 'prefix1' in format_values and 'suffix' in format_values:
  197. # if we don't have a seq, consider we only have a prefix and not a suffix
  198. format_values['prefix1'] = format_values['suffix']
  199. format_values['suffix'] = ''
  200. for field in ('seq', 'year', 'month'):
  201. format_values[field] = int(format_values.get(field) or 0)
  202. placeholders = re.findall(r'(prefix\d|seq|suffix\d?|year|month)', regex)
  203. format = ''.join(
  204. "{seq:0{seq_length}d}" if s == 'seq' else
  205. "{month:02d}" if s == 'month' else
  206. "{year:0{year_length}d}" if s == 'year' else
  207. "{%s}" % s
  208. for s in placeholders
  209. )
  210. return format, format_values
  211. def _set_next_sequence(self):
  212. """Set the next sequence.
  213. This method ensures that the field is set both in the ORM and in the database.
  214. This is necessary because we use a database query to get the previous sequence,
  215. and we need that query to always be executed on the latest data.
  216. :param field_name: the field that contains the sequence.
  217. """
  218. self.ensure_one()
  219. last_sequence = self._get_last_sequence()
  220. new = not last_sequence
  221. if new:
  222. last_sequence = self._get_last_sequence(relaxed=True) or self._get_starting_sequence()
  223. format, format_values = self._get_sequence_format_param(last_sequence)
  224. if new:
  225. format_values['seq'] = 0
  226. format_values['year'] = self[self._sequence_date_field].year % (10 ** format_values['year_length'])
  227. format_values['month'] = self[self._sequence_date_field].month
  228. format_values['seq'] = format_values['seq'] + 1
  229. self[self._sequence_field] = format.format(**format_values)
  230. self._compute_split_sequence()
  231. def _is_last_from_seq_chain(self):
  232. """Tells whether or not this element is the last one of the sequence chain.
  233. :return: True if it is the last element of the chain.
  234. """
  235. last_sequence = self._get_last_sequence(with_prefix=self.sequence_prefix)
  236. if not last_sequence:
  237. return True
  238. seq_format, seq_format_values = self._get_sequence_format_param(last_sequence)
  239. seq_format_values['seq'] += 1
  240. return seq_format.format(**seq_format_values) == self.name
  241. def _is_end_of_seq_chain(self):
  242. """Tells whether or not these elements are the last ones of the sequence chain.
  243. :return: True if self are the last elements of the chain.
  244. """
  245. batched = defaultdict(lambda: {'last_rec': self.browse(), 'seq_list': []})
  246. for record in self.filtered(lambda x: x[x._sequence_field]):
  247. seq_format, format_values = record._get_sequence_format_param(record[record._sequence_field])
  248. seq = format_values.pop('seq')
  249. batch = batched[(seq_format, frozendict(format_values))]
  250. batch['seq_list'].append(seq)
  251. if batch['last_rec'].sequence_number <= record.sequence_number:
  252. batch['last_rec'] = record
  253. for values in batched.values():
  254. # The sequences we are deleting are not sequential
  255. seq_list = values['seq_list']
  256. if max(seq_list) - min(seq_list) != len(seq_list) - 1:
  257. return False
  258. # last_rec must have the highest number in the database
  259. record = values['last_rec']
  260. if not record._is_last_from_seq_chain():
  261. return False
  262. return True