activity.py 9.05 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

18
from __future__ import absolute_import, unicode_literals
Őry Máté committed
19 20 21
from contextlib import contextmanager
from logging import getLogger

22
from celery.signals import worker_ready
Kálmán Viktor committed
23 24
from celery.contrib.abortable import AbortableAsyncResult

25
from django.core.urlresolvers import reverse
26
from django.db.models import CharField, ForeignKey
Őry Máté committed
27
from django.utils import timezone
28
from django.utils.translation import ugettext_lazy as _, ugettext_noop
Őry Máté committed
29

30
from common.models import (
31 32
    ActivityModel, activitycontextimpl, create_readable, join_activity_code,
    split_activity_code
33 34
)

Kálmán Viktor committed
35 36
from manager.mancelery import celery

37

Őry Máté committed
38 39 40
# Module-level logger, named after this module's import path.
logger = getLogger(__name__)


41 42 43 44 45 46 47 48 49 50 51 52
class ActivityInProgressError(Exception):
    """Signals that an activity cannot start because another one is running.

    :param activity: the activity that is still in progress; it is kept on
                     the exception as the ``activity`` attribute.
    :param message: optional custom message. When omitted, a default text
                    naming the ``activity_code`` of *activity* is used.
    """

    def __init__(self, activity, message=None):
        if message is None:
            message = ("Another activity is currently in progress: '%s'."
                       % activity.activity_code)
        super(ActivityInProgressError, self).__init__(message)
        self.activity = activity


Őry Máté committed
53
class InstanceActivity(ActivityModel):
    """Activity log entry for an operation performed on an ``Instance``.

    Activity codes are built with ``join_activity_code`` starting from
    ``ACTIVITY_CODE_BASE``; sub-activities extend the code of their parent.
    Fields such as ``activity_code``, ``parent``, ``task_uuid``, ``user``,
    ``started``, ``finished`` and ``succeeded`` are not declared here and
    are expected to come from ``ActivityModel``.
    """

    ACTIVITY_CODE_BASE = join_activity_code('vm', 'Instance')
    instance = ForeignKey('Instance', related_name='activity_log',
                          help_text=_('Instance this activity works on.'),
                          verbose_name=_('instance'))
    resultant_state = CharField(blank=True, max_length=20, null=True)

    class Meta:
        app_label = 'vm'
        db_table = 'vm_instanceactivity'
        ordering = ['-finished', '-started', 'instance', '-id']

    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.instance,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.instance)

    def abort(self):
        """Request the abortion of the Celery task behind this activity.
        """
        AbortableAsyncResult(self.task_uuid, backend=celery.backend).abort()

    @classmethod
    def create(cls, code_suffix, instance, task_uuid=None, user=None,
               concurrency_check=True):
        """Create and save a new top-level activity for an instance.

        :param code_suffix: appended to ``ACTIVITY_CODE_BASE`` to form the
                            activity code.
        :param instance: the instance the activity works on.
        :param task_uuid: optional id of the related Celery task.
        :param user: optional user recorded on the activity.
        :param concurrency_check: when true, refuse to create the activity
                                  if the instance has an unfinished one.
        :returns: the saved activity.
        :raises ActivityInProgressError: if the concurrency check fails.
        """
        # Check for concurrent activities
        active_activities = instance.activity_log.filter(finished__isnull=True)
        if concurrency_check and active_activities.exists():
            raise ActivityInProgressError(active_activities[0])

        activity_code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix)
        act = cls(activity_code=activity_code, instance=instance, parent=None,
                  resultant_state=None, started=timezone.now(),
                  task_uuid=task_uuid, user=user)
        act.save()
        return act

    def create_sub(self, code_suffix, task_uuid=None, concurrency_check=True):
        """Create and save a sub-activity of this activity.

        The child inherits the instance and the user of its parent and its
        activity code is the parent's code extended with *code_suffix*.

        :param concurrency_check: when true, refuse to create the child if
                                  this activity has an unfinished child.
        :returns: the saved sub-activity.
        :raises ActivityInProgressError: if the concurrency check fails.
        """
        # Check for concurrent activities
        active_children = self.children.filter(finished__isnull=True)
        if concurrency_check and active_children.exists():
            raise ActivityInProgressError(active_children[0])

        act = InstanceActivity(
            activity_code=join_activity_code(self.activity_code, code_suffix),
            instance=self.instance, parent=self, resultant_state=None,
            started=timezone.now(), task_uuid=task_uuid, user=self.user)
        act.save()
        return act

    def get_absolute_url(self):
        """Return the URL of the dashboard view showing this activity.
        """
        return reverse('dashboard.views.vm-activity', args=[self.pk])

    def get_readable_name(self):
        """Return a display name derived from the last activity code part.

        Underscores are replaced with spaces and the text is capitalized.
        """
        activity_code_last_suffix = split_activity_code(self.activity_code)[-1]
        return activity_code_last_suffix.replace('_', ' ').capitalize()

    def get_status_id(self):
        """Map ``succeeded`` to ``'wait'``, ``'success'`` or ``'failed'``.
        """
        if self.succeeded is None:
            return 'wait'
        elif self.succeeded:
            return 'success'
        else:
            return 'failed'

    def has_percentage(self):
        """Can a progress percentage be queried for this activity?

        :returns: True if the activity has a task id, its operation reports
                  percentage and it is not finished yet; otherwise, False.
        """
        op = self.instance.get_operation_from_activity_code(self.activity_code)
        # Coerce to bool so callers never receive None, '' or the operation.
        return bool(self.task_uuid and op and op.has_percentage
                    and not self.finished)

    def get_percentage(self):
        """Returns the percentage of the running operation if available.

        :returns: the ``"percent"`` entry of the task's info, or 0 when the
                  activity has no percentage or the task has no usable info.
        """
        # Only build the task result once we know there is a task to ask.
        if not self.has_percentage():
            return 0
        # Read ``info`` a single time: it is fetched from the result backend
        # and may change between two reads.
        info = celery.AsyncResult(id=self.task_uuid).info
        # ``info`` is None before any progress is reported and may be a
        # non-dict value (e.g. an exception) if the task has just failed.
        if not isinstance(info, dict):
            return 0
        return info.get("percent")

    @property
    def is_abortable(self):
        """Can the activity be aborted?

        :returns: True if the activity can be aborted; otherwise, False.
        """
        op = self.instance.get_operation_from_activity_code(self.activity_code)
        # Coerce to bool to honour the documented True/False contract.
        return bool(self.task_uuid and op and op.abortable
                    and not self.finished)

    def is_abortable_for(self, user):
        """Can the given user abort the activity?

        The user has to be a superuser, the owner of the instance or the
        user recorded on the activity.
        """
        return self.is_abortable and (
            user.is_superuser or user in (self.instance.owner, self.user))

    @property
    def is_aborted(self):
        """Has the activity been aborted?

        :returns: True if the activity has been aborted; otherwise, False.
        """
        if not self.task_uuid:
            return False
        # Query the same backend that abort() writes to, otherwise the
        # aborted state may be looked up in a different result backend.
        result = AbortableAsyncResult(self.task_uuid, backend=celery.backend)
        return bool(result.is_aborted())

    def save(self, *args, **kwargs):
        """Save the activity, then call ``_update_status()`` on the instance.
        """
        ret = super(InstanceActivity, self).save(*args, **kwargs)
        self.instance._update_status()
        return ret

    @contextmanager
    def sub_activity(self, code_suffix, on_abort=None, on_commit=None,
                     task_uuid=None, concurrency_check=True):
        """Create a transactional context for a nested instance activity.
        """
        act = self.create_sub(code_suffix, task_uuid, concurrency_check)
        # NOTE(review): this returns instead of yielding, so
        # activitycontextimpl is presumably a generator that
        # @contextmanager drives -- confirm in common.models.
        return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)
Őry Máté committed
171 172 173


