local_agent_tasks.py 8.83 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

Bach Dániel committed
18
from common.models import create_readable
Bach Dániel committed
19 20
from manager.mancelery import celery
from vm.tasks.agent_tasks import (restart_networking, change_password,
21
                                  set_time, set_hostname, start_access_server,
22 23
                                  cleanup, update, append,
                                  change_ip, update_legacy)
Bach Dániel committed
24 25
from firewall.models import Host

Bach Dániel committed
26
import time
27
import os
Bach Dániel committed
28
from base64 import encodestring
29
from hashlib import md5
Bach Dániel committed
30 31 32
from StringIO import StringIO
from tarfile import TarFile, TarInfo
from django.conf import settings
33
from django.db.models import Q
Kálmán Viktor committed
34
from django.utils import timezone
35
from django.utils.translation import ugettext_noop
Bach Dániel committed
36
from celery.result import TimeoutError
Kálmán Viktor committed
37
from monitor.client import Client
Bach Dániel committed
38 39


40 41
def send_init_commands(instance, act):
    """Send the one-time initialisation commands to the agent of ``instance``.

    Queues ``cleanup``, ``change_password`` (with ``instance.pw``),
    ``set_time`` (with the current Unix time) and ``set_hostname`` (with
    ``instance.short_hostname``) on the instance's agent queue, each inside
    its own sub-activity of ``act``.  The tasks are only dispatched with
    ``apply_async``; their results are not awaited here.
    """
    vm_name = instance.vm_name
    agent_queue = instance.get_remote_queue_name("agent")

    def dispatch(task, *extra_args):
        # All of these agent tasks get the VM name as their first argument.
        task.apply_async(queue=agent_queue, args=(vm_name, ) + extra_args)

    with act.sub_activity('cleanup', readable_name=ugettext_noop('cleanup')):
        dispatch(cleanup)
    with act.sub_activity('change_password',
                          readable_name=ugettext_noop('change password')):
        dispatch(change_password, instance.pw)
    with act.sub_activity('set_time', readable_name=ugettext_noop('set time')):
        dispatch(set_time, time.time())
    with act.sub_activity('set_hostname',
                          readable_name=ugettext_noop('set hostname')):
        dispatch(set_hostname, instance.short_hostname)
Bach Dániel committed
54 55


56 57 58 59 60 61 62 63 64 65 66
def send_networking_commands(instance, act):
    """Reconfigure the guest network through the agent of ``instance``.

    First queues ``change_ip`` with the VM name followed by the
    ``(interfaces, rdns_ip)`` pair returned by ``get_network_configs``, then
    ``restart_networking``.  Each call runs inside its own sub-activity of
    ``act`` and is fire-and-forget (``apply_async`` without ``get``).
    """
    agent_queue = instance.get_remote_queue_name("agent")

    with act.sub_activity('change_ip',
                          readable_name=ugettext_noop('change ip')):
        vm_name = instance.vm_name
        ip_args = (vm_name, ) + get_network_configs(instance)
        change_ip.apply_async(queue=agent_queue, args=ip_args)

    with act.sub_activity('restart_networking',
                          readable_name=ugettext_noop('restart networking')):
        restart_networking.apply_async(queue=agent_queue,
                                       args=(instance.vm_name, ))


67
def create_linux_tar():
    """Build the Linux agent archive and return it base64-encoded.

    The archive is a gzip-compressed tarball containing the contents of
    ``<AGENT_DIR>/agent-linux`` (stored under ``.``) and a generated
    ``version.txt`` holding ``settings.AGENT_VERSION``.  Entries whose
    archive name starts with ``./.``, ``./misc`` or ``./windows`` are left
    out.  The base64 text is returned on a single line (newlines removed).
    """
    # Archive-name prefixes that are not shipped to Linux guests.
    ignored = ('./.', './misc', './windows')

    def exclude(tarinfo):
        # TarFile.add() filter: returning None drops the entry.
        if tarinfo.name.startswith(ignored):
            return None
        return tarinfo

    f = StringIO()

    with TarFile.open(fileobj=f, mode='w:gz') as tar:
        agent_path = os.path.join(settings.AGENT_DIR, "agent-linux")
        tar.add(agent_path, arcname='.', filter=exclude)

        version_fileobj = StringIO(settings.AGENT_VERSION)
        version_info = TarInfo(name='version.txt')
        # Use the documented getvalue() rather than the StringIO
        # implementation attribute ``buf`` (absent on other file-likes).
        version_info.size = len(version_fileobj.getvalue())
        tar.addfile(version_info, version_fileobj)

    return encodestring(f.getvalue()).replace('\n', '')


