Commit fc706883 by Belákovics Ádám

Merge branch 'image_storage' into 'master'

Image & Storage interfaces

See merge request !2
parents a6b560f3 474f62d5
from openstack.exceptions import ResourceNotFound
from interface.image.ImageManager import ImageManager
from interface.image.Image import Image
class OpenstackImageManager(ImageManager):
def __init__(self, openstack) -> None:
super().__init__()
self.openstack = openstack
@staticmethod
def os_image_to_rc_image(os_image):
return Image(
os_image.id,
os_image.name,
os_image.disk_format
)
def upload_file(self, name, path, format):
os_image = self.openstack.image.create_image(
name=name,
filename=path,
disk_format=format
)
return self.os_image_to_rc_image(os_image)
def get(self, id):
try:
os_image = self.openstack.image.get_image(id)
except ResourceNotFound:
return None
return self.os_image_to_rc_image(os_image)
def download(self, id):
return self.openstack.image.download_image(id)
def delete(self, id):
try:
self.openstack.image.delete_image(id)
except ResourceNotFound:
return False
return True
def list(self):
images = []
for os_image in self.openstack.image.images():
images.append(self.os_image_to_rc_image(os_image))
return images
from openstack.exceptions import ResourceNotFound
from interface.storage.Snapshot import Snapshot
from interface.storage.SnapshotManager import SnapshotManager
class OpenstackSnapshotManager(SnapshotManager):
def __init__(self, os) -> None:
super().__init__()
self.openstack = os
@staticmethod
def os_snapshot_to_rc_snapshot(os_snapshot):
return Snapshot(
os_snapshot.id,
os_snapshot.volume_id,
os_snapshot.size,
os_snapshot.status,
os_snapshot.created_at
)
def create_from_volume(self, id):
os_snapshot = self.openstack.block_storage.create_snapshot(volume_id=id)
return self.os_snapshot_to_rc_snapshot(os_snapshot)
def get(self, id):
try:
os_snapshot = self.openstack.block_storage.get_snapshot(id)
except ResourceNotFound:
return None
return self.os_snapshot_to_rc_snapshot(os_snapshot)
def list(self):
snapshots = []
for os_snapshot in self.openstack.block_storage.snapshots():
snapshots.append(self.os_snapshot_to_rc_snapshot(os_snapshot))
return snapshots
def delete(self, id):
try:
self.openstack.block_storage.delete_snapshot(id)
except ResourceNotFound:
return False
return True
from openstack.exceptions import ResourceNotFound
from interface.storage.StorageManager import StorageManager
from interface.storage.Volume import Volume
class OpenstackStorageManager(StorageManager):
def __init__(self, openstack) -> None:
super().__init__()
self.openstack = openstack
@staticmethod
def os_volume_to_rc_volume(os_volume):
return Volume(
os_volume.id,
os_volume.image_id,
os_volume.size,
os_volume.is_bootable,
os_volume.status,
os_volume.created_at
)
def create(self, size):
os_volume = self.openstack.block_storage.create_volume(
size=size
)
return self.os_volume_to_rc_volume(os_volume)
def create_from_image(self, id, size, bootable):
os_volume = self.openstack.block_storage.create_volume(
image_id=id,
size=size,
bootable=bootable
)
return self.os_volume_to_rc_volume(os_volume)
def create_from_snapshot(self, id):
os_volume = self.openstack.block_storage.create_volume(
snapshot_id=id
)
return self.os_volume_to_rc_volume(os_volume)
def get(self, id):
try:
os_volume = self.openstack.block_storage.get_volume(id)
except ResourceNotFound:
return None
return self.os_volume_to_rc_volume(os_volume)
def delete(self, id):
try:
self.openstack.block_storage.delete_volume(id)
except ResourceNotFound:
return False
return True
def list(self):
volumes = []
for os_volume in self.openstack.block_storage.volumes():
volumes.append(self.os_volume_to_rc_volume(os_volume))
return volumes
import json
class Image:
def __init__(self, id, name, format) -> None:
super().__init__()
self.id = id
self.name = name
self.format = format
def __str__(self) -> str:
return self.toJSON()
def toJSON(self):
return json.dumps(self.__dict__)
class ImageManager:
def __init__(self) -> None:
super().__init__()
def upload_file(self, name, path, format):
raise NotImplementedError
def get(self, id):
raise NotImplementedError
def download(self, id):
raise NotImplementedError
def download_to_file(self, id, path):
f = open(path, 'wb')
f.write(self.download(id))
f.close()
def delete(self, id):
raise NotImplementedError
def list(self):
raise NotImplementedError
import json
class Snapshot:
def __init__(self,
id,
volume_id,
size,
status,
created_at
) -> None:
super().__init__()
self.id = id
self.volume_id = volume_id
self.size = size
self.status = status
self.created_at = created_at
def __str__(self) -> str:
return self.toJSON()
def toJSON(self):
return json.dumps(self.__dict__)
class SnapshotManager:
def __init__(self) -> None:
super().__init__()
def create_from_volume(self, id):
raise NotImplementedError
def get(self, id):
raise NotImplementedError
def list(self):
raise NotImplementedError
def delete(self, id):
raise NotImplementedError
class StorageManager:
def __init__(self) -> None:
super().__init__()
def create(self, size):
raise NotImplementedError
def create_from_image(self, id, size, bootable):
return NotImplementedError
def create_from_snapshot(self, id):
raise NotImplementedError
def get(self, id):
raise NotImplementedError
def delete(self, id):
raise NotImplementedError
def list(self):
raise NotImplementedError
import json
class Volume:
def __init__(self,
id,
image_id,
size,
bootable,
status,
created_at
) -> None:
super().__init__()
self.id = id
self.image_id = image_id
self.size = size
self.bootable = bootable
self.status = status
self.created_at = created_at
def __str__(self) -> str:
return self.toJSON()
def toJSON(self):
return json.dumps(self.__dict__)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment