Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ TELEGRAM_API_ID=your_telegram_api_id_here
TELEGRAM_API_HASH=your_telegram_api_hash_here
TELEGRAM_USE_LOCAL_API=true
TELEGRAM_LOCAL_API_URL=http://localhost:8081

# Image Processing Configuration
MAX_IMAGE_RESOLUTION_VISION=1024
MAX_IMAGE_RESOLUTION_EDIT=4096
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ __pycache__/
.DS_Store
data/
.vscode/

# Temporary media directories
temp_photos/
temp_docs/
temp_audio/
83 changes: 72 additions & 11 deletions app/agents/image_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
from google import genai
from google.genai import types
import PIL.Image
import PIL.ImageOps
from image_utils import JPEG_FORMATS
# Runs before the os.getenv() lookups below (presumably python-dotenv
# populating the environment from a .env file — confirm against the import).
load_dotenv()

# API clients are constructed once, at import time, from environment keys.
# os.getenv() returns None when a key is unset, and that None is passed
# straight through to the client constructors.
openai_api_key = os.getenv("OPENAI_API_KEY")
openai_client = OpenAI(api_key=openai_api_key)
google_api_key = os.getenv("GOOGLE_API_KEY")
genai_client = genai.Client(api_key=google_api_key)
# Upper bound on the longest side (in pixels) of images prepared for editing.
# The trailing `or "4096"` covers a variable that is set but empty; a
# non-numeric value makes int() raise ValueError at import time.
# Values <= 0 disable resizing (see _prepare_image_for_edit).
max_image_resolution_edit = int(os.getenv("MAX_IMAGE_RESOLUTION_EDIT", "4096") or "4096")

class TextPart:
def __init__(self, text):
Expand Down Expand Up @@ -273,6 +276,43 @@ def _resolve_image_path(self, image_path: str) -> Path:
return relative_to_images
return p # Return original so the caller can report the correct path

def _prepare_image_for_edit(self, image_path: Path, temp_paths: List[Path]) -> Path:
    """Return the path of an edit-ready version of ``image_path``.

    The original file is used as-is when Pillow identifies it as a JPEG
    whose longest side does not exceed ``max_image_resolution_edit`` (a
    limit <= 0 disables the size check). Otherwise a temporary JPEG copy is
    written into ``self.images_path`` via the shared
    ``image_utils.save_image_as_jpeg`` helper, which applies the EXIF
    orientation, downsizes to the limit and converts to RGB.

    Args:
        image_path: Image file to inspect.
        temp_paths: Mutated in place. The temporary file's path is appended
            *before* the file is written, so the caller's cleanup also
            removes a partially written file if saving fails.

    Returns:
        The temporary JPEG path when a copy was created, otherwise
        ``image_path``. Preparation is best effort: on any failure the
        error is printed and the original path is returned.
    """
    # Function-scope import: the module is already loaded for JPEG_FORMATS,
    # this only binds the shared conversion helper.
    from image_utils import save_image_as_jpeg

    try:
        # Only the header is needed to decide; close the file before the
        # helper re-opens it to write the converted copy.
        with PIL.Image.open(image_path) as img:
            image_format = (img.format or "").upper()
            longest_side = max(img.size)

        needs_resizing = 0 < max_image_resolution_edit < longest_side
        needs_conversion = image_format not in JPEG_FORMATS

        if needs_resizing or needs_conversion:
            temp_path = self.images_path / f"edit_input_{uuid.uuid4()}.jpg"
            temp_paths.append(temp_path)
            save_image_as_jpeg(
                image_path,
                temp_path,
                max_resolution=max_image_resolution_edit,
            )
            return temp_path
    except Exception as e:
        print(f"Failed to prepare image for edit, using original file: {e}")

    return image_path

def _cleanup_temp_images(self, temp_paths: List[Path]) -> None:
for temp_path in temp_paths:
try:
if temp_path.exists():
temp_path.unlink()
except Exception as e:
print(f"Failed to remove temporary image {temp_path}: {e}")

