from __future__ import unicode_literals
import re
from mongoengine import QuerySet
def _check_args(required, incompatible, kwargs, cmp_func=lambda x, y: x.startswith(y)):
"""Check for correct arguments.
:param required: required keyword argument.
:type required: unicode
:param incompatible: tuple of incompatible args.
:type incompatible: tuple(unicode)
:param kwargs: kwargs
:type kwargs: dict
:param cmp_func: function that returns true if a key is valid.
:type cmp_func: function
:return: key, value
"""
# Required arguments
try:
key, value = [(k, v) for k, v in kwargs.iteritems() if cmp_func(k, required)][0]
except IndexError:
raise AttributeError('Argument {} without modifiers required.'.format(required))
# Incompatible arguments
if len([x for x in kwargs.keys() if x in incompatible]) > 0:
raise AttributeError('You cannot call this function with {}. Use only fview argument for that.'.format(
', '.join(incompatible)))
return key, value
class ModelActionQuerySet(QuerySet):
    """Custom manager for ModelAction.

    Adds ``filter_by_*`` / ``get_by_*`` shortcuts that translate a Python class, a list of
    classes or an object instance into ``model__*`` / ``instance__*`` lookups.
    """

    def _prepare_kwargs_by_model(self, kwargs):
        """Replace the ``klass`` kwarg with ``model__name`` and ``model__app`` lookups.

        :param kwargs: lookups; must contain ``klass`` and none of the ``model__*`` lookups set here.
        :type kwargs: dict
        :return: the same dict, modified in place.
        :raises AttributeError: if ``klass`` is missing or an incompatible lookup is present.
        """
        required_arg = 'klass'
        incompatible_args = ('model__full_name', 'model__name', 'model__app')
        key, klass = _check_args(required_arg, incompatible_args, kwargs, cmp_func=lambda x, y: x == y)

        # Clean kwargs
        del kwargs[key]

        kwargs['model__name'] = klass.__name__
        # Only the first component of the dotted module path is used as the app.
        kwargs['model__app'] = klass.__module__.split('.', 1)[0]
        return kwargs

    def filter_by_model(self, *args, **kwargs):
        """Filtered queryset by model.

        :param kwargs: klass kwarg required.
        :return: QuerySet
        """
        kwargs = self._prepare_kwargs_by_model(kwargs)
        return self.filter(*args, **kwargs)

    def get_by_model(self, *args, **kwargs):
        """Get object by model.

        :param kwargs: klass kwarg required.
        :return: ModelAction
        """
        kwargs = self._prepare_kwargs_by_model(kwargs)
        return self.get(*args, **kwargs)

    def _prepare_kwargs_by_model_list(self, kwargs):
        """Replace the ``klass`` kwarg (an iterable of classes) with a ``model__full_name`` regex.

        The regex matches exactly the ``<module>.<class name>`` of each given class. Names are
        escaped and the alternation is anchored, so ``.`` is literal and a class name does not
        match longer names that merely contain it. An empty iterable yields a pattern that only
        matches an empty full name.

        :param kwargs: lookups; must contain ``klass`` and none of the ``model__*`` lookups.
        :type kwargs: dict
        :return: the same dict, modified in place.
        :raises AttributeError: if ``klass`` is missing or an incompatible lookup is present.
        """
        required_arg = 'klass'
        incompatible_args = ('model__full_name', 'model__name', 'model__app')
        key, klass = _check_args(required_arg, incompatible_args, kwargs, cmp_func=lambda x, y: x == y)

        # Clean kwargs
        del kwargs[key]

        full_names = ['{}.{}'.format(k.__module__, k.__name__) for k in klass]
        pattern = '^(?:{})$'.format('|'.join(re.escape(name) for name in full_names))
        kwargs['model__full_name'] = re.compile(pattern)
        return kwargs

    def filter_by_model_list(self, *args, **kwargs):
        """Filtered queryset by model list.

        :param kwargs: klass kwarg required.
        :return: QuerySet
        """
        kwargs = self._prepare_kwargs_by_model_list(kwargs)
        return self.filter(*args, **kwargs)

    def get_by_model_list(self, *args, **kwargs):
        """Get object by model list.

        :param kwargs: klass kwarg required.
        :return: ModelAction
        """
        kwargs = self._prepare_kwargs_by_model_list(kwargs)
        return self.get(*args, **kwargs)

    def _prepare_kwargs_by_instance(self, kwargs):
        """Replace the ``obj`` kwarg with ``instance__id``, ``model__name`` and ``model__app`` lookups.

        :param kwargs: lookups; must contain ``obj`` and none of the lookups set here.
        :type kwargs: dict
        :return: the same dict, modified in place.
        :raises AttributeError: if ``obj`` is missing or an incompatible lookup is present.
        """
        required_arg = 'obj'
        incompatible_args = ('instance__id', 'model__name', 'model__app', 'model__full_name')
        key, obj = _check_args(required_arg, incompatible_args, kwargs, cmp_func=lambda x, y: x == y)

        # Clean kwargs
        del kwargs[key]

        # The primary key is always compared as a string.
        kwargs['instance__id'] = str(obj.pk)
        kwargs['model__name'] = obj.__class__.__name__
        kwargs['model__app'] = obj.__module__.split('.')[0]
        return kwargs

    def filter_by_instance(self, *args, **kwargs):
        """Filtered queryset by object instance.

        :param kwargs: obj kwarg required.
        :return: QuerySet
        """
        kwargs = self._prepare_kwargs_by_instance(kwargs)
        return self.filter(*args, **kwargs)

    def get_by_instance(self, *args, **kwargs):
        """Get object by instance.

        :param kwargs: obj kwarg required.
        :return: ModelAction
        """
        kwargs = self._prepare_kwargs_by_instance(kwargs)
        return self.get(*args, **kwargs)
