def backup(path, raise_exception=True):
    """Move ``path`` aside to the first free ``<path>~<n>`` name.

    :param path: file or directory to move; it is normalized with
        :func:`os.path.normpath` before use
    :param raise_exception: when false, a missing ``path`` makes the function
        return ``None`` instead of raising
    :return: the name ``path`` was moved to, or ``None`` when ``path`` does not
        exist and ``raise_exception`` is false
    :raise OSError: (as ``FileNotFoundError``) when ``path`` does not exist and
        ``raise_exception`` is true
    """
    path = os.path.normpath(path)
    if not os.path.exists(path):
        if not raise_exception:
            return None
        # FileNotFoundError is an OSError subclass: `except OSError` callers keep working
        raise FileNotFoundError('path does not exist: %r' % path)

    cnt = 1
    while True:
        bck = '%s~%d' % (path, cnt)
        # lexists() also reports dangling symlinks, which exists() would
        # mistake for a free slot and let shutil.move() trip over or clobber
        if not os.path.lexists(bck):
            shutil.move(path, bck)
            return bck
        cnt += 1


def assert_log_admin_access(method):
    """Decorator checking that the calling user is an administrator, and logging the call.

    The wrapped method is only executed when ``self.env.is_admin()`` is true;
    the attempt is logged either way (method name, display names of the
    records, login and id of the user, remote address of the current HTTP
    request or ``'n/a'`` when there is none).

    :raise AccessDenied: if ``self.env.is_admin()`` is false
    """
    def check_and_log(method, self, *args, **kwargs):
        user = self.env.user
        origin = request.httprequest.remote_addr if request else 'n/a'
        log_data = (method.__name__, self.sudo().mapped('display_name'), user.login, user.id, origin)
        if not self.env.is_admin():
            _logger.warning('DENY access to module.%s on %s to user %s ID #%s via %s', *log_data)
            raise AccessDenied()
        _logger.info('ALLOW access to module.%s on %s to user %s #%s via %s', *log_data)
        return method(self, *args, **kwargs)
    return decorator(check_and_log, method)
class ModuleCategory(models.Model):
    """Module category ("Application"), optionally nested through ``parent_id``."""
    _name = "ir.module.category"
    _description = "Application"
    _order = 'name'

    @api.depends('module_ids')
    def _compute_module_nr(self):
        """Set ``module_nr`` to the number of modules in the category and in its direct children."""
        # Records without a database id cannot be referenced by any module row.
        # Assign them explicitly: a compute method has to give every record a value.
        stored = self.filtered('id')
        (self - stored).module_nr = 0
        if not stored:
            # an empty tuple would be rendered as the invalid SQL `IN ()`
            return

        self.env['ir.module.module'].flush_model(['category_id'])
        self.flush_model(['parent_id'])
        cr = self._cr
        cr.execute('SELECT category_id, COUNT(*) \
                      FROM ir_module_module \
                     WHERE category_id IN %(ids)s \
                        OR category_id IN (SELECT id \
                                             FROM ir_module_category \
                                            WHERE parent_id IN %(ids)s) \
                     GROUP BY category_id', {'ids': tuple(stored.ids)}
                   )
        # {category id: number of modules directly in that category}
        result = dict(cr.fetchall())
        for cat in stored:
            cr.execute('SELECT id FROM ir_module_category WHERE parent_id=%s', (cat.id,))
            # own modules + modules of each direct child category
            cat.module_nr = sum([result.get(c, 0) for (c,) in cr.fetchall()], result.get(cat.id, 0))

    name = fields.Char(string='Name', required=True, translate=True, index=True)
    parent_id = fields.Many2one('ir.module.category', string='Parent Application', index=True)
    child_ids = fields.One2many('ir.module.category', 'parent_id', string='Child Applications')
    module_nr = fields.Integer(string='Number of Apps', compute='_compute_module_nr')
    module_ids = fields.One2many('ir.module.module', 'category_id', string='Modules')
    description = fields.Text(string='Description', translate=True)
    sequence = fields.Integer(string='Sequence')
    visible = fields.Boolean(string='Visible', default=True)
    exclusive = fields.Boolean(string='Exclusive')
    xml_id = fields.Char(string='External ID', compute='_compute_xml_id')

    def _compute_xml_id(self):
        """Set ``xml_id`` to the first external id found for the category, or ``''``."""
        xml_ids = defaultdict(list)
        domain = [('model', '=', self._name), ('res_id', 'in', self.ids)]
        for data in self.env['ir.model.data'].sudo().search_read(domain, ['module', 'name', 'res_id']):
            xml_ids[data['res_id']].append("%s.%s" % (data['module'], data['name']))
        for cat in self:
            cat.xml_id = xml_ids.get(cat.id, [''])[0]


class MyFilterMessages(Transform):
    """
    Custom docutils transform to remove `system message` for a document and
    generate warnings.

    (The standard filter removes them based on some `report_level` passed in
    the `settings_override` dictionary, but if we use it, we can't see them
    and generate warnings.)
    """
    default_priority = 870

    def apply(self):
        """Log every ``system_message`` node as a warning and detach it from its parent."""
        for node in self.document.traverse(nodes.system_message):
            _logger.warning("docutils' system message present: %s", str(node))
            node.parent.remove(node)


class MyWriter(Writer):
    """
    Custom docutils html4ccs1 writer that doesn't add the warnings to the
    output document.
    """

    def get_transforms(self):
        """Return the transforms to run: the custom message filter and the admonitions transform."""
        return [MyFilterMessages, writer_aux.Admonitions]


# (technical value, label) pairs for the possible states of a module
STATES = [
    ('uninstallable', 'Uninstallable'),
    ('uninstalled', 'Not Installed'),
    ('installed', 'Installed'),
    ('to upgrade', 'To be upgraded'),
    ('to remove', 'To be removed'),
    ('to install', 'To be installed'),
]
@assert_log_admin_access
def download(self, download=True):
    """Do nothing and return an empty list; the ``download`` flag is ignored."""
    return []


@assert_log_admin_access
@api.model
def install_from_urls(self, urls):
    """Download the given modules from the apps server and install them.

    :param urls: dict ``{module_name: url}``; a false ``url`` means the local
        version is used and nothing is downloaded for that module
    :return: a client action reloading the home page when the server is
        restarted (some module was installed or already installed),
        otherwise the result of ``button_immediate_install()``
    :raise AccessDenied: if the user is not in ``base.group_system`` or if a
        URL does not point to the configured apps server
    :raise UserError: if the addons data directory is not writable or if a
        module cannot be fetched
    :raise AssertionError: if a downloaded archive does not contain a
        directory named after its module
    """
    if not self.env.user.has_group('base.group_system'):
        raise AccessDenied()

    # One-click install is opt-in - cfr Issue #15225
    ad_dir = tools.config.addons_data_dir
    if not os.access(ad_dir, os.W_OK):
        msg = (_("Automatic install of downloaded Apps is currently disabled.") + "\n\n" +
               _("To enable it, make sure this directory exists and is writable on the server:") +
               "\n%s" % ad_dir)
        _logger.warning(msg)
        raise UserError(msg)

    apps_server = werkzeug.urls.url_parse(self.get_apps_server())

    OPENERP = odoo.release.product_name.lower()
    tmp = tempfile.mkdtemp()
    _logger.debug('Install from url: %r', urls)
    try:
        # 1. Download & unzip missing modules
        for module_name, url in urls.items():
            if not url:
                continue    # nothing to download, local version is already the last one

            # only accept downloads from the configured apps server
            up = werkzeug.urls.url_parse(url)
            if up.scheme != apps_server.scheme or up.netloc != apps_server.netloc:
                raise AccessDenied()

            try:
                _logger.info('Downloading module `%s` from OpenERP Apps', module_name)
                # requests never times out by default; bound the wait so an
                # unresponsive server cannot block this worker forever
                response = requests.get(url, timeout=60)
                response.raise_for_status()
                content = response.content
            except Exception:
                _logger.exception('Failed to fetch module %s', module_name)
                raise UserError(_('The `%s` module appears to be unavailable at the moment, please try again later.', module_name))
            else:
                with zipfile.ZipFile(io.BytesIO(content)) as archive:
                    archive.extractall(tmp)
                # explicit check instead of `assert`, which is stripped under -O
                if not os.path.isdir(os.path.join(tmp, module_name)):
                    raise AssertionError(
                        'downloaded archive does not contain a %r directory' % module_name)

        # 2a. Copy/Replace module source in addons path
        for module_name, url in urls.items():
            if module_name == OPENERP or not url:
                continue    # OPENERP is special case, handled below, and no URL means local module
            module_path = modules.get_module_path(module_name, downloaded=True, display_warning=False)
            bck = backup(module_path, False)
            _logger.info('Copy downloaded module `%s` to `%s`', module_name, module_path)
            shutil.move(os.path.join(tmp, module_name), module_path)
            if bck:
                shutil.rmtree(bck)

        # 2b. Copy/Replace server+base module source if downloaded
        if urls.get(OPENERP):
            # special case. it contains the server and the base module.
            # extract path is not the same
            base_path = os.path.dirname(modules.get_module_path('base'))

            # copy all modules in the SERVER/odoo/addons directory to the new "odoo" module (except base itself)
            for d in os.listdir(base_path):
                if d != 'base' and os.path.isdir(os.path.join(base_path, d)):
                    destdir = os.path.join(tmp, OPENERP, 'addons', d)    # XXX 'odoo' subdirectory ?
                    shutil.copytree(os.path.join(base_path, d), destdir)

            # then replace the server by the new "base" module
            server_dir = tools.config['root_path']      # XXX or dirname()
            # NOTE(review): unlike the module backups above, this backup is
            # not removed afterwards (the removal was commented out in the
            # original code) -- confirm this is intended.
            backup(server_dir)
            _logger.info('Copy downloaded module `odoo` to `%s`', server_dir)
            shutil.move(os.path.join(tmp, OPENERP), server_dir)

        self.update_list()

        with_urls = [module_name for module_name, url in urls.items() if url]
        downloaded = self.search([('name', 'in', with_urls)])
        installed = self.search([('id', 'in', downloaded.ids), ('state', '=', 'installed')])

        to_install = self.search([('name', 'in', list(urls)), ('state', '=', 'uninstalled')])
        post_install_action = to_install.button_immediate_install()

        if installed or to_install:
            # in this case, force server restart to reload python code...
            self._cr.commit()
            odoo.service.server.restart()
            return {
                'type': 'ir.actions.client',
                'tag': 'home',
                'params': {'wait': True},
            }
        return post_install_action

    finally:
        shutil.rmtree(tmp)
