Commit aab4fb6c by Őry Máté

vm: implement abortable ShutdownOperation

parent 6586d87e
...@@ -5,6 +5,8 @@ from logging import getLogger ...@@ -5,6 +5,8 @@ from logging import getLogger
from string import ascii_lowercase from string import ascii_lowercase
from warnings import warn from warnings import warn
from celery.exceptions import TimeoutError
from celery.contrib.abortable import AbortableAsyncResult
import django.conf import django.conf
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core import signing from django.core import signing
...@@ -22,6 +24,7 @@ from taggit.managers import TaggableManager ...@@ -22,6 +24,7 @@ from taggit.managers import TaggableManager
from acl.models import AclBase from acl.models import AclBase
from common.operations import OperatedMixin from common.operations import OperatedMixin
from manager.mancelery import celery
from ..tasks import vm_tasks, agent_tasks from ..tasks import vm_tasks, agent_tasks
from .activity import (ActivityInProgressError, instance_activity, from .activity import (ActivityInProgressError, instance_activity,
InstanceActivity) InstanceActivity)
...@@ -813,13 +816,21 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -813,13 +816,21 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def shutdown_vm(self, timeout=120): def shutdown_vm(self, task=None, step=5):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm')
logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name, logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name,
self.vm_name) self.vm_name)
return vm_tasks.shutdown.apply_async(kwargs={'name': self.vm_name}, remote = vm_tasks.shutdown.apply_async(kwargs={'name': self.vm_name},
queue=queue_name queue=queue_name)
).get(timeout=timeout)
while True:
try:
return remote.get(timeout=step)
except TimeoutError:
if task is not None and task.is_aborted():
AbortableAsyncResult(remote.id,
backend=celery.backend).abort()
raise Exception("Shutdown aborted by user.")
def suspend_vm(self, timeout=60): def suspend_vm(self, timeout=60):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm')
......
...@@ -321,17 +321,11 @@ class ShutdownOperation(InstanceOperation): ...@@ -321,17 +321,11 @@ class ShutdownOperation(InstanceOperation):
if self.instance.status not in ['RUNNING']: if self.instance.status not in ['RUNNING']:
raise self.instance.WrongStateError(self.instance) raise self.instance.WrongStateError(self.instance)
def on_abort(self, activity, error):
if isinstance(error, TimeLimitExceeded):
activity.resultant_state = None
else:
activity.resultant_state = 'ERROR'
def on_commit(self, activity): def on_commit(self, activity):
activity.resultant_state = 'STOPPED' activity.resultant_state = 'STOPPED'
def _operation(self, timeout=120): def _operation(self, task=None):
self.instance.shutdown_vm(timeout=timeout) self.instance.shutdown_vm(task=task)
self.instance.yield_node() self.instance.yield_node()
self.instance.yield_vnc_port() self.instance.yield_vnc_port()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment