Commit 645ad6a0 by Bach Dániel

Merge branch 'feature-agent-operations' into 'master'

Feature agent operations

See merge request !251
parents 542bbd0a 7ecbd0fb
...@@ -16,15 +16,22 @@ ...@@ -16,15 +16,22 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from base64 import encodestring
from hashlib import md5
from logging import getLogger from logging import getLogger
import os
from re import search from re import search
from string import ascii_lowercase from string import ascii_lowercase
from StringIO import StringIO
from tarfile import TarFile, TarInfo
import time
from urlparse import urlsplit from urlparse import urlsplit
from django.core.exceptions import PermissionDenied from django.core.exceptions import PermissionDenied
from django.utils import timezone from django.utils import timezone
from django.utils.translation import ugettext_lazy as _, ugettext_noop from django.utils.translation import ugettext_lazy as _, ugettext_noop
from django.conf import settings from django.conf import settings
from django.db.models import Q
from sizefield.utils import filesizeformat from sizefield.utils import filesizeformat
...@@ -43,9 +50,11 @@ from .models import ( ...@@ -43,9 +50,11 @@ from .models import (
Instance, InstanceActivity, InstanceTemplate, Interface, Node, Instance, InstanceActivity, InstanceTemplate, Interface, Node,
NodeActivity, pwgen NodeActivity, pwgen
) )
from .tasks import agent_tasks, local_agent_tasks, vm_tasks from .tasks import agent_tasks, vm_tasks
from dashboard.store_api import Store, NoStoreException from dashboard.store_api import Store, NoStoreException
from firewall.models import Host
from monitor.client import Client
from storage.tasks import storage_tasks from storage.tasks import storage_tasks
logger = getLogger(__name__) logger = getLogger(__name__)
...@@ -162,6 +171,30 @@ class RemoteInstanceOperation(RemoteOperationMixin, InstanceOperation): ...@@ -162,6 +171,30 @@ class RemoteInstanceOperation(RemoteOperationMixin, InstanceOperation):
return [self.instance.vm_name] return [self.instance.vm_name]
class EnsureAgentMixin(object):
accept_states = ('RUNNING', )
def check_precond(self):
super(EnsureAgentMixin, self).check_precond()
last_boot_time = self.instance.activity_log.filter(
succeeded=True, activity_code__in=(
"vm.Instance.deploy", "vm.Instance.reset",
"vm.Instance.reboot")).latest("finished").finished
try:
InstanceActivity.objects.filter(
activity_code="vm.Instance.agent.starting",
started__gt=last_boot_time).latest("started")
except InstanceActivity.DoesNotExist: # no agent since last boot
raise self.instance.NoAgentError(self.instance)
class RemoteAgentOperation(EnsureAgentMixin, RemoteInstanceOperation):
remote_queue = ('agent', )
concurrency_check = False
@register_operation @register_operation
class AddInterfaceOperation(InstanceOperation): class AddInterfaceOperation(InstanceOperation):
id = 'add_interface' id = 'add_interface'
...@@ -198,7 +231,8 @@ class AddInterfaceOperation(InstanceOperation): ...@@ -198,7 +231,8 @@ class AddInterfaceOperation(InstanceOperation):
self.rollback(net, activity) self.rollback(net, activity)
raise raise
net.deploy() net.deploy()
local_agent_tasks.send_networking_commands(self.instance, activity) self.instance._change_ip(parent_activity=activity)
self.instance._restart_networking(parent_activity=activity)
def get_activity_name(self, kwargs): def get_activity_name(self, kwargs):
return create_readable(ugettext_noop("add %(vlan)s interface"), return create_readable(ugettext_noop("add %(vlan)s interface"),
...@@ -646,6 +680,11 @@ class SaveAsTemplateOperation(InstanceOperation): ...@@ -646,6 +680,11 @@ class SaveAsTemplateOperation(InstanceOperation):
def _operation(self, activity, user, system, name=None, def _operation(self, activity, user, system, name=None,
with_shutdown=True, clone=False, task=None, **kwargs): with_shutdown=True, clone=False, task=None, **kwargs):
try:
self.instance._cleanup(parent_activity=activity, user=user)
except Instance.WrongStateError:
pass
if with_shutdown: if with_shutdown:
try: try:
ShutdownOperation(self.instance).call(parent_activity=activity, ShutdownOperation(self.instance).call(parent_activity=activity,
...@@ -1176,27 +1215,8 @@ class ResourcesOperation(InstanceOperation): ...@@ -1176,27 +1215,8 @@ class ResourcesOperation(InstanceOperation):
) )
class EnsureAgentMixin(object):
accept_states = ('RUNNING', )
def check_precond(self):
super(EnsureAgentMixin, self).check_precond()
last_boot_time = self.instance.activity_log.filter(
succeeded=True, activity_code__in=(
"vm.Instance.deploy", "vm.Instance.reset",
"vm.Instance.reboot")).latest("finished").finished
try:
InstanceActivity.objects.filter(
activity_code="vm.Instance.agent.starting",
started__gt=last_boot_time).latest("started")
except InstanceActivity.DoesNotExist: # no agent since last boot
raise self.instance.NoAgentError(self.instance)
@register_operation @register_operation
class PasswordResetOperation(EnsureAgentMixin, InstanceOperation): class PasswordResetOperation(RemoteAgentOperation):
id = 'password_reset' id = 'password_reset'
name = _("password reset") name = _("password reset")
description = _("Generate and set a new login password on the virtual " description = _("Generate and set a new login password on the virtual "
...@@ -1205,17 +1225,236 @@ class PasswordResetOperation(EnsureAgentMixin, InstanceOperation): ...@@ -1205,17 +1225,236 @@ class PasswordResetOperation(EnsureAgentMixin, InstanceOperation):
"logging in as other settings are possible to prevent " "logging in as other settings are possible to prevent "
"it.") "it.")
acl_level = "owner" acl_level = "owner"
task = agent_tasks.change_password
required_perms = () required_perms = ()
def _operation(self): def _get_remote_args(self, password, **kwargs):
self.instance.pw = pwgen() return (super(PasswordResetOperation, self)._get_remote_args(**kwargs)
queue = self.instance.get_remote_queue_name("agent") + [password])
agent_tasks.change_password.apply_async(
queue=queue, args=(self.instance.vm_name, self.instance.pw)) def _operation(self, password=None):
if not password:
password = pwgen()
super(PasswordResetOperation, self)._operation(password=password)
self.instance.pw = password
self.instance.save() self.instance.save()
@register_operation @register_operation
class AgentStartedOperation(InstanceOperation):
id = 'agent_started'
name = _("agent")
acl_level = "owner"
required_perms = ()
concurrency_check = False
@classmethod
def get_activity_code_suffix(cls):
return 'agent'
@property
def initialized(self):
return self.instance.activity_log.filter(
activity_code='vm.Instance.agent._cleanup').exists()
def measure_boot_time(self):
if not self.instance.template:
return
deploy_time = InstanceActivity.objects.filter(
instance=self.instance, activity_code="vm.Instance.deploy"
).latest("finished").finished
total_boot_time = (timezone.now() - deploy_time).total_seconds()
Client().send([
"template.%(pk)d.boot_time %(val)f %(time)s" % {
'pk': self.instance.template.pk,
'val': total_boot_time,
'time': time.time(),
}
])
def finish_agent_wait(self):
for i in InstanceActivity.objects.filter(
(Q(activity_code__endswith='.os_boot') |
Q(activity_code__endswith='.agent_wait')),
instance=self.instance, finished__isnull=True):
i.finish(True)
def _operation(self, user, activity, old_version=None, agent_system=None):
with activity.sub_activity('starting', concurrency_check=False,
readable_name=ugettext_noop('starting')):
pass
self.finish_agent_wait()
self.instance._change_ip(parent_activity=activity)
self.instance._restart_networking(parent_activity=activity)
new_version = settings.AGENT_VERSION
if new_version and old_version and new_version != old_version:
try:
self.instance.update_agent(
parent_activity=activity, agent_system=agent_system)
except TimeoutError:
pass
else:
activity.sub_activity(
'agent_wait', readable_name=ugettext_noop(
"wait agent restarting"), interruptible=True)
return # agent is going to restart
if not self.initialized:
try:
self.measure_boot_time()
except:
logger.exception('Unhandled error in measure_boot_time()')
self.instance._cleanup(parent_activity=activity)
self.instance.password_reset(
parent_activity=activity, password=self.instance.pw)
self.instance._set_time(parent_activity=activity)
self.instance._set_hostname(parent_activity=activity)
@register_operation
class CleanupOperation(SubOperationMixin, RemoteAgentOperation):
id = '_cleanup'
name = _("cleanup")
task = agent_tasks.cleanup
@register_operation
class SetTimeOperation(SubOperationMixin, RemoteAgentOperation):
id = '_set_time'
name = _("set time")
task = agent_tasks.set_time
def _get_remote_args(self, **kwargs):
cls = AgentStartedOperation.SetTimeOperation
return (super(cls, self)._get_remote_args(**kwargs)
+ [time.time()])
@register_operation
class SetHostnameOperation(SubOperationMixin, RemoteAgentOperation):
id = '_set_hostname'
name = _("set hostname")
task = agent_tasks.set_hostname
def _get_remote_args(self, **kwargs):
cls = AgentStartedOperation.SetHostnameOperation
return (super(cls, self)._get_remote_args(**kwargs)
+ [self.instance.short_hostname])
@register_operation
class RestartNetworkingOperation(SubOperationMixin, RemoteAgentOperation):
id = '_restart_networking'
name = _("restart networking")
task = agent_tasks.restart_networking
@register_operation
class ChangeIpOperation(SubOperationMixin, RemoteAgentOperation):
id = '_change_ip'
name = _("change ip")
task = agent_tasks.change_ip
def _get_remote_args(self, **kwargs):
hosts = Host.objects.filter(interface__instance=self.instance)
interfaces = {str(host.mac): host.get_network_config()
for host in hosts}
cls = AgentStartedOperation.ChangeIpOperation
return (super(cls, self)._get_remote_args(**kwargs)
+ [interfaces, settings.FIREWALL_SETTINGS['rdns_ip']])
@register_operation
class UpdateAgentOperation(RemoteAgentOperation):
id = 'update_agent'
name = _("update agent")
acl_level = "owner"
required_perms = ()
def get_activity_name(self, kwargs):
return create_readable(
ugettext_noop('update agent to %(version)s'),
version=settings.AGENT_VERSION)
@staticmethod
def create_linux_tar():
def exclude(tarinfo):
ignored = ('./.', './misc', './windows')
if any(tarinfo.name.startswith(x) for x in ignored):
return None
else:
return tarinfo
f = StringIO()
with TarFile.open(fileobj=f, mode='w:gz') as tar:
agent_path = os.path.join(settings.AGENT_DIR, "agent-linux")
tar.add(agent_path, arcname='.', filter=exclude)
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
@staticmethod
def create_windows_tar():
f = StringIO()
agent_path = os.path.join(settings.AGENT_DIR, "agent-win")
with TarFile.open(fileobj=f, mode='w|gz') as tar:
tar.add(agent_path, arcname='.')
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
def _operation(self, user, activity, agent_system):
queue = self._get_remote_queue()
instance = self.instance
if agent_system == "Windows":
executable = os.listdir(
os.path.join(settings.AGENT_DIR, "agent-win"))[0]
data = self.create_windows_tar()
elif agent_system == "Linux":
executable = ""
data = self.create_linux_tar()
else:
# Legacy update method
executable = ""
return agent_tasks.update_legacy.apply_async(
queue=queue,
args=(instance.vm_name, self.create_linux_tar())
).get(timeout=60)
checksum = md5(data).hexdigest()
chunk_size = 1024 * 1024
chunk_number = 0
index = 0
filename = settings.AGENT_VERSION + ".tar"
while True:
chunk = data[index:index+chunk_size]
if chunk:
agent_tasks.append.apply_async(
queue=queue,
args=(instance.vm_name, chunk,
filename, chunk_number)).get(timeout=60)
index = index + chunk_size
chunk_number = chunk_number + 1
else:
agent_tasks.update.apply_async(
queue=queue,
args=(instance.vm_name, filename, executable, checksum)
).get(timeout=60)
break
@register_operation
class MountStoreOperation(EnsureAgentMixin, InstanceOperation): class MountStoreOperation(EnsureAgentMixin, InstanceOperation):
id = 'mount_store' id = 'mount_store'
name = _("mount store") name = _("mount store")
......
...@@ -15,226 +15,26 @@ ...@@ -15,226 +15,26 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from common.models import create_readable
from manager.mancelery import celery
from vm.tasks.agent_tasks import (restart_networking, change_password,
set_time, set_hostname, start_access_server,
cleanup, update, append,
change_ip, update_legacy)
from firewall.models import Host
import time
import os
from base64 import encodestring
from hashlib import md5
from StringIO import StringIO
from tarfile import TarFile, TarInfo
from django.conf import settings
from django.db.models import Q
from django.utils import timezone
from django.utils.translation import ugettext_noop from django.utils.translation import ugettext_noop
from celery.result import TimeoutError
from monitor.client import Client
def send_init_commands(instance, act):
vm = instance.vm_name
queue = instance.get_remote_queue_name("agent")
with act.sub_activity('cleanup', readable_name=ugettext_noop('cleanup')):
cleanup.apply_async(queue=queue, args=(vm, ))
with act.sub_activity('change_password',
readable_name=ugettext_noop('change password')):
change_password.apply_async(queue=queue, args=(vm, instance.pw))
with act.sub_activity('set_time', readable_name=ugettext_noop('set time')):
set_time.apply_async(queue=queue, args=(vm, time.time()))
with act.sub_activity('set_hostname',
readable_name=ugettext_noop('set hostname')):
set_hostname.apply_async(
queue=queue, args=(vm, instance.short_hostname))
def send_networking_commands(instance, act):
queue = instance.get_remote_queue_name("agent")
with act.sub_activity('change_ip',
readable_name=ugettext_noop('change ip')):
change_ip.apply_async(queue=queue, args=(
instance.vm_name, ) + get_network_configs(instance))
with act.sub_activity('restart_networking',
readable_name=ugettext_noop('restart networking')):
restart_networking.apply_async(queue=queue, args=(instance.vm_name, ))
def create_linux_tar():
def exclude(tarinfo):
ignored = ('./.', './misc', './windows')
if any(tarinfo.name.startswith(x) for x in ignored):
return None
else:
return tarinfo
f = StringIO()
with TarFile.open(fileobj=f, mode='w:gz') as tar: from manager.mancelery import celery
agent_path = os.path.join(settings.AGENT_DIR, "agent-linux")
tar.add(agent_path, arcname='.', filter=exclude)
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
def create_windows_tar():
f = StringIO()
agent_path = os.path.join(settings.AGENT_DIR, "agent-win")
with TarFile.open(fileobj=f, mode='w|gz') as tar:
tar.add(agent_path, arcname='.')
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
@celery.task @celery.task
def agent_started(vm, version=None, system=None): def agent_started(vm, version=None, system=None):
from vm.models import Instance, InstanceActivity from vm.models import Instance
instance = Instance.objects.get(id=int(vm.split('-')[-1])) instance = Instance.objects.get(id=int(vm.split('-')[-1]))
queue = instance.get_remote_queue_name("agent") instance.agent_started(
initialized = instance.activity_log.filter( user=instance.owner, old_version=version, agent_system=system)
activity_code='vm.Instance.agent.cleanup').exists()
with instance.activity(code_suffix='agent',
readable_name=ugettext_noop('agent'),
concurrency_check=False) as act:
with act.sub_activity('starting',
readable_name=ugettext_noop('starting')):
pass
for i in InstanceActivity.objects.filter(
(Q(activity_code__endswith='.os_boot') |
Q(activity_code__endswith='.agent_wait')),
instance=instance, finished__isnull=True):
i.finish(True)
if version and version != settings.AGENT_VERSION:
try:
update_agent(instance, act, system, settings.AGENT_VERSION)
except TimeoutError:
pass
else:
act.sub_activity('agent_wait', readable_name=ugettext_noop(
"wait agent restarting"), interruptible=True)
return # agent is going to restart
if not initialized:
measure_boot_time(instance)
send_init_commands(instance, act)
send_networking_commands(instance, act)
with act.sub_activity('start_access_server',
readable_name=ugettext_noop(
'start access server')):
start_access_server.apply_async(queue=queue, args=(vm, ))
def measure_boot_time(instance):
if not instance.template:
return
from vm.models import InstanceActivity
deploy_time = InstanceActivity.objects.filter(
instance=instance, activity_code="vm.Instance.deploy"
).latest("finished").finished
total_boot_time = (timezone.now() - deploy_time).total_seconds()
Client().send([
"template.%(pk)d.boot_time %(val)f %(time)s" % {
'pk': instance.template.pk,
'val': total_boot_time,
'time': time.time(),
}
])
@celery.task @celery.task
def agent_stopped(vm): def agent_stopped(vm):
from vm.models import Instance, InstanceActivity from vm.models import Instance, InstanceActivity
from vm.models.activity import ActivityInProgressError
instance = Instance.objects.get(id=int(vm.split('-')[-1])) instance = Instance.objects.get(id=int(vm.split('-')[-1]))
qs = InstanceActivity.objects.filter(instance=instance, qs = InstanceActivity.objects.filter(
activity_code='vm.Instance.agent') instance=instance, activity_code='vm.Instance.agent')
act = qs.latest('id') act = qs.latest('id')
try: with act.sub_activity('stopping', concurrency_check=False,
with act.sub_activity('stopping', readable_name=ugettext_noop('stopping')):
readable_name=ugettext_noop('stopping')):
pass
except ActivityInProgressError:
pass pass
def get_network_configs(instance):
interfaces = {}
for host in Host.objects.filter(interface__instance=instance):
interfaces[str(host.mac)] = host.get_network_config()
return (interfaces, settings.FIREWALL_SETTINGS['rdns_ip'])
def update_agent(instance, act=None, system=None, version=None):
if act:
act = act.sub_activity(
'update',
readable_name=create_readable(
ugettext_noop('update to %(version)s'),
version=settings.AGENT_VERSION))
else:
act = instance.activity(
code_suffix='agent.update',
readable_name=create_readable(
ugettext_noop('update agent to %(version)s'),
version=settings.AGENT_VERSION))
with act:
queue = instance.get_remote_queue_name("agent")
if system == "Windows":
executable = os.listdir(os.path.join(settings.AGENT_DIR,
"agent-win"))[0]
# executable = "agent-winservice-%(version)s.exe" % {
# 'version': version}
data = create_windows_tar()
elif system == "Linux":
executable = ""
data = create_linux_tar()
else:
executable = ""
# Legacy update method
return update_legacy.apply_async(
queue=queue,
args=(instance.vm_name, create_linux_tar())
).get(timeout=60)
checksum = md5(data).hexdigest()
chunk_size = 1024 * 1024
chunk_number = 0
index = 0
filename = version + ".tar"
while True:
chunk = data[index:index+chunk_size]
if chunk:
append.apply_async(
queue=queue,
args=(instance.vm_name, chunk,
filename, chunk_number)).get(timeout=60)
index = index + chunk_size
chunk_number = chunk_number + 1
else:
update.apply_async(
queue=queue,
args=(instance.vm_name, filename, executable, checksum)
).get(timeout=60)
break
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment