Commit c2d36f81 by Őry Máté

Merge branch 'storage-fixes'

parents 9dcd479e 3f0b123b
......@@ -48,3 +48,9 @@ class create_from_url(AbortableTask):,
def create_empty(Disk, instance, params, user):
Disk.create_empty(instance, params, user,
from storage.models import DataStore
import os
from django.utils import timezone
from datetime import timedelta
from manager.mancelery import celery
import logging
from storage.tasks import remote_tasks
......@@ -21,10 +19,8 @@ def garbage_collector(timeout=15):
:type timeoit: int
for ds in DataStore.objects.all():
time_before = - timedelta(days=1)
file_list = os.listdir(ds.path)
disk_list = [disk.filename for disk in
disk_list = ds.get_deletable_disks()
queue_name = ds.get_remote_queue_name('storage')
for i in set(file_list).intersection(disk_list):"Image: %s at Datastore: %s moved to trash folder." %
This file demonstrates writing tests using the unittest module. These will pass
when you run " test".
Replace this with more appropriate tests for your application.
from django.test import TestCase
class SimpleTest(TestCase):
def test_basic_addition(self):
Tests that 1 + 1 always equals 2.
self.assertEqual(1 + 1, 2)
from datetime import timedelta
from django.test import TestCase
from django.utils import timezone
from ..models import Disk, DataStore
old = - timedelta(days=2)
new = - timedelta(hours=2)
class DiskTestCase(TestCase):
n = 0
def setUp(self):
self.ds = DataStore.objects.create(path="/datastore",
hostname="devenv", name="default")
def _disk(self, destroyed=None, base=None):
self.n += 1
n = "d%d" % self.n
return Disk.objects.create(name=n, filename=n, base=base, size=1,
destroyed=destroyed, datastore=self.ds)
def test_deletable_not_destroyed(self):
d = self._disk()
assert not d.is_deletable()
def test_deletable_newly_destroyed(self):
d = self._disk(destroyed=new)
assert not d.is_deletable()
def test_deletable_no_child(self):
d = self._disk(destroyed=old)
assert d.is_deletable()
def test_deletable_child_not_destroyed(self):
d = self._disk()
self._disk(base=d, destroyed=old)
assert not d.is_deletable()
def test_deletable_child_newly_destroyed(self):
d = self._disk(destroyed=old)
self._disk(base=d, destroyed=new)
assert not d.is_deletable()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment