Commit 2e1af39b by Bach Dániel

vm: add agent operations

parent 6841cc02
......@@ -16,15 +16,22 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, unicode_literals
from base64 import encodestring
from hashlib import md5
from logging import getLogger
import os
from re import search
from string import ascii_lowercase
from StringIO import StringIO
from tarfile import TarFile, TarInfo
import time
from urlparse import urlsplit
from django.core.exceptions import PermissionDenied
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _, ugettext_noop
from django.conf import settings
from django.db.models import Q
from sizefield.utils import filesizeformat
......@@ -43,9 +50,11 @@ from .models import (
Instance, InstanceActivity, InstanceTemplate, Interface, Node,
NodeActivity, pwgen
)
from .tasks import agent_tasks, local_agent_tasks, vm_tasks
from .tasks import agent_tasks, vm_tasks
from dashboard.store_api import Store, NoStoreException
from firewall.models import Host
from monitor.client import Client
from storage.tasks import storage_tasks
logger = getLogger(__name__)
......@@ -162,6 +171,30 @@ class RemoteInstanceOperation(RemoteOperationMixin, InstanceOperation):
return [self.instance.vm_name]
class EnsureAgentMixin(object):
accept_states = ('RUNNING', )
def check_precond(self):
super(EnsureAgentMixin, self).check_precond()
last_boot_time = self.instance.activity_log.filter(
succeeded=True, activity_code__in=(
"vm.Instance.deploy", "vm.Instance.reset",
"vm.Instance.reboot")).latest("finished").finished
try:
InstanceActivity.objects.filter(
activity_code="vm.Instance.agent.starting",
started__gt=last_boot_time).latest("started")
except InstanceActivity.DoesNotExist: # no agent since last boot
raise self.instance.NoAgentError(self.instance)
class RemoteAgentOperation(EnsureAgentMixin, RemoteInstanceOperation):
remote_queue = ('agent', )
concurrency_check = False
@register_operation
class AddInterfaceOperation(InstanceOperation):
id = 'add_interface'
......@@ -198,7 +231,8 @@ class AddInterfaceOperation(InstanceOperation):
self.rollback(net, activity)
raise
net.deploy()
local_agent_tasks.send_networking_commands(self.instance, activity)
self.instance._change_ip(parent_activity=activity)
self.instance._restart_networking(parent_activity=activity)
def get_activity_name(self, kwargs):
return create_readable(ugettext_noop("add %(vlan)s interface"),
......@@ -1176,27 +1210,8 @@ class ResourcesOperation(InstanceOperation):
)
class EnsureAgentMixin(object):
accept_states = ('RUNNING', )
def check_precond(self):
super(EnsureAgentMixin, self).check_precond()
last_boot_time = self.instance.activity_log.filter(
succeeded=True, activity_code__in=(
"vm.Instance.deploy", "vm.Instance.reset",
"vm.Instance.reboot")).latest("finished").finished
try:
InstanceActivity.objects.filter(
activity_code="vm.Instance.agent.starting",
started__gt=last_boot_time).latest("started")
except InstanceActivity.DoesNotExist: # no agent since last boot
raise self.instance.NoAgentError(self.instance)
@register_operation
class PasswordResetOperation(EnsureAgentMixin, InstanceOperation):
class PasswordResetOperation(RemoteAgentOperation):
id = 'password_reset'
name = _("password reset")
description = _("Generate and set a new login password on the virtual "
......@@ -1205,17 +1220,236 @@ class PasswordResetOperation(EnsureAgentMixin, InstanceOperation):
"logging in as other settings are possible to prevent "
"it.")
acl_level = "owner"
task = agent_tasks.change_password
required_perms = ()
def _operation(self):
self.instance.pw = pwgen()
queue = self.instance.get_remote_queue_name("agent")
agent_tasks.change_password.apply_async(
queue=queue, args=(self.instance.vm_name, self.instance.pw))
def _get_remote_args(self, password, **kwargs):
return (super(PasswordResetOperation, self)._get_remote_args(**kwargs)
+ [password])
def _operation(self, password=None):
if not password:
password = pwgen()
super(PasswordResetOperation, self)._operation(password=password)
self.instance.pw = password
self.instance.save()
@register_operation
class AgentStartedOperation(InstanceOperation):
id = 'agent_started'
name = _("agent")
acl_level = "owner"
required_perms = ()
concurrency_check = False
@classmethod
def get_activity_code_suffix(cls):
return 'agent'
@property
def initialized(self):
return self.instance.activity_log.filter(
activity_code='vm.Instance.agent._cleanup').exists()
def measure_boot_time(self):
if not self.instance.template:
return
deploy_time = InstanceActivity.objects.filter(
instance=self.instance, activity_code="vm.Instance.deploy"
).latest("finished").finished
total_boot_time = (timezone.now() - deploy_time).total_seconds()
Client().send([
"template.%(pk)d.boot_time %(val)f %(time)s" % {
'pk': self.instance.template.pk,
'val': total_boot_time,
'time': time.time(),
}
])
def finish_agent_wait(self):
for i in InstanceActivity.objects.filter(
(Q(activity_code__endswith='.os_boot') |
Q(activity_code__endswith='.agent_wait')),
instance=self.instance, finished__isnull=True):
i.finish(True)
def _operation(self, user, activity, old_version=None, agent_system=None):
with activity.sub_activity('starting', concurrency_check=False,
readable_name=ugettext_noop('starting')):
pass
self.finish_agent_wait()
self.instance._change_ip(parent_activity=activity)
self.instance._restart_networking(parent_activity=activity)
new_version = settings.AGENT_VERSION
if new_version and old_version and new_version != old_version:
try:
self.instance.update_agent(
parent_activity=activity, agent_system=agent_system)
except TimeoutError:
pass
else:
activity.sub_activity(
'agent_wait', readable_name=ugettext_noop(
"wait agent restarting"), interruptible=True)
return # agent is going to restart
if not self.initialized:
try:
self.measure_boot_time()
except:
logger.exception('Unhandled error in measure_boot_time()')
self.instance._cleanup(parent_activity=activity)
self.instance.password_reset(
parent_activity=activity, password=self.instance.pw)
self.instance._set_time(parent_activity=activity)
self.instance._set_hostname(parent_activity=activity)
@register_operation
class CleanupOperation(SubOperationMixin, RemoteAgentOperation):
id = '_cleanup'
name = _("cleanup")
task = agent_tasks.cleanup
@register_operation
class SetTimeOperation(SubOperationMixin, RemoteAgentOperation):
id = '_set_time'
name = _("set time")
task = agent_tasks.set_time
def _get_remote_args(self, **kwargs):
cls = AgentStartedOperation.SetTimeOperation
return (super(cls, self)._get_remote_args(**kwargs)
+ [time.time()])
@register_operation
class SetHostnameOperation(SubOperationMixin, RemoteAgentOperation):
id = '_set_hostname'
name = _("set hostname")
task = agent_tasks.set_hostname
def _get_remote_args(self, **kwargs):
cls = AgentStartedOperation.SetHostnameOperation
return (super(cls, self)._get_remote_args(**kwargs)
+ [self.instance.short_hostname])
@register_operation
class RestartNetworkingOperation(SubOperationMixin, RemoteAgentOperation):
id = '_restart_networking'
name = _("restart networking")
task = agent_tasks.restart_networking
@register_operation
class ChangeIpOperation(SubOperationMixin, RemoteAgentOperation):
id = '_change_ip'
name = _("change ip")
task = agent_tasks.change_ip
def _get_remote_args(self, **kwargs):
hosts = Host.objects.filter(interface__instance=self.instance)
interfaces = {str(host.mac): host.get_network_config()
for host in hosts}
cls = AgentStartedOperation.ChangeIpOperation
return (super(cls, self)._get_remote_args(**kwargs)
+ [interfaces, settings.FIREWALL_SETTINGS['rdns_ip']])
@register_operation
class UpdateAgentOperation(RemoteAgentOperation):
id = 'update_agent'
name = _("update agent")
acl_level = "owner"
required_perms = ()
def get_activity_name(self, kwargs):
return create_readable(
ugettext_noop('update agent to %(version)s'),
version=settings.AGENT_VERSION)
@staticmethod
def create_linux_tar():
def exclude(tarinfo):
ignored = ('./.', './misc', './windows')
if any(tarinfo.name.startswith(x) for x in ignored):
return None
else:
return tarinfo
f = StringIO()
with TarFile.open(fileobj=f, mode='w:gz') as tar:
agent_path = os.path.join(settings.AGENT_DIR, "agent-linux")
tar.add(agent_path, arcname='.', filter=exclude)
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
@staticmethod
def create_windows_tar():
f = StringIO()
agent_path = os.path.join(settings.AGENT_DIR, "agent-win")
with TarFile.open(fileobj=f, mode='w|gz') as tar:
tar.add(agent_path, arcname='.')
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
def _operation(self, user, activity, agent_system):
queue = self._get_remote_queue()
instance = self.instance
if agent_system == "Windows":
executable = os.listdir(
os.path.join(settings.AGENT_DIR, "agent-win"))[0]
data = self.create_windows_tar()
elif agent_system == "Linux":
executable = ""
data = self.create_linux_tar()
else:
# Legacy update method
executable = ""
return agent_tasks.update_legacy.apply_async(
queue=queue,
args=(instance.vm_name, self.create_linux_tar())
).get(timeout=60)
checksum = md5(data).hexdigest()
chunk_size = 1024 * 1024
chunk_number = 0
index = 0
filename = settings.AGENT_VERSION + ".tar"
while True:
chunk = data[index:index+chunk_size]
if chunk:
agent_tasks.append.apply_async(
queue=queue,
args=(instance.vm_name, chunk,
filename, chunk_number)).get(timeout=60)
index = index + chunk_size
chunk_number = chunk_number + 1
else:
agent_tasks.update.apply_async(
queue=queue,
args=(instance.vm_name, filename, executable, checksum)
).get(timeout=60)
break
@register_operation
class MountStoreOperation(EnsureAgentMixin, InstanceOperation):
id = 'mount_store'
name = _("mount store")
......
......@@ -15,226 +15,26 @@
# You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from common.models import create_readable
from manager.mancelery import celery
from vm.tasks.agent_tasks import (restart_networking, change_password,
set_time, set_hostname, start_access_server,
cleanup, update, append,
change_ip, update_legacy)
from firewall.models import Host
import time
import os
from base64 import encodestring
from hashlib import md5
from StringIO import StringIO
from tarfile import TarFile, TarInfo
from django.conf import settings
from django.db.models import Q
from django.utils import timezone
from django.utils.translation import ugettext_noop
from celery.result import TimeoutError
from monitor.client import Client
def send_init_commands(instance, act):
vm = instance.vm_name
queue = instance.get_remote_queue_name("agent")
with act.sub_activity('cleanup', readable_name=ugettext_noop('cleanup')):
cleanup.apply_async(queue=queue, args=(vm, ))
with act.sub_activity('change_password',
readable_name=ugettext_noop('change password')):
change_password.apply_async(queue=queue, args=(vm, instance.pw))
with act.sub_activity('set_time', readable_name=ugettext_noop('set time')):
set_time.apply_async(queue=queue, args=(vm, time.time()))
with act.sub_activity('set_hostname',
readable_name=ugettext_noop('set hostname')):
set_hostname.apply_async(
queue=queue, args=(vm, instance.short_hostname))
def send_networking_commands(instance, act):
queue = instance.get_remote_queue_name("agent")
with act.sub_activity('change_ip',
readable_name=ugettext_noop('change ip')):
change_ip.apply_async(queue=queue, args=(
instance.vm_name, ) + get_network_configs(instance))
with act.sub_activity('restart_networking',
readable_name=ugettext_noop('restart networking')):
restart_networking.apply_async(queue=queue, args=(instance.vm_name, ))
def create_linux_tar():
def exclude(tarinfo):
ignored = ('./.', './misc', './windows')
if any(tarinfo.name.startswith(x) for x in ignored):
return None
else:
return tarinfo
f = StringIO()
with TarFile.open(fileobj=f, mode='w:gz') as tar:
agent_path = os.path.join(settings.AGENT_DIR, "agent-linux")
tar.add(agent_path, arcname='.', filter=exclude)
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
def create_windows_tar():
f = StringIO()
agent_path = os.path.join(settings.AGENT_DIR, "agent-win")
with TarFile.open(fileobj=f, mode='w|gz') as tar:
tar.add(agent_path, arcname='.')
version_fileobj = StringIO(settings.AGENT_VERSION)
version_info = TarInfo(name='version.txt')
version_info.size = len(version_fileobj.buf)
tar.addfile(version_info, version_fileobj)
return encodestring(f.getvalue()).replace('\n', '')
from manager.mancelery import celery
@celery.task
def agent_started(vm, version=None, system=None):
from vm.models import Instance, InstanceActivity
from vm.models import Instance
instance = Instance.objects.get(id=int(vm.split('-')[-1]))
queue = instance.get_remote_queue_name("agent")
initialized = instance.activity_log.filter(
activity_code='vm.Instance.agent.cleanup').exists()
with instance.activity(code_suffix='agent',
readable_name=ugettext_noop('agent'),
concurrency_check=False) as act:
with act.sub_activity('starting',
readable_name=ugettext_noop('starting')):
pass
for i in InstanceActivity.objects.filter(
(Q(activity_code__endswith='.os_boot') |
Q(activity_code__endswith='.agent_wait')),
instance=instance, finished__isnull=True):
i.finish(True)
if version and version != settings.AGENT_VERSION:
try:
update_agent(instance, act, system, settings.AGENT_VERSION)
except TimeoutError:
pass
else:
act.sub_activity('agent_wait', readable_name=ugettext_noop(
"wait agent restarting"), interruptible=True)
return # agent is going to restart
if not initialized:
measure_boot_time(instance)
send_init_commands(instance, act)
send_networking_commands(instance, act)
with act.sub_activity('start_access_server',
readable_name=ugettext_noop(
'start access server')):
start_access_server.apply_async(queue=queue, args=(vm, ))
def measure_boot_time(instance):
if not instance.template:
return
from vm.models import InstanceActivity
deploy_time = InstanceActivity.objects.filter(
instance=instance, activity_code="vm.Instance.deploy"
).latest("finished").finished
total_boot_time = (timezone.now() - deploy_time).total_seconds()
Client().send([
"template.%(pk)d.boot_time %(val)f %(time)s" % {
'pk': instance.template.pk,
'val': total_boot_time,
'time': time.time(),
}
])
instance.agent_started(
user=instance.owner, old_version=version, agent_system=system)
@celery.task
def agent_stopped(vm):
from vm.models import Instance, InstanceActivity
from vm.models.activity import ActivityInProgressError
instance = Instance.objects.get(id=int(vm.split('-')[-1]))
qs = InstanceActivity.objects.filter(instance=instance,
activity_code='vm.Instance.agent')
qs = InstanceActivity.objects.filter(
instance=instance, activity_code='vm.Instance.agent')
act = qs.latest('id')
try:
with act.sub_activity('stopping',
readable_name=ugettext_noop('stopping')):
pass
except ActivityInProgressError:
with act.sub_activity('stopping', concurrency_check=False,
readable_name=ugettext_noop('stopping')):
pass
def get_network_configs(instance):
interfaces = {}
for host in Host.objects.filter(interface__instance=instance):
interfaces[str(host.mac)] = host.get_network_config()
return (interfaces, settings.FIREWALL_SETTINGS['rdns_ip'])
def update_agent(instance, act=None, system=None, version=None):
if act:
act = act.sub_activity(
'update',
readable_name=create_readable(
ugettext_noop('update to %(version)s'),
version=settings.AGENT_VERSION))
else:
act = instance.activity(
code_suffix='agent.update',
readable_name=create_readable(
ugettext_noop('update agent to %(version)s'),
version=settings.AGENT_VERSION))
with act:
queue = instance.get_remote_queue_name("agent")
if system == "Windows":
executable = os.listdir(os.path.join(settings.AGENT_DIR,
"agent-win"))[0]
# executable = "agent-winservice-%(version)s.exe" % {
# 'version': version}
data = create_windows_tar()
elif system == "Linux":
executable = ""
data = create_linux_tar()
else:
executable = ""
# Legacy update method
return update_legacy.apply_async(
queue=queue,
args=(instance.vm_name, create_linux_tar())
).get(timeout=60)
checksum = md5(data).hexdigest()
chunk_size = 1024 * 1024
chunk_number = 0
index = 0
filename = version + ".tar"
while True:
chunk = data[index:index+chunk_size]
if chunk:
append.apply_async(
queue=queue,
args=(instance.vm_name, chunk,
filename, chunk_number)).get(timeout=60)
index = index + chunk_size
chunk_number = chunk_number + 1
else:
update.apply_async(
queue=queue,
args=(instance.vm_name, filename, executable, checksum)
).get(timeout=60)
break
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment