Commit bc83f8c2 by Czémán Arnold

Rework Ceph support: the new solution uses block device mapping; remove the privileged Celery solution.

parent 7ef374f6
ceph.py

 import rados
 import rbd
 import os
-from privcelery import celery
+import subprocess
+
+DUMP_SIZE_LIMIT = int(os.getenv("DUMP_SIZE_LIMIT", 20 * 1024 ** 3))  # 20GB

 class CephConnection:
@@ -33,59 +35,50 @@ class CephConnection:
         self.cluster.shutdown()
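The body of CephConnection is collapsed in this hunk; only its closing cluster.shutdown() call is visible. A minimal sketch of such a rados-backed context manager, inferred from how the rest of the file uses it (conn.ioctx, with CephConnection(pool) as conn); the conffile path is an assumption, not taken from this commit:

class CephConnection(object):

    def __init__(self, pool_name, conffile="/etc/ceph/ceph.conf"):
        # the conffile default is assumed for illustration
        self.pool_name = pool_name
        self.conffile = conffile
        self.cluster = None
        self.ioctx = None

    def __enter__(self):
        # connect to the cluster and open an I/O context on the pool
        self.cluster = rados.Rados(conffile=self.conffile)
        self.cluster.connect()
        self.ioctx = self.cluster.open_ioctx(self.pool_name)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.ioctx.close()
        self.cluster.shutdown()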
-@celery.task
-def write_to_ceph_block_device(poolname, diskname):
-    diskname = str(diskname)
-    path = "/tmp/" + diskname
-    statinfo = os.stat(path)
-    disk_size = statinfo.st_size
-    with open(path, "rb") as f:
-        with CephConnection(str(poolname)) as conn:
-            rbd_inst = rbd.RBD()
-            try:
-                rbd_inst.create(conn.ioctx, diskname, disk_size)
-            except rbd.ImageExists:
-                rbd_inst.remove(conn.ioctx, diskname)
-                rbd_inst.create(conn.ioctx, diskname, disk_size)
-            try:
-                with rbd.Image(conn.ioctx, diskname) as image:
-                    offset = 0
-                    data = f.read(4096)
-                    while data:
-                        offset += image.write(data, offset)
-                        data = f.read(4096)
-            except:
-                rbd_inst.remove(conn.ioctx, diskname)
-                raise
-@celery.task
-def read_from_ceph_block_device(poolname, diskname):
-    diskname = str(diskname)
-    path = "/tmp/" + diskname
-    try:
-        with open(path, "wb") as f:
-            with CephConnection(str(poolname)) as conn:
-                with rbd.Image(conn.ioctx, diskname) as image:
-                    offset = 0
-                    size = image.size()
-                    while offset < size - 4096:
-                        data = image.read(offset, 4096)
-                        f.write(data)
-                        offset += 4096
-                    data = image.read(offset, size - offset)
-                    f.write(data)
-                rbd_inst = rbd.RBD()
-                rbd_inst.remove(conn.ioctx, diskname)
-    except:
-        with CephConnection(str(poolname)) as conn:
-            rbd_inst = rbd.RBD()
-            rbd_inst.remove(conn.ioctx, diskname)
-        remove_temp_file(path)
-        raise

-@celery.task
-def remove_temp_file(path):
-    os.unlink(path)

+def sudo(*args):
+    try:
+        subprocess.check_output(["/bin/sudo"] + list(args))
+    except subprocess.CalledProcessError as e:
+        raise Exception(e)

+def map_rbd(ceph_path, local_path):
+    try:
+        sudo("/bin/rbd", "map", ceph_path)
+    except:
+        sudo("/bin/rbd", "unmap", local_path)
+        sudo("/bin/rbd", "map", ceph_path)

+def save(domain, poolname, diskname):
+    diskname = str(diskname)
+    poolname = str(poolname)
+    ceph_path = "%s/%s" % (poolname, diskname)
+    local_path = "/dev/rbd/" + ceph_path
+    disk_size = DUMP_SIZE_LIMIT
+    with CephConnection(poolname) as conn:
+        rbd_inst = rbd.RBD()
+        try:
+            rbd_inst.create(conn.ioctx, diskname, disk_size)
+        except rbd.ImageExists:
+            rbd_inst.remove(conn.ioctx, diskname)
+            rbd_inst.create(conn.ioctx, diskname, disk_size)
+        try:
+            map_rbd(ceph_path, local_path)
+            domain.save(local_path)
+        except:
+            rbd_inst.remove(conn.ioctx, diskname)
+            raise

+def restore(connection, poolname, diskname):
+    diskname = str(diskname)
+    poolname = str(poolname)
+    local_path = "/dev/rbd/%s/%s" % (poolname, diskname)
+    connection.restore(local_path)
+    sudo("/bin/rbd", "unmap", local_path)
+    with CephConnection(poolname) as conn:
+        rbd_inst = rbd.RBD()
+        rbd_inst.remove(conn.ioctx, diskname)
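The net effect of the rework: instead of dumping to /tmp and streaming the file through librbd in 4 KiB chunks from a separate privileged worker, the image is created up front (at the DUMP_SIZE_LIMIT cap, since the final dump size is not known in advance), mapped into the kernel with rbd map, and libvirt writes the dump straight to /dev/rbd/<pool>/<disk>. A sketch of the resulting call sites, matching the vmdriver.py hunks at the end of this commit (the pool and disk names are illustrative; domain is a libvirt domain object, connection a libvirt connection):

import ceph

# suspend: create and map the rbd image, then dump memory onto /dev/rbd/ssd/vm-42.dump
ceph.save(domain, "ssd", "vm-42.dump")

# resume: restore from the still-mapped device, then unmap it and delete the image
ceph.restore(connection, "ssd", "vm-42.dump")

Note the asymmetry: map_rbd runs only in the save path, and restore() relies on the mapping that save() left behind. map_rbd's bare except assumes a stale mapping from an earlier run and recovers by unmapping the device and retrying the map once.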
@@ -13,11 +13,6 @@ pre-start script
     do
         start netcelery NAME=$hostname.$inst || :
     done
-    for inst in vm.priv
-    do
-        start privcelery NAME=$hostname.$inst || :
-    done
 end script

 post-stop script
@@ -29,8 +24,4 @@ post-stop script
     do
         stop netcelery NAME=$inst || :
     done
-    for inst in `initctl list|grep "^privcelery "|awk '{print $2}'|tr -d ')'|tr -d '('`
-    do
-        stop privcelery NAME=$inst || :
-    done
 end script
@@ -4,7 +4,6 @@ After=network.target
 BindsTo=netcelery@net.fast.service
 BindsTo=vmcelery@vm.fast.service
 BindsTo=vmcelery@vm.slow.service
-BindsTo=privcelery@vm.priv.service
 BindsTo=agentdriver.service

 [Service]
privcelery.conf (deleted)

-description "IK Cloud Django Development Server"
-
-respawn
-respawn limit 30 30
-setuid root
-setgid root
-instance $NAME
-
-script
-    cd /home/cloud/vmdriver
-    . /home/cloud/.virtualenvs/vmdriver/local/bin/activate
-    . /home/cloud/.virtualenvs/vmdriver/local/bin/postactivate
-    exec celery -A privcelery worker --loglevel=info -n ${NAME}
-end script
privcelery@.service (deleted)

-[Unit]
-Description=privcelery %I
-BindsTo=node.service
-
-[Service]
-User=root
-Group=root
-KillSignal=SIGTERM
-TimeoutStopSec=600
-Restart=always
-WorkingDirectory=/home/cloud/vmdriver
-ExecStart=/bin/bash -c "source /etc/profile; source /home/cloud/.virtualenvs/vmdriver/bin/activate; source /home/cloud/.virtualenvs/vmdriver/bin/postactivate; export C_FORCE_ROOT=\"true\"; exec celery -A privcelery worker --loglevel=info -n $(/bin/hostname -s).%I"
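The exported C_FORCE_ROOT="true" was required because Celery workers of that era refused to start with superuser privileges unless this variable was set. A simplified sketch of that guard, not Celery's actual code:

import os

if os.geteuid() == 0 and not os.environ.get("C_FORCE_ROOT"):
    # simplified stand-in for the startup check Celery performs
    raise RuntimeError("refusing to run the worker as root; "
                       "set C_FORCE_ROOT to override")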
""" Celery module for privileged RPC calls. """
from celery import Celery
from kombu import Queue, Exchange
from os import getenv
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-n", "--hostname", dest="hostname",
help="Define the full queue name with"
"with priority", metavar="hostname.queue.priority")
(args, unknwon_args) = parser.parse_known_args()
HOSTNAME = vars(args).pop("hostname")
if HOSTNAME is None:
raise Exception("You must define hostname as -n <hostname> or "
"--hostname=<hostname>.\n"
"Hostname format must be hostname.module.priority.")
AMQP_URI = getenv('AMQP_URI')
celery = Celery('privcelery',
broker=AMQP_URI,
include=['ceph'])
celery.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=(
Queue(HOSTNAME, Exchange(
'ceph', type='direct'), routing_key="ceph"),
)
)
util.py

-import socket
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-def get_priviliged_queue_name():
-    hostname = socket.gethostname()
-    logger.debug("Checking for vmdriver priviliged queue %s.vm.priv",
-                 hostname)
-    queue_name = hostname + '.vm.priv'
-    return queue_name
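This hostname-derived name matches the HOSTNAME queue that privcelery.py declared above, which is how vmdriver steered a task to the privileged worker running on the same node. A minimal sketch of that dispatch, mirroring the removed vmdriver.py code below (the pool and disk names are placeholders; the 300-second timeout is the one used there):

queue_name = get_priviliged_queue_name()   # e.g. "node1.vm.priv"
write_to_ceph_block_device.apply_async(
    args=[pool_name, disk_name], queue=queue_name).get(timeout=300)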
vmdriver.py

@@ -16,10 +16,7 @@ from vm import VMInstance, VMDisk, CephVMDisk, VMNetwork
 from vmcelery import celery, lib_connection, to_bool
-from ceph import (write_to_ceph_block_device,
-                  read_from_ceph_block_device,
-                  remove_temp_file)
-from util import get_priviliged_queue_name
+import ceph

 sys.path.append(os.path.dirname(os.path.basename(__file__)))

@@ -334,18 +331,7 @@ def save(name, data_store_type, dir, filename):
     """ Stop virtual machine and save its memory to path. """
     domain = lookupByName(name)
     if data_store_type == "ceph_block":
-        try:
-            path = "/tmp/" + filename
-            domain.save(path)
-            queue_name = get_priviliged_queue_name()
-            write_to_ceph_block_device.apply_async(
-                args=[dir, filename], queue=queue_name).get(timeout=300)
-            remove_temp_file.apply_async(
-                args=[path], queue=queue_name).get(timeout=300)
-        except:
-            remove_temp_file.apply_async(
-                args=[path], queue=queue_name).get(timeout=300)
-            raise
+        ceph.save(domain, dir, filename)
     else:
         path = dir + "/" + filename
         domain.save(path)

@@ -363,13 +349,7 @@ def restore(name, data_store_type, dir, filename):
     """
     if data_store_type == "ceph_block":
-        path = "/tmp/" + filename
-        queue_name = get_priviliged_queue_name()
-        read_from_ceph_block_device.apply_async(
-            args=[dir, filename], queue=queue_name).get(timeout=300)
-        Connection.get().restore(path)
-        remove_temp_file.apply_async(
-            args=[path], queue=queue_name).get(timeout=300)
+        ceph.restore(Connection.get(), dir, filename)
     else:
         path = dir + "/" + filename
         Connection.get().restore(path)
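Seen from the driver's public surface, nothing changes for callers: the ceph_block branch of save()/restore() keeps its signature, and only the temporary-file bookkeeping disappears. A hypothetical invocation (the VM name, pool, and dump filename are made up for illustration):

save("instance-0042", "ceph_block", "ssd", "instance-0042.dump")
restore("instance-0042", "ceph_block", "ssd", "instance-0042.dump")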