class AccessQuerySet(QuerySet):
    """Custom manager for Access.

    Adds ``filter_by_*`` / ``get_by_*`` shortcuts that translate a view, an url or an exception
    class into ``view__*`` / ``request__*`` / ``exception__*`` lookups.
    """

    def _prepare_kwargs_by_view(self, kwargs):
        """Replace the ``fview`` kwarg with a ``view__full_name`` lookup.

        The full name is ``<fview.__module__>.<fview.__name__>``.

        :param kwargs: lookups; must contain ``fview`` and none of the ``view__*`` lookups.
        :type kwargs: dict
        :return: the same dict, modified in place.
        :raises AttributeError: if ``fview`` is missing or an incompatible lookup is present.
        """
        required_arg = 'fview'
        incompatible_args = ('view__name', 'view__full_name', 'view__app')
        key, fview = _check_args(required_arg, incompatible_args, kwargs, cmp_func=lambda x, y: x == y)

        # Clean kwargs
        del kwargs[key]

        kwargs['view__full_name'] = fview.__module__ + '.' + fview.__name__
        return kwargs

    def filter_by_view(self, *args, **kwargs):
        """Filtered queryset by view object or function.

        :param kwargs: fview kwarg required.
        :return: QuerySet
        """
        kwargs = self._prepare_kwargs_by_view(kwargs)
        return self.filter(*args, **kwargs)

    def get_by_view(self, *args, **kwargs):
        """Get object by view.

        :param kwargs: fview kwarg required.
        :return: Access
        """
        kwargs = self._prepare_kwargs_by_view(kwargs)
        return self.get(*args, **kwargs)

    def _prepare_kwargs_by_url(self, kwargs):
        """Replace the ``url`` kwarg (with optional modifiers) with a ``request__path`` lookup.

        ``url`` and ``url__<modifiers>`` are accepted. Every modifier is carried over to
        ``request__path`` (``url__not__contains`` becomes ``request__path__not__contains``).
        A trailing ``regex`` modifier is not forwarded: the value is compiled with
        :func:`re.compile` and passed as the lookup value instead.

        :param kwargs: lookups; must contain ``url`` or ``url__<modifiers>`` and not ``request__path``.
        :type kwargs: dict
        :return: the same dict, modified in place.
        :raises AttributeError: if no url kwarg is given or an incompatible lookup is present.
        """
        required_arg = 'url'
        incompatible_args = ('request__path', )
        # Accept only ``url`` itself or ``url__...``; a plain prefix match would also
        # swallow unrelated keywords such as ``urls`` or ``url_name``.
        key, url = _check_args(required_arg, incompatible_args, kwargs,
                               cmp_func=lambda x, y: x == y or x.startswith(y + '__'))

        # Clean kwargs
        del kwargs[key]

        # Get modifiers: keep the whole chain, not just the last segment.
        modifiers = [part for part in key.split('__')[1:] if part]
        if modifiers and modifiers[-1] == 'regex':
            url = re.compile(url)
            modifiers.pop()

        kwargs['__'.join(['request__path'] + modifiers)] = url
        return kwargs

    def filter_by_url(self, *args, **kwargs):
        """Filtered queryset by url. Url accept all modifiers, including __regex.

        :param kwargs: url kwarg required.
        :return: QuerySet
        """
        kwargs = self._prepare_kwargs_by_url(kwargs)
        return self.filter(*args, **kwargs)

    def get_by_url(self, *args, **kwargs):
        """Get object by url. Url accept all modifiers, including __regex.

        :param kwargs: url kwarg required.
        :return: Access
        """
        kwargs = self._prepare_kwargs_by_url(kwargs)
        return self.get(*args, **kwargs)

    def _prepare_kwargs_by_exception(self, kwargs):
        """Replace the ``exc`` kwarg with an ``exception__type`` lookup on ``exc.__name__``.

        :param kwargs: lookups; must contain ``exc`` and not ``exception__type``.
        :type kwargs: dict
        :return: the same dict, modified in place.
        :raises AttributeError: if ``exc`` is missing or an incompatible lookup is present.
        """
        required_arg = 'exc'
        incompatible_args = ('exception__type', )
        key, exc = _check_args(required_arg, incompatible_args, kwargs, cmp_func=lambda x, y: x == y)

        # Clean kwargs
        del kwargs[key]

        kwargs['exception__type'] = exc.__name__
        return kwargs

    def filter_by_exception(self, *args, **kwargs):
        """Filter object by exception.

        :param kwargs: exc kwarg required.
        :return: QuerySet
        """
        kwargs = self._prepare_kwargs_by_exception(kwargs)
        return self.filter(*args, **kwargs)

    def get_by_exception(self, *args, **kwargs):
        """Get object by exception.

        :param kwargs: exc kwarg required.
        :return: Access
        """
        kwargs = self._prepare_kwargs_by_exception(kwargs)
        return self.get(*args, **kwargs)