diff --git a/examples/cronjob_crud.py b/examples/cronjob_crud.py index 9cbb91d814..1fc4d95081 100644 --- a/examples/cronjob_crud.py +++ b/examples/cronjob_crud.py @@ -1,5 +1,4 @@ #!/usr/bin/python3 -# -*- coding:utf-8 -*- import json import time diff --git a/examples/duration-gep2257.py b/examples/duration-gep2257.py index a7bda1bdb2..f38e06b489 100644 --- a/examples/duration-gep2257.py +++ b/examples/duration-gep2257.py @@ -1,5 +1,4 @@ #!/usr/bin/python3 -# -*- coding:utf-8 -*- """ This example uses kubernetes.utils.duration to parse and display diff --git a/examples/pod_portforward.py b/examples/pod_portforward.py index 13672f181f..d888dec367 100644 --- a/examples/pod_portforward.py +++ b/examples/pod_portforward.py @@ -19,8 +19,7 @@ import select import socket import time - -import six.moves.urllib.request as urllib_request +from urllib import request as urllib_request from kubernetes import config from kubernetes.client import Configuration diff --git a/kubernetes/base/config/__init__.py b/kubernetes/base/config/__init__.py index 3f49ce0e91..5988b55cc1 100644 --- a/kubernetes/base/config/__init__.py +++ b/kubernetes/base/config/__init__.py @@ -16,9 +16,13 @@ from .config_exception import ConfigException from .incluster_config import load_incluster_config -from .kube_config import (KUBE_CONFIG_DEFAULT_LOCATION, - list_kube_config_contexts, load_kube_config, - load_kube_config_from_dict, new_client_from_config, new_client_from_config_dict) +from .kube_config import ( + KUBE_CONFIG_DEFAULT_LOCATION, + list_kube_config_contexts, + load_kube_config, + load_kube_config_from_dict, + new_client_from_config, + new_client_from_config_dict) def load_config(**kwargs): diff --git a/kubernetes/base/config/dateutil.py b/kubernetes/base/config/dateutil.py index 02e94bda2d..51b2cf32d5 100644 --- a/kubernetes/base/config/dateutil.py +++ b/kubernetes/base/config/dateutil.py @@ -52,24 +52,24 @@ def parse_rfc3339(s): if not s.tzinfo: return s.replace(tzinfo=UTC) return s - + m = _re_rfc3339.fullmatch(s.strip()) if m is None: raise ValueError( f"Invalid RFC3339 datetime: {s!r} " "(expected YYYY-MM-DDTHH:MM:SS[.frac][Z|±HH:MM])" ) - + groups = m.groups() dt = [0] * 7 for x in range(6): dt[x] = int(groups[x]) - + us = 0 if groups[6] is not None: partial_sec = float(groups[6].replace(",", ".")) us = int(MICROSEC_PER_SEC * partial_sec) - + tz = UTC if groups[7] is not None and groups[7] not in ('Z', 'z', ' '): tz_match = _re_timezone.search(groups[7]) @@ -87,7 +87,7 @@ def parse_rfc3339(s): if tz_groups[2]: minute = int(tz_groups[2]) tz = TimezoneInfo(hour, minute) - + try: return datetime.datetime( year=dt[0], month=dt[1], day=dt[2], @@ -99,7 +99,6 @@ def parse_rfc3339(s): ) from e - def format_rfc3339(date_time): if date_time.tzinfo is None: date_time = date_time.replace(tzinfo=UTC) diff --git a/kubernetes/base/config/dateutil_test.py b/kubernetes/base/config/dateutil_test.py index 6f5d8af80c..4eab8fab81 100644 --- a/kubernetes/base/config/dateutil_test.py +++ b/kubernetes/base/config/dateutil_test.py @@ -84,8 +84,6 @@ def test_parse_rfc3339_invalid_formats(self): with self.assertRaises(ValueError): parse_rfc3339(invalid_input) - - def test_parse_rfc3339_with_whitespace(self): """Test that leading/trailing whitespace is handled""" actual = parse_rfc3339(" 2017-07-25T04:44:21Z ") diff --git a/kubernetes/base/config/exec_provider.py b/kubernetes/base/config/exec_provider.py index 95168f7f6e..37d31f8eb9 100644 --- a/kubernetes/base/config/exec_provider.py +++ b/kubernetes/base/config/exec_provider.py @@ -19,7 +19,7 @@ from .config_exception import ConfigException -class ExecProvider(object): +class ExecProvider: """ Implementation of the proposal for out-of-tree client authentication providers as described here -- @@ -58,7 +58,7 @@ def __init__(self, exec_config, cwd, cluster=None): else: self.cluster = None self.cwd = cwd or None - + @property def shell(self): # for windows systems `shell` should be `True` diff --git a/kubernetes/base/config/exec_provider_test.py b/kubernetes/base/config/exec_provider_test.py index 0318627c60..876788a206 100644 --- a/kubernetes/base/config/exec_provider_test.py +++ b/kubernetes/base/config/exec_provider_test.py @@ -175,7 +175,10 @@ def test_with_cluster_info(self, mock): instance = mock.return_value instance.wait.return_value = 0 instance.communicate.return_value = (self.output_ok, '') - ep = ExecProvider(self.input_with_cluster, None, ConfigNode("cluster", {'server': 'name.company.com'})) + ep = ExecProvider( + self.input_with_cluster, None, ConfigNode( + "cluster", { + 'server': 'name.company.com'})) result = ep.run() self.assertTrue(isinstance(result, dict)) self.assertTrue('token' in result) @@ -213,8 +216,10 @@ def test_with_cluster_info_from_exec_extension(self, mock): obj = json.loads(mock.call_args.kwargs["env"]["KUBERNETES_EXEC_INFO"]) self.assertEqual(obj["spec"]["cluster"]["server"], "name.company.com") - self.assertEqual(obj["spec"]["cluster"]["config"]["namespace"], "myproject") + self.assertEqual(obj["spec"]["cluster"]["config"] + ["namespace"], "myproject") self.assertEqual(obj["spec"]["cluster"]["config"]["name"], "mycluster") + if __name__ == '__main__': unittest.main() diff --git a/kubernetes/base/config/incluster_config.py b/kubernetes/base/config/incluster_config.py index 86070df43b..fca6d6a1de 100644 --- a/kubernetes/base/config/incluster_config.py +++ b/kubernetes/base/config/incluster_config.py @@ -34,7 +34,7 @@ def _join_host_port(host, port): return template % (host, port) -class InClusterConfigLoader(object): +class InClusterConfigLoader: def __init__(self, token_filename, cert_filename, diff --git a/kubernetes/base/config/kube_config.py b/kubernetes/base/config/kube_config.py index fc88f7f1fa..4dea1c9628 100644 --- a/kubernetes/base/config/kube_config.py +++ b/kubernetes/base/config/kube_config.py @@ -22,14 +22,12 @@ import platform import subprocess import tempfile -import time from collections import namedtuple import oauthlib.oauth2 import urllib3 import yaml from requests_oauthlib import OAuth2Session -from six import PY3 from kubernetes.client import ApiClient, Configuration from kubernetes.config.exec_provider import ExecProvider @@ -45,7 +43,6 @@ google_auth_available = False - EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5) KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config') ENV_KUBECONFIG_PATH_SEPARATOR = ';' if platform.system() == 'Windows' else ':' @@ -62,7 +59,10 @@ def _cleanup_temp_files(): _temp_files = {} -def _create_temp_file_with_content(content, temp_file_path=None, force_recreate=False): +def _create_temp_file_with_content( + content, + temp_file_path=None, + force_recreate=False): if len(_temp_files) == 0: atexit.register(_cleanup_temp_files) # Because we may change context several times, try to remember files we @@ -85,7 +85,7 @@ def _is_expired(expiry): datetime.datetime.now(tz=UTC)) -class FileOrData(object): +class FileOrData: """Utility class to read content of obj[%data_key_name] or file's content of obj[%file_key_name] and represent it as file or data. Note that the data is preferred. The obj[%file_key_name] will be used iff @@ -145,13 +145,15 @@ def _write_file(self, force_rewrite=False): else: content = self._data self._file = _create_temp_file_with_content( - base64.standard_b64decode(content), self._temp_file_path, force_recreate=force_rewrite) + base64.standard_b64decode(content), + self._temp_file_path, + force_recreate=force_rewrite) else: self._file = _create_temp_file_with_content( self._data, self._temp_file_path, force_recreate=force_rewrite) -class CommandTokenSource(object): +class CommandTokenSource: def __init__(self, cmd, args, tokenKey, expiryKey): self._cmd = cmd self._args = args @@ -191,7 +193,7 @@ def token(self): expiry=parse_rfc3339(data['credential']['token_expiry'])) -class KubeConfigLoader(object): +class KubeConfigLoader: def __init__(self, config_dict, active_context=None, get_google_credentials=None, @@ -247,7 +249,7 @@ def _refresh_credentials(): 'config' in self._user['auth-provider'] and 'cmd-path' in self._user['auth-provider']['config']): return _refresh_credentials_with_cmd_path() - + # Make the Google auth block optional. if google_auth_available: credentials, project_id = google.auth.default(scopes=[ @@ -259,7 +261,7 @@ def _refresh_credentials(): return credentials else: return None - + if get_google_credentials: self._get_google_credentials = get_google_credentials else: @@ -316,8 +318,6 @@ def _load_auth_provider_token(self): if provider['name'] == 'oidc': return self._load_oid_token(provider) - - def _load_gcp_token(self, provider): if (('config' not in provider) or ('access-token' not in provider['config']) or @@ -363,14 +363,9 @@ def _load_oid_token(self, provider): # https://tools.ietf.org/html/rfc7515#appendix-C return - if PY3: - jwt_attributes = json.loads( - base64.urlsafe_b64decode(parts[1] + padding).decode('utf-8') - ) - else: - jwt_attributes = json.loads( - base64.b64decode(parts[1] + padding) - ) + jwt_attributes = json.loads( + base64.urlsafe_b64decode(parts[1] + padding).decode('utf-8') + ) expire = jwt_attributes.get('exp') @@ -392,14 +387,9 @@ def _refresh_oidc(self, provider): if 'idp-certificate-authority-data' in provider['config']: ca_cert = tempfile.NamedTemporaryFile(delete=True) - if PY3: - cert = base64.b64decode( - provider['config']['idp-certificate-authority-data'] - ).decode('utf-8') - else: - cert = base64.b64decode( - provider['config']['idp-certificate-authority-data'] + "==" - ) + cert = base64.b64decode( + provider['config']['idp-certificate-authority-data'] + ).decode('utf-8') with open(ca_cert.name, 'w') as fh: fh.write(cert) @@ -454,7 +444,10 @@ def _load_from_exec_plugin(self): return try: base_path = self._get_base_path(self._cluster.path) - status = ExecProvider(self._user['exec'], base_path, self._cluster).run() + status = ExecProvider( + self._user['exec'], + base_path, + self._cluster).run() if 'token' in status: self.token = "Bearer %s" % status['token'] elif 'clientCertificateData' in status: @@ -547,7 +540,13 @@ def _refresh_api_key(client_configuration): self._set_config(client_configuration) client_configuration.refresh_api_key_hook = _refresh_api_key # copy these keys directly from self to configuration object - keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl','tls_server_name'] + keys = [ + 'host', + 'ssl_ca_cert', + 'cert_file', + 'key_file', + 'verify_ssl', + 'tls_server_name'] for key in keys: if key in self.__dict__: setattr(client_configuration, key, getattr(self, key)) @@ -565,7 +564,7 @@ def current_context(self): return self._current_context.value -class ConfigNode(object): +class ConfigNode: """Remembers each config key's path and construct a relevant exception message in case of missing keys. The assumption is all access keys are present in a well-formed kube-config.""" diff --git a/kubernetes/base/config/kube_config_test.py b/kubernetes/base/config/kube_config_test.py index b8063009eb..fd0308dafc 100644 --- a/kubernetes/base/config/kube_config_test.py +++ b/kubernetes/base/config/kube_config_test.py @@ -25,20 +25,27 @@ from unittest import mock import yaml -from six import PY3, next from kubernetes.client import Configuration from .config_exception import ConfigException from .dateutil import UTC, format_rfc3339, parse_rfc3339 -from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, CommandTokenSource, - ConfigNode, FileOrData, KubeConfigLoader, - KubeConfigMerger, _cleanup_temp_files, - _create_temp_file_with_content, - _get_kube_config_loader, - _get_kube_config_loader_for_yaml_file, - list_kube_config_contexts, load_kube_config, - load_kube_config_from_dict, new_client_from_config, new_client_from_config_dict) +from .kube_config import ( + ENV_KUBECONFIG_PATH_SEPARATOR, + CommandTokenSource, + ConfigNode, + FileOrData, + KubeConfigLoader, + KubeConfigMerger, + _cleanup_temp_files, + _create_temp_file_with_content, + _get_kube_config_loader, + _get_kube_config_loader_for_yaml_file, + list_kube_config_contexts, + load_kube_config, + load_kube_config_from_dict, + new_client_from_config, + new_client_from_config_dict) BEARER_TOKEN_FORMAT = "Bearer %s" @@ -89,10 +96,10 @@ def _raise_exception(st): TEST_PASSWORD = "pass" # token for me:pass TEST_BASIC_TOKEN = "Basic bWU6cGFzcw==" -DATETIME_EXPIRY_PAST = datetime.datetime.now(tz=UTC - ).replace(tzinfo=None) - datetime.timedelta(minutes=PAST_EXPIRY_TIMEDELTA) -DATETIME_EXPIRY_FUTURE = datetime.datetime.now(tz=UTC - ).replace(tzinfo=None) + datetime.timedelta(minutes=FUTURE_EXPIRY_TIMEDELTA) +DATETIME_EXPIRY_PAST = datetime.datetime.now(tz=UTC).replace( + tzinfo=None) - datetime.timedelta(minutes=PAST_EXPIRY_TIMEDELTA) +DATETIME_EXPIRY_FUTURE = datetime.datetime.now(tz=UTC).replace( + tzinfo=None) + datetime.timedelta(minutes=FUTURE_EXPIRY_TIMEDELTA) TEST_TOKEN_EXPIRY_PAST = _format_expiry_datetime(DATETIME_EXPIRY_PAST) TEST_SSL_HOST = "https://test-host" @@ -300,7 +307,7 @@ class TestConfigNode(BaseTestCase): ]} def setUp(self): - super(TestConfigNode, self).setUp() + super().setUp() self.node = ConfigNode("test_obj", self.test_obj) def test_normal_map_array_operations(self): @@ -996,7 +1003,8 @@ def test_oidc_with_refresh(self, mock_ApiClient, mock_OAuth2Session): @mock.patch('kubernetes.config.kube_config.OAuth2Session.refresh_token') @mock.patch('kubernetes.config.kube_config.ApiClient.request') - def test_oidc_with_idp_ca_file_refresh(self, mock_ApiClient, mock_OAuth2Session): + def test_oidc_with_idp_ca_file_refresh( + self, mock_ApiClient, mock_OAuth2Session): mock_response = mock.MagicMock() type(mock_response).status = mock.PropertyMock( return_value=200 @@ -1066,7 +1074,6 @@ def test_oidc_fails_if_invalid_padding_length(self): None, ) - def test_user_pass(self): expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) actual = FakeConfig() @@ -1274,12 +1281,8 @@ def test_list_kube_config_contexts(self): config_file=config_file) self.assertDictEqual(self.TEST_KUBE_CONFIG['contexts'][0], active_context) - if PY3: - self.assertCountEqual(self.TEST_KUBE_CONFIG['contexts'], - contexts) - else: - self.assertItemsEqual(self.TEST_KUBE_CONFIG['contexts'], - contexts) + self.assertCountEqual(self.TEST_KUBE_CONFIG['contexts'], + contexts) def test_new_client_from_config(self): config_file = self._create_temp_file( diff --git a/kubernetes/base/dynamic/client.py b/kubernetes/base/dynamic/client.py index 64163d7b5c..f2963a2a8f 100644 --- a/kubernetes/base/dynamic/client.py +++ b/kubernetes/base/dynamic/client.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import six import json from kubernetes import watch @@ -48,6 +47,7 @@ class VersionNotSupportedError(NotImplementedError): def meta_request(func): """ Handles parsing response structure and translating API Exceptions """ + def inner(self, *args, **kwargs): serialize_response = kwargs.pop('serialize', True) serializer = kwargs.pop('serializer', ResourceInstance) @@ -57,19 +57,15 @@ def inner(self, *args, **kwargs): raise api_exception(e) if serialize_response: try: - if six.PY2: - return serializer(self, json.loads(resp.data)) return serializer(self, json.loads(resp.data.decode('utf8'))) except ValueError: - if six.PY2: - return resp.data return resp.data.decode('utf8') return resp return inner -class DynamicClient(object): +class DynamicClient: """ A kubernetes client that dynamically discovers and interacts with the kubernetes API """ @@ -94,7 +90,9 @@ def version(self): def ensure_namespace(self, resource, namespace, body): namespace = namespace or body.get('metadata', {}).get('namespace') if not namespace: - raise ValueError("Namespace is required for {}.{}".format(resource.group_version, resource.kind)) + raise ValueError( + "Namespace is required for {}.{}".format( + resource.group_version, resource.kind)) return namespace def serialize_body(self, body): @@ -118,19 +116,44 @@ def create(self, resource, body=None, namespace=None, **kwargs): path = resource.path(namespace=namespace) return self.request('post', path, body=body, **kwargs) - def delete(self, resource, name=None, namespace=None, body=None, label_selector=None, field_selector=None, **kwargs): + def delete( + self, + resource, + name=None, + namespace=None, + body=None, + label_selector=None, + field_selector=None, + **kwargs): if not (name or label_selector or field_selector): - raise ValueError("At least one of name|label_selector|field_selector is required") - if resource.namespaced and not (label_selector or field_selector or namespace): - raise ValueError("At least one of namespace|label_selector|field_selector is required") + raise ValueError( + "At least one of name|label_selector|field_selector is required") + if resource.namespaced and not ( + label_selector or field_selector or namespace): + raise ValueError( + "At least one of namespace|label_selector|field_selector is required") path = resource.path(name=name, namespace=namespace) - return self.request('delete', path, body=body, label_selector=label_selector, field_selector=field_selector, **kwargs) - - def replace(self, resource, body=None, name=None, namespace=None, **kwargs): + return self.request( + 'delete', + path, + body=body, + label_selector=label_selector, + field_selector=field_selector, + **kwargs) + + def replace( + self, + resource, + body=None, + name=None, + namespace=None, + **kwargs): body = self.serialize_body(body) name = name or body.get('metadata', {}).get('name') if not name: - raise ValueError("name is required to replace {}.{}".format(resource.group_version, resource.kind)) + raise ValueError( + "name is required to replace {}.{}".format( + resource.group_version, resource.kind)) if resource.namespaced: namespace = self.ensure_namespace(resource, namespace, body) path = resource.path(name=name, namespace=namespace) @@ -140,20 +163,37 @@ def patch(self, resource, body=None, name=None, namespace=None, **kwargs): body = self.serialize_body(body) name = name or body.get('metadata', {}).get('name') if not name: - raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind)) + raise ValueError( + "name is required to patch {}.{}".format( + resource.group_version, resource.kind)) if resource.namespaced: namespace = self.ensure_namespace(resource, namespace, body) - content_type = kwargs.pop('content_type', 'application/strategic-merge-patch+json') + content_type = kwargs.pop('content_type', + 'application/strategic-merge-patch+json') path = resource.path(name=name, namespace=namespace) - return self.request('patch', path, body=body, content_type=content_type, **kwargs) - - def server_side_apply(self, resource, body=None, name=None, namespace=None, force_conflicts=None, **kwargs): + return self.request( + 'patch', + path, + body=body, + content_type=content_type, + **kwargs) + + def server_side_apply( + self, + resource, + body=None, + name=None, + namespace=None, + force_conflicts=None, + **kwargs): body = self.serialize_body(body) name = name or body.get('metadata', {}).get('name') if not name: - raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind)) + raise ValueError( + "name is required to patch {}.{}".format( + resource.group_version, resource.kind)) if resource.namespaced: namespace = self.ensure_namespace(resource, namespace, body) @@ -161,9 +201,24 @@ def server_side_apply(self, resource, body=None, name=None, namespace=None, forc kwargs.update({'content_type': 'application/apply-patch+yaml'}) path = resource.path(name=name, namespace=namespace) - return self.request('patch', path, body=body, force_conflicts=force_conflicts, **kwargs) - - def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None, watcher=None, allow_watch_bookmarks=None): + return self.request( + 'patch', + path, + body=body, + force_conflicts=force_conflicts, + **kwargs) + + def watch( + self, + resource, + namespace=None, + name=None, + label_selector=None, + field_selector=None, + resource_version=None, + timeout=None, + watcher=None, + allow_watch_bookmarks=None): """ Stream events for a resource from the Kubernetes API @@ -194,9 +249,11 @@ def watch(self, resource, namespace=None, name=None, label_selector=None, field_ # If you want to gracefully stop the stream watcher watcher.stop() """ - if not watcher: watcher = watch.Watch() + if not watcher: + watcher = watch.Watch() - # Use field selector to query for named instance so the watch parameter is handled properly. + # Use field selector to query for named instance so the watch parameter + # is handled properly. if name: field_selector = f"metadata.name={name}" @@ -225,7 +282,9 @@ def request(self, method, path, body=None, **params): if params.get('_continue') is not None: query_params.append(('continue', params['_continue'])) if params.get('include_uninitialized') is not None: - query_params.append(('includeUninitialized', params['include_uninitialized'])) + query_params.append( + ('includeUninitialized', + params['include_uninitialized'])) if params.get('field_selector') is not None: query_params.append(('fieldSelector', params['field_selector'])) if params.get('label_selector') is not None: @@ -233,17 +292,21 @@ def request(self, method, path, body=None, **params): if params.get('limit') is not None: query_params.append(('limit', params['limit'])) if params.get('resource_version') is not None: - query_params.append(('resourceVersion', params['resource_version'])) + query_params.append( + ('resourceVersion', params['resource_version'])) if params.get('timeout_seconds') is not None: query_params.append(('timeoutSeconds', params['timeout_seconds'])) if params.get('watch') is not None: query_params.append(('watch', params['watch'])) if params.get('grace_period_seconds') is not None: - query_params.append(('gracePeriodSeconds', params['grace_period_seconds'])) + query_params.append( + ('gracePeriodSeconds', params['grace_period_seconds'])) if params.get('propagation_policy') is not None: - query_params.append(('propagationPolicy', params['propagation_policy'])) + query_params.append( + ('propagationPolicy', params['propagation_policy'])) if params.get('orphan_dependents') is not None: - query_params.append(('orphanDependents', params['orphan_dependents'])) + query_params.append( + ('orphanDependents', params['orphan_dependents'])) if params.get('dry_run') is not None: query_params.append(('dryRun', params['dry_run'])) if params.get('field_manager') is not None: @@ -251,15 +314,18 @@ def request(self, method, path, body=None, **params): if params.get('force_conflicts') is not None: query_params.append(('force', params['force_conflicts'])) if params.get('allow_watch_bookmarks') is not None: - query_params.append(('allowWatchBookmarks', params['allow_watch_bookmarks'])) + query_params.append( + ('allowWatchBookmarks', params['allow_watch_bookmarks'])) header_params = params.get('header_params', {}) form_params = [] local_var_files = {} # Checking Accept header. - new_header_params = dict((key.lower(), value) for key, value in header_params.items()) - if not 'accept' in new_header_params: + new_header_params = { + key.lower(): value for key, + value in header_params.items()} + if 'accept' not in new_header_params: header_params['Accept'] = self.client.select_header_accept([ 'application/json', 'application/yaml', @@ -269,7 +335,8 @@ def request(self, method, path, body=None, **params): if params.get('content_type'): header_params['Content-Type'] = params['content_type'] else: - header_params['Content-Type'] = self.client.select_header_content_type(['*/*']) + header_params['Content-Type'] = self.client.select_header_content_type([ + '*/*']) # Authentication setting auth_settings = ['BearerToken'] @@ -320,8 +387,11 @@ def validate(self, definition, version=None, strict=False): except kubernetes_validate.utils.ValidationError as e: errors.append("resource definition validation error at %s: %s" % ('.'.join([str(item) for item in e.path]), e.message)) # noqa: B306 except VersionNotSupportedError: - errors.append("Kubernetes version %s is not supported by kubernetes-validate" % version) + errors.append( + "Kubernetes version %s is not supported by kubernetes-validate" % + version) except kubernetes_validate.utils.SchemaNotFoundError as e: - warnings.append("Could not find schema for object kind %s with API version %s in Kubernetes version %s (possibly Custom Resource?)" % - (e.kind, e.api_version, e.version)) + warnings.append( + "Could not find schema for object kind %s with API version %s in Kubernetes version %s (possibly Custom Resource?)" % + (e.kind, e.api_version, e.version)) return warnings, errors diff --git a/kubernetes/base/dynamic/discovery.py b/kubernetes/base/dynamic/discovery.py index 3dd28af268..124b4ac7df 100644 --- a/kubernetes/base/dynamic/discovery.py +++ b/kubernetes/base/dynamic/discovery.py @@ -13,7 +13,6 @@ # limitations under the License. import os -import six import json import logging import hashlib @@ -33,7 +32,7 @@ DISCOVERY_PREFIX = 'apis' -class Discoverer(object): +class Discoverer: """ A convenient container for storing discovered API resources. Allows easy searching and retrieval of specific resources. @@ -43,15 +42,16 @@ class Discoverer(object): def __init__(self, client, cache_file): self.client = client - default_cache_id = self.client.configuration.host - if six.PY3: - default_cache_id = default_cache_id.encode('utf-8') + default_cache_id = self.client.configuration.host.encode('utf-8') try: - default_cachefile_name = 'osrcp-{0}.json'.format(hashlib.md5(default_cache_id, usedforsecurity=False).hexdigest()) + default_cachefile_name = 'osrcp-{0}.json'.format( + hashlib.md5(default_cache_id, usedforsecurity=False).hexdigest()) except TypeError: # usedforsecurity is only supported in 3.9+ - default_cachefile_name = 'osrcp-{0}.json'.format(hashlib.md5(default_cache_id).hexdigest()) - self.__cache_file = cache_file or os.path.join(tempfile.gettempdir(), default_cachefile_name) + default_cachefile_name = 'osrcp-{0}.json'.format( + hashlib.md5(default_cache_id).hexdigest()) + self.__cache_file = cache_file or os.path.join( + tempfile.gettempdir(), default_cachefile_name) self.__init_cache() def __init_cache(self, refresh=False): @@ -60,8 +60,10 @@ def __init_cache(self, refresh=False): refresh = True else: try: - with open(self.__cache_file, 'r') as f: - self._cache = json.load(f, cls=partial(CacheDecoder, self.client)) + with open(self.__cache_file) as f: + self._cache = json.load( + f, cls=partial( + CacheDecoder, self.client)) if self._cache.get('library_version') != __version__: # Version mismatch, need to refresh cache self.invalidate_cache() @@ -89,7 +91,13 @@ def api_groups(self): pass @abstractmethod - def search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs): + def search( + self, + prefix=None, + group=None, + api_version=None, + kind=None, + **kwargs): pass @abstractmethod @@ -102,21 +110,32 @@ def version(self): def default_groups(self, request_resources=False): groups = {} - groups['api'] = { '': { - 'v1': (ResourceGroup( True, resources=self.get_resources_for_api_version('api', '', 'v1', True) ) - if request_resources else ResourceGroup(True)) - }} - - groups[DISCOVERY_PREFIX] = {'': { - 'v1': ResourceGroup(True, resources = {"List": [ResourceList(self.client)]}) - }} + groups['api'] = { + '': { + 'v1': ( + ResourceGroup( + True, + resources=self.get_resources_for_api_version( + 'api', + '', + 'v1', + True)) if request_resources else ResourceGroup(True))}} + + groups[DISCOVERY_PREFIX] = { + '': { + 'v1': ResourceGroup( + True, resources={ + "List": [ + ResourceList( + self.client)]})}} return groups def parse_api_groups(self, request_resources=False, update=False): """ Discovers all API groups present in the cluster """ if not self._cache.get('resources') or update: self._cache['resources'] = self._cache.get('resources', {}) - groups_response = self.client.request('GET', '/{}'.format(DISCOVERY_PREFIX)).groups + groups_response = self.client.request( + 'GET', '/{}'.format(DISCOVERY_PREFIX)).groups groups = self.default_groups(request_resources=request_resources) @@ -124,12 +143,20 @@ def parse_api_groups(self, request_resources=False, update=False): new_group = {} for version_raw in group['versions']: version = version_raw['version'] - resource_group = self._cache.get('resources', {}).get(DISCOVERY_PREFIX, {}).get(group['name'], {}).get(version) + resource_group = self._cache.get( + 'resources', + {}).get( + DISCOVERY_PREFIX, + {}).get( + group['name'], + {}).get(version) preferred = version_raw == group['preferredVersion'] resources = resource_group.resources if resource_group else {} if request_resources: - resources = self.get_resources_for_api_version(DISCOVERY_PREFIX, group['name'], version, preferred) - new_group[version] = ResourceGroup(preferred, resources=resources) + resources = self.get_resources_for_api_version( + DISCOVERY_PREFIX, group['name'], version, preferred) + new_group[version] = ResourceGroup( + preferred, resources=resources) groups[DISCOVERY_PREFIX][group['name']] = new_group self._cache['resources'].update(groups) self._write_cache() @@ -143,14 +170,17 @@ def just_json(_, serialized): if not self._cache.get('version'): try: self._cache['version'] = { - 'kubernetes': self.client.request('get', '/version', serializer=just_json) - } + 'kubernetes': self.client.request( + 'get', '/version', serializer=just_json)} except (ValueError, MaxRetryError) as e: - if isinstance(e, MaxRetryError) and not isinstance(e.reason, ProtocolError): + if isinstance( + e, MaxRetryError) and not isinstance( + e.reason, ProtocolError): raise if not self.client.configuration.host.startswith("https://"): - raise ValueError("Host value %s should start with https:// when talking to HTTPS endpoint" % - self.client.configuration.host) + raise ValueError( + "Host value %s should start with https:// when talking to HTTPS endpoint" % + self.client.configuration.host) else: raise @@ -164,13 +194,20 @@ def get_resources_for_api_version(self, prefix, group, version, preferred): path = '/'.join(filter(None, [prefix, group, version])) try: - resources_response = self.client.request('GET', path).resources or [] + resources_response = self.client.request( + 'GET', path).resources or [] except (ServiceUnavailableError, JSONDecodeError): # Handle both service unavailable errors and JSON decode errors resources_response = [] - resources_raw = list(filter(lambda resource: '/' not in resource['name'], resources_response)) - subresources_raw = list(filter(lambda resource: '/' in resource['name'], resources_response)) + resources_raw = list( + filter( + lambda resource: '/' not in resource['name'], + resources_response)) + subresources_raw = list( + filter( + lambda resource: '/' in resource['name'], + resources_response)) for subresource in subresources_raw: resource, name = subresource['name'].split('/', 1) if not subresources.get(resource): @@ -179,7 +216,12 @@ def get_resources_for_api_version(self, prefix, group, version, preferred): for resource in resources_raw: # Prevent duplicate keys - for key in ('prefix', 'group', 'api_version', 'client', 'preferred'): + for key in ( + 'prefix', + 'group', + 'api_version', + 'client', + 'preferred'): resource.pop(key, None) resourceobj = Resource( @@ -193,7 +235,11 @@ def get_resources_for_api_version(self, prefix, group, version, preferred): ) resources[resource['kind']].append(resourceobj) - resource_list = ResourceList(self.client, group=group, api_version=version, base_kind=resource['kind']) + resource_list = ResourceList( + self.client, + group=group, + api_version=version, + base_kind=resource['kind']) resources[resource_list.kind].append(resource_list) return resources @@ -206,17 +252,22 @@ def get(self, **kwargs): # If there are multiple matches, prefer exact matches on api_version if len(results) > 1 and kwargs.get('api_version'): results = [ - result for result in results if result.group_version == kwargs['api_version'] - ] + result for result in results if result.group_version == kwargs['api_version']] # If there are multiple matches, prefer non-List kinds - if len(results) > 1 and not all([isinstance(x, ResourceList) for x in results]): - results = [result for result in results if not isinstance(result, ResourceList)] + if len(results) > 1 and not all( + [isinstance(x, ResourceList) for x in results]): + results = [ + result for result in results if not isinstance( + result, ResourceList)] if len(results) == 1: return results[0] elif not results: - raise ResourceNotFoundError('No matches found for {}'.format(kwargs)) + raise ResourceNotFoundError( + 'No matches found for {}'.format(kwargs)) else: - raise ResourceNotUniqueError('Multiple matches found for {}: {}'.format(kwargs, results)) + raise ResourceNotUniqueError( + 'Multiple matches found for {}: {}'.format( + kwargs, results)) class LazyDiscoverer(Discoverer): @@ -240,21 +291,28 @@ def __maybe_write_cache(self): @property def api_groups(self): - return self.parse_api_groups(request_resources=False, update=True)['apis'].keys() + return self.parse_api_groups( + request_resources=False, + update=True)['apis'].keys() def search(self, **kwargs): - # In first call, ignore ResourceNotFoundError and set default value for results + # In first call, ignore ResourceNotFoundError and set default value for + # results try: - results = self.__search(self.__build_search(**kwargs), self.__resources, []) + results = self.__search( + self.__build_search( + **kwargs), self.__resources, []) except ResourceNotFoundError: results = [] if not results: self.invalidate_cache() - results = self.__search(self.__build_search(**kwargs), self.__resources, []) + results = self.__search( + self.__build_search( + **kwargs), self.__resources, []) self.__maybe_write_cache() return results - def __search(self, parts, resources, reqParams): + def __search(self, parts, resources, reqParams): part = parts[0] if part != '*': @@ -263,7 +321,9 @@ def __search(self, parts, resources, reqParams): return [] elif isinstance(resourcePart, ResourceGroup): if len(reqParams) != 2: - raise ValueError("prefix and group params should be present, have %s" % reqParams) + raise ValueError( + "prefix and group params should be present, have %s" % + reqParams) # Check if we've requested resources for this group if not resourcePart.resources: prefix, group, version = reqParams[0], reqParams[1], part @@ -275,11 +335,13 @@ def __search(self, parts, resources, reqParams): self._cache['resources'][prefix][group][version] = resourcePart self.__update_cache = True - return self.__search(parts[1:], resourcePart.resources, reqParams) + return self.__search( + parts[1:], resourcePart.resources, reqParams) elif isinstance(resourcePart, dict): # In this case parts [0] will be a specified prefix, group, version # as we recurse - return self.__search(parts[1:], resourcePart, reqParams + [part] ) + return self.__search( + parts[1:], resourcePart, reqParams + [part]) else: if parts[1] != '*' and isinstance(parts[1], dict): for _resource in resourcePart: @@ -293,10 +355,17 @@ def __search(self, parts, resources, reqParams): else: matches = [] for key in resources.keys(): - matches.extend(self.__search([key] + parts[1:], resources, reqParams)) + matches.extend(self.__search( + [key] + parts[1:], resources, reqParams)) return matches - def __build_search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs): + def __build_search( + self, + prefix=None, + group=None, + api_version=None, + kind=None, + **kwargs): if not group and api_version and '/' in api_version: group, api_version = api_version.split('/') @@ -313,7 +382,7 @@ def __iter__(self): prefix, group, version, rg.preferred) self._cache['resources'][prefix][group][version] = rg self.__update_cache = True - for _, resource in six.iteritems(rg.resources): + for _, resource in rg.resources.items(): yield resource self.__maybe_write_cache() @@ -337,8 +406,9 @@ def discover(self): @property def api_groups(self): """ list available api groups """ - return self.parse_api_groups(request_resources=True, update=True)['apis'].keys() - + return self.parse_api_groups( + request_resources=True, + update=True)['apis'].keys() def search(self, **kwargs): """ Takes keyword arguments and returns matching resources. The search @@ -351,13 +421,23 @@ def search(self, **kwargs): The arbitrary arguments can be any valid attribute for an Resource object """ - results = self.__search(self.__build_search(**kwargs), self.__resources) + results = self.__search( + self.__build_search( + **kwargs), self.__resources) if not results: self.invalidate_cache() - results = self.__search(self.__build_search(**kwargs), self.__resources) + results = self.__search( + self.__build_search( + **kwargs), self.__resources) return results - def __build_search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs): + def __build_search( + self, + prefix=None, + group=None, + api_version=None, + kind=None, + **kwargs): if not group and api_version and '/' in api_version: group, api_version = api_version.split('/') @@ -397,8 +477,9 @@ def __iter__(self): yield resource -class ResourceGroup(object): +class ResourceGroup: """Helper class for Discoverer container""" + def __init__(self, preferred, resources=None): self.preferred = preferred self.resources = resources or {} @@ -420,7 +501,8 @@ def default(self, o): class CacheDecoder(json.JSONDecoder): def __init__(self, client, *args, **kwargs): self.client = client - json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs) + json.JSONDecoder.__init__( + self, object_hook=self.object_hook, *args, **kwargs) def object_hook(self, obj): if '_type' not in obj: @@ -431,5 +513,8 @@ def object_hook(self, obj): elif _type == 'ResourceList': return ResourceList(self.client, **obj) elif _type == 'ResourceGroup': - return ResourceGroup(obj['preferred'], resources=self.object_hook(obj['resources'])) + return ResourceGroup( + obj['preferred'], + resources=self.object_hook( + obj['resources'])) return obj diff --git a/kubernetes/base/dynamic/exceptions.py b/kubernetes/base/dynamic/exceptions.py index c8b908e7d5..215a1bbcc5 100644 --- a/kubernetes/base/dynamic/exceptions.py +++ b/kubernetes/base/dynamic/exceptions.py @@ -44,6 +44,7 @@ def api_exception(e): class DynamicApiError(ApiException): """ Generic API Error for the dynamic client """ + def __init__(self, e, tb=None): self.status = e.status self.reason = e.reason @@ -54,19 +55,24 @@ def __init__(self, e, tb=None): def __str__(self): error_message = [str(self.status), "Reason: {}".format(self.reason)] if self.headers: - error_message.append("HTTP response headers: {}".format(self.headers)) + error_message.append( + "HTTP response headers: {}".format( + self.headers)) if self.body: error_message.append("HTTP response body: {}".format(self.body)) if self.original_traceback: - error_message.append("Original traceback: \n{}".format(self.original_traceback)) + error_message.append( + "Original traceback: \n{}".format( + self.original_traceback)) return '\n'.join(error_message) def summary(self): if self.body: - if self.headers and self.headers.get('Content-Type') == 'application/json': + if self.headers and self.headers.get( + 'Content-Type') == 'application/json': message = json.loads(self.body).get('message') if message: return message @@ -75,36 +81,64 @@ def summary(self): else: return "{} Reason: {}".format(self.status, self.reason) + class ResourceNotFoundError(Exception): """ Resource was not found in available APIs """ + + class ResourceNotUniqueError(Exception): """ Parameters given matched multiple API resources """ + class KubernetesValidateMissing(Exception): """ kubernetes-validate is not installed """ # HTTP Errors + + class BadRequestError(DynamicApiError): """ 400: StatusBadRequest """ + + class UnauthorizedError(DynamicApiError): """ 401: StatusUnauthorized """ + + class ForbiddenError(DynamicApiError): """ 403: StatusForbidden """ + + class NotFoundError(DynamicApiError): """ 404: StatusNotFound """ + + class MethodNotAllowedError(DynamicApiError): """ 405: StatusMethodNotAllowed """ + + class ConflictError(DynamicApiError): """ 409: StatusConflict """ + + class GoneError(DynamicApiError): """ 410: StatusGone """ + + class UnprocessibleEntityError(DynamicApiError): """ 422: StatusUnprocessibleEntity """ + + class TooManyRequestsError(DynamicApiError): """ 429: StatusTooManyRequests """ + + class InternalServerError(DynamicApiError): """ 500: StatusInternalServer """ + + class ServiceUnavailableError(DynamicApiError): """ 503: StatusServiceUnavailable """ + + class ServerTimeoutError(DynamicApiError): """ 504: StatusServerTimeout """ diff --git a/kubernetes/base/dynamic/resource.py b/kubernetes/base/dynamic/resource.py index 58a60ec402..d61f4b0ae9 100644 --- a/kubernetes/base/dynamic/resource.py +++ b/kubernetes/base/dynamic/resource.py @@ -19,15 +19,29 @@ from pprint import pformat -class Resource(object): +class Resource: """ Represents an API resource type, containing the information required to build urls for requests """ - def __init__(self, prefix=None, group=None, api_version=None, kind=None, - namespaced=False, verbs=None, name=None, preferred=False, client=None, - singularName=None, shortNames=None, categories=None, subresources=None, **kwargs): + def __init__( + self, + prefix=None, + group=None, + api_version=None, + kind=None, + namespaced=False, + verbs=None, + name=None, + preferred=False, + client=None, + singularName=None, + shortNames=None, + categories=None, + subresources=None, + **kwargs): if None in (api_version, kind, prefix): - raise ValueError("At least prefix, kind, and api_version must be provided") + raise ValueError( + "At least prefix, kind, and api_version must be provided") self.prefix = prefix self.group = group @@ -61,7 +75,9 @@ def to_dict(self): 'singularName': self.singular_name, 'shortNames': self.short_names, 'categories': self.categories, - 'subresources': {k: sr.to_dict() for k, sr in self.subresources.items()}, + 'subresources': { + k: sr.to_dict() for k, + sr in self.subresources.items()}, } d.update(self.extra_args) return d @@ -73,7 +89,8 @@ def group_version(self): return self.api_version def __repr__(self): - return '<{}({}/{})>'.format(self.__class__.__name__, self.group_version, self.name) + return '<{}({}/{})>'.format(self.__class__.__name__, + self.group_version, self.name) @property def urls(self): @@ -108,7 +125,14 @@ def __getattr__(self, name): class ResourceList(Resource): """ Represents a list of API objects """ - def __init__(self, client, group='', api_version='v1', base_kind='', kind=None, base_resource_lookup=None): + def __init__( + self, + client, + group='', + api_version='v1', + base_kind='', + kind=None, + base_resource_lookup=None): self.client = client self.group = group self.api_version = api_version @@ -121,10 +145,12 @@ def base_resource(self): if self.__base_resource: return self.__base_resource elif self.base_resource_lookup: - self.__base_resource = self.client.resources.get(**self.base_resource_lookup) + self.__base_resource = self.client.resources.get( + **self.base_resource_lookup) return self.__base_resource elif self.base_kind: - self.__base_resource = self.client.resources.get(group=self.group, api_version=self.api_version, kind=self.base_kind) + self.__base_resource = self.client.resources.get( + group=self.group, api_version=self.api_version, kind=self.base_kind) return self.__base_resource return None @@ -141,16 +167,20 @@ def _items_to_resources(self, body): } """ if body is None: - raise ValueError("You must provide a body when calling methods on a ResourceList") + raise ValueError( + "You must provide a body when calling methods on a ResourceList") api_version = body['apiVersion'] kind = body['kind'] items = body.get('items') if not items: - raise ValueError('The `items` field in the body must be populated when calling methods on a ResourceList') + raise ValueError( + 'The `items` field in the body must be populated when calling methods on a ResourceList') if self.kind != kind: - raise ValueError('Methods on a {} must be called with a body containing the same kind. Received {} instead'.format(self.kind, kind)) + raise ValueError( + 'Methods on a {} must be called with a body containing the same kind. Received {} instead'.format( + self.kind, kind)) return { 'api_version': api_version, @@ -164,7 +194,8 @@ def _item_to_resource(self, item): if not resource: api_version = item.get('apiVersion', self.api_version) kind = item.get('kind', self.base_kind) - resource = self.client.resources.get(api_version=api_version, kind=kind) + resource = self.client.resources.get( + api_version=api_version, kind=kind) return { 'resource': resource, 'definition': item, @@ -174,26 +205,30 @@ def _item_to_resource(self, item): def get(self, body, name=None, namespace=None, **kwargs): if name: - raise ValueError('Operations on ResourceList objects do not support the `name` argument') + raise ValueError( + 'Operations on ResourceList objects do not support the `name` argument') resource_list = self._items_to_resources(body) response = copy.deepcopy(body) response['items'] = [ - item['resource'].get(name=item['name'], namespace=item['namespace'] or namespace, **kwargs).to_dict() - for item in resource_list['items'] - ] + item['resource'].get( + name=item['name'], + namespace=item['namespace'] or namespace, + **kwargs).to_dict() for item in resource_list['items']] return ResourceInstance(self, response) def delete(self, body, name=None, namespace=None, **kwargs): if name: - raise ValueError('Operations on ResourceList objects do not support the `name` argument') + raise ValueError( + 'Operations on ResourceList objects do not support the `name` argument') resource_list = self._items_to_resources(body) response = copy.deepcopy(body) response['items'] = [ - item['resource'].delete(name=item['name'], namespace=item['namespace'] or namespace, **kwargs).to_dict() - for item in resource_list['items'] - ] + item['resource'].delete( + name=item['name'], + namespace=item['namespace'] or namespace, + **kwargs).to_dict() for item in resource_list['items']] return ResourceInstance(self, response) def verb_mapper(self, verb, body, **kwargs): @@ -241,17 +276,20 @@ def __init__(self, parent, **kwargs): self.api_version = parent.api_version self.kind = kwargs.pop('kind') self.name = kwargs.pop('name') - self.subresource = kwargs.pop('subresource', None) or self.name.split('/')[1] + self.subresource = kwargs.pop( + 'subresource', None) or self.name.split('/')[1] self.namespaced = kwargs.pop('namespaced', False) self.verbs = kwargs.pop('verbs', None) self.extra_args = kwargs - #TODO(fabianvf): Determine proper way to handle differences between resources + subresources + # TODO(fabianvf): Determine proper way to handle differences between + # resources + subresources def create(self, body=None, name=None, namespace=None, **kwargs): name = name or body.get('metadata', {}).get('name') body = self.parent.client.serialize_body(body) if self.parent.namespaced: - namespace = self.parent.client.ensure_namespace(self.parent, namespace, body) + namespace = self.parent.client.ensure_namespace( + self.parent, namespace, body) path = self.path(name=name, namespace=namespace) return self.parent.client.request('post', path, body=body, **kwargs) @@ -259,9 +297,14 @@ def create(self, body=None, name=None, namespace=None, **kwargs): def urls(self): full_prefix = '{}/{}'.format(self.prefix, self.group_version) return { - 'full': '/{}/{}/{{name}}/{}'.format(full_prefix, self.parent.name, self.subresource), - 'namespaced_full': '/{}/namespaces/{{namespace}}/{}/{{name}}/{}'.format(full_prefix, self.parent.name, self.subresource) - } + 'full': '/{}/{}/{{name}}/{}'.format( + full_prefix, + self.parent.name, + self.subresource), + 'namespaced_full': '/{}/namespaces/{{namespace}}/{}/{{name}}/{}'.format( + full_prefix, + self.parent.name, + self.subresource)} def __getattr__(self, name): return partial(getattr(self.parent.client, name), self) @@ -278,7 +321,7 @@ def to_dict(self): return d -class ResourceInstance(object): +class ResourceInstance: """ A parsed instance of an API resource. It exists solely to ease interaction with API objects by allowing attributes to be accessed with '.' notation. @@ -337,15 +380,15 @@ def __repr__(self): ) def __getattr__(self, name): - if not '_ResourceInstance__initialised' in self.__dict__: - return super(ResourceInstance, self).__getattr__(name) + if '_ResourceInstance__initialised' not in self.__dict__: + return super().__getattr__(name) return getattr(self.attributes, name) def __setattr__(self, name, value): - if not '_ResourceInstance__initialised' in self.__dict__: - return super(ResourceInstance, self).__setattr__(name, value) + if '_ResourceInstance__initialised' not in self.__dict__: + return super().__setattr__(name, value) elif name in self.__dict__: - return super(ResourceInstance, self).__setattr__(name, value) + return super().__setattr__(name, value) else: self.attributes[name] = value @@ -359,7 +402,7 @@ def __dir__(self): return dir(type(self)) + list(self.attributes.__dict__.keys()) -class ResourceField(object): +class ResourceField: """ A parsed instance of an API resource attribute. It exists solely to ease interaction with API objects by allowing attributes to be accessed with '.' notation @@ -378,7 +421,8 @@ def __getitem__(self, name): return self.__dict__.get(name) # Here resource.items will return items if available or resource.__dict__.items function if not - # resource.get will call resource.__dict__.get after attempting resource.__dict__.get('get') + # resource.get will call resource.__dict__.get after attempting + # resource.__dict__.get('get') def __getattr__(self, name): return self.__dict__.get(name, getattr(self.__dict__, name, None)) @@ -389,8 +433,7 @@ def __dir__(self): return dir(type(self)) + list(self.__dict__.keys()) def __iter__(self): - for k, v in self.__dict__.items(): - yield (k, v) + yield from self.__dict__.items() def to_dict(self): return self.__serialize(self) diff --git a/kubernetes/base/dynamic/test_client.py b/kubernetes/base/dynamic/test_client.py index 2043226a5b..8dc6ec9b22 100644 --- a/kubernetes/base/dynamic/test_client.py +++ b/kubernetes/base/dynamic/test_client.py @@ -202,10 +202,16 @@ def test_async_namespaced_custom_resources(self): 'spec': {} } - async_resp = changeme_api.create(body=changeme_manifest, namespace='default', async_req=True) + async_resp = changeme_api.create( + body=changeme_manifest, + namespace='default', + async_req=True) self.assertEqual(async_resp.metadata.name, changeme_name) - async_resp = changeme_api.get(name=changeme_name, namespace='default', async_req=True) + async_resp = changeme_api.get( + name=changeme_name, + namespace='default', + async_req=True) self.assertEqual(async_resp.metadata.name, changeme_name) changeme_manifest['spec']['size'] = 3 @@ -217,7 +223,10 @@ def test_async_namespaced_custom_resources(self): ) self.assertEqual(async_resp.spec.size, 3) - async_resp = changeme_api.get(name=changeme_name, namespace='default', async_req=True) + async_resp = changeme_api.get( + name=changeme_name, + namespace='default', + async_req=True) self.assertEqual(async_resp.spec.size, 3) async_resp = changeme_api.get(namespace='default', async_req=True) @@ -521,20 +530,22 @@ def test_server_side_apply_api(self): name = 'pod-' + short_uuid() pod_manifest = { - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': {'labels': {'name': name}, - 'name': name}, - 'spec': {'containers': [{ - 'image': 'nginx', - 'name': 'nginx', + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': {'labels': {'name': name}, + 'name': name}, + 'spec': {'containers': [{ + 'image': 'nginx', + 'name': 'nginx', 'ports': [{'containerPort': 80, - 'protocol': 'TCP'}]}]}} + 'protocol': 'TCP'}]}]}} resp = api.server_side_apply( namespace='default', body=pod_manifest, field_manager='kubernetes-unittests', dry_run="All") - self.assertEqual('kubernetes-unittests', resp.metadata.managedFields[0].manager) + self.assertEqual( + 'kubernetes-unittests', + resp.metadata.managedFields[0].manager) class TestDynamicClientSerialization(unittest.TestCase): @@ -551,7 +562,10 @@ def setUpClass(cls): } def test_dict_type(self): - self.assertEqual(self.client.serialize_body(self.pod_manifest), self.pod_manifest) + self.assertEqual( + self.client.serialize_body( + self.pod_manifest), + self.pod_manifest) def test_resource_instance_type(self): inst = ResourceInstance(self.client, self.pod_manifest) diff --git a/kubernetes/base/dynamic/test_discovery.py b/kubernetes/base/dynamic/test_discovery.py index 639ccdd334..b5ed3e3007 100644 --- a/kubernetes/base/dynamic/test_discovery.py +++ b/kubernetes/base/dynamic/test_discovery.py @@ -51,7 +51,8 @@ def test_cache_decoder_resource_and_subresource(self): # do Discoverer.__init__ client = DynamicClient(api_client.ApiClient(configuration=self.config)) - # the resources of client will use _cache['resources'] decode from cache file + # the resources of client will use _cache['resources'] decode from + # cache file deploy2 = client.resources.get(kind='Deployment') # test Resource is the same diff --git a/kubernetes/base/hack/boilerplate/boilerplate.py b/kubernetes/base/hack/boilerplate/boilerplate.py index eec04b4583..3f5ca59dfa 100755 --- a/kubernetes/base/hack/boilerplate/boilerplate.py +++ b/kubernetes/base/hack/boilerplate/boilerplate.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import print_function import argparse import datetime @@ -60,7 +59,7 @@ def get_refs(): args.boilerplate_dir, "boilerplate.*.txt")): extension = os.path.basename(path).split(".")[1] - ref_file = open(path, 'r') + ref_file = open(path) ref = ref_file.read().splitlines() ref_file.close() refs[extension] = ref @@ -70,7 +69,7 @@ def get_refs(): def file_passes(filename, refs, regexs): try: - f = open(filename, 'r') + f = open(filename) except Exception as exc: print("Unable to open %s: %s" % (filename, exc), file=verbose_out) return False @@ -172,7 +171,7 @@ def get_files(extensions): def get_dates(): years = datetime.datetime.now().year - return '(%s)' % '|'.join((str(year) for year in range(2014, years+1))) + return '(%s)' % '|'.join(str(year) for year in range(2014, years + 1)) def get_regexs(): diff --git a/kubernetes/base/leaderelection/electionconfig.py b/kubernetes/base/leaderelection/electionconfig.py index 8ae8847e13..8fe6e8977b 100644 --- a/kubernetes/base/leaderelection/electionconfig.py +++ b/kubernetes/base/leaderelection/electionconfig.py @@ -19,7 +19,14 @@ class Config: # Validate config, exit if an error is detected - def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading): + def __init__( + self, + lock, + lease_duration, + renew_deadline, + retry_period, + onstarted_leading, + onstopped_leading): self.jitter_factor = 1.2 if lock is None: @@ -30,7 +37,8 @@ def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted sys.exit("lease_duration must be greater than renew_deadline") if renew_deadline <= self.jitter_factor * retry_period: - sys.exit("renewDeadline must be greater than retry_period*jitter_factor") + sys.exit( + "renewDeadline must be greater than retry_period*jitter_factor") if lease_duration < 1: sys.exit("lease_duration must be greater than one") @@ -54,6 +62,7 @@ def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted else: self.onstopped_leading = onstopped_leading - # Default callback for when the current candidate if a leader, stops leading + # Default callback for when the current candidate if a leader, stops + # leading def on_stoppedleading_callback(self): logger.info("stopped leading".format(self.lock.identity)) diff --git a/kubernetes/base/leaderelection/example.py b/kubernetes/base/leaderelection/example.py index 3b3336c8e3..fd3e524ab2 100644 --- a/kubernetes/base/leaderelection/example.py +++ b/kubernetes/base/leaderelection/example.py @@ -43,9 +43,16 @@ def example_func(): # In that case, a default callback function will be used # Create config -config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17, - renew_deadline=15, retry_period=5, onstarted_leading=example_func, - onstopped_leading=None) +config = electionconfig.Config( + ConfigMapLock( + lock_name, + lock_namespace, + candidate_id), + lease_duration=17, + renew_deadline=15, + retry_period=5, + onstarted_leading=example_func, + onstopped_leading=None) # Enter leader election leaderelection.LeaderElection(config).run() diff --git a/kubernetes/base/leaderelection/leaderelection.py b/kubernetes/base/leaderelection/leaderelection.py index bbeb813505..afee426e1b 100644 --- a/kubernetes/base/leaderelection/leaderelection.py +++ b/kubernetes/base/leaderelection/leaderelection.py @@ -19,16 +19,13 @@ import threading from .leaderelectionrecord import LeaderElectionRecord import logging -# if condition to be removed when support for python2 will be removed -if sys.version_info > (3, 0): - from http import HTTPStatus -else: - import httplib +from http import HTTPStatus + logger = logging.getLogger("leaderelection") """ This package implements leader election using an annotation in a Kubernetes object. -The onstarted_leading function is run in a thread and when it returns, if it does +The onstarted_leading function is run in a thread and when it returns, if it does it might not be safe to run it again in a process. At first all candidates are considered followers. The one to create a lock or update @@ -55,11 +52,14 @@ def __init__(self, election_config): def run(self): # Try to create/ acquire a lock if self.acquire(): - logger.info("{} successfully acquired lease".format(self.election_config.lock.identity)) + logger.info( + "{} successfully acquired lease".format( + self.election_config.lock.identity)) # Start leading and call OnStartedLeading() threading.daemon = True - threading.Thread(target=self.election_config.onstarted_leading).start() + threading.Thread( + target=self.election_config.onstarted_leading).start() self.renew_loop() @@ -68,7 +68,9 @@ def run(self): def acquire(self): # Follower - logger.info("{} is a follower".format(self.election_config.lock.identity)) + logger.info( + "{} is a follower".format( + self.election_config.lock.identity)) retry_period = self.election_config.retry_period while True: @@ -81,7 +83,8 @@ def acquire(self): def renew_loop(self): # Leader - logger.info("Leader has entered renew loop and will try to update lease continuously") + logger.info( + "Leader has entered renew loop and will try to update lease continuously") retry_period = self.election_config.retry_period renew_deadline = self.election_config.renew_deadline * 1000 @@ -109,34 +112,46 @@ def try_acquire_or_renew(self): now = datetime.datetime.fromtimestamp(now_timestamp) # Check if lock is created - lock_status, old_election_record = self.election_config.lock.get(self.election_config.lock.name, - self.election_config.lock.namespace) + lock_status, old_election_record = self.election_config.lock.get( + self.election_config.lock.name, self.election_config.lock.namespace) # create a default Election record for this candidate - leader_election_record = LeaderElectionRecord(self.election_config.lock.identity, - str(self.election_config.lease_duration), str(now), str(now)) + leader_election_record = LeaderElectionRecord( + self.election_config.lock.identity, str( + self.election_config.lease_duration), str(now), str(now)) # A lock is not created with that name, try to create one if not lock_status: # To be removed when support for python2 will be removed if sys.version_info > (3, 0): - if json.loads(old_election_record.body)['code'] != HTTPStatus.NOT_FOUND: - logger.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name, - old_election_record.reason)) + if json.loads(old_election_record.body)[ + 'code'] != HTTPStatus.NOT_FOUND: + logger.info( + "Error retrieving resource lock {} as {}".format( + self.election_config.lock.name, + old_election_record.reason)) return False else: - if json.loads(old_election_record.body)['code'] != httplib.NOT_FOUND: - logger.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name, - old_election_record.reason)) + if json.loads(old_election_record.body)[ + 'code'] != httplib.NOT_FOUND: + logger.info( + "Error retrieving resource lock {} as {}".format( + self.election_config.lock.name, + old_election_record.reason)) return False - logger.info("{} is trying to create a lock".format(leader_election_record.holder_identity)) - create_status = self.election_config.lock.create(name=self.election_config.lock.name, - namespace=self.election_config.lock.namespace, - election_record=leader_election_record) + logger.info( + "{} is trying to create a lock".format( + leader_election_record.holder_identity)) + create_status = self.election_config.lock.create( + name=self.election_config.lock.name, + namespace=self.election_config.lock.namespace, + election_record=leader_election_record) if create_status is False: - logger.info("{} Failed to create lock".format(leader_election_record.holder_identity)) + logger.info( + "{} Failed to create lock".format( + leader_election_record.holder_identity)) return False self.observed_record = leader_election_record @@ -156,16 +171,21 @@ def try_acquire_or_renew(self): # Report transitions if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity: - logger.info("Leader has switched to {}".format(old_election_record.holder_identity)) + logger.info( + "Leader has switched to {}".format( + old_election_record.holder_identity)) if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__: self.observed_record = old_election_record self.observed_time_milliseconds = int(time.time() * 1000) - # If This candidate is not the leader and lease duration is yet to finish + # If This candidate is not the leader and lease duration is yet to + # finish if (self.election_config.lock.identity != self.observed_record.holder_identity and self.observed_time_milliseconds + self.election_config.lease_duration * 1000 > int(now_timestamp * 1000)): - logger.info("yet to finish lease_duration, lease held by {} and has not expired".format(old_election_record.holder_identity)) + logger.info( + "yet to finish lease_duration, lease held by {} and has not expired".format( + old_election_record.holder_identity)) return False # If this candidate is the Leader @@ -177,15 +197,20 @@ def try_acquire_or_renew(self): def update_lock(self, leader_election_record): # Update object with latest election record - update_status = self.election_config.lock.update(self.election_config.lock.name, - self.election_config.lock.namespace, - leader_election_record) + update_status = self.election_config.lock.update( + self.election_config.lock.name, + self.election_config.lock.namespace, + leader_election_record) if update_status is False: - logger.info("{} failed to acquire lease".format(leader_election_record.holder_identity)) + logger.info( + "{} failed to acquire lease".format( + leader_election_record.holder_identity)) return False self.observed_record = leader_election_record self.observed_time_milliseconds = int(time.time() * 1000) - logger.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity)) + logger.info( + "leader {} has successfully acquired lease".format( + leader_election_record.holder_identity)) return True diff --git a/kubernetes/base/leaderelection/leaderelection_test.py b/kubernetes/base/leaderelection/leaderelection_test.py index 9fb6d9bcf4..6b75580dcb 100644 --- a/kubernetes/base/leaderelection/leaderelection_test.py +++ b/kubernetes/base/leaderelection/leaderelection_test.py @@ -25,6 +25,7 @@ thread_lock = threading.RLock() + class LeaderElectionTest(unittest.TestCase): def test_simple_leader_election(self): election_history = [] @@ -40,7 +41,15 @@ def on_update(): def on_change(): election_history.append("change record") - mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock, on_create, on_update, on_change, None) + mock_lock = MockResourceLock( + "mock", + "mock_namespace", + "mock", + thread_lock, + on_create, + on_update, + on_change, + None) def on_started_leading(): leadership_history.append("start leading") @@ -49,15 +58,23 @@ def on_stopped_leading(): leadership_history.append("stop leading") # Create config 4.5 4 3 - config = electionconfig.Config(lock=mock_lock, lease_duration=2.5, - renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading, - onstopped_leading=on_stopped_leading) + config = electionconfig.Config( + lock=mock_lock, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading, + onstopped_leading=on_stopped_leading) # Enter leader election leaderelection.LeaderElection(config).run() - self.assert_history(election_history, ["create record", "update record", "update record", "update record"]) - self.assert_history(leadership_history, ["get leadership", "start leading", "stop leading"]) + self.assert_history( + election_history, [ + "create record", "update record", "update record", "update record"]) + self.assert_history( + leadership_history, [ + "get leadership", "start leading", "stop leading"]) def test_leader_election(self): election_history = [] @@ -73,7 +90,15 @@ def on_update_A(): def on_change_A(): election_history.append("A gets leadership") - mock_lock_A = MockResourceLock("mock", "mock_namespace", "MockA", thread_lock, on_create_A, on_update_A, on_change_A, None) + mock_lock_A = MockResourceLock( + "mock", + "mock_namespace", + "MockA", + thread_lock, + on_create_A, + on_update_A, + on_change_A, + None) mock_lock_A.renew_count_max = 3 def on_started_leading_A(): @@ -82,9 +107,13 @@ def on_started_leading_A(): def on_stopped_leading_A(): leadership_history.append("A stops leading") - config_A = electionconfig.Config(lock=mock_lock_A, lease_duration=2.5, - renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading_A, - onstopped_leading=on_stopped_leading_A) + config_A = electionconfig.Config( + lock=mock_lock_A, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading_A, + onstopped_leading=on_stopped_leading_A) def on_create_B(): election_history.append("B creates record") @@ -96,7 +125,15 @@ def on_update_B(): def on_change_B(): leadership_history.append("B gets leadership") - mock_lock_B = MockResourceLock("mock", "mock_namespace", "MockB", thread_lock, on_create_B, on_update_B, on_change_B, None) + mock_lock_B = MockResourceLock( + "mock", + "mock_namespace", + "MockB", + thread_lock, + on_create_B, + on_update_B, + on_change_B, + None) mock_lock_B.renew_count_max = 4 def on_started_leading_B(): @@ -105,18 +142,24 @@ def on_started_leading_B(): def on_stopped_leading_B(): leadership_history.append("B stops leading") - config_B = electionconfig.Config(lock=mock_lock_B, lease_duration=2.5, - renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading_B, - onstopped_leading=on_stopped_leading_B) + config_B = electionconfig.Config( + lock=mock_lock_B, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading_B, + onstopped_leading=on_stopped_leading_B) mock_lock_B.leader_record = mock_lock_A.leader_record threading.daemon = True # Enter leader election for A - threading.Thread(target=leaderelection.LeaderElection(config_A).run()).start() + threading.Thread( + target=leaderelection.LeaderElection(config_A).run()).start() # Enter leader election for B - threading.Thread(target=leaderelection.LeaderElection(config_B).run()).start() + threading.Thread( + target=leaderelection.LeaderElection(config_B).run()).start() time.sleep(5) @@ -136,7 +179,6 @@ def on_stopped_leading_B(): "B starts leading", "B stops leading"]) - """Expected behavior: to check if the leader stops leading if it fails to update the lock within the renew_deadline and stops leading after finally timing out. The difference between each try comes out to be approximately the sleep time. @@ -145,10 +187,11 @@ def on_stopped_leading_B(): on try update: 1.5s on update: zzz s on try update: 3s - on update: zzz s + on update: zzz s on try update: 4.5s on try update: 6s Timeout - Leader Exits""" + def test_Leader_election_with_renew_deadline(self): election_history = [] leadership_history = [] @@ -166,7 +209,15 @@ def on_change(): def on_try_update(): election_history.append("try update record") - mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock, on_create, on_update, on_change, on_try_update) + mock_lock = MockResourceLock( + "mock", + "mock_namespace", + "mock", + thread_lock, + on_create, + on_update, + on_change, + on_try_update) mock_lock.renew_count_max = 3 def on_started_leading(): @@ -176,9 +227,13 @@ def on_stopped_leading(): leadership_history.append("stop leading") # Create config - config = electionconfig.Config(lock=mock_lock, lease_duration=2.5, - renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading, - onstopped_leading=on_stopped_leading) + config = electionconfig.Config( + lock=mock_lock, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading, + onstopped_leading=on_stopped_leading) # Enter leader election leaderelection.LeaderElection(config).run() @@ -192,7 +247,9 @@ def on_stopped_leading(): "try update record", "try update record"]) - self.assert_history(leadership_history, ["get leadership", "start leading", "stop leading"]) + self.assert_history( + leadership_history, [ + "get leadership", "start leading", "stop leading"]) def assert_history(self, history, expected): self.assertIsNotNone(expected) @@ -200,13 +257,26 @@ def assert_history(self, history, expected): self.assertEqual(len(expected), len(history)) for idx in range(len(history)): - self.assertEqual(history[idx], expected[idx], - msg="Not equal at index {}, expected {}, got {}".format(idx, expected[idx], - history[idx])) + self.assertEqual( + history[idx], + expected[idx], + msg="Not equal at index {}, expected {}, got {}".format( + idx, + expected[idx], + history[idx])) class MockResourceLock: - def __init__(self, name, namespace, identity, shared_lock, on_create=None, on_update=None, on_change=None, on_try_update=None): + def __init__( + self, + name, + namespace, + identity, + shared_lock, + on_create=None, + on_update=None, + on_change=None, + on_try_update=None): # self.leader_record is shared between two MockResourceLock objects self.leader_record = [] self.renew_count = 0 diff --git a/kubernetes/base/leaderelection/leaderelectionrecord.py b/kubernetes/base/leaderelection/leaderelectionrecord.py index ebb550d4d1..0174860be2 100644 --- a/kubernetes/base/leaderelection/leaderelectionrecord.py +++ b/kubernetes/base/leaderelection/leaderelectionrecord.py @@ -15,7 +15,12 @@ class LeaderElectionRecord: # Annotation used in the lock object - def __init__(self, holder_identity, lease_duration, acquire_time, renew_time): + def __init__( + self, + holder_identity, + lease_duration, + acquire_time, + renew_time): self.holder_identity = holder_identity self.lease_duration = lease_duration self.acquire_time = acquire_time diff --git a/kubernetes/base/leaderelection/resourcelock/configmaplock.py b/kubernetes/base/leaderelection/resourcelock/configmaplock.py index 719152edea..1d8dd383fe 100644 --- a/kubernetes/base/leaderelection/resourcelock/configmaplock.py +++ b/kubernetes/base/leaderelection/resourcelock/configmaplock.py @@ -39,7 +39,7 @@ def __init__(self, name, namespace, identity): 'leaseDurationSeconds': None, 'acquireTime': None, 'renewTime': None - } + } # get returns the election record from a ConfigMap Annotation def get(self, name, namespace): @@ -49,22 +49,28 @@ def get(self, name, namespace): :return: 'True, election record' if object found else 'False, exception response' """ try: - api_response = self.api_instance.read_namespaced_config_map(name, namespace) + api_response = self.api_instance.read_namespaced_config_map( + name, namespace) - # If an annotation does not exist - add the leader_electionrecord_annotationkey + # If an annotation does not exist - add the + # leader_electionrecord_annotationkey annotations = api_response.metadata.annotations if annotations is None or annotations == '': - api_response.metadata.annotations = {self.leader_electionrecord_annotationkey: ''} + api_response.metadata.annotations = { + self.leader_electionrecord_annotationkey: ''} self.configmap_reference = api_response return True, None - # If an annotation exists but, the leader_electionrecord_annotationkey does not then add it as a key + # If an annotation exists but, the + # leader_electionrecord_annotationkey does not then add it as a key if not annotations.get(self.leader_electionrecord_annotationkey): - api_response.metadata.annotations = {self.leader_electionrecord_annotationkey: ''} + api_response.metadata.annotations = { + self.leader_electionrecord_annotationkey: ''} self.configmap_reference = api_response return True, None - lock_record = self.get_lock_object(json.loads(annotations[self.leader_electionrecord_annotationkey])) + lock_record = self.get_lock_object(json.loads( + annotations[self.leader_electionrecord_annotationkey])) self.configmap_reference = api_response return True, lock_record @@ -79,11 +85,14 @@ def create(self, name, namespace, election_record): :return: 'True' if object is created else 'False' if failed """ body = client.V1ConfigMap( - metadata={"name": name, - "annotations": {self.leader_electionrecord_annotationkey: json.dumps(self.get_lock_dict(election_record))}}) + metadata={ + "name": name, "annotations": { + self.leader_electionrecord_annotationkey: json.dumps( + self.get_lock_dict(election_record))}}) try: - api_response = self.api_instance.create_namespaced_config_map(namespace, body, pretty=True) + api_response = self.api_instance.create_namespaced_config_map( + namespace, body, pretty=True) return True except ApiException as e: logger.info("Failed to create lock as {}".format(e)) @@ -98,9 +107,10 @@ def update(self, name, namespace, updated_record): """ try: # Set the updated record - self.configmap_reference.metadata.annotations[self.leader_electionrecord_annotationkey] = json.dumps(self.get_lock_dict(updated_record)) - api_response = self.api_instance.replace_namespaced_config_map(name=name, namespace=namespace, - body=self.configmap_reference) + self.configmap_reference.metadata.annotations[self.leader_electionrecord_annotationkey] = json.dumps( + self.get_lock_dict(updated_record)) + api_response = self.api_instance.replace_namespaced_config_map( + name=name, namespace=namespace, body=self.configmap_reference) return True except ApiException as e: logger.info("Failed to update lock as {}".format(e)) @@ -125,5 +135,5 @@ def get_lock_dict(self, leader_election_record): self.lock_record['leaseDurationSeconds'] = leader_election_record.lease_duration self.lock_record['acquireTime'] = leader_election_record.acquire_time self.lock_record['renewTime'] = leader_election_record.renew_time - - return self.lock_record \ No newline at end of file + + return self.lock_record diff --git a/kubernetes/base/stream/stream.py b/kubernetes/base/stream/stream.py index e34dedfc3b..6c375945f2 100644 --- a/kubernetes/base/stream/stream.py +++ b/kubernetes/base/stream/stream.py @@ -17,7 +17,12 @@ from . import ws_client -def _websocket_request(websocket_request, force_kwargs, api_method, *args, **kwargs): +def _websocket_request( + websocket_request, + force_kwargs, + api_method, + *args, + **kwargs): """Override the ApiClient.request method with an alternative websocket based method and call the supplied Kubernetes API method with that in place.""" if force_kwargs: @@ -32,7 +37,8 @@ def _websocket_request(websocket_request, force_kwargs, api_method, *args, **kwa prev_request = api_client.request binary = kwargs.pop('binary', False) try: - api_client.request = functools.partial(websocket_request, configuration, binary=binary) + api_client.request = functools.partial( + websocket_request, configuration, binary=binary) out = api_method(*args, **kwargs) # The api_client insists on converting this to a string using its representation, so we have # to do this dance to strip it of the b' prefix and ' suffix, encode it byte-per-byte (latin1), @@ -40,11 +46,14 @@ def _websocket_request(websocket_request, force_kwargs, api_method, *args, **kwa # However, if _preload_content=False is passed, then the entire WSClient is returned instead # of a response, and we want to leave it alone if binary and kwargs.get('_preload_content', True): - out = out[2:-1].encode('latin1').decode('unicode_escape').encode('latin1') + out = out[2:- + 1].encode('latin1').decode('unicode_escape').encode('latin1') return out finally: api_client.request = prev_request stream = functools.partial(_websocket_request, ws_client.websocket_call, None) -portforward = functools.partial(_websocket_request, ws_client.portforward_call, {'_preload_content':False}) +portforward = functools.partial( + _websocket_request, ws_client.portforward_call, { + '_preload_content': False}) diff --git a/kubernetes/base/stream/ws_client.py b/kubernetes/base/stream/ws_client.py index 44b6123325..d2e8b07c1e 100644 --- a/kubernetes/base/stream/ws_client.py +++ b/kubernetes/base/stream/ws_client.py @@ -22,16 +22,12 @@ import ssl import threading import time - -import six -import yaml - - -from six.moves.urllib.parse import urlencode, urlparse, urlunparse -from six import StringIO, BytesIO - +from urllib.parse import urlencode, urlparse, urlunparse +from io import StringIO, BytesIO from websocket import WebSocket, ABNF, enableTrace, WebSocketConnectionClosedException from base64 import urlsafe_b64decode + +import yaml from requests.utils import should_bypass_proxies STDIN_CHANNEL = 0 @@ -40,12 +36,14 @@ ERROR_CHANNEL = 3 RESIZE_CHANNEL = 4 + class _IgnoredIO: def write(self, _x): pass def getvalue(self): - raise TypeError("Tried to read_all() from a WSClient configured to not capture. Did you mean `capture_all=True`?") + raise TypeError( + "Tried to read_all() from a WSClient configured to not capture. Did you mean `capture_all=True`?") class WSClient: @@ -98,7 +96,7 @@ def readline_channel(self, channel, timeout=None): if self.newline in data: index = data.find(self.newline) ret = data[:index] - data = data[index+1:] + data = data[index + 1:] if data: self._channels[channel] = data else: @@ -109,12 +107,12 @@ def readline_channel(self, channel, timeout=None): def write_channel(self, channel, data): """Write data to a channel.""" # check if we're writing binary data or not - binary = six.PY3 and type(data) == six.binary_type + binary = isinstance(data, bytes) opcode = ABNF.OPCODE_BINARY if binary else ABNF.OPCODE_TEXT channel_prefix = chr(channel) if binary: - channel_prefix = six.binary_type(channel_prefix, "ascii") + channel_prefix = bytes(channel_prefix, "ascii") payload = channel_prefix + data self.sock.send(payload, opcode=opcode) @@ -200,11 +198,11 @@ def update(self, timeout=0): return elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT: data = frame.data - if six.PY3 and not self.binary: + if not self.binary: data = data.decode("utf-8", "replace") if len(data) > 1: channel = data[0] - if six.PY3 and not self.binary: + if not self.binary: channel = ord(channel) data = data[1:] if data: @@ -227,6 +225,7 @@ def run_forever(self, timeout=None): else: while self.is_open(): self.update(timeout=None) + @property def returncode(self): """ @@ -242,7 +241,8 @@ def returncode(self): if err['status'] == "Success": self._returncode = 0 else: - self._returncode = int(err['details']['causes'][0]['message']) + self._returncode = int( + err['details']['causes'][0]['message']) return self._returncode def close(self, **kwargs): @@ -264,6 +264,7 @@ def getheader(self, name, default=None): """Returns a given response header.""" return None + class PortForward: def __init__(self, websocket, ports): """A websocket client with support for port forwarding. @@ -279,11 +280,10 @@ def __init__(self, websocket, ports): self.local_ports[port_number] = self._Port(ix, port_number) # There is a thread run per PortForward instance which performs the translation between the # raw socket data sent by the python application and the websocket protocol. This thread - # terminates after either side has closed all ports, and after flushing all pending data. - proxy = threading.Thread( - name="Kubernetes port forward proxy: %s" % ', '.join([str(port) for port in ports]), - target=self._proxy - ) + # terminates after either side has closed all ports, and after flushing + # all pending data. + proxy = threading.Thread(name="Kubernetes port forward proxy: %s" % ', '.join( + [str(port) for port in ports]), target=self._proxy) proxy.daemon = True proxy.start() @@ -310,7 +310,7 @@ def __init__(self, ix, port_number): # The remote port number self.port_number = port_number # The websocket channel byte number for this port - self.channel = six.int2byte(ix * 2) + self.channel = bytes((ix * 2,)) # A socket pair is created to provide a means of translating the data flow # between the python application and the kubernetes websocket. The self.python # half of the socket pair is used by the _proxy method to receive and send data @@ -322,7 +322,8 @@ def __init__(self, ix, port_number): # intercepting setting AF_INET socket options that would error against an AF_UNIX # socket. self.socket = self._Socket(s) - # Data accumulated from the websocket to be sent to the python application. + # Data accumulated from the websocket to be sent to the python + # application. self.data = b'' # All data sent from kubernetes on the port error channel. self.error = None @@ -341,7 +342,8 @@ def setsockopt(self, level, optname, value): return self._socket.setsockopt(level, optname, value) - # Proxy all socket data between the python code and the kubernetes websocket. + # Proxy all socket data between the python code and the kubernetes + # websocket. def _proxy(self): channel_ports = [] channel_initialized = [] @@ -358,8 +360,8 @@ def _proxy(self): # The data to send on the websocket socket kubernetes_data = b'' while True: - rlist = [] # List of sockets to read from - wlist = [] # List of sockets to write to + rlist = [] # List of sockets to read from + wlist = [] # List of sockets to write to if self.websocket.connected: rlist.append(self.websocket) if kubernetes_data: @@ -378,7 +380,8 @@ def _proxy(self): local_all_closed = False else: port.python.close() - if local_all_closed and not (self.websocket.connected and kubernetes_data): + if local_all_closed and not ( + self.websocket.connected and kubernetes_data): self.websocket.close() return r, w, _ = select.select(rlist, wlist, []) @@ -387,17 +390,20 @@ def _proxy(self): pending = True while pending: try: - opcode, frame = self.websocket.recv_data_frame(True) + opcode, frame = self.websocket.recv_data_frame( + True) except WebSocketConnectionClosedException: for port in self.local_ports.values(): port.python.close() return if opcode == ABNF.OPCODE_BINARY: if not frame.data: - raise RuntimeError("Unexpected frame data size") - channel = six.byte2int(frame.data) + raise RuntimeError( + "Unexpected frame data size") + channel = frame.data[0] if channel >= len(channel_ports): - raise RuntimeError("Unexpected channel number: %s" % channel) + raise RuntimeError( + "Unexpected channel number: %s" % channel) port = channel_ports[channel] if channel_initialized[channel]: if channel % 2: @@ -412,15 +418,19 @@ def _proxy(self): raise RuntimeError( "Unexpected initial channel frame data size" ) - port_number = six.byte2int(frame.data[1:2]) + (six.byte2int(frame.data[2:3]) * 256) + port_number = frame.data[1:2][0] + \ + (frame.data[2:3][0] * 256) if port_number != port.port_number: raise RuntimeError( - "Unexpected port number in initial channel frame: %s" % port_number - ) + "Unexpected port number in initial channel frame: %s" % port_number) channel_initialized[channel] = True elif opcode not in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG, ABNF.OPCODE_CLOSE): - raise RuntimeError("Unexpected websocket opcode: %s" % opcode) - if not (isinstance(self.websocket.sock, ssl.SSLSocket) and self.websocket.sock.pending()): + raise RuntimeError( + "Unexpected websocket opcode: %s" % opcode) + if not ( + isinstance( + self.websocket.sock, + ssl.SSLSocket) and self.websocket.sock.pending()): pending = False else: port = local_ports[sock] @@ -471,7 +481,7 @@ def create_websocket(configuration, url, headers=None): # http headers we get from the generated code header = [] if headers and 'authorization' in headers: - header.append("authorization: %s" % headers['authorization']) + header.append("authorization: %s" % headers['authorization']) if headers and 'sec-websocket-protocol' in headers: header.append("sec-websocket-protocol: %s" % headers['sec-websocket-protocol']) @@ -497,32 +507,36 @@ def create_websocket(configuration, url, headers=None): websocket = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False) connect_opt = { - 'header': header + 'header': header } if configuration.proxy or configuration.proxy_headers: - connect_opt = websocket_proxycare(connect_opt, configuration, url, headers) + connect_opt = websocket_proxycare( + connect_opt, configuration, url, headers) websocket.connect(url, **connect_opt) return websocket + def websocket_proxycare(connect_opt, configuration, url, headers): """ An internal function to be called in api-client when a websocket create is requested. """ if configuration.no_proxy: - connect_opt.update({ 'http_no_proxy': configuration.no_proxy.split(',') }) + connect_opt.update( + {'http_no_proxy': configuration.no_proxy.split(',')}) if configuration.proxy: proxy_url = urlparse(configuration.proxy) - connect_opt.update({'http_proxy_host': proxy_url.hostname, 'http_proxy_port': proxy_url.port}) + connect_opt.update( + {'http_proxy_host': proxy_url.hostname, 'http_proxy_port': proxy_url.port}) if configuration.proxy_headers: - for key,value in configuration.proxy_headers.items(): + for key, value in configuration.proxy_headers.items(): if key == 'proxy-authorization' and value.startswith('Basic'): b64value = value.split()[1] auth = urlsafe_b64decode(b64value).decode().split(':') - connect_opt.update({'http_proxy_auth': (auth[0], auth[1]) }) - return(connect_opt) + connect_opt.update({'http_proxy_auth': (auth[0], auth[1])}) + return (connect_opt) def websocket_call(configuration, _method, url, **kwargs): @@ -537,7 +551,12 @@ def websocket_call(configuration, _method, url, **kwargs): capture_all = kwargs.get("capture_all", True) binary = kwargs.get('binary', False) try: - client = WSClient(configuration, url, headers, capture_all, binary=binary) + client = WSClient( + configuration, + url, + headers, + capture_all, + binary=binary) if not _preload_content: return client client.run_forever(timeout=_request_timeout) @@ -566,7 +585,9 @@ def portforward_call(configuration, _method, url, **kwargs): except ValueError: raise ApiValueError("Invalid port number: %s" % port) if not (0 < port_number < 65536): - raise ApiValueError("Port number must be between 0 and 65536: %s" % port) + raise ApiValueError( + "Port number must be between 0 and 65536: %s" % + port) if port_number in ports: raise ApiValueError("Duplicate port numbers: %s" % port) ports.append(port_number) diff --git a/kubernetes/base/stream/ws_client_test.py b/kubernetes/base/stream/ws_client_test.py index 3f8c022874..818641de62 100644 --- a/kubernetes/base/stream/ws_client_test.py +++ b/kubernetes/base/stream/ws_client_test.py @@ -28,6 +28,8 @@ urllib3.disable_warnings() except ImportError: pass + + @pytest.fixture(autouse=True) def dummy_kubeconfig(tmp_path, monkeypatch): # Creating a kubeconfig @@ -54,14 +56,16 @@ def dummy_kubeconfig(tmp_path, monkeypatch): def dictval(dict_obj, key, default=None): - + return dict_obj.get(key, default) + class DummyProxy(threading.Thread): """ A minimal HTTP proxy that flags any CONNECT request and returns 200 OK. Listens on 127.0.0.1:8888 by default. """ + def __init__(self, host='127.0.0.1', port=8888): super().__init__(daemon=True) self.host = host @@ -82,6 +86,7 @@ def run(self): finally: conn.close() + class WSClientTest(unittest.TestCase): def test_websocket_client(self): @@ -94,59 +99,74 @@ def test_websocket_client(self): ('https://api.domain.com', 'wss://api.domain.com'), ('http://api.domain.com/', 'ws://api.domain.com/'), ('https://api.domain.com/', 'wss://api.domain.com/'), - ]: + ]: self.assertEqual(get_websocket_url(url), ws_url) def test_websocket_proxycare(self): for proxy, idpass, no_proxy, expect_host, expect_port, expect_auth, expect_noproxy in [ - ( None, None, None, None, None, None, None ), - ( 'http://proxy.example.com:8080/', None, None, 'proxy.example.com', 8080, None, None ), - ( 'http://proxy.example.com:8080/', 'user:pass', None, 'proxy.example.com', 8080, ('user','pass'), None), - ( 'http://proxy.example.com:8080/', 'user:pass', '', 'proxy.example.com', 8080, ('user','pass'), None), - ( 'http://proxy.example.com:8080/', 'user:pass', '*', 'proxy.example.com', 8080, ('user','pass'), ['*']), - ( 'http://proxy.example.com:8080/', 'user:pass', '.example.com', 'proxy.example.com', 8080, ('user','pass'), ['.example.com']), - ( 'http://proxy.example.com:8080/', 'user:pass', 'localhost,.local,.example.com', 'proxy.example.com', 8080, ('user','pass'), ['localhost','.local','.example.com']), - ]: + (None, None, None, None, None, None, None), + ('http://proxy.example.com:8080/', None, None, 'proxy.example.com', 8080, None, None), + ('http://proxy.example.com:8080/', 'user:pass', None, 'proxy.example.com', 8080, ('user', 'pass'), None), + ('http://proxy.example.com:8080/', 'user:pass', '', 'proxy.example.com', 8080, ('user', 'pass'), None), + ('http://proxy.example.com:8080/', 'user:pass', '*', 'proxy.example.com', 8080, ('user', 'pass'), ['*']), + ('http://proxy.example.com:8080/', 'user:pass', '.example.com', 'proxy.example.com', 8080, ('user', 'pass'), ['.example.com']), + ('http://proxy.example.com:8080/', 'user:pass', 'localhost,.local,.example.com', 'proxy.example.com', 8080, ('user', 'pass'), ['localhost', '.local', '.example.com']), + ]: # input setup cfg = Configuration() if proxy: cfg.proxy = proxy if idpass: - cfg.proxy_headers = urllib3.util.make_headers(proxy_basic_auth=idpass) + cfg.proxy_headers = urllib3.util.make_headers( + proxy_basic_auth=idpass) if no_proxy is not None: cfg.no_proxy = no_proxy - - + connect_opts = websocket_proxycare({}, cfg, None, None) assert dictval(connect_opts, 'http_proxy_host') == expect_host assert dictval(connect_opts, 'http_proxy_port') == expect_port assert dictval(connect_opts, 'http_proxy_auth') == expect_auth assert dictval(connect_opts, 'http_no_proxy') == expect_noproxy + @pytest.fixture(scope="module") def dummy_proxy(): - #Dummy Proxy + # Dummy Proxy proxy = DummyProxy(port=8888) proxy.start() yield proxy + @pytest.fixture(autouse=True) def clear_proxy_env(monkeypatch): - for var in ("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy", "NO_PROXY", "no_proxy"): + for var in ( + "HTTP_PROXY", + "http_proxy", + "HTTPS_PROXY", + "https_proxy", + "NO_PROXY", + "no_proxy"): monkeypatch.delenv(var, raising=False) + def apply_proxy_to_conf(): - #apply HTTPS_PROXY env var and set it as global. + # apply HTTPS_PROXY env var and set it as global. cfg = client.Configuration.get_default_copy() cfg.proxy = os.getenv("HTTPS_PROXY") cfg.no_proxy = os.getenv("NO_PROXY", "") client.Configuration.set_default(cfg) + def test_rest_call_ignores_env(dummy_proxy, monkeypatch): # HTTPS_PROXY to dummy proxy monkeypatch.setenv("HTTPS_PROXY", "http://127.0.0.1:8888") # Avoid real HTTP request - monkeypatch.setattr(client.CoreV1Api, "list_namespace", lambda self, *_args, **_kwargs: None) + monkeypatch.setattr( + client.CoreV1Api, + "list_namespace", + lambda self, + *_args, + **_kwargs: None) # Load config using kubeconfig config.load_kube_config(config_file=os.environ["KUBECONFIG"]) apply_proxy_to_conf() @@ -158,17 +178,20 @@ def test_rest_call_ignores_env(dummy_proxy, monkeypatch): v1.list_namespace(_preload_content=False) assert not dummy_proxy.received_connect, "REST path should ignore HTTPS_PROXY" + def test_websocket_call_honors_env(dummy_proxy, monkeypatch): # set HTTPS_PROXY again monkeypatch.setenv("HTTPS_PROXY", "http://127.0.0.1:8888") # Load kubeconfig config.load_kube_config(config_file=os.environ["KUBECONFIG"]) apply_proxy_to_conf() - opts = websocket_proxycare({}, client.Configuration.get_default_copy(), None, None) + opts = websocket_proxycare( + {}, client.Configuration.get_default_copy(), None, None) assert opts.get('http_proxy_host') == '127.0.0.1' assert opts.get('http_proxy_port') == 8888 # Optionally verify no_proxy parsing assert opts.get('http_no_proxy') is None + if __name__ == '__main__': unittest.main() diff --git a/kubernetes/base/tox.ini b/kubernetes/base/tox.ini index 37a188f127..91cbf9b555 100644 --- a/kubernetes/base/tox.ini +++ b/kubernetes/base/tox.ini @@ -1,13 +1,21 @@ [tox] skipsdist = True envlist = - py3{5,6,7,8,9} - py3{5,6,7,8,9}-functional + py3{8,9} + py31{0,1,2,3,4,5} + py3{8,9}-functional + py31{0,1,2,3,4,5}-functional [testenv] -passenv = TOXENV CI TRAVIS TRAVIS_* +passenv = + TOXENV + CI + TRAVIS + TRAVIS_* +deps = + pytest commands = python -V - pip install pytest + pytest ./run_tox.sh pytest diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 44fbe49141..31d03bacd8 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -67,15 +67,18 @@ def iter_resp_lines(resp): buffer.extend(segment.encode("utf-8")) else: raise TypeError( - f"Received invalid segment type, {type(segment)}, from stream. Accepts only 'str' or 'bytes'.") + f"Received invalid segment type, { + type(segment)}, from stream. Accepts only 'str' or 'bytes'.") - # Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte) + # Split by newline (safe for utf-8 because multi-byte sequences cannot + # contain the newline byte) next_newline = buffer.find(b'\n') while next_newline != -1: - # Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character + # Convert bytes to a valid utf-8 string, replacing any invalid + # utf-8 with the '�' character line = buffer[:next_newline].decode( "utf-8", errors="replace") - buffer = buffer[next_newline+1:] + buffer = buffer[next_newline + 1:] if line: yield line else: @@ -83,7 +86,7 @@ def iter_resp_lines(resp): next_newline = buffer.find(b'\n') -class Watch(object): +class Watch: def __init__(self, return_type=None): self._raw_return_type = return_type @@ -121,9 +124,11 @@ def unmarshal_event(self, data, return_type): if js['type'] == 'BOOKMARK': # Extract and store resource_version from BOOKMARK event for # efficiency. No deserialization as event can be incomplete. - if isinstance(js['object'], dict) and 'metadata' in js['object']: + if isinstance(js['object'], + dict) and 'metadata' in js['object']: metadata = js['object']['metadata'] - if isinstance(metadata, dict) and 'resourceVersion' in metadata: + if isinstance(metadata, + dict) and 'resourceVersion' in metadata: self.resource_version = metadata['resourceVersion'] elif js['type'] != 'ERROR': obj = SimpleNamespace(data=json.dumps(js['raw_object'])) @@ -217,10 +222,10 @@ def stream(self, func, *args, **kwargs): retry_after_410 = False yield event else: - if line: + if line: yield line # Normal non-empty line - else: - yield '' # Only yield one empty line + else: + yield '' # Only yield one empty line if self._stop: break finally: diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 07869095b6..73bff6afc1 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -99,7 +99,8 @@ def test_watch_with_interspersed_newlines(self): # Consume all test events from the mock service, stopping when no more data is available. # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is - # the only way to do so. Without that, the stream will re-read the test data forever. + # the only way to do so. Without that, the stream will re-read the test + # data forever. for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): # Here added a statement for exception for empty lines. if e is None: @@ -118,10 +119,12 @@ def test_watch_with_multibyte_utf8(self): '{"type":"MODIFIED","object":{"data":{"utf-8":"© 1"},"metadata":{"name":"test1","resourceVersion":"1"}}}\n', # same copyright character expressed as bytes b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2\xA9 2"},"metadata":{"name":"test2","resourceVersion":"2"}}}\n' - # same copyright character with bytes split across two stream chunks + # same copyright character with bytes split across two stream + # chunks b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2', b'\xA9 3"},"metadata":{"n', - # more chunks of the same event, sent as a mix of bytes and strings + # more chunks of the same event, sent as a mix of bytes and + # strings 'ame":"test3","resourceVersion":"3"', '}}}', b'\n' @@ -136,7 +139,8 @@ def test_watch_with_multibyte_utf8(self): # Consume all test events from the mock service, stopping when no more data is available. # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is - # the only way to do so. Without that, the stream will re-read the test data forever. + # the only way to do so. Without that, the stream will re-read the test + # data forever. for event in w.stream(fake_api.get_configmaps, timeout_seconds=1): count += 1 self.assertEqual("MODIFIED", event['type']) @@ -182,14 +186,15 @@ def test_watch_with_invalid_utf8(self): # Consume all test events from the mock service, stopping when no more data is available. # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is - # the only way to do so. Without that, the stream will re-read the test data forever. + # the only way to do so. Without that, the stream will re-read the test + # data forever. for event in w.stream(fake_api.get_configmaps, timeout_seconds=1): count += 1 self.assertEqual("MODIFIED", event['type']) self.assertEqual("test%d" % count, event['object'].metadata.name) self.assertEqual("😄 %d" % count, event['object'].data["utf-8"]) # expect N replacement characters in test N - self.assertEqual("� %d".replace('�', '�'*count) % + self.assertEqual("� %d".replace('�', '�' * count) % count, event['object'].data["invalid"]) self.assertEqual(3, count) @@ -506,7 +511,7 @@ def test_watch_with_error_event_and_timeout_param(self): amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() - + @classmethod def setUpClass(cls): cls.api = Mock() @@ -514,29 +519,31 @@ def setUpClass(cls): def test_pod_log_empty_lines(self): pod_name = "demo-bug" - + try: self.api.create_namespaced_pod = Mock() self.api.read_namespaced_pod = Mock() self.api.delete_namespaced_pod = Mock() self.api.read_namespaced_pod_log = Mock() - #pod creating step + # pod creating step self.api.create_namespaced_pod.return_value = None - - #Checking pod status + + # Checking pod status mock_pod = Mock() mock_pod.status.phase = "Running" self.api.read_namespaced_pod.return_value = mock_pod - + # Printing at pod output - self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"]) + self.api.read_namespaced_pod_log.return_value = iter( + ["Hello from Docker\n"]) # Wait for the pod to reach 'Running' timeout = 60 start_time = time.time() while time.time() - start_time < timeout: - pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace) + pod = self.api.read_namespaced_pod( + name=pod_name, namespace=self.namespace) if pod.status.phase == "Running": break time.sleep(2) @@ -546,23 +553,27 @@ def test_pod_log_empty_lines(self): # Reading and streaming logs using Watch (mocked) w = Watch() log_output = [] - #Mock logs used for this test + # Mock logs used for this test w.stream = Mock(return_value=[ - "Hello from Docker", - "", - "", - "\n\n", - "Another log line", - "", - "\n", - "Final log" - ]) - for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): + "Hello from Docker", + "", + "", + "\n\n", + "Another log line", + "", + "\n", + "Final log" + ]) + for event in w.stream( + self.api.read_namespaced_pod_log, + name=pod_name, + namespace=self.namespace, + follow=True): log_output.append(event) print(event) # Print outputs - print(f"Captured logs: {log_output}") + print(f"Captured logs: {log_output}") # self.assertTrue(any("Hello from Docker" in line for line in log_output)) # self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") expected_log = [ @@ -575,15 +586,20 @@ def test_pod_log_empty_lines(self): "\n", "Final log" ] - - self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs") + + self.assertEqual( + log_output, + expected_log, + "Captured logs do not match expected logs") except ApiException as e: self.fail(f"Kubernetes API exception: {e}") finally: - #checking pod is calling for delete - self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) - self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) + # checking pod is calling for delete + self.api.delete_namespaced_pod( + name=pod_name, namespace=self.namespace) + self.api.delete_namespaced_pod.assert_called_once_with( + name=pod_name, namespace=self.namespace) # Comment out the test below, it does not work currently. # def test_watch_with_deserialize_param(self): @@ -594,11 +610,11 @@ def test_pod_log_empty_lines(self): # fake_resp.close = Mock() # fake_resp.release_conn = Mock() # fake_resp.stream = Mock(return_value=[test_json + '\n']) -# +# # fake_api = Mock() # fake_api.get_namespaces = Mock(return_value=fake_resp) # fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' -# +# # # test case with deserialize=True # w = Watch() # for e in w.stream(fake_api.get_namespaces, deserialize=True): @@ -609,7 +625,7 @@ def test_pod_log_empty_lines(self): # self.assertEqual("1", e['object'].metadata.resource_version) # # Verify that the original object is saved # self.assertEqual(json.loads(test_json)['object'], e['raw_object']) -# +# # # test case with deserialize=False # w = Watch() # for e in w.stream(fake_api.get_namespaces, deserialize=False): @@ -618,13 +634,13 @@ def test_pod_log_empty_lines(self): # self.assertIsInstance(e['object'], dict) # self.assertEqual("test1", e['object']['metadata']['name']) # self.assertEqual("1", e['object']['metadata']['resourceVersion']) -# +# # # verify the api is called twice # fake_api.get_namespaces.assert_has_calls([ # call(_preload_content=False, watch=True), # call(_preload_content=False, watch=True) # ]) - + if __name__ == '__main__': unittest.main() diff --git a/kubernetes/e2e_test/__init__.py b/kubernetes/e2e_test/__init__.py index 19f5e722fb..0680747644 100644 --- a/kubernetes/e2e_test/__init__.py +++ b/kubernetes/e2e_test/__init__.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at diff --git a/kubernetes/e2e_test/test_apps.py b/kubernetes/e2e_test/test_apps.py index 2735ed3cba..f4f333a055 100644 --- a/kubernetes/e2e_test/test_apps.py +++ b/kubernetes/e2e_test/test_apps.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at diff --git a/kubernetes/e2e_test/test_batch.py b/kubernetes/e2e_test/test_batch.py index 9935df3135..bb2bbb63c7 100644 --- a/kubernetes/e2e_test/test_batch.py +++ b/kubernetes/e2e_test/test_batch.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at diff --git a/kubernetes/e2e_test/test_client.py b/kubernetes/e2e_test/test_client.py index 15689291e5..bd6faa507a 100644 --- a/kubernetes/e2e_test/test_client.py +++ b/kubernetes/e2e_test/test_client.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -19,9 +17,10 @@ import time import unittest import uuid -import six import io import gzip +import urllib.request as urllib_request +from http import HTTPStatus from kubernetes.client import api_client from kubernetes.client.api import core_v1_api @@ -30,12 +29,6 @@ from kubernetes.stream.ws_client import ERROR_CHANNEL from kubernetes.client.rest import ApiException -import six.moves.urllib.request as urllib_request - -if six.PY3: - from http import HTTPStatus -else: - import httplib def short_uuid(): @@ -88,8 +81,7 @@ def test_pod_apis(self): resp = api.read_namespaced_service_account(name='default', namespace='default') except ApiException as e: - if (six.PY3 and e.status != HTTPStatus.NOT_FOUND) or ( - six.PY3 is False and e.status != httplib.NOT_FOUND): + if e.status != HTTPStatus.NOT_FOUND: print('error: %s' % e) self.fail( msg="unexpected error getting default service account") @@ -220,8 +212,7 @@ def test_exit_code(self): resp = api.read_namespaced_service_account(name='default', namespace='default') except ApiException as e: - if (six.PY3 and e.status != HTTPStatus.NOT_FOUND) or ( - six.PY3 is False and e.status != httplib.NOT_FOUND): + if e.status != HTTPStatus.NOT_FOUND: print('error: %s' % e) self.fail( msg="unexpected error getting default service account") diff --git a/kubernetes/e2e_test/test_utils.py b/kubernetes/e2e_test/test_utils.py index b06c0a6b4a..433089efa1 100644 --- a/kubernetes/e2e_test/test_utils.py +++ b/kubernetes/e2e_test/test_utils.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at diff --git a/kubernetes/e2e_test/test_watch.py b/kubernetes/e2e_test/test_watch.py index 134e9c26fd..6879d6d350 100644 --- a/kubernetes/e2e_test/test_watch.py +++ b/kubernetes/e2e_test/test_watch.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at diff --git a/kubernetes/utils/__init__.py b/kubernetes/utils/__init__.py index c83d54fe76..92ab696782 100644 --- a/kubernetes/utils/__init__.py +++ b/kubernetes/utils/__init__.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import from .create_from_yaml import (FailToCreateError, create_from_dict, create_from_yaml, create_from_directory)