Commit 1138a614 by Őry Máté

Merge branch 'feature-wait-for-agent' into 'master'

Feature Wait For Agent

closes #280
parents 8045b135 d932325c
...@@ -214,6 +214,14 @@ class ActivityModel(TimeStampedModel): ...@@ -214,6 +214,14 @@ class ActivityModel(TimeStampedModel):
self.result_data = None if value is None else value.to_dict() self.result_data = None if value is None else value.to_dict()
@classmethod
def construct_activity_code(cls, code_suffix, sub_suffix=None):
code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix)
if sub_suffix:
return join_activity_code(code, sub_suffix)
else:
return code
@celery.task() @celery.task()
def compute_cached(method, instance, memcached_seconds, def compute_cached(method, instance, memcached_seconds,
......
...@@ -481,7 +481,7 @@ class TemplateForm(forms.ModelForm): ...@@ -481,7 +481,7 @@ class TemplateForm(forms.ModelForm):
else: else:
self.allowed_fields = ( self.allowed_fields = (
'name', 'access_method', 'description', 'system', 'tags', 'name', 'access_method', 'description', 'system', 'tags',
'arch', 'lease') 'arch', 'lease', 'has_agent')
if (self.user.has_perm('vm.change_template_resources') if (self.user.has_perm('vm.change_template_resources')
or not self.instance.pk): or not self.instance.pk):
self.allowed_fields += tuple(set(self.fields.keys()) - self.allowed_fields += tuple(set(self.fields.keys()) -
......
...@@ -51,6 +51,7 @@ ...@@ -51,6 +51,7 @@
{{ form.req_traits|as_crispy_field }} {{ form.req_traits|as_crispy_field }}
{{ form.description|as_crispy_field }} {{ form.description|as_crispy_field }}
{{ form.system|as_crispy_field }} {{ form.system|as_crispy_field }}
{{ form.has_agent|as_crispy_field }}
</fieldset> </fieldset>
<fieldset> <fieldset>
<legend>{% trans "External resources" %}</legend> <legend>{% trans "External resources" %}</legend>
......
...@@ -24,7 +24,7 @@ from celery.signals import worker_ready ...@@ -24,7 +24,7 @@ from celery.signals import worker_ready
from celery.contrib.abortable import AbortableAsyncResult from celery.contrib.abortable import AbortableAsyncResult
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.db.models import CharField, ForeignKey from django.db.models import CharField, ForeignKey, BooleanField
from django.utils import timezone from django.utils import timezone
from django.utils.translation import ugettext_lazy as _, ugettext_noop from django.utils.translation import ugettext_lazy as _, ugettext_noop
...@@ -70,6 +70,8 @@ class InstanceActivity(ActivityModel): ...@@ -70,6 +70,8 @@ class InstanceActivity(ActivityModel):
help_text=_('Instance this activity works on.'), help_text=_('Instance this activity works on.'),
verbose_name=_('instance')) verbose_name=_('instance'))
resultant_state = CharField(blank=True, max_length=20, null=True) resultant_state = CharField(blank=True, max_length=20, null=True)
interruptible = BooleanField(default=False, help_text=_(
'Other activities can interrupt this one.'))
class Meta: class Meta:
app_label = 'vm' app_label = 'vm'
...@@ -91,24 +93,30 @@ class InstanceActivity(ActivityModel): ...@@ -91,24 +93,30 @@ class InstanceActivity(ActivityModel):
@classmethod @classmethod
def create(cls, code_suffix, instance, task_uuid=None, user=None, def create(cls, code_suffix, instance, task_uuid=None, user=None,
concurrency_check=True, readable_name=None, concurrency_check=True, readable_name=None,
resultant_state=None): resultant_state=None, interruptible=False):
readable_name = _normalize_readable_name(readable_name, code_suffix) readable_name = _normalize_readable_name(readable_name, code_suffix)
# Check for concurrent activities # Check for concurrent activities
active_activities = instance.activity_log.filter(finished__isnull=True) active_activities = instance.activity_log.filter(finished__isnull=True)
if concurrency_check and active_activities.exists(): if concurrency_check and active_activities.exists():
raise ActivityInProgressError.create(active_activities[0]) for i in active_activities:
if i.interruptible:
i.finish(False, result=ugettext_noop(
"Interrupted by other activity."))
else:
raise ActivityInProgressError.create(i)
activity_code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix) activity_code = cls.construct_activity_code(code_suffix)
act = cls(activity_code=activity_code, instance=instance, parent=None, act = cls(activity_code=activity_code, instance=instance, parent=None,
resultant_state=resultant_state, started=timezone.now(), resultant_state=resultant_state, started=timezone.now(),
readable_name_data=readable_name.to_dict(), readable_name_data=readable_name.to_dict(),
task_uuid=task_uuid, user=user) task_uuid=task_uuid, user=user, interruptible=interruptible)
act.save() act.save()
return act return act
def create_sub(self, code_suffix, task_uuid=None, concurrency_check=True, def create_sub(self, code_suffix, task_uuid=None, concurrency_check=True,
readable_name=None, resultant_state=None): readable_name=None, resultant_state=None,
interruptible=False):
readable_name = _normalize_readable_name(readable_name, code_suffix) readable_name = _normalize_readable_name(readable_name, code_suffix)
# Check for concurrent activities # Check for concurrent activities
...@@ -119,7 +127,7 @@ class InstanceActivity(ActivityModel): ...@@ -119,7 +127,7 @@ class InstanceActivity(ActivityModel):
act = InstanceActivity( act = InstanceActivity(
activity_code=join_activity_code(self.activity_code, code_suffix), activity_code=join_activity_code(self.activity_code, code_suffix),
instance=self.instance, parent=self, instance=self.instance, parent=self,
resultant_state=resultant_state, resultant_state=resultant_state, interruptible=interruptible,
readable_name_data=readable_name.to_dict(), started=timezone.now(), readable_name_data=readable_name.to_dict(), started=timezone.now(),
task_uuid=task_uuid, user=self.user) task_uuid=task_uuid, user=self.user)
act.save() act.save()
...@@ -183,13 +191,14 @@ class InstanceActivity(ActivityModel): ...@@ -183,13 +191,14 @@ class InstanceActivity(ActivityModel):
@contextmanager @contextmanager
def sub_activity(self, code_suffix, on_abort=None, on_commit=None, def sub_activity(self, code_suffix, on_abort=None, on_commit=None,
readable_name=None, task_uuid=None, readable_name=None, task_uuid=None,
concurrency_check=True): concurrency_check=True, interruptible=False):
"""Create a transactional context for a nested instance activity. """Create a transactional context for a nested instance activity.
""" """
if not readable_name: if not readable_name:
warn("Set readable_name", stacklevel=3) warn("Set readable_name", stacklevel=3)
act = self.create_sub(code_suffix, task_uuid, concurrency_check, act = self.create_sub(code_suffix, task_uuid, concurrency_check,
readable_name=readable_name) readable_name=readable_name,
interruptible=interruptible)
return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit) return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)
def get_operation(self): def get_operation(self):
......
...@@ -123,6 +123,10 @@ class VirtualMachineDescModel(BaseResourceConfigModel): ...@@ -123,6 +123,10 @@ class VirtualMachineDescModel(BaseResourceConfigModel):
'format like "%s".') % 'format like "%s".') %
'Ubuntu 12.04 LTS Desktop amd64')) 'Ubuntu 12.04 LTS Desktop amd64'))
tags = TaggableManager(blank=True, verbose_name=_("tags")) tags = TaggableManager(blank=True, verbose_name=_("tags"))
has_agent = BooleanField(verbose_name=_('has agent'), default=True,
help_text=_(
'If the machine has agent installed, and '
'the manager should wait for its start.'))
class Meta: class Meta:
abstract = True abstract = True
...@@ -424,7 +428,8 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -424,7 +428,8 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
# prepare parameters # prepare parameters
common_fields = ['name', 'description', 'num_cores', 'ram_size', common_fields = ['name', 'description', 'num_cores', 'ram_size',
'max_ram_size', 'arch', 'priority', 'boot_menu', 'max_ram_size', 'arch', 'priority', 'boot_menu',
'raw_data', 'lease', 'access_method', 'system'] 'raw_data', 'lease', 'access_method', 'system',
'has_agent']
params = dict(template=template, owner=owner, pw=pwgen()) params = dict(template=template, owner=owner, pw=pwgen())
params.update([(f, getattr(template, f)) for f in common_fields]) params.update([(f, getattr(template, f)) for f in common_fields])
params.update(kwargs) # override defaults w/ user supplied values params.update(kwargs) # override defaults w/ user supplied values
......
...@@ -299,16 +299,20 @@ class DeployOperation(InstanceOperation): ...@@ -299,16 +299,20 @@ class DeployOperation(InstanceOperation):
"deploy network")): "deploy network")):
self.instance.deploy_net() self.instance.deploy_net()
try:
self.instance.renew(parent_activity=activity)
except:
pass
# Resume vm # Resume vm
with activity.sub_activity( with activity.sub_activity(
'booting', readable_name=ugettext_noop( 'booting', readable_name=ugettext_noop(
"boot virtual machine")): "boot virtual machine")):
self.instance.resume_vm(timeout=timeout) self.instance.resume_vm(timeout=timeout)
try: if self.instance.has_agent:
self.instance.renew(parent_activity=activity) activity.sub_activity('os_boot', readable_name=ugettext_noop(
except: "wait operating system loading"), interruptible=True)
pass
register_operation(DeployOperation) register_operation(DeployOperation)
...@@ -425,8 +429,11 @@ class RebootOperation(InstanceOperation): ...@@ -425,8 +429,11 @@ class RebootOperation(InstanceOperation):
required_perms = () required_perms = ()
accept_states = ('RUNNING', ) accept_states = ('RUNNING', )
def _operation(self, timeout=5): def _operation(self, activity, timeout=5):
self.instance.reboot_vm(timeout=timeout) self.instance.reboot_vm(timeout=timeout)
if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True)
register_operation(RebootOperation) register_operation(RebootOperation)
...@@ -499,8 +506,11 @@ class ResetOperation(InstanceOperation): ...@@ -499,8 +506,11 @@ class ResetOperation(InstanceOperation):
required_perms = () required_perms = ()
accept_states = ('RUNNING', ) accept_states = ('RUNNING', )
def _operation(self, timeout=5): def _operation(self, activity, timeout=5):
self.instance.reset_vm(timeout=timeout) self.instance.reset_vm(timeout=timeout)
if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True)
register_operation(ResetOperation) register_operation(ResetOperation)
......
...@@ -85,16 +85,22 @@ def agent_started(vm, version=None): ...@@ -85,16 +85,22 @@ def agent_started(vm, version=None):
from vm.models import Instance, instance_activity, InstanceActivity from vm.models import Instance, instance_activity, InstanceActivity
instance = Instance.objects.get(id=int(vm.split('-')[-1])) instance = Instance.objects.get(id=int(vm.split('-')[-1]))
queue = instance.get_remote_queue_name("agent") queue = instance.get_remote_queue_name("agent")
initialized = InstanceActivity.objects.filter( initialized = instance.activity_log.filter(
instance=instance, activity_code='vm.Instance.agent.cleanup').exists() activity_code='vm.Instance.agent.cleanup').exists()
with instance_activity(code_suffix='agent', with instance_activity(code_suffix='agent',
readable_name=ugettext_noop('agent'), readable_name=ugettext_noop('agent'),
concurrency_check=False,
instance=instance) as act: instance=instance) as act:
with act.sub_activity('starting', with act.sub_activity('starting',
readable_name=ugettext_noop('starting')): readable_name=ugettext_noop('starting')):
pass pass
for i in InstanceActivity.objects.filter(
instance=instance, activity_code__endswith='.os_boot',
finished__isnull=True):
i.finish(True)
if version and version != settings.AGENT_VERSION: if version and version != settings.AGENT_VERSION:
try: try:
update_agent(instance, act) update_agent(instance, act)
...@@ -108,10 +114,9 @@ def agent_started(vm, version=None): ...@@ -108,10 +114,9 @@ def agent_started(vm, version=None):
send_init_commands(instance, act) send_init_commands(instance, act)
send_networking_commands(instance, act) send_networking_commands(instance, act)
with act.sub_activity( with act.sub_activity('start_access_server',
'start_access_server', readable_name=ugettext_noop(
readable_name=ugettext_noop('start access server') 'start access server')):
):
start_access_server.apply_async(queue=queue, args=(vm, )) start_access_server.apply_async(queue=queue, args=(vm, ))
......
...@@ -217,6 +217,8 @@ class InstanceActivityTestCase(TestCase): ...@@ -217,6 +217,8 @@ class InstanceActivityTestCase(TestCase):
def test_create_concurrency_check(self): def test_create_concurrency_check(self):
instance = MagicMock(spec=Instance) instance = MagicMock(spec=Instance)
instance.activity_log.filter.return_value.__iter__.return_value = iter(
[MagicMock(spec=InstanceActivity, interruptible=False)])
instance.activity_log.filter.return_value.exists.return_value = True instance.activity_log.filter.return_value.exists.return_value = True
with self.assertRaises(ActivityInProgressError): with self.assertRaises(ActivityInProgressError):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment