# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. """ High-level objects for fields. """ from collections import defaultdict from datetime import date, datetime, time from lxml import etree, html from operator import attrgetter from xmlrpc.client import MAXINT import ast import base64 import copy import binascii import enum import itertools import json import logging import uuid import warnings from markupsafe import Markup import psycopg2 from psycopg2.extras import Json as PsycopgJson import pytz from difflib import get_close_matches from hashlib import sha256 from .tools import ( float_repr, float_round, float_compare, float_is_zero, human_size, pg_varchar, ustr, OrderedSet, pycompat, sql, date_utils, unique, image_process, merge_sequences, SQL_ORDER_BY_TYPE, is_list_of, has_list_types, html_normalize, html_sanitize, ) from .tools import DEFAULT_SERVER_DATE_FORMAT as DATE_FORMAT from .tools import DEFAULT_SERVER_DATETIME_FORMAT as DATETIME_FORMAT from .tools.translate import html_translate, _ from .tools.mimetypes import guess_mimetype from odoo.exceptions import CacheMiss from odoo.osv import expression DATE_LENGTH = len(date.today().strftime(DATE_FORMAT)) DATETIME_LENGTH = len(datetime.now().strftime(DATETIME_FORMAT)) # hacky-ish way to prevent access to a field through the ORM (except for sudo mode) NO_ACCESS='.' IR_MODELS = ( 'ir.model', 'ir.model.data', 'ir.model.fields', 'ir.model.fields.selection', 'ir.model.relation', 'ir.model.constraint', 'ir.module.module', ) _logger = logging.getLogger(__name__) _schema = logging.getLogger(__name__[:-7] + '.schema') NoneType = type(None) Default = object() # default value for __init__() methods def first(records): """ Return the first record in ``records``, with the same prefetching. 
""" return next(iter(records)) if len(records) > 1 else records def resolve_mro(model, name, predicate): """ Return the list of successively overridden values of attribute ``name`` in mro order on ``model`` that satisfy ``predicate``. Model registry classes are ignored. """ result = [] for cls in model._model_classes: value = cls.__dict__.get(name, Default) if value is Default: continue if not predicate(value): break result.append(value) return result def determine(needle, records, *args): """ Simple helper for calling a method given as a string or a function. :param needle: callable or name of method to call on ``records`` :param BaseModel records: recordset to call ``needle`` on or with :params args: additional arguments to pass to the determinant :returns: the determined value if the determinant is a method name or callable :raise TypeError: if ``records`` is not a recordset, or ``needle`` is not a callable or valid method name """ if not isinstance(records, BaseModel): raise TypeError("Determination requires a subject recordset") if isinstance(needle, str): needle = getattr(records, needle) if needle.__name__.find('__'): return needle(*args) elif callable(needle): if needle.__name__.find('__'): return needle(records, *args) raise TypeError("Determination requires a callable or method name") class MetaField(type): """ Metaclass for field classes. 
""" by_type = {} def __init__(cls, name, bases, attrs): super(MetaField, cls).__init__(name, bases, attrs) if not hasattr(cls, 'type'): return if cls.type and cls.type not in MetaField.by_type: MetaField.by_type[cls.type] = cls # compute class attributes to avoid calling dir() on fields cls.related_attrs = [] cls.description_attrs = [] for attr in dir(cls): if attr.startswith('_related_'): cls.related_attrs.append((attr[9:], attr)) elif attr.startswith('_description_'): cls.description_attrs.append((attr[13:], attr)) _global_seq = iter(itertools.count()) class Field(MetaField('DummyField', (object,), {})): """The field descriptor contains the field definition, and manages accesses and assignments of the corresponding field on records. The following attributes may be provided when instantiating a field: :param str string: the label of the field seen by users; if not set, the ORM takes the field name in the class (capitalized). :param str help: the tooltip of the field seen by users :param invisible: whether the field is invisible (boolean, by default ``False``) :param bool readonly: whether the field is readonly (default: ``False``) This only has an impact on the UI. Any field assignation in code will work (if the field is a stored field or an inversable one). :param bool required: whether the value of the field is required (default: ``False``) :param str index: whether the field is indexed in database, and the kind of index. Note: this has no effect on non-stored and virtual fields. 
The possible values are: * ``"btree"`` or ``True``: standard index, good for many2one * ``"btree_not_null"``: BTREE index without NULL values (useful when most values are NULL, or when NULL is never searched for) * ``"trigram"``: Generalized Inverted Index (GIN) with trigrams (good for full-text search) * ``None`` or ``False``: no index (default) :param default: the default value for the field; this is either a static value, or a function taking a recordset and returning a value; use ``default=None`` to discard default values for the field :type default: value or callable :param dict states: a dictionary mapping state values to lists of UI attribute-value pairs; possible attributes are: ``readonly``, ``required``, ``invisible``. .. warning:: Any state-based condition requires the ``state`` field value to be available on the client-side UI. This is typically done by including it in the relevant views, possibly made invisible if not relevant for the end-user. :param str groups: comma-separated list of group xml ids (string); this restricts the field access to the users of the given groups only :param bool company_dependent: whether the field value is dependent of the current company; The value isn't stored on the model table. It is registered as `ir.property`. When the value of the company_dependent field is needed, an `ir.property` is searched, linked to the current company (and current record if one property exists). If the value is changed on the record, it either modifies the existing property for the current record (if one exists), or creates a new one for the current company and res_id. If the value is changed on the company side, it will impact all records on which the value hasn't been changed. 
:param bool copy: whether the field value should be copied when the record is duplicated (default: ``True`` for normal fields, ``False`` for ``one2many`` and computed fields, including property fields and related fields) :param bool store: whether the field is stored in database (default:``True``, ``False`` for computed fields) :param str group_operator: aggregate function used by :meth:`~odoo.models.Model.read_group` when grouping on this field. Supported aggregate functions are: * ``array_agg`` : values, including nulls, concatenated into an array * ``count`` : number of rows * ``count_distinct`` : number of distinct rows * ``bool_and`` : true if all values are true, otherwise false * ``bool_or`` : true if at least one value is true, otherwise false * ``max`` : maximum value of all values * ``min`` : minimum value of all values * ``avg`` : the average (arithmetic mean) of all values * ``sum`` : sum of all values :param str group_expand: function used to expand read_group results when grouping on the current field. .. code-block:: python @api.model def _read_group_selection_field(self, values, domain, order): return ['choice1', 'choice2', ...] # available selection choices. @api.model def _read_group_many2one_field(self, records, domain, order): return records + self.search([custom_domain]) .. rubric:: Computed Fields :param str compute: name of a method that computes the field .. seealso:: :ref:`Advanced Fields/Compute fields ` :param bool precompute: whether the field should be computed before record insertion in database. Should be used to specify manually some fields as precompute=True when the field can be computed before record insertion. (e.g. avoid statistics fields based on search/read_group), many2one linking to the previous record, ... (default: `False`) .. warning:: Precomputation only happens when no explicit value and no default value is provided to create(). 
This means that a default value disables the precomputation, even if the field is specified as precompute=True. Precomputing a field can be counterproductive if the records of the given model are not created in batch. Consider the situation were many records are created one by one. If the field is not precomputed, it will normally be computed in batch at the flush(), and the prefetching mechanism will help making the computation efficient. On the other hand, if the field is precomputed, the computation will be made one by one, and will therefore not be able to take advantage of the prefetching mechanism. Following the remark above, precomputed fields can be interesting on the lines of a one2many, which are usually created in batch by the ORM itself, provided that they are created by writing on the record that contains them. :param bool compute_sudo: whether the field should be recomputed as superuser to bypass access rights (by default ``True`` for stored fields, ``False`` for non stored fields) :param bool recursive: whether the field has recursive dependencies (the field ``X`` has a dependency like ``parent_id.X``); declaring a field recursive must be explicit to guarantee that recomputation is correct :param str inverse: name of a method that inverses the field (optional) :param str search: name of a method that implement search on the field (optional) :param str related: sequence of field names :param bool default_export_compatible: whether the field must be exported by default in an import-compatible export .. 
seealso:: :ref:`Advanced fields/Related fields ` """ type = None # type of the field (string) relational = False # whether the field is a relational one translate = False # whether the field is translated column_type = None # database column type (ident, spec) write_sequence = 0 # field ordering for write() args = None # the parameters given to __init__() _module = None # the field's module name _modules = None # modules that define this field _setup_done = True # whether the field is completely set up _sequence = None # absolute ordering of the field _base_fields = () # the fields defining self, in override order _extra_keys = () # unknown attributes set on the field _direct = False # whether self may be used directly (shared) _toplevel = False # whether self is on the model's registry class automatic = False # whether the field is automatically created ("magic" field) inherited = False # whether the field is inherited (_inherits) inherited_field = None # the corresponding inherited field name = None # name of the field model_name = None # name of the model of this field comodel_name = None # name of the model of values (if relational) store = True # whether the field is stored in database index = None # how the field is indexed in database manual = False # whether the field is a custom field copy = True # whether the field is copied over by BaseModel.copy() _depends = None # collection of field dependencies _depends_context = None # collection of context key dependencies recursive = False # whether self depends on itself compute = None # compute(recs) computes field on recs compute_sudo = False # whether field should be recomputed as superuser precompute = False # whether field has to be computed before creation inverse = None # inverse(recs) inverses field on recs search = None # search(recs, operator, value) searches on self related = None # sequence of field names, for related fields company_dependent = False # whether ``self`` is company-dependent (property 
field) default = None # default(recs) returns the default value string = None # field label help = None # field tooltip invisible = False # whether the field is invisible readonly = False # whether the field is readonly required = False # whether the field is required states = None # set readonly and required depending on state groups = None # csv list of group xml ids change_default = False # whether the field may trigger a "user-onchange" related_field = None # corresponding related field group_operator = None # operator for aggregating values group_expand = None # name of method to expand groups in read_group() prefetch = True # the prefetch group (False means no group) default_export_compatible = False # whether the field must be exported by default in an import-compatible export exportable = True def __init__(self, string=Default, **kwargs): kwargs['string'] = string self._sequence = next(_global_seq) self.args = {key: val for key, val in kwargs.items() if val is not Default} def __str__(self): if self.name is None: return "<%s.%s>" % (__name__, type(self).__name__) return "%s.%s" % (self.model_name, self.name) def __repr__(self): if self.name is None: return "<%s.%s>" % (__name__, type(self).__name__) return "%s.%s" % (self.model_name, self.name) ############################################################################ # # Base field setup: things that do not depend on other models/fields # # The base field setup is done by field.__set_name__(), which determines the # field's name, model name, module and its parameters. # # The dictionary field.args gives the parameters passed to the field's # constructor. Most parameters have an attribute of the same name on the # field. The parameters as attributes are assigned by the field setup. # # When several definition classes of the same model redefine a given field, # the field occurrences are "merged" into one new field instantiated at # runtime on the registry class of the model. 
The occurrences of the field # are given to the new field as the parameter '_base_fields'; it is a list # of fields in override order (or reverse MRO). # # In order to save memory, a field should avoid having field.args and/or # many attributes when possible. We call "direct" a field that can be set # up directly from its definition class. Direct fields are non-related # fields defined on models, and can be shared across registries. We call # "toplevel" a field that is put on the model's registry class, and is # therefore specific to the registry. # # Toplevel field are set up once, and are no longer set up from scratch # after that. Those fields can save memory by discarding field.args and # field._base_fields once set up, because those are no longer necessary. # # Non-toplevel non-direct fields are the fields on definition classes that # may not be shared. In other words, those fields are never used directly, # and are always recreated as toplevel fields. On those fields, the base # setup is useless, because only field.args is used for setting up other # fields. We therefore skip the base setup for those fields. The only # attributes of those fields are: '_sequence', 'args', 'model_name', 'name' # and '_module', which makes their __dict__'s size minimal. def __set_name__(self, owner, name): """ Perform the base setup of a field. 
:param owner: the owner class of the field (the model's definition or registry class) :param name: the name of the field """ assert issubclass(owner, BaseModel) self.model_name = owner._name self.name = name if is_definition_class(owner): # only for fields on definition classes, not registry classes self._module = owner._module owner._field_definitions.append(self) if not self.args.get('related'): self._direct = True if self._direct or self._toplevel: self._setup_attrs(owner, name) if self._toplevel: # free memory, self.args and self._base_fields are no longer useful self.__dict__.pop('args', None) self.__dict__.pop('_base_fields', None) # # Setup field parameter attributes # def _get_attrs(self, model_class, name): """ Return the field parameter attributes as a dictionary. """ # determine all inherited field attributes attrs = {} modules = [] for field in self.args.get('_base_fields', ()): if not isinstance(self, type(field)): # 'self' overrides 'field' and their types are not compatible; # so we ignore all the parameters collected so far attrs.clear() modules.clear() continue attrs.update(field.args) if field._module: modules.append(field._module) attrs.update(self.args) if self._module: modules.append(self._module) attrs['args'] = self.args attrs['model_name'] = model_class._name attrs['name'] = name attrs['_module'] = modules[-1] if modules else None attrs['_modules'] = tuple(set(modules)) # initialize ``self`` with ``attrs`` if name == 'state': # by default, `state` fields should be reset on copy attrs['copy'] = attrs.get('copy', False) if attrs.get('compute'): # by default, computed fields are not stored, computed in superuser # mode if stored, not copied (unless stored and explicitly not # readonly), and readonly (unless inversible) attrs['store'] = store = attrs.get('store', False) attrs['compute_sudo'] = attrs.get('compute_sudo', store) if not (attrs['store'] and not attrs.get('readonly', True)): attrs['copy'] = attrs.get('copy', False) attrs['readonly'] = 
attrs.get('readonly', not attrs.get('inverse')) if attrs.get('related'): # by default, related fields are not stored, computed in superuser # mode, not copied and readonly attrs['store'] = store = attrs.get('store', False) attrs['compute_sudo'] = attrs.get('compute_sudo', attrs.get('related_sudo', True)) attrs['copy'] = attrs.get('copy', False) attrs['readonly'] = attrs.get('readonly', True) if attrs.get('precompute'): if not attrs.get('compute') and not attrs.get('related'): warnings.warn(f"precompute attribute doesn't make any sense on non computed field {self}") attrs['precompute'] = False elif not attrs.get('store'): warnings.warn(f"precompute attribute has no impact on non stored field {self}") attrs['precompute'] = False if attrs.get('company_dependent'): # by default, company-dependent fields are not stored, not computed # in superuser mode and not copied attrs['store'] = False attrs['compute_sudo'] = attrs.get('compute_sudo', False) attrs['copy'] = attrs.get('copy', False) attrs['default'] = attrs.get('default', self._default_company_dependent) attrs['compute'] = self._compute_company_dependent if not attrs.get('readonly'): attrs['inverse'] = self._inverse_company_dependent attrs['search'] = self._search_company_dependent attrs['depends_context'] = attrs.get('depends_context', ()) + ('company',) # parameters 'depends' and 'depends_context' are stored in attributes # '_depends' and '_depends_context', respectively if 'depends' in attrs: attrs['_depends'] = tuple(attrs.pop('depends')) if 'depends_context' in attrs: attrs['_depends_context'] = tuple(attrs.pop('depends_context')) return attrs def _setup_attrs(self, model_class, name): """ Initialize the field parameter attributes. 
""" attrs = self._get_attrs(model_class, name) # determine parameters that must be validated extra_keys = [key for key in attrs if not hasattr(self, key)] if extra_keys: attrs['_extra_keys'] = extra_keys self.__dict__.update(attrs) # prefetch only stored, column, non-manual fields if not self.store or not self.column_type or self.manual: self.prefetch = False if not self.string and not self.related: # related fields get their string from their parent field self.string = ( name[:-4] if name.endswith('_ids') else name[:-3] if name.endswith('_id') else name ).replace('_', ' ').title() # self.default must be either None or a callable if self.default is not None and not callable(self.default): value = self.default self.default = lambda model: value ############################################################################ # # Complete field setup: everything else # def prepare_setup(self): self._setup_done = False def setup(self, model): """ Perform the complete setup of a field. """ if not self._setup_done: # validate field params for key in self._extra_keys: if not model._valid_field_parameter(self, key): _logger.warning( "Field %s: unknown parameter %r, if this is an actual" " parameter you may want to override the method" " _valid_field_parameter on the relevant model in order to" " allow it", self, key ) if self.related: self.setup_related(model) else: self.setup_nonrelated(model) self._setup_done = True # # Setup of non-related fields # def setup_nonrelated(self, model): """ Determine the dependencies and inverse field(s) of ``self``. """ pass def get_depends(self, model): """ Return the field's dependencies and cache dependencies. 
""" if self._depends is not None: # the parameter 'depends' has priority over 'depends' on compute return self._depends, self._depends_context or () if self.related: if self._depends_context is not None: depends_context = self._depends_context else: related_model = model.env[self.related_field.model_name] depends, depends_context = self.related_field.get_depends(related_model) return [self.related], depends_context if not self.compute: return (), self._depends_context or () # determine the functions implementing self.compute if isinstance(self.compute, str): funcs = resolve_mro(model, self.compute, callable) else: funcs = [self.compute] # collect depends and depends_context depends = [] depends_context = list(self._depends_context or ()) for func in funcs: deps = getattr(func, '_depends', ()) depends.extend(deps(model) if callable(deps) else deps) depends_context.extend(getattr(func, '_depends_context', ())) # display_name may depend on context['lang'] (`test_lp1071710`) if self.automatic and self.name == 'display_name' and model._rec_name: if model._fields[model._rec_name].base_field.translate: if 'lang' not in depends_context: depends_context.append('lang') return depends, depends_context # # Setup of related fields # def setup_related(self, model): """ Setup the attributes of a related field. """ assert isinstance(self.related, str), self.related # determine the chain of fields, and make sure they are all set up model_name = self.model_name for name in self.related.split('.'): field = model.pool[model_name]._fields.get(name) if field is None: raise KeyError( f"Field {name} referenced in related field definition {self} does not exist." 
) if not field._setup_done: field.setup(model.env[model_name]) model_name = field.comodel_name self.related_field = field # check type consistency if self.type != field.type: raise TypeError("Type of related field %s is inconsistent with %s" % (self, field)) # determine dependencies, compute, inverse, and search self.compute = self._compute_related if self.inherited or not (self.readonly or field.readonly): self.inverse = self._inverse_related if field._description_searchable: # allow searching on self only if the related field is searchable self.search = self._search_related # A readonly related field without an inverse method should not have a # default value, as it does not make sense. if self.default and self.readonly and not self.inverse: _logger.warning("Redundant default on %s", self) # copy attributes from field to self (string, help, etc.) for attr, prop in self.related_attrs: # check whether 'attr' is explicitly set on self (from its field # definition), and ignore its class-level value (only a default) if attr not in self.__dict__ and prop.startswith('_related_'): setattr(self, attr, getattr(field, prop)) for attr in field._extra_keys: if not hasattr(self, attr) and model._valid_field_parameter(self, attr): setattr(self, attr, getattr(field, attr)) # special cases of inherited fields if self.inherited: self.inherited_field = field if not self.states: self.states = field.states if field.required: self.required = True # add modules from delegate and target fields; the first one ensures # that inherited fields introduced via an abstract model (_inherits # being on the abstract model) are assigned an XML id delegate_field = model._fields[self.related.split('.')[0]] self._modules = tuple({*self._modules, *delegate_field._modules, *field._modules}) def traverse_related(self, record): """ Traverse the fields of the related field `self` except for the last one, and return it as a pair `(last_record, last_field)`. 
""" for name in self.related.split('.')[:-1]: record = first(record[name]) return record, self.related_field def _compute_related(self, records): """ Compute the related field ``self`` on ``records``. """ # # Traverse fields one by one for all records, in order to take advantage # of prefetching for each field access. In order to clarify the impact # of the algorithm, consider traversing 'foo.bar' for records a1 and a2, # where 'foo' is already present in cache for a1, a2. Initially, both a1 # and a2 are marked for prefetching. As the commented code below shows, # traversing all fields one record at a time will fetch 'bar' one record # at a time. # # b1 = a1.foo # mark b1 for prefetching # v1 = b1.bar # fetch/compute bar for b1 # b2 = a2.foo # mark b2 for prefetching # v2 = b2.bar # fetch/compute bar for b2 # # On the other hand, traversing all records one field at a time ensures # maximal prefetching for each field access. # # b1 = a1.foo # mark b1 for prefetching # b2 = a2.foo # mark b2 for prefetching # v1 = b1.bar # fetch/compute bar for b1, b2 # v2 = b2.bar # value already in cache # # This difference has a major impact on performance, in particular in # the case where 'bar' is a computed field that takes advantage of batch # computation. 
# values = list(records) for name in self.related.split('.')[:-1]: try: values = [first(value[name]) for value in values] except AccessError as e: description = records.env['ir.model']._get(records._name).name raise AccessError( _("%(previous_message)s\n\nImplicitly accessed through '%(document_kind)s' (%(document_model)s).") % { 'previous_message': e.args[0], 'document_kind': description, 'document_model': records._name, } ) # assign final values to records for record, value in zip(records, values): record[self.name] = self._process_related(value[self.related_field.name]) def _process_related(self, value): """No transformation by default, but allows override.""" return value def _inverse_related(self, records): """ Inverse the related field ``self`` on ``records``. """ # store record values, otherwise they may be lost by cache invalidation! record_value = {record: record[self.name] for record in records} for record in records: target, field = self.traverse_related(record) # update 'target' only if 'record' and 'target' are both real or # both new (see `test_base_objects.py`, `test_basic`) if target and bool(target.id) == bool(record.id): target[field.name] = record_value[record] def _search_related(self, records, operator, value): """ Determine the domain to search on field ``self``. """ return [(self.related, operator, value)] # properties used by setup_related() to copy values from related field _related_comodel_name = property(attrgetter('comodel_name')) _related_string = property(attrgetter('string')) _related_help = property(attrgetter('help')) _related_groups = property(attrgetter('groups')) _related_group_operator = property(attrgetter('group_operator')) @property def base_field(self): """ Return the base field of an inherited field, or ``self``. """ return self.inherited_field.base_field if self.inherited_field else self @property def groupable(self): """ Return whether the field may be used for grouping in :meth:`~odoo.models.BaseModel.read_group`. 
""" return self.store and self.column_type # # Company-dependent fields # def _default_company_dependent(self, model): return model.env['ir.property']._get(self.name, self.model_name) def _compute_company_dependent(self, records): # read property as superuser, as the current user may not have access Property = records.env['ir.property'].sudo() values = Property._get_multi(self.name, self.model_name, records.ids) for record in records: record[self.name] = values.get(record.id) def _inverse_company_dependent(self, records): # update property as superuser, as the current user may not have access Property = records.env['ir.property'].sudo() values = { record.id: self.convert_to_write(record[self.name], record) for record in records } Property._set_multi(self.name, self.model_name, values) def _search_company_dependent(self, records, operator, value): Property = records.env['ir.property'].sudo() return Property.search_multi(self.name, self.model_name, operator, value) # # Setup of field triggers # def resolve_depends(self, registry): """ Return the dependencies of `self` as a collection of field tuples. """ Model0 = registry[self.model_name] for dotnames in registry.field_depends[self]: field_seq = [] model_name = self.model_name check_precompute = self.precompute for index, fname in enumerate(dotnames.split('.')): Model = registry[model_name] if Model0._transient and not Model._transient: # modifying fields on regular models should not trigger # recomputations of fields on transient models break try: field = Model._fields[fname] except KeyError: raise ValueError( f"Wrong @depends on '{self.compute}' (compute method of field {self}). " f"Dependency field '{fname}' not found in model {model_name}." 
) if field is self and index and not self.recursive: self.recursive = True warnings.warn(f"Field {self} should be declared with recursive=True") # precomputed fields can depend on non-precomputed ones, as long # as they are reachable through at least one many2one field if check_precompute and field.store and field.compute and not field.precompute: warnings.warn(f"Field {self} cannot be precomputed as it depends on non-precomputed field {field}") self.precompute = False if field_seq and not field_seq[-1]._description_searchable: # the field before this one is not searchable, so there is # no way to know which on records to recompute self warnings.warn( f"Field {field_seq[-1]!r} in dependency of {self} should be searchable. " f"This is necessary to determine which records to recompute when {field} is modified. " f"You should either make the field searchable, or simplify the field dependency." ) field_seq.append(field) # do not make self trigger itself: for instance, a one2many # field line_ids with domain [('foo', ...)] will have # 'line_ids.foo' as a dependency if not (field is self and not index): yield tuple(field_seq) if field.type == 'one2many': for inv_field in Model.pool.field_inverses[field]: yield tuple(field_seq) + (inv_field,) if check_precompute and field.type == 'many2one': check_precompute = False model_name = field.comodel_name ############################################################################ # # Field description # def get_description(self, env, attributes=None): """ Return a dictionary that describes the field ``self``. 
""" desc = {} for attr, prop in self.description_attrs: if attributes is not None and attr not in attributes: continue if not prop.startswith('_description_'): continue value = getattr(self, prop) if callable(value): value = value(env) if value is not None: desc[attr] = value return desc # properties used by get_description() _description_name = property(attrgetter('name')) _description_type = property(attrgetter('type')) _description_store = property(attrgetter('store')) _description_manual = property(attrgetter('manual')) _description_related = property(attrgetter('related')) _description_company_dependent = property(attrgetter('company_dependent')) _description_readonly = property(attrgetter('readonly')) _description_required = property(attrgetter('required')) _description_states = property(attrgetter('states')) _description_groups = property(attrgetter('groups')) _description_change_default = property(attrgetter('change_default')) _description_group_operator = property(attrgetter('group_operator')) _description_default_export_compatible = property(attrgetter('default_export_compatible')) _description_exportable = property(attrgetter('exportable')) def _description_depends(self, env): return env.registry.field_depends[self] @property def _description_searchable(self): return bool(self.store or self.search) @property def _description_sortable(self): return (self.column_type and self.store) or (self.inherited and self.related_field._description_sortable) def _description_string(self, env): if self.string and env.lang: model_name = self.base_field.model_name field_string = env['ir.model.fields'].get_field_string(model_name) return field_string.get(self.name) or self.string return self.string def _description_help(self, env): if self.help and env.lang: model_name = self.base_field.model_name field_help = env['ir.model.fields'].get_field_help(model_name) return field_help.get(self.name) or self.help return self.help def is_editable(self): """ Return whether the field 
def convert_to_record_multi(self, values, records):
    """ Batch counterpart of :meth:`convert_to_record`: convert every cache
    value of ``values`` to the record format, pairing it with the item at
    the same position in ``records``.

    Field classes may override this method to optimize batch processing.
    """
    # map() looks the method up once and pairs the items exactly like zip()
    return list(map(self.convert_to_record, values, records))
def convert_to_export(self, value, record):
    """ Convert ``value`` from the record format to the export format: any
    falsy value is exported as an empty string, other values are exported
    unchanged.
    """
    return value or ''
def update_db_column(self, model, column):
    """ Create the database column of ``self`` when it is missing; otherwise
    convert it when its type differs from the field's column type.

    :param model: an instance of the field's model
    :param column: the column's configuration (dict) if it exists, or ``None``
    """
    if column:
        if column['udt_name'] != self.column_type[0]:
            # the NOT NULL constraint is dropped before converting the column
            if column['is_nullable'] == 'NO':
                sql.drop_not_null(model._cr, model._table, self.name)
            self._convert_db_column(model, column)
    else:
        # the column does not exist, create it
        sql.create_column(model._cr, model._table, self.name, self.column_type[1], self.string)
def read(self, records):
    """ Read the value of ``self`` on ``records``, and store it in cache.

    This base implementation does nothing for fields that have a
    ``column_type``; for the other ones it raises, so that they must
    provide their own implementation.

    :raise NotImplementedError: if the field has no ``column_type``
    """
    if self.column_type:
        return
    raise NotImplementedError("Method read() undefined on %s" % self)
def __get__(self, record, owner):
    """ Return the value of field ``self`` on ``record``, in record format.

    When ``record`` is ``None`` (access through the owner class), the field
    object itself is returned. On an empty recordset, the null value of the
    field is returned. Otherwise ``record`` must be a singleton; pending
    computations of stored computed fields are processed, then the value is
    taken from the cache. On a cache miss, the value is determined by the
    rules listed in the comment of the ``except KeyError`` branch below.

    :param record: the recordset to read the field on, or ``None``
    :param owner: the owner class; not used by this method
    :return: the field itself if ``record`` is ``None``, otherwise the
        value converted by :meth:`convert_to_record`
    :raise MissingError: if a stored field of a real record is still not
        in cache after having been fetched
    :raise ValueError: if the compute method of a readonly non-stored
        field did not assign any value
    """
    if record is None:
        return self         # the field is accessed through the owner class

    if not record._ids:
        # null record -> return the null value for this field
        value = self.convert_to_cache(False, record, validate=False)
        return self.convert_to_record(value, record)

    env = record.env

    # only a single record may be accessed
    record.ensure_one()

    if self.compute and self.store:
        # process pending computations
        self.recompute(record)

    try:
        value = env.cache.get(record, self)

    except KeyError:
        # behavior in case of cache miss:
        #
        #   on a real record:
        #       stored -> fetch from database (computation done above)
        #       not stored and computed -> compute
        #       not stored and not computed -> default
        #
        #   on a new record w/ origin:
        #       stored and not (computed and readonly) -> fetch from origin
        #       stored and computed and readonly -> compute
        #       not stored and computed -> compute
        #       not stored and not computed -> default
        #
        #   on a new record w/o origin:
        #       stored and computed -> compute
        #       stored and not computed -> new delegate or default
        #       not stored and computed -> compute
        #       not stored and not computed -> default
        #
        if self.store and record.id:
            # real record: fetch from database
            recs = record._in_cache_without(self)
            try:
                recs._fetch_field(self)
            except AccessError:
                # the batch fetch failed: retry with ``record`` alone
                record._fetch_field(self)
            if not env.cache.contains(record, self):
                raise MissingError("\n".join([
                    _("Record does not exist or has been deleted."),
                    _("(Record: %s, User: %s)") % (record, env.uid),
                ]))
            value = env.cache.get(record, self)

        elif self.store and record._origin and not (self.compute and self.readonly):
            # new record with origin: fetch from origin
            value = self.convert_to_cache(record._origin[self.name], record)
            env.cache.set(record, self, value)

        elif self.compute:
            # non-stored field or new record without origin: compute
            if env.is_protected(self, record):
                # the field is protected on the record: use the null value
                # instead of invoking the compute method
                value = self.convert_to_cache(False, record, validate=False)
                env.cache.set(record, self, value)
            else:
                # recursive fields are computed on ``record`` alone, the
                # other ones on a batch of records
                recs = record if self.recursive else record._in_cache_without(self)
                try:
                    self.compute_value(recs)
                except (AccessError, MissingError):
                    # the batch computation failed: retry with ``record`` alone
                    self.compute_value(record)
                try:
                    value = env.cache.get(record, self)
                except CacheMiss:
                    if self.readonly and not self.store:
                        raise ValueError("Compute method failed to assign %s.%s" % (record, self.name))
                    # fallback to null value if compute gives nothing
                    value = self.convert_to_cache(False, record, validate=False)
                    env.cache.set(record, self, value)

        elif self.type == 'many2one' and self.delegate and not record.id:
            # parent record of a new record: new record, with the same
            # values as record for the corresponding inherited fields
            def is_inherited_field(name):
                field = record._fields[name]
                return field.inherited and field.related.split('.')[0] == self.name

            parent = record.env[self.comodel_name].new({
                name: value
                for name, value in record._cache.items()
                if is_inherited_field(name)
            })
            # in case the delegate field has inverse one2many fields, this
            # updates the inverse fields as well
            record._update_cache({self.name: parent}, validate=False)

            value = env.cache.get(record, self)

        else:
            # non-stored field or stored field on new record: default value
            value = self.convert_to_cache(False, record, validate=False)
            env.cache.set(record, self, value)
            defaults = record.default_get([self.name])
            if self.name in defaults:
                # The null value above is necessary to convert x2many field
                # values. For instance, converting [(Command.LINK, id)]
                # accesses the field's current value, then adds the given
                # id. Without an initial value, the conversion ends up here
                # to determine the field's value, and generates an infinite
                # recursion.
                value = self.convert_to_cache(defaults[self.name], record)
                env.cache.set(record, self, value)

    return self.convert_to_record(value, record)
def __set__(self, records, value):
    """ Set the value of field ``self`` on ``records``.

    The ids of ``records`` are split into three groups that are handled in
    turn:

    * ids on which the field is protected (records being computed): the
      value is written with :meth:`write` only;
    * falsy ids (new records): the value is written with :meth:`write`
      while the field and the fields computed with it are protected, and
      ``modified()`` is called on the records; for inherited fields, the
      value is also assigned on the parent records that are new;
    * all other ids: the value is converted with :meth:`convert_to_write`
      and written through the ``write()`` method of the records.

    :param records: the recordset to assign
    :param value: the value to assign, in any format
    """
    # The protected ids of the field do not change while the ids are being
    # split: look them up once instead of once per record.
    field_protected_ids = records.env._protected.get(self, ())

    protected_ids = []
    new_ids = []
    other_ids = []
    for record_id in records._ids:
        if record_id in field_protected_ids:
            protected_ids.append(record_id)
        elif not record_id:
            new_ids.append(record_id)
        else:
            other_ids.append(record_id)

    if protected_ids:
        # records being computed: no business logic, no recomputation
        protected_records = records.browse(protected_ids)
        self.write(protected_records, value)

    if new_ids:
        # new records: no business logic
        new_records = records.browse(new_ids)
        with records.env.protecting(records.pool.field_computed.get(self, [self]), records):
            if self.relational:
                new_records.modified([self.name], before=True)
            self.write(new_records, value)
            new_records.modified([self.name])

        if self.inherited:
            # special case: also assign parent records if they are new
            parents = records[self.related.split('.')[0]]
            parents.filtered(lambda r: not r.id)[self.name] = value

    if other_ids:
        # base case: full business logic
        records = records.browse(other_ids)
        write_value = self.convert_to_write(value, records)
        records.write({self.name: write_value})
""" try: func(records) except MissingError: existing = records.exists() if existing: func(existing) # mark the field as computed on missing records, otherwise they # remain to compute forever, which may lead to an infinite loop missing = records - existing for f in records.pool.field_computed[self]: records.env.remove_to_compute(f, missing) if self.recursive: # recursive computed fields are computed record by record, in order # to recursively handle dependencies inside records def recursive_compute(records): for record in records: if record.id in to_compute_ids: self.compute_value(record) apply_except_missing(recursive_compute, records) return for record in records: if record.id in to_compute_ids: ids = expand_ids(record.id, to_compute_ids) recs = record.browse(itertools.islice(ids, PREFETCH_MAX)) try: apply_except_missing(self.compute_value, recs) except AccessError: self.compute_value(record) def compute_value(self, records): """ Invoke the compute method on ``records``; the results are in cache. """ env = records.env if self.compute_sudo: records = records.sudo() fields = records.pool.field_computed[self] # Just in case the compute method does not assign a value, we already # mark the computation as done. This is also necessary if the compute # method accesses the old value of the field: the field will be fetched # with _read(), which will flush() it. If the field is still to compute, # the latter flush() will recursively compute this field! for field in fields: if field.store: env.remove_to_compute(field, records) try: with records.env.protecting(fields, records): records._compute_field_value(self) except Exception: for field in fields: if field.store: env.add_to_compute(field, records) raise def determine_inverse(self, records): """ Given the value of ``self`` on ``records``, inverse the computation. """ determine(self.inverse, records) def determine_domain(self, records, operator, value): """ Return a domain representing a condition on ``self``. 
""" return determine(self.search, records, operator, value) class Boolean(Field): """ Encapsulates a :class:`bool`. """ type = 'boolean' column_type = ('bool', 'bool') def convert_to_column(self, value, record, values=None, validate=True): return bool(value) def convert_to_cache(self, value, record, validate=True): return bool(value) def convert_to_export(self, value, record): return value class Integer(Field): """ Encapsulates an :class:`int`. """ type = 'integer' column_type = ('int4', 'int4') group_operator = 'sum' def convert_to_column(self, value, record, values=None, validate=True): return int(value or 0) def convert_to_cache(self, value, record, validate=True): if isinstance(value, dict): # special case, when an integer field is used as inverse for a one2many return value.get('id', None) return int(value or 0) def convert_to_record(self, value, record): return value or 0 def convert_to_read(self, value, record, use_name_get=True): # Integer values greater than 2^31-1 are not supported in pure XMLRPC, # so we have to pass them as floats :-( if value and value > MAXINT: return float(value) return value def _update(self, records, value): # special case, when an integer field is used as inverse for a one2many cache = records.env.cache for record in records: cache.set(record, self, value.id or 0) def convert_to_export(self, value, record): if value or value == 0: return value return '' class Float(Field): """ Encapsulates a :class:`float`. The precision digits are given by the (optional) ``digits`` attribute. :param digits: a pair (total, decimal) or a string referencing a :class:`~odoo.addons.base.models.decimal_precision.DecimalPrecision` record name. :type digits: tuple(int,int) or str When a float is a quantity associated with an unit of measure, it is important to use the right tool to compare or round values with the correct precision. 
The Float class provides some static methods for this purpose: :func:`~odoo.fields.Float.round()` to round a float with the given precision. :func:`~odoo.fields.Float.is_zero()` to check if a float equals zero at the given precision. :func:`~odoo.fields.Float.compare()` to compare two floats at the given precision. .. admonition:: Example To round a quantity with the precision of the unit of measure:: fields.Float.round(self.product_uom_qty, precision_rounding=self.product_uom_id.rounding) To check if the quantity is zero with the precision of the unit of measure:: fields.Float.is_zero(self.product_uom_qty, precision_rounding=self.product_uom_id.rounding) To compare two quantities:: fields.Float.compare(self.product_uom_qty, self.qty_done, precision_rounding=self.product_uom_id.rounding) The compare helper uses the __cmp__ semantics for historic purposes, therefore the proper, idiomatic way to use this helper is like so: if result == 0, the first and second floats are equal if result < 0, the first float is lower than the second if result > 0, the first float is greater than the second """ type = 'float' _digits = None # digits argument passed to class initializer group_operator = 'sum' def __init__(self, string=Default, digits=Default, **kwargs): super(Float, self).__init__(string=string, _digits=digits, **kwargs) @property def column_type(self): # Explicit support for "falsy" digits (0, False) to indicate a NUMERIC # field with no fixed precision. The values are saved in the database # with all significant digits. # FLOAT8 type is still the default when there is no precision because it # is faster for most operations (sums, etc.)
class Monetary(Field):
    """ Encapsulates a :class:`float` expressed in a given
    :class:`res_currency`.

    The decimal precision and currency symbol are taken from the ``currency_field`` attribute.

    :param str currency_field: name of the :class:`Many2one` field
        holding the :class:`res_currency` this monetary field is
        expressed in (default: ``'currency_id'``)
    """
    type = 'monetary'
    write_sequence = 10
    column_type = ('numeric', 'numeric')

    # name of the currency field given by the developer, if any; the name
    # actually used is determined by get_currency_field()
    currency_field = None
    group_operator = 'sum'

    def __init__(self, string=Default, currency_field=Default, **kwargs):
        super(Monetary, self).__init__(string=string, currency_field=currency_field, **kwargs)

    def _description_currency_field(self, env):
        """ Return the name of the currency field, determined on the
        field's model in ``env``.
        """
        return self.get_currency_field(env[self.model_name])

    def get_currency_field(self, model):
        """ Return the name of the currency field.

        This is the ``currency_field`` attribute if set; otherwise
        ``'currency_id'`` or ``'x_currency_id'``, whichever is found first
        among the fields of ``model``, or ``None`` if neither exists.
        """
        return self.currency_field or (
            'currency_id' if 'currency_id' in model._fields else
            'x_currency_id' if 'x_currency_id' in model._fields else
            None
        )

    def setup_nonrelated(self, model):
        """ Set up the field, and check that its currency field is a field
        of ``model``.
        """
        super().setup_nonrelated(model)
        assert self.get_currency_field(model) in model._fields, \
            "Field %s with unknown currency_field %r" % (self, self.get_currency_field(model))

    def setup_related(self, model):
        """ Set up the related field; an inherited field takes the currency
        field name of its related field. The currency field must be a field
        of ``model``.
        """
        super().setup_related(model)
        if self.inherited:
            self.currency_field = self.related_field.get_currency_field(model.env[self.related_field.model_name])
        assert self.get_currency_field(model) in model._fields, \
            "Field %s with unknown currency_field %r" % (self, self.get_currency_field(model))

    def convert_to_column(self, value, record, values=None, validate=True):
        """ Convert ``value`` to the SQL format.

        The currency is taken from ``values`` when they contain the currency
        field (or the first field in the path of a related currency field),
        and from the first record of ``record`` otherwise.

        :return: the value rounded by the currency and formatted with
            ``float_repr`` if a currency is found, a float otherwise
        """
        # retrieve currency from values or record
        currency_field_name = self.get_currency_field(record)
        currency_field = record._fields[currency_field_name]
        if values and currency_field_name in values:
            # the currency is being written as well: read it on a new record
            dummy = record.new({currency_field_name: values[currency_field_name]})
            currency = dummy[currency_field_name]
        elif values and currency_field.related and currency_field.related.split('.')[0] in values:
            # the currency is related through a field being written as well
            related_field_name = currency_field.related.split('.')[0]
            dummy = record.new({related_field_name: values[related_field_name]})
            currency = dummy[currency_field_name]
        else:
            # Note: this is wrong if 'record' is several records with different
            # currencies, which is functional nonsense and should not happen
            # BEWARE: do not prefetch other fields, because 'value' may be in
            # cache, and would be overridden by the value read from database!
            currency = record[:1].with_context(prefetch_fields=False)[currency_field_name]
            currency = currency.with_env(record.env)

        value = float(value or 0.0)
        if currency:
            return float_repr(currency.round(value), currency.decimal_places)
        return value

    def convert_to_cache(self, value, record, validate=True):
        """ Convert ``value`` to a float; a non-zero value is rounded by
        the currency of ``record`` (if any) when ``validate`` is true.

        :raise ValueError: if ``record`` has more than one currency
        """
        # cache format: float
        value = float(value or 0.0)
        if value and validate:
            # FIXME @rco-odoo: currency may not be already initialized if it is
            # a function or related field!
            # BEWARE: do not prefetch other fields, because 'value' may be in
            # cache, and would be overridden by the value read from database!
            currency_field = self.get_currency_field(record)
            currency = record.sudo().with_context(prefetch_fields=False)[currency_field]
            if len(currency) > 1:
                raise ValueError("Got multiple currencies while assigning values of monetary field %s" % str(self))
            elif currency:
                value = currency.with_env(record.env).round(value)
        return value

    def convert_to_record(self, value, record):
        """ Return the cache value, or ``0.0`` if it is falsy. """
        return value or 0.0

    def convert_to_read(self, value, record, use_name_get=True):
        """ Return ``value`` unchanged. """
        return value

    def convert_to_write(self, value, record):
        """ Return ``value`` unchanged. """
        return value
""" translate = False # whether the field is translated unaccent = True def __init__(self, string=Default, **kwargs): # translate is either True, False, or a callable if 'translate' in kwargs and not callable(kwargs['translate']): kwargs['translate'] = bool(kwargs['translate']) super(_String, self).__init__(string=string, **kwargs) _related_translate = property(attrgetter('translate')) def _description_translate(self, env): return bool(self.translate) def _convert_db_column(self, model, column): # specialized implementation for converting from/to translated fields if self.translate or column['udt_name'] == 'jsonb': sql.convert_column_translatable(model._cr, model._table, self.name, self.column_type[1]) else: sql.convert_column(model._cr, model._table, self.name, self.column_type[1]) def get_trans_terms(self, value): """ Return the sequence of terms to translate found in `value`. """ if not callable(self.translate): return [value] if value else [] terms = [] self.translate(terms.append, value) return terms def get_text_content(self, term): """ Return the textual content for the given term. 
""" func = getattr(self.translate, 'get_text_content', lambda term: term) return func(term) def convert_to_column(self, value, record, values=None, validate=True): cache_value = self.convert_to_cache(value, record) if cache_value is None: return None if callable(self.translate): # pylint: disable=not-callable cache_value = self.translate(lambda t: None, cache_value) if self.translate: cache_value = {'en_US': cache_value, record.env.lang or 'en_US': cache_value} return self._convert_from_cache_to_column(cache_value) def _convert_from_cache_to_column(self, value): """ Convert from cache_raw value to column value """ if value is None: return None return PsycopgJson(value) if self.translate else value def convert_to_cache(self, value, record, validate=True): if value is None or value is False: return None return value def convert_to_record(self, value, record): if value is None: return False if callable(self.translate) and record.env.context.get('edit_translations'): terms = self.get_trans_terms(value) base_lang = record._get_base_lang() if base_lang != (record.env.lang or 'en_US'): base_value = record.with_context(edit_translations=None, lang=base_lang)[self.name] base_terms = self.get_trans_terms(base_value) term_to_state = {term: "translated" if base_term != term else "to_translate" for term, base_term in zip(terms, base_terms)} else: term_to_state = defaultdict(lambda: 'translated') # use a wrapper to let the frontend js code identify each term and its metadata in the 'edit_translations' context # pylint: disable=not-callable value = self.translate( lambda term: f'''{term}''', value ) return value def convert_to_write(self, value, record): return value def get_translation_dictionary(self, from_lang_value, to_lang_values): """ Build a dictionary from terms in from_lang_value to terms in to_lang_values :param str from_lang_value: from xml/html :param dict to_lang_values: {lang: lang_value} :return: {from_lang_term: {lang: lang_term}} :rtype: dict """ from_lang_terms 
= self.get_trans_terms(from_lang_value) dictionary = defaultdict(lambda: defaultdict(dict)) for lang, to_lang_value in to_lang_values.items(): to_lang_terms = self.get_trans_terms(to_lang_value) if len(from_lang_terms) != len(to_lang_terms): for from_lang_term in from_lang_terms: dictionary[from_lang_term][lang] = from_lang_term else: for from_lang_term, to_lang_term in zip(from_lang_terms, to_lang_terms): dictionary[from_lang_term][lang] = to_lang_term return dictionary def _get_stored_translations(self, record): """ : return: {'en_US': 'value_en_US', 'fr_FR': 'French'} """ # assert (self.translate and self.store and record) record.flush_recordset([self.name]) cr = record.env.cr cr.execute(f'SELECT "{self.name}" FROM "{record._table}" WHERE id = %s', (record.id,)) res = cr.fetchone() return res[0] if res else None def write(self, records, value): if not self.translate or value is False or value is None: return super().write(records, value) cache = records.env.cache cache_value = self.convert_to_cache(value, records) records = cache.get_records_different_from(records, self, cache_value) if not records: return records # flush dirty None values dirty_records = records & cache.get_dirty_records(records, self) if any(v is None for v in cache.get_values(dirty_records, self)): dirty_records.flush_recordset([self.name]) dirty = self.store and any(records._ids) lang = records.env.lang or 'en_US' # not dirty fields if not dirty: cache.update_raw(records, self, [{lang: cache_value} for _id in records._ids], dirty=False) return records # model translation if not callable(self.translate): # invalidate clean fields because them may contain fallback value clean_records = records - cache.get_dirty_records(records, self) clean_records.invalidate_recordset([self.name]) cache.update(records, self, itertools.repeat(cache_value), dirty=True) if lang != 'en_US' and not records.env['res.lang']._lang_get_id('en_US'): # if 'en_US' is not active, we always write en_US to make sure value_en 
is meaningful cache.update(records.with_context(lang='en_US'), self, itertools.repeat(cache_value), dirty=True) return records # model term translation new_translations_list = [] # pylint: disable=not-callable cache_value = self.translate(lambda t: None, cache_value) new_terms = set(self.get_trans_terms(cache_value)) for record in records: # shortcut when no term needs to be translated if not new_terms: new_translations_list.append({'en_US': cache_value, lang: cache_value}) continue # _get_stored_translations can be refactored and prefetches translations for multi records, # but it is really rare to write the same non-False/None/no-term value to multi records old_translations = self._get_stored_translations(record) if not old_translations: new_translations_list.append({'en_US': cache_value, lang: cache_value}) continue from_lang_value = old_translations.get(lang, old_translations['en_US']) translation_dictionary = self.get_translation_dictionary(from_lang_value, old_translations) text2terms = defaultdict(list) for term in new_terms: text2terms[self.get_text_content(term)].append(term) for old_term in list(translation_dictionary.keys()): if old_term not in new_terms: old_term_text = self.get_text_content(old_term) matches = get_close_matches(old_term_text, text2terms, 1, 0.9) if matches: closest_term = get_close_matches(old_term, text2terms[matches[0]], 1, 0)[0] old_is_text = old_term == self.get_text_content(old_term) closest_is_text = closest_term == self.get_text_content(closest_term) if old_is_text or not closest_is_text: translation_dictionary[closest_term] = translation_dictionary.pop(old_term) # pylint: disable=not-callable new_translations = { l: self.translate(lambda term: translation_dictionary.get(term, {l: None})[l], cache_value) for l in old_translations.keys() } new_translations[lang] = cache_value if not records.env['res.lang']._lang_get_id('en_US'): new_translations['en_US'] = cache_value new_translations_list.append(new_translations) # Maybe we can 
class Char(_String):
    """ Single-line string field whose length may be bounded.

    :param int size: upper bound on the length of the values stored for
        that field

    :param bool trim: whether values get trimmed (``True`` by default).
        The trimming itself is only performed by the web client.

    :param translate: turns on the translation of the field's values;
        ``translate=True`` translates every value as a whole, while a callable
        ``translate(callback, value)`` translates ``value`` term by term and
        uses ``callback(term)`` to fetch the translation of each term.
    :type translate: bool or callable
    """
    type = 'char'

    size = None                         # maximum size of values (deprecated)
    trim = True                         # whether value is trimmed (only by web client)

    _related_size = property(attrgetter('size'))
    _related_trim = property(attrgetter('trim'))
    _description_size = property(attrgetter('size'))
    _description_trim = property(attrgetter('trim'))

    def _setup_attrs(self, model_class, name):
        super()._setup_attrs(model_class, name)
        size = self.size
        assert size is None or isinstance(size, int), \
            "Char field %s with non-integer size %r" % (self, size)

    @property
    def column_type(self):
        # translated values live in a jsonb column, plain ones in a varchar
        if self.translate:
            return ('jsonb', 'jsonb')
        return ('varchar', pg_varchar(self.size))

    def update_db_column(self, model, column):
        if column and self.column_type[0] == 'varchar' and column['udt_name'] == 'varchar':
            current_length = column['character_maximum_length']
            if current_length and (self.size is None or current_length < self.size):
                # the existing varchar limit does not fit self.size anymore:
                # convert the column to the field's current column type
                sql.convert_column(model._cr, model._table, self.name, self.column_type[1])
        super().update_db_column(model, column)

    def convert_to_column(self, value, record, values=None, validate=True):
        if value is False or value is None:
            return None
        # work on a text (unicode) object, so that measuring the length and
        # truncating the value to ``size`` are reliable
        text = pycompat.to_text(value)
        return super().convert_to_column(text[:self.size], record, values, validate)

    def convert_to_cache(self, value, record, validate=True):
        if value is False or value is None:
            return None
        text = pycompat.to_text(value)
        return text[:self.size]
:param bool sanitize: whether value must be sanitized (default: ``True``) :param bool sanitize_overridable: whether the sanitation can be bypassed by the users part of the `base.group_sanitize_override` group (default: ``False``) :param bool sanitize_tags: whether to sanitize tags (only a white list of attributes is accepted, default: ``True``) :param bool sanitize_attributes: whether to sanitize attributes (only a white list of attributes is accepted, default: ``True``) :param bool sanitize_style: whether to sanitize style attributes (default: ``False``) :param bool strip_style: whether to strip style attributes (removed and therefore not sanitized, default: ``False``) :param bool strip_classes: whether to strip classes attributes (default: ``False``) """ type = 'html' sanitize = True # whether value must be sanitized sanitize_overridable = False # whether the sanitation can be bypassed by the users part of the `base.group_sanitize_override` group sanitize_tags = True # whether to sanitize tags (only a white list of attributes is accepted) sanitize_attributes = True # whether to sanitize attributes (only a white list of attributes is accepted) sanitize_style = False # whether to sanitize style attributes sanitize_form = True # whether to sanitize forms strip_style = False # whether to strip style attributes (removed and therefore not sanitized) strip_classes = False # whether to strip classes attributes def _get_attrs(self, model_class, name): # called by _setup_attrs(), working together with _String._setup_attrs() attrs = super()._get_attrs(model_class, name) # Translated sanitized html fields must use html_translate or a callable. 
def _convert(self, value, record, validate):
    """ Return ``value`` in the form shared by the column and cache formats.

    ``None`` and ``False`` both become ``None``. The value is returned
    untouched when ``validate`` is falsy or the field is not sanitized;
    otherwise it goes through ``html_sanitize`` with the field's options.

    When the field is ``sanitize_overridable``, members of
    ``base.group_sanitize_override`` get their value back unsanitized, and
    other users are refused (``UserError``) when the value currently held by
    ``record`` would itself be altered by the sanitizer.
    """
    if value is False or value is None:
        return None
    if not (validate and self.sanitize):
        return value

    # sanitizer options, taken from the field's attributes of the same name
    sanitize_vals = {'silent': True}
    for option in (
        'sanitize_tags', 'sanitize_attributes', 'sanitize_style',
        'sanitize_form', 'strip_style', 'strip_classes',
    ):
        sanitize_vals[option] = getattr(self, option)

    if self.sanitize_overridable:
        if record.user_has_groups('base.group_sanitize_override'):
            return value

        current = record[self.name]
        if current:
            # sanitizing also normalizes, hence the comparison of the
            # sanitized value against the merely normalized one
            current_sanitized = html_sanitize(current, **sanitize_vals)
            current_normalized = html_normalize(current)
            # the sanitizer may also have emptied the value completely
            altered = not current_sanitized or current_normalized != current_sanitized
            if altered:
                # Some element(s) of the stored value would be removed by
                # the sanitizer, which means that a user allowed to bypass
                # the sanitation saved the field before.
                raise UserError(_(
                    "The field value you're saving (%s %s) includes content that is "
                    "restricted for security reasons. It is possible that someone "
                    "with higher privileges previously modified it, and you are therefore "
                    "not able to modify it yourself while preserving the content.",
                    record._description, self.string,
                ))

    return html_sanitize(value, **sanitize_vals)
@staticmethod
def to_date(value):
    """Try to turn ``value`` into a :class:`date` object.

    .. warning::

        A datetime given as value is reduced to its date: everything
        specific to datetimes (HMS, TZ, ...) is lost in the conversion.

    :param value: value to convert.
    :type value: str or date or datetime
    :return: an object representing ``value``.
    :rtype: date or None
    """
    if not value:
        return None
    # datetime is tested first, since it is a subclass of date
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    # a string: only its leading date part is parsed
    return datetime.strptime(value[:DATE_LENGTH], DATE_FORMAT).date()
@staticmethod
def context_timestamp(record, timestamp):
    """Convert the given timestamp to the timezone of the client.

    .. note::

        Do *not* use this method as a default initializer: datetime fields
        are converted automatically when displayed on client side. Use
        :meth:`now` for default values instead.

    :param record: recordset from which the timezone will be obtained.
    :param datetime timestamp: naive datetime value (expressed in UTC)
        to be converted to the client timezone.
    :return: timestamp converted to timezone-aware datetime in context timezone.
    :rtype: datetime
    """
    assert isinstance(timestamp, datetime), 'Datetime instance expected'
    tz_name = record._context.get('tz') or record.env.user.tz
    utc_timestamp = pytz.utc.localize(timestamp, is_dst=False)  # UTC = no DST
    if not tz_name:
        return utc_timestamp
    try:
        return utc_timestamp.astimezone(pytz.timezone(tz_name))
    except Exception:
        # best effort: an unusable timezone falls back on the UTC value
        _logger.debug("failed to compute context/client-specific timestamp, "
                      "using the UTC value",
                      exc_info=True)
        return utc_timestamp
def convert_to_column(self, value, record, values=None, validate=True):
    """ Return ``value`` wrapped in ``psycopg2.Binary``, or ``None`` when
    ``value`` is falsy.

    :raise UserError: if the content is detected as an SVG image while the
        environment of ``record`` is not a system one, or if a non-bytes
        value cannot be encoded as ASCII
    """
    # Binary values may come as byte strings, but the legacy OpenERP
    # convention is to transfer and store binaries as base64-encoded
    # strings, which are sometimes provided as unicode: hence the str()
    # coercion at the end. That coercion only works for pure ASCII strings,
    # on purpose - non base64 data must be passed as 8bit byte strings.
    if not value:
        return None

    if isinstance(value, str):
        value = value.encode()

    # SVG detection, its upload being restricted to system users
    svg_first_bytes = {
        b'P',  # first 6 bits of '<' (0x3C) b64 encoded
        b'<',  # plaintext XML tag opening
    }
    if value[:1] in svg_first_bytes:
        stripped = value.translate(None, delete=b'\r\n')
        try:
            content = base64.b64decode(stripped, validate=True)
        except binascii.Error:
            # not valid base64: inspect the value as it is
            content = value
        # full mimetype detection
        is_svg = guess_mimetype(content).startswith('image/svg')
        if is_svg and not record.env.is_system():
            raise UserError(_("Only admins can upload SVG files."))

    if not isinstance(value, bytes):
        try:
            value = str(value).encode('ascii')
        except UnicodeEncodeError:
            raise UserError(_("ASCII characters are required for %s in %s") % (value, self.name))
    return psycopg2.Binary(value)
value = human_size(value) # human_size can return False (-> None) or a string (-> encoded) return value.encode() if value else None return None if value is False else value def convert_to_record(self, value, record): if isinstance(value, _BINARY): return bytes(value) return False if value is None else value def compute_value(self, records): bin_size_name = 'bin_size_' + self.name if records.env.context.get('bin_size') or records.env.context.get(bin_size_name): # always compute without bin_size records_no_bin_size = records.with_context(**{'bin_size': False, bin_size_name: False}) super().compute_value(records_no_bin_size) # manually update the bin_size cache cache = records.env.cache for record_no_bin_size, record in zip(records_no_bin_size, records): try: value = cache.get(record_no_bin_size, self) try: value = base64.b64decode(value) except (TypeError, binascii.Error): pass try: if isinstance(value, (bytes, _BINARY)): value = human_size(len(value)) except (TypeError): pass cache_value = self.convert_to_cache(value, record) dirty = self.column_type and self.store and any(records._ids) cache.set(record, self, cache_value, dirty=dirty) except CacheMiss: pass else: super().compute_value(records) def read(self, records): # values are stored in attachments, retrieve them assert self.attachment domain = [ ('res_model', '=', records._name), ('res_field', '=', self.name), ('res_id', 'in', records.ids), ] # Note: the 'bin_size' flag is handled by the field 'datas' itself data = { att.res_id: att.datas for att in records.env['ir.attachment'].sudo().search(domain) } records.env.cache.insert_missing(records, self, map(data.get, records._ids)) def create(self, record_values): assert self.attachment if not record_values: return # create the attachments that store the values env = record_values[0][0].env with env.norecompute(): env['ir.attachment'].sudo().with_context( binary_field_real_user=env.user, ).create([{ 'name': self.name, 'res_model': self.model_name, 'res_field': 
def write(self, records, value):
    """ Write ``value`` on ``records`` and return the records that were
    actually modified.

    Fields that are not stored as attachments delegate to ``super()``. For
    the others, the cache is updated first, then the ``ir.attachment``
    records holding the values are written, created or deleted (in sudo
    mode) so that they match ``value``.
    """
    if not self.attachment:
        return super().write(records, value)

    # discard recomputation of self on records
    records.env.remove_to_compute(self, records)

    # update the cache, and discard the records that are not modified
    cache = records.env.cache
    cache_value = self.convert_to_cache(value, records)
    records = cache.get_records_different_from(records, self, cache_value)
    if not records:
        return records
    if self.store:
        # determine records that are known to be not null
        # (must be done before the cache update below overwrites the values;
        # ``not_null`` is only read further down under ``self.store`` too)
        not_null = cache.get_records_different_from(records, self, None)

    cache.update(records, self, itertools.repeat(cache_value))

    # retrieve the attachments that store the values, and adapt them
    if self.store and any(records._ids):
        real_records = records.filtered('id')
        # empty recordset, unless some record may already have an attachment
        atts = records.env['ir.attachment'].sudo()
        if not_null:
            atts = atts.search([
                ('res_model', '=', self.model_name),
                ('res_field', '=', self.name),
                ('res_id', 'in', real_records.ids),
            ])
        if value:
            # update the existing attachments
            atts.write({'datas': value})
            atts_records = records.browse(atts.mapped('res_id'))
            # create the missing attachments
            missing = (real_records - atts_records)
            if missing:
                atts.create([{
                        'name': self.name,
                        'res_model': record._name,
                        'res_field': self.name,
                        'res_id': record.id,
                        'type': 'binary',
                        'datas': value,
                    }
                    for record in missing
                ])
        else:
            # falsy value: the attachments found above are simply removed
            atts.unlink()

    return records
def create(self, record_values):
    """ Process the image of every ``(record, value)`` pair, put the result
    in cache, and hand the processed values over to the parent ``create``.
    """
    processed_pairs = []
    for record, raw_image in record_values:
        processed = self._image_process(raw_image)
        processed_pairs.append((record, processed))
        # strange behavior when setting related image field, when `self`
        # does not resize the same way as its related field: the cache then
        # keeps the value as given instead of the processed one
        if self.related:
            to_cache = raw_image
        else:
            to_cache = processed
        cache_value = self.convert_to_cache(to_cache, record)
        record.env.cache.update(record, self, itertools.repeat(cache_value))
    super(Image, self).create(processed_pairs)
def _image_process(self, value):
    """ Return ``value`` processed according to the field's limits.

    :param value: base64-encoded image, or a falsy value
    :return: the base64-encoded processed image, or ``False`` when there is
        no image; ``value`` itself for a readonly field without
        ``max_width``/``max_height``
    :raise UserError: if ``value`` cannot be decoded as base64
    """
    if self.readonly and not self.max_width and not self.max_height:
        # no need to process images for computed fields, or related fields
        return value
    try:
        img = base64.b64decode(value or '') or False
    except Exception as exc:
        # ``Exception`` rather than a bare ``except``: interpreter-level
        # signals (KeyboardInterrupt, SystemExit) must not be reported to
        # the user as a badly encoded image
        raise UserError(_("Image is not encoded in base64.")) from exc
    return base64.b64encode(image_process(img,
        size=(self.max_width, self.max_height),
        verify_resolution=self.verify_resolution,
    ) or b'') or False
def _setup_attrs(self, model_class, name):
    """ Determine ``self.selection`` and ``self.ondelete`` from the arguments
    ``selection``, ``selection_add`` and ``ondelete`` of the fields in
    ``self._base_fields``, processed in order.

    :raise ValueError: if a required field extends its selection with a
        'set null' ondelete policy, or if an ondelete policy is invalid
    """
    super()._setup_attrs(model_class, name)
    if not self._base_fields:
        return

    # determine selection (applying 'selection_add' extensions)
    values = None       # ordered list of selection values, None if not a list
    labels = {}         # {value: label}

    for field in self._base_fields:
        # We cannot use field.selection or field.selection_add here
        # because those attributes are overridden by ``_setup_attrs``.
        if 'selection' in field.args:
            if self.related:
                _logger.warning("%s: selection attribute will be ignored as the field is related", self)
            selection = field.args['selection']
            if isinstance(selection, list):
                # a list selection replaces whatever was accumulated so far
                if values is not None and values != [kv[0] for kv in selection]:
                    _logger.warning("%s: selection=%r overrides existing selection; use selection_add instead", self, selection)
                values = [kv[0] for kv in selection]
                labels = dict(selection)
                self.ondelete = {}
            else:
                # non-list selection (see the ``selection`` attribute: function
                # or method name): kept as is, and it cannot be extended
                values = None
                labels = {}
                self.selection = selection
                self.ondelete = None

        if 'selection_add' in field.args:
            if self.related:
                _logger.warning("%s: selection_add attribute will be ignored as the field is related", self)
            selection_add = field.args['selection_add']
            assert isinstance(selection_add, list), \
                "%s: selection_add=%r must be a list" % (self, selection_add)
            assert values is not None, \
                "%s: selection_add=%r on non-list selection %r" % (self, selection_add, self.selection)

            # NOTE(review): when given, the dict from field.args is completed
            # in place by the setdefault() calls below
            ondelete = field.args.get('ondelete') or {}
            new_values = [kv[0] for kv in selection_add if kv[0] not in values]
            # added values without an explicit policy default to 'set null'
            for key in new_values:
                ondelete.setdefault(key, 'set null')
            if self.required and new_values and 'set null' in ondelete.values():
                raise ValueError(
                    "%r: required selection fields must define an ondelete policy that "
                    "implements the proper cleanup of the corresponding records upon "
                    "module uninstallation. Please use one or more of the following "
                    "policies: 'set default' (if the field has a default defined), 'cascade', "
                    "or a single-argument callable where the argument is the recordset "
                    "containing the specified option." % self
                )

            # check ondelete values
            for key, val in ondelete.items():
                if callable(val) or val in ('set null', 'cascade'):
                    continue
                if val == 'set default':
                    assert self.default is not None, (
                        "%r: ondelete policy of type 'set default' is invalid for this field "
                        "as it does not define a default! Either define one in the base "
                        "field, or change the chosen ondelete policy" % self
                    )
                elif val.startswith('set '):
                    # 'set VALUE': VALUE must be one of the values known so far
                    assert val[4:] in values, (
                        "%s: ondelete policy of type 'set %%' must be either 'set null', "
                        "'set default', or 'set value' where value is a valid selection value."
                    ) % self
                else:
                    raise ValueError(
                        "%r: ondelete policy %r for selection value %r is not a valid ondelete"
                        " policy, please choose one of 'set null', 'set default', "
                        "'set [value]', 'cascade' or a callable" % (self, val, key)
                    )

            # merge the added values into the existing ones; only pairs
            # (value, label) provide a label, singletons (value,) do not
            values = merge_sequences(values, [kv[0] for kv in selection_add])
            labels.update(kv for kv in selection_add if len(kv) == 2)
            self.ondelete.update(ondelete)

    if values is not None:
        self.selection = [(value, labels[value]) for value in values]

    if isinstance(self.selection, list):
        assert all(isinstance(v, str) for v, _ in self.selection), \
            "Field %s with non-str value in selection" % self
def convert_to_cache(self, value, record, validate=True):
    """ Return ``value`` in cache format: one of the selection values, or
    ``None`` for any falsy value.

    Without validation, the value is returned as given (``None`` if falsy).
    With validation, the value is cast to ``int`` first when the column type
    is ``int4``, and must then be one of ``self.get_values()``.

    :raise ValueError: if a truthy ``value`` is not a valid selection value
    """
    if not validate:
        return value if value else None

    if value and self.column_type[0] == 'int4':
        value = int(value)

    allowed_values = self.get_values(record.env)
    if value in allowed_values:
        return value
    if value:
        raise ValueError("Wrong value for %s: %r" % (self, value))
    return None
class Reference(Selection):
    """ Pseudo-relational field, without any foreign key in database.

    In database, the value of the field is a :class:`str` that follows the
    pattern ``"res_model,res_id"``.
    """
    type = 'reference'

    @property
    def column_type(self):
        return ('varchar', pg_varchar())

    def convert_to_column(self, value, record, values=None, validate=True):
        # use the base implementation of Field instead of the one of Selection
        return Field.convert_to_column(self, value, record, values, validate)

    def convert_to_cache(self, value, record, validate=True):
        # cache format: str ("model,id") or None
        if isinstance(value, BaseModel):
            accepted = not validate or (
                value._name in self.get_values(record.env) and len(value) <= 1
            )
            if accepted:
                if not value:
                    return None
                return "%s,%s" % (value._name, value.id)
        elif isinstance(value, str):
            res_model, res_id = value.split(',')
            if not validate or res_model in self.get_values(record.env):
                target = record.env[res_model].browse(int(res_id))
                # a reference to a record that no longer exists is dropped
                return value if target.exists() else None
        elif not value:
            return None
        raise ValueError("Wrong value for %s: %r" % (self, value))

    def convert_to_record(self, value, record):
        if not value:
            return None
        res_model, res_id = value.split(',')
        return record.env[res_model].browse(int(res_id))

    def convert_to_read(self, value, record, use_name_get=True):
        if not value:
            return False
        return "%s,%s" % (value._name, value.id)

    def convert_to_export(self, value, record):
        if not value:
            return ''
        return value.display_name

    def convert_to_display_name(self, value, record):
        if not value:
            return False
        return value.display_name
""" relational = True domain = [] # domain for searching values context = {} # context for searching values check_company = False def __get__(self, records, owner): # base case: do the regular access if records is None or len(records._ids) <= 1: return super().__get__(records, owner) # multirecord case: use mapped return self.mapped(records) def setup_nonrelated(self, model): super().setup_nonrelated(model) if self.comodel_name not in model.pool: _logger.warning("Field %s with unknown comodel_name %r", self, self.comodel_name) self.comodel_name = '_unknown' def get_domain_list(self, model): """ Return a list domain from the domain parameter. """ domain = self.domain if callable(domain): domain = domain(model) return domain if isinstance(domain, list) else [] @property def _related_domain(self): if callable(self.domain): # will be called with another model than self's return lambda recs: self.domain(recs.env[self.model_name]) else: # maybe not correct if domain is a string... return self.domain _related_context = property(attrgetter('context')) _description_relation = property(attrgetter('comodel_name')) _description_context = property(attrgetter('context')) def _description_domain(self, env): if self.check_company and not self.domain: if self.company_dependent: if self.comodel_name == "res.users": # user needs access to current company (self.env.company) return "[('company_ids', 'in', allowed_company_ids[0])]" else: return "[('company_id', 'in', [allowed_company_ids[0], False])]" else: # when using check_company=True on a field on 'res.company', the # company_id comes from the id of the current record cid = "id" if self.model_name == "res.company" else "company_id" if self.comodel_name == "res.users": # User allowed company ids = user.company_ids return f"['|', (not {cid}, '=', True), ('company_ids', 'in', [{cid}])]" else: return f"[('company_id', 'in', [{cid}, False])]" return self.domain(env[self.model_name]) if callable(self.domain) else self.domain def 
null(self, record): return record.env[self.comodel_name] class Many2one(_Relational): """ The value of such a field is a recordset of size 0 (no record) or 1 (a single record). :param str comodel_name: name of the target model ``Mandatory`` except for related or extended fields. :param domain: an optional domain to set on candidate values on the client side (domain or string) :param dict context: an optional context to use on the client side when handling that field :param str ondelete: what to do when the referred record is deleted; possible values are: ``'set null'``, ``'restrict'``, ``'cascade'`` :param bool auto_join: whether JOINs are generated upon search through that field (default: ``False``) :param bool delegate: set it to ``True`` to make fields of the target model accessible from the current model (corresponds to ``_inherits``) :param bool check_company: Mark the field to be verified in :meth:`~odoo.models.Model._check_company`. Add a default company domain depending on the field attributes. 
def setup_nonrelated(self, model):
    """ Complete the setup of the field and settle its ``ondelete`` policy.

    Three situations are handled:

    1. ``ondelete`` is not defined: a default is chosen from ``required``
       and from the transient nature of the model and of its comodel;
    2. ``ondelete`` is defined and consistent: it is kept as is;
    3. ``ondelete`` is ``'set null'`` on a required field, or ``'restrict'``
       towards one of the ``IR_MODELS``: this is a programming error.

    :param model: model on which the field is being set up
    :raise ValueError: in the third situation above
    """
    super().setup_nonrelated(model)

    if not self.ondelete:
        comodel = model.env[self.comodel_name]
        # Many2one relations from a TransientModel to a regular Model are
        # annoying because they can block deletion due to foreign keys, hence
        # 'cascade' instead of 'restrict' when the field is required.
        transient_to_regular = model.is_transient() and not comodel.is_transient()
        required_policy = 'cascade' if transient_to_regular else 'restrict'
        self.ondelete = required_policy if self.required else 'set null'

    required_set_null = self.ondelete == 'set null' and self.required
    if required_set_null:
        raise ValueError(
            "The m2o field %s of model %s is required but declares its ondelete policy "
            "as being 'set null'. Only 'restrict' and 'cascade' make sense."
            % (self.name, model._name)
        )

    restrict_on_ir_model = self.ondelete == 'restrict' and self.comodel_name in IR_MODELS
    if restrict_on_ir_model:
        raise ValueError(
            f"Field {self.name} of model {model._name} is defined as ondelete='restrict' "
            f"while having {self.comodel_name} as comodel, the 'restrict' mode is not "
            f"supported for this type of field as comodel."
        )
def convert_to_cache(self, value, record, validate=True):
    """ Convert ``value`` to the cache format of a many2one: id or None.

    :param value: id, recordset, tuple (pair ``(id, name)`` or tuple of
        ids), dict of values for a new record, or anything else (taken as
        empty)
    :param record: record(s) the value is meant for
    :param bool validate: whether to check the model and size of a recordset
    :raise ValueError: if ``validate`` is set and the recordset belongs to
        another model or holds more than one record
    """
    id_ = None
    if type(value) in IdType:
        id_ = value
    elif isinstance(value, BaseModel):
        if validate and (value._name != self.comodel_name or len(value) > 1):
            raise ValueError("Wrong value for %s: %r" % (self, value))
        if value._ids:
            id_ = value._ids[0]
    elif isinstance(value, tuple):
        # value is either a pair (id, name), or a tuple of ids
        if value:
            id_ = value[0]
    elif isinstance(value, dict):
        # return a new record (with the given field 'id' as origin)
        comodel = record.env[self.comodel_name]
        origin = comodel.browse(value.get('id'))
        id_ = comodel.new(value, origin=origin).id

    only_new_records = self.delegate and record and not any(record._ids)
    if only_new_records:
        # if all records are new, then so is the parent
        id_ = id_ and NewId(id_)

    return id_
def convert_to_write(self, value, record):
    """ Convert ``value`` to the format expected by ``write()``: an id, or
    ``False`` for an empty value.

    :param value: id, recordset of the comodel, tuple or dict of values
    :param record: record providing the environment to create a new record
        from a dict
    :raise ValueError: if ``value`` has an unsupported type, or is a
        recordset of another model than the comodel
    """
    if type(value) in IdType:
        return value
    if not value:
        return False
    if isinstance(value, BaseModel):
        if value._name == self.comodel_name:
            return value.id
    elif isinstance(value, tuple):
        # value is either a pair (id, name), or a tuple of ids; it cannot be
        # empty here, as empty values have been handled above
        return value[0]
    elif isinstance(value, dict):
        return record.env[self.comodel_name].new(value).id
    raise ValueError("Wrong value for %s: %r" % (self, value))
""" cache = records.env.cache record_ids = set(records._ids) # align(id) returns a NewId if records are new, a real id otherwise align = (lambda id_: id_) if all(record_ids) else (lambda id_: id_ and NewId(id_)) for invf in records.pool.field_inverses[self]: corecords = records.env[self.comodel_name].browse( align(id_) for id_ in cache.get_values(records, self) ) for corecord in corecords: ids0 = cache.get(corecord, invf, None) if ids0 is not None: ids1 = tuple(id_ for id_ in ids0 if id_ not in record_ids) cache.set(corecord, invf, ids1) def _update_inverses(self, records, value): """ Add `records` to the cached values of the inverse fields of `self`. """ if value is None: return cache = records.env.cache corecord = self.convert_to_record(value, records) for invf in records.pool.field_inverses[self]: valid_records = records.filtered_domain(invf.get_domain_list(corecord)) if not valid_records: continue ids0 = cache.get(corecord, invf, None) # if the value for the corecord is not in cache, but this is a new # record, assign it anyway, as you won't be able to fetch it from # database (see `test_sale_order`) if ids0 is not None or not corecord.id: ids1 = tuple(unique((ids0 or ()) + valid_records._ids)) cache.set(corecord, invf, ids1) class Many2oneReference(Integer): """ Pseudo-relational field (no FK in database). The field value is stored as an :class:`integer ` id in database. Contrary to :class:`Reference` fields, the model has to be specified in a :class:`Char` field, whose name has to be specified in the `model_field` attribute for the current :class:`Many2oneReference` field. :param str model_field: name of the :class:`Char` where the model name is stored. 
def _remove_inverses(self, records, value):
    """ Remove `records` from the cached values of the inverse fields of
    `self`, on the corecords currently found in cache for `records`.

    :param records: records to take out of the inverse fields' cache
    :param value: not used by this implementation
    """
    # TODO: unused
    env_cache = records.env.cache
    removed_ids = set(records._ids)
    ids_by_model = self._record_ids_per_res_model(records)

    for inverse in records.pool.field_inverses[self]:
        # only consider the records whose model field designates the model
        # of the inverse field
        subset = records.browse(ids_by_model[inverse.model_name])
        if not subset:
            continue
        corecords = subset.env[inverse.model_name].browse(
            corecord_id for corecord_id in env_cache.get_values(subset, self)
        )
        for corecord in corecords:
            cached_ids = env_cache.get(corecord, inverse, None)
            if cached_ids is None:
                continue
            kept_ids = tuple(id_ for id_ in cached_ids if id_ not in removed_ids)
            env_cache.set(corecord, inverse, kept_ids)
""" if not value: return cache = records.env.cache model_ids = self._record_ids_per_res_model(records) for invf in records.pool.field_inverses[self]: records = records.browse(model_ids[invf.model_name]) if not records: continue corecord = records.env[invf.model_name].browse(value) records = records.filtered_domain(invf.get_domain_list(corecord)) if not records: continue ids0 = cache.get(corecord, invf, None) # if the value for the corecord is not in cache, but this is a new # record, assign it anyway, as you won't be able to fetch it from # database (see `test_sale_order`) if ids0 is not None or not corecord.id: ids1 = tuple(unique((ids0 or ()) + records._ids)) cache.set(corecord, invf, ids1) def _record_ids_per_res_model(self, records): model_ids = defaultdict(set) for record in records: model = record[self.model_field] if not model and record._fields[self.model_field].compute: # fallback when the model field is computed :-/ record._fields[self.model_field].compute_value(record) model = record[self.model_field] if not model: continue model_ids[model].add(record.id) return model_ids class Json(Field): """ JSON Field that contain unstructured information in jsonb PostgreSQL column. This field is still in beta Some features have not been implemented and won't be implemented in stable versions, including: * searching * indexing * mutating the values. 
def convert_to_cache(self, value, record, validate=True):
    """ Convert ``value`` to the cache format of a JSON field.

    :return: ``None`` for any falsy value, otherwise the result of a JSON
        serialization round-trip of ``value``
    """
    if value:
        # encoding then decoding yields a copy made of plain JSON types
        serialized = json.dumps(value)
        return json.loads(serialized)
    return None
def convert_to_cache(self, value, record, validate=True):
    """ Convert ``value`` from any format to the cache format.

    :param value: falsy value, dict ``{name: value}``, JSON string encoding
        such a dict, or list of property definitions with their values
    :return: a dict ``{name: value}``, or ``None`` for a falsy value
    :raise ValueError: if a JSON string does not decode to a dict, or if
        ``value`` has an unsupported type
    """
    if not value:
        return None

    if isinstance(value, str):
        decoded = json.loads(value)
        if isinstance(decoded, dict):
            return decoded
        raise ValueError(f"Wrong property value {decoded!r}")

    if isinstance(value, dict):
        # avoid accidental side effects from shared mutable data
        return copy.deepcopy(value)

    if isinstance(value, list):
        # Convert the list with all definitions into a simple dict
        # {name: value} to store the strict minimum on the child
        self._remove_display_name(value)
        return self._list_to_dict(value)

    raise ValueError(f"Wrong property type {type(value)!r}")
def convert_to_write(self, value, record):
    """If we write a list on the child, update the definition record.

    A list is returned as is, once the display names of its relational
    values have been removed in place; any other value is handled by the
    parent implementation.
    """
    if not isinstance(value, list):
        return super().convert_to_write(value, record)
    # will update the definition record
    self._remove_display_name(value)
    return value
""" definition_records_map = { record: record[self.definition_record].sudo()[self.definition_record_field] for record in records } # ids per model we need to fetch in batch to put in cache ids_per_model = defaultdict(OrderedSet) records_cached_values = list(records.env.cache.get_values(records, self)) for record, record_values in zip(records, records_cached_values): definition = definition_records_map.get(record) if not record_values or not definition: continue for property_definition in definition: comodel = property_definition.get('comodel') type_ = property_definition.get('type') name = property_definition.get('name') if not comodel or type_ not in ('many2one', 'many2many') or name not in record_values: continue default = property_definition.get('default') or [] property_value = record_values[name] or [] if type_ == 'many2one': default = [default] if default else [] property_value = [property_value] if property_value else [] ids_per_model[comodel].update(default) ids_per_model[comodel].update(property_value) # check existence and pre-fetch in batch existing_ids_per_model = {} for model, ids in ids_per_model.items(): recs = records.env[model].browse(ids).exists() existing_ids_per_model[model] = set(recs.ids) for record in recs: # read a field to pre-fetch the recordset try: record.display_name except AccessError: pass # update the cache and remove non-existing ids for record, record_values in zip(records, records_cached_values): definition = definition_records_map.get(record) if not record_values or not definition: continue for property_definition in definition: comodel = property_definition.get('comodel') type_ = property_definition.get('type') name = property_definition.get('name') if not comodel or type_ not in ('many2one', 'many2many') or not record_values.get(name): continue property_value = record_values[name] if type_ == 'many2one': record_values[name] = property_value if property_value in existing_ids_per_model[comodel] else False else: 
def write(self, records, value):
    """Check if the properties definition has been changed.

    To avoid extra SQL queries used to detect definition change, we add a
    flag in the properties list. Parent update is done only when this flag
    is present, delegating the check to the caller (generally web client).

    For deletion, we need to keep the removed property definition in the
    list to be able to put the delete flag in it. Otherwise we have no way
    to know that a property has been removed.

    :param records: records on which the properties are written
    :param value: JSON string, dict ``{name: value}``, list of property
        definitions with their values, or a falsy value to reset the
        properties
    :return: the result of the parent ``write()``
    """
    if isinstance(value, str):
        value = json.loads(value)

    if isinstance(value, dict):
        # don't need to write on the container definition
        return super().write(records, value)

    # A falsy value (False, None, or a JSON string decoded to one of them)
    # carries no definition at all: iterating over it directly would raise
    # a TypeError, so treat it as an empty list of definitions.
    definition_changed = any(
        definition.get('definition_changed')
        or definition.get('definition_deleted')
        for definition in (value or ())
    )
    if definition_changed:
        value = [
            definition for definition in value
            if not definition.get('definition_deleted')
        ]
        for definition in value:
            definition.pop('definition_changed', None)

        # update the properties definition on the container
        container = records[self.definition_record]
        if container:
            # the container stores the definitions only, not the values
            properties_definition = copy.deepcopy(value)
            for property_definition in properties_definition:
                property_definition.pop('value', None)
            container[self.definition_record_field] = properties_definition

            _logger.info('Properties field: User #%i changed definition of %r',
                         records.env.user.id, container)

    return super().write(records, value)
Default values are defined on the container in the 'default' key of the definition. :param env: environment :param values: All values that will be written on the record :return: Return the default values in the "dict" format """ properties_values = values.get(self.name) or {} if not values.get(self.definition_record): # container is not given in the value, can not find properties definition return properties_values container_id = values[self.definition_record] if not isinstance(container_id, (int, BaseModel)): raise ValueError(f"Wrong container value {container_id!r}") if isinstance(container_id, int): # retrieve the container record current_model = env[self.model_name] definition_record_field = current_model._fields[self.definition_record] container_model_name = definition_record_field.comodel_name container_id = env[container_model_name].browse(container_id) properties_definition = container_id[self.definition_record_field] if not properties_definition: return properties_values assert isinstance(properties_values, (list, dict)) if isinstance(properties_values, list): self._remove_display_name(properties_values) properties_list_values = properties_values else: properties_list_values = self._dict_to_list(properties_values, properties_definition) for properties_value in properties_list_values: if properties_value.get('value') is None: default = properties_value.get('default') or False properties_value['value'] = default return properties_list_values def _get_properties_definition(self, record): """Return the properties definition of the given record.""" container = record[self.definition_record] if container: return container.sudo()[self.definition_record_field] @classmethod def _add_display_name(cls, values_list, env, value_keys=('value', 'default')): """Add the "name_get" for each many2one / many2many properties. Modify in place "values_list". 
:param values_list: List of properties definition and values :param env: environment """ for property_definition in values_list: property_type = property_definition.get('type') property_model = property_definition.get('comodel') if not property_model: continue for value_key in value_keys: property_value = property_definition.get(value_key) if property_type == 'many2one' and property_value and isinstance(property_value, int): try: display_name = env[property_model].browse(property_value).display_name property_definition[value_key] = (property_value, display_name) except AccessError: # protect from access error message, show an empty name property_definition[value_key] = (property_value, None) except MissingError: property_definition[value_key] = False elif property_type == 'many2many' and property_value and is_list_of(property_value, int): property_definition[value_key] = [] records = env[property_model].browse(property_value) for record in records: try: property_definition[value_key].append((record.id, record.display_name)) except AccessError: property_definition[value_key].append((record.id, None)) except MissingError: continue @classmethod def _remove_display_name(cls, values_list, value_key='value'): """Remove the display name received by the web client for the relational properties. Modify in place "values_list". 
- many2one: (35, 'Bob') -> 35 - many2many: [(35, 'Bob'), (36, 'Alice')] -> [35, 36] :param values_list: List of properties definition with properties value :param value_key: In which dict key we need to remove the display name """ for property_definition in values_list: if not isinstance(property_definition, dict) or not property_definition.get('name'): continue property_value = property_definition.get(value_key) if not property_value: continue property_type = property_definition.get('type') if property_type == 'many2one' and has_list_types(property_value, [int, (str, NoneType)]): property_definition[value_key] = property_value[0] elif property_type == 'many2many': if is_list_of(property_value, (list, tuple)): # [(35, 'Admin'), (36, 'Demo')] -> [35, 36] property_definition[value_key] = [ many2many_value[0] for many2many_value in property_value ] @classmethod def _add_missing_names(cls, values_list): """Generate new properties name if needed. Modify in place "values_list". :param values_list: List of properties definition with properties value """ for definition in values_list: if definition.get('definition_changed') and not definition.get('name'): # keep only the first 64 bits definition['name'] = str(uuid.uuid4()).replace('-', '')[:16] @classmethod def _parse_json_types(cls, values_list, env): """Parse the value stored in the JSON. Check for records existence, if we removed a selection option, ... Modify in place "values_list". :param values_list: List of properties definition and values :param env: environment """ for property_definition in values_list: property_value = property_definition.get('value') property_type = property_definition.get('type') res_model = property_definition.get('comodel') if property_type not in cls.ALLOWED_TYPES: raise ValueError(f'Wrong property type {property_type!r}') if property_type == 'boolean': # E.G. 
convert zero to False property_value = bool(property_value) elif property_type == 'char' and not isinstance(property_value, str) \ and property_value is not None: property_value = False elif property_value and property_type == 'selection': # check if the selection option still exists options = property_definition.get('selection') or [] options = {option[0] for option in options if option or ()} # always length 2 if property_value not in options: # maybe the option has been removed on the container property_value = False elif property_value and property_type == 'tags': # remove all tags that are not defined on the container all_tags = {tag[0] for tag in property_definition.get('tags') or ()} property_value = [tag for tag in property_value if tag in all_tags] elif property_type == 'many2one' and property_value and res_model in env: if not isinstance(property_value, int): raise ValueError(f'Wrong many2one value: {property_value!r}.') elif property_type == 'many2many' and property_value and res_model in env: if not is_list_of(property_value, int): raise ValueError(f'Wrong many2many value: {property_value!r}.') if len(property_value) != len(set(property_value)): # remove duplicated value and preserve order property_value = list(dict.fromkeys(property_value)) property_definition['value'] = property_value @classmethod def _list_to_dict(cls, values_list): """Convert a list of properties with definition into a dict {name: value}. To not repeat data in database, we only store the value of each property on the child. The properties definition is stored on the container. E.G. 
@classmethod
def _dict_to_list(cls, values_dict, properties_definition):
    """Convert a dict of {property: value} into a list of property definition with values.

    The definition is deep-copied, and each copied property receives under
    the key 'value' the entry of ``values_dict`` matching its name (``None``
    when there is none).

    :param values_dict: JSON value coming from the child table
    :param properties_definition: Properties definition coming from the container table
    :return: Merge both value into a list of properties with value
        Ignore every values in the child that is not defined on the container.
    :raise ValueError: if the definition is not a list of dicts
    """
    if is_list_of(properties_definition, dict):
        merged = copy.deepcopy(properties_definition)
        for definition in merged:
            name = definition['name']
            definition['value'] = values_dict.get(name)
        return merged

    raise ValueError(f'Wrong properties value {properties_definition!r}')
class PropertiesDefinition(Field):
    """ Field used to define the properties definition (see :class:`~odoo.fields.Properties`
    field). This field is used on the container record to define the structure
    of expected properties on subrecords. It is used to check the properties
    definition. """
    type = 'properties_definition'
    column_type = ('jsonb', 'jsonb')
    copy = True                         # containers may act like templates, keep definitions to ease usage
    readonly = False
    prefetch = True

    # keys that every property definition must provide
    REQUIRED_KEYS = ('name', 'type')
    # exhaustive list of the keys accepted in a property definition
    ALLOWED_KEYS = (
        'name', 'string', 'type', 'comodel', 'default',
        'selection', 'tags', 'domain', 'view_in_kanban',
    )
    # those keys will be removed if the types does not match
    PROPERTY_PARAMETERS_MAP = {
        'comodel': {'many2one', 'many2many'},
        'domain': {'many2one', 'many2many'},
        'selection': {'selection'},
        'tags': {'tags'},
    }

    def convert_to_column(self, value, record, values=None, validate=True):
        """Convert the value before inserting it in database.

        This method accepts a list properties definition (or its JSON string).

        The default value of the properties is passed through
        ``Properties._remove_display_name`` (relational defaults may come
        with the display name of the records), then the definition is
        validated and serialized to JSON.

        Example of accepted value::

            [{
                'name': '3adf37f3258cfe40',
                'string': 'Color Code',
                'type': 'char',
                'default': 'blue',
            }, {
                'name': 'aa34746a6851ee4e',
                'string': 'Partner',
                'type': 'many2one',
                'comodel': 'test_new_api.partner',
                'default': [1337, 'Bob'],
            }]

        :return: the JSON string to store, or ``None`` for a falsy value
        :raise ValueError: if the value is not a list or is not a valid definition
        """
        if not value:
            return None

        if isinstance(value, str):
            value = json.loads(value)

        if not isinstance(value, list):
            raise ValueError(f'Wrong properties definition type {type(value)!r}')

        Properties._remove_display_name(value, value_key='default')

        self._validate_properties_definition(value, record.env)

        return json.dumps(value)

    def convert_to_cache(self, value, record, validate=True):
        """Convert any accepted format to the cache format (list of dicts or ``None``).

        :raise ValueError: if the value is not a list or is not a valid definition
        """
        if not value:
            return None

        if isinstance(value, list):
            # avoid accidental side effects from shared mutable data, and make
            # the value strict with respect to JSON (tuple -> list, etc)
            value = json.dumps(value)

        if isinstance(value, str):
            value = json.loads(value)

        if not isinstance(value, list):
            raise ValueError(f'Wrong properties definition type {type(value)!r}')

        Properties._remove_display_name(value, value_key='default')

        self._validate_properties_definition(value, record.env)

        return value

    def convert_to_record(self, value, record):
        """Convert the cache format to the record format (list of dicts).

        The result is a cleaned-up copy of the cached definition:

        - definitions lacking a required key are dropped;
        - a ``comodel`` that is not in the environment anymore is reset to ``False``;
        - a ``domain`` without comodel, or rejected with a ``ValueError`` when
          evaluated against the comodel, is removed;
        - parameters that do not apply to the property type are removed.
        """
        if not value:
            return []

        # return a copy of the definition in cache where all property
        # definitions have been cleaned up
        result = []

        for property_definition in value:
            if not all(property_definition.get(key) for key in self.REQUIRED_KEYS):
                # some required keys are missing, ignore this property definition
                continue

            # don't modify the value in cache
            property_definition = dict(property_definition)

            # check if the model still exists in the environment, the module of the
            # model might have been uninstalled so the model might not exist anymore
            property_model = property_definition.get('comodel')
            if property_model and property_model not in record.env:
                property_definition['comodel'] = property_model = False

            if not property_model and 'domain' in property_definition:
                del property_definition['domain']

            property_domain = property_definition.get('domain')
            if property_domain:
                # some fields in the domain might have been removed
                # (e.g. if the module has been uninstalled)
                # check if the domain is still valid
                try:
                    expression.expression(
                        ast.literal_eval(property_domain),
                        record.env[property_model],
                    )
                except ValueError:
                    del property_definition['domain']

            result.append(property_definition)

            # drop the parameters that make no sense for the property type
            for property_parameter, allowed_types in self.PROPERTY_PARAMETERS_MAP.items():
                if property_definition.get('type') not in allowed_types:
                    property_definition.pop(property_parameter, None)

        return result

    def convert_to_read(self, value, record, use_name_get=True):
        """Convert the record format to the read format.

        When ``use_name_get`` is set, the ``default`` values are passed to
        ``Properties._add_display_name``; a falsy value is returned as is.
        """
        if not value:
            return value

        if use_name_get:
            Properties._add_display_name(value, record.env, value_keys=('default',))

        return value

    @classmethod
    def _validate_properties_definition(cls, properties_definition, env):
        """Raise an error if the property definition is not valid.

        :param properties_definition: list of property definitions (dicts)
        :param env: environment used to check that the comodels exist
        :raise ValueError: if a definition has unknown or missing keys, an
            unset or duplicated name, an unknown type, an unknown comodel, or
            malformed / duplicated selection options or tags
        """

        def find_duplicates(values):
            # values occurring more than once, in order of first occurrence
            # (single pass instead of calling list.count() for every value)
            occurrences = {}
            for value in values:
                occurrences[value] = occurrences.get(value, 0) + 1
            return [value for value, count in occurrences.items() if count > 1]

        def enumerate_values(values):
            # option and tag values are not necessarily strings: convert them
            # so that building the error message cannot fail with a TypeError
            return ', '.join(map(str, values))

        allowed_keys = set(cls.ALLOWED_KEYS)
        mandatory_keys = set(cls.REQUIRED_KEYS)
        properties_names = set()

        for property_definition in properties_definition:
            property_definition_keys = set(property_definition.keys())

            invalid_keys = property_definition_keys - allowed_keys
            if invalid_keys:
                # sorted: keep the error message deterministic
                raise ValueError(
                    'Some key are not allowed for a properties definition [%s].' %
                    ', '.join(sorted(map(str, invalid_keys))),
                )

            required_keys = mandatory_keys - property_definition_keys
            if required_keys:
                raise ValueError(
                    'Some key are missing for a properties definition [%s].' %
                    ', '.join(sorted(required_keys)),
                )

            property_name = property_definition.get('name')
            if not property_name or property_name in properties_names:
                raise ValueError(f'The property name {property_name!r} is not set or duplicated.')
            properties_names.add(property_name)

            property_type = property_definition.get('type')
            if property_type and property_type not in Properties.ALLOWED_TYPES:
                raise ValueError(f'Wrong property type {property_type!r}.')

            model = property_definition.get('comodel')
            if model and model not in env:
                raise ValueError(f'Invalid model name {model!r}')

            property_selection = property_definition.get('selection')
            if property_selection:
                # every option must be a pair whose first item identifies it
                if (not is_list_of(property_selection, (list, tuple))
                   or not all(len(selection) == 2 for selection in property_selection)):
                    raise ValueError(f'Wrong options {property_selection!r}.')

                duplicated = find_duplicates(option[0] for option in property_selection)
                if duplicated:
                    raise ValueError(f'Some options are duplicated: {enumerate_values(duplicated)}.')

            property_tags = property_definition.get('tags')
            if property_tags:
                # every tag must be a triple whose last item is an integer
                if (not is_list_of(property_tags, (list, tuple))
                   or not all(len(tag) == 3 and isinstance(tag[2], int) for tag in property_tags)):
                    raise ValueError(f'Wrong tags definition {property_tags!r}.')

                duplicated = find_duplicates(tag[0] for tag in property_tags)
                if duplicated:
                    raise ValueError(f'Some tags are duplicated: {enumerate_values(duplicated)}.')
class Command(enum.IntEnum):
    """
    Identifiers and factories of the commands understood by
    :class:`~odoo.fields.One2many` and :class:`~odoo.fields.Many2many` fields
    to manipulate the relation they implement.

    A command is a 3-elements tuple:

    * the first element is the mandatory integer identifying the command;
    * the second element is the id of the related record the command applies
      to (commands update, delete, unlink and link), or 0 (commands create,
      clear and set);
    * the third element is the ``values`` to write on the record (commands
      create and update), the new ``ids`` list of related records (command
      set), or 0 (commands delete, unlink, link and clear).

    In Python, developers are encouraged to build commands with the functions
    of this namespace, and to compare the first element of existing commands
    with the identifier constants rather than with literal integers.

    Via RPC, neither the functions nor the constant names can be used: the
    literal 3-elements tuple must be written instead, with the integer
    identifier of the command as first element.
    """

    CREATE = 0
    UPDATE = 1
    DELETE = 2
    UNLINK = 3
    LINK = 4
    CLEAR = 5
    SET = 6

    @classmethod
    def create(cls, values: dict):
        """
        Create new records in the comodel using ``values``, and link them to
        ``self``.

        For a :class:`~odoo.fields.Many2many` relation, a single new record is
        created and linked to all the records in ``self``.

        For a :class:`~odoo.fields.One2many` relation, one new record is
        created per record in ``self``, so that each record in ``self`` is
        linked to exactly one of the new records.

        Return the command triple :samp:`(CREATE, 0, {values})`
        """
        return cls.CREATE, 0, values

    @classmethod
    def update(cls, id: int, values: dict):
        """
        Write ``values`` on the related record.

        Return the command triple :samp:`(UPDATE, {id}, {values})`
        """
        return cls.UPDATE, id, values

    @classmethod
    def delete(cls, id: int):
        """
        Remove the related record from the database and remove its relation
        with ``self``.

        For a :class:`~odoo.fields.Many2many` relation, removing the record
        from the database may be prevented if it is still linked to other
        records.

        Return the command triple :samp:`(DELETE, {id}, 0)`
        """
        return cls.DELETE, id, 0

    @classmethod
    def unlink(cls, id: int):
        """
        Remove the relation between ``self`` and the related record.

        For a :class:`~odoo.fields.One2many` relation, the given record is
        deleted from the database if the inverse field is set as
        ``ondelete='cascade'``. Otherwise, the value of the inverse field is
        set to False and the record is kept.

        Return the command triple :samp:`(UNLINK, {id}, 0)`
        """
        return cls.UNLINK, id, 0

    @classmethod
    def link(cls, id: int):
        """
        Add a relation between ``self`` and the related record.

        Return the command triple :samp:`(LINK, {id}, 0)`
        """
        return cls.LINK, id, 0

    @classmethod
    def clear(cls):
        """
        Remove all records from the relation with ``self``. It behaves like
        executing the `unlink` command on every record.

        Return the command triple :samp:`(CLEAR, 0, 0)`
        """
        return cls.CLEAR, 0, 0

    @classmethod
    def set(cls, ids: list):
        """
        Replace the current relations of ``self`` by the given ones. It
        behaves like executing the ``unlink`` command on every removed
        relation, then executing the ``link`` command on every new relation.

        Return the command triple :samp:`(SET, 0, {ids})`
        """
        return cls.SET, 0, ids
class _RelationalMulti(_Relational):
    r"Abstract class for relational fields \*2many."
    # class-level setting; presumably orders the field writes — TODO confirm against Field
    write_sequence = 20

    # Important: the cache contains the ids of all the records in the relation,
    # including inactive records.  Inactive records are filtered out by
    # convert_to_record(), depending on the context.

    def _update(self, records, value):
        """ Update the cached value of ``self`` for ``records`` with ``value``,
            and return whether everything is in cache.

            Note that ``None`` is returned (instead of a boolean) when
            ``records`` is not a recordset and ``value`` does not satisfy the
            domain of the field.
        """
        if not isinstance(records, BaseModel):
            # the inverse of self is a non-relational field; `value` is a
            # corecord that refers to `records` by an integer field
            model = value.env[self.model_name]
            domain = self.domain(model) if callable(self.domain) else self.domain
            if not value.filtered_domain(domain):
                return
            records = model.browse(records)

        result = True

        if value:
            cache = records.env.cache
            for record in records:
                if cache.contains(record, self):
                    # add `value` to what is already cached for that record
                    val = self.convert_to_cache(record[self.name] | value, record, validate=False)
                    cache.set(record, self, val)
                else:
                    # nothing cached for that record: report it to the caller
                    result = False
            records.modified([self.name])

        return result

    def convert_to_cache(self, value, record, validate=True):
        """ Convert ``value`` to the cache format, a tuple of ids.

        Accepted values are a recordset, a list/tuple of commands, dicts or
        record ids, or any falsy value (giving an empty tuple).

        :raise ValueError: if ``validate`` is set and ``value`` is a recordset
            of another model than the comodel, or if ``value`` has none of the
            accepted formats
        """
        # cache format: tuple(ids)
        if isinstance(value, BaseModel):
            if validate and value._name != self.comodel_name:
                raise ValueError("Wrong value for %s: %s" % (self, value))
            ids = value._ids
            if record and not record.id:
                # x2many field value of new record is new records
                ids = tuple(it and NewId(it) for it in ids)
            return ids

        elif isinstance(value, (list, tuple)):
            # value is a list/tuple of commands, dicts or record ids
            comodel = record.env[self.comodel_name]
            # if record is new, the field's value is new records
            if record and not record.id:
                browse = lambda it: comodel.browse((it and NewId(it),))
            else:
                browse = comodel.browse
            # determine the value ids
            # (the commands are applied on top of the current value only when validating)
            ids = OrderedSet(record[self.name]._ids if validate else ())
            # modify ids with the commands
            for command in value:
                if isinstance(command, (tuple, list)):
                    if command[0] == Command.CREATE:
                        ids.add(comodel.new(command[2], ref=command[1]).id)
                    elif command[0] == Command.UPDATE:
                        line = browse(command[1])
                        if validate:
                            line.update(command[2])
                        else:
                            line._update_cache(command[2], validate=False)
                        ids.add(line.id)
                    elif command[0] in (Command.DELETE, Command.UNLINK):
                        ids.discard(browse(command[1]).id)
                    elif command[0] == Command.LINK:
                        ids.add(browse(command[1]).id)
                    elif command[0] == Command.CLEAR:
                        ids.clear()
                    elif command[0] == Command.SET:
                        ids = OrderedSet(browse(it).id for it in command[2])
                elif isinstance(command, dict):
                    # a dict stands for the values of a new record
                    ids.add(comodel.new(command).id)
                else:
                    # anything else is browsed as a record id
                    ids.add(browse(command).id)
            # return result as a tuple
            return tuple(ids)

        elif not value:
            return ()

        raise ValueError("Wrong value for %s: %s" % (self, value))

    def convert_to_record(self, value, record):
        """ Convert the cached ids to a recordset of the comodel.

        The corecords are filtered on the comodel's ``_active_name`` field
        (when it has one) unless ``active_test`` is falsy in the context of
        the field or, failing that, in the context of the environment.
        """
        # use registry to avoid creating a recordset for the model
        prefetch_ids = PrefetchX2many(record, self)
        Comodel = record.pool[self.comodel_name]
        corecords = Comodel(record.env, value, prefetch_ids)
        if (
            Comodel._active_name
            and self.context.get('active_test', record.env.context.get('active_test', True))
        ):
            corecords = corecords.filtered(Comodel._active_name).with_prefetch(prefetch_ids)
        return corecords

    def convert_to_record_multi(self, values, records):
        """ Same as :meth:`convert_to_record` for several cache values at
        once: return the union of the given tuples of ids as one recordset.
        """
        # return the list of ids as a recordset without duplicates
        prefetch_ids = PrefetchX2many(records, self)
        Comodel = records.pool[self.comodel_name]
        ids = tuple(unique(id_ for ids in values for id_ in ids))
        corecords = Comodel(records.env, ids, prefetch_ids)
        if (
            Comodel._active_name
            and self.context.get('active_test', records.env.context.get('active_test', True))
        ):
            corecords = corecords.filtered(Comodel._active_name).with_prefetch(prefetch_ids)
        return corecords

    def convert_to_read(self, value, record, use_name_get=True):
        """ Return the list of ids of the recordset ``value``. """
        return value.ids

    def convert_to_write(self, value, record):
        """ Convert ``value`` to a list of commands.

        A tuple of ids or a recordset of the comodel gives a ``SET`` command
        holding the ids of the existing records, followed by ``CREATE``
        commands for the records without origin and ``UPDATE`` commands for
        the records that differ from their origin. ``False`` and ``None`` give
        a ``CLEAR`` command, and a list is returned unchanged.

        :raise ValueError: for any other value
        """
        if isinstance(value, tuple):
            # a tuple of ids, this is the cache format
            value = record.env[self.comodel_name].browse(value)

        if isinstance(value, BaseModel) and value._name == self.comodel_name:
            def get_origin(val):
                return val._origin if isinstance(val, BaseModel) else val

            # make result with new and existing records
            # (the inverse fields of self are left out of the written values)
            inv_names = {field.name for field in record.pool.field_inverses[self]}
            result = [Command.set([])]
            # beware: the loop variable below shadows the parameter `record`
            for record in value:
                origin = record._origin
                if not origin:
                    values = record._convert_to_write({
                        name: record[name]
                        for name in record._cache
                        if name not in inv_names
                    })
                    result.append(Command.create(values))
                else:
                    # existing record: add its id to the leading SET command
                    result[0][2].append(origin.id)
                    if record != origin:
                        # only write the fields that differ from the origin
                        values = record._convert_to_write({
                            name: record[name]
                            for name in record._cache
                            if name not in inv_names and get_origin(record[name]) != origin[name]
                        })
                        if values:
                            result.append(Command.update(origin.id, values))
            return result

        if value is False or value is None:
            return [Command.clear()]

        if isinstance(value, list):
            return value

        raise ValueError("Wrong value for %s: %s" % (self, value))

    def convert_to_export(self, value, record):
        """ Return the names given by ``name_get()`` joined with commas, or an
        empty string for a falsy value.
        """
        return ','.join(name for id, name in value.name_get()) if value else ''

    def convert_to_display_name(self, value, record):
        """ Not supported by \\*2many fields. """
        raise NotImplementedError()

    def get_depends(self, model):
        """ Return the dependencies of the field; for a non-computed field
        with a list domain, the fields named in the domain are added as
        dependencies through ``self``.
        """
        depends, depends_context = super().get_depends(model)
        if not self.compute and isinstance(self.domain, list):
            depends = unique(itertools.chain(depends, (
                self.name + '.' + arg[0]
                for arg in self.domain
                if isinstance(arg, (tuple, list)) and isinstance(arg[0], str)
            )))
        return depends, depends_context

    def create(self, record_values):
        """ Write the value of ``self`` on the given records, which have just
        been created.

        :param record_values: a list of pairs ``(record, value)``, where
            ``value`` is in the format of method :meth:`BaseModel.write`
        """
        self.write_batch(record_values, True)

    def write(self, records, value):
        """ Write ``value`` on ``records`` and return the result of
        :meth:`write_batch`.
        """
        # discard recomputation of self on records
        records.env.remove_to_compute(self, records)
        return self.write_batch([(records, value)])

    def write_batch(self, records_commands_list, create=False):
        """ Normalize the given values to lists of commands, then dispatch to
        ``write_real`` (all records have a truthy id) or ``write_new`` (no
        record has a truthy id).

        Beware that the entries of ``records_commands_list`` are replaced in
        place by their normalized version.

        :param records_commands_list: a list of pairs ``(records, value)``
        :param create: passed on to ``write_real``
        :raise ValueError: if a value cannot be converted to a list of commands
        """
        if not records_commands_list:
            return False

        for idx, (recs, value) in enumerate(records_commands_list):
            if isinstance(value, tuple):
                value = [Command.set(value)]
            elif isinstance(value, BaseModel) and value._name == self.comodel_name:
                value = [Command.set(value._ids)]
            elif value is False or value is None:
                value = [Command.clear()]
            elif isinstance(value, list) and value and not isinstance(value[0], (tuple, list)):
                # a list that does not start with a command is taken as a list of ids
                value = [Command.set(tuple(value))]
            if not isinstance(value, list):
                raise ValueError("Wrong value for %s: %s" % (self, value))
            records_commands_list[idx] = (recs, value)

        record_ids = {rid for recs, cs in records_commands_list for rid in recs._ids}
        if all(record_ids):
            return self.write_real(records_commands_list, create)
        else:
            # mixing records with and without truthy id is not supported
            assert not any(record_ids)
            return self.write_new(records_commands_list)
class One2many(_RelationalMulti):
    """One2many field; the value of such a field is the recordset of all the
    records in ``comodel_name`` such that the field ``inverse_name`` is equal to
    the current record.

    :param str comodel_name: name of the target model

    :param str inverse_name: name of the inverse ``Many2one`` field in
        ``comodel_name``

    :param domain: an optional domain to set on candidate values on the
        client side (domain or string)

    :param dict context: an optional context to use on the client side when
        handling that field

    :param bool auto_join: whether JOINs are generated upon search through that
        field (default: ``False``)

    The attributes ``comodel_name`` and ``inverse_name`` are mandatory except in
    the case of related fields or field extensions.
    """
    type = 'one2many'

    inverse_name = None                 # name of the inverse field
    auto_join = False                   # whether joins are generated upon search
    copy = False                        # o2m are not copied by default

    def __init__(self, comodel_name=Default, inverse_name=Default, string=Default, **kwargs):
        """ Pass the arguments on to the parent class as keyword arguments. """
        super(One2many, self).__init__(
            comodel_name=comodel_name,
            inverse_name=inverse_name,
            string=string,
            **kwargs
        )

    def setup_nonrelated(self, model):
        """ Register ``self`` and its inverse field in the registry's
        ``field_inverses``.
        """
        super(One2many, self).setup_nonrelated(model)
        if self.inverse_name:
            # link self to its inverse field and vice-versa
            comodel = model.env[self.comodel_name]
            invf = comodel._fields[self.inverse_name]
            if isinstance(invf, (Many2one, Many2oneReference)):
                # setting one2many fields only invalidates many2one inverses;
                # integer inverses (res_model/res_id pairs) are not supported
                model.pool.field_inverses.add(self, invf)
            comodel.pool.field_inverses.add(invf, self)

    # expose `inverse_name` under the name `relation_field` in the field description
    _description_relation_field = property(attrgetter('inverse_name'))

    def update_db(self, model, columns):
        """ Check that the inverse field exists on the comodel, when the
        comodel is available in the environment.

        :raise UserError: if the comodel has no field ``inverse_name``
        """
        if self.comodel_name in model.env:
            comodel = model.env[self.comodel_name]
            if self.inverse_name not in comodel._fields:
                raise UserError(_("No inverse field %r found for %r") % (self.inverse_name, self.comodel_name))

    def get_domain_list(self, records):
        """ Return the domain of the field; when the inverse field is a
        ``many2one_reference``, a condition on its ``model_field`` restricts
        the lines to the ones referring to the model of ``records``.
        """
        comodel = records.env.registry[self.comodel_name]
        inverse_field = comodel._fields[self.inverse_name]
        domain = super(One2many, self).get_domain_list(records)
        if inverse_field.type == 'many2one_reference':
            domain = domain + [(inverse_field.model_field, '=', records._name)]
        return domain

    def __get__(self, records, owner):
        """ Return the value of the field, after having recomputed the inverse
        field if it is a computed one.
        """
        if records is not None and self.inverse_name is not None:
            # force the computation of the inverse field to ensure that the
            # cache value of self is consistent
            inverse_field = records.pool[self.comodel_name]._fields[self.inverse_name]
            if inverse_field.compute:
                records.env[self.comodel_name]._recompute_model([self.inverse_name])
        return super().__get__(records, owner)

    def read(self, records):
        """ Search the lines related to ``records`` and store the missing
        values of ``self`` in the cache of ``records``.
        """
        # retrieve the lines in the comodel
        # (inactive lines included, unless the context of the field says otherwise)
        context = {'active_test': False}
        context.update(self.context)
        comodel = records.env[self.comodel_name].with_context(**context)
        inverse = self.inverse_name
        inverse_field = comodel._fields[inverse]
        domain = self.get_domain_list(records) + [(inverse, 'in', records.ids)]
        lines = comodel.search(domain)

        # the value of the inverse field is a record for a many2one, and is
        # converted with int() otherwise
        get_id = (lambda rec: rec.id) if inverse_field.type == 'many2one' else int
        if len(records) == 1:
            # optimization: all lines have the same value for 'inverse_field',
            # so we don't need to fetch it from database
            if not inverse_field._description_searchable:
                # fix: if the field is not searchable maybe we got some lines that are not linked to records
                lines = lines.with_context(prefetch_fields=False).filtered(
                    lambda line: get_id(line[inverse]) == records.id
                )
            records.env.cache.insert_missing(records, self, [lines._ids])
            records.env.cache.insert_missing(lines, inverse_field, itertools.repeat(records.id))
            return

        # group lines by inverse field (without prefetching other fields)
        group = defaultdict(list)
        for line in lines.with_context(prefetch_fields=False):
            # line[inverse] may be a record or an integer
            group[get_id(line[inverse])].append(line.id)

        # store result in cache
        values = [tuple(group[id_]) for id_ in records._ids]
        records.env.cache.insert_missing(records, self, values)

    def write_real(self, records_commands_list, create=False):
        """ Update real records.

        :param records_commands_list: a list of pairs ``(records, commands)``
        :param create: when set, ``CLEAR`` and ``SET`` commands do not search
            for existing lines to unlink
        :return: the recordset of all the given records, or ``None`` if the
            list is empty
        """
        # records_commands_list = [(records, commands), ...]
        if not records_commands_list:
            return

        model = records_commands_list[0][0].browse()
        comodel = model.env[self.comodel_name].with_context(**self.context)

        ids = OrderedSet(rid for recs, cs in records_commands_list for rid in recs.ids)
        records = records_commands_list[0][0].browse(ids)

        if self.store:
            inverse = self.inverse_name
            to_create = []                      # line vals to create
            to_delete = []                      # line ids to delete
            to_link = defaultdict(OrderedSet)   # {record: line_ids}
            # set to False as soon as a CREATE, LINK, or SET/CLEAR equivalent
            # to a LINK has been seen
            allow_full_delete = not create

            def unlink(lines):
                # lines are deleted if the inverse field cascades the
                # deletion, and simply detached otherwise
                if getattr(comodel._fields[inverse], 'ondelete', False) == 'cascade':
                    to_delete.extend(lines._ids)
                else:
                    lines[inverse] = False

            def flush():
                # perform the pending operations: deletions first, then
                # creations, then links
                if to_link:
                    # value of self before the operations, to skip below the
                    # lines that are already linked
                    before = {record: record[self.name] for record in to_link}
                if to_delete:
                    # unlink() will remove the lines from the cache
                    comodel.browse(to_delete).unlink()
                    to_delete.clear()
                if to_create:
                    # create() will add the new lines to the cache of records
                    comodel.create(to_create)
                    to_create.clear()
                if to_link:
                    for record, line_ids in to_link.items():
                        lines = comodel.browse(line_ids) - before[record]
                        # linking missing lines should fail
                        lines.mapped(inverse)
                        lines[inverse] = record
                    to_link.clear()

            for recs, commands in records_commands_list:
                for command in (commands or ()):
                    if command[0] == Command.CREATE:
                        # one new line per record
                        for record in recs:
                            to_create.append(dict(command[2], **{inverse: record.id}))
                        allow_full_delete = False
                    elif command[0] == Command.UPDATE:
                        comodel.browse(command[1]).write(command[2])
                    elif command[0] == Command.DELETE:
                        to_delete.append(command[1])
                    elif command[0] == Command.UNLINK:
                        unlink(comodel.browse(command[1]))
                    elif command[0] == Command.LINK:
                        # the line is linked to the last record only
                        to_link[recs[-1]].add(command[1])
                        allow_full_delete = False
                    elif command[0] in (Command.CLEAR, Command.SET):
                        line_ids = command[2] if command[0] == Command.SET else []
                        if not allow_full_delete:
                            # do not try to delete anything in creation mode if nothing has been created before
                            if line_ids:
                                # equivalent to Command.LINK
                                if line_ids.__class__ is int:
                                    line_ids = [line_ids]
                                to_link[recs[-1]].update(line_ids)
                                allow_full_delete = False
                            continue
                        # the pending operations must be done before searching
                        # for the lines to unlink
                        flush()
                        # assign the given lines to the last record only
                        lines = comodel.browse(line_ids)
                        domain = self.get_domain_list(model) + \
                            [(inverse, 'in', recs.ids), ('id', 'not in', lines.ids)]
                        unlink(comodel.search(domain))
                        lines[inverse] = recs[-1]

            flush()

        else:
            # non-stored field: only the cache of the records is updated
            cache = records.env.cache

            def link(record, lines):
                ids = record[self.name]._ids
                cache.set(record, self, tuple(unique(ids + lines._ids)))

            def unlink(lines):
                # the lines are removed from the value of all the records
                for record in records:
                    cache.set(record, self, (record[self.name] - lines)._ids)

            for recs, commands in records_commands_list:
                for command in (commands or ()):
                    if command[0] == Command.CREATE:
                        for record in recs:
                            link(record, comodel.new(command[2], ref=command[1]))
                    elif command[0] == Command.UPDATE:
                        comodel.browse(command[1]).write(command[2])
                    elif command[0] == Command.DELETE:
                        unlink(comodel.browse(command[1]))
                    elif command[0] == Command.UNLINK:
                        unlink(comodel.browse(command[1]))
                    elif command[0] == Command.LINK:
                        link(recs[-1], comodel.browse(command[1]))
                    elif command[0] in (Command.CLEAR, Command.SET):
                        # assign the given lines to the last record only
                        cache.update(recs, self, itertools.repeat(()))
                        lines = comodel.browse(command[2] if command[0] == Command.SET else [])
                        cache.set(recs[-1], self, lines._ids)

        return records

    def write_new(self, records_commands_list):
        """ Update new records: the commands are applied on the cache of the
        records and of their lines.

        :param records_commands_list: a list of pairs ``(records, commands)``
        :return: the recordset of all the given records, or ``None`` if the
            list is empty
        """
        if not records_commands_list:
            return

        model = records_commands_list[0][0].browse()
        cache = model.env.cache
        comodel = model.env[self.comodel_name].with_context(**self.context)

        ids = {record.id for records, _ in records_commands_list for record in records}
        records = model.browse(ids)

        def browse(ids):
            # browse the lines as new records (falsy ids are kept as they are)
            return comodel.browse([id_ and NewId(id_) for id_ in ids])

        # make sure self is in cache
        records[self.name]

        if self.store:
            inverse = self.inverse_name

            # make sure self's inverse is in cache
            inverse_field = comodel._fields[inverse]
            for record in records:
                cache.update(record[self.name], inverse_field, itertools.repeat(record.id))

            for recs, commands in records_commands_list:
                for command in commands:
                    if command[0] == Command.CREATE:
                        for record in recs:
                            line = comodel.new(command[2], ref=command[1])
                            line[inverse] = record
                    elif command[0] == Command.UPDATE:
                        browse([command[1]]).update(command[2])
                    elif command[0] == Command.DELETE:
                        # nothing is deleted here: the line is only detached
                        browse([command[1]])[inverse] = False
                    elif command[0] == Command.UNLINK:
                        browse([command[1]])[inverse] = False
                    elif command[0] == Command.LINK:
                        browse([command[1]])[inverse] = recs[-1]
                    elif command[0] == Command.CLEAR:
                        cache.update(recs, self, itertools.repeat(()))
                    elif command[0] == Command.SET:
                        # assign the given lines to the last record only
                        cache.update(recs, self, itertools.repeat(()))
                        last, lines = recs[-1], browse(command[2])
                        cache.set(last, self, lines._ids)
                        cache.update(lines, inverse_field, itertools.repeat(last.id))

        else:
            def link(record, lines):
                ids = record[self.name]._ids
                cache.set(record, self, tuple(unique(ids + lines._ids)))

            def unlink(lines):
                # the lines are removed from the value of all the records
                for record in records:
                    cache.set(record, self, (record[self.name] - lines)._ids)

            for recs, commands in records_commands_list:
                for command in commands:
                    if command[0] == Command.CREATE:
                        for record in recs:
                            link(record, comodel.new(command[2], ref=command[1]))
                    elif command[0] == Command.UPDATE:
                        browse([command[1]]).update(command[2])
                    elif command[0] == Command.DELETE:
                        unlink(browse([command[1]]))
                    elif command[0] == Command.UNLINK:
                        unlink(browse([command[1]]))
                    elif command[0] == Command.LINK:
                        link(recs[-1], browse([command[1]]))
                    elif command[0] in (Command.CLEAR, Command.SET):
                        # assign the given lines to the last record only
                        cache.update(recs, self, itertools.repeat(()))
                        # NOTE(review): unlike the other commands of this
                        # method, the ids are browsed with comodel.browse()
                        # and not with the NewId-wrapping browse() helper —
                        # confirm this is intended
                        lines = comodel.browse(command[2] if command[0] == Command.SET else [])
                        cache.set(recs[-1], self, lines._ids)

        return records
:param comodel_name: name of the target model (string) mandatory except in the case of related or extended fields :param str relation: optional name of the table that stores the relation in the database :param str column1: optional name of the column referring to "these" records in the table ``relation`` :param str column2: optional name of the column referring to "those" records in the table ``relation`` The attributes ``relation``, ``column1`` and ``column2`` are optional. If not given, names are automatically generated from model names, provided ``model_name`` and ``comodel_name`` are different! Note that having several fields with implicit relation parameters on a given model with the same comodel is not accepted by the ORM, since those field would use the same table. The ORM prevents two many2many fields to use the same relation parameters, except if - both fields use the same model, comodel, and relation parameters are explicit; or - at least one field belongs to a model with ``_auto = False``. :param domain: an optional domain to set on candidate values on the client side (domain or string) :param dict context: an optional context to use on the client side when handling that field :param bool check_company: Mark the field to be verified in :meth:`~odoo.models.Model._check_company`. Add a default company domain depending on the field attributes. 
""" type = 'many2many' _explicit = True # whether schema is explicitly given relation = None # name of table column1 = None # column of table referring to model column2 = None # column of table referring to comodel auto_join = False # whether joins are generated upon search ondelete = 'cascade' # optional ondelete for the column2 fkey def __init__(self, comodel_name=Default, relation=Default, column1=Default, column2=Default, string=Default, **kwargs): super(Many2many, self).__init__( comodel_name=comodel_name, relation=relation, column1=column1, column2=column2, string=string, **kwargs ) def setup_nonrelated(self, model): super().setup_nonrelated(model) # 2 cases: # 1) The ondelete attribute is defined and its definition makes sense # 2) The ondelete attribute is explicitly defined as 'set null' for a m2m, # this is considered a programming error. if self.ondelete not in ('cascade', 'restrict'): raise ValueError( "The m2m field %s of model %s declares its ondelete policy " "as being %r. Only 'restrict' and 'cascade' make sense." 
% (self.name, model._name, self.ondelete) ) if self.store: if not (self.relation and self.column1 and self.column2): if not self.relation: self._explicit = False # table name is based on the stable alphabetical order of tables comodel = model.env[self.comodel_name] if not self.relation: tables = sorted([model._table, comodel._table]) assert tables[0] != tables[1], \ "%s: Implicit/canonical naming of many2many relationship " \ "table is not possible when source and destination models " \ "are the same" % self self.relation = '%s_%s_rel' % tuple(tables) if not self.column1: self.column1 = '%s_id' % model._table if not self.column2: self.column2 = '%s_id' % comodel._table # check validity of table name check_pg_name(self.relation) else: self.relation = self.column1 = self.column2 = None if self.relation: m2m = model.pool._m2m # check whether other fields use the same schema fields = m2m[(self.relation, self.column1, self.column2)] for field in fields: if ( # same model: relation parameters must be explicit self.model_name == field.model_name and self.comodel_name == field.comodel_name and self._explicit and field._explicit ) or ( # different models: one model must be _auto=False self.model_name != field.model_name and not (model._auto and model.env[field.model_name]._auto) ): continue msg = "Many2many fields %s and %s use the same table and columns" raise TypeError(msg % (self, field)) fields.append(self) # retrieve inverse fields, and link them in field_inverses for field in m2m[(self.relation, self.column2, self.column1)]: model.pool.field_inverses.add(self, field) model.pool.field_inverses.add(field, self) def update_db(self, model, columns): cr = model._cr # Do not reflect relations for custom fields, as they do not belong to a # module. They are automatically removed when dropping the corresponding # 'ir.model.field'. 
if not self.manual: model.pool.post_init(model.env['ir.model.relation']._reflect_relation, model, self.relation, self._module) comodel = model.env[self.comodel_name] if not sql.table_exists(cr, self.relation): query = """ CREATE TABLE "{rel}" ("{id1}" INTEGER NOT NULL, "{id2}" INTEGER NOT NULL, PRIMARY KEY("{id1}","{id2}")); COMMENT ON TABLE "{rel}" IS %s; CREATE INDEX ON "{rel}" ("{id2}","{id1}"); """.format(rel=self.relation, id1=self.column1, id2=self.column2) cr.execute(query, ['RELATION BETWEEN %s AND %s' % (model._table, comodel._table)]) _schema.debug("Create table %r: m2m relation between %r and %r", self.relation, model._table, comodel._table) model.pool.post_init(self.update_db_foreign_keys, model) return True model.pool.post_init(self.update_db_foreign_keys, model) def update_db_foreign_keys(self, model): """ Add the foreign keys corresponding to the field's relation table. """ comodel = model.env[self.comodel_name] if model._is_an_ordinary_table(): model.pool.add_foreign_key( self.relation, self.column1, model._table, 'id', 'cascade', model, self._module, force=False, ) if comodel._is_an_ordinary_table(): model.pool.add_foreign_key( self.relation, self.column2, comodel._table, 'id', self.ondelete, model, self._module, ) @property def groupable(self): return self.store def read(self, records): context = {'active_test': False} context.update(self.context) comodel = records.env[self.comodel_name].with_context(**context) domain = self.get_domain_list(records) comodel._flush_search(domain) wquery = comodel._where_calc(domain) comodel._apply_ir_rules(wquery, 'read') order_by = comodel._generate_order_by(None, wquery) from_c, where_c, where_params = wquery.get_sql() query = """ SELECT {rel}.{id1}, {rel}.{id2} FROM {rel}, {from_c} WHERE {where_c} AND {rel}.{id1} IN %s AND {rel}.{id2} = {tbl}.id {order_by} """.format(rel=self.relation, id1=self.column1, id2=self.column2, tbl=comodel._table, from_c=from_c, where_c=where_c or '1=1', order_by=order_by) 
def read(self, records):
    """ Read the value of this field for ``records`` and store it in cache.

    The corecords are searched with the field's domain and the ``read``
    record rules of the comodel, joined with the relation table, and grouped
    by record. ``active_test`` is disabled in the comodel's context unless
    the field's own context overrides it.

    :param records: the records for which the field must be fetched; nothing
        is done when it is empty
    :return: ``None``; the values are inserted in the cache as tuples of ids
    """
    if not records:
        # Nothing to fetch. Without this guard the empty tuple of ids would
        # be rendered as ``IN ()``, which PostgreSQL rejects.
        return

    context = {'active_test': False}
    context.update(self.context)
    comodel = records.env[self.comodel_name].with_context(**context)

    # build the query on the comodel: domain, access rules and ordering
    domain = self.get_domain_list(records)
    comodel._flush_search(domain)
    wquery = comodel._where_calc(domain)
    comodel._apply_ir_rules(wquery, 'read')
    order_by = comodel._generate_order_by(None, wquery)
    from_c, where_c, where_params = wquery.get_sql()
    query = """ SELECT {rel}.{id1}, {rel}.{id2} FROM {rel}, {from_c} WHERE {where_c} AND {rel}.{id1} IN %s AND {rel}.{id2} = {tbl}.id {order_by} """.format(
        rel=self.relation, id1=self.column1, id2=self.column2,
        tbl=comodel._table, from_c=from_c, where_c=where_c or '1=1',
        order_by=order_by,
    )
    # the ids of the records are the last parameter (the ``IN %s`` above)
    where_params.append(tuple(records.ids))

    # retrieve lines and group them by record
    group = defaultdict(list)
    records._cr.execute(query, where_params)
    for row in records._cr.fetchall():
        group[row[0]].append(row[1])

    # store result in cache; records without any row get an empty tuple
    values = [tuple(group[id_]) for id_ in records._ids]
    records.env.cache.insert_missing(records, self, values)
def write_real(self, records_commands_list, create=False):
    """ Apply x2many commands to this field on real records.

    The relation ``{record id: corecord ids}`` is determined before and after
    the commands. The new relation is put in the cache of this field, the
    added and removed pairs are written to the relation table when the field
    is stored, and the cache of the inverse fields is updated accordingly.

    :param records_commands_list: list of pairs ``(records, commands)``
    :param create: not used by this method
    :return: the records whose relation has changed, or ``None`` when
        ``records_commands_list`` is empty
    """
    # records_commands_list = [(records, commands), ...]
    if not records_commands_list:
        return

    model = records_commands_list[0][0].browse()
    comodel = model.env[self.comodel_name].with_context(**self.context)
    cursor = model.env.cr

    # every set of ids below is an OrderedSet
    all_ids = OrderedSet(
        rid for recs, _commands in records_commands_list for rid in recs.ids
    )
    records = model.browse(all_ids)

    if self.store:
        # Using `record[self.name]` generates 2 SQL queries when the value
        # is not in cache: one that actually checks access rules for
        # records, and the other one fetching the actual data. We use
        # `self.read` instead to shortcut the first query.
        missing_ids = list(records.env.cache.get_missing_ids(records, self))
        if missing_ids:
            self.read(records.browse(missing_ids))

    # relation {x: ys} before the commands, and a copy to apply them on
    old_relation = {
        record.id: OrderedSet(record[self.name]._ids) for record in records
    }
    new_relation = {x: OrderedSet(ys) for x, ys in old_relation.items()}

    def link(xs, y):
        """ Add ``y`` to the new relation of all ``xs``. """
        for x in xs:
            new_relation[x].add(y)

    def unlink(xs, y):
        """ Remove ``y`` from the new relation of all ``xs``. """
        for x in xs:
            new_relation[x].discard(y)

    for recs, commands in records_commands_list:
        to_create = []  # pairs (record ids, line vals) to create
        to_delete = []  # line ids to delete
        for command in (commands or ()):
            if not isinstance(command, (list, tuple)) or not command:
                continue
            op = command[0]
            if op == Command.CREATE:
                to_create.append((recs._ids, command[2]))
            elif op == Command.UPDATE:
                comodel.browse(command[1]).write(command[2])
            elif op == Command.DELETE:
                to_delete.append(command[1])
            elif op == Command.UNLINK:
                unlink(recs._ids, command[1])
            elif op == Command.LINK:
                link(recs._ids, command[1])
            elif op in (Command.CLEAR, Command.SET):
                # new lines must no longer be linked to records
                to_create = [
                    (OrderedSet(xs) - OrderedSet(recs._ids), vals)
                    for xs, vals in to_create
                ]
                target_ids = command[2] if op == Command.SET else ()
                for x in recs._ids:
                    new_relation[x] = OrderedSet(target_ids)

        if to_create:
            # create lines in batch, and link them
            lines = comodel.create([vals for _xs, vals in to_create])
            for line, (xs, _vals) in zip(lines, to_create):
                link(xs, line.id)

        if to_delete:
            # delete lines in batch
            comodel.browse(to_delete).unlink()
            # the pairs (x, y) have been cascade-deleted from relation
            for linked in old_relation.values():
                linked -= to_delete
            for linked in new_relation.values():
                linked -= to_delete

    # update the cache of self
    cache = records.env.cache
    for record in records:
        cache.set(record, self, tuple(new_relation[record.id]))

    # determine the corecords for which the relation has changed
    modified_corecord_ids = OrderedSet()
    inverse_fields = records.pool.field_inverses[self]

    # process pairs to add (beware of duplicates)
    added = [
        (x, y)
        for x, ys in new_relation.items()
        for y in ys - old_relation[x]
    ]
    if added:
        if self.store:
            query = "INSERT INTO {} ({}, {}) VALUES {} ON CONFLICT DO NOTHING".format(
                self.relation, self.column1, self.column2,
                ", ".join(["%s"] * len(added)),
            )
            cursor.execute(query, added)

        # update the cache of inverse fields
        y_to_xs = defaultdict(OrderedSet)
        for x, y in added:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)
        for invf in inverse_fields:
            domain = invf.get_domain_list(comodel)
            valid_ids = OrderedSet(records.filtered_domain(domain)._ids)
            if not valid_ids:
                continue
            for y, xs in y_to_xs.items():
                corecord = comodel.browse(y)
                try:
                    ids0 = cache.get(corecord, invf)
                    ids1 = tuple(OrderedSet(ids0) | (xs & valid_ids))
                    cache.set(corecord, invf, ids1)
                except KeyError:
                    pass

    # process pairs to remove
    removed = [
        (x, y)
        for x, ys in old_relation.items()
        for y in ys - new_relation[x]
    ]
    if removed:
        y_to_xs = defaultdict(OrderedSet)
        for x, y in removed:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)

        if self.store:
            # express pairs as the union of cartesian products:
            #    pairs = [(1, 11), (1, 12), (1, 13), (2, 11), (2, 12), (2, 14)]
            # -> y_to_xs = {11: {1, 2}, 12: {1, 2}, 13: {1}, 14: {2}}
            # -> xs_to_ys = {{1, 2}: {11, 12}, {2}: {14}, {1}: {13}}
            xs_to_ys = defaultdict(OrderedSet)
            for y, xs in y_to_xs.items():
                xs_to_ys[frozenset(xs)].add(y)
            # delete the rows where (id1 IN xs AND id2 IN ys) OR ...
            condition = "{} IN %s AND {} IN %s".format(self.column1, self.column2)
            query = "DELETE FROM {} WHERE {}".format(
                self.relation, " OR ".join([condition] * len(xs_to_ys)),
            )
            params = [
                arg
                for xs, ys in xs_to_ys.items()
                for arg in [tuple(xs), tuple(ys)]
            ]
            cursor.execute(query, params)

        # update the cache of inverse fields
        for invf in inverse_fields:
            for y, xs in y_to_xs.items():
                corecord = comodel.browse(y)
                try:
                    ids0 = cache.get(corecord, invf)
                    ids1 = tuple(id_ for id_ in ids0 if id_ not in xs)
                    cache.set(corecord, invf, ids1)
                except KeyError:
                    pass

    if modified_corecord_ids:
        # trigger the recomputation of fields that depend on the inverse
        # fields of self on the modified corecords
        corecords = comodel.browse(modified_corecord_ids)
        corecords.modified([
            invf.name
            for invf in model.pool.field_inverses[self]
            if invf.model_name == self.comodel_name
        ])

    return records.filtered(
        lambda record: new_relation[record.id] != old_relation[record.id]
    )
def write_new(self, records_commands_list):
    """ Update self on new records.

    The commands are applied on an in-memory relation ``{record id: corecord
    ids}``; the result is put in the cache of this field and reflected in the
    cache of its inverse fields. Nothing is written to the database here.

    :param records_commands_list: list of pairs ``(records, commands)``
    :return: the records whose relation has changed (an empty recordset when
        nothing changed), or ``None`` when ``records_commands_list`` is empty
    """
    if not records_commands_list:
        return

    model = records_commands_list[0][0].browse()
    comodel = model.env[self.comodel_name].with_context(**self.context)

    def new(id_):
        """ Wrap a truthy id into a ``NewId``; return falsy ids unchanged. """
        return id_ and NewId(id_)

    # determine old and new relation {x: ys}; all sets of ids are OrderedSet
    old_relation = {
        record.id: OrderedSet(record[self.name]._ids)
        for recs, _commands in records_commands_list
        for record in recs
    }
    new_relation = {x: OrderedSet(ys) for x, ys in old_relation.items()}
    records = model.browse(OrderedSet(old_relation.keys()))

    for recs, commands in records_commands_list:
        for command in commands:
            if not isinstance(command, (list, tuple)) or not command:
                continue
            op = command[0]
            if op == Command.CREATE:
                line_id = comodel.new(command[2], ref=command[1]).id
                for line_ids in new_relation.values():
                    line_ids.add(line_id)
            elif op == Command.UPDATE:
                line_id = new(command[1])
                comodel.browse([line_id]).update(command[2])
            elif op in (Command.DELETE, Command.UNLINK):
                # on new records, deleting and unlinking both drop the line
                line_id = new(command[1])
                for line_ids in new_relation.values():
                    line_ids.discard(line_id)
            elif op == Command.LINK:
                line_id = new(command[1])
                for line_ids in new_relation.values():
                    line_ids.add(line_id)
            elif op in (Command.CLEAR, Command.SET):
                # new lines must no longer be linked to records
                raw_ids = command[2] if op == Command.SET else ()
                target_ids = OrderedSet(new(raw_id) for raw_id in raw_ids)
                for id_ in recs._ids:
                    new_relation[id_] = OrderedSet(target_ids)

    if new_relation == old_relation:
        return records.browse()

    # update the cache of self
    cache = records.env.cache
    for record in records:
        cache.set(record, self, tuple(new_relation[record.id]))

    # determine the corecords for which the relation has changed
    modified_corecord_ids = OrderedSet()
    inverse_fields = records.pool.field_inverses[self]

    # process pairs to add (beware of duplicates)
    added = [
        (x, y)
        for x, ys in new_relation.items()
        for y in ys - old_relation[x]
    ]
    if added:
        # update the cache of inverse fields
        y_to_xs = defaultdict(OrderedSet)
        for x, y in added:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)
        for invf in inverse_fields:
            domain = invf.get_domain_list(comodel)
            valid_ids = OrderedSet(records.filtered_domain(domain)._ids)
            if not valid_ids:
                continue
            for y, xs in y_to_xs.items():
                corecord = comodel.browse([y])
                try:
                    ids0 = cache.get(corecord, invf)
                    ids1 = tuple(OrderedSet(ids0) | (xs & valid_ids))
                    cache.set(corecord, invf, ids1)
                except KeyError:
                    pass

    # process pairs to remove
    removed = [
        (x, y)
        for x, ys in old_relation.items()
        for y in ys - new_relation[x]
    ]
    if removed:
        # update the cache of inverse fields
        y_to_xs = defaultdict(OrderedSet)
        for x, y in removed:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)
        for invf in inverse_fields:
            for y, xs in y_to_xs.items():
                corecord = comodel.browse([y])
                try:
                    ids0 = cache.get(corecord, invf)
                    ids1 = tuple(id_ for id_ in ids0 if id_ not in xs)
                    cache.set(corecord, invf, ids1)
                except KeyError:
                    pass

    if modified_corecord_ids:
        # trigger the recomputation of fields that depend on the inverse
        # fields of self on the modified corecords
        corecords = comodel.browse(modified_corecord_ids)
        corecords.modified([
            invf.name
            for invf in model.pool.field_inverses[self]
            if invf.model_name == self.comodel_name
        ])

    return records.filtered(
        lambda record: new_relation[record.id] != old_relation[record.id]
    )
class Id(Field):
    """ Field class for the special field ``id``.

    The value is taken straight from the ids of the recordset, and the field
    cannot be assigned.
    """
    type = 'integer'
    column_type = ('int4', 'int4')

    string = 'ID'
    store = True
    readonly = True
    prefetch = False

    def update_db(self, model, columns):
        """ Do nothing: this column is created with the table. """
        return None

    def __get__(self, record, owner):
        """ Return the id of ``record``.

        :return: the field itself when accessed through the class, ``False``
            for an empty recordset, the id for a singleton
        :raise ValueError: if ``record`` holds more than one id
        """
        if record is None:
            return self         # the field is accessed through the class owner

        # the code below is written to make record.id as quick as possible
        ids = record._ids
        count = len(ids)
        if count == 1:
            return ids[0]
        if count:
            raise ValueError("Expected singleton: %s" % record)
        return False

    def __set__(self, record, value):
        """ Refuse any assignment.

        :raise TypeError: always
        """
        raise TypeError("field 'id' cannot be assigned")


class PrefetchMany2one:
    """ Iterable for the values of a many2one field on the prefetch set of a given record. """
    __slots__ = ('record', 'field')

    def __init__(self, record, field):
        self.record = record
        self.field = field

    def __iter__(self):
        """ Iterate over the distinct non-``None`` cached values, in prefetch order. """
        record = self.record
        prefetch = record.browse(record._prefetch_ids)
        values = record.env.cache.get_values(prefetch, self.field)
        return unique(value for value in values if value is not None)

    def __reversed__(self):
        """ Same as iterating, but over the reversed prefetch ids. """
        record = self.record
        prefetch = record.browse(reversed(record._prefetch_ids))
        values = record.env.cache.get_values(prefetch, self.field)
        return unique(value for value in values if value is not None)


