-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmigrations.py
More file actions
134 lines (116 loc) · 5.1 KB
/
migrations.py
File metadata and controls
134 lines (116 loc) · 5.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from sqlalchemy import text, inspect, MetaData, Table, Column
from sqlalchemy.ext.asyncio import AsyncSession
import logging
from models import Base, User, ChatMessage
from config import ADMIN_USER_ID
import asyncio
logger = logging.getLogger(__name__)
async def get_table_columns(session: AsyncSession, table_name: str) -> list:
"""Get list of existing columns in the database table"""
try:
result = await session.execute(text(f"""
SELECT column_name
FROM information_schema.columns
WHERE table_name = '{table_name}'
"""))
return [row[0] for row in result]
except Exception as e:
logger.error(f"Error getting columns for table {table_name}: {e}")
return []
def get_model_columns(table_name: str) -> list:
"""Get list of columns from the model"""
if table_name == 'users':
return [c.name for c in User.__table__.columns]
elif table_name == 'chat_messages':
return [c.name for c in ChatMessage.__table__.columns]
return []
async def notify_admin(message: str):
"""Send notification to admin"""
try:
# Import bot locally to avoid circular import
import importlib
bot_module = importlib.import_module('bot')
bot = bot_module.bot
# Ensure we're in an async context
if asyncio.get_event_loop().is_running():
await bot.send_message(ADMIN_USER_ID, message)
else:
logger.warning("Cannot send admin notification: not in async context")
except Exception as e:
logger.error(f"Failed to send admin notification: {e}")
async def migrate_database(session: AsyncSession):
"""Apply database migrations"""
try:
# Create tables if they don't exist
await session.execute(text("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
user_id BIGINT UNIQUE NOT NULL,
username VARCHAR,
first_name VARCHAR,
last_name VARCHAR,
is_premium BOOLEAN DEFAULT FALSE,
premium_until TIMESTAMP,
requests_today INTEGER DEFAULT 0,
last_request_date DATE
)
"""))
await session.execute(text("""
CREATE TABLE IF NOT EXISTS chat_messages (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
role VARCHAR,
content TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""))
# Check if user_id column needs to be migrated from INTEGER to BIGINT
result = await session.execute(text("""
SELECT data_type
FROM information_schema.columns
WHERE table_name = 'users' AND column_name = 'user_id'
"""))
current_type = result.scalar_one_or_none()
if current_type == 'integer':
logger.info("Migrating user_id column from INTEGER to BIGINT...")
# Change column type to BIGINT
await session.execute(text("""
ALTER TABLE users ALTER COLUMN user_id TYPE BIGINT
"""))
logger.info("Successfully migrated user_id column to BIGINT")
elif current_type == 'bigint':
logger.info("user_id column is already BIGINT type")
else:
logger.warning(f"Unexpected user_id column type: {current_type}")
# Process users table
db_columns = await get_table_columns(session, 'users')
model_columns = get_model_columns('users')
missing_columns = [col for col in model_columns if col not in db_columns]
for column in missing_columns:
column_type = next(c.type for c in User.__table__.columns if c.name == column)
await session.execute(text(f"""
ALTER TABLE users ADD COLUMN IF NOT EXISTS {column} {column_type}
"""))
# Process chat_messages table
db_columns = await get_table_columns(session, 'chat_messages')
model_columns = get_model_columns('chat_messages')
missing_columns = [col for col in model_columns if col not in db_columns]
for column in missing_columns:
column_type = next(c.type for c in ChatMessage.__table__.columns if c.name == column)
await session.execute(text(f"""
ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS {column} {column_type}
"""))
await session.commit()
logger.info("Database migrations applied successfully")
# Notify admin about successful migration
await notify_admin(
"✅ Database migrations completed successfully!\n"
"All tables and columns are up to date."
)
return True
except Exception as e:
await session.rollback()
logger.error(f"Error applying migrations: {e}")
# Notify admin about migration failure
await notify_admin(f"❌ Database migration failed!\nError: {str(e)}")
return False