Commit 6fe422c2 by Őry Máté

Merge branch 'garbage-collector'

Closes #74
parents 1200c6f8 0bd5e524
......@@ -11,7 +11,8 @@ celery = Celery('manager', backend='amqp',
'vm.tasks.local_periodic_tasks',
'vm.tasks.local_agent_tasks',
'storage.tasks.local_tasks',
'firewall.tasks.local_tasks'])
'storage.tasks.periodic_tasks',
'firewall.tasks.local_tasks', ])
celery.conf.update(
CELERY_TASK_RESULT_EXPIRES=300,
......@@ -32,6 +33,11 @@ celery.conf.update(
'schedule': timedelta(seconds=10),
'options': {'queue': 'localhost.man'}
},
'storage.periodic_tasks': {
'task': 'storage.tasks.periodic_tasks.garbage_collector',
'schedule': timedelta(hours=1),
'options': {'queue': 'localhost.man'}
},
}
)
from storage.models import DataStore
import os
from django.utils import timezone
from datetime import timedelta
from manager.mancelery import celery
import logging
from storage.tasks import remote_tasks
logger = logging.getLogger(__name__)
@celery.task
def garbage_collector(timeout=15):
""" Garbage collector for disk images.
Moves 1 day old deleted images to trash folder.
If there is not enough free space on datastore (default 10%)
deletes oldest images from trash.
:param timeout: Seconds before TimeOut exception
:type timeoit: int
"""
for ds in DataStore.objects.all():
time_before = timezone.now() - timedelta(days=1)
file_list = os.listdir(ds.path)
disk_list = [disk.filename for disk in
ds.disk_set.filter(destroyed__lt=time_before)]
queue_name = ds.get_remote_queue_name('storage')
for i in set(file_list).intersection(disk_list):
logger.info("Image: %s at Datastore: %s moved to trash folder." %
(i, ds.path))
remote_tasks.move_to_trash.apply_async(
args=[ds.path, i], queue=queue_name).get(timeout=timeout)
try:
remote_tasks.make_free_space.apply_async(
args=[ds.path], queue=queue_name).get(timeout=timeout)
except Exception as e:
logger.warning(str(e))
@celery.task
def list_orphan_disks(timeout=15):
"""List disk image files without Disk object in the database.
Exclude cloud-xxxxxxxx.dump format images.
:param timeout: Seconds before TimeOut exception
:type timeoit: int
"""
import re
for ds in DataStore.objects.all():
queue_name = ds.get_remote_queue_name('storage')
files = set(remote_tasks.list_files.apply_async(
args=[ds.path], queue=queue_name).get(timeout=timeout))
disks = set([disk.filename for disk in ds.disk_set.all()])
for i in files - disks:
if not re.match('cloud-[0-9]*\.dump', i):
logging.warning("Orphan disk: %s" % i)
@celery.task
def list_missing_disks(timeout=15):
"""List Disk objects without disk image files.
:param timeout: Seconds before TimeOut exception
:type timeoit: int
"""
for ds in DataStore.objects.all():
queue_name = ds.get_remote_queue_name('storage')
files = set(remote_tasks.list_files.apply_async(
args=[ds.path], queue=queue_name).get(timeout=timeout))
disks = set([disk.filename for disk in
ds.disk_set.filter(destroyed__isnull=True)])
for i in disks - files:
logging.critical("Image: %s is missing from %s datastore."
% (i, ds.path))
......@@ -6,6 +6,11 @@ def list(dir):
pass
@celery.task(name='storagedriver.list_files')
def list_files(dir):
pass
@celery.task(name='storagedriver.create')
def create(disk_desc):
pass
......@@ -39,3 +44,18 @@ def get(path):
@celery.task(name='storagedriver.merge')
def merge(src_disk_desc, dst_disk_desc):
pass
@celery.task(name='storagedriver.make_free_space')
def make_free_space(datastore, percent):
pass
@celery.task(name='storagedriver.move_to_trash')
def move_to_trash(datastore, disk_path):
pass
@celery.task(name='storagedriver.get_storage_stat')
def get_storage_stat(path):
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment