common.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. # -*- coding: utf-8 -*-
  2. # Part of Odoo. See LICENSE file for full copyright and licensing details.
  3. import datetime
  4. import random
  5. import re
  6. import werkzeug
  7. from unittest.mock import patch
  8. from odoo import tools
  9. from odoo.addons.link_tracker.tests.common import MockLinkTracker
  10. from odoo.addons.mail.tests.common import MailCase, MailCommon, mail_new_test_user
  11. from odoo.sql_db import Cursor
  12. class MassMailCase(MailCase, MockLinkTracker):
  13. # ------------------------------------------------------------
  14. # ASSERTS
  15. # ------------------------------------------------------------
  16. def assertMailingStatistics(self, mailing, **kwargs):
  17. """ Helper to assert mailing statistics fields. As we have many of them
  18. it helps lessening test asserts. """
  19. if not kwargs.get('expected'):
  20. kwargs['expected'] = len(mailing.mailing_trace_ids)
  21. if not kwargs.get('delivered'):
  22. kwargs['delivered'] = len(mailing.mailing_trace_ids)
  23. for fname in ['scheduled', 'expected', 'sent', 'delivered',
  24. 'opened', 'replied', 'clicked',
  25. 'canceled', 'failed', 'bounced']:
  26. self.assertEqual(
  27. mailing[fname], kwargs.get(fname, 0),
  28. 'Mailing %s statistics failed: got %s instead of %s' % (fname, mailing[fname], kwargs.get(fname, 0))
  29. )
  30. def assertMailTraces(self, recipients_info, mailing, records,
  31. check_mail=True, sent_unlink=False,
  32. author=None, mail_links_info=None):
  33. """ Check content of traces. Traces are fetched based on a given mailing
  34. and records. Their content is compared to recipients_info structure that
  35. holds expected information. Links content may be checked, notably to
  36. assert shortening or unsubscribe links. Mail.mail records may optionally
  37. be checked.
  38. :param recipients_info: list[{
  39. # TRACE
  40. 'partner': res.partner record (may be empty),
  41. 'email': email used when sending email (may be empty, computed based on partner),
  42. 'trace_status': outgoing / sent / open / reply / bounce / error / cancel (sent by default),
  43. 'record: linked record,
  44. # MAIL.MAIL
  45. 'content': optional content that should be present in mail.mail body_html;
  46. 'email_to_mail': optional email used for the mail, when different from the
  47. one stored on the trace itself;
  48. 'email_to_recipients': optional, see '_assertMailMail';
  49. 'failure_type': optional failure reason;
  50. }, { ... }]
  51. :param mailing: a mailing.mailing record from which traces have been
  52. generated;
  53. :param records: records given to mailing that generated traces. It is
  54. used notably to find traces using their IDs;
  55. :param check_mail: if True, also check mail.mail records that should be
  56. linked to traces;
  57. :param sent_unlink: it True, sent mail.mail are deleted and we check gateway
  58. output result instead of actual mail.mail records;
  59. :param mail_links_info: if given, should follow order of ``recipients_info``
  60. and give details about links. See ``assertLinkShortenedHtml`` helper for
  61. more details about content to give;
  62. :param author: author of sent mail.mail;
  63. """
  64. # map trace state to email state
  65. state_mapping = {
  66. 'sent': 'sent',
  67. 'open': 'sent', # opened implies something has been sent
  68. 'reply': 'sent', # replied implies something has been sent
  69. 'error': 'exception',
  70. 'cancel': 'cancel',
  71. 'bounce': 'cancel',
  72. }
  73. traces = self.env['mailing.trace'].search([
  74. ('mass_mailing_id', 'in', mailing.ids),
  75. ('res_id', 'in', records.ids)
  76. ])
  77. debug_info = '\n'.join(
  78. (
  79. f'Trace: to {t.email} - state {t.trace_status}'
  80. for t in traces
  81. )
  82. )
  83. # ensure trace coherency
  84. self.assertTrue(all(s.model == records._name for s in traces))
  85. self.assertEqual(set(s.res_id for s in traces), set(records.ids))
  86. # check each traces
  87. if not mail_links_info:
  88. mail_links_info = [None] * len(recipients_info)
  89. for recipient_info, link_info, record in zip(recipients_info, mail_links_info, records):
  90. partner = recipient_info.get('partner', self.env['res.partner'])
  91. email = recipient_info.get('email')
  92. email_to_mail = recipient_info.get('email_to_mail') or email
  93. email_to_recipients = recipient_info.get('email_to_recipients')
  94. status = recipient_info.get('trace_status', 'sent')
  95. record = record or recipient_info.get('record')
  96. content = recipient_info.get('content')
  97. if email is None and partner:
  98. email = partner.email_normalized
  99. recipient_trace = traces.filtered(
  100. lambda t: (t.email == email or (not email and not t.email)) and \
  101. t.trace_status == status and \
  102. (t.res_id == record.id if record else True)
  103. )
  104. self.assertTrue(
  105. len(recipient_trace) == 1,
  106. 'MailTrace: email %s (recipient %s, status: %s, record: %s): found %s records (1 expected)\n%s' % (
  107. email, partner, status, record,
  108. len(recipient_trace), debug_info)
  109. )
  110. self.assertTrue(bool(recipient_trace.mail_mail_id_int))
  111. if 'failure_type' in recipient_info or status in ('error', 'cancel', 'bounce'):
  112. self.assertEqual(recipient_trace.failure_type, recipient_info['failure_type'])
  113. if check_mail:
  114. if author is None:
  115. author = self.env.user.partner_id
  116. # mail.mail specific values to check
  117. fields_values = {'mailing_id': mailing}
  118. if 'failure_reason' in recipient_info:
  119. fields_values['failure_reason'] = recipient_info['failure_reason']
  120. if 'email_to_mail' in recipient_info:
  121. fields_values['email_to'] = recipient_info['email_to_mail']
  122. # specific for partner: email_formatted is used
  123. if partner:
  124. if status == 'sent' and sent_unlink:
  125. self.assertSentEmail(author, [partner])
  126. else:
  127. self.assertMailMail(
  128. partner, state_mapping[status],
  129. author=author,
  130. content=content,
  131. email_to_recipients=email_to_recipients,
  132. fields_values=fields_values,
  133. )
  134. # specific if email is False -> could have troubles finding it if several falsy traces
  135. elif not email and status in ('cancel', 'bounce'):
  136. self.assertMailMailWId(
  137. recipient_trace.mail_mail_id_int, state_mapping[status],
  138. author=author,
  139. content=content,
  140. email_to_recipients=email_to_recipients,
  141. fields_values=fields_values,
  142. )
  143. else:
  144. self.assertMailMailWEmails(
  145. [email_to_mail], state_mapping[status],
  146. author=author,
  147. content=content,
  148. email_to_recipients=email_to_recipients,
  149. fields_values=fields_values,
  150. )
  151. if link_info:
  152. trace_mail = self._find_mail_mail_wrecord(record)
  153. for (anchor_id, url, is_shortened, add_link_params) in link_info:
  154. link_params = {'utm_medium': 'Email', 'utm_source': mailing.name}
  155. if add_link_params:
  156. link_params.update(**add_link_params)
  157. self.assertLinkShortenedHtml(
  158. trace_mail.body_html,
  159. (anchor_id, url, is_shortened),
  160. link_params=link_params,
  161. )
  162. # ------------------------------------------------------------
  163. # TOOLS
  164. # ------------------------------------------------------------
  165. def gateway_mail_bounce(self, mailing, record, bounce_base_values=None):
  166. """ Generate a bounce at mailgateway level.
  167. :param mailing: a ``mailing.mailing`` record on which we find a trace
  168. to bounce;
  169. :param record: record which should bounce;
  170. :param bounce_base_values: optional values given to routing;
  171. """
  172. trace = mailing.mailing_trace_ids.filtered(lambda t: t.model == record._name and t.res_id == record.id)
  173. parsed_bounce_values = {
  174. 'email_from': 'some.email@external.example.com', # TDE check: email_from -> trace email ?
  175. 'to': 'bounce@test.example.com', # TDE check: bounce alias ?
  176. 'message_id': tools.generate_tracking_message_id('MailTest'),
  177. 'bounced_partner': self.env['res.partner'].sudo(),
  178. 'bounced_message': self.env['mail.message'].sudo()
  179. }
  180. if bounce_base_values:
  181. parsed_bounce_values.update(bounce_base_values)
  182. parsed_bounce_values.update({
  183. 'bounced_email': trace.email,
  184. 'bounced_msg_id': [trace.message_id],
  185. })
  186. self.env['mail.thread']._routing_handle_bounce(False, parsed_bounce_values)
  187. def gateway_mail_click(self, mailing, record, click_label):
  188. """ Simulate a click on a sent email. """
  189. trace = mailing.mailing_trace_ids.filtered(lambda t: t.model == record._name and t.res_id == record.id)
  190. email = self._find_sent_mail_wemail(trace.email)
  191. self.assertTrue(bool(email))
  192. for (_url_href, link_url, _dummy, label) in re.findall(tools.HTML_TAG_URL_REGEX, email['body']):
  193. if label == click_label and '/r/' in link_url: # shortened link, like 'http://localhost:8069/r/LBG/m/53'
  194. parsed_url = werkzeug.urls.url_parse(link_url)
  195. path_items = parsed_url.path.split('/')
  196. code, trace_id = path_items[2], int(path_items[4])
  197. self.assertEqual(trace.id, trace_id)
  198. self.env['link.tracker.click'].sudo().add_click(
  199. code,
  200. ip='100.200.300.%3f' % random.random(),
  201. country_code='BE',
  202. mailing_trace_id=trace.id
  203. )
  204. break
  205. else:
  206. raise AssertionError('url %s not found in mailing %s for record %s' % (click_label, mailing, record))
  207. @classmethod
  208. def _create_bounce_trace(cls, mailing, records, dt=None):
  209. if dt is None:
  210. dt = datetime.datetime.now() - datetime.timedelta(days=1)
  211. return cls._create_traces(mailing, records, dt, trace_status='bounce')
  212. @classmethod
  213. def _create_sent_traces(cls, mailing, records, dt=None):
  214. if dt is None:
  215. dt = datetime.datetime.now() - datetime.timedelta(days=1)
  216. return cls._create_traces(mailing, records, dt, trace_status='sent')
  217. @classmethod
  218. def _create_traces(cls, mailing, records, dt, **values):
  219. if 'email_normalized' in records:
  220. fname = 'email_normalized'
  221. elif 'email_from' in records:
  222. fname = 'email_from'
  223. else:
  224. fname = 'email'
  225. randomized = random.random()
  226. # Cursor.now() uses transaction's timestamp and not datetime lib -> freeze_time
  227. # is not sufficient
  228. with patch.object(Cursor, 'now', lambda *args, **kwargs: dt):
  229. traces = cls.env['mailing.trace'].sudo().create([
  230. dict({'mass_mailing_id': mailing.id,
  231. 'model': record._name,
  232. 'res_id': record.id,
  233. 'trace_status': values.get('trace_status', 'bounce'),
  234. # TDE FIXME: improve this with a mail-enabled heuristics
  235. 'email': record[fname],
  236. 'message_id': '<%5f@gilbert.boitempomils>' % randomized,
  237. }, **values)
  238. for record in records
  239. ])
  240. return traces
  241. class MassMailCommon(MailCommon, MassMailCase):
  242. @classmethod
  243. def setUpClass(cls):
  244. super(MassMailCommon, cls).setUpClass()
  245. cls.user_marketing = mail_new_test_user(
  246. cls.env, login='user_marketing',
  247. groups='base.group_user,base.group_partner_manager,mass_mailing.group_mass_mailing_user',
  248. name='Martial Marketing', signature='--\nMartial')
  249. cls.email_reply_to = 'MyCompany SomehowAlias <test.alias@test.mycompany.com>'
  250. @classmethod
  251. def _create_mailing_list(cls):
  252. """ Shortcut to create mailing lists. Currently hardcoded, maybe evolve
  253. in a near future. """
  254. cls.mailing_list_1 = cls.env['mailing.list'].with_context(cls._test_context).create({
  255. 'name': 'List1',
  256. 'contact_ids': [
  257. (0, 0, {'name': 'Déboulonneur', 'email': 'fleurus@example.com'}),
  258. (0, 0, {'name': 'Gorramts', 'email': 'gorramts@example.com'}),
  259. (0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
  260. ]
  261. })
  262. cls.mailing_list_2 = cls.env['mailing.list'].with_context(cls._test_context).create({
  263. 'name': 'List2',
  264. 'contact_ids': [
  265. (0, 0, {'name': 'Gilberte', 'email': 'gilberte@example.com'}),
  266. (0, 0, {'name': 'Gilberte En Mieux', 'email': 'gilberte@example.com'}),
  267. (0, 0, {'name': 'Norbert', 'email': 'norbert@example.com'}),
  268. (0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
  269. ]
  270. })
  271. @classmethod
  272. def _create_mailing_list_of_x_contacts(cls, contacts_nbr):
  273. """ Shortcut to create a mailing list that contains a defined number
  274. of contacts. """
  275. return cls.env['mailing.list'].with_context(cls._test_context).create({
  276. 'name': 'Test List',
  277. 'contact_ids': [
  278. (0, 0, {'name': 'Contact %s' % i, 'email': 'contact%s@example.com' % i})
  279. for i in range(contacts_nbr)
  280. ],
  281. })