diff --git a/liminal/connection/benchling_service.py b/liminal/connection/benchling_service.py index be2b626..0030189 100644 --- a/liminal/connection/benchling_service.py +++ b/liminal/connection/benchling_service.py @@ -2,8 +2,9 @@ import logging import os from typing import Any +from urllib.parse import urlparse -from playwright.async_api import async_playwright +from playwright.async_api import async_playwright, Request import requests from benchling_sdk.auth.client_credentials_oauth2 import ClientCredentialsOAuth2 from benchling_sdk.benchling import Benchling @@ -96,28 +97,23 @@ def __init__( self.use_internal_api = use_internal_api if use_internal_api: try: - authenticated_session = asyncio.get_event_loop().run_until_complete( - self.autogenerate_auth( - connection.tenant_name, - connection.internal_api_admin_email, - connection.internal_api_admin_password, - connection.playwright_data_dir, - ) + authenticated_session, csrf_token = self.autogenerate_auth( + connection.tenant_name, + connection.internal_api_admin_email, + connection.internal_api_admin_password, + connection.playwright_data_dir, ) except SSODisabledError as e: raise SSODisabledError( f"{e} Please provide `internal_api_admin_email` and `internal_api_admin_password` in your BenchlingConnection." ) - except RuntimeError as e: - raise RuntimeError( - f"{e}. If you are running this in a Jupyter notebook, use `nest_asyncio.apply()` to allow the async playwright login to run." - ) self.custom_post_cookies = { "session": authenticated_session, } self.custom_post_headers = { "Referer": f"https://{connection.tenant_name}.benchling.com/", "Content-Type": "application/json", + "x-csrftoken": csrf_token, } LOGGER.info( f"Tenant {connection.tenant_name}: Connected to Benchling internal API." @@ -260,14 +256,14 @@ def upsert_remote_revision_id(self, revision_id: str) -> bool: ) @classmethod - async def autogenerate_auth( + def autogenerate_auth( cls, benchling_tenant: str, email: str | None = None, password: str | None = None, playwright_data_dir: str | None = None, - ) -> str: - """Logs in to Benchling using the admin email and password or playwright and returns the session cookie. + ) -> tuple[str, str]: + """Logs in to Benchling using the admin email and password or playwright and returns the session cookie and CSRF token. If email and password are not passed in or if SSO is set to required on the Benchling tenant, playwright is used to log in. Otherwise, the admin email and password are used to log in.""" with requests.Session() as session: @@ -291,9 +287,16 @@ async def autogenerate_auth( f"admin_email and admin_password not provided when sso is turned off for Benchling tenant {benchling_tenant}." ) if signin_page.status_code == 302: - return await cls.get_authenticated_session_sso_login_playwright( - benchling_tenant, playwright_data_dir - ) + try: + return asyncio.get_event_loop().run_until_complete( + cls.get_authenticated_session_sso_login_playwright( + benchling_tenant, playwright_data_dir + ) + ) + except RuntimeError as e: + raise RuntimeError( + f"{e}. If you are running this in a Jupyter notebook, use `nest_asyncio.apply()` to allow the async playwright login to run." + ) else: raise ValueError( f"Unexpected response: Status code {signin_page.status_code}: {signin_page.text}" @@ -302,8 +305,8 @@ async def autogenerate_auth( @classmethod async def get_authenticated_session_sso_login_playwright( cls, benchling_tenant: str, playwright_data_dir: str | None = None - ) -> str: - """Logs in to Benchling using playwright and returns the session cookie. + ) -> tuple[str, str]: + """Logs in to Benchling using playwright and returns the session cookie and CSRF token. This can be used when SSO is enabled and required on the Benchling tenant.""" LOGGER.info(f"Log into your {benchling_tenant} Benchling tenant...") async with async_playwright() as playwright: @@ -319,6 +322,15 @@ async def get_authenticated_session_sso_login_playwright( ) context = await browser.new_context() page = await context.new_page() + + headers = {} + + async def async_get_request_headers(request: Request) -> None: + if urlparse(request.url).netloc == f"{benchling_tenant}.benchling.com": + headers.update(await request.all_headers()) + + page.on("request", async_get_request_headers) + try: await page.goto(f"https://{benchling_tenant}.benchling.com") except Exception: @@ -340,7 +352,10 @@ async def get_authenticated_session_sso_login_playwright( ) if not session_cookie: raise ValueError("No session cookie found.") - return session_cookie + csrf_token = headers.get("x-csrftoken", None) + if not csrf_token: + raise ValueError("No CSRF token found.") + return session_cookie, csrf_token @classmethod @retry( @@ -351,8 +366,8 @@ async def get_authenticated_session_sso_login_playwright( ) def get_authenticated_session_benchling_admin_login( cls, benchling_tenant: str, email: str, password: str - ) -> str: - """Logs in to Benchling using the admin email and password and returns the session cookie. + ) -> tuple[str, str]: + """Logs in to Benchling using the admin email and password and returns the session cookie and CSRF token. This can be used when SSO is disabled or optional on the Benchling tenant.""" with requests.Session() as session: homepage = session.get(f"https://{benchling_tenant}.benchling.com/signin") @@ -384,5 +399,6 @@ def get_authenticated_session_benchling_admin_login( return ( signin_response.headers["Set-Cookie"] .split("; Secure")[0] - .removeprefix("session=") + .removeprefix("session="), + csrf_token, )