Source code for audit_tools.audit.signals

from __future__ import unicode_literals

import datetime
import logging

from django.db.models.signals import pre_save, post_save, pre_delete

from audit_tools.audit.cache import cache
from audit_tools.audit.decorators import CheckActivate
from audit_tools.audit import settings
from audit_tools.audit.utils import extract_process_data, dynamic_import, serialize_model_instance


_CACHE = {}

logger = logging.getLogger(__name__)


@CheckActivate
def _pre_save(sender, **kwargs):
    try:
        i = kwargs['instance']
        if i.pk:
            try:
                original_instance = sender.objects.get(pk=i.pk)
                _CACHE[id(i)] = serialize_model_instance(original_instance)
            except:
                # New object.
                pass
    except Exception:
        logger.exception("<Pre Save>")


@CheckActivate
def _post_save(sender, **kwargs):
    try:
        from audit_tools.audit.models import ACTIONS
        from audit_tools.audit.tasks import save_model_action

        i = kwargs['instance']

        # Model
        model = _extract_model_data(i)

        # Old and new content
        old_data = {}
        if id(i) in _CACHE:
            old_data = _CACHE[id(i)]
            del _CACHE[id(i)]
        new_data = serialize_model_instance(i)

        content = _extract_content_data(old_data, new_data)

        # Action
        if not old_data and new_data:
            action = ACTIONS.CREATE
        else:
            action = ACTIONS.UPDATE

        # Instance
        instance = _extract_instance_data(i)

        # Timestamp
        timestamp = datetime.datetime.now()

        model_action = {
            'model': model,
            'action': action,
            'content': content,
            'instance': instance,
            'timestamp': timestamp,
        }

        # Process
        process = extract_process_data()

        try:
            # Get process
            process = cache.get_process(process)
            access = cache.get_last_access()

            if not settings.RUN_ASYNC:
                save_model_action(model_action, access, process)
            else:
                save_model_action.apply_async((model_action, access, process))
            logger.info("<%s> Model:%s ID:%s", action.capitalize(), model['full_name'], instance['id'])
        except Exception:
            logger.exception("<%s> Model:%s ID:%s", action.capitalize(), model['full_name'], instance['id'])
    except Exception:
        logger.exception("<Post Save>")


@CheckActivate
def _pre_delete(sender, **kwargs):
    try:
        from audit_tools.audit.models import ACTIONS
        from audit_tools.audit.tasks import save_model_action

        i = kwargs['instance']

        model = _extract_model_data(i)

        # Old and new content
        old_data = serialize_model_instance(i)
        new_data = {}
        content = _extract_content_data(old_data, new_data)

        # Action
        action = ACTIONS.DELETE

        instance = _extract_instance_data(i)

        # Timestamp
        timestamp = datetime.datetime.now()

        model_action = {
            'model': model,
            'action': action,
            'content': content,
            'instance': instance,
            'timestamp': timestamp,
        }

        # Process
        process = extract_process_data()

        try:
            # Get process
            process = cache.get_process(process)
            access = cache.get_last_access()

            if not settings.RUN_ASYNC:
                save_model_action(model_action, access, process)
            else:
                save_model_action.apply_async((model_action, access, process))

            logger.info("<%s> Model:%s ID:%s", action.capitalize(), model['full_name'], instance['id'])
        except Exception:
            logger.exception("<%s> Model:%s ID:%s Error:%s", action.capitalize(), model['full_name'], instance['id'])
    except Exception:
        logger.exception("<Pre Delete>")


[docs]def register(model): """Register a model to the audit code. :param model: Model to register. :type model: object """ try: pre_save.connect(_pre_save, sender=model, dispatch_uid=str(model)) post_save.connect(_post_save, sender=model, dispatch_uid=str(model)) pre_delete.connect(_pre_delete, sender=model, dispatch_uid=str(model)) except Exception as e: logger.error("<Register> %s", e.message)
[docs]def unregister(model): """Unregister a model to the audit code. :param model: Model to unregister. :type model: object """ try: pre_save.disconnect(_pre_save, sender=model, dispatch_uid=str(model)) post_save.disconnect(_post_save, sender=model, dispatch_uid=str(model)) pre_delete.disconnect(_pre_delete, sender=model, dispatch_uid=str(model)) except Exception as e: logger.error("<Unregister> %s", e.message)
[docs]def register_models(): """Register all models listed in :const:`settings.LOGGED_MODELS`. """ for model in settings.LOGGED_MODELS: m = dynamic_import(model) register(m)
[docs]def unregister_models(): """Unregister all models listed in :const:`settings.LOGGED_MODELS`. """ for model in settings.LOGGED_MODELS: m = dynamic_import(model) unregister(m)
def _extract_model_data(instance): """Extract model data from a instance. :param instance: Model instance. :type instance: object :return: Extracted data. :rtype: dict """ model_module = instance.__module__ model_name = instance.__class__.__name__ return { 'app': model_module.split('.', 1)[0], 'full_name': model_module + '.' + model_name, 'name': model_name, } def _extract_content_data(old_object=None, new_object=None): """Extract content data from object's state change. :param old_object: Object serialization in his previous state. :type old_object: dict :param new_object: Object serialization in his current state. :type new_object: dict :return: Extracted data. :rtype: dict """ if old_object and new_object: changes = { k1: {'old': v1, 'new': v2} for k1, v1 in old_object.iteritems() for k2, v2 in new_object.iteritems() if k1 == k2 and v1 != v2 } elif not old_object and new_object: changes = {k: {'old': None, 'new': v} for k, v in new_object.iteritems()} elif not new_object and old_object: changes = {k: {'old': v, 'new': None} for k, v in old_object.iteritems()} else: changes = {} return { 'old': old_object, 'new': new_object, 'changes': changes } def _extract_instance_data(instance): try: id_ = unicode(instance.pk) except: id_ = '' try: description = unicode(str(instance), errors='ignore') except: description = '' return { 'id': id_, 'description': description, }