快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

drf 认证、权限、频率限制以及其源码

内容概要

  • token认证小练习
  • 认证
  • 权限
  • 频率

内容详细

登录携带token认证小练习

models.py:

在穆棱等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供做网站、网站建设 网站设计制作按需定制,公司网站建设,企业网站建设,高端网站设计,全网整合营销推广,成都外贸网站制作,穆棱网站建设费用合理。

class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=32)


class UserToken(models.Model):
    user = models.OneToOneField(to='User', on_delete=models.CASCADE)
    token = models.CharField(max_length=64)

views.py:

from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
import uuid

class BookViewSet(ModelViewSet):
    queryset = Book.objects
    serializer_class = BookModelSerializer

    @action(methods=['GET', 'POST'], detail=False)
    def login(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = models.User.objects.filter(username=username, password=password).first()
        if user:
            token = str(uuid.uuid4())
            models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
            return Response({'code': 100, 'msg': '登录成功', 'token': token})
        return Response({'code': 101, 'msg': '用户名或密码错误'})

urls.py:

from rest_framework.routers import DefaultRouter, SimpleRouter

router = DefaultRouter()
router.register('books4', views.BookViewSet)

urlpatterns = [
    path('admin/', admin.site.urls),
]
urlpatterns += router.urls

开始学习之前,先搞清楚 drf 认证、权限、频率三大功能是在哪里实现的?

APIView 源码中实现,所有继承 APIView 的类都可以实现三大认证

APIView 的类的 as_view 返回 view 函数的内存地址,view运行后调用了 APIView中的 dispatch方法,三大认证在 self.initial(request, *args, **kwargs) 函数中调用

    def dispatch(self, request, *args, **kwargs):
        request = self.initialize_request(request, *args, **kwargs)
        try:
            self.initial(request, *args, **kwargs)
        except Exception as exc:
            response = self.handle_exception(exc)

APIView --> initial

    def initial(self, request, *args, **kwargs):
        self.perform_authentication(request)  # 认证
        self.check_permissions(request)  # 权限
        self.check_throttles(request)  # 频率

认证源码

APIView --> perform_authentication(request)

源码中只返回了登录用户或者匿名用户对象

该方法调用了 request.user ,也就是重写后的 request 属性中的 user,我们知道那里的 user有被调用触发的方法 和 被赋值触发的方法

    def perform_authentication(self, request):
        """
        `request.user` or `request.auth` is accessed.
        """
        request.user

转到重写 request 对象的Request类中:

Request --> user(property)

可以看到调用的是 Request类中的 _authenticate()

    @property
    def user(self):
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():
                self._authenticate()
        return self._user

Request --> _authenticate 核心代码

该方法循环 Request类中的 authenticators列表(认证类列表)

调用认证类 authenticator 中的 authenticate(self) 方法,并放回元组user_auth_tuple,包含登录用户对象(user)和 auth

如果 user_auth_tuple 不为 None,则 request 对象便获取了user对象和 auth对象

  • 我们在自定义认证类时,就要重写 authenticate(self) 方法,并放回 user对象和 auth对象
    def _authenticate(self):
        for authenticator in self.authenticators:
            try:
                user_auth_tuple = authenticator.authenticate(self)  # 调用类中的authenticate方法
            except exceptions.APIException:
                self._not_authenticated()
                raise

            if user_auth_tuple is not None:
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple
                return
        self._not_authenticated()

authenticators 怎么来的?

可以看出,authenticatorsRequest类实例化时传进来的参数,那么我们就得回到APIView类中的 initialize_request(self, request, *args, **kwargs) 方法,因为是在那里实例化了 Request

class Request:
    def __init__(self, request, parsers=None, authenticators=None,
                 negotiator=None, parser_context=None):
        self.authenticators = authenticators or ()

APIView --> initialize_request

可以看到调用了自己的 get_authenticators() 方法

    def initialize_request(self, request, *args, **kwargs):
        return Request(
            request,
            parsers=self.get_parsers(),
            authenticators=self.get_authenticators(),
            negotiator=self.get_content_negotiator(),
            parser_context=parser_context
        )

APIView --> get_authenticators()

这是个列表生成式,从视图函数类中的认证类列表 authentication_classes 中取出一个个认证类加括号实例化为对象,并存在类表中放回,那么get_authenticators() 方法放回的是一个认证类对象的列表

    def get_authenticators(self):
        """
        Instantiates and returns the list of authenticators that this view can use.
        """
        return [auth() for auth in self.authentication_classes]

绕来绕去,最终明白,我们可以在视图类中定义 authentication_classes 列表,存放认证类,在触发视图函数类时就会执行列表中认证类重写的 authenticate(self)方法,并放回登录的用户对象和auth

权限源码

APIView --> check_permissions(request)

get_permissions()是获取认证类对象列表

has_permission(request, self) 是我们自定义权限类时要改写的方法,返回True或False

    def check_permissions(self, request):
        for permission in self.get_permissions():
            if not permission.has_permission(request, self):
                self.permission_denied(
                    request,
                    message=getattr(permission, 'message', None),
                    code=getattr(permission, 'code', None)
                )

频率源码

APIView --> check_throttles(request)

    def check_throttles(self, request):
        throttle_durations = []
        for throttle in self.get_throttles():
            if not throttle.allow_request(request, self):
                throttle_durations.append(throttle.wait())
        if throttle_durations:
            durations = [
                duration for duration in throttle_durations
                if duration is not None
            ]
            duration = max(durations, default=None)
            self.throttled(request, duration)

在调用自定义认证类时视图类中需要提前定义的参数

class APIView(View):

    # The following policies may be set at either globally, or per-view.
    renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
    parser_classes = api_settings.DEFAULT_PARSER_CLASSES
    authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES  # 认证类列表
    throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES              # 频率限制类
    permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES          # 权限类
    content_negotiation_class = api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS
    metadata_class = api_settings.DEFAULT_METADATA_CLASS
    versioning_class = api_settings.DEFAULT_VERSIONING_CLASS

三个认证类在 drf 配置文件中的配置

DEFAULTS = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication'
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.AllowAny',
    ],
    'DEFAULT_THROTTLE_CLASSES': [],
}

先写一个登录视图类

校验过登录用户数据之后把token信息存到usertoken表中

class LoginViewSet(ModelViewSet):
    queryset = models.User.objects
    serializer_class = UserSerializer

    @action(methods=['POST'], detail=False)  # api 要多写一个 login/
    def login(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user_obj = models.User.objects.filter(username=username, password=password).first()
        if not user_obj:
            return Response({'code': 1001, 'msg': '该用户不存在'})
        uuid_str = str(uuid.uuid4())
        # 过滤条件中user=user的对象,而不是pk
        models.UserToken.objects.update_or_create(user=user_obj, defaults={'token': uuid_str})  # 别漏写default后的字典
        return Response({'code': 1000, 'msg': '登录成功', 'token': uuid_str})

认证

1、新建一个认证模块,写一个认证类继承 BaseAuthentication(多态),重写authenticate方法,在方法中校验是否登录,是则返回两个值(request.userrequest.auth

from rest_framework.authentication import BaseAuthentication


# 登录认证
class LoginAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.data.get('token')
        is_login = models.UserToken.objects.filter(token=token).first()
        if not is_login:
            raise AuthenticationFailed('您没有登录')
        return is_login.user, token

2、在视图类中书写 authentication_classes,存放导入进来的认证类

  • 局部配置
from app01.authentications import LoginAuth, ChangePermission

class BookViewSet(ViewSetMixin, ListAPIView):
    queryset = models.Book.objects
    serializer_class = BookModelSerializer
    authentication_classes = [LoginAuth, ]
  • 局部禁用
class BookViewSet(ViewSetMixin, ListAPIView):
    queryset = models.Book.objects
    serializer_class = BookModelSerializer
    authentication_classes = []

3、全局配置(在项目文件夹下的 settings.py 文件中配置)

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'app01.authentications.LoginAuth',
    ],
}

权限

1、在认证模块中,写一个权限类继承 BasePermission(多态),重写has_permission方法,在方法中校验该用户的用户类型,是否有权限执行该视图类,返回 True 或者 False, 还可以自定义报错信息 self.message

from rest_framework.permissions import BasePermission

class ChangePermission(BasePermission):
    def has_permission(self, request, view):
        self.message = '您是%s,没有修改权限' % request.user.get_user_type_display()  # 根据源码,可以修改权限的提示信息
        user_type = request.user.user_type
        if user_type != 1:
            return False
        return True

2、在视图类中书写 permission_classes,存放导入进来的权限类

  • 局部配置

注意:在定义权限认证类前需要先定义登录认证类,否则没有用户对象 requset.user 来做权限认证

from app01.authentications import LoginAuth, ChangePermission

class BookViewChange(ViewSetMixin, CreateAPIView, RetrieveUpdateDestroyAPIView):
    queryset = models.Book.objects
    serializer_class = BookModelSerializer
    authentication_classes = [LoginAuth, ]
    permission_classes = [ChangePermission, ]
  • 局部禁用
class BookViewChange(ViewSetMixin, CreateAPIView, RetrieveUpdateDestroyAPIView):
    queryset = models.Book.objects
    serializer_class = BookModelSerializer
    authentication_classes = [LoginAuth, ]
    permission_classes = []

3、全局配置(在项目文件夹下的 settings.py 文件中配置)

REST_FRAMEWORK = {
    'DEFAULT_PERMISSION_CLASSES': [
        'app01.authentications.ChangePermission',
    ],
}

频率

1、在认证模块中,写一个频率限制类继承 SimpleRateThrottle(多态),重写类属性 scopeget_cache_key方法,该方法返回什么,就以什么为限制,scope配置文件中要用

from rest_framework.throttling import SimpleRateThrottle

# 频率限制
class IPThrottling(SimpleRateThrottle):
    scope = 'minute_3'

    # 返回什么就以什么做限制
    def get_cache_key(self, request, view):
        # return request.META.get('REMOTE_ADDR')  # 客户端ip地址
        return request.user.id  # 用户id

2、与其它两个认证不同,他需要在项目配置文件中配置:

REST_FRAMEWORK = [
    'DEFAULT_THROTTLE_RATES': {
        'minute_3': '3/m'  # minute_3是scope的字符串,一分钟访问3次
        'minute_5':'5/m'
    }
]

2、在视图类中书写 throttle_classes,存放导入进来的权限类

  • 局部配置
class BookViewSet(ViewSetMixin, ListAPIView):
    queryset = models.Book.objects
    serializer_class = BookModelSerializer
    authentication_classes = [LoginAuth, ]
    throttle_classes = [IPThrottling, ]
  • 局部禁用
class BookViewSet(ViewSetMixin, ListAPIView):
    queryset = models.Book.objects
    serializer_class = BookModelSerializer
    authentication_classes = [LoginAuth, ]
    throttle_classes = []

3、全局配置(在项目文件夹下的 settings.py 文件中配置)

REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES': [
        'app01.authentications.IPThrottling'
    ],
    'DEFAULT_THROTTLE_RATES': {
        'minute_3': '3/m',  # minute_3是scope的字符串,一分钟访问3次
        # 'minute_5': '5/m'
    }
}

文章题目:drf 认证、权限、频率限制以及其源码
文章路径:http://6mz.cn/article/dsoghsg.html

其他资讯