feature request: native function calling for Open WebUI tools #141
huangboming
started this conversation in
Ideas
Replies: 3 comments
-
|
I've done some work for this. It's just a simple demo, which works but not perfect. Here's the full code: """
title: Gemini Manifold google_genai
id: gemini_manifold_google_genai
description: Manifold function for Gemini Developer API. Supports native image generation, grounding with Google Search and streaming. Uses google-genai.
author: suurt8ll
author_url: https://github.com/suurt8ll
funding_url: https://github.com/suurt8ll/open_webui_functions
license: MIT
version: 1.19.0rc1
requirements: google-genai==1.16.1
"""
# This is a helper function that provides a manifold for Google's Gemini Studio API.
# Be sure to check out my GitHub repository for more information! Contributions, questions and suggestions are very welcome.
# Supported features:
# - Native image generation (image output), use "gemini-2.0-flash-exp-image-generation"
# - Document understanding (PDF and plaintext files). (Gemini Manifold Companion >= 1.4.0 required)
# - Display citations in the front-end.
# - Image input
# - YouTube video input (automatically detects youtube.com and youtu.be URLs in messages)
# - Streaming
# - Grounding with Google Search (this requires installing "Gemini Manifold Companion" >= 1.2.0 filter, see GitHub README)
# - Permissive safety settings (Gemini Manifold Companion >= 1.3.0 required)
# - Each user can decide to use their own API key.
# - Token usage data
# - Code execution tool. (Gemini Manifold Companion >= 1.1.0 required)
# Features that are supported by API but not yet implemented in the manifold:
# TODO Audio input support.
# TODO Video input support (other than YouTube URLs).
from google import genai
from google.genai import types
import inspect
import asyncio
import copy
import json
import time
from functools import cache
from aiocache import cached
from aiocache.base import BaseCache
from fastapi.datastructures import State
import io
import mimetypes
import os
import uuid
import base64
import re
import fnmatch
import sys
from loguru import logger
from fastapi import Request
import pydantic_core
from pydantic import BaseModel, Field, field_validator
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import (
Any,
AsyncGenerator,
Literal,
TYPE_CHECKING,
cast,
)
from open_webui.models.chats import Chats
from open_webui.models.files import FileForm, Files
from open_webui.storage.provider import Storage
if TYPE_CHECKING:
from loguru import Record
from loguru._handler import Handler # type: ignore
from utils.manifold_types import * # My personal types in a separate file for more robustness.
# Setting auditable=False avoids duplicate output for log levels that would be printed out by the main log.
log = logger.bind(auditable=False)
class GenaiApiError(Exception):
"""Custom exception for errors during Genai API interactions."""
def __init__(self, message):
super().__init__(message)
class Pipe:
class Valves(BaseModel):
GEMINI_API_KEY: str | None = Field(default=None)
USER_MUST_PROVIDE_AUTH_CONFIG: bool = Field(
default=False,
description="""Whether to require users (including admins) to provide their own authentication configuration.
User can provide these through UserValves. Setting this to True will disallow users from using Vertex AI.
Default value is False.""",
)
AUTH_WHITELIST: str | None = Field(
default=None,
description="""Comma separated list of user emails that are allowed to bypassUSER_MUST_PROVIDE_AUTH_CONFIG and use the default authentication configuration.
Default value is None (no users are whitelisted).""",
)
# FIXME: Default to None.
GEMINI_API_BASE_URL: str = Field(
default="https://generativelanguage.googleapis.com",
description="The base URL for calling the Gemini API",
)
USE_VERTEX_AI: bool = Field(
default=False,
description="""Whether to use Google Cloud Vertex AI instead of the standard Gemini API.
If VERTEX_PROJECT is not set then the plugin will use the Gemini Developer API.
Default value is False.
Users can opt out of this by setting USE_VERTEX_AI to False in their UserValves.""",
)
VERTEX_PROJECT: str | None = Field(
default=None,
description="""The Google Cloud project ID to use with Vertex AI.
Default value is None.""",
)
# FIXME: Default to None.
VERTEX_LOCATION: str = Field(
default="global",
description="""The Google Cloud region to use with Vertex AI.
Default value is 'global'.""",
)
MODEL_WHITELIST: str = Field(
default="*",
description="""Comma-separated list of allowed model names.
Supports `fnmatch` patterns: *, ?, [seq], [!seq].
Default value is * (all models allowed).""",
)
MODEL_BLACKLIST: str | None = Field(
default=None,
description="""Comma-separated list of blacklisted model names.
Supports `fnmatch` patterns: *, ?, [seq], [!seq].
Default value is None (no blacklist).""",
)
CACHE_MODELS: bool = Field(
default=True,
description="Whether to request models only on first load and when white- or blacklist changes.",
)
THINKING_BUDGET: int = Field(
ge=0,
le=24576,
default=8192,
description="""Gemini 2.5 Flash only. Indicates the thinking budget in tokens.
0 means no thinking. Default value is 8192.
See <https://cloud.google.com/vertex-ai/generative-ai/docs/thinking> for more.""",
)
USE_FILES_API: bool = Field(
title="Use Files API",
default=True,
description="Save the image files using Open WebUI's API for files.",
)
THINKING_MODEL_PATTERN: str = Field(
default=r"thinking|gemini-2.5",
description="Regex pattern to identify thinking models.",
)
EMIT_INTERVAL: int = Field(
default=1,
description="Interval in seconds between status updates during thinking.",
)
EMIT_STATUS_UPDATES: bool = Field(
default=False,
description="Whether to emit status updates during model thinking.",
)
LOG_LEVEL: Literal[
"TRACE", "DEBUG", "INFO", "SUCCESS", "WARNING", "ERROR", "CRITICAL"
] = Field(
default="INFO",
description="Select logging level. Use `docker logs -f open-webui` to view logs.",
)
ENABLE_URL_CONTEXT_TOOL: bool = Field(
default=False,
description="Enable the URL context tool to allow the model to fetch and use content from provided URLs. This tool is only compatible with specific models.",
)
class UserValves(BaseModel):
# TODO: Add more options that can be changed by the user.
GEMINI_API_KEY: str | None = Field(
default=None,
description="""Gemini Developer API key.
Default value is None (uses the default from Valves, same goes for other options below).""",
)
GEMINI_API_BASE_URL: str | None = Field(
default=None,
description="""The base URL for calling the Gemini API
Default value is None.""",
)
USE_VERTEX_AI: bool | None | Literal[""] = Field(
default=None,
description="""Whether to use Google Cloud Vertex AI instead of the standard Gemini API.
Default value is None.""",
)
VERTEX_PROJECT: str | None = Field(
default=None,
description="""The Google Cloud project ID to use with Vertex AI.
Default value is None.""",
)
VERTEX_LOCATION: str | None = Field(
default=None,
description="""The Google Cloud region to use with Vertex AI.
Default value is None.""",
)
THINKING_BUDGET: int | None | Literal[""] = Field(
default=None,
description="""Gemini 2.5 Flash only. Indicates the thinking budget in tokens.
0 means no thinking. Default value is None (uses the default from Valves).
See <https://cloud.google.com/vertex-ai/generative-ai/docs/thinking> for more.""",
)
ENABLE_URL_CONTEXT_TOOL: bool = Field(
default=False,
description="Enable the URL context tool to allow the model to fetch and use content from provided URLs. This tool is only compatible with specific models.",
)
@field_validator("THINKING_BUDGET", mode="after")
@classmethod
def validate_thinking_budget_range(cls, v):
if v is not None and v != "":
if not (0 <= v <= 24576):
raise ValueError(
"THINKING_BUDGET must be between 0 and 24576, inclusive."
)
return v
def __init__(self):
self.valves = self.Valves()
async def pipes(self) -> list["ModelData"]:
"""Register all available Google models."""
self._add_log_handler(self.valves.LOG_LEVEL)
# Clear cache if caching is disabled
if not self.valves.CACHE_MODELS:
log.debug("CACHE_MODELS is False, clearing model cache.")
cache_instance = getattr(self._get_genai_models, "cache")
await cast(BaseCache, cache_instance).clear()
log.info("Fetching and filtering models from Google API.")
# Get and filter models (potentially cached based on API key, base URL, white- and blacklist)
try:
client_args = self._prepare_client_args(self.valves)
client_args += [self.valves.MODEL_WHITELIST, self.valves.MODEL_BLACKLIST]
filtered_models = await self._get_genai_models(*client_args)
except GenaiApiError:
error_msg = "Error getting the models from Google API, check the logs."
return [self._return_error_model(error_msg, exception=True)]
log.info(f"Returning {len(filtered_models)} models to Open WebUI.")
log.debug("Model list:", payload=filtered_models, _log_truncation_enabled=False)
return filtered_models
async def pipe(
self,
body: "Body",
__user__: "UserData",
__request__: Request,
__event_emitter__: Callable[["Event"], Awaitable[None]],
__metadata__: dict[str, Any],
__tools__: dict[str, dict],
) -> AsyncGenerator | str | None:
self._add_log_handler(self.valves.LOG_LEVEL)
# Apply settings from the user
valves: Pipe.Valves = self._get_merged_valves(
self.valves, __user__.get("valves"), __user__.get("email")
)
log.debug(
f"USE_VERTEX_AI: {valves.USE_VERTEX_AI}, VERTEX_PROJECT set: {bool(valves.VERTEX_PROJECT)}, API_KEY set: {bool(valves.GEMINI_API_KEY)}"
)
log.debug(
f"Getting genai client (potentially cached) for user {__user__['email']}."
)
client = self._get_user_client(valves, __user__["email"])
log.trace("__metadata__:", payload=__metadata__)
# Check if user is chatting with an error model for some reason.
if "error" in __metadata__["model"]["id"]:
error_msg = f"There has been an error during model retrival phase: {str(__metadata__['model'])}"
raise ValueError(error_msg)
# Get the message history directly from the backend.
# This allows us to see data about sources and files data.
chat_id = __metadata__.get("chat_id", "")
if chat := Chats.get_chat_by_id_and_user_id(id=chat_id, user_id=__user__["id"]):
chat_content: "ChatObjectDataTD" = chat.chat # type: ignore
# Last message is the upcoming assistant response, at this point in the logic it's empty.
messages_db = chat_content.get("messages")[:-1]
else:
warn_msg = f"Chat with ID - {chat_id} - not found. Can't filter out the citation marks."
log.warning(warn_msg)
messages_db = None
system_prompt = self._pop_system_prompt(body.get("messages"))
if messages_db and len(messages_db) != len(body.get("messages")):
warn_msg = (
f"Messages in the body ({len(body.get('messages'))}) and "
f"messages in the database ({len(messages_db)}) do not match. "
"This is likely due to a bug in Open WebUI. "
"Cannot filter out citation marks or upload files."
)
log.warning(warn_msg)
await self._emit_toast(warn_msg, __event_emitter__, "warning")
messages_db = None
features = __metadata__.get("features", {}) or {}
log.info(
"Converting Open WebUI's `body` dict into list of `Content` objects that `google-genai` understands."
)
contents = await self._genai_contents_from_messages(
body.get("messages"),
messages_db,
features.get("upload_documents", False),
__event_emitter__,
)
# Assemble GenerateContentConfig
safety_settings: list[types.SafetySetting] | None = __metadata__.get(
"safety_settings"
)
model_name = re.sub(r"^.*?[./]", "", body.get("model", ""))
# API does not stream thoughts sadly. See https://github.com/googleapis/python-genai/issues/226#issuecomment-2631657100
thinking_conf = None
if model_name == "gemini-2.5-flash-preview-04-17":
log.info(f"Model ID '{model_name}' allows adjusting the thinking settings.")
thinking_conf = types.ThinkingConfig(
thinking_budget=valves.THINKING_BUDGET,
include_thoughts=None,
)
# TODO: Take defaults from the general front-end config.
gen_content_conf = types.GenerateContentConfig(
system_instruction=system_prompt,
temperature=body.get("temperature"),
top_p=body.get("top_p"),
top_k=body.get("top_k"),
max_output_tokens=body.get("max_tokens"),
stop_sequences=body.get("stop"),
safety_settings=safety_settings,
thinking_config=thinking_conf,
)
gen_content_conf.response_modalities = ["Text"]
if (
"gemini-2.0-flash-exp-image-generation" in model_name
or "gemma" in model_name
):
if "gemini-2.0-flash-exp-image-generation" in model_name:
gen_content_conf.response_modalities.append("Image")
# TODO: append to user message instead.
if gen_content_conf.system_instruction:
gen_content_conf.system_instruction = None
log.warning(
"Image Generation model does not support the system prompt message! Removing the system prompt."
)
gen_content_conf.tools = []
# Add URL context tool if enabled and model is compatible
if valves.ENABLE_URL_CONTEXT_TOOL:
compatible_models_for_url_context = [
"gemini-2.5-pro-preview-05-06",
"gemini-2.5-flash-preview-05-20",
"gemini-2.0-flash",
"gemini-2.0-flash-live-001",
]
if model_name in compatible_models_for_url_context:
log.info(
f"Model {model_name} is compatible with URL context tool. Enabling."
)
gen_content_conf.tools.append(
types.Tool(url_context=types.UrlContext())
)
else:
log.warning(
f"URL context tool is enabled, but model {model_name} is not in the compatible list. Skipping."
)
if features.get("google_search_tool"):
log.info("Using grounding with Google Search as a Tool.")
gen_content_conf.tools.append(
types.Tool(google_search=types.GoogleSearch())
)
elif features.get("google_search_retrieval"):
log.info("Using grounding with Google Search Retrieval.")
gs = types.GoogleSearchRetrieval(
dynamic_retrieval_config=types.DynamicRetrievalConfig(
dynamic_threshold=features.get("google_search_retrieval_threshold")
)
)
gen_content_conf.tools.append(types.Tool(google_search_retrieval=gs))
# NB: It is not possible to use both Search and Code execution at the same time,
# however, it can be changed later, so let's just handle it as a common error
if features.get("google_code_execution"):
log.info("Using code execution on Google side.")
gen_content_conf.tools.append(
types.Tool(code_execution=types.ToolCodeExecution())
)
gen_content_args = {
"model": model_name,
"contents": contents,
"config": gen_content_conf,
}
log.debug("Passing these args to the Google API:", payload=gen_content_args)
# Add native function call tools
# NOTE: Only global tool servers are supported for native function calling.
# User tool servers are not supported because the `tool.callable` attribute
# is not available in the metadata passed to this pipe.
if __metadata__.get("function_calling") == "native":
for tool in __tools__.values():
spec = tool.get("spec")
callable = tool.get("callable", None)
if spec and callable:
gen_content_conf.tools.append(
types.Tool(function_declarations=[{k: v for k, v in spec.items() if k != "type"}])
)
if body.get("stream", False):
# Streaming response
response_stream: AsyncIterator[types.GenerateContentResponse] = (
await client.aio.models.generate_content_stream(**gen_content_args) # type: ignore
)
log.info("Streaming enabled. Returning AsyncGenerator.")
return self._stream_response_generator(
client,
response_stream,
__request__,
valves,
gen_content_args,
__event_emitter__,
__metadata__,
__user__["id"],
__tools__,
)
else:
# Non-streaming response.
if "gemini-2.0-flash-exp-image-generation" in model_name:
warn_msg = "Non-streaming responses with native image gen are not currently supported! Stay tuned! Please enable streaming."
await self._emit_error(warn_msg, __event_emitter__, exception=False) # Use existing error emitter
raise NotImplementedError(warn_msg)
# TODO: Support native image gen here too.
# TODO: Support code execution here too.
# TODO: Make this configurable.
MAX_FUNCTION_CALLS = 5 # Safety break for non-streaming tool call loops
num_function_calls_executed = 0
# Make a mutable copy of contents for the loop
current_contents_for_loop = list(gen_content_args["contents"])
while num_function_calls_executed < MAX_FUNCTION_CALLS:
log.debug(f"Non-streaming call to LLM. Iteration: {num_function_calls_executed + 1}")
# Prepare arguments for this iteration
iter_gen_content_args = {
"model": model_name, # Use the correctly derived model_name
"contents": current_contents_for_loop,
"config": gen_content_conf, # Use the correctly derived gen_content_conf
}
res = await client.aio.models.generate_content(**iter_gen_content_args)
candidate = self._get_first_candidate(res.candidates)
if not candidate or not candidate.content or not candidate.content.parts:
log.warning("Non-streaming response has no valid candidate or content parts.")
error_msg = "Model returned no content after a tool call or initially."
# If this happens after some tool calls, the toasts would have been shown.
# The final response will be an error.
await self._emit_error(error_msg, __event_emitter__, exception=False, done=True)
return ""
# Collect all text parts and function calls from the response
text_parts_content = []
function_calls_to_execute_this_turn = []
for part in candidate.content.parts:
if part.text:
text_parts_content.append(part.text)
elif part.function_call:
function_calls_to_execute_this_turn.append(part.function_call)
# If there's direct text output, it might be the final answer or text accompanying function calls.
# For Gemini, usually, if function calls are made, text is minimal or explanatory.
# We prioritize function calls if present. If only text, that's the final response.
if text_parts_content and not function_calls_to_execute_this_turn:
final_text_response = "".join(text_parts_content)
log.info("Non-streaming response (text) received successfully!")
log.debug("Non-streaming response content:", payload=final_text_response)
# Emit the final text content via completion event before post-processing
await self._emit_completion(__event_emitter__, content=final_text_response, done=False) # Send content, done=False for now
await self._do_post_processing(res, __event_emitter__, __metadata__, __request__) # This should ideally set done=True
return final_text_response
elif function_calls_to_execute_this_turn:
num_function_calls_executed += len(function_calls_to_execute_this_turn)
# Add the model's response (that asked for tools) to our history for the loop
current_contents_for_loop.append(types.Content(role="model", parts=candidate.content.parts))
tool_response_user_parts = [] # To collect FunctionResponseParts
for fc_to_exec in function_calls_to_execute_this_turn:
tool_name = fc_to_exec.name
tool_args_str = str(fc_to_exec.args) # For UI
if len(tool_args_str) > 100: tool_args_str = tool_args_str[:100] + "..."
await self._emit_toast(
msg=f"Calling tool `{tool_name}` with args: `{tool_args_str}`",
event_emitter=__event_emitter__, toastType="info"
)
log.info(f"Model asked to call function (non-streaming): {tool_name}")
actual_tool_result, error_for_llm = await self._process_function_call_part(
__metadata__.get("tools", {}), fc_to_exec
)
tool_response_payload_for_llm: dict[str, Any]
if error_for_llm:
await self._emit_toast(
msg=f"Error calling tool `{tool_name}`: {error_for_llm}",
event_emitter=__event_emitter__, toastType="error"
)
tool_response_payload_for_llm = {"result": error_for_llm}
else:
ui_tool_result_str = str(actual_tool_result)
if len(ui_tool_result_str) > 300: ui_tool_result_str = ui_tool_result_str[:300] + "..."
await self._emit_toast(
msg=f"Tool `{tool_name}` returned: `{ui_tool_result_str}`",
event_emitter=__event_emitter__, toastType="info"
)
if isinstance(actual_tool_result, (str, int, float, bool, list, dict)) or actual_tool_result is None:
tool_response_payload_for_llm = {"result": actual_tool_result}
else:
str_equiv_result = str(actual_tool_result)
tool_response_payload_for_llm = {"result": str_equiv_result}
log.warning(
f"Tool {tool_name} (non-streaming) returned complex object. Converted to string '{str_equiv_result[:100]}...'."
)
tool_response_user_parts.append(
types.Part.from_function_response(name=tool_name, response=tool_response_payload_for_llm)
)
# Add all tool responses as a single user message to the history for the loop
current_contents_for_loop.append(types.Content(role="user", parts=tool_response_user_parts))
# Loop continues to query the model with these tool results
else: # No text and no function calls, but parts were present.
log.warning("Non-streaming: Model returned no actionable content (text or function call).")
finish_reason_name = candidate.finish_reason.name if candidate.finish_reason else "UNKNOWN"
if finish_reason_name not in ("STOP", "MAX_TOKENS"):
error_msg = f"Model stopped unexpectedly. Finish reason: {finish_reason_name}"
await self._emit_error(error_msg, __event_emitter__, exception=False, done=True)
return ""
# Valid stop but empty response
await self._do_post_processing(res, __event_emitter__, __metadata__, __request__) # This should set done=True
return "" # Exit loop
if num_function_calls_executed >= MAX_FUNCTION_CALLS:
log.warning(f"Reached max function calls ({MAX_FUNCTION_CALLS}) in non-streaming mode.")
error_msg = "Exceeded maximum number of tool calls. The conversation will stop."
await self._emit_error(error_msg, __event_emitter__, exception=False, done=True)
return "Error: Exceeded maximum tool calls."
# Fallback, should ideally be covered by conditions inside the loop
log.warning("Non-streaming loop exited without returning a text response or hitting max calls.")
await self._emit_error("Unexpected end of non-streaming processing.", __event_emitter__, done=True)
return ""
# region 1. Helper methods inside the Pipe class
# region 1.1 Client initialization
@staticmethod
@cache
def _get_or_create_genai_client(
api_key: str | None = None,
base_url: str | None = None,
use_vertex_ai: bool | None = None,
vertex_project: str | None = None,
vertex_location: str | None = None,
) -> genai.Client:
"""
Creates a genai.Client instance or retrieves it from cache.
Raises GenaiApiError on failure.
"""
if not vertex_project and not api_key:
# FIXME: More detailed reason in the exception (tell user to set the API key).
msg = "Neither VERTEX_PROJECT nor GEMINI_API_KEY is set."
raise GenaiApiError(msg)
if use_vertex_ai and vertex_project:
kwargs = {
"vertexai": True,
"project": vertex_project,
"location": vertex_location,
}
api = "Vertex AI"
else: # Covers (use_vertex_ai and not vertex_project) OR (not use_vertex_ai)
if use_vertex_ai and not vertex_project:
log.warning(
"Vertex AI is enabled but no project is set. "
"Using Gemini Developer API."
)
# This also implicitly covers the case where api_key might be None,
# which is handled by the initial check or the SDK.
kwargs = {
"api_key": api_key,
"http_options": types.HttpOptions(base_url=base_url),
}
api = "Gemini Developer API"
try:
client = genai.Client(**kwargs)
log.success(f"{api} Genai client successfully initialized.")
return client
except Exception as e:
raise GenaiApiError(f"{api} Genai client initialization failed: {e}") from e
def _get_user_client(self, valves: "Pipe.Valves", user_email: str) -> genai.Client:
user_whitelist = (
valves.AUTH_WHITELIST.split(",") if valves.AUTH_WHITELIST else []
)
log.debug(
f"User whitelist: {user_whitelist}, user email: {user_email}, "
f"USER_MUST_PROVIDE_AUTH_CONFIG: {valves.USER_MUST_PROVIDE_AUTH_CONFIG}"
)
if valves.USER_MUST_PROVIDE_AUTH_CONFIG and user_email not in user_whitelist:
if not valves.GEMINI_API_KEY:
error_msg = (
"User must provide their own authentication configuration. "
"Please set GEMINI_API_KEY in your UserValves."
)
raise ValueError(error_msg)
try:
client_args = self._prepare_client_args(valves)
client = self._get_or_create_genai_client(*client_args)
except GenaiApiError as e:
error_msg = f"Failed to initialize genai client for user {user_email}: {e}"
# FIXME: include correct traceback.
raise ValueError(error_msg) from e
return client
@staticmethod
def _prepare_client_args(
source_valves: "Pipe.Valves | Pipe.UserValves",
) -> list[str | bool | None]:
"""Prepares arguments for _get_or_create_genai_client from source_valves."""
ATTRS = [
"GEMINI_API_KEY",
"GEMINI_API_BASE_URL",
"USE_VERTEX_AI",
"VERTEX_PROJECT",
"VERTEX_LOCATION",
]
return [getattr(source_valves, attr) for attr in ATTRS]
# endregion 1.1 Client initialization
# region 1.2 Model retrival from Google API
@cached() # aiocache.cached for async method
async def _get_genai_models(
self,
api_key: str | None,
base_url: str | None,
use_vertex_ai: bool | None, # User's preference from config
vertex_project: str | None,
vertex_location: str | None,
whitelist_str: str,
blacklist_str: str | None,
) -> list["ModelData"]:
"""
Gets valid Google models from API(s) and filters them.
If use_vertex_ai, vertex_project, and api_key are all provided,
models are fetched from both Vertex AI and Gemini Developer API and merged.
"""
all_raw_models: list[types.Model] = []
# Condition for fetching from both sources
fetch_both = bool(use_vertex_ai and vertex_project and api_key)
if fetch_both:
log.info(
"Attempting to fetch models from both Gemini Developer API and Vertex AI."
)
gemini_models_list: list[types.Model] = []
vertex_models_list: list[types.Model] = []
# TODO: perf, consider parallelizing these two fetches
# 1. Fetch from Gemini Developer API
try:
gemini_client = self._get_or_create_genai_client(
api_key=api_key,
base_url=base_url,
use_vertex_ai=False, # Explicitly target Gemini API
vertex_project=None,
vertex_location=None,
)
gemini_models_list = await self._fetch_models_from_client_internal(
gemini_client, "Gemini Developer API"
)
except GenaiApiError as e:
log.warning(
f"Failed to initialize or retrieve models from Gemini Developer API: {e}"
)
except Exception as e:
log.warning(
f"An unexpected error occurred with Gemini Developer API models: {e}",
exc_info=True,
)
# 2. Fetch from Vertex AI
try:
vertex_client = self._get_or_create_genai_client(
use_vertex_ai=True, # Explicitly target Vertex AI
vertex_project=vertex_project,
vertex_location=vertex_location,
api_key=None, # API key is not used for Vertex AI with project auth
base_url=base_url, # Pass base_url for potential Vertex custom endpoints
)
vertex_models_list = await self._fetch_models_from_client_internal(
vertex_client, "Vertex AI"
)
except GenaiApiError as e:
log.warning(
f"Failed to initialize or retrieve models from Vertex AI: {e}"
)
except Exception as e:
log.warning(
f"An unexpected error occurred with Vertex AI models: {e}",
exc_info=True,
)
# 3. Combine and de-duplicate
# Prioritize models from Gemini Developer API in case of ID collision
combined_models_dict: dict[str, types.Model] = {}
for model in gemini_models_list:
if model.name:
model_id = Pipe.strip_prefix(model.name)
if model_id and model_id not in combined_models_dict:
combined_models_dict[model_id] = model
else:
log.trace(
f"Gemini model without a name encountered: {model.display_name or 'N/A'}"
)
for model in vertex_models_list:
if model.name:
model_id = Pipe.strip_prefix(model.name)
if model_id:
if model_id not in combined_models_dict:
combined_models_dict[model_id] = model
else:
log.info(
f"Duplicate model ID '{model_id}' from Vertex AI already sourced from Gemini API. Keeping Gemini API version."
)
else:
log.trace(
f"Vertex AI model without a name encountered: {model.display_name or 'N/A'}"
)
all_raw_models = list(combined_models_dict.values())
log.info(
f"Fetched {len(gemini_models_list)} models from Gemini API, "
f"{len(vertex_models_list)} from Vertex AI. "
f"Combined to {len(all_raw_models)} unique models."
)
if not all_raw_models and (gemini_models_list or vertex_models_list):
log.warning(
"Models were fetched but resulted in an empty list after de-duplication, possibly due to missing names or empty/duplicate IDs."
)
if not all_raw_models and not gemini_models_list and not vertex_models_list:
raise GenaiApiError(
"Failed to retrieve models: Both Gemini Developer API and Vertex AI attempts yielded no models."
)
else: # Single source logic
# Determine if we are effectively using Vertex AI or Gemini API
# This depends on user's config (use_vertex_ai) and availability of project/key
client_target_is_vertex = bool(use_vertex_ai and vertex_project)
client_source_name = (
"Vertex AI" if client_target_is_vertex else "Gemini Developer API"
)
log.info(
f"Attempting to fetch models from a single source: {client_source_name}."
)
try:
client = self._get_or_create_genai_client(
api_key=api_key,
base_url=base_url,
use_vertex_ai=client_target_is_vertex, # Pass the determined target
vertex_project=vertex_project if client_target_is_vertex else None,
vertex_location=(
vertex_location if client_target_is_vertex else None
),
)
all_raw_models = await self._fetch_models_from_client_internal(
client, client_source_name
)
if not all_raw_models:
raise GenaiApiError(
f"No models retrieved from {client_source_name}. This could be due to an API error, network issue, or no models being available."
)
except GenaiApiError as e:
raise GenaiApiError(
f"Failed to get models from {client_source_name}: {e}"
) from e
except Exception as e:
log.error(
f"An unexpected error occurred while configuring client or fetching models from {client_source_name}: {e}",
exc_info=True,
)
raise GenaiApiError(
f"An unexpected error occurred while retrieving models from {client_source_name}: {e}"
) from e
# --- Common processing for all_raw_models ---
if not all_raw_models:
log.warning("No models available after attempting all configured sources.")
return []
log.info(f"Processing {len(all_raw_models)} unique raw models.")
generative_models: list[types.Model] = []
for model in all_raw_models:
if model.name is None:
log.trace(
f"Skipping model with no name during generative filter: {model.display_name or 'N/A'}"
)
continue
actions = model.supported_actions
if (
actions is None or "generateContent" in actions
): # Includes models if actions is None (e.g., Vertex)
generative_models.append(model)
else:
log.trace(
f"Model '{model.name}' (ID: {Pipe.strip_prefix(model.name)}) skipped, not generative (actions: {actions})."
)
if not generative_models:
log.warning(
"No generative models found after filtering all retrieved models."
)
return []
def match_patterns(
name_to_check: str, list_of_patterns_str: str | None
) -> bool:
if not list_of_patterns_str:
return False
patterns = [
pat for pat in list_of_patterns_str.replace(" ", "").split(",") if pat
] # Ensure pat is not empty
return any(fnmatch.fnmatch(name_to_check, pat) for pat in patterns)
filtered_models_data: list["ModelData"] = []
for model in generative_models:
# model.name is guaranteed non-None by generative_models filter logic
stripped_name = Pipe.strip_prefix(model.name) # type: ignore
if not stripped_name:
log.warning(
f"Model '{model.name}' (display: {model.display_name}) resulted in an empty ID after stripping. Skipping."
)
continue
passes_whitelist = not whitelist_str or match_patterns(
stripped_name, whitelist_str
)
passes_blacklist = not blacklist_str or not match_patterns(
stripped_name, blacklist_str
)
if passes_whitelist and passes_blacklist:
filtered_models_data.append(
{
"id": stripped_name,
"name": model.display_name or stripped_name,
"description": model.description,
}
)
else:
log.trace(
f"Model ID '{stripped_name}' filtered out by whitelist/blacklist. Whitelist match: {passes_whitelist}, Blacklist pass: {passes_blacklist}"
)
log.info(
f"Filtered {len(generative_models)} generative models down to {len(filtered_models_data)} models based on white/blacklists."
)
return filtered_models_data
# TODO: Use cache for this method too?
async def _fetch_models_from_client_internal(
self, client: genai.Client, source_name: str
) -> list[types.Model]:
"""Helper to fetch models from a given client and handle common exceptions."""
try:
google_models_pager = await client.aio.models.list(
config={"query_base": True} # Fetch base models by default
)
models = [model async for model in google_models_pager]
log.info(f"Retrieved {len(models)} models from {source_name}.")
log.trace(
f"All models returned by {source_name}:", payload=models
) # Can be verbose
return models
except Exception as e:
log.error(f"Retrieving models from {source_name} failed: {e}")
# Return empty list; caller decides if this is fatal for the whole operation.
return []
@staticmethod
def _return_error_model(
error_msg: str, warning: bool = False, exception: bool = True
) -> "ModelData":
"""Returns a placeholder model for communicating error inside the pipes method to the front-end."""
if warning:
log.opt(depth=1, exception=False).warning(error_msg)
else:
log.opt(depth=1, exception=exception).error(error_msg)
return {
"id": "error",
"name": "[gemini_manifold] " + error_msg,
"description": error_msg,
}
@staticmethod
def strip_prefix(model_name: str) -> str:
"""
Extract the model identifier using regex, handling various naming conventions.
e.g., "gemini_manifold_google_genai.gemini-2.5-flash-preview-04-17" -> "gemini-2.5-flash-preview-04-17"
e.g., "models/gemini-1.5-flash-001" -> "gemini-1.5-flash-001"
e.g., "publishers/google/models/gemini-1.5-pro" -> "gemini-1.5-pro"
"""
# Use regex to remove everything up to and including the last '/' or the first '.'
stripped = re.sub(r"^(?:.*/|[^.]*\.)", "", model_name)
return stripped
# endregion 1.2 Model retrival from Google API
# region 1.3 Open WebUI's body.messages -> list[genai.types.Content] conversion
def _pop_system_prompt(self, messages: list["Message"]) -> str | None:
"""
Pops the system prompt from the messages list.
System prompt is always the first message in the list.
"""
if not messages:
return None
first_message = messages[0]
if first_message.get("role") == "system":
first_message = cast("SystemMessage", first_message)
system_prompt = first_message.get("content")
log.info("System prompt found in the messages list.")
log.debug("System prompt:", payload=system_prompt)
messages.pop(0)
return system_prompt
return None
async def _genai_contents_from_messages(
self,
messages_body: list["Message"],
messages_db: list["ChatMessageTD"] | None,
upload_documents: bool,
event_emitter: Callable[["Event"], Awaitable[None]],
) -> list[types.Content]:
"""Transforms `body.messages` list into list of `genai.types.Content` objects"""
contents = []
parts = []
for i, message in enumerate(messages_body):
role = message.get("role")
if role == "user":
message = cast("UserMessage", message)
files = []
if messages_db:
message_db = messages_db[i]
if upload_documents:
files = message_db.get("files", [])
parts = await self._process_user_message(message, files, event_emitter)
elif role == "assistant":
message = cast("AssistantMessage", message)
# Google API's assistant role is "model"
role = "model"
sources = None
if messages_db:
message_db = messages_db[i]
sources = message_db.get("sources")
parts = self._process_assistant_message(message, sources)
else:
warn_msg = f"Message {i} has an invalid role: {role}. Skipping to the next message."
log.warning(warn_msg)
await self._emit_toast(warn_msg, event_emitter, "warning")
continue
contents.append(types.Content(role=role, parts=parts))
return contents
async def _process_user_message(
self,
message: "UserMessage",
files: list["FileAttachmentTD"],
event_emitter: Callable[["Event"], Awaitable[None]],
) -> list[types.Part]:
user_parts = []
if files:
log.info(f"Adding {len(files)} files to the user message.")
for file in files:
log.debug("Processing file:", payload=file)
file_id = file.get("file", {}).get("id")
document_bytes, mime_type = self._get_file_data(file_id)
if not document_bytes or not mime_type:
# Warnings are logged by the method above.
continue
if mime_type.startswith("text/") or mime_type == "application/pdf":
log.debug(
f"{mime_type} is supported by Google API! Creating `types.Part` model for it."
)
user_parts.append(
types.Part.from_bytes(data=document_bytes, mime_type=mime_type)
)
else:
warn_msg = f"{mime_type} is not supported by Google API! Skipping file {file_id}."
log.warning(warn_msg)
await self._emit_toast(warn_msg, event_emitter, "warning")
user_content = message.get("content")
if isinstance(user_content, str):
user_content_list: list["Content"] = [
{"type": "text", "text": user_content}
]
elif isinstance(user_content, list):
user_content_list = user_content
else:
warn_msg = f"User message content is not a string or list, skipping to the next message."
log.warning(warn_msg)
await self._emit_toast(warn_msg, event_emitter, "warning")
return user_parts
for c in user_content_list:
c_type = c.get("type")
if c_type == "text":
c = cast("TextContent", c)
# Don't process empty strings.
if c_text := c.get("text"):
# YouTube URL extraction is now handled by _genai_parts_from_text
user_parts.extend(self._genai_parts_from_text(c_text))
elif c_type == "image_url":
c = cast("ImageContent", c)
if img_part := self._genai_part_from_image_url(
c.get("image_url").get("url")
):
user_parts.append(img_part)
else:
warn_msg = f"User message content type {c_type} is not supported, skipping to the next message."
log.warning(warn_msg)
await self._emit_toast(warn_msg, event_emitter, "warning")
continue
return user_parts
def _process_assistant_message(
self, message: "AssistantMessage", sources: list["Source"] | None
) -> list[types.Part]:
assistant_text = message.get("content")
if sources:
assistant_text = self._remove_citation_markers(assistant_text, sources)
return self._genai_parts_from_text(assistant_text)
def _genai_part_from_image_url(self, image_url: str) -> types.Part | None:
"""
Processes an image URL and returns a genai.types.Part object from it
Handles GCS, data URIs, and standard URLs.
"""
try:
if image_url.startswith("gs://"):
# FIXME: mime type helper would error out here, it only handles filenames.
return types.Part.from_uri(
file_uri=image_url, mime_type=self._get_mime_type(image_url)
)
elif image_url.startswith("data:image"):
match = re.match(r"data:(image/\w+);base64,(.+)", image_url)
if match:
return types.Part.from_bytes(
data=base64.b64decode(match.group(2)),
mime_type=match.group(1),
)
else:
raise ValueError("Invalid data URI for image.")
else: # Assume standard URL
# FIXME: mime type helper would error out here too, it only handles filenames.
return types.Part.from_uri(
file_uri=image_url, mime_type=self._get_mime_type(image_url)
)
except Exception:
# TODO: Send warnin toast to user in front-end.
log.exception(f"Error processing image URL: {image_url[:64]}[...]")
return None
def _genai_parts_from_text(self, text: str) -> list[types.Part]:
parts: list[types.Part] = []
last_pos = 0
# Regex to find markdown images or YouTube URLs
# Markdown image: !\[.*?\]\((data:(image/[^;]+);base64,([^)]+)|/api/v1/files/([a-f0-9\-]+)/content)\)
# YouTube URL: (https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/)[^&\s]+)
pattern = re.compile(
r"!\[.*?\]\((data:(image/[^;]+);base64,([^)]+)|/api/v1/files/([a-f0-9\-]+)/content)\)|"
r"(https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/)[^&\s]+)"
)
for match in pattern.finditer(text):
# Add text before the current match
text_segment = text[last_pos : match.start()]
if text_segment.strip(): # Ensure non-empty, stripped text
parts.append(types.Part.from_text(text=text_segment.strip()))
# Determine if it's an image or a YouTube URL
if match.group(1): # It's a markdown image
if match.group(2): # Base64 encoded image
try:
mime_type = match.group(2)
base64_data = match.group(3)
image_part = types.Part.from_bytes(
data=base64.b64decode(base64_data),
mime_type=mime_type,
)
parts.append(image_part)
except Exception:
log.exception("Error decoding base64 image:")
elif match.group(4): # File URL for image
file_id = match.group(4)
image_bytes, mime_type = self._get_file_data(file_id)
if image_bytes and mime_type:
image_part = types.Part.from_bytes(
data=image_bytes, mime_type=mime_type
)
parts.append(image_part)
elif match.group(5): # It's a YouTube URL
youtube_url = match.group(5)
log.info(f"Found YouTube URL: {youtube_url}")
parts.append(types.Part(file_data=types.FileData(file_uri=youtube_url)))
last_pos = match.end()
# Add remaining text after the last match
remaining_text = text[last_pos:]
if remaining_text.strip(): # Ensure non-empty, stripped text
parts.append(types.Part.from_text(text=remaining_text.strip()))
# If no matches were found at all (e.g. plain text), the original text (stripped) is added as a single part.
if not parts and text.strip():
parts.append(types.Part.from_text(text=text.strip()))
# If parts list is empty and original text was only whitespace, return empty list.
# Otherwise, if parts were added, or if it was plain text, it's handled above.
# This check ensures that if text was " ", we don't add a Part for " ".
# The .strip() in the conditions above should handle this, but as a safeguard:
if not parts and not text.strip():
return []
return parts
def _extract_youtube_urls(self, text: str) -> list[str]:
"""
Extracts YouTube URLs from a given text.
Supports standard youtube.com/watch?v= URLs and shortened youtu.be URLs
"""
youtube_urls = []
# Match standard YouTube URLs
for match in re.finditer(
r"https?://(?:www\.)?youtube\.com/watch\?v=[^&\s]+", text
):
youtube_urls.append(match.group(0))
# Match shortened YouTube URLs
for match in re.finditer(r"https?://(?:www\.)?youtu\.be/[^&\s]+", text):
youtube_urls.append(match.group(0))
if youtube_urls:
# TODO: toast
log.info(f"Extracted YouTube URLs: {youtube_urls}")
return youtube_urls
# endregion 1.3 Open WebUI's body.messages -> list[genai.types.Content] conversion
# region 1.4 Model response streaming
async def _stream_response_generator(
self,
client: genai.Client,
response_stream: AsyncIterator[types.GenerateContentResponse],
__request__: Request,
valves: "Pipe.Valves",
gen_content_args: dict,
event_emitter: Callable[["Event"], Awaitable[None]],
metadata: dict[str, Any],
user_id: str,
tools: dict[str, dict] = {},
is_function_call_response: bool = False,
) -> AsyncGenerator[str, None]:
"""
Yields text chunks from the stream and spawns metadata processing task on completion.
"""
final_response_chunk: types.GenerateContentResponse | None = None
error_occurred = False
# Start thinking timer (model name check is inside this method).
model_name = gen_content_args.get("model", "")
thinking_budget = valves.THINKING_BUDGET
start_time, thinking_timer_task = None, None
if not is_function_call_response: # Start timer only for initial call, not recursive ones
start_time, thinking_timer_task = await self._start_thinking_timer(
model_name, event_emitter, thinking_budget
)
try:
async for chunk in response_stream:
final_response_chunk = chunk
if not is_function_call_response and thinking_timer_task: # Stop timer on first chunk of initial call
await self._cancel_thinking_timer(
thinking_timer_task, start_time, event_emitter, model_name, thinking_budget
)
# Set timer task to None to avoid duplicate cancellation in finally block and subsequent stream chunks.
thinking_timer_task = None
if not (candidate := self._get_first_candidate(chunk.candidates)):
log.warning("Stream chunk has no candidates, skipping.")
continue
if not (parts := candidate.content and candidate.content.parts):
log.warning(
"candidate does not contain content or content.parts, skipping."
)
continue
# Process parts and yield text
collected_fcs_for_this_chunk = []
# First pass: yield immediate content like text, images, code display
for part in parts:
# To my knowledge it's not possible for a part to have multiple fields below at the same time.
if part.text:
yield part.text
elif part.inline_data:
# _process_image_part returns a Markdown URL.
yield (
self._process_image_part(
part.inline_data,
gen_content_args,
user_id,
__request__,
)
or ""
)
elif part.executable_code:
yield (
self._process_executable_code_part(part.executable_code)
or ""
)
elif part.code_execution_result:
yield (
self._process_code_execution_result_part(
part.code_execution_result
)
or ""
)
elif part.function_call:
collected_fcs_for_this_chunk.append(part.function_call)
if collected_fcs_for_this_chunk:
# Process all collected function calls from this chunk
# These are the parts from the model that requested the function calls
model_fc_request_parts = [types.Part(function_call=fc) for fc in collected_fcs_for_this_chunk]
# We need to create a mutable list of contents for this turn's interactions
current_turn_contents = list(gen_content_args["contents"])
current_turn_contents.append(types.Content(role="model", parts=model_fc_request_parts))
tool_response_user_parts = [] # To collect FunctionResponseParts
for fc_to_exec in collected_fcs_for_this_chunk:
# UI Feedback: Calling tool
yield f"\n\nAssistant: Calling tool `{fc_to_exec.name}` with arguments: `{fc_to_exec.args}`\n\n"
actual_tool_result, error_for_llm = await self._process_function_call_part(
tools, fc_to_exec
)
tool_response_payload_for_llm: dict[str, Any]
if error_for_llm:
yield f"\n\nAssistant: Error calling tool `{fc_to_exec.name}`: {error_for_llm}\n\n"
tool_response_payload_for_llm = {"result": error_for_llm}
else:
ui_tool_result_str = str(actual_tool_result)
if len(ui_tool_result_str) > 300:
ui_tool_result_str = ui_tool_result_str[:300] + "..."
yield f"\n\nAssistant: Tool `{fc_to_exec.name}` returned: `{ui_tool_result_str}`\n\n"
# Ensure result is JSON serializable for the API
if isinstance(actual_tool_result, (str, int, float, bool, list, dict)) or actual_tool_result is None:
tool_response_payload_for_llm = {"result": actual_tool_result}
else:
str_equiv_result = str(actual_tool_result)
tool_response_payload_for_llm = {"result": str_equiv_result}
log.warning(
f"Tool {fc_to_exec.name} returned a complex object of type {type(actual_tool_result)}. "
f"Converted to string '{str_equiv_result[:100]}...' for function response to LLM."
)
tool_response_user_parts.append(
types.Part.from_function_response(name=fc_to_exec.name, response=tool_response_payload_for_llm)
)
current_turn_contents.append(types.Content(role="user", parts=tool_response_user_parts))
# Create a new gen_content_args for the recursive call to ensure 'contents' is updated correctly.
recursive_gen_content_args = gen_content_args.copy()
recursive_gen_content_args["contents"] = current_turn_contents
# Make the recursive call to the LLM with tool responses
response_stream_recursive: AsyncIterator[types.GenerateContentResponse] = (
await client.aio.models.generate_content_stream(**recursive_gen_content_args) # type: ignore
)
# Yield from the recursive call
async for text_chunk_recursive in self._stream_response_generator(
client, response_stream_recursive, __request__, valves,
recursive_gen_content_args, # Pass the updated args for this recursive leg
event_emitter, metadata, user_id,
tools,
is_function_call_response=True # Indicate this is a recursive call
):
yield text_chunk_recursive
except Exception as e:
error_occurred = True
# Ensure timer is cancelled if it was started and an error occurs early
if not is_function_call_response and thinking_timer_task:
await self._cancel_thinking_timer(
thinking_timer_task, start_time, event_emitter, model_name, thinking_budget, errored=True
)
thinking_timer_task = None
error_msg = f"Stream ended with error: {e}"
log.exception(error_msg) # Log full exception
# Yield the error message to the user as part of the stream
yield f"\n\nAssistant: An error occurred during streaming: {error_msg}\n\n"
# We might not want to call _emit_error here if we are yielding, to avoid double messages.
# The `finally` block will handle post-processing.
finally:
if not is_function_call_response: # Only main call handles final timer cancellation and post-processing
await self._cancel_thinking_timer(
thinking_timer_task, start_time, event_emitter, model_name, thinking_budget, errored=error_occurred
)
log.info("Stream processing completed.")
log.debug("Last chunk received (if any):", payload=final_response_chunk)
try:
await self._do_post_processing(
final_response_chunk, event_emitter, metadata, __request__, error_occurred
)
except Exception as e:
post_proc_error_msg = f"Post-processing failed with error: {e}"
log.exception(post_proc_error_msg)
await self._emit_toast(post_proc_error_msg, event_emitter, "error")
log.debug("AsyncGenerator for initial call finished its run.")
def _process_image_part(
self, inline_data, gen_content_args: dict, user_id: str, request: Request
) -> str | None:
"""Handles image data conversion to markdown."""
mime_type = inline_data.mime_type
image_data = inline_data.data
if self.valves.USE_FILES_API:
image_url = self._upload_image(
image_data,
mime_type,
gen_content_args.get("model", ""),
"Not implemented yet. TAKE IT FROM gen_content_args contents",
user_id,
request,
)
return f"" if image_url else None
else:
encoded = base64.b64encode(image_data).decode()
return f""
def _upload_image(
self,
image_data: bytes,
mime_type: str,
model: str,
prompt: str,
user_id: str,
__request__: Request,
) -> str | None:
"""
Helper method that uploads the generated image to a storage provider configured inside Open WebUI settings.
Returns the url to uploaded image.
"""
image_format = mimetypes.guess_extension(mime_type)
id = str(uuid.uuid4())
# TODO: Better filename? Prompt as the filename?
name = os.path.basename(f"generated-image{image_format}")
imagename = f"{id}_{name}"
image = io.BytesIO(image_data)
image_metadata = {
"model": model,
"prompt": prompt,
}
# Upload the image to user configured storage provider.
log.info("Uploading the model generated image to Open WebUI backend.")
log.debug("Uploading to the configured storage provider.")
try:
# Dynamically check if 'tags' parameter exists
sig = inspect.signature(Storage.upload_file)
has_tags = "tags" in sig.parameters
except Exception as e:
log.error(f"Error checking Storage.upload_file signature: {e}")
has_tags = False # Default to old behavior
try:
if has_tags:
# New version with tags support >=v0.6.6
contents, image_path = Storage.upload_file(image, imagename, tags={})
else:
# Old version without tags <v0.6.5
contents, image_path = Storage.upload_file(image, imagename) # type: ignore
except Exception:
error_msg = "Error occurred during upload to the storage provider."
log.exception(error_msg)
return None
# Add the image file to files database.
log.debug("Adding the image file to Open WebUI files database.")
file_item = Files.insert_new_file(
user_id,
FileForm(
id=id,
filename=name,
path=image_path,
meta={
"name": name,
"content_type": mime_type,
"size": len(contents),
"data": image_metadata,
},
),
)
if not file_item:
log.warning(
"Files.insert_new_file did not return anything. Image upload to Open WebUI database likely failed."
)
return None
# Get the image url.
image_url: str = __request__.app.url_path_for(
"get_file_content_by_id", id=file_item.id
)
log.success("Image upload finished!")
return image_url
def _process_executable_code_part(
self, executable_code_part: types.ExecutableCode | None
) -> str | None:
"""
Processes an executable code part and returns the formatted string representation.
"""
if not executable_code_part:
return None
lang_name = "python" # Default language
if executable_code_part_lang_enum := executable_code_part.language:
if lang_name := executable_code_part_lang_enum.name:
lang_name = executable_code_part_lang_enum.name.lower()
else:
log.warning(
f"Could not extract language name from {executable_code_part_lang_enum}. Default to python."
)
else:
log.warning("Language Enum is None, defaulting to python.")
if executable_code_part_code := executable_code_part.code:
return f"```{lang_name}\n{executable_code_part_code.rstrip()}\n```\n\n"
return ""
def _process_code_execution_result_part(
self, code_execution_result_part: types.CodeExecutionResult | None
) -> str | None:
"""
Processes a code execution result part and returns the formatted string representation.
"""
if not code_execution_result_part:
return None
if code_execution_result_part_output := code_execution_result_part.output:
return f"**Output:**\n\n```\n{code_execution_result_part_output.rstrip()}\n```\n\n"
else:
return None
async def _process_function_call_part(
self, tools: dict[str, dict], function_call_part: types.FunctionCall | None
) -> tuple[Any | None, str | None]: # Returns (actual_tool_result, error_string_for_llm_if_any)
"""
Executes a function call and returns its actual result.
If an error occurs (tool not found, not callable, execution error),
it returns None for the result and an error message string for the LLM.
"""
if not function_call_part:
return None, "Error: Function call part was None."
tool_config = tools.get(function_call_part.name, None)
if not tool_config:
error_msg = f"Error: Tool '{function_call_part.name}' not found."
log.warning(error_msg)
return None, error_msg
function_callable = tool_config.get("callable")
if not callable(function_callable):
error_msg = f"Error: Callable for tool '{function_call_part.name}' not found or not callable."
log.warning(error_msg)
return None, error_msg
try:
log.debug(f"Calling tool '{function_call_part.name}' with args: {function_call_part.args}")
funtion_call_result = await function_callable(**function_call_part.args)
result_str_snippet = str(funtion_call_result)
if len(result_str_snippet) > 200:
result_str_snippet = result_str_snippet[:200] + "..."
log.info(f"Tool '{function_call_part.name}' executed successfully. Result snippet: {result_str_snippet}")
return funtion_call_result, None
except Exception as e:
error_msg = f"Error executing tool {function_call_part.name}: {str(e)}"
log.exception(error_msg)
return None, error_msg
# endregion 1.4 Model response streaming
# region 1.5 Thinking status message
def _get_budget_str(self, model_name: str, thinking_budget: int) -> str:
return (
f" • {thinking_budget} tokens budget"
if model_name == "gemini-2.5-flash-preview-04-17" and thinking_budget > 0
else ""
)
def is_thinking_model(self, model_id: str) -> bool:
"""Check if the model is a thinking model based on the valve pattern."""
try:
result = bool(
re.search(self.valves.THINKING_MODEL_PATTERN, model_id, re.IGNORECASE)
)
return result
except Exception:
log.exception("Error checking if model is a thinking model")
return False
async def _start_thinking_timer(
self,
model_name: str,
event_emitter: Callable[["Event"], Awaitable[None]],
thinking_budget: int,
) -> tuple[float | None, asyncio.Task[None] | None]:
# Check if this is a thinking model and exit early if not.
# Exit also if thinking budget is explicitly set to 0 and Gemini 2.5 Flash is selected.
if not self.is_thinking_model(model_name) or (
thinking_budget == 0 and model_name == "gemini-2.5-flash-preview-04-17"
):
return None, None
# Indicates if emitted status messages should be visible in the front-end.
hidden = not self.valves.EMIT_STATUS_UPDATES
# Emit initial 'Thinking' status
await self._emit_status(
f"Thinking • 0s{self._get_budget_str(model_name, thinking_budget)}",
event_emitter=event_emitter,
done=False,
hidden=hidden,
)
# Record the start time
start_time = time.time()
# Start the thinking timer
# NOTE: It's important to note that the model could not be actually thinking
# when the status message starts. API could be just slow or the chat data
# payload could still be uploading.
thinking_timer_task = asyncio.create_task(
self._thinking_timer(
event_emitter, model_name, thinking_budget, hidden=hidden
)
)
return start_time, thinking_timer_task
async def _thinking_timer(
self,
event_emitter: Callable[["Event"], Awaitable[None]],
model_name: str,
thinking_budget: int,
hidden=False,
) -> None:
"""Asynchronous task to emit periodic status updates."""
elapsed = 0
try:
log.info("Thinking timer started.")
while True:
await asyncio.sleep(self.valves.EMIT_INTERVAL)
elapsed += self.valves.EMIT_INTERVAL
# Format elapsed time
if elapsed < 60:
time_str = f"{elapsed}s"
else:
minutes, seconds = divmod(elapsed, 60)
time_str = f"{minutes}m {seconds}s"
status_message = f"Thinking • {time_str}{self._get_budget_str(model_name, thinking_budget)}"
await self._emit_status(
status_message,
event_emitter=event_emitter,
done=False,
hidden=hidden,
)
except asyncio.CancelledError:
log.debug("Timer task cancelled.")
except Exception:
log.exception("Error in timer task")
async def _cancel_thinking_timer(
self,
timer_task: asyncio.Task[None] | None,
start_time: float | None,
event_emitter: Callable[["Event"], Awaitable[None]],
model_name: str,
thinking_budget: int,
errored: bool = False,
):
# Check if task was already canceled.
if not timer_task:
return
# Cancel the timer.
timer_task.cancel()
try:
await timer_task
except asyncio.CancelledError:
log.info(f"Thinking timer task successfully cancelled.")
except Exception:
log.exception(f"Error cancelling thinking timer task.")
# Indicates if emitted status messages should be visible in the front-end.
hidden = not self.valves.EMIT_STATUS_UPDATES
if errored:
final_status = f"Thinking phase encountered an error."
await self._emit_status(final_status, event_emitter=event_emitter, done=True, hidden=True)
elif start_time:
# ... (existing success message logic) ...
total_elapsed = int(time.time() - start_time)
if total_elapsed < 60:
total_time_str = f"{total_elapsed}s"
else:
minutes, seconds = divmod(total_elapsed, 60)
total_time_str = f"{minutes}m {seconds}s"
final_status = f"Thinking completed • took {total_time_str}{self._get_budget_str(model_name, thinking_budget)}"
await self._emit_status(
final_status, event_emitter=event_emitter, done=True, hidden=hidden
)
else: # Timer was likely never truly started or cancelled before any progress
final_status = f"Thinking phase concluded." # Generic message
await self._emit_status(final_status, event_emitter=event_emitter, done=True, hidden=True)
# endregion 1.5 Thinking status message
# region 1.6 Post-processing
async def _do_post_processing(
self,
model_response: types.GenerateContentResponse | None,
event_emitter: Callable[["Event"], Awaitable[None]],
metadata: dict[str, Any],
request: Request,
stream_error_happened: bool = False,
):
"""Handles emitting usage, grounding, and sources after the main response/stream is done."""
log.info("Post-processing the model response.")
if stream_error_happened:
log.warning(
"An error occured during the stream, cannot do post-processing."
)
# All the needed metadata is always in the last chunk, so if error happened then we cannot do anything.
return
if not model_response:
log.warning("model_response is empty, cannot do post-processing.")
return
if not (candidate := self._get_first_candidate(model_response.candidates)):
log.warning(
"Response does not contain any canditates. Cannot do post-processing."
)
return
finish_reason = candidate.finish_reason
if finish_reason not in (
types.FinishReason.STOP,
types.FinishReason.MAX_TOKENS,
):
# MAX_TOKENS is often acceptable, but others might indicate issues.
error_msg = f"Stream finished with sus reason:\n\n{finish_reason}."
await self._emit_toast(error_msg, event_emitter, "error")
log.error(error_msg)
return
else:
log.debug(f"Response has correct finish reason: {finish_reason}.")
# TODO: Emit a toast message if url context retrieval was not successful.
# Emit token usage data.
if usage_event := self._get_usage_data_event(model_response):
log.debug("Emitting usage data:", payload=usage_event)
# TODO: catch potential errors?
await event_emitter(usage_event)
self._add_grounding_data_to_state(model_response, metadata, request)
def _add_grounding_data_to_state(
self,
response: types.GenerateContentResponse,
chat_metadata: dict[str, Any],
request: Request,
):
candidate = self._get_first_candidate(response.candidates)
grounding_metadata_obj = candidate.grounding_metadata if candidate else None
chat_id: str = chat_metadata.get("chat_id", "")
message_id: str = chat_metadata.get("message_id", "")
storage_key = f"grounding_{chat_id}_{message_id}"
if grounding_metadata_obj:
log.debug(
f"Found grounding metadata. Storing in in request's app state using key {storage_key}."
)
# Using shared `request.app.state` to pass grounding metadata to Filter.outlet.
# This is necessary because the Pipe finishes during the initial `/api/completion` request,
# while Filter.outlet is invoked by a separate, later `/api/chat/completed` request.
# `request.state` does not persist across these distinct request lifecycles.
app_state: State = request.app.state
app_state._state[storage_key] = grounding_metadata_obj
else:
log.debug(f"Response {message_id} does not have grounding metadata.")
def _remove_citation_markers(self, text: str, sources: list["Source"]) -> str:
original_text = text
processed: set[str] = set()
for source in sources:
supports = [
metadata["supports"]
for metadata in source.get("metadata", [])
if "supports" in metadata
]
supports = [item for sublist in supports for item in sublist]
for support in supports:
support = types.GroundingSupport(**support)
indices = support.grounding_chunk_indices
segment = support.segment
if not (indices and segment):
continue
segment_text = segment.text
if not segment_text:
continue
# Using a shortened version because user could edit the assistant message in the front-end.
# If citation segment get's edited, then the markers would not be removed. Shortening reduces the
# chances of this happening.
segment_end = segment_text[-32:]
if segment_end in processed:
continue
processed.add(segment_end)
citation_markers = "".join(f"[{index + 1}]" for index in indices)
# Find the position of the citation markers in the text
pos = text.find(segment_text + citation_markers)
if pos != -1:
# Remove the citation markers
text = (
text[: pos + len(segment_text)]
+ text[pos + len(segment_text) + len(citation_markers) :]
)
trim = len(original_text) - len(text)
log.debug(
f"Citation removal finished. Returning text str that is {trim} character shorter than the original input."
)
return text
def _get_usage_data_event(
self,
response: types.GenerateContentResponse,
) -> "ChatCompletionEvent | None":
"""
Extracts usage data from a GenerateContentResponse object.
Returns None if any of the core metrics (prompt_tokens, completion_tokens, total_tokens)
cannot be reliably determined.
Args:
response: The GenerateContentResponse object.
Returns:
A dictionary containing the usage data, formatted as a ResponseUsage type,
or None if any core metrics are missing.
"""
if not response.usage_metadata:
log.warning(
"Usage_metadata is missing from the response. Cannot reliably determine usage."
)
return None
usage_data = response.usage_metadata.model_dump()
usage_data["prompt_tokens"] = usage_data.pop("prompt_token_count")
usage_data["completion_tokens"] = usage_data.pop("candidates_token_count")
usage_data["total_tokens"] = usage_data.pop("total_token_count")
# Remove null values and turn ModalityTokenCount into dict.
for k, v in usage_data.copy().items():
if k in ("prompt_tokens", "completion_tokens", "total_tokens"):
continue
if not v:
del usage_data[k]
completion_event: "ChatCompletionEvent" = {
"type": "chat:completion",
"data": {"usage": usage_data},
}
return completion_event
# endregion 1.6 Post-processing
# region 1.7 Event emissions
async def _emit_completion(
self,
event_emitter: Callable[["Event"], Awaitable[None]],
content: str | None = None,
done: bool = False,
error: str | None = None,
sources: list["Source"] | None = None,
):
"""Constructs and emits completion event."""
emission: "ChatCompletionEvent" = {
"type": "chat:completion",
"data": {"done": done},
}
if content:
emission["data"]["content"] = content
if error:
emission["data"]["error"] = {"detail": error}
if sources:
emission["data"]["sources"] = sources
await event_emitter(emission)
async def _emit_status(
self,
message: str,
event_emitter: Callable[["Event"], Awaitable[None]],
done: bool = False,
hidden: bool = False,
) -> None:
"""Emit status updates asynchronously."""
try:
if not self.valves.EMIT_STATUS_UPDATES:
return
status_event: "StatusEvent" = {
"type": "status",
"data": {"description": message, "done": done, "hidden": hidden},
}
await event_emitter(status_event)
log.debug(f"Emitted status:", payload=status_event)
except Exception:
log.exception("Error emitting status")
async def _emit_error(
self,
error_msg: str,
event_emitter: Callable[["Event"], Awaitable[None]],
warning: bool = False,
exception: bool = True,
) -> None:
"""Emits an event to the front-end that causes it to display a nice red error message."""
if warning:
log.opt(depth=1, exception=False).warning(error_msg)
else:
log.opt(depth=1, exception=exception).error(error_msg)
await self._emit_completion(
error=f"\n{error_msg}", event_emitter=event_emitter, done=True
)
async def _emit_toast(
self,
msg: str,
event_emitter: Callable[["Event"], Awaitable[None]],
toastType: Literal["info", "success", "warning", "error"] = "info",
) -> None:
# TODO: Use this method in more places, even for info toasts.
event: NotificationEvent = {
"type": "notification",
"data": {"type": toastType, "content": msg},
}
await event_emitter(event)
# endregion 1.7 Event emissions
# region 1.8 Utility helpers
@staticmethod
def _get_merged_valves(
default_valves: "Pipe.Valves",
user_valves: "Pipe.UserValves | None",
user_email: str,
) -> "Pipe.Valves":
"""
Merges UserValves into a base Valves configuration.
The general rule is that if a field in UserValves is not None, it overrides
the corresponding field in the default_valves. Otherwise, the default_valves
field value is used.
Exceptions:
- If default_valves.USER_MUST_PROVIDE_AUTH_CONFIG is True, then GEMINI_API_KEY and
VERTEX_PROJECT in the merged result will be taken directly from
user_valves (even if they are None), ignoring the values in default_valves.
Args:
default_valves: The base Valves object with default configurations.
user_valves: An optional UserValves object with user-specific overrides.
If None, a copy of default_valves is returned.
Returns:
A new Valves object representing the merged configuration.
"""
if user_valves is None:
# If no user-specific valves are provided, return a copy of the default valves.
return default_valves.model_copy(deep=True)
# Start with the values from the base `Valves`
merged_data = default_valves.model_dump()
# Override with non-None values from `UserValves`
# Iterate over fields defined in the UserValves model
for field_name in Pipe.UserValves.model_fields:
# getattr is safe as field_name comes from model_fields of user_valves' type
user_value = getattr(user_valves, field_name)
if user_value is not None and user_value != "":
# Only update if the field is also part of the main Valves model
# (keys of merged_data are fields of default_valves)
if field_name in merged_data:
merged_data[field_name] = user_value
user_whitelist = (
default_valves.AUTH_WHITELIST.split(",")
if default_valves.AUTH_WHITELIST
else []
)
# Apply special logic based on default_valves.USER_MUST_PROVIDE_AUTH_CONFIG
if (
default_valves.USER_MUST_PROVIDE_AUTH_CONFIG
and user_email not in user_whitelist
):
# If USER_MUST_PROVIDE_AUTH_CONFIG is True and user is not in the whitelist,
# then user must provide their own GEMINI_API_KEY
# User is disallowed from using Vertex AI in this case.
merged_data["GEMINI_API_KEY"] = user_valves.GEMINI_API_KEY
merged_data["VERTEX_PROJECT"] = None
merged_data["USE_VERTEX_AI"] = False
# Create a new Valves instance with the merged data.
# Pydantic will validate the data against the Valves model definition during instantiation.
return Pipe.Valves(**merged_data)
def _is_flat_dict(self, data: Any) -> bool:
"""
Checks if a dictionary contains only non-dict/non-list values (is one level deep).
"""
if not isinstance(data, dict):
return False
return not any(isinstance(value, (dict, list)) for value in data.values())
def _truncate_long_strings(
self, data: Any, max_len: int, truncation_marker: str, truncation_enabled: bool
) -> Any:
"""
Recursively traverses a data structure (dicts, lists) and truncates
long string values. Creates copies to avoid modifying original data.
Args:
data: The data structure (dict, list, str, int, float, bool, None) to process.
max_len: The maximum allowed length for string values.
truncation_marker: The string to append to truncated values.
truncation_enabled: Whether truncation is enabled.
Returns:
A potentially new data structure with long strings truncated.
"""
if not truncation_enabled or max_len <= len(truncation_marker):
# If truncation is disabled or max_len is too small, return original
# Make a copy only if it's a mutable type we might otherwise modify
if isinstance(data, (dict, list)):
return copy.deepcopy(data) # Ensure deep copy for nested structures
return data # Primitives are immutable
if isinstance(data, str):
if len(data) > max_len:
return data[: max_len - len(truncation_marker)] + truncation_marker
return data # Return original string if not truncated
elif isinstance(data, dict):
# Process dictionary items, creating a new dict
return {
k: self._truncate_long_strings(
v, max_len, truncation_marker, truncation_enabled
)
for k, v in data.items()
}
elif isinstance(data, list):
# Process list items, creating a new list
return [
self._truncate_long_strings(
item, max_len, truncation_marker, truncation_enabled
)
for item in data
]
else:
# Return non-string, non-container types as is (they are immutable)
return data
def plugin_stdout_format(self, record: "Record") -> str:
"""
Custom format function for the plugin's logs.
Serializes and truncates data passed under the 'payload' key in extra.
"""
# Configuration Keys
LOG_OPTIONS_PREFIX = "_log_"
TRUNCATION_ENABLED_KEY = f"{LOG_OPTIONS_PREFIX}truncation_enabled"
MAX_LENGTH_KEY = f"{LOG_OPTIONS_PREFIX}max_length"
TRUNCATION_MARKER_KEY = f"{LOG_OPTIONS_PREFIX}truncation_marker"
DATA_KEY = "payload"
original_extra = record["extra"]
# Extract the data intended for serialization using the chosen key
data_to_process = original_extra.get(DATA_KEY)
serialized_data_json = ""
if data_to_process is not None:
try:
serializable_data = pydantic_core.to_jsonable_python(
data_to_process, serialize_unknown=True
)
# Determine truncation settings
truncation_enabled = original_extra.get(TRUNCATION_ENABLED_KEY, True)
max_length = original_extra.get(MAX_LENGTH_KEY, 256)
truncation_marker = original_extra.get(TRUNCATION_MARKER_KEY, "[...]")
# If max_length was explicitly provided, force truncation enabled
if MAX_LENGTH_KEY in original_extra:
truncation_enabled = True
# Truncate long strings
truncated_data = self._truncate_long_strings(
serializable_data,
max_length,
truncation_marker,
truncation_enabled,
)
# Serialize the (potentially truncated) data
if self._is_flat_dict(truncated_data) and not isinstance(
truncated_data, list
):
json_string = json.dumps(
truncated_data, separators=(",", ":"), default=str
)
# Add a simple prefix if it's compact
serialized_data_json = " - " + json_string
else:
json_string = json.dumps(truncated_data, indent=2, default=str)
# Prepend with newline for readability
serialized_data_json = "\n" + json_string
except (TypeError, ValueError) as e: # Catch specific serialization errors
serialized_data_json = f" - {{Serialization Error: {e}}}"
except (
Exception
) as e: # Catch any other unexpected errors during processing
serialized_data_json = f" - {{Processing Error: {e}}}"
# Add the final JSON string (or error message) back into the record
record["extra"]["_plugin_serialized_data"] = serialized_data_json
# Base template
base_template = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
# Append the serialized data
base_template += "{extra[_plugin_serialized_data]}"
# Append the exception part
base_template += "\n{exception}"
# Return the format string template
return base_template.rstrip()
@cache
def _add_log_handler(self, log_level: str):
"""
Adds or updates the loguru handler specifically for this plugin.
Includes logic for serializing and truncating extra data.
The handler is added only if the log_level has changed since the last call.
"""
def plugin_filter(record: "Record"):
"""Filter function to only allow logs from this plugin (based on module name)."""
return record["name"] == __name__
# Get the desired level name and number
desired_level_name = log_level
try:
# Use the public API to get level details
desired_level_info = log.level(desired_level_name)
desired_level_no = desired_level_info.no
except ValueError:
log.error(
f"Invalid LOG_LEVEL '{desired_level_name}' configured for plugin {__name__}. Cannot add/update handler."
)
return # Stop processing if the level is invalid
# Access the internal state of the log
handlers: dict[int, "Handler"] = log._core.handlers # type: ignore
handler_id_to_remove = None
found_correct_handler = False
for handler_id, handler in handlers.items():
existing_filter = handler._filter # Access internal attribute
# Check if the filter matches our plugin_filter
# Comparing function objects directly can be fragile if they are recreated.
# Comparing by name and module is more robust for functions defined at module level.
is_our_filter = (
existing_filter is not None # Make sure a filter is set
and hasattr(existing_filter, "__name__")
and existing_filter.__name__ == plugin_filter.__name__
and hasattr(existing_filter, "__module__")
and existing_filter.__module__ == plugin_filter.__module__
)
if is_our_filter:
existing_level_no = handler.levelno
log.trace(
f"Found existing handler {handler_id} for {__name__} with level number {existing_level_no}."
)
# Check if the level matches the desired level
if existing_level_no == desired_level_no:
log.debug(
f"Handler {handler_id} for {__name__} already exists with the correct level '{desired_level_name}'."
)
found_correct_handler = True
break # Found the correct handler, no action needed
else:
# Found our handler, but the level is wrong. Mark for removal.
log.info(
f"Handler {handler_id} for {__name__} found, but log level differs "
f"(existing: {existing_level_no}, desired: {desired_level_no}). "
f"Removing it to update."
)
handler_id_to_remove = handler_id
break # Found the handler to replace, stop searching
# Remove the old handler if marked for removal
if handler_id_to_remove is not None:
try:
log.remove(handler_id_to_remove)
log.debug(f"Removed handler {handler_id_to_remove} for {__name__}.")
except ValueError:
# This might happen if the handler was somehow removed between the check and now
log.warning(
f"Could not remove handler {handler_id_to_remove} for {__name__}. It might have already been removed."
)
# If removal failed but we intended to remove, we should still proceed to add
# unless found_correct_handler is somehow True (which it shouldn't be if handler_id_to_remove was set).
# Add a new handler if no correct one was found OR if we just removed an incorrect one
if not found_correct_handler:
log.add(
sys.stdout,
level=desired_level_name,
format=self.plugin_stdout_format,
filter=plugin_filter,
)
log.debug(
f"Added new handler to loguru for {__name__} with level {desired_level_name}."
)
def _get_mime_type(self, file_uri: str) -> str:
"""
Determines MIME type based on file extension using the mimetypes module.
"""
mime_type, encoding = mimetypes.guess_type(file_uri)
if mime_type is None:
return "application/octet-stream" # Default MIME type if unknown
return mime_type
def _get_first_candidate(
self, candidates: list[types.Candidate] | None
) -> types.Candidate | None:
"""Selects the first candidate, logging a warning if multiple exist."""
if not candidates:
# Logging warnings is handled downstream.
return None
if len(candidates) > 1:
log.warning("Multiple candidates found, defaulting to first candidate.")
return candidates[0]
def _get_file_data(self, file_id: str) -> tuple[bytes | None, str | None]:
if not file_id:
# TODO: Emit toast
log.warning(f"file_id is empty. Cannot continue.")
return None, None
file_model = Files.get_file_by_id(file_id)
if file_model is None:
# TODO: Emit toast
log.warning(f"File {file_id} not found in the backend's database.")
return None, None
if not (file_path := file_model.path):
# TODO: Emit toast
log.warning(
f"File {file_id} was found in the database but it lacks `path` field. Cannot Continue."
)
return None, None
if file_model.meta is None:
# TODO: Emit toast
log.warning(
f"File {file_path} was found in the database but it lacks `meta` field. Cannot continue."
)
return None, None
if not (content_type := file_model.meta.get("content_type")):
# TODO: Emit toast
log.warning(
f"File {file_path} was found in the database but it lacks `meta.content_type` field. Cannot continue."
)
return None, None
try:
with open(file_path, "rb") as file:
image_data = file.read()
return image_data, content_type
except FileNotFoundError:
# TODO: Emit toast
log.exception(f"File {file_path} not found on disk.")
return None, content_type
except Exception:
# TODO: Emit toast
log.exception(f"Error processing file {file_path}")
return None, content_type
# endregion 1.8 Utility helpers
# endregion 1. Helper methods inside the Pipe class |
Beta Was this translation helpful? Give feedback.
0 replies
-
|
My current implementation is quite basic, characterized by the following:
|
Beta Was this translation helpful? Give feedback.
0 replies
-
|
That would be a very useful feature. I'll take a look at this solution when I finish with other things. |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment

Uh oh!
There was an error while loading. Please reload this page.
-
Gemini models natively support function calling, referring to their API documentation.
I've successfully tested Gemini 2.5 Flash Preview to invoke Open WebUI Tools in the default function calling mode (
Advanced Params -> Function Callingset toDefault), and it works acceptably. When I set it toNative, it went wrong.Native function calling offers superior capabilities, enabling the model to dynamically leverage tools throughout its generation process, which is a significant advantage.
Beta Was this translation helpful? Give feedback.
All reactions