Using Okta with FastAPI

From PeformIQ Upgrade
Jump to navigation Jump to search


# -----------------------------------------------------------------------------
# Okta Login
# -----------------------------------------------------------------------------

# Okta application settings used by the login and callback routes.
# The "xxxx" fragments are placeholders in this sample.
okta_config = dict(
    auth_uri="https://dev-xxxx.okta.com/oauth2/default/v1/authorize",
    client_id="xxxx",
    client_secret="xxxx-",
    redirect_uri="http://127.0.0.1:8000/authorization-code/callback",
    issuer="https://dev-xxxx.okta.com/oauth2/default",
    token_uri="https://dev-xxxx.okta.com/oauth2/default/v1/token",
    userinfo_uri="https://dev-xxxx.okta.com/oauth2/default/v1/userinfo",
)

# Fixed `state` and `nonce` values attached to every authorization request.
APP_STATE, NONCE = "ApplicationState", "SampleNonce"

# Same URL as okta_config["auth_uri"].
BASE_URL = okta_config["auth_uri"]

@app.get("/login", response_class=HTMLResponse)
def login():
    """Send the browser to Okta's authorize endpoint.

    Builds the authorization-code request (client id, redirect URI, scopes,
    state and nonce) as a query string, appends it to
    ``okta_config['auth_uri']`` and answers with a 303 redirect to that URL.
    """
    authorize_args = dict(
        client_id=okta_config["client_id"],
        redirect_uri=okta_config["redirect_uri"],
        scope="openid email profile",
        state=APP_STATE,
        nonce=NONCE,
        response_type='code',
        response_mode='query',
    )
    encoded_params = requests.compat.urlencode(authorize_args)
    print(f"  encoded_params |{encoded_params}|")

    # Full authorize URL: endpoint + "?" + encoded query string.
    request_uri = "?".join((okta_config['auth_uri'], encoded_params))
    print(f"  request_uri |{request_uri}|")

    redirect = RedirectResponse(url=request_uri, status_code=303)
    return redirect

# -----------------------------------------------------------------------------

@app.get("/authorization-code/callback")
def callback(code: str, state: str):
    """Finish the authorization-code flow after Okta redirects back to the app.

    Exchanges ``code`` at the token endpoint, fetches the user's claims from
    the userinfo endpoint and then redirects the browser to ``/profile``.

    Args:
        code: Authorization code Okta appended to the redirect URI.
        state: Value echoed back by Okta; it must equal ``APP_STATE``, the
            ``state`` sent with the authorization request.

    Returns:
        A 303 ``RedirectResponse`` to ``/profile`` on success. On failure an
        ``HTMLResponse`` with status 403 (the request or Okta's answer was
        rejected) or 502 (Okta could not be reached or did not return JSON).
    """
    # requests has no default timeout; without one a stalled Okta call would
    # block this worker indefinitely.
    timeout_seconds = 10

    # FastAPI does not interpret Flask-style ``return body, status`` tuples
    # (the tuple would be serialised as JSON with status 200), so every
    # failure path returns an explicit response object.
    if state != APP_STATE:
        return HTMLResponse(content="The app state does not match",
                            status_code=403)
    if not code:
        return HTMLResponse(content="The code was not returned or is not accessible",
                            status_code=403)

    # NOTE: the authorization code and the tokens are credentials, so they
    # are deliberately not printed.
    print(f"                  state |{state}|")

    token_request = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': okta_config["redirect_uri"],
    }

    try:
        exchange = requests.post(
            okta_config["token_uri"],
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            data=token_request,  # requests form-encodes a dict body
            auth=(okta_config["client_id"], okta_config["client_secret"]),
            timeout=timeout_seconds,
        ).json()
    except (requests.RequestException, ValueError):
        # Network failure, timeout, or a body that is not JSON.
        return HTMLResponse(content="The token endpoint could not be reached",
                            status_code=502)

    if not isinstance(exchange, dict):
        return HTMLResponse(content="The token endpoint returned an unexpected response",
                            status_code=502)

    # Get tokens and validate

    token_type = exchange.get("token_type")
    print(f"             token_type |{token_type}|")

    #if not exchange.get("token_type"):
    #    return "Unsupported token type. Should be 'Bearer'.", 403

    # An error answer from Okta carries no tokens; reject it here instead of
    # failing later with a KeyError.
    access_token = exchange.get("access_token")
    id_token = exchange.get("id_token")
    if not access_token or not id_token:
        return HTMLResponse(content="The token exchange did not return the expected tokens",
                            status_code=403)

    # TODO: token validation is still disabled; the tokens are used as-is.
    #if not is_access_token_valid(access_token, okta_config["issuer"]):
    #    return "Access token is invalid", 403

    #if not is_id_token_valid(id_token, config["issuer"], okta_config["client_id"], NONCE):
    #    return "ID token is invalid", 403

    # Authorization flow successful, get userinfo and login user
    try:
        userinfo_response = requests.get(
            okta_config["userinfo_uri"],
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=timeout_seconds,
        ).json()
    except (requests.RequestException, ValueError):
        return HTMLResponse(content="The userinfo endpoint could not be reached",
                            status_code=502)

    if not isinstance(userinfo_response, dict) or "sub" not in userinfo_response:
        return HTMLResponse(content="The userinfo response did not identify a user",
                            status_code=403)

    unique_id  = userinfo_response["sub"]
    # These claims may be absent from the response, so a missing one becomes
    # None instead of raising KeyError.
    user_email = userinfo_response.get("email")
    user_name  = userinfo_response.get("given_name")

    print(f"              unique_id |{unique_id}|")
    print(f"             user_email |{user_email}|")
    print(f"              user_name |{user_name}|")

    # TODO: persisting the user / starting a session is not implemented yet.
    #user = User(
    #    user_id=unique_id, name=user_name, email=user_email
    #)

    #if not User.get(unique_id):
    #    User.create(unique_id, user_name, user_email)

    # login_user(user)

    return RedirectResponse(url="/profile", status_code=303)

# -----------------------------------------------------------------------------

@app.get("/profile", response_class=HTMLResponse)
async def profile(request: Request):
    """Render ``profile.html`` for a hard-coded placeholder user.

    The user attached to the page context is built from fixed sample values,
    not from the login flow.
    """
    user_info = {"user_id": "asasd", "name": "aaaa", "email": "bbb@bbb.com"}

    ctx = Context("Home")
    # Alternative kept from the original: ctx.user = User(**user_info)
    ctx.user = User.create(user_info["user_id"], user_info["name"], user_info["email"])

    template_vars = {"request": request, "ctx": ctx}
    return templates.TemplateResponse("profile.html", template_vars)

# -----------------------------------------------------------------------------

@app.post("/logout", response_class=HTMLResponse)
async def logout(request: Request):
    """Answer a logout POST with a 302 redirect to ``/login``.

    Nothing else happens here: the handler does not read ``request`` and does
    not clear any state.
    """
    login_redirect = RedirectResponse(
        url="/login",
        status_code=status.HTTP_302_FOUND,
    )
    return login_redirect

# -----------------------------------------------------------------------------