Commit 6890d130 by Karsa Zoltán István

rp pool balancer

parent 3c569d1c
...@@ -18,6 +18,7 @@ STAT_TARGETS = { ...@@ -18,6 +18,7 @@ STAT_TARGETS = {
"cpu_percent": CPU_PERCENT_TARGET, "cpu_percent": CPU_PERCENT_TARGET,
} }
def get_static_info_from_dc(datacenter): def get_static_info_from_dc(datacenter):
url: str = f"{datacenter}/{STATIC_INFO}" url: str = f"{datacenter}/{STATIC_INFO}"
req = requests.request( req = requests.request(
...@@ -34,13 +35,14 @@ def get_static_info_from_dc(datacenter): ...@@ -34,13 +35,14 @@ def get_static_info_from_dc(datacenter):
try: try:
data = json.loads(req.content) data = json.loads(req.content)
core_num = data["core_num"] core_num = data["core_num"]
max_ram = data["max_ram"] max_ram = data["max_ram"]
return core_num, max_ram return core_num, max_ram
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logger.warning(f"JSON decoding error: Static infos") logger.warning("JSON decoding error: Static infos")
return 0, 0 return 0, 0
def get_stat_from_dc(datacenter, retry = False): def get_stat_from_dc(datacenter, retry=False):
stat = {} stat = {}
for key, target in STAT_TARGETS.items(): for key, target in STAT_TARGETS.items():
url: str = f"{datacenter}/{target}" url: str = f"{datacenter}/{target}"
...@@ -56,10 +58,13 @@ def get_stat_from_dc(datacenter, retry = False): ...@@ -56,10 +58,13 @@ def get_stat_from_dc(datacenter, retry = False):
else: else:
try: try:
data = json.loads(req.content) data = json.loads(req.content)
stat[key] = [max( stat[key] = [
i["datapoints"][0][0] if i["datapoints"][0][0] else 0.0, max(
i["datapoints"][1][0] if i["datapoints"][1][0] else 0.0 i["datapoints"][0][0] if i["datapoints"][0][0] else 0.0,
) for i in data] i["datapoints"][1][0] if i["datapoints"][1][0] else 0.0,
)
for i in data
]
if retry and stat[key][0] is None: if retry and stat[key][0] is None:
time.sleep(1) time.sleep(1)
...@@ -69,11 +74,14 @@ def get_stat_from_dc(datacenter, retry = False): ...@@ -69,11 +74,14 @@ def get_stat_from_dc(datacenter, retry = False):
verify=False, verify=False,
) )
data = json.loads(req.content) data = json.loads(req.content)
stat[key] = [max( stat[key] = [
i["datapoints"][0][0] if i["datapoints"][0][0] else 0.0, max(
i["datapoints"][1][0] if i["datapoints"][1][0] else 0.0 i["datapoints"][0][0] if i["datapoints"][0][0] else 0.0,
) for i in data] i["datapoints"][1][0] if i["datapoints"][1][0] else 0.0,
)
for i in data
]
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logger.warning(f"JSON decoding error: key {key}") logger.warning(f"JSON decoding error: key {key}")
stat[key] = 0.0 stat[key] = 0.0
return stat return stat
from fastapi.responses import ORJSONResponse from fastapi.responses import ORJSONResponse
from balancer.stats import get_stat_from_dc
from sredis.sredis import ( from sredis.sredis import (
available_ram,
get_all_datacenter, get_all_datacenter,
get_datacenter_token, get_datacenter_token,
pop_vm, pop_vm,
...@@ -24,12 +24,18 @@ def proxy_datacenters( ...@@ -24,12 +24,18 @@ def proxy_datacenters(
balancer_fun=rr_get, balancer_fun=rr_get,
datacenter=None, datacenter=None,
): ):
vm_ram = 4096
cpu_core = 1
if "ram_size" in body:
vm_ram = body["ram_size"]
if "num_cores" in body:
cpu_core = body["num_cores"]
if "datacenter" in body: if "datacenter" in body:
server = body["datacenter"] server = body["datacenter"]
elif datacenter: elif datacenter:
server = datacenter server = datacenter
else: else:
server = balancer_fun() server = balancer_fun(vm_ram, cpu_core)
token = get_datacenter_token(username, server) token = get_datacenter_token(username, server)
url = f"{server}/{serverpath}" url = f"{server}/{serverpath}"
try: try:
...@@ -85,9 +91,9 @@ def create_vm_from_template( ...@@ -85,9 +91,9 @@ def create_vm_from_template(
def create_rp(username: str, rp: ResourcePoolSchema): def create_rp(username: str, rp: ResourcePoolSchema):
id = 0
errors = 0
if rp.manual: if rp.manual:
id = 0
errors = 0
for dc, cnt in rp.manual.items(): for dc, cnt in rp.manual.items():
for _ in range(cnt): for _ in range(cnt):
try: try:
...@@ -103,13 +109,50 @@ def create_rp(username: str, rp: ResourcePoolSchema): ...@@ -103,13 +109,50 @@ def create_rp(username: str, rp: ResourcePoolSchema):
id += 1 id += 1
except Exception: except Exception:
errors += 1 errors += 1
return id, errors
else: else:
datacenters = get_all_datacenter() details = check_template_details_from_name(rp.from_template, username)
stats = {} ram_sum = details["ram_size"] * rp.num_vms
for dc in datacenters: avail_ram = available_ram()
stats[dc] = get_stat_from_dc(dc) avail_ram_sum = sum(avail_ram.values())
frac = ram_sum / avail_ram_sum
for dc, ram in avail_ram.items():
cnt = int(frac * ram / details["ram_size"])
for _ in range(cnt):
try:
create_vm_from_template(
dc,
username,
name=f"{rp.rpname} #{id}",
template_name=rp.from_template,
)
save_rpvm(rp.rpname, username, dc, id, rp.key)
id += 1
except Exception:
errors += 1
return id, errors
def check_template_details_from_name(template_name, username):
    """Collect the largest template requirements reported by any datacenter.

    Every datacenter returned by ``get_all_datacenter()`` is queried for the
    template called ``template_name``; the maximum ``num_cores`` and
    ``ram_size`` seen across all of them are returned.

    Args:
        template_name: Name of the template to look up.
        username: User on whose behalf the datacenters are queried.

    Returns:
        A dict with the keys ``"ram_size"`` and ``"num_cores"``. When no
        datacenter is registered the initial values (0 and 1) are returned.

    Raises:
        HTTPException: With status 500 as soon as one datacenter answers
            with a status code other than 200.
    """
    details = {"ram_size": 0, "num_cores": 1}
    for dc in get_all_datacenter():
        resp = proxy_datacenters(
            f"dashboard/acpi/template?name={template_name}", username, datacenter=dc
        )
        if resp.status_code != 200:
            # The keyword was misspelled as ``tatus_code``, which made this
            # error path fail with a TypeError instead of an HTTP 500.
            raise HTTPException(
                status_code=500,
                detail=f"Remote server error: {dc} - code {resp.status_code}",
            )
        # NOTE(review): ``resp`` is subscripted directly here; this assumes the
        # object returned by proxy_datacenters supports item access -- confirm,
        # otherwise the decoded JSON body has to be used instead.
        details["num_cores"] = max(details["num_cores"], resp["num_cores"])
        details["ram_size"] = max(details["ram_size"], resp["ram_size"])
    return details
def update_vm_owner(datacenter, vm_id, username): def update_vm_owner(datacenter, vm_id, username):
......
...@@ -132,9 +132,7 @@ def update_datacenter_stat( ...@@ -132,9 +132,7 @@ def update_datacenter_stat(
@app.get("/stats/") @app.get("/stats/")
def get_datacenter_stat( def get_datacenter_stat(username=Depends(get_current_user)):
username=Depends(get_current_user)
):
stats = get_all_stats() stats = get_all_stats()
stats = [ stats = [
ResourceStat( ResourceStat(
...@@ -143,20 +141,19 @@ def get_datacenter_stat( ...@@ -143,20 +141,19 @@ def get_datacenter_stat(
mem_used=x.mem_used, mem_used=x.mem_used,
cpu_usage=x.cpu_usage, cpu_usage=x.cpu_usage,
vm_count=x.vm_count, vm_count=x.vm_count,
cpu_core=x.cpu_core cpu_core=x.cpu_core,
) for x in stats )
for x in stats
] ]
return stats return stats
@app.get("/stats/ram/")
def get_datacenter_stat_ram(username=Depends(get_current_user), vm_ram: int = 1024):
    # Answer with whatever valid_ram_datacenters() yields for the requested
    # VM memory size; ``username`` is only present to pull in the
    # get_current_user dependency.
    return valid_ram_datacenters(vm_ram)
@app.get("/tokens/")
def get_tokens(username=Depends(get_current_user)):
    # Hand back the result of get_rtokens() for the current user.
    user_tokens = get_rtokens(username=username)
    return user_tokens
......
...@@ -65,13 +65,19 @@ def update_datacenter_stats(): ...@@ -65,13 +65,19 @@ def update_datacenter_stats():
stats = get_stat_from_dc(dc, retry=True) stats = get_stat_from_dc(dc, retry=True)
cpu_core, mem_max = get_static_info_from_dc(dc) cpu_core, mem_max = get_static_info_from_dc(dc)
rs = ResourceStat( rs = ResourceStat(
datacenter_name=dc, datacenter_name=dc,
cpu_core=cpu_core, cpu_core=cpu_core,
mem_max=int(mem_max / 1048576) if mem_max else 4096, mem_max=int(mem_max / 1048576) if mem_max else 4096,
mem_allocated=int(stats["mem_allocated"][0] / 1048576.0) if stats["mem_allocated"] else 0, mem_allocated=int(stats["mem_allocated"][0] / 1048576.0)
mem_used=int(stats["mem_usage"][0] * 4096 / 100.0) if stats["mem_usage"][0] else 0, if stats["mem_allocated"]
cpu_usage=float(stats["cpu_percent"][0]) if stats["cpu_percent"][0] else 0.0, else 0,
vm_count=int(stats["vmcount"][0]) if stats["vmcount"][0] else 0 mem_used=int(stats["mem_usage"][0] * 4096 / 100.0)
if stats["mem_usage"][0]
else 0,
cpu_usage=float(stats["cpu_percent"][0])
if stats["cpu_percent"][0]
else 0.0,
vm_count=int(stats["vmcount"][0]) if stats["vmcount"][0] else 0,
) )
update_stats(rs) update_stats(rs)
...@@ -85,8 +91,12 @@ def add_datacenter(datacenter: str): ...@@ -85,8 +91,12 @@ def add_datacenter(datacenter: str):
update_status(datacenter) update_status(datacenter)
def rr_get(vm_ram, cpu_core):
    """Return the next round-robin datacenter that has room for the VM.

    The Redis list ``"rrlist"`` is rotated one entry at a time and the first
    entry accepted by ``valid_ram_datacenter`` is returned.

    Args:
        vm_ram: Memory required by the VM, passed to ``valid_ram_datacenter``.
        cpu_core: Accepted for the balancer call signature; not used for the
            selection.

    Returns:
        The selected entry of ``"rrlist"``.

    Raises:
        RuntimeError: If ``"rrlist"`` is empty or a full rotation finds no
            datacenter with enough free RAM.
    """
    # One full rotation visits every entry exactly once. Bounding the loop
    # this way avoids spinning forever when the list is empty or when no
    # datacenter currently satisfies the memory requirement.
    for _ in range(r.llen("rrlist")):
        rr = r.rpoplpush("rrlist", "rrlist")
        if valid_ram_datacenter(rr, vm_ram):
            return rr
    raise RuntimeError(f"No datacenter with enough free RAM for {vm_ram}")
...@@ -96,6 +106,14 @@ def wr_get(centers: DataCenterSchema): ...@@ -96,6 +106,14 @@ def wr_get(centers: DataCenterSchema):
return random.choice(indexis, weights=())[0] return random.choice(indexis, weights=())[0]
def available_ram():
    """Map each stored datacenter name to ``mem_max - mem_used``."""
    return {
        resource.datacenter_name: resource.mem_max - resource.mem_used
        for resource in DataCenterResource.find().all()
    }
def set_token(username: str, datacenter: str, token: str): def set_token(username: str, datacenter: str, token: str):
r.hset(f"tokens:{username}", datacenter, token) r.hset(f"tokens:{username}", datacenter, token)
...@@ -161,11 +179,11 @@ def update_stats(stat: ResourceStat): ...@@ -161,11 +179,11 @@ def update_stats(stat: ResourceStat):
mem_used=stat.mem_used, mem_used=stat.mem_used,
cpu_usage=stat.cpu_usage, cpu_usage=stat.cpu_usage,
vm_count=stat.vm_count, vm_count=stat.vm_count,
cpu_core=stat.cpu_core cpu_core=stat.cpu_core,
) )
dc.save() dc.save()
else: else:
dc = dc[0] dc = dc[0]
dc.mem_max = stat.mem_max if stat.mem_max else dc.mem_max dc.mem_max = stat.mem_max if stat.mem_max else dc.mem_max
dc.mem_used = stat.mem_used if stat.mem_used else dc.mem_used dc.mem_used = stat.mem_used if stat.mem_used else dc.mem_used
dc.cpu_usage = stat.cpu_usage if stat.cpu_usage else dc.cpu_usage dc.cpu_usage = stat.cpu_usage if stat.cpu_usage else dc.cpu_usage
...@@ -173,18 +191,25 @@ def update_stats(stat: ResourceStat): ...@@ -173,18 +191,25 @@ def update_stats(stat: ResourceStat):
dc.cpu_core = stat.cpu_core if stat.cpu_core else dc.cpu_core dc.cpu_core = stat.cpu_core if stat.cpu_core else dc.cpu_core
dc.save() dc.save()
def get_stats(datacenter_name: str):
    """Look up the stored ``DataCenterResource`` record for one datacenter.

    Args:
        datacenter_name: Value matched against
            ``DataCenterResource.datacenter_name``.

    Returns:
        Whatever ``get_item()`` yields for the matching query.
    """
    name_matches = DataCenterResource.datacenter_name == datacenter_name
    return DataCenterResource.find(name_matches).get_item()
def valid_ram_datacenter(datacenter_name: str, vm_mem: int):
    """Tell whether a datacenter has more than ``vm_mem`` memory free.

    Args:
        datacenter_name: Value matched against
            ``DataCenterResource.datacenter_name``.
        vm_mem: Memory the VM needs, in the same unit as the stored
            ``mem_max`` / ``mem_used`` values.

    Returns:
        ``True`` when a record is found and ``mem_max - mem_used`` is strictly
        greater than ``vm_mem``; ``False`` otherwise, including when the
        lookup yields ``None``.
    """
    name_matches = DataCenterResource.datacenter_name == datacenter_name
    resource = DataCenterResource.find(name_matches).get_item()
    return resource is not None and resource.mem_max - resource.mem_used > vm_mem
def valid_ram_datacenters(vm_mem: int): def valid_ram_datacenters(vm_mem: int):
dcs = DataCenterResource.find().all() dcs = DataCenterResource.find().all()
valid_dcs = [] valid_dcs = []
...@@ -193,6 +218,7 @@ def valid_ram_datacenters(vm_mem: int): ...@@ -193,6 +218,7 @@ def valid_ram_datacenters(vm_mem: int):
valid_dcs.append(dc.datacenter_name) valid_dcs.append(dc.datacenter_name)
return valid_dcs return valid_dcs
def get_all_stats():
    """Return every stored ``DataCenterResource`` record."""
    return DataCenterResource.find().all()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment