Commit 7ef374f6 by Czémán Arnold

Extends save() and restore() functions with ceph block device support

* vmdriver: modify save() and restore() functions
* privcelery: celery daemon for priviliged operations on the temporary dump file
* ceph: ceph block device operations
* util: helper function which returns the privcelery's queuename
* Add init scripts for privcelery
parent 456da2c9
import rados
import rbd
import os
from privcelery import celery
class CephConnection:
def __init__(self, pool_name, ceph_config=None):
self.pool_name = pool_name
self.ceph_config = ceph_config
self.cluster = None
self.ioctx = None
def __enter__(self):
try:
if self.ceph_config is None:
self.ceph_config = os.getenv("CEPH_CONFIG",
"/etc/ceph/ceph.conf")
self.cluster = rados.Rados(conffile=self.ceph_config)
self.cluster.connect(timeout=2)
self.ioctx = self.cluster.open_ioctx(self.pool_name)
except rados.InterruptedOrTimeoutError as e:
raise Exception(e)
return self
def __exit__(self, type, value, traceback):
self.ioctx.close()
self.cluster.shutdown()
@celery.task
def write_to_ceph_block_device(poolname, diskname):
diskname = str(diskname)
path = "/tmp/" + diskname
statinfo = os.stat(path)
disk_size = statinfo.st_size
with open(path, "rb") as f:
with CephConnection(str(poolname)) as conn:
rbd_inst = rbd.RBD()
try:
rbd_inst.create(conn.ioctx, diskname, disk_size)
except rbd.ImageExists:
rbd_inst.remove(conn.ioctx, diskname)
rbd_inst.create(conn.ioctx, diskname, disk_size)
try:
with rbd.Image(conn.ioctx, diskname) as image:
offset = 0
data = f.read(4096)
while data:
offset += image.write(data, offset)
data = f.read(4096)
except:
rbd_inst.remove(conn.ioctx, diskname)
raise
@celery.task
def read_from_ceph_block_device(poolname, diskname):
diskname = str(diskname)
path = "/tmp/" + diskname
try:
with open(path, "wb") as f:
with CephConnection(str(poolname)) as conn:
with rbd.Image(conn.ioctx, diskname) as image:
offset = 0
size = image.size()
while offset < size - 4096:
data = image.read(offset, 4096)
f.write(data)
offset += 4096
data = image.read(offset, size - offset)
f.write(data)
rbd_inst = rbd.RBD()
rbd_inst.remove(conn.ioctx, diskname)
except:
with CephConnection(str(poolname)) as conn:
rbd_inst = rbd.RBD()
rbd_inst.remove(conn.ioctx, diskname)
remove_temp_file(path)
raise
@celery.task
def remove_temp_file(path):
os.unlink(path)
......@@ -13,6 +13,11 @@ pre-start script
do
start netcelery NAME=$hostname.$inst || :
done
for inst in vm.priv
do
start privcelery NAME=$hostname.$inst || :
done
end script
post-stop script
......@@ -24,4 +29,8 @@ post-stop script
do
stop netcelery NAME=$inst || :
done
for inst in `initctl list|grep "^privcelery "|awk '{print $2}'|tr -d ')'|tr -d '('`
do
stop privcelery NAME=$inst || :
done
end script
......@@ -4,6 +4,7 @@ After=network.target
BindsTo=netcelery@net.fast.service
BindsTo=vmcelery@vm.fast.service
BindsTo=vmcelery@vm.slow.service
BindsTo=privcelery@vm.priv.service
BindsTo=agentdriver.service
[Service]
......
description "IK Cloud Django Development Server"
respawn
respawn limit 30 30
setuid root
setgid root
instance $NAME
script
cd /home/cloud/vmdriver
. /home/cloud/.virtualenvs/vmdriver/local/bin/activate
. /home/cloud/.virtualenvs/vmdriver/local/bin/postactivate
exec celery -A privcelery worker --loglevel=info -n ${NAME}
end script
[Unit]
Description=privcelery %I
BindsTo=node.service
[Service]
User=root
Group=root
KillSignal=SIGTERM
TimeoutStopSec=600
Restart=always
WorkingDirectory=/home/cloud/vmdriver
ExecStart=/bin/bash -c "source /etc/profile; source /home/cloud/.virtualenvs/vmdriver/bin/activate; source /home/cloud/.virtualenvs/vmdriver/bin/postactivate; export C_FORCE_ROOT=\"true\"; exec celery -A privcelery worker --loglevel=info -n $(/bin/hostname -s).%I"
""" Celery module for privileged RPC calls. """
from celery import Celery
from kombu import Queue, Exchange
from os import getenv
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-n", "--hostname", dest="hostname",
help="Define the full queue name with"
"with priority", metavar="hostname.queue.priority")
(args, unknwon_args) = parser.parse_known_args()
HOSTNAME = vars(args).pop("hostname")
if HOSTNAME is None:
raise Exception("You must define hostname as -n <hostname> or "
"--hostname=<hostname>.\n"
"Hostname format must be hostname.module.priority.")
AMQP_URI = getenv('AMQP_URI')
celery = Celery('privcelery',
broker=AMQP_URI,
include=['ceph'])
celery.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=(
Queue(HOSTNAME, Exchange(
'ceph', type='direct'), routing_key="ceph"),
)
)
import socket
import logging
logger = logging.getLogger(__name__)
def get_priviliged_queue_name():
hostname = socket.gethostname()
logger.debug("Checking for vmdriver priviliged queue %s.vm.priv",
hostname)
queue_name = hostname + '.vm.priv'
return queue_name
......@@ -16,6 +16,12 @@ from vm import VMInstance, VMDisk, CephVMDisk, VMNetwork
from vmcelery import celery, lib_connection, to_bool
from ceph import (write_to_ceph_block_device,
read_from_ceph_block_device,
remove_temp_file)
from util import get_priviliged_queue_name
sys.path.append(os.path.dirname(os.path.basename(__file__)))
vm_xml_dump = None
......@@ -324,17 +330,31 @@ def suspend(name):
@celery.task
@req_connection
@wrap_libvirtError
def save(name, path):
def save(name, data_store_type, dir, filename):
""" Stop virtual machine and save its memory to path. """
domain = lookupByName(name)
domain.save(path)
if data_store_type == "ceph_block":
try:
path = "/tmp/" + filename
domain.save(path)
queue_name = get_priviliged_queue_name()
write_to_ceph_block_device.apply_async(
args=[dir, filename], queue=queue_name).get(timeout=300)
remove_temp_file.apply_async(
args=[path], queue=queue_name).get(timeout=300)
except:
remove_temp_file.apply_async(
args=[path], queue=queue_name).get(timeout=300)
raise
else:
path = dir + "/" + filename
domain.save(path)
@celery.task
@req_connection
@wrap_libvirtError
def restore(name, path):
def restore(name, data_store_type, dir, filename):
""" Restore a saved virtual machine.
Restores the virtual machine from the memory image
......@@ -342,7 +362,18 @@ def restore(name, path):
Return the domain info dict.
"""
Connection.get().restore(path)
if data_store_type == "ceph_block":
path = "/tmp/" + filename
queue_name = get_priviliged_queue_name()
read_from_ceph_block_device.apply_async(
args=[dir, filename], queue=queue_name).get(timeout=300)
Connection.get().restore(path)
remove_temp_file.apply_async(
args=[path], queue=queue_name).get(timeout=300)
else:
path = dir + "/" + filename
Connection.get().restore(path)
return domain_info(name)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment