Commit d8264cd8 by Karsa Zoltán István

linter

parent e2461572
# Python bytecode:
*.py[co]
.tokens
.env
.ruff_cache/
# Packaging files:
*.egg*
......
......@@ -4,22 +4,23 @@ from fastapi import HTTPException
import requests
import json
def proxy_datacenters(serverpath: str, username, method="GET", balancer_fun = rr_get):
def proxy_datacenters(serverpath: str, username, method="GET", balancer_fun=rr_get):
server = balancer_fun()
token = get_datacenter_token(username, server)
url=f"{server}/{serverpath}"
url = f"{server}/{serverpath}"
t_resp = requests.request(
method=method,
url=url,
allow_redirects=False, verify=False,
headers={
'Authorization': token
}
allow_redirects=False,
verify=False,
headers={"Authorization": token},
)
if t_resp.status_code / 100 != 2:
raise HTTPException(status_code=t_resp.status_code, detail="Remote server error")
raise HTTPException(
status_code=t_resp.status_code, detail="Remote server error"
)
response = ORJSONResponse(
json.loads(t_resp.content),
status_code=t_resp.status_code
json.loads(t_resp.content), status_code=t_resp.status_code
)
return response
......@@ -3,6 +3,7 @@ from typing import Dict
import jwt
from passlib.hash import pbkdf2_sha256
from decouple import config
from fastapi import HTTPException
JWT_SECRET = config("secret")
......@@ -10,25 +11,23 @@ JWT_ALGORITHM = config("algorithm")
def token_response(token: str):
return {
"access_token": token
}
return {"access_token": token}
def signJWT(user_id: str) -> Dict[str, str]:
payload = {
"user_id": user_id,
"expires": time.time() + 60000
}
payload = {"user_id": user_id, "expires": time.time() + 60000}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return token_response(token)
def decodeJWT(token: str) -> dict:
try:
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return decoded_token if decoded_token["expires"] >= time.time() else None
except:
return {}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=501, detail="JWT token decode error")
def hash_pass(password: str) -> str:
return pbkdf2_sha256.hash(password)
......@@ -6,18 +6,23 @@ from sredis.models import PUser
class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTBearer, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
credentials: HTTPAuthorizationCredentials = await super(
JWTBearer, self
).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
raise HTTPException(
status_code=403, detail="Invalid authentication scheme."
)
cred = self.verify_jwt(credentials.credentials)
if not cred:
raise HTTPException(status_code=403, detail="Invalid token or expired token.")
raise HTTPException(
status_code=403, detail="Invalid token or expired token."
)
return credentials.credentials
else:
raise HTTPException(status_code=403, detail="Invalid authorization code.")
......@@ -25,21 +30,20 @@ class JWTBearer(HTTPBearer):
def verify_jwt(self, jwtoken: str) -> bool:
isTokenValid: bool = False
try:
payload = decodeJWT(jwtoken)
except:
payload = None
if payload:
return payload
return isTokenValid
async def get_current_user(token: str = Depends(JWTBearer())) -> str:
payload = decodeJWT(token)
return payload['user_id']
return payload["user_id"]
async def admin_user(token: str = Depends(JWTBearer())) -> str:
payload = decodeJWT(token)
user = PUser.find(PUser.username == payload['user_id']).all()[0]
user = PUser.find(PUser.username == payload["user_id"]).all()[0]
if not user.admin:
raise HTTPException(status_code=401, detail="you can not have access")
return payload['user_id']
\ No newline at end of file
return payload["user_id"]
from typing import Union
from pydantic import BaseModel, EmailStr
class User(BaseModel):
username: str
email: EmailStr
password: str
class DataCenter(BaseModel):
name: str
class Token(BaseModel):
datacenter: str
token: str
class UserLoginSchema(BaseModel):
username: str
password: str
class Config:
schema_extra = {
"example": {
"username": "user",
"password": "weakpassword"
}
}
\ No newline at end of file
schema_extra = {"example": {"username": "user", "password": "weakpassword"}}
from fastapi import FastAPI, Response, Body, Depends
from typing import List
from balancer.util import proxy_datacenters
from sredis.models import *
from sredis.sredis import check_user, create_puser, add_datacenter, set_token
import logging
import requests
......@@ -10,7 +9,7 @@ from core.auth import signJWT
from core.bearer import get_current_user, admin_user
from redis_om import Migrator
logging.config.fileConfig('logging.conf', disable_existing_loggers=False)
logging.config.fileConfig("logging.conf", disable_existing_loggers=False)
# get root logger
logger = logging.getLogger(__name__)
......@@ -28,41 +27,32 @@ async def create_user(user: User = Body(...)):
create_puser(user)
return signJWT(user.username)
@app.post("/user/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
if check_user(user):
return signJWT(user.username)
return {
"error": "Wrong login details!"
}
return {"error": "Wrong login details!"}
@app.get("/lb/{server_path:path}")
def proxy(
server_path: str = "/",
username = Depends(get_current_user)
):
def proxy_get(server_path: str = "/", username=Depends(get_current_user)):
return proxy_datacenters(server_path, username)
@app.post("/lb/{server_path:path}")
def proxy(
server_path: str = "/",
username = Depends(get_current_user)
):
def proxy_post(server_path: str = "/", username=Depends(get_current_user)):
return proxy_datacenters(server_path, username, method="POST")
@app.post("/add_datacenter/")
def create_datacenter(
dc: DataCenter = None,
username = Depends(admin_user)
):
def create_datacenter(dc: DataCenter = None, username=Depends(admin_user)):
add_datacenter(dc.name)
return Response(status_code=201)
@app.post("/set_tokens/")
def set_tokens(
tokens: List[Token] = None,
username = Depends(get_current_user)
):
def set_tokens(tokens: List[Token] = None, username=Depends(get_current_user)):
for token in tokens:
set_token(username, str(token.datacenter), str(token.token))
return tokens
......@@ -18,13 +18,15 @@ python-decouple = "^3.8"
redis-om = "^0.1.2"
pydantic = {extras = ["email"], version = "^1.10.6"}
passlib = "^1.7.4"
black = "^23.1.0"
ruff = "^0.0.254"
[tool.poe.tasks.start]
shell = "poetry run uvicorn main:app --reload --port 6973 --host 0.0.0.0"
help = "Start the microservice on port 6973"
[tool.poe.tasks.lint]
shell = "poetry run black . && poetry run ruff --fix . && poetry run mypy backend"
shell = "poetry run black . && poetry run ruff --fix . "
help = "Lint the most important parts of the microservice with black"
[build-system]
......
from redis_om import HashModel, Field
from pydantic import EmailStr
class PUser(HashModel):
username: str = Field(index=True)
email: EmailStr
......
......@@ -5,13 +5,13 @@ from passlib.hash import pbkdf2_sha256
from core.models import User
from core.auth import hash_pass
from fastapi import HTTPException
from redis_om import Migrator
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
r.set("datacenters_cnt", 1)
all_keys = list(r.hgetall('datacenters_hash').keys())
all_keys = list(r.hgetall("datacenters_hash").keys())
if all_keys:
r.hdel('datacenters_hash', *all_keys)
r.hdel("datacenters_hash", *all_keys)
def add_datacenter(datacenter: str):
cnt = int(r.get("datacenters_cnt"))
......@@ -19,6 +19,7 @@ def add_datacenter(datacenter: str):
r.incr("datacenters_cnt")
r.set("roundrobin_cnt", 1)
def rr_get():
cnt = int(r.get("datacenters_cnt"))
rr = int(r.get("roundrobin_cnt"))
......@@ -28,26 +29,28 @@ def rr_get():
r.incr("roundrobin_cnt")
return str(r.hget("datacenters_hash", rr))
def set_token(username: str, datacenter: str, token: str):
print(f"tokens:{username}" + datacenter)
r.hset(f"tokens:{username}", datacenter, token)
def get_datacenter_token(username: str, datacenter: str):
return str(r.hget(f"tokens:{username}", datacenter))
def check_user(data: UserLoginSchema):
user = PUser.find(PUser.username == data.username).all()
if pbkdf2_sha256.verify(data.password, user[0].password):
return user[0]
return False
def create_puser(user: User):
s = PUser.find(PUser.username == user.username).all()
if s:
raise HTTPException(status_code=403, detail="User already exists")
user = PUser(
username=user.username,
email=user.email,
password=hash_pass(user.password)
username=user.username, email=user.email, password=hash_pass(user.password)
)
user.save()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment