JWT-RESTful进行身份认证

视频链接
文章目录JWT的实现 Go语言实现 JWT 更新状态问题(白名单) 总结
服务器存储与客户端存储 基于服务器的身份认证方式存在一些问题:客户端存储
JWT与Session的差异 相同点是,它们都是存储用户信息;然而,Session是在服务器端的,而JWT是在客户端的。
Session方式存储用户信息的最大问题在于要占用大量服务器内存,增加服务器的开销。
而JWT方式将用户状态分散到了客户端中,可以明显减轻服务端的内存压力。
Session的状态是存储在服务器端,客户端只有 session id;而Token的状态是存储在客户端。
JWT的实现
Python语言实现
# -*- coding: utf8 -*-
"""JWT helpers for a Flask application.

Provides token encoding/decoding plus ``current_auth``, a proxy that exposes
the custom claims of the JWT sent in the current request's ``Authorization``
header.
"""
import datetime
import os
from typing import Optional

import jwt
from flask import current_app, request, has_app_context, _app_ctx_stack
from werkzeug.local import LocalProxy

from . import exceptions

# NOTE(review): `_app_ctx_stack` is deprecated in recent Flask releases;
# `flask.g` is the supported home for per-context data. Confirm against the
# Flask version this project pins before upgrading.

# Claim names reserved by the JWT specification; they are stripped from the
# payload before it is exposed to application code.
registered_claims = {'iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti'}

# Single signing algorithm, shared by encode and decode so that the verifier
# never accepts a token signed with anything else.
JWT_ALGORITHM = 'HS256'


class _AuthObject:
    """Attribute-style view over the custom claims of a decoded token."""

    def __init__(self, **kwargs):
        # Every claim becomes an instance attribute of the same name.
        for key, val in kwargs.items():
            setattr(self, key, val)

    def __getattr__(self, item):
        # Only reached when normal lookup fails, so a claim that is absent
        # from the token reads as None instead of raising AttributeError.
        return None

    def __repr__(self):
        return str(self.__dict__)


def encode_token(iss: Optional[str] = None,
                 expire: Optional[datetime.timedelta] = None,
                 **kwargs):
    """Build and sign a JWT.

    :param iss: issuer claim; falls back to ``'website.com'`` when empty.
    :param expire: optional lifetime; when given, ``exp`` is set to
        ``iat + expire``.
    :param kwargs: extra claims merged into the payload (they may override
        ``iat`` / ``iss``).
    :return: the encoded token as produced by ``jwt.encode``.
    :raises exceptions.Internal: if building or signing the token fails.
    """
    try:
        payload = {
            # Timezone-aware "now": avoids the naive, deprecated utcnow().
            'iat': datetime.datetime.now(datetime.timezone.utc),
            'iss': iss or 'website.com',
        }
        # Merge caller-supplied (public/private) claims.
        payload.update(**kwargs)
        # Only tokens with an explicit lifetime carry an exp claim.
        if expire:
            payload['exp'] = payload['iat'] + expire
        # PyJWT writes the standard `alg` / `typ` header fields itself; the
        # algorithm is passed explicitly rather than through custom headers.
        return jwt.encode(payload, os.getenv('SECRET_KEY'),
                          algorithm=JWT_ALGORITHM)
    except Exception as err:
        raise exceptions.Internal(message='无效token') from err


def decode_token(token, verify_exp: bool = False):
    """Verify a JWT and return its non-registered claims.

    :param token: the encoded JWT.
    :param verify_exp: when true, an expired token is rejected.
    :return: dict of payload entries whose key is not a registered claim.
    :raises exceptions.Unauthenticated: if the token is expired (only when
        ``verify_exp`` is true) or otherwise invalid.
    """
    try:
        payload = jwt.decode(
            token,
            os.getenv('SECRET_KEY'),
            # Pin the accepted algorithm; PyJWT 2.x requires this argument.
            algorithms=[JWT_ALGORITHM],
            options={'verify_exp': verify_exp},
        )
    except jwt.ExpiredSignatureError as err:
        raise exceptions.Unauthenticated(message='token已过期') from err
    except jwt.InvalidTokenError as err:
        raise exceptions.Unauthenticated(message='无效token') from err
    # Expose only the custom claims.
    return {name: value for name, value in payload.items()
            if name not in registered_claims}


def _get_auth():
    """Return the ``_AuthObject`` for the current application context.

    On first use within a context, the ``Authorization`` header (expected
    form: ``JWT <token>``) is parsed and decoded, and the result is cached
    on the context.

    :raises RuntimeError: when called outside an application context.
    :raises exceptions.InvalidArgument: when the header is missing or is not
        of the form ``JWT <token>``.
    :raises exceptions.Unauthenticated: when the token fails verification.
    """
    if not has_app_context():
        raise RuntimeError(
            'No application found. Either work inside a view function or push'
            ' an application context.'
        )
    ctx = _app_ctx_stack.top
    if not hasattr(ctx, 'auth'):
        # Read and validate the Authorization header of the request.
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            raise exceptions.InvalidArgument(message="请求头错误")
        auth_attr = auth_header.split(' ')
        if len(auth_attr) != 2 or auth_attr[0] != 'JWT':
            raise exceptions.InvalidArgument(message="token格式错误")
        # NOTE(review): decode_token defaults to verify_exp=False, so expired
        # tokens are accepted here. Confirm this is intended.
        payload = decode_token(auth_attr[1])
        # Cache the decoded claims on the application context.
        ctx.auth = _AuthObject(**payload)
    return ctx.auth


# Global proxy resolving to the current context's auth object on each access.
current_auth = LocalProxy(_get_auth)
Flask 与
flask 一般在请求钩子中设置 或者 在调用时加入
jwt 使用装饰器
__init__.py
# -*- coding: utf8 -*-
"""Permission management, implemented with JWT."""
from .permission import auth_control
from .permission import current_auth

# Names re-exported as the package's public API.
__all__ = ['auth_control', 'current_auth']
# -*- coding: utf8 -*-
"""JWT helpers for a Flask application.

Tokens are signed with the application's ``SECRET_KEY`` config value.
``current_auth`` is a proxy exposing the custom claims of the JWT sent in the
current request's ``Authorization`` header.
"""
import datetime
from typing import Optional

import jwt
from flask import current_app, request, has_app_context, _app_ctx_stack
from werkzeug.local import LocalProxy

from . import exceptions

# NOTE(review): `_app_ctx_stack` is deprecated in recent Flask releases;
# `flask.g` is the supported home for per-context data. Confirm against the
# Flask version this project pins before upgrading.

# Claim names reserved by the JWT specification; they are stripped from the
# payload before it is exposed to application code.
registered_claims = {'iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti'}

# Single signing algorithm, shared by encode and decode so that the verifier
# never accepts a token signed with anything else.
JWT_ALGORITHM = 'HS256'


class _AuthObject:
    """Attribute-style view over the custom claims of a decoded token."""

    def __init__(self, **kwargs):
        # Every claim becomes an instance attribute of the same name.
        for key, val in kwargs.items():
            setattr(self, key, val)

    def __getattr__(self, item):
        # Only reached when normal lookup fails, so a claim that is absent
        # from the token reads as None instead of raising AttributeError.
        return None

    def __repr__(self):
        return str(self.__dict__)


def encode_token(iss: Optional[str] = None,
                 expire: Optional[datetime.timedelta] = None,
                 **kwargs):
    """Build and sign a JWT with the application's secret key.

    :param iss: issuer claim; falls back to ``'startask'`` when empty.
    :param expire: optional lifetime; when given, ``exp`` is set to
        ``iat + expire``.
    :param kwargs: extra claims merged into the payload (they may override
        ``iat`` / ``iss``).
    :return: the encoded token as produced by ``jwt.encode``.
    :raises exceptions.Internal: if building or signing the token fails.
    """
    try:
        payload = {
            # Timezone-aware "now": avoids the naive, deprecated utcnow().
            'iat': datetime.datetime.now(datetime.timezone.utc),
            'iss': iss or 'startask',
        }
        # Merge caller-supplied (public/private) claims.
        payload.update(**kwargs)
        # Only tokens with an explicit lifetime carry an exp claim.
        if expire:
            payload['exp'] = payload['iat'] + expire
        # PyJWT writes the standard `alg` / `typ` header fields itself; the
        # algorithm is passed explicitly rather than through custom headers.
        return jwt.encode(payload, current_app.config['SECRET_KEY'],
                          algorithm=JWT_ALGORITHM)
    except Exception as err:
        raise exceptions.Internal(message='无效token') from err


def decode_token(token, verify_exp: bool = False):
    """Verify a JWT and return its non-registered claims.

    :param token: the encoded JWT.
    :param verify_exp: when true, an expired token is rejected.
    :return: dict of payload entries whose key is not a registered claim.
    :raises exceptions.Unauthenticated: if the token is expired (only when
        ``verify_exp`` is true) or otherwise invalid.
    """
    try:
        payload = jwt.decode(
            token,
            current_app.config['SECRET_KEY'],
            # Pin the accepted algorithm; PyJWT 2.x requires this argument.
            algorithms=[JWT_ALGORITHM],
            options={'verify_exp': verify_exp},
        )
    except jwt.ExpiredSignatureError as err:
        raise exceptions.Unauthenticated(message='token已过期') from err
    except jwt.InvalidTokenError as err:
        raise exceptions.Unauthenticated(message='无效token') from err
    # Expose only the custom claims.
    return {name: value for name, value in payload.items()
            if name not in registered_claims}


def _get_auth():
    """Return the ``_AuthObject`` for the current application context.

    On first use within a context, the ``Authorization`` header (expected
    form: ``JWT <token>``) is parsed and decoded, and the result is cached
    on the context.

    :raises RuntimeError: when called outside an application context.
    :raises exceptions.InvalidArgument: when the header is missing or is not
        of the form ``JWT <token>``.
    :raises exceptions.Unauthenticated: when the token fails verification.
    """
    if not has_app_context():
        raise RuntimeError(
            'No application found. Either work inside a view function or push'
            ' an application context.'
        )
    ctx = _app_ctx_stack.top
    if not hasattr(ctx, 'auth'):
        # Read and validate the Authorization header of the request.
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            raise exceptions.InvalidArgument(message="请求头错误")
        auth_attr = auth_header.split(' ')
        if len(auth_attr) != 2 or auth_attr[0] != 'JWT':
            raise exceptions.InvalidArgument(message="token格式错误")
        # NOTE(review): decode_token defaults to verify_exp=False, so expired
        # tokens are accepted here. Confirm this is intended.
        payload = decode_token(auth_attr[1])
        # Cache the decoded claims on the application context.
        ctx.auth = _AuthObject(**payload)
    return ctx.auth


# Global proxy resolving to the current context's auth object on each access.
current_auth = LocalProxy(_get_auth)