Commit 0282f543 by Bach Dániel

Merge branch 'feature-priority-queues' into 'master'

Feature Priority Queues

Updated to use priority queues
parents 3f72953c a830d9f3
...@@ -44,12 +44,14 @@ class Disk(object): ...@@ -44,12 +44,14 @@ class Disk(object):
@classmethod @classmethod
def deserialize(cls, desc): def deserialize(cls, desc):
"""Create cls object from JSON."""
logging.info(desc) logging.info(desc)
if isinstance(desc, basestring): if isinstance(desc, basestring):
desc = json.loads(desc) desc = json.loads(desc)
return cls(**desc) return cls(**desc)
def get_desc(self): def get_desc(self):
"""Create dict from Disk object."""
return { return {
'name': self.name, 'name': self.name,
'dir': self.dir, 'dir': self.dir,
...@@ -59,9 +61,11 @@ class Disk(object): ...@@ -59,9 +61,11 @@ class Disk(object):
} }
def get_path(self): def get_path(self):
"""Get absolute path for disk."""
return os.path.realpath(self.dir + '/' + self.name) return os.path.realpath(self.dir + '/' + self.name)
def get_base(self): def get_base(self):
"""Get absolute path for disk's base image."""
return os.path.realpath(self.dir + '/' + self.base_name) return os.path.realpath(self.dir + '/' + self.base_name)
def __unicode__(self): def __unicode__(self):
...@@ -70,8 +74,7 @@ class Disk(object): ...@@ -70,8 +74,7 @@ class Disk(object):
@classmethod @classmethod
def get(cls, dir, name): def get(cls, dir, name):
''' Create disk from path """Create disk from path."""
'''
path = os.path.realpath(dir + '/' + name) path = os.path.realpath(dir + '/' + name)
output = subprocess.check_output(['qemu-img', 'info', path]) output = subprocess.check_output(['qemu-img', 'info', path])
...@@ -92,9 +95,9 @@ class Disk(object): ...@@ -92,9 +95,9 @@ class Disk(object):
return Disk(dir, name, format, size, base, type) return Disk(dir, name, format, size, base, type)
def create(self): def create(self):
''' Creating new image format specified at self.format. """ Creating new image format specified at self.format.
self.format van be "qcow2-normal" self.format can be "qcow2-normal"
''' """
# Check if type is avaliable to create # Check if type is avaliable to create
if self.format not in self.CREATE_FORMATS: if self.format not in self.CREATE_FORMATS:
raise Exception('Invalid format: %s' % self.format) raise Exception('Invalid format: %s' % self.format)
...@@ -114,7 +117,7 @@ class Disk(object): ...@@ -114,7 +117,7 @@ class Disk(object):
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
def download(self, task, url, parent_id=None): # noqa def download(self, task, url, parent_id=None): # noqa
''' Download image from url. ''' """Download image from url."""
disk_path = self.get_path() disk_path = self.get_path()
logger.info("Downloading image from %s to %s", url, disk_path) logger.info("Downloading image from %s to %s", url, disk_path)
r = requests.get(url, stream=True) r = requests.get(url, stream=True)
...@@ -221,8 +224,8 @@ class Disk(object): ...@@ -221,8 +224,8 @@ class Disk(object):
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
def merge(self, new_disk): def merge(self, new_disk):
''' Merging a new_disk from the actual disk and its base. """ Merging a new_disk from the actual disk and its base.
''' """
# Check if file already exists # Check if file already exists
if os.path.isfile(new_disk.get_path()): if os.path.isfile(new_disk.get_path()):
raise Exception('File already exists: %s' % self.get_path()) raise Exception('File already exists: %s' % self.get_path())
...@@ -240,11 +243,11 @@ class Disk(object): ...@@ -240,11 +243,11 @@ class Disk(object):
copy(self.get_path(), new_disk.get_path()) copy(self.get_path(), new_disk.get_path())
def delete(self): def delete(self):
''' Delete file """ Delete file. """
'''
if os.path.isfile(self.get_path()): if os.path.isfile(self.get_path()):
os.unlink(self.get_path()) os.unlink(self.get_path())
@classmethod @classmethod
def list(cls, dir): def list(cls, dir):
""" List all files in <dir> directory."""
return [cls.get(dir, file) for file in os.listdir(dir)] return [cls.get(dir, file) for file in os.listdir(dir)]
description "IK Cloud Django Development Server"
start on runlevel [2345]
stop on runlevel [!2345]
pre-start script
hostname=$(hostname -s)
for inst in storage.fast storage.slow
do
start storagecelery NAME=$hostname.$inst || :
done
end script
post-stop script
for inst in `initctl list|grep "^storagecelery "|awk '{print $2}'|tr -d ')'|tr -d '('`
do
stop storagecelery NAME=$inst || :
done
end script
description "CIRCLE Storage Driver Celery Upstart" description "IK Cloud Django Development Server"
start on runlevel [2345]
stop on runlevel [!2345]
respawn respawn
respawn limit 30 30 respawn limit 30 30
setgid cloud
setuid cloud setuid cloud
setgid cloud
chdir /home/cloud/storagedriver instance $NAME
script script
. /home/cloud/.virtualenvs/storage/local/bin/activate cd /home/cloud/storagedriver/
. /home/cloud/.virtualenvs/storage/local/bin/postactivate . /home/cloud/.virtualenvs/storagedriver/local/bin/activate
HOSTNAME=$(hostname -s) . /home/cloud/.virtualenvs/storagedriver/local/bin/postactivate
exec celery -A storagecelery worker --loglevel=info -n ${HOSTNAME}.storage exec celery -A storagecelery worker --loglevel=info -n $NAME
end script end script
from celery import Celery from celery import Celery
from kombu import Queue, Exchange from kombu import Queue, Exchange
from socket import gethostname
from os import getenv from os import getenv
HOSTNAME = gethostname() from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-n", "--hostname", dest="hostname",
help="Define the full queue name with"
"with priority", metavar="hostname.queue.priority")
(args, unknwon_args) = parser.parse_known_args()
HOSTNAME = vars(args).pop("hostname")
if HOSTNAME is None:
raise Exception("You must define hostname as -n <hostname> or "
"--hostname=<hostname>.\n"
"Hostname format must be hostname.module.priority.")
AMQP_URI = getenv('AMQP_URI') AMQP_URI = getenv('AMQP_URI')
CACHE_URI = getenv('CACHE_URI') CACHE_URI = getenv('CACHE_URI')
...@@ -15,7 +27,7 @@ celery.conf.update( ...@@ -15,7 +27,7 @@ celery.conf.update(
CELERY_CACHE_BACKEND=CACHE_URI, CELERY_CACHE_BACKEND=CACHE_URI,
CELERY_TASK_RESULT_EXPIRES=300, CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(HOSTNAME + '.storage', Exchange( Queue(HOSTNAME, Exchange(
'storagedriver', type='direct'), routing_key='storagedriver'), 'storagedriver', type='direct'), routing_key='storagedriver'),
) )
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment