12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409 |
- # -*- coding: utf-8 -*-
- # Part of Odoo. See LICENSE file for full copyright and licensing details.
- import base64
- import hashlib
- import hmac
- import io
- import logging
- import lxml
- import random
- import re
- import requests
- import threading
- import werkzeug.urls
- from ast import literal_eval
- from dateutil.relativedelta import relativedelta
- from markupsafe import Markup
- from werkzeug.urls import url_join
- from PIL import Image, UnidentifiedImageError
- from odoo import api, fields, models, tools, _
- from odoo.addons.base_import.models.base_import import ImportValidationError
- from odoo.exceptions import UserError, ValidationError
- from odoo.osv import expression
- _logger = logging.getLogger(__name__)
- # Syntax of the data URL Scheme: https://tools.ietf.org/html/rfc2397#section-3
- # Used to find inline images
- image_re = re.compile(r"data:(image/[A-Za-z]+);base64,(.*)")
- DEFAULT_IMAGE_TIMEOUT = 3
- DEFAULT_IMAGE_MAXBYTES = 10 * 1024 * 1024 # 10MB
- DEFAULT_IMAGE_CHUNK_SIZE = 32768
- mso_re = re.compile(r"\[if mso\]>[\s\S]*<!\[endif\]")
- class MassMailing(models.Model):
- """ Mass Mailing models the sending of emails to a list of recipients for a mass mailing campaign."""
- _name = 'mailing.mailing'
- _description = 'Mass Mailing'
- _inherit = ['mail.thread',
- 'mail.activity.mixin',
- 'mail.render.mixin',
- 'utm.source.mixin'
- ]
- _order = 'calendar_date DESC'
- _rec_name = "subject"
- @api.model
- def default_get(self, fields_list):
- vals = super(MassMailing, self).default_get(fields_list)
- # field sent by the calendar view when clicking on a date block
- # we use it to setup the scheduled date of the created mailing.mailing
- default_calendar_date = self.env.context.get('default_calendar_date')
- if default_calendar_date and ('schedule_type' in fields_list and 'schedule_date' in fields_list) \
- and fields.Datetime.from_string(default_calendar_date) > fields.Datetime.now():
- vals.update({
- 'schedule_type': 'scheduled',
- 'schedule_date': default_calendar_date
- })
- if 'contact_list_ids' in fields_list and not vals.get('contact_list_ids') and vals.get('mailing_model_id'):
- if vals.get('mailing_model_id') == self.env['ir.model']._get('mailing.list').id:
- mailing_list = self.env['mailing.list'].search([], limit=2)
- if len(mailing_list) == 1:
- vals['contact_list_ids'] = [(6, 0, [mailing_list.id])]
- return vals
- @api.model
- def _get_default_mail_server_id(self):
- server_id = self.env['ir.config_parameter'].sudo().get_param('mass_mailing.mail_server_id')
- try:
- server_id = literal_eval(server_id) if server_id else False
- return self.env['ir.mail_server'].search([('id', '=', server_id)]).id
- except ValueError:
- return False
- active = fields.Boolean(default=True, tracking=True)
- subject = fields.Char(
- 'Subject', required=True, translate=False)
- preview = fields.Char(
- 'Preview', translate=False,
- help='Catchy preview sentence that encourages recipients to open this email.\n'
- 'In most inboxes, this is displayed next to the subject.\n'
- 'Keep it empty if you prefer the first characters of your email content to appear instead.')
- email_from = fields.Char(
- string='Send From',
- compute='_compute_email_from', readonly=False, required=True, store=True,
- default=lambda self: self.env.user.email_formatted)
- favorite = fields.Boolean('Favorite', copy=False, tracking=True)
- favorite_date = fields.Datetime(
- 'Favorite Date',
- compute='_compute_favorite_date', store=True,
- copy=False,
- help='When this mailing was added in the favorites')
- sent_date = fields.Datetime(string='Sent Date', copy=False)
- schedule_type = fields.Selection(
- [('now', 'Send now'), ('scheduled', 'Send on')],
- string='Schedule', default='now',
- readonly=True, required=True,
- states={'draft': [('readonly', False)], 'in_queue': [('readonly', False)]})
- schedule_date = fields.Datetime(
- string='Scheduled for',
- compute='_compute_schedule_date', readonly=True, store=True,
- copy=True, tracking=True,
- states={'draft': [('readonly', False)], 'in_queue': [('readonly', False)]})
- calendar_date = fields.Datetime(
- 'Calendar Date',
- compute='_compute_calendar_date', store=True,
- copy=False,
- help="Date at which the mailing was or will be sent.")
- # don't translate 'body_arch', the translations are only on 'body_html'
- body_arch = fields.Html(string='Body', translate=False, sanitize=False)
- body_html = fields.Html(string='Body converted to be sent by mail', render_engine='qweb', sanitize=False)
- # used to determine if the mail body is empty
- is_body_empty = fields.Boolean(compute="_compute_is_body_empty")
- attachment_ids = fields.Many2many(
- 'ir.attachment', 'mass_mailing_ir_attachments_rel',
- 'mass_mailing_id', 'attachment_id', string='Attachments')
- keep_archives = fields.Boolean(string='Keep Archives')
- campaign_id = fields.Many2one('utm.campaign', string='UTM Campaign', index=True, ondelete='set null')
- medium_id = fields.Many2one(
- 'utm.medium', string='Medium',
- compute='_compute_medium_id', readonly=False, store=True,
- ondelete='restrict',
- help="UTM Medium: delivery method (email, sms, ...)")
- state = fields.Selection(
- [('draft', 'Draft'), ('in_queue', 'In Queue'),
- ('sending', 'Sending'), ('done', 'Sent')],
- string='Status',
- default='draft', required=True,
- copy=False, tracking=True,
- group_expand='_group_expand_states')
- color = fields.Integer(string='Color Index')
- user_id = fields.Many2one(
- 'res.users', string='Responsible',
- tracking=True,
- default=lambda self: self.env.user)
- # mailing options
- mailing_type = fields.Selection([('mail', 'Email')], string="Mailing Type", default="mail", required=True)
- mailing_type_description = fields.Char('Mailing Type Description', compute="_compute_mailing_type_description")
- reply_to_mode = fields.Selection(
- [('update', 'Recipient Followers'), ('new', 'Specified Email Address')],
- string='Reply-To Mode',
- compute='_compute_reply_to_mode', readonly=False, store=True,
- help='Thread: replies go to target document. Email: replies are routed to a given email.')
- reply_to = fields.Char(
- string='Reply To',
- compute='_compute_reply_to', readonly=False, store=True,
- help='Preferred Reply-To Address')
- # recipients
- mailing_model_real = fields.Char(
- string='Recipients Real Model', compute='_compute_mailing_model_real')
- mailing_model_id = fields.Many2one(
- 'ir.model', string='Recipients Model',
- ondelete='cascade', required=True,
- domain=[('is_mailing_enabled', '=', True)],
- default=lambda self: self.env.ref('mass_mailing.model_mailing_list').id)
- mailing_model_name = fields.Char(
- string='Recipients Model Name',
- related='mailing_model_id.model', readonly=True, related_sudo=True)
- mailing_domain = fields.Char(
- string='Domain',
- compute='_compute_mailing_domain', readonly=False, store=True)
- mail_server_available = fields.Boolean(
- compute='_compute_mail_server_available',
- help="Technical field used to know if the user has activated the outgoing mail server option in the settings")
- mail_server_id = fields.Many2one('ir.mail_server', string='Mail Server',
- default=_get_default_mail_server_id,
- help="Use a specific mail server in priority. Otherwise Odoo relies on the first outgoing mail server available (based on their sequencing) as it does for normal mails.")
- contact_list_ids = fields.Many2many('mailing.list', 'mail_mass_mailing_list_rel', string='Mailing Lists')
- # Mailing Filter
- mailing_filter_id = fields.Many2one(
- 'mailing.filter', string='Favorite Filter',
- compute='_compute_mailing_filter_id', readonly=False, store=True,
- domain="[('mailing_model_name', '=', mailing_model_name)]")
- mailing_filter_domain = fields.Char('Favorite filter domain', related='mailing_filter_id.mailing_domain')
- mailing_filter_count = fields.Integer('# Favorite Filters', compute='_compute_mailing_filter_count')
- # A/B Testing
- ab_testing_completed = fields.Boolean(related='campaign_id.ab_testing_completed', store=True)
- ab_testing_description = fields.Html('A/B Testing Description', compute="_compute_ab_testing_description")
- ab_testing_enabled = fields.Boolean(
- string='Allow A/B Testing', default=False,
- help='If checked, recipients will be mailed only once for the whole campaign. '
- 'This lets you send different mailings to randomly selected recipients and test '
- 'the effectiveness of the mailings, without causing duplicate messages.')
- ab_testing_mailings_count = fields.Integer(related="campaign_id.ab_testing_mailings_count")
- ab_testing_pc = fields.Integer(
- string='A/B Testing percentage',
- default=10,
- help='Percentage of the contacts that will be mailed. Recipients will be chosen randomly.')
- ab_testing_schedule_datetime = fields.Datetime(
- related="campaign_id.ab_testing_schedule_datetime", readonly=False,
- default=lambda self: fields.Datetime.now() + relativedelta(days=1))
- ab_testing_winner_selection = fields.Selection(
- related="campaign_id.ab_testing_winner_selection", readonly=False,
- default="opened_ratio",
- copy=True)
- kpi_mail_required = fields.Boolean('KPI mail required', copy=False)
- # statistics data
- mailing_trace_ids = fields.One2many('mailing.trace', 'mass_mailing_id', string='Emails Statistics')
- total = fields.Integer(compute="_compute_total")
- scheduled = fields.Integer(compute="_compute_statistics")
- expected = fields.Integer(compute="_compute_statistics")
- canceled = fields.Integer(compute="_compute_statistics")
- sent = fields.Integer(compute="_compute_statistics")
- delivered = fields.Integer(compute="_compute_statistics")
- opened = fields.Integer(compute="_compute_statistics")
- clicked = fields.Integer(compute="_compute_statistics")
- replied = fields.Integer(compute="_compute_statistics")
- bounced = fields.Integer(compute="_compute_statistics")
- failed = fields.Integer(compute="_compute_statistics")
- received_ratio = fields.Integer(compute="_compute_statistics", string='Received Ratio')
- opened_ratio = fields.Integer(compute="_compute_statistics", string='Opened Ratio')
- replied_ratio = fields.Integer(compute="_compute_statistics", string='Replied Ratio')
- bounced_ratio = fields.Integer(compute="_compute_statistics", string='Bounced Ratio')
- clicks_ratio = fields.Integer(compute="_compute_clicks_ratio", string="Number of Clicks")
- next_departure = fields.Datetime(compute="_compute_next_departure", string='Scheduled date')
- # UX
- warning_message = fields.Char(
- 'Warning Message', compute='_compute_warning_message',
- help='Warning message displayed in the mailing form view')
- _sql_constraints = [(
- 'percentage_valid',
- 'CHECK(ab_testing_pc >= 0 AND ab_testing_pc <= 100)',
- 'The A/B Testing Percentage needs to be between 0 and 100%'
- )]
- @api.constrains('mailing_model_id', 'mailing_filter_id')
- def _check_mailing_filter_model(self):
- """Check that if the favorite filter is set, it must contain the same recipient model as mailing"""
- for mailing in self:
- if mailing.mailing_filter_id and mailing.mailing_model_id != mailing.mailing_filter_id.mailing_model_id:
- raise ValidationError(
- _("The saved filter targets different recipients and is incompatible with this mailing.")
- )
- @api.depends('mail_server_id')
- def _compute_email_from(self):
- user_email = self.env.user.email_formatted
- notification_email = self.env['ir.mail_server']._get_default_from_address()
- for mailing in self:
- server = mailing.mail_server_id
- if not server:
- mailing.email_from = mailing.email_from or user_email
- elif mailing.email_from and server._match_from_filter(mailing.email_from, server.from_filter):
- mailing.email_from = mailing.email_from
- elif server._match_from_filter(user_email, server.from_filter):
- mailing.email_from = user_email
- elif server._match_from_filter(notification_email, server.from_filter):
- mailing.email_from = notification_email
- else:
- mailing.email_from = mailing.email_from or user_email
- @api.depends('favorite')
- def _compute_favorite_date(self):
- favorited = self.filtered('favorite')
- (self - favorited).favorite_date = False
- favorited.filtered(lambda mailing: not mailing.favorite_date).favorite_date = fields.Datetime.now()
- def _compute_total(self):
- for mass_mailing in self:
- total = self.env[mass_mailing.mailing_model_real].search_count(mass_mailing._parse_mailing_domain())
- if total and mass_mailing.ab_testing_enabled and mass_mailing.ab_testing_pc < 100:
- total = max(int(total / 100.0 * mass_mailing.ab_testing_pc), 1)
- mass_mailing.total = total
- def _compute_clicks_ratio(self):
- self.env.cr.execute("""
- SELECT COUNT(DISTINCT(stats.id)) AS nb_mails, COUNT(DISTINCT(clicks.mailing_trace_id)) AS nb_clicks, stats.mass_mailing_id AS id
- FROM mailing_trace AS stats
- LEFT OUTER JOIN link_tracker_click AS clicks ON clicks.mailing_trace_id = stats.id
- WHERE stats.mass_mailing_id IN %s
- GROUP BY stats.mass_mailing_id
- """, [tuple(self.ids) or (None,)])
- mass_mailing_data = self.env.cr.dictfetchall()
- mapped_data = dict([(m['id'], 100 * m['nb_clicks'] / m['nb_mails']) for m in mass_mailing_data])
- for mass_mailing in self:
- mass_mailing.clicks_ratio = mapped_data.get(mass_mailing.id, 0)
- def _compute_statistics(self):
- """ Compute statistics of the mass mailing """
- for key in (
- 'scheduled', 'expected', 'canceled', 'sent', 'delivered', 'opened',
- 'clicked', 'replied', 'bounced', 'failed', 'received_ratio',
- 'opened_ratio', 'replied_ratio', 'bounced_ratio',
- ):
- self[key] = False
- if not self.ids:
- return
- # ensure traces are sent to db
- self.env['mailing.trace'].flush_model()
- self.env['mailing.mailing'].flush_model()
- self.env.cr.execute("""
- SELECT
- m.id as mailing_id,
- COUNT(s.id) AS expected,
- COUNT(s.sent_datetime) AS sent,
- COUNT(s.trace_status) FILTER (WHERE s.trace_status = 'outgoing') AS scheduled,
- COUNT(s.trace_status) FILTER (WHERE s.trace_status = 'cancel') AS canceled,
- COUNT(s.trace_status) FILTER (WHERE s.trace_status in ('sent', 'open', 'reply')) AS delivered,
- COUNT(s.trace_status) FILTER (WHERE s.trace_status in ('open', 'reply')) AS opened,
- COUNT(s.links_click_datetime) AS clicked,
- COUNT(s.trace_status) FILTER (WHERE s.trace_status = 'reply') AS replied,
- COUNT(s.trace_status) FILTER (WHERE s.trace_status = 'bounce') AS bounced,
- COUNT(s.trace_status) FILTER (WHERE s.trace_status = 'error') AS failed
- FROM
- mailing_trace s
- RIGHT JOIN
- mailing_mailing m
- ON (m.id = s.mass_mailing_id)
- WHERE
- m.id IN %s
- GROUP BY
- m.id
- """, (tuple(self.ids), ))
- for row in self.env.cr.dictfetchall():
- total = (row['expected'] - row['canceled']) or 1
- row['received_ratio'] = 100.0 * row['delivered'] / total
- row['opened_ratio'] = 100.0 * row['opened'] / total
- row['replied_ratio'] = 100.0 * row['replied'] / total
- row['bounced_ratio'] = 100.0 * row['bounced'] / total
- self.browse(row.pop('mailing_id')).update(row)
- def _compute_next_departure(self):
- # Schedule_date should only be False if schedule_type = "now" or
- # mass_mailing is canceled.
- # A cron.trigger is created when mailing is put "in queue"
- # so we can reasonably expect that the cron worker will
- # execute this based on the cron.trigger's call_at which should
- # be now() when clicking "Send" or schedule_date if scheduled
- for mass_mailing in self:
- if mass_mailing.schedule_date:
- # max in case the user schedules a date in the past
- mass_mailing.next_departure = max(mass_mailing.schedule_date, fields.datetime.now())
- else:
- mass_mailing.next_departure = fields.datetime.now()
- @api.depends('email_from', 'mail_server_id')
- def _compute_warning_message(self):
- for mailing in self:
- mail_server = mailing.mail_server_id
- if mail_server and not mail_server._match_from_filter(mailing.email_from, mail_server.from_filter):
- mailing.warning_message = _(
- 'This email from can not be used with this mail server.\n'
- 'Your emails might be marked as spam on the mail clients.'
- )
- else:
- mailing.warning_message = False
- @api.depends('mailing_type')
- def _compute_medium_id(self):
- for mailing in self:
- if mailing.mailing_type == 'mail' and not mailing.medium_id:
- mailing.medium_id = self.env.ref('utm.utm_medium_email').id
- @api.depends('mailing_model_id')
- def _compute_reply_to_mode(self):
- """ For main models not really using chatter to gather answers (contacts
- and mailing contacts), set reply-to as email-based. Otherwise answers
- by default go on the original discussion thread (business document). Note
- that mailing_model being mailing.list means contacting mailing.contact
- (see mailing_model_name versus mailing_model_real). """
- for mailing in self:
- if mailing.mailing_model_id.model in ['res.partner', 'mailing.list', 'mailing.contact']:
- mailing.reply_to_mode = 'new'
- else:
- mailing.reply_to_mode = 'update'
- @api.depends('reply_to_mode')
- def _compute_reply_to(self):
- for mailing in self:
- if mailing.reply_to_mode == 'new' and not mailing.reply_to:
- mailing.reply_to = self.env.user.email_formatted
- elif mailing.reply_to_mode == 'update':
- mailing.reply_to = False
- @api.depends('mailing_model_id', 'mailing_domain')
- def _compute_mailing_filter_count(self):
- filter_data = self.env['mailing.filter']._read_group([
- ('mailing_model_id', 'in', self.mailing_model_id.ids)
- ], ['mailing_model_id'], ['mailing_model_id'])
- mapped_data = {data['mailing_model_id'][0]: data['mailing_model_id_count'] for data in filter_data}
- for mailing in self:
- mailing.mailing_filter_count = mapped_data.get(mailing.mailing_model_id.id, 0)
- @api.depends('mailing_model_id')
- def _compute_mailing_model_real(self):
- for mailing in self:
- mailing.mailing_model_real = 'mailing.contact' if mailing.mailing_model_id.model == 'mailing.list' else mailing.mailing_model_id.model
- @api.depends('mailing_model_id', 'contact_list_ids', 'mailing_type', 'mailing_filter_id')
- def _compute_mailing_domain(self):
- for mailing in self:
- if not mailing.mailing_model_id:
- mailing.mailing_domain = ''
- elif mailing.mailing_filter_id:
- mailing.mailing_domain = mailing.mailing_filter_id.mailing_domain
- else:
- mailing.mailing_domain = repr(mailing._get_default_mailing_domain())
- @api.depends('mailing_model_name')
- def _compute_mailing_filter_id(self):
- for mailing in self:
- mailing.mailing_filter_id = False
- @api.depends('schedule_type')
- def _compute_schedule_date(self):
- for mailing in self:
- if mailing.schedule_type == 'now' or not mailing.schedule_date:
- mailing.schedule_date = False
- @api.depends('state', 'schedule_date', 'sent_date', 'next_departure')
- def _compute_calendar_date(self):
- for mailing in self:
- if mailing.state == 'done':
- mailing.calendar_date = mailing.sent_date
- elif mailing.state == 'in_queue':
- mailing.calendar_date = mailing.next_departure
- elif mailing.state == 'sending':
- mailing.calendar_date = fields.Datetime.now()
- else:
- mailing.calendar_date = False
- @api.depends('body_arch')
- def _compute_is_body_empty(self):
- for mailing in self:
- mailing.is_body_empty = tools.is_html_empty(mailing.body_arch)
- def _compute_mail_server_available(self):
- self.mail_server_available = self.env['ir.config_parameter'].sudo().get_param('mass_mailing.outgoing_mail_server')
- # Overrides of mail.render.mixin
- @api.depends('mailing_model_real')
- def _compute_render_model(self):
- for mailing in self:
- mailing.render_model = mailing.mailing_model_real
- @api.depends('mailing_type')
- def _compute_mailing_type_description(self):
- for mailing in self:
- mailing.mailing_type_description = dict(self._fields.get('mailing_type').selection).get(mailing.mailing_type)
- @api.depends(lambda self: self._get_ab_testing_description_modifying_fields())
- def _compute_ab_testing_description(self):
- mailing_ab_test = self.filtered('ab_testing_enabled')
- (self - mailing_ab_test).ab_testing_description = False
- for mailing in mailing_ab_test:
- mailing.ab_testing_description = self.env['ir.qweb']._render(
- 'mass_mailing.ab_testing_description',
- mailing._get_ab_testing_description_values()
- )
- def _get_ab_testing_description_modifying_fields(self):
- return ['ab_testing_enabled', 'ab_testing_pc', 'ab_testing_schedule_datetime', 'ab_testing_winner_selection', 'campaign_id']
- # ------------------------------------------------------
- # ORM
- # ------------------------------------------------------
- @api.model_create_multi
- def create(self, vals_list):
- ab_testing_cron = self.env.ref('mass_mailing.ir_cron_mass_mailing_ab_testing').sudo()
- for values in vals_list:
- if values.get('body_html'):
- values['body_html'] = self._convert_inline_images_to_urls(values['body_html'])
- if values.get('ab_testing_schedule_datetime'):
- at = fields.Datetime.from_string(values['ab_testing_schedule_datetime'])
- ab_testing_cron._trigger(at=at)
- mailings = super().create(vals_list)
- mailings._create_ab_testing_utm_campaigns()
- mailings._fix_attachment_ownership()
- return mailings
- def write(self, values):
- if values.get('body_html'):
- values['body_html'] = self._convert_inline_images_to_urls(values['body_html'])
- # If ab_testing is already enabled on a mailing and the campaign is removed, we raise a ValidationError
- if values.get('campaign_id') is False and any(mailing.ab_testing_enabled for mailing in self) and 'ab_testing_enabled' not in values:
- raise ValidationError(_("A campaign should be set when A/B test is enabled"))
- result = super(MassMailing, self).write(values)
- if values.get('ab_testing_enabled'):
- self._create_ab_testing_utm_campaigns()
- self._fix_attachment_ownership()
- if any(self.mapped('ab_testing_schedule_datetime')):
- schedule_date = min(m.ab_testing_schedule_datetime for m in self if m.ab_testing_schedule_datetime)
- ab_testing_cron = self.env.ref('mass_mailing.ir_cron_mass_mailing_ab_testing').sudo()
- ab_testing_cron._trigger(at=schedule_date)
- return result
- def _create_ab_testing_utm_campaigns(self):
- """ Creates the A/B test campaigns for the mailings that do not have campaign set already """
- campaign_vals = [
- mailing._get_default_ab_testing_campaign_values()
- for mailing in self.filtered(lambda mailing: mailing.ab_testing_enabled and not mailing.campaign_id)
- ]
- return self.env['utm.campaign'].create(campaign_vals)
- def _fix_attachment_ownership(self):
- for record in self:
- record.attachment_ids.write({'res_model': record._name, 'res_id': record.id})
- return self
- @api.returns('self', lambda value: value.id)
- def copy(self, default=None):
- self.ensure_one()
- default = dict(default or {}, contact_list_ids=self.contact_list_ids.ids)
- if self.mail_server_id and not self.mail_server_id.active:
- default['mail_server_id'] = self._get_default_mail_server_id()
- return super(MassMailing, self).copy(default=default)
- def _group_expand_states(self, states, domain, order):
- return [key for key, val in type(self).state.selection]
- # ------------------------------------------------------
- # ACTIONS
- # ------------------------------------------------------
- def action_set_favorite(self):
- """Add the current mailing in the favorites list."""
- self.favorite = True
- return {
- 'type': 'ir.actions.client',
- 'tag': 'display_notification',
- 'params': {
- 'message': _(
- 'Design added to the %s Templates!',
- ', '.join(self.mapped('mailing_model_id.name')),
- ),
- 'next': {'type': 'ir.actions.act_window_close'},
- 'sticky': False,
- 'type': 'info',
- }
- }
- def action_remove_favorite(self):
- """Remove the current mailing from the favorites list."""
- self.favorite = False
- return {
- 'type': 'ir.actions.client',
- 'tag': 'display_notification',
- 'params': {
- 'message': _(
- 'Design removed from the %s Templates!',
- ', '.join(self.mapped('mailing_model_id.name')),
- ),
- 'next': {'type': 'ir.actions.act_window_close'},
- 'sticky': False,
- 'type': 'info',
- }
- }
- def action_duplicate(self):
- self.ensure_one()
- mass_mailing_copy = self.copy()
- if mass_mailing_copy:
- context = dict(self.env.context)
- context['form_view_initial_mode'] = 'edit'
- action = {
- 'type': 'ir.actions.act_window',
- 'view_mode': 'form',
- 'res_model': 'mailing.mailing',
- 'res_id': mass_mailing_copy.id,
- 'context': context,
- }
- if self.mailing_type == 'mail':
- action['views'] = [
- (self.env.ref('mass_mailing.mailing_mailing_view_form_full_width').id, 'form'),
- ]
- return action
- return False
- def action_test(self):
- self.ensure_one()
- ctx = dict(self.env.context, default_mass_mailing_id=self.id, dialog_size='medium')
- return {
- 'name': _('Test Mailing'),
- 'type': 'ir.actions.act_window',
- 'view_mode': 'form',
- 'res_model': 'mailing.mailing.test',
- 'target': 'new',
- 'context': ctx,
- }
- def action_launch(self):
- self.write({'schedule_type': 'now'})
- return self.action_put_in_queue()
- def action_schedule(self):
- self.ensure_one()
- if self.schedule_date and self.schedule_date > fields.Datetime.now():
- return self.action_put_in_queue()
- action = self.env["ir.actions.actions"]._for_xml_id("mass_mailing.mailing_mailing_schedule_date_action")
- action['context'] = dict(self.env.context, default_mass_mailing_id=self.id, dialog_size='medium')
- return action
- def action_put_in_queue(self):
- self.write({'state': 'in_queue'})
- cron = self.env.ref('mass_mailing.ir_cron_mass_mailing_queue')
- cron._trigger(
- schedule_date or fields.Datetime.now()
- for schedule_date in self.mapped('schedule_date')
- )
- def action_cancel(self):
- self.write({'state': 'draft', 'schedule_date': False, 'schedule_type': 'now', 'next_departure': False})
- def action_retry_failed(self):
- failed_mails = self.env['mail.mail'].sudo().search([
- ('mailing_id', 'in', self.ids),
- ('state', '=', 'exception')
- ])
- failed_mails.mapped('mailing_trace_ids').unlink()
- failed_mails.unlink()
- self.action_put_in_queue()
- def action_view_traces_scheduled(self):
- return self._action_view_traces_filtered('scheduled')
- def action_view_traces_canceled(self):
- return self._action_view_traces_filtered('canceled')
- def action_view_traces_failed(self):
- return self._action_view_traces_filtered('failed')
- def action_view_traces_sent(self):
- return self._action_view_traces_filtered('sent')
- def _action_view_traces_filtered(self, view_filter):
- action = self.env["ir.actions.actions"]._for_xml_id("mass_mailing.mailing_trace_action")
- action['name'] = _('Sent Mailings')
- action['context'] = {'search_default_mass_mailing_id': self.id,}
- filter_key = 'search_default_filter_%s' % (view_filter)
- action['context'][filter_key] = True
- action['views'] = [
- (self.env.ref('mass_mailing.mailing_trace_view_tree_mail').id, 'tree'),
- (self.env.ref('mass_mailing.mailing_trace_view_form').id, 'form')
- ]
- return action
- def action_view_clicked(self):
- model_name = self.env['ir.model']._get('link.tracker').display_name
- recipient = self.env['ir.model']._get(self.mailing_model_real).display_name
- helper_header = _("No %s clicked your mailing yet!", recipient)
- helper_message = _("Link Trackers will measure how many times each link is clicked as well as "
- "the proportion of %s who clicked at least once in your mailing.", recipient)
- return {
- 'name': model_name,
- 'type': 'ir.actions.act_window',
- 'view_mode': 'tree',
- 'res_model': 'link.tracker',
- 'domain': [('mass_mailing_id', '=', self.id)],
- 'help': Markup('<p class="o_view_nocontent_smiling_face">%s</p><p>%s</p>') % (
- helper_header, helper_message,
- ),
- 'context': dict(self._context, create=False)
- }
- def action_view_opened(self):
- return self._action_view_documents_filtered('open')
- def action_view_replied(self):
- return self._action_view_documents_filtered('reply')
- def action_view_bounced(self):
- return self._action_view_documents_filtered('bounce')
- def action_view_delivered(self):
- return self._action_view_documents_filtered('delivered')
- def _action_view_documents_filtered(self, view_filter):
- model_name = self.env['ir.model']._get(self.mailing_model_real).display_name
- helper_header = None
- helper_message = None
- if view_filter == 'reply':
- found_traces = self.mailing_trace_ids.filtered(lambda trace: trace.trace_status == view_filter)
- helper_header = _("No %s replied to your mailing yet!", model_name)
- helper_message = _("To track how many replies this mailing gets, make sure "
- "its reply-to address belongs to this database.")
- elif view_filter == 'bounce':
- found_traces = self.mailing_trace_ids.filtered(lambda trace: trace.trace_status == view_filter)
- helper_header = _("No %s address bounced yet!", model_name)
- helper_message = _("Bounce happens when a mailing cannot be delivered (fake address, "
- "server issues, ...). Check each record to see what went wrong.")
- elif view_filter == 'open':
- found_traces = self.mailing_trace_ids.filtered(lambda trace: trace.trace_status in ('open', 'reply'))
- helper_header = _("No %s opened your mailing yet!", model_name)
- helper_message = _("Come back once your mailing has been sent to track who opened your mailing.")
- elif view_filter == 'delivered':
- found_traces = self.mailing_trace_ids.filtered(lambda trace: trace.trace_status in ('sent', 'open', 'reply'))
- helper_header = _("No %s received your mailing yet!", model_name)
- helper_message = _("Wait until your mailing has been sent to check how many recipients you managed to reach.")
- elif view_filter == 'sent':
- found_traces = self.mailing_trace_ids.filtered(lambda trace: trace.sent_datetime)
- else:
- found_traces = self.env['mailing.trace']
- res_ids = found_traces.mapped('res_id')
- action = {
- 'name': model_name,
- 'type': 'ir.actions.act_window',
- 'view_mode': 'tree',
- 'res_model': self.mailing_model_real,
- 'domain': [('id', 'in', res_ids)],
- 'context': dict(self._context, create=False),
- }
- if helper_header and helper_message:
- action['help'] = Markup('<p class="o_view_nocontent_smiling_face">%s</p><p>%s</p>') % (
- helper_header, helper_message,
- ),
- return action
- def action_view_mailing_contacts(self):
- """Show the mailing contacts who are in a mailing list selected for this mailing."""
- self.ensure_one()
- action = self.env['ir.actions.actions']._for_xml_id('mass_mailing.action_view_mass_mailing_contacts')
- if self.contact_list_ids:
- action['context'] = {
- 'default_mailing_list_ids': self.contact_list_ids[0].ids,
- 'default_subscription_list_ids': [(0, 0, {'list_id': self.contact_list_ids[0].id})],
- }
- action['domain'] = [('list_ids', 'in', self.contact_list_ids.ids)]
- return action
- @api.model
- def action_fetch_favorites(self, extra_domain=None):
- """Return all mailings set as favorite and skip mailings with empty body.
- Return archived mailing templates as well, so the user can archive the templates
- while keeping using it, without cluttering the Kanban view if they're a lot of
- templates.
- """
- domain = [('favorite', '=', True)]
- if extra_domain:
- domain = expression.AND([domain, extra_domain])
- values_list = self.with_context(active_test=False).search_read(
- domain=domain,
- fields=['id', 'subject', 'body_arch', 'user_id', 'mailing_model_id'],
- order='favorite_date DESC',
- )
- values_list = [
- values for values in values_list
- if not tools.is_html_empty(values['body_arch'])
- ]
- # You see first the mailings without responsible, then your mailings and then the others
- values_list.sort(
- key=lambda values:
- values['user_id'][0] != self.env.user.id if values['user_id'] else -1
- )
- return values_list
- def update_opt_out(self, email, list_ids, value):
- if len(list_ids) > 0:
- model = self.env['mailing.contact'].with_context(active_test=False)
- records = model.search([('email_normalized', '=', tools.email_normalize(email))])
- opt_out_records = self.env['mailing.contact.subscription'].search([
- ('contact_id', 'in', records.ids),
- ('list_id', 'in', list_ids),
- ('opt_out', '!=', value)
- ])
- opt_out_records.write({'opt_out': value})
- message = _('The recipient <strong>unsubscribed from %s</strong> mailing list(s)') \
- if value else _('The recipient <strong>subscribed to %s</strong> mailing list(s)')
- for record in records:
- # filter the list_id by record
- record_lists = opt_out_records.filtered(lambda rec: rec.contact_id.id == record.id)
- if len(record_lists) > 0:
- record.sudo().message_post(body=message % ', '.join(str(list.name) for list in record_lists.mapped('list_id')))
- # ------------------------------------------------------
- # A/B Test
- # ------------------------------------------------------
- def action_compare_versions(self):
- self.ensure_one()
- if not self.campaign_id:
- raise ValueError(_("No mailing campaign has been found"))
- action = {
- 'name': _('A/B Tests'),
- 'type': 'ir.actions.act_window',
- 'view_mode': 'tree,kanban,form,calendar,graph',
- 'res_model': 'mailing.mailing',
- 'domain': [('campaign_id', '=', self.campaign_id.id), ('ab_testing_enabled', '=', True), ('mailing_type', '=', self.mailing_type)],
- }
- if self.mailing_type == 'mail':
- action['views'] = [
- (False, 'tree'),
- (False, 'kanban'),
- (self.env.ref('mass_mailing.mailing_mailing_view_form_full_width').id, 'form'),
- (False, 'calendar'),
- (False, 'graph'),
- ]
- return action
- def action_send_winner_mailing(self):
- """Send the winner mailing based on the winner selection field.
- This action is used in 2 cases:
- - When the user clicks on a button to send the winner mailing. There is only one mailing in self
- - When the cron is executed to send winner mailing based on the A/B testing schedule datetime. In this
- case 'self' contains all the mailing for the campaigns so we just need to take the first to determine the
- winner.
- If the winner mailing is computed automatically, we sudo the mailings of the campaign in order to sort correctly
- the mailings based on the selection that can be used with sub-modules like CRM and Sales
- """
- if len(self.campaign_id) != 1:
- raise ValueError(_("To send the winner mailing the same campaign should be used by the mailings"))
- if any(mailing.ab_testing_completed for mailing in self):
- raise ValueError(_("To send the winner mailing the campaign should not have been completed."))
- final_mailing = self[0]
- sorted_by = final_mailing._get_ab_testing_winner_selection()['value']
- if sorted_by != 'manual':
- ab_testing_mailings = final_mailing._get_ab_testing_siblings_mailings().sudo()
- selected_mailings = ab_testing_mailings.filtered(lambda m: m.state == 'done').sorted(sorted_by, reverse=True)
- if selected_mailings:
- final_mailing = selected_mailings[0]
- else:
- raise ValidationError(_("No mailing for this A/B testing campaign has been sent yet! Send one first and try again later."))
- return final_mailing.action_select_as_winner()
- def action_select_as_winner(self):
- self.ensure_one()
- if not self.ab_testing_enabled:
- raise ValueError(_("A/B test option has not been enabled"))
- self.campaign_id.write({
- 'ab_testing_completed': True,
- })
- final_mailing = self.copy({
- 'ab_testing_pc': 100,
- })
- final_mailing.action_launch()
- action = self.env['ir.actions.act_window']._for_xml_id('mass_mailing.action_ab_testing_open_winner_mailing')
- action['res_id'] = final_mailing.id
- if self.mailing_type == 'mail':
- action['views'] = [
- (self.env.ref('mass_mailing.mailing_mailing_view_form_full_width').id, 'form'),
- ]
- return action
- def _get_ab_testing_description_values(self):
- self.ensure_one()
- other_ab_testing_mailings = self._get_ab_testing_siblings_mailings().filtered(lambda m: m.id != self.id)
- other_ab_testing_pc = sum([mailing.ab_testing_pc for mailing in other_ab_testing_mailings])
- return {
- 'mailing': self,
- 'ab_testing_winner_selection_description': self._get_ab_testing_winner_selection()['description'],
- 'other_ab_testing_pc': other_ab_testing_pc,
- 'remaining_ab_testing_pc': 100 - (other_ab_testing_pc + self.ab_testing_pc),
- }
- def _get_ab_testing_siblings_mailings(self):
- return self.campaign_id.mailing_mail_ids.filtered(lambda m: m.ab_testing_enabled)
- def _get_ab_testing_winner_selection(self):
- ab_testing_winner_selection_description = dict(
- self._fields.get('ab_testing_winner_selection').related_field.selection
- ).get(self.ab_testing_winner_selection)
- return {
- 'value': self.ab_testing_winner_selection,
- 'description': ab_testing_winner_selection_description,
- }
- def _get_default_ab_testing_campaign_values(self, values=None):
- values = values or dict()
- return {
- 'ab_testing_schedule_datetime': values.get('ab_testing_schedule_datetime') or self.ab_testing_schedule_datetime,
- 'ab_testing_winner_selection': values.get('ab_testing_winner_selection') or self.ab_testing_winner_selection,
- 'mailing_mail_ids': self.ids,
- 'name': _('A/B Test: %s', values.get('subject') or self.subject or fields.Datetime.now()),
- 'user_id': values.get('user_id') or self.user_id.id or self.env.user.id,
- }
- # ------------------------------------------------------
- # Email Sending
- # ------------------------------------------------------
- def _get_opt_out_list(self):
- """ Give list of opt-outed emails, depending on specific model-based
- computation if available.
- :return list: opt-outed emails, preferably normalized (aka not records)
- """
- self.ensure_one()
- opt_out = {}
- target = self.env[self.mailing_model_real]
- if hasattr(self.env[self.mailing_model_name], '_mailing_get_opt_out_list'):
- opt_out = self.env[self.mailing_model_name]._mailing_get_opt_out_list(self)
- _logger.info(
- "Mass-mailing %s targets %s, blacklist: %s emails",
- self, target._name, len(opt_out))
- else:
- _logger.info("Mass-mailing %s targets %s, no opt out list available", self, target._name)
- return opt_out
- def _get_link_tracker_values(self):
- self.ensure_one()
- vals = {'mass_mailing_id': self.id}
- if self.campaign_id:
- vals['campaign_id'] = self.campaign_id.id
- if self.source_id:
- vals['source_id'] = self.source_id.id
- if self.medium_id:
- vals['medium_id'] = self.medium_id.id
- return vals
- def _get_seen_list(self):
- """Returns a set of emails already targeted by current mailing/campaign (no duplicates)"""
- self.ensure_one()
- target = self.env[self.mailing_model_real]
- query = """
- SELECT s.email
- FROM mailing_trace s
- JOIN %(target)s t ON (s.res_id = t.id)
- %(join_domain)s
- WHERE s.email IS NOT NULL
- %(where_domain)s
- """
- if self.ab_testing_enabled:
- query += """
- AND s.campaign_id = %%(mailing_campaign_id)s;
- """
- else:
- query += """
- AND s.mass_mailing_id = %%(mailing_id)s
- AND s.model = %%(target_model)s;
- """
- join_domain, where_domain = self._get_seen_list_extra()
- query = query % {'target': target._table, 'join_domain': join_domain, 'where_domain': where_domain}
- params = {'mailing_id': self.id, 'mailing_campaign_id': self.campaign_id.id, 'target_model': self.mailing_model_real}
- self._cr.execute(query, params)
- seen_list = set(m[0] for m in self._cr.fetchall())
- _logger.info(
- "Mass-mailing %s has already reached %s %s emails", self, len(seen_list), target._name)
- return seen_list
- def _get_seen_list_extra(self):
- return ('', '')
- def _get_mass_mailing_context(self):
- """Returns extra context items with pre-filled blacklist and seen list for massmailing"""
- return {
- 'post_convert_links': self._get_link_tracker_values(),
- }
- def _get_recipients(self):
- mailing_domain = self._parse_mailing_domain()
- res_ids = self.env[self.mailing_model_real].search(mailing_domain).ids
- # randomly choose a fragment
- if self.ab_testing_enabled and self.ab_testing_pc < 100:
- contact_nbr = self.env[self.mailing_model_real].search_count(mailing_domain)
- topick = 0
- if contact_nbr:
- topick = max(int(contact_nbr / 100.0 * self.ab_testing_pc), 1)
- if self.campaign_id and self.ab_testing_enabled:
- already_mailed = self.campaign_id._get_mailing_recipients()[self.campaign_id.id]
- else:
- already_mailed = set([])
- remaining = set(res_ids).difference(already_mailed)
- if topick > len(remaining) or (len(remaining) > 0 and topick == 0):
- topick = len(remaining)
- res_ids = random.sample(sorted(remaining), topick)
- return res_ids
- def _get_remaining_recipients(self):
- res_ids = self._get_recipients()
- trace_domain = [('model', '=', self.mailing_model_real)]
- if self.ab_testing_enabled and self.ab_testing_pc == 100:
- trace_domain = expression.AND([trace_domain, [('mass_mailing_id', 'in', self._get_ab_testing_siblings_mailings().ids)]])
- else:
- trace_domain = expression.AND([trace_domain, [
- ('res_id', 'in', res_ids),
- ('mass_mailing_id', '=', self.id),
- ]])
- already_mailed = self.env['mailing.trace'].search_read(trace_domain, ['res_id'])
- done_res_ids = {record['res_id'] for record in already_mailed}
- return [rid for rid in res_ids if rid not in done_res_ids]
- def _get_unsubscribe_url(self, email_to, res_id):
- url = werkzeug.urls.url_join(
- self.get_base_url(), 'mail/mailing/%(mailing_id)s/unsubscribe?%(params)s' % {
- 'mailing_id': self.id,
- 'params': werkzeug.urls.url_encode({
- 'res_id': res_id,
- 'email': email_to,
- 'token': self._unsubscribe_token(res_id, email_to),
- }),
- }
- )
- return url
- def _get_view_url(self, email_to, res_id):
- url = werkzeug.urls.url_join(
- self.get_base_url(), 'mailing/%(mailing_id)s/view?%(params)s' % {
- 'mailing_id': self.id,
- 'params': werkzeug.urls.url_encode({
- 'res_id': res_id,
- 'email': email_to,
- 'token': self._unsubscribe_token(res_id, email_to),
- }),
- }
- )
- return url
- def action_send_mail(self, res_ids=None):
- author_id = self.env.user.partner_id.id
- # If no recipient is passed, we don't want to use the recipients of the first
- # mailing for all the others
- initial_res_ids = res_ids
- for mailing in self:
- if not initial_res_ids:
- res_ids = mailing._get_remaining_recipients()
- if not res_ids:
- raise UserError(_('There are no recipients selected.'))
- composer_values = {
- 'author_id': author_id,
- 'attachment_ids': [(4, attachment.id) for attachment in mailing.attachment_ids],
- 'body': mailing._prepend_preview(mailing.body_html, mailing.preview),
- 'subject': mailing.subject,
- 'model': mailing.mailing_model_real,
- 'email_from': mailing.email_from,
- 'record_name': False,
- 'composition_mode': 'mass_mail',
- 'mass_mailing_id': mailing.id,
- 'mailing_list_ids': [(4, l.id) for l in mailing.contact_list_ids],
- 'reply_to_force_new': mailing.reply_to_mode == 'new',
- 'template_id': None,
- 'mail_server_id': mailing.mail_server_id.id,
- }
- if mailing.reply_to_mode == 'new':
- composer_values['reply_to'] = mailing.reply_to
- composer = self.env['mail.compose.message'].with_context(active_ids=res_ids).create(composer_values)
- extra_context = mailing._get_mass_mailing_context()
- composer = composer.with_context(active_ids=res_ids, **extra_context)
- # auto-commit except in testing mode
- auto_commit = not getattr(threading.current_thread(), 'testing', False)
- composer._action_send_mail(auto_commit=auto_commit)
- mailing.write({
- 'state': 'done',
- 'sent_date': fields.Datetime.now(),
- # send the KPI mail only if it's the first sending
- 'kpi_mail_required': not mailing.sent_date,
- })
- return True
- def convert_links(self):
- res = {}
- for mass_mailing in self:
- html = mass_mailing.body_html if mass_mailing.body_html else ''
- vals = {'mass_mailing_id': mass_mailing.id}
- if mass_mailing.campaign_id:
- vals['campaign_id'] = mass_mailing.campaign_id.id
- if mass_mailing.source_id:
- vals['source_id'] = mass_mailing.source_id.id
- if mass_mailing.medium_id:
- vals['medium_id'] = mass_mailing.medium_id.id
- res[mass_mailing.id] = mass_mailing._shorten_links(html, vals, blacklist=['/unsubscribe_from_list', '/view'])
- return res
- @api.model
- def _process_mass_mailing_queue(self):
- mass_mailings = self.search([('state', 'in', ('in_queue', 'sending')), '|', ('schedule_date', '<', fields.Datetime.now()), ('schedule_date', '=', False)])
- for mass_mailing in mass_mailings:
- user = mass_mailing.write_uid or self.env.user
- mass_mailing = mass_mailing.with_context(**user.with_user(user).context_get())
- if len(mass_mailing._get_remaining_recipients()) > 0:
- mass_mailing.state = 'sending'
- mass_mailing.action_send_mail()
- else:
- mass_mailing.write({
- 'state': 'done',
- 'sent_date': fields.Datetime.now(),
- # send the KPI mail only if it's the first sending
- 'kpi_mail_required': not mass_mailing.sent_date,
- })
- if self.env['ir.config_parameter'].sudo().get_param('mass_mailing.mass_mailing_reports'):
- mailings = self.env['mailing.mailing'].search([
- ('kpi_mail_required', '=', True),
- ('state', '=', 'done'),
- ('sent_date', '<=', fields.Datetime.now() - relativedelta(days=1)),
- ('sent_date', '>=', fields.Datetime.now() - relativedelta(days=5)),
- ])
- if mailings:
- mailings._action_send_statistics()
- # ------------------------------------------------------
- # STATISTICS
- # ------------------------------------------------------
- def _action_send_statistics(self):
- """Send an email to the responsible of each finished mailing with the statistics."""
- self.kpi_mail_required = False
- mails_sudo = self.env['mail.mail'].sudo()
- for mailing in self:
- if mailing.user_id:
- mailing = mailing.with_user(mailing.user_id).with_context(
- lang=mailing.user_id.lang or self._context.get('lang')
- )
- mailing_type = mailing._get_pretty_mailing_type()
- mail_user = mailing.user_id or self.env.user
- mail_company = mail_user.company_id
- link_trackers = self.env['link.tracker'].search(
- [('mass_mailing_id', '=', mailing.id)]
- ).sorted('count', reverse=True)
- link_trackers_body = self.env['ir.qweb']._render(
- 'mass_mailing.mass_mailing_kpi_link_trackers',
- {
- 'object': mailing,
- 'link_trackers': link_trackers,
- 'mailing_type': mailing_type,
- },
- )
- rendering_data = {
- 'body': tools.html_sanitize(link_trackers_body),
- 'company': mail_company,
- 'user': mail_user,
- 'display_mobile_banner': True,
- ** mailing._prepare_statistics_email_values(),
- }
- if mail_user.has_group('mass_mailing.group_mass_mailing_user'):
- rendering_data['mailing_report_token'] = self._get_unsubscribe_token(mail_user.id)
- rendering_data['user_id'] = mail_user.id
- rendered_body = self.env['ir.qweb']._render(
- 'digest.digest_mail_main',
- rendering_data
- )
- full_mail = self.env['mail.render.mixin']._render_encapsulate(
- 'digest.digest_mail_layout',
- rendered_body,
- )
- mail_values = {
- 'auto_delete': True,
- 'author_id': mail_user.partner_id.id,
- 'email_from': mail_user.email_formatted,
- 'email_to': mail_user.email_formatted,
- 'body_html': full_mail,
- 'reply_to': mail_company.email_formatted or mail_user.email_formatted,
- 'state': 'outgoing',
- 'subject': _('24H Stats of %(mailing_type)s "%(mailing_name)s"',
- mailing_type=mailing._get_pretty_mailing_type(),
- mailing_name=mailing.subject
- ),
- }
- mails_sudo += self.env['mail.mail'].sudo().create(mail_values)
- return mails_sudo
- def _prepare_statistics_email_values(self):
- """Return some statistics that will be displayed in the mailing statistics email.
- Each item in the returned list will be displayed as a table, with a title and
- 1, 2 or 3 columns.
- """
- self.ensure_one()
- mailing_type = self._get_pretty_mailing_type()
- kpi = {}
- if self.mailing_type == 'mail':
- kpi = {
- 'kpi_fullname': _('Engagement on %(expected)i %(mailing_type)s Sent',
- expected=self.expected,
- mailing_type=mailing_type
- ),
- 'kpi_col1': {
- 'value': f'{self.received_ratio}%',
- 'col_subtitle': _('RECEIVED (%i)', self.delivered),
- },
- 'kpi_col2': {
- 'value': f'{self.opened_ratio}%',
- 'col_subtitle': _('OPENED (%i)', self.opened),
- },
- 'kpi_col3': {
- 'value': f'{self.replied_ratio}%',
- 'col_subtitle': _('REPLIED (%i)', self.replied),
- },
- 'kpi_action': None,
- 'kpi_name': self.mailing_type,
- }
- random_tip = self.env['digest.tip'].search(
- [('group_id.category_id', '=', self.env.ref('base.module_category_marketing_email_marketing').id)]
- )
- if random_tip:
- random_tip = random.choice(random_tip).tip_description
- formatted_date = tools.format_datetime(
- self.env, self.sent_date, self.user_id.tz, 'MMM dd, YYYY', self.user_id.lang
- ) if self.sent_date else False
- web_base_url = self.get_base_url()
- return {
- 'title': _('24H Stats of %(mailing_type)s "%(mailing_name)s"',
- mailing_type=mailing_type,
- mailing_name=self.subject
- ),
- 'top_button_label': _('More Info'),
- 'top_button_url': url_join(web_base_url, f'/web#id={self.id}&model=mailing.mailing&view_type=form'),
- 'kpi_data': [
- kpi,
- {
- 'kpi_fullname': _('Business Benefits on %(expected)i %(mailing_type)s Sent',
- expected=self.expected,
- mailing_type=mailing_type
- ),
- 'kpi_action': None,
- 'kpi_col1': {},
- 'kpi_col2': {},
- 'kpi_col3': {},
- 'kpi_name': 'trace',
- },
- ],
- 'tips': [random_tip] if random_tip else False,
- 'formatted_date': formatted_date,
- }
- def _get_pretty_mailing_type(self):
- return _('Emails')
- def _get_unsubscribe_token(self, user_id):
- """Generate a secure hash for this user. It allows to opt out from
- mailing reports while keeping some security in that process. """
- return tools.hmac(self.env(su=True), 'mailing-report-deactivated', user_id)
- # ------------------------------------------------------
- # TOOLS
- # ------------------------------------------------------
- def _convert_inline_images_to_urls(self, body_html):
- """
- Find inline base64 encoded images, make an attachement out of
- them and replace the inline image with an url to the attachement.
- Find VML v:image elements, crop their source images, make an attachement
- out of them and replace their source with an url to the attachement.
- """
- root = lxml.html.fromstring(body_html)
- did_modify_body = False
- conversion_info = [] # list of tuples (image: base64 image, node: lxml node, old_url: string or None))
- with requests.Session() as session:
- for node in root.iter(lxml.etree.Element, lxml.etree.Comment):
- if node.tag == 'img':
- # Convert base64 images in img tags to attachments.
- match = image_re.match(node.attrib.get('src', ''))
- if match:
- image = match.group(2).encode() # base64 image as bytes
- conversion_info.append((image, node, None))
- elif 'base64' in (node.attrib.get('style') or ''):
- # Convert base64 images in inline styles to attachments.
- for match in re.findall(r'data:image/[A-Za-z]+;base64,.+?(?=&\#34;|\"|\'|"|\))', node.attrib.get('style')):
- image = re.sub(r'data:image/[A-Za-z]+;base64,', '', match).encode() # base64 image as bytes
- conversion_info.append((image, node, match))
- elif mso_re.match(node.text or ''):
- # Convert base64 images (in img tags or inline styles) in mso comments to attachments.
- base64_in_element_regex = re.compile(r"""
- (?:(?!^)|<)[^<>]*?(data:image/[A-Za-z]+;base64,[^<]+?)(?=&\#34;|\"|'|"|\))(?=[^<]+>)
- """, re.VERBOSE)
- for match in re.findall(base64_in_element_regex, node.text):
- image = re.sub(r'data:image/[A-Za-z]+;base64,', '', match).encode() # base64 image as bytes
- conversion_info.append((image, node, match))
- # Crop VML images.
- for match in re.findall(r'<v:image[^>]*>', node.text):
- url = re.search(r'src=\s*\"([^\"]+)\"', match)[1]
- # Make sure we have an absolute URL by adding a scheme and host if needed.
- absolute_url = url if '//' in url else f"{self.get_base_url()}{url if url.startswith('/') else f'/{url}'}"
- target_width_match = re.search(r'width:\s*([0-9\.]+)\s*px', match)
- target_height_match = re.search(r'height:\s*([0-9\.]+)\s*px', match)
- if target_width_match and target_height_match:
- target_width = float(target_width_match[1])
- target_height = float(target_height_match[1])
- try:
- image = self._get_image_by_url(absolute_url, session)
- except (ImportValidationError, UnidentifiedImageError):
- # Url invalid or doesn't resolve to a valid image.
- # Note: We choose to ignore errors so as not to
- # break the entire process just for one image's
- # responsive cropping behavior).
- pass
- else:
- image_processor = tools.ImageProcess(image)
- image = image_processor.crop_resize(target_width, target_height, 0, 0)
- conversion_info.append((base64.b64encode(image.source), node, url))
- # Apply the changes.
- urls = self._create_attachments_from_inline_images([image for (image, _, _) in conversion_info])
- for ((image, node, old_url), new_url) in zip(conversion_info, urls):
- did_modify_body = True
- if node.tag == 'img':
- node.attrib['src'] = new_url
- elif 'base64' in (node.attrib.get('style') or ''):
- node.attrib['style'] = node.attrib['style'].replace(old_url, new_url)
- else:
- node.text = node.text.replace(old_url, new_url)
- if did_modify_body:
- return lxml.html.tostring(root, encoding='unicode')
- return body_html
- def _create_attachments_from_inline_images(self, b64images):
- if not b64images:
- return []
- attachments = self.env['ir.attachment'].create([{
- 'datas': b64image,
- 'name': f"cropped_image_mailing_{self.id}_{i}",
- 'type': 'binary',} for i, b64image in enumerate(b64images)])
- urls = []
- for attachment in attachments:
- attachment.generate_access_token()
- urls.append('/web/image/%s?access_token=%s' % (attachment.id, attachment.access_token))
- return urls
- def _get_default_mailing_domain(self):
- mailing_domain = []
- if hasattr(self.env[self.mailing_model_name], '_mailing_get_default_domain'):
- mailing_domain = self.env[self.mailing_model_name]._mailing_get_default_domain(self)
- if self.mailing_type == 'mail' and 'is_blacklisted' in self.env[self.mailing_model_name]._fields:
- mailing_domain = expression.AND([[('is_blacklisted', '=', False)], mailing_domain])
- return mailing_domain
- def _get_image_by_url(self, url, session):
- maxsize = int(tools.config.get("import_image_maxbytes", DEFAULT_IMAGE_MAXBYTES))
- _logger.debug("Trying to import image from URL: %s", url)
- try:
- response = session.get(url, timeout=int(tools.config.get("import_image_timeout", DEFAULT_IMAGE_TIMEOUT)))
- response.raise_for_status()
- if response.headers.get('Content-Length') and int(response.headers['Content-Length']) > maxsize:
- raise ImportValidationError(
- _("File size exceeds configured maximum (%s bytes)", maxsize)
- )
- content = bytearray()
- for chunk in response.iter_content(DEFAULT_IMAGE_CHUNK_SIZE):
- content += chunk
- if len(content) > maxsize:
- raise ImportValidationError(
- _("File size exceeds configured maximum (%s bytes)", maxsize)
- )
- image = Image.open(io.BytesIO(content))
- w, h = image.size
- if w * h > 42e6:
- raise ImportValidationError(
- _("Image size excessive, imported images must be smaller than 42 million pixel")
- )
- return content
- except Exception as e:
- _logger.exception(e)
- raise ImportValidationError(_("Could not retrieve URL: %s", url)) from e
- def _parse_mailing_domain(self):
- self.ensure_one()
- try:
- mailing_domain = literal_eval(self.mailing_domain)
- except Exception:
- mailing_domain = [('id', 'in', [])]
- return mailing_domain
- def _unsubscribe_token(self, res_id, email):
- """Generate a secure hash for this mailing list and parameters.
- This is appended to the unsubscription URL and then checked at
- unsubscription time to ensure no malicious unsubscriptions are
- performed.
- :param int res_id:
- ID of the resource that will be unsubscribed.
- :param str email:
- Email of the resource that will be unsubscribed.
- """
- secret = self.env["ir.config_parameter"].sudo().get_param("database.secret")
- token = (self.env.cr.dbname, self.id, int(res_id), tools.ustr(email))
- return hmac.new(secret.encode('utf-8'), repr(token).encode('utf-8'), hashlib.sha512).hexdigest()
|