From fab1be10919f31d8ae1864cd6842deb6c14d2140 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 18 May 2026 17:55:44 +0000 Subject: [PATCH] feat: sync research and tasks endpoints Co-authored-by: william --- README.md | 46 ++- src/linkup/__init__.py | 26 ++ src/linkup/_client.py | 813 ++++++++++++++++++++++++++++++++++++-- src/linkup/_types.py | 239 ++++++++++- tests/unit/client_test.py | 393 ++++++++++++++++++ 5 files changed, 1470 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index 292355f..4c15f1b 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ benefit from Linkup services to the full extent. 📝 ## 🌟 Features - ✅ **Simple and intuitive API client.** -- 🔍 **Support all Linkup entrypoints and parameters.** +- 🔍 **Supports Linkup search, fetch, research, and task workflows.** - ⚡ **Support synchronous and asynchronous calls.** - 🔒 **Handle authentication and request management.** - 💳 **Built-in x402 payment protocol support for on-chain payments.** @@ -166,6 +166,50 @@ Check the code or the [official documentation](https://docs.linkup.so/pages/documentation/api-reference/endpoint/post-fetch) for the detailed list of available parameters. +#### 🧠 Research + +The `research` function creates an asynchronous research task. You can then use `get_research` or +`list_research` to inspect it later. + +```python +from linkup import LinkupClient, LinkupResearchTask + +client = LinkupClient() +research_task: LinkupResearchTask = client.research( + query="What changed in the AI browser market this quarter?", + output_type="sourcedAnswer", +) +print(research_task.id) +``` + +#### 🗂️ Tasks + +The `create_tasks` function lets you submit mixed `search`, `fetch`, and `research` jobs in a single +batch, then inspect them through `get_task` or `list_tasks`. + +```python +from linkup import ( + LinkupClient, + LinkupFetchTaskInput, + LinkupSearchTaskInput, +) + +client = LinkupClient() +tasks = client.create_tasks( + [ + LinkupSearchTaskInput( + query="Linkup latest product updates", + depth="deep", + output_type="sourcedAnswer", + ), + LinkupFetchTaskInput( + url="https://docs.linkup.so", + ), + ] +) +print([task.id for task in tasks]) +``` + #### ⌛ Asynchronous Calls All the Linkup main functions come with an asynchronous counterpart, with the same behavior and the diff --git a/src/linkup/__init__.py b/src/linkup/__init__.py index f2abef2..0d1030f 100644 --- a/src/linkup/__init__.py +++ b/src/linkup/__init__.py @@ -13,13 +13,26 @@ LinkupUnknownError, ) from ._types import ( + LinkupFetchImageExtraction, LinkupFetchResponse, + LinkupFetchTask, + LinkupFetchTaskInput, + LinkupResearchTask, + LinkupResearchTaskInput, + LinkupResearchTasksPage, LinkupSearchImageResult, LinkupSearchResults, LinkupSearchStructuredResponse, + LinkupSearchTask, + LinkupSearchTaskInput, LinkupSearchTextResult, LinkupSource, LinkupSourcedAnswer, + LinkupTask, + LinkupTaskInput, + LinkupTaskMetadata, + LinkupTaskQuota, + LinkupTasksPage, ) from ._version import __version__ @@ -27,19 +40,32 @@ "LinkupAuthenticationError", "LinkupClient", "LinkupFailedFetchError", + "LinkupFetchImageExtraction", "LinkupFetchResponse", "LinkupFetchResponseTooLargeError", + "LinkupFetchTask", + "LinkupFetchTaskInput", "LinkupFetchUrlIsFileError", "LinkupInsufficientCreditError", "LinkupInvalidRequestError", "LinkupNoResultError", "LinkupPaymentRequiredError", + "LinkupResearchTask", + "LinkupResearchTaskInput", + "LinkupResearchTasksPage", "LinkupSearchImageResult", "LinkupSearchResults", "LinkupSearchStructuredResponse", + "LinkupSearchTask", + "LinkupSearchTaskInput", "LinkupSearchTextResult", "LinkupSource", "LinkupSourcedAnswer", + "LinkupTask", + "LinkupTaskInput", + "LinkupTaskMetadata", + "LinkupTaskQuota", + "LinkupTasksPage", "LinkupTimeoutError", "LinkupTooManyRequestsError", "LinkupUnknownError", diff --git a/src/linkup/_client.py b/src/linkup/_client.py index 48af1b7..0f65539 100644 --- a/src/linkup/_client.py +++ b/src/linkup/_client.py @@ -5,7 +5,7 @@ import json import os from datetime import date # noqa: TC003 (`date` is used in test mocks) -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Literal, cast import httpx from pydantic import BaseModel, SecretStr @@ -25,9 +25,19 @@ ) from ._types import ( LinkupFetchResponse, + LinkupFetchTask, + LinkupFetchTaskInput, + LinkupResearchTask, + LinkupResearchTaskInput, + LinkupResearchTasksPage, LinkupSearchResults, LinkupSearchStructuredResponse, + LinkupSearchTask, + LinkupSearchTaskInput, LinkupSourcedAnswer, + LinkupTask, + LinkupTaskInput, + LinkupTasksPage, ) from ._version import __version__ @@ -271,6 +281,488 @@ async def async_search( include_sources=include_sources, ) + def research( + self, + query: str, + output_type: Literal["sourcedAnswer", "structured"], + reasoning_depth: Literal["S", "M", "L", "XL"] | None = None, + mode: Literal["answer", "auto", "investigate", "research"] | None = None, + structured_output_schema: type[BaseModel] | str | None = None, + from_date: date | None = None, + to_date: date | None = None, + exclude_domains: list[str] | None = None, + include_domains: list[str] | None = None, + timeout: float | None = None, + ) -> LinkupResearchTask: + """Create an asynchronous research task using the Linkup API `research` endpoint. + + The returned task can be inspected later with `get_research`, `list_research`, `get_task`, + or `list_tasks`. + + Args: + query: The research query to investigate. + output_type: The expected research output type. Use "sourcedAnswer" for an answer with + supporting sources, or "structured" for output matching `structured_output_schema`. + reasoning_depth: The amount of reasoning effort to use. If None, the Linkup API default + is used. + mode: The research mode to use. If None, the Linkup API default is used. + structured_output_schema: If output_type is "structured", specify the output schema. + Supported formats are a pydantic.BaseModel or a string representing a valid object + JSON schema. + from_date: The date from which the research sources should be considered. If None, + sources will not be filtered by a start date. + to_date: The date until which the research sources should be considered. If None, + sources will not be filtered by an end date. + exclude_domains: Domains to exclude from the research sources. + include_domains: Domains to restrict the research sources to. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The created research task. + + Raises: + TypeError: If structured_output_schema is not a string or pydantic.BaseModel when + provided. + LinkupInvalidRequestError: If the request parameters are invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupInsufficientCreditError: If you have run out of credit. + LinkupTimeoutError: If the request times out. + """ + params = self._get_research_params( + query=query, + output_type=output_type, + reasoning_depth=reasoning_depth, + mode=mode, + structured_output_schema=structured_output_schema, + from_date=from_date, + to_date=to_date, + exclude_domains=exclude_domains, + include_domains=include_domains, + ) + + response = self._request( + method="POST", + url="/research", + json=params, + timeout=timeout, + ) + + return self._parse_research_task(response.json()) + + async def async_research( + self, + query: str, + output_type: Literal["sourcedAnswer", "structured"], + reasoning_depth: Literal["S", "M", "L", "XL"] | None = None, + mode: Literal["answer", "auto", "investigate", "research"] | None = None, + structured_output_schema: type[BaseModel] | str | None = None, + from_date: date | None = None, + to_date: date | None = None, + exclude_domains: list[str] | None = None, + include_domains: list[str] | None = None, + timeout: float | None = None, + ) -> LinkupResearchTask: + """Asynchronously create a research task using the Linkup API `research` endpoint. + + The returned task can be inspected later with `async_get_research`, `async_list_research`, + `async_get_task`, or `async_list_tasks`. + + Args: + query: The research query to investigate. + output_type: The expected research output type. Use "sourcedAnswer" for an answer with + supporting sources, or "structured" for output matching `structured_output_schema`. + reasoning_depth: The amount of reasoning effort to use. If None, the Linkup API default + is used. + mode: The research mode to use. If None, the Linkup API default is used. + structured_output_schema: If output_type is "structured", specify the output schema. + Supported formats are a pydantic.BaseModel or a string representing a valid object + JSON schema. + from_date: The date from which the research sources should be considered. If None, + sources will not be filtered by a start date. + to_date: The date until which the research sources should be considered. If None, + sources will not be filtered by an end date. + exclude_domains: Domains to exclude from the research sources. + include_domains: Domains to restrict the research sources to. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The created research task. + + Raises: + TypeError: If structured_output_schema is not a string or pydantic.BaseModel when + provided. + LinkupInvalidRequestError: If the request parameters are invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupInsufficientCreditError: If you have run out of credit. + LinkupTimeoutError: If the request times out. + """ + params = self._get_research_params( + query=query, + output_type=output_type, + reasoning_depth=reasoning_depth, + mode=mode, + structured_output_schema=structured_output_schema, + from_date=from_date, + to_date=to_date, + exclude_domains=exclude_domains, + include_domains=include_domains, + ) + + response = await self._async_request( + method="POST", + url="/research", + json=params, + timeout=timeout, + ) + + return self._parse_research_task(response.json()) + + def list_research( + self, + page: int | None = None, + page_size: int | None = None, + sort_by: Literal["createdAt", "updatedAt"] | None = None, + sort_direction: Literal["asc", "desc"] | None = None, + timeout: float | None = None, + ) -> LinkupResearchTasksPage: + """List research tasks for the authenticated organization. + + Args: + page: The page number to retrieve. If None, the Linkup API default is used. + page_size: The number of tasks per page. If None, the Linkup API default is used. + sort_by: The field to sort by, either "createdAt" or "updatedAt". If None, the Linkup + API default is used. + sort_direction: The sort direction, either "asc" or "desc". If None, the Linkup API + default is used. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + A paginated page of research tasks. + + Raises: + LinkupInvalidRequestError: If the pagination or sorting parameters are invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = self._request( + method="GET", + url="/research", + params=self._get_paginated_params( + page=page, + page_size=page_size, + sort_by=sort_by, + sort_direction=sort_direction, + ), + timeout=timeout, + ) + + return self._parse_research_tasks_page(response.json()) + + async def async_list_research( + self, + page: int | None = None, + page_size: int | None = None, + sort_by: Literal["createdAt", "updatedAt"] | None = None, + sort_direction: Literal["asc", "desc"] | None = None, + timeout: float | None = None, + ) -> LinkupResearchTasksPage: + """Asynchronously list research tasks for the authenticated organization. + + Args: + page: The page number to retrieve. If None, the Linkup API default is used. + page_size: The number of tasks per page. If None, the Linkup API default is used. + sort_by: The field to sort by, either "createdAt" or "updatedAt". If None, the Linkup + API default is used. + sort_direction: The sort direction, either "asc" or "desc". If None, the Linkup API + default is used. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + A paginated page of research tasks. + + Raises: + LinkupInvalidRequestError: If the pagination or sorting parameters are invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = await self._async_request( + method="GET", + url="/research", + params=self._get_paginated_params( + page=page, + page_size=page_size, + sort_by=sort_by, + sort_direction=sort_direction, + ), + timeout=timeout, + ) + + return self._parse_research_tasks_page(response.json()) + + def get_research(self, research_id: str, timeout: float | None = None) -> LinkupResearchTask: + """Retrieve a single research task by identifier. + + Args: + research_id: The identifier of the research task to retrieve. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The requested research task. + + Raises: + LinkupInvalidRequestError: If the research identifier is invalid or unknown. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = self._request( + method="GET", + url=f"/research/{research_id}", + timeout=timeout, + ) + + return self._parse_research_task(response.json()) + + async def async_get_research( + self, research_id: str, timeout: float | None = None + ) -> LinkupResearchTask: + """Asynchronously retrieve a single research task by identifier. + + Args: + research_id: The identifier of the research task to retrieve. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The requested research task. + + Raises: + LinkupInvalidRequestError: If the research identifier is invalid or unknown. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = await self._async_request( + method="GET", + url=f"/research/{research_id}", + timeout=timeout, + ) + + return self._parse_research_task(response.json()) + + def create_tasks( + self, tasks: list[LinkupTaskInput], timeout: float | None = None + ) -> list[LinkupTask]: + """Create a mixed batch of search, fetch, and research tasks. + + Args: + tasks: The tasks to create. Supported task input models are `LinkupSearchTaskInput`, + `LinkupFetchTaskInput`, and `LinkupResearchTaskInput`. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The created tasks, parsed according to each task type. + + Raises: + TypeError: If a task has an unsupported model type, or if a structured output schema has + an unsupported type. + LinkupInvalidRequestError: If the task payload is invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupInsufficientCreditError: If you have run out of credit. + LinkupTimeoutError: If the request times out. + """ + response = self._request( + method="POST", + url="/tasks", + json=self._get_tasks_payload(tasks), + timeout=timeout, + ) + + response_data = cast("list[dict[str, Any]]", response.json()) + return [self._parse_task(task) for task in response_data] + + async def async_create_tasks( + self, tasks: list[LinkupTaskInput], timeout: float | None = None + ) -> list[LinkupTask]: + """Asynchronously create a mixed batch of search, fetch, and research tasks. + + Args: + tasks: The tasks to create. Supported task input models are `LinkupSearchTaskInput`, + `LinkupFetchTaskInput`, and `LinkupResearchTaskInput`. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The created tasks, parsed according to each task type. + + Raises: + TypeError: If a task has an unsupported model type, or if a structured output schema has + an unsupported type. + LinkupInvalidRequestError: If the task payload is invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupInsufficientCreditError: If you have run out of credit. + LinkupTimeoutError: If the request times out. + """ + response = await self._async_request( + method="POST", + url="/tasks", + json=self._get_tasks_payload(tasks), + timeout=timeout, + ) + + response_data = cast("list[dict[str, Any]]", response.json()) + return [self._parse_task(task) for task in response_data] + + def list_tasks( + self, + page: int | None = None, + page_size: int | None = None, + sort_by: Literal["createdAt", "updatedAt"] | None = None, + sort_direction: Literal["asc", "desc"] | None = None, + status: Literal["pending", "processing", "completed", "failed"] | None = None, + task_type: Literal["search", "fetch", "research"] | None = None, + timeout: float | None = None, + ) -> LinkupTasksPage: + """List tasks for the authenticated organization. + + Args: + page: The page number to retrieve. If None, the Linkup API default is used. + page_size: The number of tasks per page. If None, the Linkup API default is used. + sort_by: The field to sort by, either "createdAt" or "updatedAt". If None, the Linkup + API default is used. + sort_direction: The sort direction, either "asc" or "desc". If None, the Linkup API + default is used. + status: A task status to filter by. If None, no status filter is sent. + task_type: A task type to filter by. If None, no task type filter is sent. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + A paginated page of tasks with pagination metadata and quota information. + + Raises: + LinkupInvalidRequestError: If the filtering, pagination, or sorting parameters are + invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = self._request( + method="GET", + url="/tasks", + params=self._get_task_list_params( + page=page, + page_size=page_size, + sort_by=sort_by, + sort_direction=sort_direction, + status=status, + task_type=task_type, + ), + timeout=timeout, + ) + + return self._parse_tasks_page(response.json()) + + async def async_list_tasks( + self, + page: int | None = None, + page_size: int | None = None, + sort_by: Literal["createdAt", "updatedAt"] | None = None, + sort_direction: Literal["asc", "desc"] | None = None, + status: Literal["pending", "processing", "completed", "failed"] | None = None, + task_type: Literal["search", "fetch", "research"] | None = None, + timeout: float | None = None, + ) -> LinkupTasksPage: + """Asynchronously list tasks for the authenticated organization. + + Args: + page: The page number to retrieve. If None, the Linkup API default is used. + page_size: The number of tasks per page. If None, the Linkup API default is used. + sort_by: The field to sort by, either "createdAt" or "updatedAt". If None, the Linkup + API default is used. + sort_direction: The sort direction, either "asc" or "desc". If None, the Linkup API + default is used. + status: A task status to filter by. If None, no status filter is sent. + task_type: A task type to filter by. If None, no task type filter is sent. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + A paginated page of tasks with pagination metadata and quota information. + + Raises: + LinkupInvalidRequestError: If the filtering, pagination, or sorting parameters are + invalid. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = await self._async_request( + method="GET", + url="/tasks", + params=self._get_task_list_params( + page=page, + page_size=page_size, + sort_by=sort_by, + sort_direction=sort_direction, + status=status, + task_type=task_type, + ), + timeout=timeout, + ) + + return self._parse_tasks_page(response.json()) + + def get_task(self, task_id: str, timeout: float | None = None) -> LinkupTask: + """Retrieve a single task by identifier. + + Args: + task_id: The identifier of the task to retrieve. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The requested task, parsed according to its task type. + + Raises: + LinkupInvalidRequestError: If the task identifier is invalid or unknown. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = self._request( + method="GET", + url=f"/tasks/{task_id}", + timeout=timeout, + ) + + return self._parse_task(cast("dict[str, Any]", response.json())) + + async def async_get_task(self, task_id: str, timeout: float | None = None) -> LinkupTask: + """Asynchronously retrieve a single task by identifier. + + Args: + task_id: The identifier of the task to retrieve. + timeout: The timeout for the HTTP request, in seconds. If None, the request will have + no timeout. + + Returns: + The requested task, parsed according to its task type. + + Raises: + LinkupInvalidRequestError: If the task identifier is invalid or unknown. + LinkupAuthenticationError: If the Linkup API key is invalid. + LinkupTimeoutError: If the request times out. + """ + response = await self._async_request( + method="GET", + url=f"/tasks/{task_id}", + timeout=timeout, + ) + + return self._parse_task(cast("dict[str, Any]", response.json())) + def fetch( self, url: str, @@ -383,16 +875,23 @@ def _request( method: str, url: str, *, - json: dict[str, Any], + json: dict[str, Any] | list[dict[str, Any]] | None = None, + params: dict[str, Any] | None = None, timeout: float | None, ) -> httpx.Response: try: with httpx.Client(base_url=self._base_url, headers=self._headers()) as client: + request_kwargs: dict[str, Any] = { + "method": method, + "url": url, + "timeout": timeout, + } + if json is not None: + request_kwargs["json"] = json + if params is not None: + request_kwargs["params"] = params response: httpx.Response = client.request( - method=method, - url=url, - json=json, - timeout=timeout, + **request_kwargs, ) if response.status_code == 402 and self._x402_signer is not None: return self._handle_x402_payment( @@ -401,6 +900,7 @@ def _request( method=method, url=url, json=json, + params=params, timeout=timeout, ) except httpx.TimeoutException as e: @@ -416,18 +916,25 @@ async def _async_request( method: str, url: str, *, - json: dict[str, Any], + json: dict[str, Any] | list[dict[str, Any]] | None = None, + params: dict[str, Any] | None = None, timeout: float | None, ) -> httpx.Response: try: async with httpx.AsyncClient( base_url=self._base_url, headers=self._headers() ) as client: + request_kwargs: dict[str, Any] = { + "method": method, + "url": url, + "timeout": timeout, + } + if json is not None: + request_kwargs["json"] = json + if params is not None: + request_kwargs["params"] = params response: httpx.Response = await client.request( - method=method, - url=url, - json=json, - timeout=timeout, + **request_kwargs, ) if response.status_code == 402 and self._x402_signer is not None: return await self._async_handle_x402_payment( @@ -436,6 +943,7 @@ async def _async_request( method=method, url=url, json=json, + params=params, timeout=timeout, ) except httpx.TimeoutException as e: @@ -452,7 +960,8 @@ def _handle_x402_payment( response: httpx.Response, method: str, url: str, - json: dict[str, Any], + json: dict[str, Any] | list[dict[str, Any]] | None, + params: dict[str, Any] | None, timeout: float | None, ) -> httpx.Response: if self._x402_signer is None: @@ -471,13 +980,17 @@ def _handle_x402_payment( ) from e merged_headers = {**self._headers(), **payment_headers} - retry_response: httpx.Response = client.request( - method=method, - url=url, - json=json, - timeout=timeout, - headers=merged_headers, - ) + request_kwargs: dict[str, Any] = { + "method": method, + "url": url, + "timeout": timeout, + "headers": merged_headers, + } + if json is not None: + request_kwargs["json"] = json + if params is not None: + request_kwargs["params"] = params + retry_response: httpx.Response = client.request(**request_kwargs) if retry_response.status_code != 200: self._raise_linkup_error(response=retry_response) @@ -490,7 +1003,8 @@ async def _async_handle_x402_payment( response: httpx.Response, method: str, url: str, - json: dict[str, Any], + json: dict[str, Any] | list[dict[str, Any]] | None, + params: dict[str, Any] | None, timeout: float | None, ) -> httpx.Response: if self._x402_signer is None: @@ -509,13 +1023,17 @@ async def _async_handle_x402_payment( ) from e merged_headers = {**self._headers(), **payment_headers} - retry_response: httpx.Response = await client.request( - method=method, - url=url, - json=json, - timeout=timeout, - headers=merged_headers, - ) + request_kwargs: dict[str, Any] = { + "method": method, + "url": url, + "timeout": timeout, + "headers": merged_headers, + } + if json is not None: + request_kwargs["json"] = json + if params is not None: + request_kwargs["params"] = params + retry_response: httpx.Response = await client.request(**request_kwargs) if retry_response.status_code != 200: self._raise_linkup_error(response=retry_response) @@ -618,10 +1136,10 @@ def _get_search_params( query: str, depth: Literal["standard", "deep"], output_type: Literal["searchResults", "sourcedAnswer", "structured"], - structured_output_schema: type[BaseModel] | str | None, + structured_output_schema: type[BaseModel] | str | dict[str, Any] | None, include_images: bool | None, - from_date: date | None, - to_date: date | None, + from_date: date | str | None, + to_date: date | str | None, exclude_domains: list[str] | None, include_domains: list[str] | None, max_results: int | None, @@ -637,6 +1155,8 @@ def _get_search_params( if structured_output_schema is not None: if isinstance(structured_output_schema, str): params["structuredOutputSchema"] = structured_output_schema + elif isinstance(structured_output_schema, dict): + params["structuredOutputSchema"] = json.dumps(structured_output_schema) elif issubclass(structured_output_schema, BaseModel): json_schema: dict[str, Any] = structured_output_schema.model_json_schema() params["structuredOutputSchema"] = json.dumps(json_schema) @@ -647,9 +1167,9 @@ def _get_search_params( if include_images is not None: params["includeImages"] = include_images if from_date is not None: - params["fromDate"] = from_date.isoformat() + params["fromDate"] = from_date if isinstance(from_date, str) else from_date.isoformat() if to_date is not None: - params["toDate"] = to_date.isoformat() + params["toDate"] = to_date if isinstance(to_date, str) else to_date.isoformat() if exclude_domains is not None: params["excludeDomains"] = exclude_domains if include_domains is not None: @@ -663,6 +1183,153 @@ def _get_search_params( return params + def _get_research_params( + self, + query: str, + output_type: Literal["sourcedAnswer", "structured"], + reasoning_depth: Literal["S", "M", "L", "XL"] | None, + mode: Literal["answer", "auto", "investigate", "research"] | None, + structured_output_schema: type[BaseModel] | str | dict[str, Any] | None, + from_date: date | str | None, + to_date: date | str | None, + exclude_domains: list[str] | None, + include_domains: list[str] | None, + ) -> dict[str, str | bool | list[str]]: + params: dict[str, str | bool | list[str]] = { + "q": query, + "outputType": output_type, + } + + if reasoning_depth is not None: + params["reasoningDepth"] = reasoning_depth + if mode is not None: + params["mode"] = mode + + if structured_output_schema is not None: + if isinstance(structured_output_schema, str): + params["structuredOutputSchema"] = structured_output_schema + elif isinstance(structured_output_schema, dict): + params["structuredOutputSchema"] = json.dumps(structured_output_schema) + elif issubclass(structured_output_schema, BaseModel): + json_schema: dict[str, Any] = structured_output_schema.model_json_schema() + params["structuredOutputSchema"] = json.dumps(json_schema) + else: + raise TypeError( + f"Unexpected structured_output_schema type: '{type(structured_output_schema)}'" + ) + if from_date is not None: + params["fromDate"] = from_date if isinstance(from_date, str) else from_date.isoformat() + if to_date is not None: + params["toDate"] = to_date if isinstance(to_date, str) else to_date.isoformat() + if exclude_domains is not None: + params["excludeDomains"] = exclude_domains + if include_domains is not None: + params["includeDomains"] = include_domains + + return params + + def _get_paginated_params( + self, + page: int | None, + page_size: int | None, + sort_by: Literal["createdAt", "updatedAt"] | None, + sort_direction: Literal["asc", "desc"] | None, + ) -> dict[str, Any]: + params: dict[str, Any] = {} + if page is not None: + params["page"] = page + if page_size is not None: + params["pageSize"] = page_size + if sort_by is not None: + params["sortBy"] = sort_by + if sort_direction is not None: + params["sortDirection"] = sort_direction + return params + + def _get_task_list_params( + self, + page: int | None, + page_size: int | None, + sort_by: Literal["createdAt", "updatedAt"] | None, + sort_direction: Literal["asc", "desc"] | None, + status: Literal["pending", "processing", "completed", "failed"] | None, + task_type: Literal["search", "fetch", "research"] | None, + ) -> dict[str, Any]: + params = self._get_paginated_params( + page=page, + page_size=page_size, + sort_by=sort_by, + sort_direction=sort_direction, + ) + if status is not None: + params["status"] = status + if task_type is not None: + params["type"] = task_type + return params + + def _get_tasks_payload(self, tasks: list[LinkupTaskInput]) -> list[dict[str, Any]]: + payload: list[dict[str, Any]] = [] + + for task in tasks: + if isinstance(task, LinkupSearchTaskInput): + payload.append( + { + "type": "search", + "input": self._get_search_params( + query=task.query, + depth=task.depth, + output_type=task.output_type, + structured_output_schema=task.structured_output_schema, + include_images=task.include_images, + from_date=task.from_date, + to_date=task.to_date, + exclude_domains=task.exclude_domains, + include_domains=task.include_domains, + max_results=task.max_results, + include_inline_citations=task.include_inline_citations, + include_sources=task.include_sources, + ), + } + ) + continue + + if isinstance(task, LinkupFetchTaskInput): + payload.append( + { + "type": "fetch", + "input": self._get_fetch_params( + url=task.url, + include_raw_html=task.include_raw_html, + render_js=task.render_js, + extract_images=task.extract_images, + ), + } + ) + continue + + if isinstance(task, LinkupResearchTaskInput): + payload.append( + { + "type": "research", + "input": self._get_research_params( + query=task.query, + output_type=task.output_type, + structured_output_schema=task.structured_output_schema, + from_date=task.from_date, + to_date=task.to_date, + exclude_domains=task.exclude_domains, + include_domains=task.include_domains, + mode=task.mode, + reasoning_depth=task.reasoning_depth, + ), + } + ) + continue + + raise TypeError(f"Unexpected task model type: '{type(task)}'") + + return payload + def _get_fetch_params( self, url: str, @@ -688,7 +1355,20 @@ def _parse_search_response( structured_output_schema: type[BaseModel] | str | None, include_sources: bool | None, ) -> Any: # noqa: ANN401 - response_data: Any = response.json() + return self._parse_search_response_data( + response_data=response.json(), + output_type=output_type, + structured_output_schema=structured_output_schema, + include_sources=include_sources, + ) + + def _parse_search_response_data( + self, + response_data: Any, # noqa: ANN401 + output_type: Literal["searchResults", "sourcedAnswer", "structured"], + structured_output_schema: type[BaseModel] | str | dict[str, Any] | None, + include_sources: bool | None, + ) -> Any: # noqa: ANN401 if output_type == "searchResults": return LinkupSearchResults.model_validate(response_data) if output_type == "sourcedAnswer": @@ -701,14 +1381,14 @@ def _parse_search_response( # HACK: we assume that `include_sources` will default to False, since the API output can # be arbitrary so we can't guess if it includes sources or not if include_sources: - if not isinstance(structured_output_schema, str) and issubclass( + if isinstance(structured_output_schema, type) and issubclass( structured_output_schema, BaseModel ): response_data["data"] = structured_output_schema.model_validate( response_data["data"] ) return LinkupSearchStructuredResponse.model_validate(response_data) - if not isinstance(structured_output_schema, str) and issubclass( + if isinstance(structured_output_schema, type) and issubclass( structured_output_schema, BaseModel ): return structured_output_schema.model_validate(response_data) @@ -716,4 +1396,65 @@ def _parse_search_response( raise ValueError(f"Unexpected output_type value: '{output_type}'") def _parse_fetch_response(self, response: httpx.Response) -> LinkupFetchResponse: - return LinkupFetchResponse.model_validate(response.json()) + return self._parse_fetch_response_data(response.json()) + + def _parse_fetch_response_data(self, response_data: Any) -> LinkupFetchResponse: # noqa: ANN401 + return LinkupFetchResponse.model_validate(response_data) + + def _parse_research_task(self, task_data: dict[str, Any]) -> LinkupResearchTask: + research_input = LinkupResearchTaskInput.model_validate(task_data["input"]) + parsed_output = task_data.get("output") + + if parsed_output is not None and research_input.output_type == "sourcedAnswer": + parsed_output = LinkupSourcedAnswer.model_validate(parsed_output) + + return LinkupResearchTask.model_validate( + {**task_data, "input": research_input, "output": parsed_output} + ) + + def _parse_task(self, task_data: dict[str, Any]) -> LinkupTask: + task_type = task_data["type"] + + if task_type == "search": + search_input = LinkupSearchTaskInput.model_validate(task_data["input"]) + parsed_output = task_data.get("output") + if parsed_output is not None: + parsed_output = self._parse_search_response_data( + response_data=parsed_output, + output_type=search_input.output_type, + structured_output_schema=search_input.structured_output_schema, + include_sources=search_input.include_sources, + ) + return LinkupSearchTask.model_validate( + {**task_data, "input": search_input, "output": parsed_output} + ) + + if task_type == "fetch": + fetch_input = LinkupFetchTaskInput.model_validate(task_data["input"]) + parsed_output = task_data.get("output") + if parsed_output is not None: + parsed_output = self._parse_fetch_response_data(parsed_output) + return LinkupFetchTask.model_validate( + {**task_data, "input": fetch_input, "output": parsed_output} + ) + + if task_type == "research": + return self._parse_research_task(task_data) + + raise ValueError(f"Unexpected task type value: '{task_type}'") + + def _parse_research_tasks_page(self, response_data: dict[str, Any]) -> LinkupResearchTasksPage: + return LinkupResearchTasksPage.model_validate( + { + **response_data, + "data": [self._parse_research_task(task) for task in response_data["data"]], + } + ) + + def _parse_tasks_page(self, response_data: dict[str, Any]) -> LinkupTasksPage: + return LinkupTasksPage.model_validate( + { + **response_data, + "data": [self._parse_task(task) for task in response_data["data"]], + } + ) diff --git a/src/linkup/_types.py b/src/linkup/_types.py index fbd8454..5eab74d 100644 --- a/src/linkup/_types.py +++ b/src/linkup/_types.py @@ -1,11 +1,16 @@ """Input and output types for Linkup functions.""" +from datetime import date from typing import Any, Literal from pydantic import BaseModel, ConfigDict, Field -class LinkupSearchTextResult(BaseModel): +class _LinkupBaseModel(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + +class LinkupSearchTextResult(_LinkupBaseModel): """A text result from a Linkup search. Attributes: @@ -23,7 +28,7 @@ class LinkupSearchTextResult(BaseModel): favicon: str = "" -class LinkupSearchImageResult(BaseModel): +class LinkupSearchImageResult(_LinkupBaseModel): """An image result from a Linkup search. Attributes: @@ -37,7 +42,7 @@ class LinkupSearchImageResult(BaseModel): url: str -class LinkupSearchResults(BaseModel): +class LinkupSearchResults(_LinkupBaseModel): """The results of the Linkup search. Attributes: @@ -47,7 +52,7 @@ class LinkupSearchResults(BaseModel): results: list[LinkupSearchTextResult | LinkupSearchImageResult] -class LinkupSource(BaseModel): +class LinkupSource(_LinkupBaseModel): """A source supporting a Linkup answer. Attributes: @@ -63,7 +68,7 @@ class LinkupSource(BaseModel): favicon: str = "" -class LinkupSourcedAnswer(BaseModel): +class LinkupSourcedAnswer(_LinkupBaseModel): """A Linkup answer, with the sources supporting it. Attributes: @@ -75,7 +80,7 @@ class LinkupSourcedAnswer(BaseModel): sources: list[LinkupSource] -class LinkupSearchStructuredResponse(BaseModel): +class LinkupSearchStructuredResponse(_LinkupBaseModel): """A Linkup `search` structured response, with the sources supporting it. Attributes: @@ -87,7 +92,7 @@ class LinkupSearchStructuredResponse(BaseModel): sources: list[LinkupSearchTextResult | LinkupSearchImageResult] -class LinkupFetchImageExtraction(BaseModel): +class LinkupFetchImageExtraction(_LinkupBaseModel): """An image extraction from a Linkup web page fetch. Attributes: @@ -99,7 +104,7 @@ class LinkupFetchImageExtraction(BaseModel): url: str -class LinkupFetchResponse(BaseModel): +class LinkupFetchResponse(_LinkupBaseModel): """The response from a Linkup web page fetch. Attributes: @@ -108,8 +113,222 @@ class LinkupFetchResponse(BaseModel): images: The optional list of image URLs. """ - model_config = ConfigDict(populate_by_name=True) - markdown: str raw_html: str | None = Field(default=None, validation_alias="rawHtml") images: list[LinkupFetchImageExtraction] | None = Field(default=None) + + +class LinkupSearchTaskInput(_LinkupBaseModel): + """Input for creating or retrieving a search task. + + Attributes: + query: The search query. + depth: The search depth. + output_type: The expected search output type. + include_images: Whether image results should be included. + from_date: The start date used to filter search sources, if any. + to_date: The end date used to filter search sources, if any. + exclude_domains: Domains to exclude from the search, if any. + include_domains: Domains to restrict the search to, if any. + max_results: The maximum number of search results requested, if any. + include_inline_citations: Whether inline citations should be included. + include_sources: Whether sources should be included for structured output. + structured_output_schema: The structured output schema, if any. + """ + + query: str = Field(validation_alias="q") + depth: Literal["standard", "deep"] + output_type: Literal["searchResults", "sourcedAnswer", "structured"] = Field( + validation_alias="outputType" + ) + include_images: bool | None = Field(default=None, validation_alias="includeImages") + from_date: date | str | None = Field(default=None, validation_alias="fromDate") + to_date: date | str | None = Field(default=None, validation_alias="toDate") + exclude_domains: list[str] | None = Field(default=None, validation_alias="excludeDomains") + include_domains: list[str] | None = Field(default=None, validation_alias="includeDomains") + max_results: int | None = Field(default=None, validation_alias="maxResults") + include_inline_citations: bool | None = Field( + default=None, validation_alias="includeInlineCitations" + ) + include_sources: bool | None = Field(default=None, validation_alias="includeSources") + structured_output_schema: type[BaseModel] | str | dict[str, Any] | None = Field( + default=None, validation_alias="structuredOutputSchema" + ) + + +class LinkupResearchTaskInput(_LinkupBaseModel): + """Input for creating or retrieving a research task. + + Attributes: + query: The research query. + output_type: The expected research output type. + mode: The research mode to use, if provided. + reasoning_depth: The reasoning depth to use, if provided. + from_date: The start date used to filter research sources, if any. + to_date: The end date used to filter research sources, if any. + exclude_domains: Domains to exclude from the research sources, if any. + include_domains: Domains to restrict the research sources to, if any. + structured_output_schema: The structured output schema, if any. + """ + + query: str = Field(validation_alias="q") + output_type: Literal["sourcedAnswer", "structured"] = Field(validation_alias="outputType") + mode: Literal["answer", "auto", "investigate", "research"] | None = None + reasoning_depth: Literal["S", "M", "L", "XL"] | None = Field( + default=None, validation_alias="reasoningDepth" + ) + from_date: date | str | None = Field(default=None, validation_alias="fromDate") + to_date: date | str | None = Field(default=None, validation_alias="toDate") + exclude_domains: list[str] | None = Field(default=None, validation_alias="excludeDomains") + include_domains: list[str] | None = Field(default=None, validation_alias="includeDomains") + structured_output_schema: type[BaseModel] | str | dict[str, Any] | None = Field( + default=None, validation_alias="structuredOutputSchema" + ) + + +class LinkupFetchTaskInput(_LinkupBaseModel): + """Input for creating or retrieving a fetch task. + + Attributes: + url: The URL requested for fetching. + include_raw_html: Whether raw HTML should be included in the fetch response. + render_js: Whether JavaScript rendering should be enabled. + extract_images: Whether image extraction should be enabled. + """ + + url: str + include_raw_html: bool | None = Field(default=None, validation_alias="includeRawHtml") + render_js: bool | None = Field(default=None, validation_alias="renderJs") + extract_images: bool | None = Field(default=None, validation_alias="extractImages") + + +LinkupTaskInput = LinkupSearchTaskInput | LinkupFetchTaskInput | LinkupResearchTaskInput + + +class LinkupTaskMetadata(_LinkupBaseModel): + """Pagination metadata returned by list endpoints. + + Attributes: + page: The current page number. + page_size: The number of tasks per page. + total: The total number of tasks matching the request. + total_pages: The total number of available pages. + """ + + page: int + page_size: int = Field(validation_alias="pageSize") + total: int + total_pages: int = Field(validation_alias="totalPages") + + +class LinkupTaskQuota(_LinkupBaseModel): + """Task quota information returned by `list_tasks`. + + Attributes: + in_flight: The number of tasks currently in flight. + limit: The maximum number of in-flight tasks allowed. + """ + + in_flight: int = Field(validation_alias="inFlight") + limit: int + + +class LinkupSearchTask(_LinkupBaseModel): + """A search task returned by the Linkup API. + + Attributes: + created_at: The task creation timestamp. + error: The task error message, if the task failed. + id: The task identifier. + input: The normalized search input for this task. + output: The parsed search output, if available. + status: The current task status. + type: The task type, in this case "search". + updated_at: The last task update timestamp. + """ + + created_at: str = Field(validation_alias="createdAt") + error: str | None = None + id: str + input: LinkupSearchTaskInput + output: Any = None + status: Literal["pending", "processing", "completed", "failed"] + type: Literal["search"] + updated_at: str = Field(validation_alias="updatedAt") + + +class LinkupFetchTask(_LinkupBaseModel): + """A fetch task returned by the Linkup API. + + Attributes: + created_at: The task creation timestamp. + error: The task error message, if the task failed. + id: The task identifier. + input: The normalized fetch input for this task. + output: The parsed fetch output, if available. + status: The current task status. + type: The task type, in this case "fetch". + updated_at: The last task update timestamp. + """ + + created_at: str = Field(validation_alias="createdAt") + error: str | None = None + id: str + input: LinkupFetchTaskInput + output: LinkupFetchResponse | None = None + status: Literal["pending", "processing", "completed", "failed"] + type: Literal["fetch"] + updated_at: str = Field(validation_alias="updatedAt") + + +class LinkupResearchTask(_LinkupBaseModel): + """A research task returned by the Linkup API. + + Attributes: + created_at: The task creation timestamp. + error: The task error message, if the task failed. + id: The task identifier. + input: The normalized research input for this task. + output: The parsed research output, if available. + status: The current task status. + type: The task type, in this case "research". + updated_at: The last task update timestamp. + """ + + created_at: str = Field(validation_alias="createdAt") + error: str | None = None + id: str + input: LinkupResearchTaskInput + output: Any = None + status: Literal["pending", "processing", "completed", "failed"] + type: Literal["research"] + updated_at: str = Field(validation_alias="updatedAt") + + +LinkupTask = LinkupSearchTask | LinkupFetchTask | LinkupResearchTask + + +class LinkupResearchTasksPage(_LinkupBaseModel): + """Paginated research task list. + + Attributes: + data: The research tasks in the current page. + metadata: Pagination metadata for the result page. + """ + + data: list[LinkupResearchTask] + metadata: LinkupTaskMetadata + + +class LinkupTasksPage(_LinkupBaseModel): + """Paginated task list. + + Attributes: + data: The tasks in the current page. + metadata: Pagination metadata for the result page. + quota: Task quota information for the authenticated organization. + """ + + data: list[LinkupTask] + metadata: LinkupTaskMetadata + quota: LinkupTaskQuota diff --git a/tests/unit/client_test.py b/tests/unit/client_test.py index 8d80f12..eb50536 100644 --- a/tests/unit/client_test.py +++ b/tests/unit/client_test.py @@ -15,17 +15,26 @@ LinkupFailedFetchError, LinkupFetchResponse, LinkupFetchResponseTooLargeError, + LinkupFetchTask, + LinkupFetchTaskInput, LinkupFetchUrlIsFileError, LinkupInsufficientCreditError, LinkupInvalidRequestError, LinkupNoResultError, LinkupPaymentRequiredError, + LinkupResearchTask, + LinkupResearchTaskInput, + LinkupResearchTasksPage, LinkupSearchImageResult, LinkupSearchResults, LinkupSearchStructuredResponse, + LinkupSearchTask, + LinkupSearchTaskInput, LinkupSearchTextResult, LinkupSource, LinkupSourcedAnswer, + LinkupTaskMetadata, + LinkupTasksPage, LinkupTimeoutError, LinkupTooManyRequestsError, LinkupUnknownError, @@ -539,6 +548,108 @@ async def test_async_search_timeout( ) +def test_research(mocker: MockerFixture, client: LinkupClient) -> None: + request_mock = mocker.patch( + "httpx.Client.request", + return_value=Response( + status_code=200, + content=b""" + { + "createdAt": "2026-05-18T00:00:00.000Z", + "error": null, + "id": "4a44f4e0-eaf0-42eb-8ea4-99311b1d0f01", + "input": { + "mode": "auto", + "outputType": "sourcedAnswer", + "q": "query" + }, + "output": null, + "status": "pending", + "type": "research", + "updatedAt": "2026-05-18T00:00:00.000Z" + } + """, + ), + ) + + research_response = client.research(query="query", output_type="sourcedAnswer", mode="auto") + + request_mock.assert_called_once_with( + method="POST", + url="/research", + json={ + "q": "query", + "outputType": "sourcedAnswer", + "mode": "auto", + }, + timeout=None, + ) + assert research_response == LinkupResearchTask( + created_at="2026-05-18T00:00:00.000Z", + error=None, + id="4a44f4e0-eaf0-42eb-8ea4-99311b1d0f01", + input=LinkupResearchTaskInput( + query="query", + output_type="sourcedAnswer", + mode="auto", + ), + output=None, + status="pending", + type="research", + updated_at="2026-05-18T00:00:00.000Z", + ) + + +@pytest.mark.asyncio +async def test_async_research(mocker: MockerFixture, client: LinkupClient) -> None: + request_mock = mocker.patch( + "httpx.AsyncClient.request", + return_value=Response( + status_code=200, + content=b""" + { + "createdAt": "2026-05-18T00:00:00.000Z", + "error": null, + "id": "9a1c4553-4d42-4622-98b1-113004c4cf20", + "input": { + "outputType": "structured", + "q": "query", + "structuredOutputSchema": { + "type": "object" + } + }, + "output": { + "summary": "done" + }, + "status": "completed", + "type": "research", + "updatedAt": "2026-05-18T00:00:00.000Z" + } + """, + ), + ) + + research_response = await client.async_research( + query="query", + output_type="structured", + structured_output_schema=Company, + ) + + request_mock.assert_called_once_with( + method="POST", + url="/research", + json={ + "q": "query", + "outputType": "structured", + "structuredOutputSchema": json.dumps(Company.model_json_schema()), + }, + timeout=None, + ) + assert research_response.output == {"summary": "done"} + assert research_response.input.query == "query" + assert research_response.input.structured_output_schema == {"type": "object"} + + test_fetch_parameters = [ ( {"url": "https://example.com"}, @@ -781,6 +892,288 @@ async def test_async_fetch_timeout( await client.async_fetch(url="https://example.com", timeout=1.0) +def test_create_tasks(mocker: MockerFixture, client: LinkupClient) -> None: + request_mock = mocker.patch( + "httpx.Client.request", + return_value=Response( + status_code=200, + content=b""" + [ + { + "created_at": "2026-05-18T00:00:00.000Z", + "error": null, + "id": "7132d2b9-61b8-4d6f-a6f2-b69daeff6d58", + "input": { + "depth": "deep", + "outputType": "structured", + "q": "query", + "structuredOutputSchema": { + "type": "object" + } + }, + "output": null, + "status": "pending", + "type": "search", + "updatedAt": "2026-05-18T00:00:00.000Z" + }, + { + "createdAt": "2026-05-18T00:00:00.000Z", + "error": null, + "id": "42057d84-72ea-4029-9598-1bf7424a6113", + "input": { + "extractImages": true, + "url": "https://example.com" + }, + "output": { + "images": [ + { + "alt": "hero", + "url": "https://example.com/image.png" + } + ], + "markdown": "Fetched content" + }, + "status": "completed", + "type": "fetch", + "updatedAt": "2026-05-18T00:00:00.000Z" + } + ] + """, + ), + ) + + tasks_response = client.create_tasks( + [ + LinkupSearchTaskInput( + query="query", + depth="deep", + output_type="structured", + structured_output_schema=Company, + ), + LinkupFetchTaskInput( + url="https://example.com", + extract_images=True, + ), + ] + ) + + request_mock.assert_called_once_with( + method="POST", + url="/tasks", + json=[ + { + "type": "search", + "input": { + "q": "query", + "depth": "deep", + "outputType": "structured", + "structuredOutputSchema": json.dumps(Company.model_json_schema()), + }, + }, + { + "type": "fetch", + "input": { + "url": "https://example.com", + "extractImages": True, + }, + }, + ], + timeout=None, + ) + assert isinstance(tasks_response[0], LinkupSearchTask) + assert tasks_response[0].input.query == "query" + assert tasks_response[0].input.structured_output_schema == {"type": "object"} + assert isinstance(tasks_response[1], LinkupFetchTask) + assert tasks_response[1].output is not None + assert tasks_response[1].output.images is not None + assert tasks_response[1].output.images[0].url == "https://example.com/image.png" + + +def test_create_tasks_research_model(mocker: MockerFixture, client: LinkupClient) -> None: + request_mock = mocker.patch( + "httpx.Client.request", + return_value=Response( + status_code=200, + content=b""" + [ + { + "createdAt": "2026-05-18T00:00:00.000Z", + "error": null, + "id": "bbd897fb-b761-4dd9-bf6a-b41ec52f2de7", + "input": { + "mode": "answer", + "outputType": "sourcedAnswer", + "q": "query", + "reasoningDepth": "S" + }, + "output": null, + "status": "processing", + "type": "research", + "updatedAt": "2026-05-18T00:00:00.000Z" + } + ] + """, + ), + ) + + tasks_response = client.create_tasks( + [ + LinkupResearchTaskInput( + query="query", + output_type="sourcedAnswer", + mode="answer", + reasoning_depth="S", + ) + ] + ) + + request_mock.assert_called_once_with( + method="POST", + url="/tasks", + json=[ + { + "type": "research", + "input": { + "q": "query", + "outputType": "sourcedAnswer", + "mode": "answer", + "reasoningDepth": "S", + }, + } + ], + timeout=None, + ) + assert isinstance(tasks_response[0], LinkupResearchTask) + assert tasks_response[0].input.reasoning_depth == "S" + + +@pytest.mark.asyncio +async def test_async_list_research(mocker: MockerFixture, client: LinkupClient) -> None: + request_mock = mocker.patch( + "httpx.AsyncClient.request", + return_value=Response( + status_code=200, + content=b""" + { + "data": [ + { + "createdAt": "2026-05-18T00:00:00.000Z", + "error": null, + "id": "cdedcd9f-ab4a-4404-b8c6-b9ca9dc4c837", + "input": { + "outputType": "sourcedAnswer", + "q": "query" + }, + "output": null, + "status": "pending", + "type": "research", + "updatedAt": "2026-05-18T00:00:00.000Z" + } + ], + "metadata": { + "page": 2, + "pageSize": 5, + "total": 11, + "totalPages": 3 + } + } + """, + ), + ) + + research_page = await client.async_list_research(page=2, page_size=5) + + request_mock.assert_called_once_with( + method="GET", + url="/research", + params={ + "page": 2, + "pageSize": 5, + }, + timeout=None, + ) + assert research_page == LinkupResearchTasksPage( + data=[ + LinkupResearchTask( + created_at="2026-05-18T00:00:00.000Z", + error=None, + id="cdedcd9f-ab4a-4404-b8c6-b9ca9dc4c837", + input=LinkupResearchTaskInput( + query="query", + output_type="sourcedAnswer", + ), + output=None, + status="pending", + type="research", + updated_at="2026-05-18T00:00:00.000Z", + ) + ], + metadata=LinkupTaskMetadata( + page=2, + page_size=5, + total=11, + total_pages=3, + ), + ) + + +def test_list_tasks(mocker: MockerFixture, client: LinkupClient) -> None: + request_mock = mocker.patch( + "httpx.Client.request", + return_value=Response( + status_code=200, + content=b""" + { + "data": [ + { + "createdAt": "2026-05-18T00:00:00.000Z", + "error": null, + "id": "bbd897fb-b761-4dd9-bf6a-b41ec52f2de7", + "input": { + "outputType": "sourcedAnswer", + "q": "query" + }, + "output": null, + "status": "processing", + "type": "research", + "updatedAt": "2026-05-18T00:00:00.000Z" + } + ], + "metadata": { + "page": 1, + "pageSize": 10, + "total": 1, + "totalPages": 1 + }, + "quota": { + "inFlight": 1, + "limit": 100 + } + } + """, + ), + ) + + tasks_page = client.list_tasks( + status="pending", + task_type="search", + ) + + request_mock.assert_called_once_with( + method="GET", + url="/tasks", + params={ + "status": "pending", + "type": "search", + }, + timeout=None, + ) + assert isinstance(tasks_page, LinkupTasksPage) + assert isinstance(tasks_page.data[0], LinkupResearchTask) + assert tasks_page.data[0].input.query == "query" + assert tasks_page.quota.in_flight == 1 + + _402_BODY = b'{"error": {"code": "PAYMENT_REQUIRED", "message": "Pay", "details": []}}' _402_BODY_FULL = (