123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- # -*- coding: utf-8 -*-
- # Part of Odoo. See LICENSE file for full copyright and licensing details.
- import base64
- import functools
- import json
- import logging
- import os
- import werkzeug.urls
- import werkzeug.utils
- from werkzeug.exceptions import BadRequest
- from odoo import api, http, SUPERUSER_ID, _
- from odoo.exceptions import AccessDenied
- from odoo.http import request, Response
- from odoo import registry as registry_get
- from odoo.tools.misc import clean_context
- from odoo.addons.auth_signup.controllers.main import AuthSignupHome as Home
- from odoo.addons.web.controllers.utils import ensure_db, _get_login_redirect_url
- _logger = logging.getLogger(__name__)
- #----------------------------------------------------------
- # helpers
- #----------------------------------------------------------
def fragment_to_query_string(func):
    """Decorate a controller method so URL-fragment parameters reach it.

    The wrapper discards any ``debug`` keyword argument. If keyword
    arguments remain, the decorated method is called with them. Otherwise
    the wrapper answers with a small HTML page whose script appends the
    URL fragment (without its leading ``#``) to the query string and
    navigates to the resulting URL, falling back to ``/`` when that URL is
    nothing more than the current path.

    :param func: controller method taking ``self`` plus arbitrary arguments
    :return: the wrapping function, carrying ``func``'s metadata
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kw):
        # ``debug`` alone must not count as "parameters were received"
        kw.pop('debug', False)
        if kw:
            return func(self, *args, **kw)
        # No parameter server-side: let the browser replay the request with
        # the fragment moved into the query string.
        return Response("""<html><head><script>
                var l = window.location;
                var q = l.hash.substring(1);
                var r = l.pathname + l.search;
                if(q.length !== 0) {
                    var s = l.search ? (l.search === '?' ? '' : '&') : '?';
                    r = l.pathname + l.search + s + q;
                }
                if (r == l.pathname) {
                    r = '/';
                }
                window.location = r;
            </script></head><body></body></html>""")
    return wrapper
- #----------------------------------------------------------
- # Controller
- #----------------------------------------------------------
class OAuthLogin(Home):
    """Login/signup controller that exposes the enabled OAuth providers to
    the pages rendered by the inherited ``Home`` controller."""

    def list_providers(self):
        """Return the enabled OAuth providers, each with an ``auth_link``.

        Providers are read with ``sudo()``; any error while reading them
        yields an empty list. Each provider dict gets an ``auth_link`` key:
        the provider's ``auth_endpoint`` followed by the url-encoded
        authorization parameters.

        :return: list of provider dicts as returned by ``search_read``
        """
        try:
            provider_model = request.env['auth.oauth.provider'].sudo()
            providers = provider_model.search_read([('enabled', '=', True)])
        except Exception:
            providers = []

        for provider in providers:
            callback_url = request.httprequest.url_root + 'auth_oauth/signin'
            serialized_state = json.dumps(self.get_state(provider))
            query = werkzeug.urls.url_encode({
                'response_type': 'token',
                'client_id': provider['client_id'],
                'redirect_uri': callback_url,
                'scope': provider['scope'],
                'state': serialized_state,
            })
            provider['auth_link'] = f"{provider['auth_endpoint']}?{query}"
        return providers

    def get_state(self, provider):
        """Build the ``state`` payload sent along with the OAuth request.

        :param provider: provider dict; only its ``id`` is read
        :return: dict with ``d`` (session database), ``p`` (provider id),
            ``r`` (url-quoted redirect target) and, when the request has a
            ``token`` parameter, ``t`` (that token)
        """
        target = request.params.get('redirect') or 'web'
        if not target.startswith(('//', 'http://', 'https://')):
            # Relative target: anchor it on the request's root URL, dropping
            # a single leading slash.
            root = request.httprequest.url_root
            relative = target[1:] if target[0] == '/' else target
            target = f'{root}{relative}'

        state = {
            'd': request.session.db,
            'p': provider['id'],
            'r': werkzeug.urls.url_quote_plus(target),
        }
        token = request.params.get('token')
        if token:
            state['t'] = token
        return state

    @http.route()
    def web_login(self, *args, **kw):
        """Render the login page with the OAuth providers and, if the
        request carries an ``oauth_error`` code, the matching message."""
        ensure_db()
        logged_in_get = request.httprequest.method == 'GET' and request.session.uid
        if logged_in_get and request.params.get('redirect'):
            # Already logged in and a redirect was requested: follow it
            return request.redirect(request.params.get('redirect'))

        providers = self.list_providers()
        response = super().web_login(*args, **kw)
        if not response.is_qweb:
            return response

        code = request.params.get('oauth_error')
        if code == '1':
            message = _("Sign up is not allowed on this database.")
        elif code == '2':
            message = _("Access Denied")
        elif code == '3':
            message = _("You do not have access to this database or your invitation has expired. Please ask for an invitation and be sure to follow the link in your invitation email.")
        else:
            message = None

        response.qcontext['providers'] = providers
        if message:
            response.qcontext['error'] = message
        return response

    def get_auth_signup_qcontext(self):
        """Extend the inherited signup qcontext with the OAuth providers."""
        qcontext = super().get_auth_signup_qcontext()
        qcontext["providers"] = self.list_providers()
        return qcontext
class OAuthController(http.Controller):
    """HTTP endpoints that complete an OAuth sign-in."""

    @http.route('/auth_oauth/signin', type='http', auth='none')
    @fragment_to_query_string
    def signin(self, **kw):
        """Authenticate the user from an OAuth response, then redirect.

        ``kw`` holds the request parameters. ``kw['state']`` must be a JSON
        object containing at least ``d`` (database name) and ``p``
        (provider). The optional keys ``r`` (url-quoted redirect), ``a``
        (action), ``m`` (menu) and ``c`` (extra context values) are also
        read.

        :raises BadRequest: if ``state`` is missing, is not a JSON object
            holding ``d`` and ``p``, or names a database rejected by
            ``http.db_filter``
        :return: a 303 redirect, either to the computed landing URL on
            success or to ``/web/login?oauth_error=<code>`` on failure
        """
        # ``state`` comes straight from the client: reject anything that is
        # not a usable JSON object instead of failing with a server error.
        try:
            state = json.loads(kw.get('state'))
        except (TypeError, ValueError):
            raise BadRequest() from None
        if not isinstance(state, dict) or 'd' not in state or 'p' not in state:
            raise BadRequest()

        # make sure request.session.db and state['d'] are the same,
        # update the session and retry the request otherwise
        dbname = state['d']
        if not http.db_filter([dbname]):
            raise BadRequest()
        ensure_db(db=dbname)

        provider = state['p']
        request.update_context(**clean_context(state.get('c', {})))
        try:
            # auth_oauth may create a new user, the commit makes it
            # visible to authenticate()'s own transaction below
            _, login, key = request.env['res.users'].with_user(SUPERUSER_ID).auth_oauth(provider, kw)
            request.env.cr.commit()

            # Landing URL priority: explicit redirect, then action, then
            # menu, defaulting to '/web'.
            action = state.get('a')
            menu = state.get('m')
            redirect = werkzeug.urls.url_unquote_plus(state['r']) if state.get('r') else False
            url = '/web'
            if redirect:
                url = redirect
            elif action:
                url = '/web#action=%s' % action
            elif menu:
                url = '/web#menu_id=%s' % menu

            pre_uid = request.session.authenticate(dbname, login, key)
            resp = request.redirect(_get_login_redirect_url(pre_uid, url), 303)
            resp.autocorrect_location_header = False

            # Since /web is hardcoded, verify user has right to land on it
            if werkzeug.urls.url_parse(resp.location).path == '/web' and not request.env.user._is_internal():
                resp.location = '/'
            return resp
        except AttributeError:  # TODO juc master: useless since ensure_db()
            # auth_signup is not installed
            _logger.error("auth_signup not installed on database %s: oauth sign up cancelled.", dbname)
            url = "/web/login?oauth_error=1"
        except AccessDenied:
            # oauth credentials not valid, user could be on a temporary session
            _logger.info('OAuth2: access denied, redirect to main page in case a valid session exists, without setting cookies')
            url = "/web/login?oauth_error=3"
        except Exception:
            # signup error
            _logger.exception("Exception during request handling")
            url = "/web/login?oauth_error=2"

        # Failure path: send the browser back to the login page with the
        # error code chosen above.
        redirect = request.redirect(url, 303)
        redirect.autocorrect_location_header = False
        return redirect

    @http.route('/auth_oauth/oea', type='http', auth='none')
    def oea(self, **kw):
        """login user via Odoo Account provider

        The database is taken from the ``db`` parameter, falling back to
        ``request.db``. The provider is the record referenced as
        ``auth_oauth.provider_openerp`` in that database. A ``state`` is
        then built (with ``no_user_creation`` set in its context) and the
        request is handed over to :meth:`signin`.

        :raises BadRequest: if no database can be determined or the
            database is rejected by ``http.db_filter``
        :return: the response of :meth:`signin`, or a 303 redirect to
            ``/web?db=<dbname>`` when the provider reference does not exist
        """
        dbname = kw.pop('db', None)
        if not dbname:
            dbname = request.db
        if not dbname:
            raise BadRequest()
        if not http.db_filter([dbname]):
            raise BadRequest()

        registry = registry_get(dbname)
        with registry.cursor() as cr:
            try:
                env = api.Environment(cr, SUPERUSER_ID, {})
                provider = env.ref('auth_oauth.provider_openerp')
            except ValueError:
                redirect = request.redirect(f'/web?db={dbname}', 303)
                redirect.autocorrect_location_header = False
                return redirect
            # sanity check: the reference must point at a provider record
            assert provider._name == 'auth.oauth.provider'

        state = {
            'd': dbname,
            'p': provider.id,
            'c': {'no_user_creation': True},
        }

        kw['state'] = json.dumps(state)
        return self.signin(**kw)
|