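# -*- coding: utf-8 -*-
import hashlib
import hmac
import logging
from datetime import datetime, timedelta

from odoo import models
from odoo.http import request

__author__ = 'jinling.yang'

_logger = logging.getLogger(__name__)

# Largest accepted gap, in either direction, between the caller's timestamp
# and the server clock.
_SF_TOKEN_MAX_SKEW = timedelta(seconds=5)


class AuthenticationError(Exception):
    """Raised when a request using the sf_token auth method fails verification."""


class Http(models.AbstractModel):
    _inherit = 'ir.http'

    @classmethod
    def _auth_method_sf_token(cls):
        """Authenticate a request from its Token / Timestamp / Checkstr headers.

        The caller sends a token, a Unix timestamp and a check string. The
        token selects a ``res.partner`` record; the check string must equal
        ``sha1(token + timestamp + partner.sf_secret_key)`` in hex.

        Returns:
            None when the request is accepted.

        Raises:
            AuthenticationError: the token is missing or unknown, the
                timestamp is missing, malformed or outside the accepted
                window, or the check string does not match.

        Security notes:
            * The digest covers only token, timestamp and secret. It does not
              bind the request path or body, so it authenticates the caller
              but not the payload.
            * No nonce is tracked here, so a captured header set can be
              replayed while its timestamp is still inside the window.
            * The plain SHA-1 concatenation is kept because callers compute
              the same value; moving to HMAC-SHA256 needs a coordinated
              change on the calling side.
        """
        # WSGI environ: incoming headers appear as HTTP_<NAME> keys.
        # Do not log this mapping: it carries cookies and credentials.
        datas = request.httprequest.headers.environ
        if 'HTTP_TOKEN' not in datas:
            raise AuthenticationError('请求参数中无token')

        # Look up the partner that owns this token to obtain its secret key.
        factory_secret = request.env['res.partner'].sudo().search(
            [('sf_token', '=', datas['HTTP_TOKEN'])], limit=1)
        if not factory_secret:
            raise AuthenticationError('无效的token')
        if not factory_secret.sf_secret_key:
            # An unset secret would make the digest depend only on values the
            # caller already supplies, so such a partner cannot authenticate.
            raise AuthenticationError('无效的token')

        # A missing or malformed timestamp is an authentication failure, not
        # an unhandled KeyError/ValueError.
        try:
            post_time = int(datas['HTTP_TIMESTAMP'])
            datetime_post = datetime.fromtimestamp(post_time)
        except (KeyError, TypeError, ValueError, OverflowError, OSError):
            raise AuthenticationError('数据校验不通过') from None

        # Reject timestamps too far in the past as well as in the future;
        # checking only the future side would accept arbitrarily old
        # (replayed) requests.
        datetime_now = datetime.now().replace(microsecond=0)
        if abs(datetime_now - datetime_post) > _SF_TOKEN_MAX_SKEW:
            raise AuthenticationError('请求已过期')

        check_str = '%s%s%s' % (datas['HTTP_TOKEN'], post_time, factory_secret.sf_secret_key)
        check_sf_str = hashlib.sha1(check_str.encode('utf-8')).hexdigest()

        # Constant-time comparison on bytes: avoids leaking how many leading
        # characters matched, and tolerates non-ASCII header values.
        supplied_check = datas.get('HTTP_CHECKSTR', '')
        if not hmac.compare_digest(check_sf_str.encode('utf-8'),
                                   supplied_check.encode('utf-8')):
            raise AuthenticationError('数据校验不通过')

        # Log only non-secret context: never the token, the secret key, the
        # pre-hash string or the expected digest.
        _logger.info('sf_token auth accepted: partner_id=%s timestamp=%s',
                     factory_secret.id, post_time)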