activity.py 10.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

18
from __future__ import absolute_import, unicode_literals
Őry Máté committed
19 20
from contextlib import contextmanager
from logging import getLogger
21
from warnings import warn
Őry Máté committed
22

Kálmán Viktor committed
23 24
from celery.contrib.abortable import AbortableAsyncResult

25
from django.core.urlresolvers import reverse
26
from django.db.models import CharField, ForeignKey, BooleanField
Őry Máté committed
27
from django.utils import timezone
28
from django.utils.translation import ugettext_lazy as _, ugettext_noop
Őry Máté committed
29

30
from common.models import (
31
    ActivityModel, activitycontextimpl, create_readable, join_activity_code,
32
    HumanReadableObject, HumanReadableException,
33 34
)

Kálmán Viktor committed
35 36
from manager.mancelery import celery

37

Őry Máté committed
38 39 40
# Module-level logger named after this module.
logger = getLogger(__name__)


41
class ActivityInProgressError(HumanReadableException):
    """Error signalling that an activity is still in progress.

    Objects built by :meth:`create` carry the offending activity in their
    ``activity`` attribute.
    """

    @classmethod
    def create(cls, activity):
        """Build the error for the given activity.

        :param activity: the activity that is in progress; its
                         ``readable_name`` and ``pk`` are handed over as
                         message parameters.
        :returns: the new error object with ``activity`` set on it.
        """
        short_text = ugettext_noop(
            "%(activity)s activity is currently in progress.")
        detailed_text = ugettext_noop(
            "%(activity)s (%(pk)s) activity is currently "
            "in progress.")
        error = super(ActivityInProgressError, cls).create(
            short_text, detailed_text,
            activity=activity.readable_name, pk=activity.pk)
        error.activity = activity
        return error
52 53


54 55 56 57 58 59 60 61 62 63 64 65
def _normalize_readable_name(name, default=None):
    """Turn ``name`` into a HumanReadableObject.

    :param name: a HumanReadableObject (returned unchanged), None, or any
                 other value, which is passed to ``create_readable``.
    :param default: string used when ``name`` is None; its dots are
                    replaced by spaces before conversion.
    :returns: a HumanReadableObject.
    """
    if isinstance(name, HumanReadableObject):
        return name

    if name is None:
        # stacklevel=3 attributes the warning two frames above this helper.
        warn("Set readable_name to a HumanReadableObject",
             category=DeprecationWarning, stacklevel=3)
        name = default.replace(".", " ")

    return create_readable(name)


