activity.py 11.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

18
from __future__ import absolute_import, unicode_literals
Őry Máté committed
19 20
from contextlib import contextmanager
from logging import getLogger
21
from warnings import warn
Őry Máté committed
22

23
from celery.signals import worker_ready
Kálmán Viktor committed
24 25
from celery.contrib.abortable import AbortableAsyncResult

26
from django.core.urlresolvers import reverse
27
from django.db.models import CharField, ForeignKey, BooleanField
Őry Máté committed
28
from django.utils import timezone
29
from django.utils.translation import ugettext_lazy as _, ugettext_noop
Őry Máté committed
30

31
from common.models import (
32
    ActivityModel, activitycontextimpl, create_readable, join_activity_code,
33
    HumanReadableObject, HumanReadableException,
34 35
)

Kálmán Viktor committed
36 37
from manager.mancelery import celery

38

Őry Máté committed
39 40 41
logger = getLogger(__name__)


42
class ActivityInProgressError(HumanReadableException):
    """Error raised when an unfinished activity blocks a new one.

    Instances built through :meth:`create` carry the blocking activity in
    their ``activity`` attribute.
    """

    @classmethod
    def create(cls, activity):
        """Build the error describing the still-running *activity*.

        :param activity: the unfinished activity; its ``readable_name`` and
                         ``pk`` are substituted into the message templates.
        :returns: the new exception object with ``activity`` attached.
        """
        # Two message templates are handed to the parent factory; the second
        # one additionally includes the primary key of the activity.
        short_template = ugettext_noop(
            "%(activity)s activity is currently in progress.")
        detailed_template = ugettext_noop(
            "%(activity)s (%(pk)s) activity is currently "
            "in progress.")
        error = super(ActivityInProgressError, cls).create(
            short_template, detailed_template,
            activity=activity.readable_name, pk=activity.pk)
        error.activity = activity
        return error
53 54


55 56 57 58 59 60 61 62 63 64 65 66
def _normalize_readable_name(name, default=None):
    """Return *name* as a HumanReadableObject.

    :param name: a HumanReadableObject, any value accepted by
                 ``create_readable``, or None.
    :param default: dotted string used to derive a name when *name* is None;
                    its dots are replaced by spaces.
    :returns: *name* itself if it already is a HumanReadableObject, otherwise
              the result of ``create_readable``.
    """
    if name is None:
        # Omitting the readable name is deprecated. stacklevel=3 attributes
        # the warning two frames above this helper.
        warn("Set readable_name to a HumanReadableObject",
             category=DeprecationWarning, stacklevel=3)
        name = default.replace(".", " ")

    if isinstance(name, HumanReadableObject):
        return name
    return create_readable(name)


Őry Máté committed
67
class InstanceActivity(ActivityModel):
    """Activity log entry describing an operation on an instance.

    Entries can be nested: a sub-activity created with :meth:`create_sub`
    refers to its parent and inherits the parent's instance and user.
    """

    ACTIVITY_CODE_BASE = join_activity_code('vm', 'Instance')
    instance = ForeignKey('Instance', related_name='activity_log',
                          help_text=_('Instance this activity works on.'),
                          verbose_name=_('instance'))
    # NOTE(review): presumably the state the instance ends up in once the
    # activity is done; it is only stored here -- confirm against the users
    # of this field.
    resultant_state = CharField(blank=True, max_length=20, null=True)
    interruptible = BooleanField(default=False, help_text=_(
        'Other activities can interrupt this one.'))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_instanceactivity'
        ordering = ['-finished', '-started', 'instance', '-id']

    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.instance,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.instance)

    def abort(self):
        """Ask Celery to abort the task that belongs to this activity."""
        AbortableAsyncResult(self.task_uuid, backend=celery.backend).abort()

    @classmethod
    def create(cls, code_suffix, instance, task_uuid=None, user=None,
               concurrency_check=True, readable_name=None,
               resultant_state=None, interruptible=False):
        """Create and save a top-level activity for *instance*.

        :param code_suffix: suffix used to construct the activity code; also
                            the fallback source of the readable name.
        :param instance: the instance the activity works on.
        :param task_uuid: id of the related Celery task, if any.
        :param user: the user stored on the activity.
        :param concurrency_check: if true, unfinished activities of the
                                  instance are interrupted or block creation.
        :param readable_name: HumanReadableObject (or a value convertible to
                              one) naming the activity.
        :param resultant_state: value stored in ``resultant_state``.
        :param interruptible: whether later activities may interrupt this one.
        :raises ActivityInProgressError: if the concurrency check finds an
            unfinished activity that is not interruptible.
        :returns: the saved activity.
        """

        readable_name = _normalize_readable_name(readable_name, code_suffix)

        if concurrency_check:
            # Evaluate the queryset once so that both passes below work on
            # the same set of unfinished activities.
            running = list(
                instance.activity_log.filter(finished__isnull=True))
            # Refuse first, interrupt afterwards: when a non-interruptible
            # activity blocks the creation, no interruptible activity should
            # have been finished in vain.
            for other in running:
                if not other.interruptible:
                    raise ActivityInProgressError.create(other)
            for other in running:
                other.finish(False, result=ugettext_noop(
                    "Interrupted by other activity."))

        activity_code = cls.construct_activity_code(code_suffix)
        act = cls(activity_code=activity_code, instance=instance, parent=None,
                  resultant_state=resultant_state, started=timezone.now(),
                  readable_name_data=readable_name.to_dict(),
                  task_uuid=task_uuid, user=user, interruptible=interruptible)
        act.save()
        return act

    def create_sub(self, code_suffix, task_uuid=None, concurrency_check=True,
                   readable_name=None, resultant_state=None,
                   interruptible=False):
        """Create and save a sub-activity of this activity.

        The child inherits the instance and the user of its parent, and its
        activity code is the parent's code joined with *code_suffix*.

        :raises ActivityInProgressError: if *concurrency_check* is true and
            this activity already has an unfinished child.
        :returns: the saved sub-activity.
        """

        readable_name = _normalize_readable_name(readable_name, code_suffix)
        # Check for concurrent activities
        active_children = self.children.filter(finished__isnull=True)
        if concurrency_check and active_children.exists():
            raise ActivityInProgressError.create(active_children[0])

        act = InstanceActivity(
            activity_code=join_activity_code(self.activity_code, code_suffix),
            instance=self.instance, parent=self,
            resultant_state=resultant_state, interruptible=interruptible,
            readable_name_data=readable_name.to_dict(), started=timezone.now(),
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act

    def get_absolute_url(self):
        return reverse('dashboard.views.vm-activity', args=[self.pk])

    def get_status_id(self):
        """Return 'wait', 'success' or 'failed' depending on ``succeeded``."""
        if self.succeeded is None:
            return 'wait'
        elif self.succeeded:
            return 'success'
        else:
            return 'failed'

    def has_percentage(self):
        """Can a progress percentage be queried for this activity?

        :returns: True if the activity has a task id, is unfinished, and its
                  operation reports ``has_percentage``; otherwise, False.
        """
        op = self.get_operation()
        return bool(self.task_uuid and op and op.has_percentage
                    and not self.finished)

    def get_percentage(self):
        """Returns the percentage of the running operation if available.

        :returns: the ``percent`` entry of the task's info, or 0 when no
                  percentage can be obtained.
        """
        # Only touch the result backend when a percentage can exist at all.
        if not self.has_percentage():
            return 0
        # Read ``info`` once: each access may query the backend, and the
        # value may be something other than a dict.
        info = celery.AsyncResult(id=self.task_uuid).info
        if isinstance(info, dict):
            return info.get("percent")
        return 0

    @property
    def is_abortable(self):
        """Can the activity be aborted?

        :returns: True if the activity can be aborted; otherwise, False.
        """
        op = self.get_operation()
        return bool(self.task_uuid and op and op.abortable
                    and not self.finished)

    def is_abortable_for(self, user):
        """Can the given user abort the activity?

        Superusers, the owner of the instance and the user stored on the
        activity are allowed to abort an abortable activity.
        """

        return self.is_abortable and (
            user.is_superuser or user in (self.instance.owner, self.user))

    @property
    def is_aborted(self):
        """Has the activity been aborted?

        :returns: True if the activity has been aborted; otherwise, False.
        """
        if not self.task_uuid:
            return False
        # Query the same backend that abort() writes to.
        result = AbortableAsyncResult(self.task_uuid, backend=celery.backend)
        return bool(result.is_aborted())

    def save(self, *args, **kwargs):
        """Save the activity, then let the instance update its status."""
        ret = super(InstanceActivity, self).save(*args, **kwargs)
        self.instance._update_status()
        return ret

    @contextmanager
    def sub_activity(self, code_suffix, on_abort=None, on_commit=None,
                     readable_name=None, task_uuid=None,
                     concurrency_check=True, interruptible=False):
        """Create a transactional context for a nested instance activity.

        The sub-activity is created with :meth:`create_sub`; *on_abort* and
        *on_commit* are forwarded to ``activitycontextimpl``.
        """
        if not readable_name:
            warn("Set readable_name", stacklevel=3)
        act = self.create_sub(code_suffix, task_uuid, concurrency_check,
                              readable_name=readable_name,
                              interruptible=interruptible)
        # NOTE(review): relies on activitycontextimpl returning a generator
        # for the contextmanager decorator to drive -- confirm in
        # common.models.
        return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)

    def get_operation(self):
        """Return the operation the instance maps to this activity code."""
        return self.instance.get_operation_from_activity_code(
            self.activity_code)

Őry Máté committed
208 209