def _resolve_gpt_size(self, resolution: str, aspect_ratio: str) -> str:
"""Translate resolution + aspect_ratio into a GPT Image 2 pixel size string.

Expand Down Expand Up @@ -418,6 +458,8 @@ async def _generate_multimodal_image_and_text(self, prompt: str, style: str = "3
async def _image_editing(self, prompt: str, image_path: str, model: str = "Normal", aspect_ratio: str = "16:9", resolution: str = "2K", gpt_quality: str = "auto", variants: int = 1, caption: str = "Here is your edited image") -> str:
"""Edit an existing image using Gemini or GPT Image 2"""
print(f"Editing image - Prompt: {prompt}, Image: {image_path}, Model: {model}, Aspect Ratio: {aspect_ratio}, Resolution: {resolution}, GPT Quality: {gpt_quality}, Variants: {variants}")
temp_paths: List[Path] = []
source_image = None
try:
image_path_obj = self._resolve_image_path(image_path)
if not image_path_obj.exists():
Expand All @@ -433,8 +475,9 @@ async def _image_editing(self, prompt: str, image_path: str, model: str = "Norma
)
all_results.append(r)
return "\n".join(all_results)

source_image = PIL.Image.open(image_path_obj)

source_image_path = self._prepare_image_for_edit(image_path_obj, temp_paths)
source_image = PIL.Image.open(source_image_path)

model_name = "gemini-3-pro-image-preview" if model.lower() == "pro" else "gemini-3.1-flash-image-preview"

Expand Down Expand Up @@ -465,7 +508,7 @@ async def _image_editing(self, prompt: str, image_path: str, model: str = "Norma
)

contents = response.candidates[0].content.parts

for content in contents:
if 'text' in content.model_fields_set:
try:
Expand All @@ -490,24 +533,32 @@ async def _image_editing(self, prompt: str, image_path: str, model: str = "Norma
document=file,
caption=variant_caption
)

all_saved_paths.append(str(transformed_image_path))

if all_saved_paths:
paths_str = "\n".join(f" - {p}" for p in all_saved_paths)
result_message += f"Edited {len(all_saved_paths)} image(s) and sent to user.\nSaved to:\n{paths_str}\nImage transformation completed successfully.\n"
else:
result_message += "Warning: No transformed image was generated. The model only provided text response.\n"

return result_message
except Exception as e:
return f"Error editing image: {str(e)}"
finally:
if source_image:
source_image.close()
self._cleanup_temp_images(temp_paths)

async def _gpt_image_edit(self, prompt: str, image_paths: List[Path], size: str = "2048x1152", quality: str = "auto", caption: str = "Here is your edited image") -> str:
"""Edit one or more images using OpenAI GPT Image 2"""
print(f"Editing image(s) with GPT Image 2 - Prompt: {prompt}, Images: {image_paths}, Size: {size}, Quality: {quality}")
temp_paths: List[Path] = []
try:
image_files = [open(str(p), "rb") for p in image_paths]
image_files = [
open(str(self._prepare_image_for_edit(p, temp_paths)), "rb")
for p in image_paths
]

kwargs = {
"model": "gpt-image-2-2026-04-21",
Expand All @@ -526,6 +577,7 @@ async def _gpt_image_edit(self, prompt: str, image_paths: List[Path], size: str
finally:
for f in image_files:
f.close()
self._cleanup_temp_images(temp_paths)

image_base64 = result.data[0].b64_json
image_bytes = base64.b64decode(image_base64)
Expand Down Expand Up @@ -658,6 +710,8 @@ async def _gpt_image_generate(self, prompt: str, size: str = "2048x1152", qualit
async def _image_composition(self, prompt: str, image_paths: List[str], model: str = "Normal", aspect_ratio: str = "16:9", resolution: str = "2K", gpt_quality: str = "auto", variants: int = 1, caption: str = "Here is your composed image") -> str:
"""Compose a new image from multiple input images using Gemini or GPT Image 2"""
print(f"Composing image - Prompt: {prompt}, Images: {image_paths}, Model: {model}, Aspect Ratio: {aspect_ratio}, Resolution: {resolution}, GPT Quality: {gpt_quality}, Variants: {variants}")
temp_paths: List[Path] = []
images: List[PIL.Image.Image] = []
try:
resolved_paths = []
for image_path in image_paths:
Expand All @@ -683,7 +737,11 @@ async def _image_composition(self, prompt: str, image_paths: List[str], model: s
if len(resolved_paths) > 3:
return "Error: Maximum 3 images are supported for composition with Normal/Pro mode. Use GPT mode for more images."

images = [PIL.Image.open(p) for p in resolved_paths]
prepared_paths = [
self._prepare_image_for_edit(p, temp_paths)
for p in resolved_paths
]
images = [PIL.Image.open(p) for p in prepared_paths]

input_contents = []
for image in images:
Expand Down Expand Up @@ -716,7 +774,7 @@ async def _image_composition(self, prompt: str, image_paths: List[str], model: s
)

response_parts = response.candidates[0].content.parts

for part in response_parts:
if 'text' in part.model_fields_set and part.text:
try:
Expand Down Expand Up @@ -753,8 +811,11 @@ async def _image_composition(self, prompt: str, image_paths: List[str], model: s
result_message += f"Composed {len(all_saved_paths)} image(s) and sent to user.\nSaved to:\n{paths_str}\nImage composition completed successfully.\n"
else:
result_message += "Warning: No composed image was generated. The model only provided text response.\n"

return result_message
except Exception as e:
return f"Error composing image: {str(e)}"

finally:
for image in images:
image.close()
self._cleanup_temp_images(temp_paths)
37 changes: 37 additions & 0 deletions app/image_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pathlib import Path

from PIL import Image, ImageOps
from pillow_heif import register_heif_opener


# Import-time side effect: installs pillow_heif's opener (presumably so that
# PIL.Image.open can decode HEIC/HEIF files — confirm against pillow_heif docs).
register_heif_opener()

# File extensions for supported images; not referenced within this module
# (presumably used by importers to filter uploads — verify against callers).
IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".heic", ".heif")
# Upper-cased Pillow `Image.format` names that count as JPEG.
JPEG_FORMATS = {"JPEG", "JPG"}


def is_jpeg_image(image_path: str | Path) -> bool:
    """Tell whether Pillow identifies the file at ``image_path`` as a JPEG.

    The decision is based on the format Pillow detects from the file's
    content, not on the file extension. A missing format counts as
    "not JPEG".
    """
    with Image.open(image_path) as opened:
        detected_format = opened.format or ""
    return detected_format.upper() in JPEG_FORMATS


def save_image_as_jpeg(
    source_path: str | Path,
    target_path: str | Path,
    *,
    max_resolution: int = 0,
    quality: int = 90,
) -> None:
    """Write an RGB JPEG copy of ``source_path`` to ``target_path``.

    The EXIF orientation is applied to the pixels first. The image is then
    optionally downscaled (aspect ratio preserved) and converted to RGB
    before being saved.

    Args:
        source_path: Image file readable by Pillow.
        target_path: Destination of the JPEG file.
        max_resolution: Maximum length of the longest side in pixels.
            Values <= 0 disable resizing; smaller images are never enlarged.
        quality: JPEG quality passed to Pillow's encoder.

    Raises:
        Whatever Pillow raises when the source cannot be opened or decoded,
        or the target cannot be written.
    """
    with Image.open(source_path) as img:
        img = ImageOps.exif_transpose(img)

        needs_resizing = max_resolution > 0 and max(img.size) > max_resolution

        # Pillow silently forces NEAREST resampling for bilevel ("1") and
        # palette ("P") images, so convert those to RGB first to let the
        # LANCZOS filter below actually take effect.
        if needs_resizing and img.mode in ("1", "P"):
            img = img.convert("RGB")

        if needs_resizing:
            img.thumbnail(
                (max_resolution, max_resolution),
                # Pillow >= 9.1 exposes filters on Image.Resampling; older
                # releases keep them directly on the Image module.
                getattr(Image, "Resampling", Image).LANCZOS,
            )

        # JPEG cannot store alpha or palette data.
        if img.mode != "RGB":
            img = img.convert("RGB")

        img.save(target_path, format="JPEG", quality=quality, optimize=True)
Loading