def create_windows_tar():
    """Build the Windows agent archive and return it base64-encoded.

    The archive is a gzip-compressed tarball (written in stream mode)
    containing the contents of ``<AGENT_DIR>/agent-win`` stored under ``.``,
    plus a generated ``version.txt`` holding ``settings.AGENT_VERSION``.
    The base64 text is returned on a single line (newlines removed).
    """
    f = StringIO()

    agent_path = os.path.join(settings.AGENT_DIR, "agent-win")
    with TarFile.open(fileobj=f, mode='w|gz') as tar:
        tar.add(agent_path, arcname='.')

        version_fileobj = StringIO(settings.AGENT_VERSION)
        version_info = TarInfo(name='version.txt')
        # Use the documented getvalue() rather than the StringIO
        # implementation attribute ``buf`` (absent on other file-likes).
        version_info.size = len(version_fileobj.getvalue())
        tar.addfile(version_info, version_fileobj)

    return encodestring(f.getvalue()).replace('\n', '')


Bach Dániel committed
104
@celery.task
def agent_started(vm, version=None, system=None):
    """Handle the start-up notification of the agent running in ``vm``.

    ``vm`` is the VM name; the instance id is the part after its last ``-``.
    Inside a new ``agent`` activity of the instance this task:

    * records a ``starting`` sub-activity;
    * calls ``finish(True)`` on every unfinished ``*.os_boot`` and
      ``*.agent_wait`` activity of the instance;
    * if the agent reports a ``version`` different from
      ``settings.AGENT_VERSION``, runs ``update_agent``.  Unless that times
      out, it then opens an interruptible ``agent_wait`` sub-activity and
      returns, because the agent is going to restart;
    * on the first start (no earlier ``vm.Instance.agent.cleanup``
      activity) measures the boot time and sends the init commands;
    * sends the networking commands and starts the access server.
    """
    from vm.models import Instance, InstanceActivity

    instance = Instance.objects.get(id=int(vm.split('-')[-1]))
    agent_queue = instance.get_remote_queue_name("agent")
    first_start = not instance.activity_log.filter(
        activity_code='vm.Instance.agent.cleanup').exists()

    with instance.activity(code_suffix='agent',
                           readable_name=ugettext_noop('agent'),
                           concurrency_check=False) as act:
        with act.sub_activity('starting',
                              readable_name=ugettext_noop('starting')):
            pass

        stale = InstanceActivity.objects.filter(
            Q(activity_code__endswith='.os_boot') |
            Q(activity_code__endswith='.agent_wait'),
            instance=instance, finished__isnull=True)
        for stale_activity in stale:
            stale_activity.finish(True)

        if version and version != settings.AGENT_VERSION:
            try:
                update_agent(instance, act, system, settings.AGENT_VERSION)
                update_sent = True
            except TimeoutError:
                update_sent = False
            if update_sent:
                # Left open on purpose; presumably closed by the
                # '.agent_wait' clean-up above on the next agent start.
                act.sub_activity('agent_wait', readable_name=ugettext_noop(
                    "wait agent restarting"), interruptible=True)
                return  # agent is going to restart

        if first_start:
            measure_boot_time(instance)
            send_init_commands(instance, act)

        send_networking_commands(instance, act)
        with act.sub_activity('start_access_server',
                              readable_name=ugettext_noop(
                                  'start access server')):
            start_access_server.apply_async(queue=agent_queue, args=(vm, ))
Bach Dániel committed
144 145


Kálmán Viktor committed
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
def measure_boot_time(instance):
    """Send the boot time of ``instance`` to the monitoring server.

    The boot time is the number of seconds between the end of the latest
    finished ``vm.Instance.deploy`` activity of the instance and now.  It is
    sent with ``monitor.client.Client`` as the metric
    ``template.<template pk>.boot_time`` together with the current Unix time.

    Nothing is sent when the instance has no template, or has no finished
    deploy activity.  This is only a statistic, so a missing record must not
    make the caller fail.
    """
    if not instance.template:
        return

    from vm.models import InstanceActivity
    deploys = InstanceActivity.objects.filter(
        instance=instance, activity_code="vm.Instance.deploy",
        # Unfinished rows have finished=NULL; PostgreSQL sorts NULLs first
        # in the descending order latest() uses, which would yield None.
        finished__isnull=False)
    try:
        deploy_time = deploys.latest("finished").finished
    except InstanceActivity.DoesNotExist:
        return

    total_boot_time = (timezone.now() - deploy_time).total_seconds()

    Client().send([
        "template.%(pk)d.boot_time %(val)f %(time)s" % {
            'pk': instance.template.pk,
            'val': total_boot_time,
            'time': time.time(),
        }
    ])


Bach Dániel committed
166 167
@celery.task
def agent_stopped(vm):
    """Record that the agent running in ``vm`` is stopping.

    ``vm`` is the VM name; the instance id is the part after its last ``-``.
    A ``stopping`` sub-activity is added to the newest (highest id)
    ``vm.Instance.agent`` activity of the instance.  If that raises
    ``ActivityInProgressError``, the event is silently dropped.
    """
    from vm.models import Instance, InstanceActivity
    from vm.models.activity import ActivityInProgressError

    instance_id = int(vm.split('-')[-1])
    instance = Instance.objects.get(id=instance_id)
    agent_activity = InstanceActivity.objects.filter(
        instance=instance, activity_code='vm.Instance.agent'
    ).latest('id')

    try:
        with agent_activity.sub_activity(
                'stopping', readable_name=ugettext_noop('stopping')):
            pass
    except ActivityInProgressError:
        # Best effort: losing this marker is acceptable.
        pass
180 181


Bach Dániel committed
182 183 184 185 186 187 188
def get_network_configs(instance):
    """Collect the network settings to push to the guest of ``instance``.

    Returns a 2-tuple ``(interfaces, rdns_ip)``:

    * ``interfaces`` maps ``str(host.mac)`` to ``host.get_network_config()``
      for every firewall ``Host`` attached to an interface of ``instance``;
    * ``rdns_ip`` is ``settings.FIREWALL_SETTINGS['rdns_ip']``.
    """
    hosts = Host.objects.filter(interface__instance=instance)
    interfaces = {str(h.mac): h.get_network_config() for h in hosts}
    rdns_ip = settings.FIREWALL_SETTINGS['rdns_ip']
    return interfaces, rdns_ip


189
def update_agent(instance, act=None, system=None, version=None):
    """Install a new agent version on ``instance``.

    :param instance: the VM whose agent is updated.
    :param act: parent activity.  If given, the update is recorded as its
        ``update`` sub-activity; otherwise a new ``agent.update`` activity is
        started on ``instance``.
    :param system: guest OS reported by the agent.  ``"Windows"`` and
        ``"Linux"`` upload the matching archive in chunks; any other value
        (including ``None``) uses the legacy single-call update.
    :param version: version being installed; defaults to
        ``settings.AGENT_VERSION``.  Used in the activity name and as the
        name of the uploaded archive.
    :returns: the result of ``update_legacy`` for the legacy path,
        otherwise ``None``.
    :raises celery.result.TimeoutError: if an agent task does not answer
        within 60 seconds.
    """
    if version is None:
        # Needed for the archive name below (None + ".tar" would raise).
        version = settings.AGENT_VERSION

    if act:
        act = act.sub_activity(
            'update',
            readable_name=create_readable(
                ugettext_noop('update to %(version)s'),
                version=version))
    else:
        act = instance.activity(
            code_suffix='agent.update',
            readable_name=create_readable(
                ugettext_noop('update agent to %(version)s'),
                version=version))

    with act:
        queue = instance.get_remote_queue_name("agent")
        vm_name = instance.vm_name
        if system == "Windows":
            # NOTE(review): os.listdir() order is arbitrary, so this assumes
            # agent-win holds a single entry.  A previously commented-out
            # alternative used "agent-winservice-<version>.exe".
            executable = os.listdir(os.path.join(settings.AGENT_DIR,
                                                 "agent-win"))[0]
            data = create_windows_tar()
        elif system == "Linux":
            executable = ""
            data = create_linux_tar()
        else:
            # Legacy update method: the whole Linux archive in one call.
            return update_legacy.apply_async(
                queue=queue,
                args=(vm_name, create_linux_tar())
            ).get(timeout=60)

        checksum = md5(data).hexdigest()
        filename = version + ".tar"
        chunk_size = 1024 * 1024  # bytes per `append` call
        # Upload sequentially; each call waits for the agent's answer.
        for chunk_number, offset in enumerate(
                range(0, len(data), chunk_size)):
            append.apply_async(
                queue=queue,
                args=(vm_name, data[offset:offset + chunk_size],
                      filename, chunk_number)).get(timeout=60)
        # Ask the agent to install the uploaded archive (md5 included).
        update.apply_async(
            queue=queue,
            args=(vm_name, filename, executable, checksum)
        ).get(timeout=60)