@contextmanager
def instance_activity(code_suffix, instance, on_abort=None, on_commit=None,
                      task_uuid=None, user=None, concurrency_check=True,
                      readable_name=None, resultant_state=None,
                      interruptible=False):
    """Create a transactional context for an instance activity.

    The activity is created with ``InstanceActivity.create`` and wrapped by
    ``activitycontextimpl``.

    :param code_suffix: suffix of the activity code.
    :param instance: the instance the activity works on.
    :param on_abort: forwarded to ``activitycontextimpl``.
    :param on_commit: forwarded to ``activitycontextimpl``.
    :param task_uuid: id of the related Celery task, if any.
    :param user: the user stored on the activity.
    :param concurrency_check: forwarded to ``InstanceActivity.create``.
    :param readable_name: human readable name of the activity; a warning is
                          issued when it is missing.
    :param resultant_state: forwarded to ``InstanceActivity.create``.
    :param interruptible: whether other activities may interrupt this one;
                          forwarded to ``InstanceActivity.create``.
    """
    if not readable_name:
        warn("Set readable_name", stacklevel=3)
    act = InstanceActivity.create(code_suffix, instance, task_uuid, user,
                                  concurrency_check,
                                  readable_name=readable_name,
                                  resultant_state=resultant_state,
                                  interruptible=interruptible)
    # NOTE(review): relies on activitycontextimpl returning a generator for
    # the contextmanager decorator to drive -- confirm in common.models.
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)
222 223 224


class NodeActivity(ActivityModel):
    """Activity log entry describing an operation on a node.

    Entries can be nested: a sub-activity refers to its parent and inherits
    the parent's node and user.
    """

    ACTIVITY_CODE_BASE = join_activity_code('vm', 'Node')
    node = ForeignKey('Node', related_name='activity_log',
                      help_text=_('Node this activity works on.'),
                      verbose_name=_('node'))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_nodeactivity'

    def __unicode__(self):
        # Top-level activities have no parent code to show.
        if not self.parent:
            return '{}({})'.format(self.activity_code, self.node)
        return '{}({})->{}'.format(
            self.parent.activity_code, self.node, self.activity_code)

    @classmethod
    def create(cls, code_suffix, node, task_uuid=None, user=None,
               readable_name=None):
        """Create and save a top-level activity for *node*.

        :param code_suffix: suffix appended to ``ACTIVITY_CODE_BASE``; also
                            the fallback source of the readable name.
        :param node: the node the activity works on.
        :param task_uuid: id of the related Celery task, if any.
        :param user: the user stored on the activity.
        :param readable_name: HumanReadableObject (or a value convertible to
                              one) naming the activity.
        :returns: the saved activity.
        """
        name = _normalize_readable_name(readable_name, code_suffix)
        code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix)
        activity = cls(
            activity_code=code,
            node=node,
            parent=None,
            readable_name_data=name.to_dict(),
            started=timezone.now(),
            task_uuid=task_uuid,
            user=user,
        )
        activity.save()
        return activity

    def create_sub(self, code_suffix, task_uuid=None, readable_name=None):
        """Create and save a sub-activity of this activity.

        The child inherits the node and the user of its parent, and its
        activity code is the parent's code joined with *code_suffix*.

        :returns: the saved sub-activity.
        """
        name = _normalize_readable_name(readable_name, code_suffix)
        child = NodeActivity(
            activity_code=join_activity_code(self.activity_code, code_suffix),
            node=self.node,
            parent=self,
            started=timezone.now(),
            readable_name_data=name.to_dict(),
            task_uuid=task_uuid,
            user=self.user,
        )
        child.save()
        return child

    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None, readable_name=None):
        """Create a context for a nested node activity.

        The sub-activity is created with :meth:`create_sub` and wrapped by
        ``activitycontextimpl``.
        """
        child = self.create_sub(code_suffix, task_uuid,
                                readable_name=readable_name)
        return activitycontextimpl(child)


@contextmanager
def node_activity(code_suffix, node, task_uuid=None, user=None,
                  readable_name=None):
    """Create a context for a node activity.

    The activity is created with ``NodeActivity.create`` and wrapped by
    ``activitycontextimpl``.
    """
    activity = NodeActivity.create(
        code_suffix, node, task_uuid=task_uuid, user=user,
        readable_name=readable_name)
    return activitycontextimpl(activity)
279 280 281 282 283


@worker_ready.connect()
def cleanup(conf=None, **kwargs):
    """Handler of the ``worker_ready`` signal.

    Calls Celery's ``discard_all`` and then finishes every unfinished
    instance and node activity as failed, logging each of them.
    """
    # TODO check if other manager workers are running
    from celery.task.control import discard_all
    discard_all()

    text = ugettext_noop("Manager is restarted, activity is cleaned up. "
                         "You can try again now.")
    # The same text is passed for both arguments of create_readable.
    result = create_readable(text, text)

    # Instance activities are processed first, node activities afterwards.
    for model in (InstanceActivity, NodeActivity):
        for stale in model.objects.filter(finished__isnull=True):
            stale.finish(False, result=result)
            logger.error('Forced finishing stale activity %s', stale)