class PrefetchX2many:
    """ Iterable for the values of an x2many field on the prefetch set of a given record. """
    __slots__ = ('record', 'field')

    def __init__(self, record, field):
        self.record = record
        self.field = field

    def __iter__(self):
        """ Iterate over the distinct ids found in the cached values, in prefetch order. """
        record = self.record
        prefetch = record.browse(record._prefetch_ids)
        values = record.env.cache.get_values(prefetch, self.field)
        return unique(id_ for ids in values for id_ in ids)

    def __reversed__(self):
        """ Same as iterating, but over the reversed prefetch ids. """
        record = self.record
        prefetch = record.browse(reversed(record._prefetch_ids))
        values = record.env.cache.get_values(prefetch, self.field)
        return unique(id_ for ids in values for id_ in ids)


def apply_required(model, field_name):
    """ Set a NOT NULL constraint on the given field, if necessary.

    :param model: the model holding the field
    :param field_name: the name of the field, which is also the column name
    """
    # At the time this function is called, the model's _fields may have been reset, although
    # the model's class is still the same. Retrieve the field to see whether the NOT NULL
    # constraint still applies
    field = model._fields[field_name]
    if not field.store or not field.required:
        return
    sql.set_not_null(model.env.cr, model._table, field_name)


# imported here to avoid dependency cycle issues
# pylint: disable=wrong-import-position
from .exceptions import AccessError, MissingError, UserError
from .models import (
    check_pg_name, expand_ids, is_definition_class,
    BaseModel, IdType, NewId, PREFETCH_MAX,
)