@api.model
def get_apps_server(self):
    """Return the ``apps_server`` configuration value, or the default apps URL."""
    default_url = 'https://apps.odoo.com/apps'
    return tools.config.get('apps_server', default_url)


def _update_dependencies(self, depends=None, auto_install_requirements=()):
    """Synchronize the dependency rows of this module with ``depends``.

    Missing names are inserted, names no longer listed are deleted, and
    ``auto_install_required`` is set on each row according to its presence
    in ``auto_install_requirements``. The rows are written with raw SQL, so
    the affected caches are invalidated at the end.
    """
    Dependency = self.env['ir.module.module.dependency']
    Dependency.flush_model()
    cr = self._cr

    current = {dependency.name for dependency in self.dependencies_id}
    wanted = set(depends or [])

    insert_query = 'INSERT INTO ir_module_module_dependency (module_id, name) values (%s, %s)'
    for missing in wanted - current:
        cr.execute(insert_query, (self.id, missing))

    delete_query = 'DELETE FROM ir_module_module_dependency WHERE module_id = %s and name = %s'
    for obsolete in current - wanted:
        cr.execute(delete_query, (self.id, obsolete))

    required_names = list(auto_install_requirements or ())
    cr.execute(
        'UPDATE ir_module_module_dependency SET auto_install_required = (name = any(%s)) WHERE module_id = %s',
        (required_names, self.id),
    )

    Dependency.invalidate_model(['auto_install_required'])
    self.invalidate_recordset(['dependencies_id'])


def _update_exclusions(self, excludes=None):
    """Synchronize the exclusion rows of this module with ``excludes``.

    Rows are inserted and deleted with raw SQL, then the cache of
    ``exclusion_ids`` is invalidated.
    """
    self.env['ir.module.module.exclusion'].flush_model()
    cr = self._cr

    current = {exclusion.name for exclusion in self.exclusion_ids}
    wanted = set(excludes or [])

    insert_query = 'INSERT INTO ir_module_module_exclusion (module_id, name) VALUES (%s, %s)'
    for missing in wanted - current:
        cr.execute(insert_query, (self.id, missing))

    delete_query = 'DELETE FROM ir_module_module_exclusion WHERE module_id=%s AND name=%s'
    for obsolete in current - wanted:
        cr.execute(delete_query, (self.id, obsolete))

    self.invalidate_recordset(['exclusion_ids'])


def _update_category(self, category='Uncategorized'):
    """Set the module category from a ``/``-separated path, if it differs from the current one."""
    # names from the current category up to its root, then flipped root-first
    current_path = []
    node = self.category_id
    while node:
        current_path.append(node.name)
        node = node.parent_id
    current_path.reverse()

    wanted_path = category.split('/')
    if wanted_path == current_path:
        return
    cat_id = modules.db.create_categories(self._cr, wanted_path)
    self.write({'category_id': cat_id})
def _update_translations(self, filter_lang=None, overwrite=False):
    """Load the translation terms of the modules in ``self``.

    Only modules that are installed, to install or to upgrade are
    considered; they are passed to ``_load_module_terms`` in the order
    given by a topological sort on their dependencies.

    :param filter_lang: a language code or a list/tuple of codes; all
        installed languages when empty
    :param overwrite: forwarded to ``_load_module_terms``
    """
    if not filter_lang:
        installed_langs = self.env['res.lang'].get_installed()
        filter_lang = [code for code, _name in installed_langs]
    elif not isinstance(filter_lang, (list, tuple)):
        filter_lang = [filter_lang]

    loadable_states = ('installed', 'to install', 'to upgrade')
    candidates = self.filtered(lambda module: module.state in loadable_states)
    dependency_graph = {}
    for module in candidates:
        dependency_graph[module.name] = module.dependencies_id.mapped('name')
    ordered_names = topological_sort(dependency_graph)
    self.env['ir.module.module']._load_module_terms(ordered_names, filter_lang, overwrite)


def _check(self):
    """Log a warning for each module of ``self`` without an HTML description."""
    for module in self.filtered(lambda m: not m.description_html):
        _logger.warning('module %s: description is empty !', module.name)


def _get(self, name):
    """ Return the (sudoed) `ir.module.module` record with the given name.
    The result may be an empty recordset if the module is not found.
    """
    if name:
        model_id = self._get_id(name)
    else:
        model_id = False
    return self.browse(model_id).sudo()


@tools.ormcache('name')
def _get_id(self, name):
    """Return the row fetched for the module named ``name`` (``None`` when there is none); cached by name."""
    self.flush_model(['name'])
    cr = self.env.cr
    cr.execute("SELECT id FROM ir_module_module WHERE name=%s", (name,))
    return cr.fetchone()


@api.model
@tools.ormcache()
def _installed(self):
    """ Return the set of installed modules as a dictionary {name: id} """
    installed_modules = self.sudo().search([('state', '=', 'installed')])
    return {module.name: module.id for module in installed_modules}
@api.model
def search_panel_select_range(self, field_name, **kwargs):
    """Return the search panel values for ``field_name``.

    For ``category_id`` the values are the root categories that have a
    child category containing modules, minus the theme categories (and the
    hidden category for users outside ``base.group_no_one``), ordered by
    sequence. Any other field is delegated to the parent implementation.
    """
    if field_name != 'category_id':
        return super(Module, self).search_panel_select_range(field_name, **kwargs)

    enable_counters = kwargs.get('enable_counters', False)

    excluded_xmlids = [
        'base.module_category_website_theme',
        'base.module_category_theme',
    ]
    if not self.user_has_groups('base.group_no_one'):
        excluded_xmlids.append('base.module_category_hidden')

    # xmlids that do not resolve to a record are simply ignored
    resolved = (self.env.ref(xmlid, False) for xmlid in excluded_xmlids)
    excluded_category_ids = [categ.id for categ in resolved if categ]

    domain = [('parent_id', '=', False), ('child_ids.module_ids', '!=', False)]
    if excluded_category_ids:
        domain = expression.AND([
            domain,
            [('id', 'not in', excluded_category_ids)],
        ])

    Category = self.env['ir.module.category']
    values_range = OrderedDict()
    for record in Category.search_read(domain, ['display_name'], order="sequence"):
        category_id = record['id']
        if enable_counters:
            # count the modules below this category matching the current search
            count_domain = expression.AND([
                kwargs.get('search_domain', []),
                kwargs.get('category_domain', []),
                kwargs.get('filter_domain', []),
                [('category_id', 'child_of', category_id), ('category_id', 'not in', excluded_category_ids)]
            ])
            record['__count'] = self.env['ir.module.module'].search_count(count_domain)
        values_range[category_id] = record

    return {
        'parent_field': 'parent_id',
        'values': list(values_range.values()),
    }
@api.model
def _load_module_terms(self, modules, langs, overwrite=False):
    """ Load PO files of the given modules for the given languages.

    Modules without a module path are skipped. For each language, the files
    are loaded in this order when they exist: base language ``i18n``, base
    language ``i18n_extra``, language ``i18n``, language ``i18n_extra``.
    Everything is saved once at the end with the given ``overwrite`` flag.
    """
    # load i18n files
    importer = TranslationImporter(self.env.cr, verbose=False)

    for module_name in modules:
        if not get_module_path(module_name):
            continue

        for lang in langs:
            lang_code = tools.get_iso_codes(lang)
            base_lang_code = lang_code.split('_')[0] if '_' in lang_code else None

            # Step 1: for sub-languages, load base language first (e.g. es_CL.po is loaded over es.po)
            if base_lang_code:
                base_po = base_lang_code + '.po'

                base_file = get_module_resource(module_name, 'i18n', base_po)
                if base_file:
                    _logger.info('module %s: loading base translation file %s for language %s', module_name, base_lang_code, lang)
                    importer.load_file(base_file, lang)

                # i18n_extra folder is for additional translations handle manually (eg: for l10n_be)
                base_extra_file = get_module_resource(module_name, 'i18n_extra', base_po)
                if base_extra_file:
                    _logger.info('module %s: loading extra base translation file %s for language %s', module_name, base_lang_code, lang)
                    importer.load_file(base_extra_file, lang)

            # Step 2: then load the main translation file, possibly overriding the terms coming from the base language
            lang_po = lang_code + '.po'

            main_file = get_module_resource(module_name, 'i18n', lang_po)
            if main_file:
                _logger.info('module %s: loading translation file %s for language %s', module_name, lang_code, lang)
                importer.load_file(main_file, lang)
            elif lang_code != 'en_US':
                _logger.info('module %s: no translation for language %s', module_name, lang_code)

            extra_file = get_module_resource(module_name, 'i18n_extra', lang_po)
            if extra_file:
                _logger.info('module %s: loading extra translation file %s for language %s', module_name, lang_code, lang)
                importer.load_file(extra_file, lang)

    importer.save(overwrite=overwrite)
# dependency/exclusion states: the module states plus one for unknown modules
DEP_STATES = STATES + [('unknown', 'Unknown')]


class ModuleDependency(models.Model):
    """Dependency of a module on another module, referenced by technical name."""
    _name = "ir.module.module.dependency"
    _description = "Module dependency"
    _log_access = False  # inserts are done manually, create and write uid, dates are always null

    # the dependency name
    name = fields.Char(index=True)

    # the module that depends on it
    module_id = fields.Many2one('ir.module.module', 'Module', ondelete='cascade')

    # the module corresponding to the dependency, and its status
    depend_id = fields.Many2one('ir.module.module', 'Dependency',
                                compute='_compute_depend', search='_search_depend')
    state = fields.Selection(DEP_STATES, string='Status', compute='_compute_state')

    auto_install_required = fields.Boolean(
        default=True,
        help="Whether this dependency blocks automatic installation "
             "of the dependent")

    @api.depends('name')
    def _compute_depend(self):
        """Set ``depend_id`` to the module whose name matches the dependency name, if any."""
        # one search for all distinct names, then a lookup per record
        wanted_names = list({dependency.name for dependency in self})
        found = self.env['ir.module.module'].search([('name', 'in', wanted_names)])
        module_by_name = {module.name: module for module in found}
        for dependency in self:
            dependency.depend_id = module_by_name.get(dependency.name)

    def _search_depend(self, operator, value):
        """Turn an ``in`` search on ``depend_id`` into a domain on the dependency name."""
        assert operator == 'in'
        modules = self.env['ir.module.module'].browse(set(value))
        module_names = modules.mapped('name')
        return [('name', 'in', module_names)]

    @api.depends('depend_id.state')
    def _compute_state(self):
        """Copy the state of the dependency module, or ``'unknown'`` when it has none."""
        for dependency in self:
            target_state = dependency.depend_id.state
            dependency.state = target_state or 'unknown'
class ModuleExclusion(models.Model):
    """Exclusion declared by a module against another module, referenced by technical name."""
    _name = "ir.module.module.exclusion"
    _description = "Module exclusion"

    # the exclusion name
    name = fields.Char(index=True)

    # the module that excludes it
    module_id = fields.Many2one('ir.module.module', 'Module', ondelete='cascade')

    # the module corresponding to the exclusion, and its status
    exclusion_id = fields.Many2one('ir.module.module', 'Exclusion Module',
                                   compute='_compute_exclusion', search='_search_exclusion')
    state = fields.Selection(DEP_STATES, string='Status', compute='_compute_state')

    @api.depends('name')
    def _compute_exclusion(self):
        """Set ``exclusion_id`` to the module whose name matches the exclusion name, if any."""
        # one search for all distinct names, then a lookup per record
        wanted_names = list({exclusion.name for exclusion in self})
        found = self.env['ir.module.module'].search([('name', 'in', wanted_names)])
        module_by_name = dict(zip(found.mapped('name'), found))
        for exclusion in self:
            exclusion.exclusion_id = module_by_name.get(exclusion.name)

    def _search_exclusion(self, operator, value):
        """Turn an ``in`` search on ``exclusion_id`` into a domain on the exclusion name."""
        assert operator == 'in'
        modules = self.env['ir.module.module'].browse(set(value))
        module_names = modules.mapped('name')
        return [('name', 'in', module_names)]

    @api.depends('exclusion_id.state')
    def _compute_state(self):
        """Copy the state of the excluded module, or ``'unknown'`` when it has none."""
        for exclusion in self:
            target_state = exclusion.exclusion_id.state
            exclusion.state = target_state or 'unknown'