@contextmanager
def instance_activity(code_suffix, instance, on_abort=None, on_commit=None,
                      task_uuid=None, user=None, concurrency_check=True):
    """Create a transactional context for an instance activity.

    A new top-level ``InstanceActivity`` is created first (which may raise
    ``ActivityInProgressError`` when *concurrency_check* is true), then it
    is handed to ``activitycontextimpl`` together with the callbacks.
    """
    activity = InstanceActivity.create(
        code_suffix, instance, task_uuid=task_uuid, user=user,
        concurrency_check=concurrency_check)
    return activitycontextimpl(activity, on_abort=on_abort,
                               on_commit=on_commit)
181 182 183


class NodeActivity(ActivityModel):
    """Activity log entry for an operation performed on a ``Node``.

    Activity codes are built with ``join_activity_code`` starting from
    ``ACTIVITY_CODE_BASE``; sub-activities extend the code of their parent.
    """

    ACTIVITY_CODE_BASE = join_activity_code('vm', 'Node')
    node = ForeignKey('Node', related_name='activity_log',
                      help_text=_('Node this activity works on.'),
                      verbose_name=_('node'))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_nodeactivity'

    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.node,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.node)

    def get_readable_name(self):
        """Return a display name derived from the last activity code part.

        Underscores are replaced with spaces and the text is capitalized.
        """
        # Use the shared helper (as InstanceActivity does) instead of
        # hard-coding the separator of activity codes here.
        activity_code_last_suffix = split_activity_code(self.activity_code)[-1]
        return activity_code_last_suffix.replace('_', ' ').capitalize()

    @classmethod
    def create(cls, code_suffix, node, task_uuid=None, user=None):
        """Create and save a new top-level activity for a node.

        :param code_suffix: appended to ``ACTIVITY_CODE_BASE`` to form the
                            activity code.
        :param node: the node the activity works on.
        :param task_uuid: optional id of the related Celery task.
        :param user: optional user recorded on the activity.
        :returns: the saved activity.
        """
        activity_code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix)
        act = cls(activity_code=activity_code, node=node, parent=None,
                  started=timezone.now(), task_uuid=task_uuid, user=user)
        act.save()
        return act

    def create_sub(self, code_suffix, task_uuid=None):
        """Create and save a sub-activity of this activity.

        The child inherits the node and the user of its parent and its
        activity code is the parent's code extended with *code_suffix*.

        :returns: the saved sub-activity.
        """
        act = NodeActivity(
            activity_code=join_activity_code(self.activity_code, code_suffix),
            node=self.node, parent=self, started=timezone.now(),
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act

    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        """Create a context for a nested node activity.
        """
        act = self.create_sub(code_suffix, task_uuid)
        # NOTE(review): this returns instead of yielding, so
        # activitycontextimpl is presumably a generator that
        # @contextmanager drives -- confirm in common.models.
        return activitycontextimpl(act)


@contextmanager
def node_activity(code_suffix, node, task_uuid=None, user=None):
    """Create a context for a node activity.

    A new top-level ``NodeActivity`` is created first, then it is handed
    to ``activitycontextimpl``.
    """
    activity = NodeActivity.create(code_suffix, node, task_uuid=task_uuid,
                                   user=user)
    return activitycontextimpl(activity)
231 232 233 234 235


@worker_ready.connect()
def cleanup(conf=None, **kwargs):
    """Force-finish every unfinished activity when the worker becomes ready.

    Connected to Celery's ``worker_ready`` signal. Each instance activity,
    then each node activity without a ``finished`` value is finished as
    failed with a readable message, and the event is logged as an error.
    """
    # TODO check if other manager workers are running
    msg_txt = ugettext_noop("Manager is restarted, activity is cleaned up. "
                            "You can try again now.")
    message = create_readable(msg_txt, msg_txt)
    # Instance activities are processed first, node activities second.
    for model in (InstanceActivity, NodeActivity):
        for stale in model.objects.filter(finished__isnull=True):
            stale.finish(False, result=message)
            logger.error('Forced finishing stale activity %s', stale)