Őry Máté committed
66
class InstanceActivity(ActivityModel):
    """Activity log entry attached to an ``Instance``.

    Top-level activities are made with :meth:`create`; nested ones with
    :meth:`create_sub` or the :meth:`sub_activity` context manager.
    """

    ACTIVITY_CODE_BASE = join_activity_code('vm', 'Instance')
    instance = ForeignKey('Instance', related_name='activity_log',
                          help_text=_('Instance this activity works on.'),
                          verbose_name=_('instance'))
    resultant_state = CharField(blank=True, max_length=20, null=True)
    interruptible = BooleanField(default=False, help_text=_(
        'Other activities can interrupt this one.'))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_instanceactivity'
        ordering = ['-finished', '-started', 'instance', '-id']

    def __unicode__(self):
        """Return ``code(instance)``, or ``parent_code(instance)->code``
        when the activity has a parent.
        """
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.instance,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.instance)

    def abort(self):
        """Request abortion of the Celery task identified by ``task_uuid``.
        """
        AbortableAsyncResult(self.task_uuid, backend=celery.backend).abort()

    @classmethod
    def create(cls, code_suffix, instance, task_uuid=None, user=None,
               concurrency_check=True, readable_name=None,
               resultant_state=None, interruptible=False):
        """Create and save a top-level activity for ``instance``.

        When ``concurrency_check`` is true, the unfinished activities of the
        instance are examined first. If any of them is not interruptible,
        the call is refused and nothing is modified; otherwise every one of
        them is closed with ``finish(False, ...)`` before the new activity
        is created.

        :param code_suffix: suffix handed to ``construct_activity_code``;
                            also the fallback for ``readable_name``.
        :param instance: the instance the activity works on.
        :param task_uuid: id of the related Celery task, if any.
        :param user: user stored on the activity.
        :param concurrency_check: whether to look for unfinished activities.
        :param readable_name: HumanReadableObject naming the activity
                              (omitting it is deprecated).
        :param resultant_state: stored in the ``resultant_state`` field.
        :param interruptible: stored in the ``interruptible`` field.
        :raises ActivityInProgressError: if a non-interruptible activity of
                                         the instance is unfinished.
        :returns: the saved activity.
        """

        readable_name = _normalize_readable_name(readable_name, code_suffix)

        # Check for concurrent activities
        if concurrency_check:
            # Evaluate the queryset once so both passes below work on the
            # same set of activities.
            running = list(
                instance.activity_log.filter(finished__isnull=True))
            # Refuse before touching anything: otherwise some activities
            # would already be interrupted when the error is raised.
            for other in running:
                if not other.interruptible:
                    raise ActivityInProgressError.create(other)
            for other in running:
                other.finish(False, result=ugettext_noop(
                    "Interrupted by other activity."))

        activity_code = cls.construct_activity_code(code_suffix)
        act = cls(activity_code=activity_code, instance=instance, parent=None,
                  resultant_state=resultant_state, started=timezone.now(),
                  readable_name_data=readable_name.to_dict(),
                  task_uuid=task_uuid, user=user, interruptible=interruptible)
        act.save()
        return act

    def create_sub(self, code_suffix, task_uuid=None, concurrency_check=True,
                   readable_name=None, resultant_state=None,
                   interruptible=False):
        """Create and save a child activity of this one.

        The child shares this activity's instance and user; its code is
        this activity's code joined with ``code_suffix``.

        :raises ActivityInProgressError: if ``concurrency_check`` is true
                                         and a child is still unfinished.
        :returns: the saved child activity.
        """

        readable_name = _normalize_readable_name(readable_name, code_suffix)

        # Check for concurrent activities
        active_children = self.children.filter(finished__isnull=True)
        if concurrency_check and active_children.exists():
            raise ActivityInProgressError.create(active_children[0])

        act = InstanceActivity(
            activity_code=join_activity_code(self.activity_code, code_suffix),
            instance=self.instance, parent=self,
            resultant_state=resultant_state, interruptible=interruptible,
            readable_name_data=readable_name.to_dict(), started=timezone.now(),
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act

    def get_absolute_url(self):
        """Return the URL of the 'dashboard.views.vm-activity' view for
        this activity.
        """
        return reverse('dashboard.views.vm-activity', args=[self.pk])

    def get_status_id(self):
        """Map ``succeeded`` to 'wait' (None), 'success' or 'failed'.
        """
        if self.succeeded is None:
            return 'wait'
        elif self.succeeded:
            return 'success'
        else:
            return 'failed'

    def has_percentage(self):
        """Can a progress percentage be queried for the activity?

        :returns: True if the activity has a task, its operation reports
                  ``has_percentage`` and it is not finished; otherwise,
                  False.
        """
        op = self.get_operation()
        # bool(): the bare ``and`` chain could leak None or '' to callers.
        return bool(self.task_uuid and op and op.has_percentage
                    and not self.finished)

    def get_percentage(self):
        """Returns the percentage of the running operation if available.

        :returns: the ``"percent"`` entry of the task's info, or 0 when no
                  percentage is available.
        """
        # Only consult the result backend when a percentage can exist.
        if not self.has_percentage():
            return 0
        info = celery.AsyncResult(id=self.task_uuid).info
        # info may be None or a non-mapping object (Celery documents it as
        # the exception instance for a failed task); neither has a
        # percentage to report.
        if not hasattr(info, "get"):
            return 0
        return info.get("percent")

    @property
    def is_abortable(self):
        """Can the activity be aborted?

        :returns: True if the activity can be aborted; otherwise, False.
        """
        op = self.get_operation()
        # bool(): keep the documented True/False contract.
        return bool(self.task_uuid and op and op.abortable
                    and not self.finished)

    def is_abortable_for(self, user):
        """Can the given user abort the activity?

        Besides the activity being abortable, the user has to be a
        superuser, the owner of the instance or the activity's user.
        """

        return self.is_abortable and (
            user.is_superuser or user in (self.instance.owner, self.user))

    @property
    def is_aborted(self):
        """Has the activity been aborted?

        :returns: True if the activity has been aborted; otherwise, False.
        """
        if not self.task_uuid:
            return False
        # Query the same backend abort() writes to.
        result = AbortableAsyncResult(self.task_uuid, backend=celery.backend)
        return bool(result.is_aborted())

    def save(self, *args, **kwargs):
        """Save the activity, then call ``_update_status()`` on its
        instance.
        """
        ret = super(InstanceActivity, self).save(*args, **kwargs)
        self.instance._update_status()
        return ret

    @contextmanager
    def sub_activity(self, code_suffix, on_abort=None, on_commit=None,
                     readable_name=None, task_uuid=None,
                     concurrency_check=True, interruptible=False):
        """Create a transactional context for a nested instance activity.
        """
        if not readable_name:
            warn("Set readable_name", stacklevel=3)
        act = self.create_sub(code_suffix, task_uuid, concurrency_check,
                              readable_name=readable_name,
                              interruptible=interruptible)
        # NOTE(review): this function returns instead of yielding, so
        # @contextmanager wraps whatever activitycontextimpl returns --
        # presumably a generator; confirm in common.models.
        return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)

    def get_operation(self):
        """Return the instance's operation looked up by this activity's
        code.
        """
        return self.instance.get_operation_from_activity_code(
            self.activity_code)

Őry Máté committed
207

208
class NodeActivity(ActivityModel):
    """Activity log entry attached to a ``Node``."""

    ACTIVITY_CODE_BASE = join_activity_code('vm', 'Node')
    node = ForeignKey('Node', related_name='activity_log',
                      help_text=_('Node this activity works on.'),
                      verbose_name=_('node'))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_nodeactivity'

    def __unicode__(self):
        """Return ``code(node)``, or ``parent_code(node)->code`` when the
        activity has a parent.
        """
        if not self.parent:
            return '{0}({1})'.format(self.activity_code, self.node)
        return '{0}({1})->{2}'.format(
            self.parent.activity_code, self.node, self.activity_code)

    @classmethod
    def create(cls, code_suffix, node, task_uuid=None, user=None,
               readable_name=None):
        """Create and save a top-level activity for ``node``.

        :param code_suffix: joined to ``ACTIVITY_CODE_BASE`` to form the
                            activity code; also the fallback for
                            ``readable_name``.
        :returns: the saved activity.
        """
        name_obj = _normalize_readable_name(readable_name, code_suffix)
        code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix)
        activity = cls(
            activity_code=code, node=node, parent=None,
            readable_name_data=name_obj.to_dict(),
            started=timezone.now(), task_uuid=task_uuid, user=user)
        activity.save()
        return activity

    def create_sub(self, code_suffix, task_uuid=None, readable_name=None):
        """Create and save a child activity sharing this one's node and
        user.

        :returns: the saved child activity.
        """
        name_obj = _normalize_readable_name(readable_name, code_suffix)
        child_code = join_activity_code(self.activity_code, code_suffix)
        child = NodeActivity(
            activity_code=child_code,
            node=self.node, parent=self, started=timezone.now(),
            readable_name_data=name_obj.to_dict(), task_uuid=task_uuid,
            user=self.user)
        child.save()
        return child

    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None, readable_name=None):
        """Create a child activity and wrap it in an activity context.
        """
        child = self.create_sub(code_suffix, task_uuid=task_uuid,
                                readable_name=readable_name)
        return activitycontextimpl(child)


@contextmanager
def node_activity(code_suffix, node, task_uuid=None, user=None,
                  readable_name=None):
    """Create a top-level NodeActivity and wrap it in an activity context.

    All arguments are forwarded to ``NodeActivity.create``.
    """
    activity = NodeActivity.create(
        code_suffix, node, task_uuid=task_uuid, user=user,
        readable_name=readable_name)
    return activitycontextimpl(activity)
263 264 265 266


def cleanup(conf=None, **kwargs):
    """Force-finish activities that are still unfinished.

    Unfinished instance activities are finished only when their operation
    exists and its ``async_queue`` equals the ``queue_name`` keyword
    argument; every unfinished node activity is finished. Each forced
    finish is logged as an error.

    :param conf: not used by this function.
    :param kwargs: only ``queue_name`` is read (None if absent).
    """
    # TODO check if other manager workers are running
    msg_txt = ugettext_noop("Manager is restarted, activity is cleaned up. "
                            "You can try again now.")
    message = create_readable(msg_txt, msg_txt)
    queue_name = kwargs.get('queue_name')

    stale_instance_acts = InstanceActivity.objects.filter(
        finished__isnull=True)
    for activity in stale_instance_acts:
        operation = activity.get_operation()
        if not (operation and operation.async_queue == queue_name):
            continue
        activity.finish(False, result=message)
        logger.error('Forced finishing stale activity %s', activity)

    stale_node_acts = NodeActivity.objects.filter(finished__isnull=True)
    for activity in stale_node_acts:
        activity.finish(False, result=message)
        logger.error('Forced finishing stale activity %s', activity)