-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest_handler.py
More file actions
95 lines (80 loc) · 2.63 KB
/
Copy pathrequest_handler.py
File metadata and controls
95 lines (80 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# request_handler.py
# Usage: Centralized request handling for Zoom API interactions
import requests
import access_request
DEFAULT_TIMEOUT = 10
def _build_headers(extra_headers=None):
access_token = access_request.access_token
if not access_token:
raise RuntimeError("Missing access token. Authorize the application first.")
headers = {"Authorization": f"Bearer {access_token}"}
if extra_headers:
headers.update(extra_headers)
return headers
def _refresh_tokens():
tokens = access_request.refresh_access_token(
access_request.client_id,
access_request.client_secret,
access_request.refresh_token,
)
access_request.access_token = tokens["access_token"]
access_request.refresh_token = tokens["refresh_token"]
return tokens
def make_request(
resource="user_info",
*,
method=None,
endpoint=None,
params=None,
data=None,
json=None,
headers=None,
timeout=DEFAULT_TIMEOUT,
):
if resource == "user_info":
method = method or "GET"
endpoint = endpoint or "https://api.zoom.us/v2/users/me"
elif not (method and endpoint):
raise ValueError("Custom requests require both method and endpoint.")
method = method.upper()
request_headers = _build_headers(headers)
try:
response = requests.request(
method,
endpoint,
params=params,
data=data,
json=json,
headers=request_headers,
timeout=timeout,
)
except requests.RequestException as exc:
raise requests.RequestException(f"Network error during Zoom API request: {exc}") from exc
if response.status_code == 401:
if not access_request.refresh_token:
raise requests.HTTPError(
f"Unauthorized request: {response.text}",
response=response,
)
_refresh_tokens()
request_headers = _build_headers(headers)
try:
response = requests.request(
method,
endpoint,
params=params,
data=data,
json=json,
headers=request_headers,
timeout=timeout,
)
except requests.RequestException as exc:
raise requests.RequestException(f"Network error during Zoom API retry: {exc}") from exc
if not response.ok:
raise requests.HTTPError(
f"Zoom API request failed: {response.status_code} - {response.text}",
response=response,
)
return response.json()
def get_user_info():
return make_request("user_info")