ir_cron.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. # Part of Odoo. See LICENSE file for full copyright and licensing details.
  2. import logging
  3. import threading
  4. import time
  5. import os
  6. import psycopg2
  7. import pytz
  8. from datetime import datetime, timedelta
  9. from dateutil.relativedelta import relativedelta
  10. import odoo
  11. from odoo import api, fields, models, _
  12. from odoo.exceptions import UserError
  13. from psycopg2 import sql
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Version of the `base` module shipped with the running code; it is compared
# with the version recorded in the database by ir_cron._check_version().
BASE_VERSION = odoo.modules.get_manifest('base')['version']
# How long the oldest ready job may stay overdue while some modules are in a
# "to ..." state before ir_cron._check_modules_state() forces a reset of the
# module states.
MAX_FAIL_TIME = timedelta(hours=5)  # chosen with a fair roll of the dice
# custom function to call instead of NOTIFY postgresql command (opt-in)
ODOO_NOTIFY_FUNCTION = os.environ.get('ODOO_NOTIFY_FUNCTION')
  19. class BadVersion(Exception):
  20. pass
  21. class BadModuleState(Exception):
  22. pass
  23. _intervalTypes = {
  24. 'days': lambda interval: relativedelta(days=interval),
  25. 'hours': lambda interval: relativedelta(hours=interval),
  26. 'weeks': lambda interval: relativedelta(days=7*interval),
  27. 'months': lambda interval: relativedelta(months=interval),
  28. 'minutes': lambda interval: relativedelta(minutes=interval),
  29. }
class ir_cron(models.Model):
    """ Model describing cron jobs (also called actions or tasks).
    """

    # TODO: perhaps in the future we could consider a flag on ir.cron jobs
    # that would cause database wake-up even if the database has not been
    # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
    # See also odoo.cron

    _name = "ir.cron"
    _order = 'cron_name'
    _description = 'Scheduled Actions'

    # Server action executed by the job; declared with delegate=True.
    ir_actions_server_id = fields.Many2one(
        'ir.actions.server', 'Server action',
        delegate=True, ondelete='restrict', required=True)
    # Stored copy of the server action name; it is the default sort key (_order).
    cron_name = fields.Char('Name', related='ir_actions_server_id.name', store=True, readonly=False)
    # User the job is executed as (see _process_job and method_direct_trigger).
    user_id = fields.Many2one('res.users', string='Scheduler User', default=lambda self: self.env.user, required=True)
    active = fields.Boolean(default=True)
    # interval_number x interval_type is the delay between two scheduled calls;
    # interval_type must be one of the keys of _intervalTypes.
    interval_number = fields.Integer(default=1, help="Repeat every x.")
    interval_type = fields.Selection([('minutes', 'Minutes'),
                                      ('hours', 'Hours'),
                                      ('days', 'Days'),
                                      ('weeks', 'Weeks'),
                                      ('months', 'Months')], string='Interval Unit', default='months')
    # Remaining number of calls: jobs with 0 are never selected by the
    # scheduler queries, a negative value means no limit.
    numbercall = fields.Integer(string='Number of Calls', default=1, help='How many times the method is called,\na negative number indicates no limit.')
    doall = fields.Boolean(string='Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts.")
    nextcall = fields.Datetime(string='Next Execution Date', required=True, default=fields.Datetime.now, help="Next planned execution date for this job.")
    lastcall = fields.Datetime(string='Last Execution Date', help="Previous time the cron ran successfully, provided to the job through the context on the `lastcall` key")
    # Ready jobs are fetched with ORDER BY priority (lowest value first).
    priority = fields.Integer(default=5, help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
  57. @api.model_create_multi
  58. def create(self, vals_list):
  59. for vals in vals_list:
  60. vals['usage'] = 'ir_cron'
  61. if os.getenv('ODOO_NOTIFY_CRON_CHANGES'):
  62. self._cr.postcommit.add(self._notifydb)
  63. return super().create(vals_list)
  64. @api.model
  65. def default_get(self, fields_list):
  66. # only 'code' state is supported for cron job so set it as default
  67. if not self._context.get('default_state'):
  68. self = self.with_context(default_state='code')
  69. return super(ir_cron, self).default_get(fields_list)
  70. def method_direct_trigger(self):
  71. self.check_access_rights('write')
  72. for cron in self:
  73. cron.with_user(cron.user_id).with_context({'lastcall': cron.lastcall}).ir_actions_server_id.run()
  74. cron.lastcall = fields.Datetime.now()
  75. return True
    @classmethod
    def _process_jobs(cls, db_name):
        """ Execute every job ready to be run on this database.

        The ready jobs are listed once, then each one is acquired (locked)
        and processed individually on the same cursor. Known failure modes
        (version mismatch, pending module operations, missing ``ir_cron``
        table) and any other ``Exception`` are logged as warnings instead of
        being propagated; only a ``psycopg2.ProgrammingError`` with another
        pgcode is re-raised.

        :param str db_name: name of the database to process
        """
        try:
            db = odoo.sql_db.db_connect(db_name)
            # expose the database name on the current thread; removed in `finally`
            threading.current_thread().dbname = db_name
            with db.cursor() as cron_cr:
                cls._check_version(cron_cr)
                jobs = cls._get_all_ready_jobs(cron_cr)
                if not jobs:
                    return
                cls._check_modules_state(cron_cr, jobs)

                # each job id is attempted at most once per call
                for job_id in (job['id'] for job in jobs):
                    try:
                        job = cls._acquire_one_job(cron_cr, (job_id,))
                    except psycopg2.extensions.TransactionRollbackError:
                        cron_cr.rollback()
                        _logger.debug("job %s has been processed by another worker, skip", job_id)
                        continue
                    if not job:
                        _logger.debug("another worker is processing job %s, skip", job_id)
                        continue
                    _logger.debug("job %s acquired", job_id)
                    # take into account overridings of _process_job() on that database
                    registry = odoo.registry(db_name)
                    registry[cls._name]._process_job(db, cron_cr, job)
                    _logger.debug("job %s updated and released", job_id)

        except BadVersion:
            _logger.warning('Skipping database %s as its base version is not %s.', db_name, BASE_VERSION)
        except BadModuleState:
            _logger.warning('Skipping database %s because of modules to install/upgrade/remove.', db_name)
        except psycopg2.ProgrammingError as e:
            if e.pgcode == '42P01':
                # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
                # The table ir_cron does not exist; this is probably not an OpenERP database.
                _logger.warning('Tried to poll an undefined table on database %s.', db_name)
            else:
                raise
        except Exception:
            _logger.warning('Exception in cron:', exc_info=True)

        finally:
            # always drop the thread attribute set above, if it was set
            if hasattr(threading.current_thread(), 'dbname'):
                del threading.current_thread().dbname
  119. @classmethod
  120. def _check_version(cls, cron_cr):
  121. """ Ensure the code version matches the database version """
  122. cron_cr.execute("""
  123. SELECT latest_version
  124. FROM ir_module_module
  125. WHERE name='base'
  126. """)
  127. (version,) = cron_cr.fetchone()
  128. if version is None:
  129. raise BadModuleState()
  130. if version != BASE_VERSION:
  131. raise BadVersion()
  132. @classmethod
  133. def _check_modules_state(cls, cr, jobs):
  134. """ Ensure no module is installing or upgrading """
  135. cr.execute("""
  136. SELECT COUNT(*)
  137. FROM ir_module_module
  138. WHERE state LIKE %s
  139. """, ['to %'])
  140. (changes,) = cr.fetchone()
  141. if not changes:
  142. return
  143. if not jobs:
  144. raise BadModuleState()
  145. oldest = min([
  146. fields.Datetime.from_string(job['nextcall'])
  147. for job in jobs
  148. ])
  149. if datetime.now() - oldest < MAX_FAIL_TIME:
  150. raise BadModuleState()
  151. # the cron execution failed around MAX_FAIL_TIME * 60 times (1 failure
  152. # per minute for 5h) in which case we assume that the crons are stuck
  153. # because the db has zombie states and we force a call to
  154. # reset_module_states.
  155. odoo.modules.reset_modules_state(cr.dbname)
  156. @classmethod
  157. def _get_all_ready_jobs(cls, cr):
  158. """ Return a list of all jobs that are ready to be executed """
  159. cr.execute("""
  160. SELECT *, cron_name->>'en_US' as cron_name
  161. FROM ir_cron
  162. WHERE active = true
  163. AND numbercall != 0
  164. AND (nextcall <= (now() at time zone 'UTC')
  165. OR id in (
  166. SELECT cron_id
  167. FROM ir_cron_trigger
  168. WHERE call_at <= (now() at time zone 'UTC')
  169. )
  170. )
  171. ORDER BY priority
  172. """)
  173. return cr.dictfetchall()
    @classmethod
    def _acquire_one_job(cls, cr, job_ids):
        """
        Acquire for update one job that is ready from the job_ids tuple.

        The jobs that have already been processed in this worker should
        be excluded from the tuple.

        This function raises a ``psycopg2.errors.SerializationFailure``
        when the ``nextcall`` of one of the job_ids is modified in
        another transaction. You should rollback the transaction and try
        again later.

        :param cr: cursor used to select and lock the job row
        :param tuple job_ids: ids of the candidate jobs
        :return: the locked job row as a dict (result of ``dictfetchone``)
        """

        # We have to make sure ALL jobs are executed ONLY ONCE no matter
        # how many cron workers may process them. The exclusion mechanism
        # is twofold: (i) prevent parallel processing of the same job,
        # and (ii) prevent re-processing jobs that have been processed
        # already.
        #
        # (i) is implemented via `LIMIT 1 FOR UPDATE SKIP LOCKED`, each
        # worker just acquires one available job at a time and locks it so
        # the other workers don't select it too.
        # (ii) is implemented via the `WHERE` statement, when a job has
        # been processed, its nextcall is updated to a date in the
        # future and the optional triggers are removed.
        #
        # Note about (ii): it is possible that a job becomes available
        # again quickly (e.g. high frequency or self-triggering cron).
        # This function doesn't prevent from acquiring that job multiple
        # times at different moments. This can block a worker on
        # executing a same job in loop. To prevent this problem, the
        # caller is responsible for providing a `job_ids` tuple without
        # the jobs it has executed already.
        #
        # An `UPDATE` lock type is the strongest row lock, it conflicts
        # with ALL other lock types. Among them the `KEY SHARE` row lock
        # which is implicitly acquired by foreign keys to prevent the
        # referenced record from being removed while in use. Because we
        # never delete acquired cron jobs, foreign keys are safe to
        # concurrently reference cron jobs. Hence, the `NO KEY UPDATE`
        # row lock is used, it is a weaker lock that does conflict with
        # everything BUT `KEY SHARE`.
        #
        # Learn more: https://www.postgresql.org/docs/current/explicit-locking.html#LOCKING-ROWS

        query = """
            SELECT *, cron_name->>'en_US' as cron_name
            FROM ir_cron
            WHERE active = true
              AND numbercall != 0
              AND (nextcall <= (now() at time zone 'UTC')
                OR EXISTS (
                    SELECT cron_id
                    FROM ir_cron_trigger
                    WHERE call_at <= (now() at time zone 'UTC')
                      AND cron_id = ir_cron.id
                )
              )
              AND id in %s
            ORDER BY priority
            LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
        """
        try:
            cr.execute(query, [job_ids], log_exceptions=False)
        except psycopg2.extensions.TransactionRollbackError:
            # A serialization error can occur when another cron worker
            # commits the new `nextcall` value of a cron it just ran and
            # that commit occurred just before this query. The error is
            # genuine and the job should be skipped in this cron worker.
            raise
        except Exception as exc:
            # any other failure is logged together with the query, then propagated
            _logger.error("bad query: %s\nERROR: %s", query, exc)
            raise
        return cr.dictfetchone()
    @classmethod
    def _process_job(cls, db, cron_cr, job):
        """ Execute a cron job and re-schedule a call for later.

        The job itself runs on a dedicated cursor taken from ``cls.pool``,
        while the scheduling update (nextcall, numbercall, lastcall, active)
        and the removal of past triggers are written and committed on
        ``cron_cr``.

        :param db: not used by this implementation
        :param cron_cr: cursor used to update the ``ir_cron`` row and delete
            its past triggers; it is committed at the end
        :param dict job: the job row, as returned by :meth:`_acquire_one_job`
        """

        # Compute how many calls were missed and at what time we should
        # recall the cron next. In the example below, we fake a cron
        # with an interval of 30 (starting at 0) that was last executed
        # at 15 and that is executed again at 135.
        #
        #   0           60         120         180
        # --|-----|-----|-----|-----|-----|-----|----> time
        #      1  2*    *     *     *  3  4
        #
        # 1: lastcall, the last time the cron was executed
        # 2: past_nextcall, the cron nextcall as seen from lastcall
        # *: missed_call, a total of 4 calls are missing
        # 3: now
        # 4: future_nextcall, the cron nextcall as seen from now

        with cls.pool.cursor() as job_cr:
            lastcall = fields.Datetime.to_datetime(job['lastcall'])
            interval = _intervalTypes[job['interval_type']](job['interval_number'])
            # environment of the job's user, carrying `lastcall` in its context
            env = api.Environment(job_cr, job['user_id'], {'lastcall': lastcall})
            ir_cron = env[cls._name]

            # Use the user's timezone to compare and compute datetimes,
            # otherwise unexpected results may appear. For instance, adding
            # 1 month in UTC to July 1st at midnight in GMT+2 gives July 30
            # instead of August 1st!
            now = fields.Datetime.context_timestamp(ir_cron, datetime.utcnow())
            past_nextcall = fields.Datetime.context_timestamp(
                ir_cron, fields.Datetime.to_datetime(job['nextcall']))

            # Compute how many call were missed
            missed_call = past_nextcall
            missed_call_count = 0
            while missed_call <= now:
                missed_call += interval
                missed_call_count += 1
            # first occurrence strictly after `now`
            future_nextcall = missed_call

            # Compute how many time we should run the cron
            effective_call_count = (
                     1 if not missed_call_count  # run at least once
                else 1 if not job['doall']  # run once for all
                else missed_call_count if job['numbercall'] == -1  # run them all
                else min(missed_call_count, job['numbercall'])  # run maximum numbercall times
            )
            # never goes below -1, the value tested above for "run them all"
            call_count_left = max(job['numbercall'] - effective_call_count, -1)

            # The actual cron execution
            for call in range(effective_call_count):
                ir_cron._callback(job['cron_name'], job['ir_actions_server_id'], job['id'])

        # Update the cron with the information computed above
        cron_cr.execute("""
            UPDATE ir_cron
            SET nextcall=%s,
                numbercall=%s,
                lastcall=%s,
                active=%s
            WHERE id=%s
        """, [
            fields.Datetime.to_string(future_nextcall.astimezone(pytz.UTC)),
            call_count_left,
            fields.Datetime.to_string(now.astimezone(pytz.UTC)),
            # the cron is deactivated once there is no call left (0)
            job['active'] and bool(call_count_left),
            job['id'],
        ])

        # drop the triggers of this cron that are already in the past
        cron_cr.execute("""
            DELETE FROM ir_cron_trigger
            WHERE cron_id = %s
              AND call_at < (now() at time zone 'UTC')
        """, [job['id']])

        cron_cr.commit()
  313. @api.model
  314. def _callback(self, cron_name, server_action_id, job_id):
  315. """ Run the method associated to a given job. It takes care of logging
  316. and exception handling. Note that the user running the server action
  317. is the user calling this method. """
  318. try:
  319. if self.pool != self.pool.check_signaling():
  320. # the registry has changed, reload self in the new registry
  321. self.env.reset()
  322. self = self.env()[self._name]
  323. log_depth = (None if _logger.isEnabledFor(logging.DEBUG) else 1)
  324. odoo.netsvc.log(_logger, logging.DEBUG, 'cron.object.execute', (self._cr.dbname, self._uid, '*', cron_name, server_action_id), depth=log_depth)
  325. start_time = False
  326. _logger.info('Starting job `%s`.', cron_name)
  327. if _logger.isEnabledFor(logging.DEBUG):
  328. start_time = time.time()
  329. self.env['ir.actions.server'].browse(server_action_id).run()
  330. _logger.info('Job `%s` done.', cron_name)
  331. if start_time and _logger.isEnabledFor(logging.DEBUG):
  332. end_time = time.time()
  333. _logger.debug('%.3fs (cron %s, server action %d with uid %d)', end_time - start_time, cron_name, server_action_id, self.env.uid)
  334. self.pool.signal_changes()
  335. except Exception as e:
  336. self.pool.reset_changes()
  337. _logger.exception("Call from cron %s for server action #%s failed in Job #%s",
  338. cron_name, server_action_id, job_id)
  339. self._handle_callback_exception(cron_name, server_action_id, job_id, e)
  340. @api.model
  341. def _handle_callback_exception(self, cron_name, server_action_id, job_id, job_exception):
  342. """ Method called when an exception is raised by a job.
  343. Simply logs the exception and rollback the transaction. """
  344. self._cr.rollback()
  345. def _try_lock(self, lockfk=False):
  346. """Try to grab a dummy exclusive write-lock to the rows with the given ids,
  347. to make sure a following write() or unlink() will not block due
  348. to a process currently executing those cron tasks.
  349. :param lockfk: acquire a strong row lock which conflicts with
  350. the lock aquired by foreign keys when they
  351. reference this row.
  352. """
  353. row_level_lock = "UPDATE" if lockfk else "NO KEY UPDATE"
  354. try:
  355. self._cr.execute(f"""
  356. SELECT id
  357. FROM "{self._table}"
  358. WHERE id IN %s
  359. FOR {row_level_lock} NOWAIT
  360. """, [tuple(self.ids)], log_exceptions=False)
  361. except psycopg2.OperationalError:
  362. self._cr.rollback() # early rollback to allow translations to work for the user feedback
  363. raise UserError(_("Record cannot be modified right now: "
  364. "This cron task is currently being executed and may not be modified "
  365. "Please try again in a few minutes"))
  366. def write(self, vals):
  367. self._try_lock()
  368. if ('nextcall' in vals or vals.get('active')) and os.getenv('ODOO_NOTIFY_CRON_CHANGES'):
  369. self._cr.postcommit.add(self._notifydb)
  370. return super(ir_cron, self).write(vals)
  371. def unlink(self):
  372. self._try_lock(lockfk=True)
  373. return super(ir_cron, self).unlink()
  374. def try_write(self, values):
  375. try:
  376. with self._cr.savepoint():
  377. self._cr.execute(f"""
  378. SELECT id
  379. FROM "{self._table}"
  380. WHERE id IN %s
  381. FOR NO KEY UPDATE NOWAIT
  382. """, [tuple(self.ids)], log_exceptions=False)
  383. except psycopg2.OperationalError:
  384. pass
  385. else:
  386. return super(ir_cron, self).write(values)
  387. return False
  388. @api.model
  389. def toggle(self, model, domain):
  390. # Prevent deactivated cron jobs from being re-enabled through side effects on
  391. # neutralized databases.
  392. if self.env['ir.config_parameter'].sudo().get_param('database.is_neutralized'):
  393. return True
  394. active = bool(self.env[model].search_count(domain))
  395. return self.try_write({'active': active})
  396. def _trigger(self, at=None):
  397. """
  398. Schedule a cron job to be executed soon independently of its
  399. ``nextcall`` field value.
  400. By default the cron is scheduled to be executed in the next batch but
  401. the optional `at` argument may be given to delay the execution later
  402. with a precision down to 1 minute.
  403. The method may be called with a datetime or an iterable of datetime.
  404. The actual implementation is in :meth:`~._trigger_list`, which is the
  405. recommended method for overrides.
  406. :param Optional[Union[datetime.datetime, list[datetime.datetime]]] at:
  407. When to execute the cron, at one or several moments in time instead
  408. of as soon as possible.
  409. """
  410. if at is None:
  411. at_list = [fields.Datetime.now()]
  412. elif isinstance(at, datetime):
  413. at_list = [at]
  414. else:
  415. at_list = list(at)
  416. assert all(isinstance(at, datetime) for at in at_list)
  417. self._trigger_list(at_list)
  418. def _trigger_list(self, at_list):
  419. """
  420. Implementation of :meth:`~._trigger`.
  421. :param list[datetime.datetime] at_list:
  422. Execute the cron later, at precise moments in time.
  423. """
  424. self.ensure_one()
  425. now = fields.Datetime.now()
  426. if not self.sudo().active:
  427. # skip triggers that would be ignored
  428. at_list = [at for at in at_list if at > now]
  429. if not at_list:
  430. return
  431. self.env['ir.cron.trigger'].sudo().create([
  432. {'cron_id': self.id, 'call_at': at}
  433. for at in at_list
  434. ])
  435. if _logger.isEnabledFor(logging.DEBUG):
  436. ats = ', '.join(map(str, at_list))
  437. _logger.debug("will execute '%s' at %s", self.sudo().name, ats)
  438. if min(at_list) <= now or os.getenv('ODOO_NOTIFY_CRON_CHANGES'):
  439. self._cr.postcommit.add(self._notifydb)
  440. def _notifydb(self):
  441. """ Wake up the cron workers
  442. The ODOO_NOTIFY_CRON_CHANGES environment variable allows to force the notifydb on both
  443. ir_cron modification and on trigger creation (regardless of call_at)
  444. """
  445. with odoo.sql_db.db_connect('postgres').cursor() as cr:
  446. if ODOO_NOTIFY_FUNCTION:
  447. query = sql.SQL("SELECT {}('cron_trigger', %s)").format(sql.Identifier(ODOO_NOTIFY_FUNCTION))
  448. else:
  449. query = "NOTIFY cron_trigger, %s"
  450. cr.execute(query, [self.env.cr.dbname])
  451. _logger.debug("cron workers notified")
  452. class ir_cron_trigger(models.Model):
  453. _name = 'ir.cron.trigger'
  454. _description = 'Triggered actions'
  455. cron_id = fields.Many2one("ir.cron", index=True)
  456. call_at = fields.Datetime()
  457. @api.autovacuum
  458. def _gc_cron_triggers(self):
  459. self.search([('call_at', '<', datetime.now() + relativedelta(weeks=-1))]).unlink()