From 40325ebad366c33a0f1f8d0d1074c2c6b6830d9e Mon Sep 17 00:00:00 2001 From: Viktor Kurilko Date: Thu, 19 Mar 2026 18:47:20 +0700 Subject: [PATCH 1/5] Use use gpsubprocess instead of subprocess Decoding the results of the subcommands has negative consequences in the Python 2 environment. In the Python 2, decode returns a unicode string whereas previously it returned a regular string. This leads to unexpected errors when trying to convert a unicode string to a byte string if it contains non-ascii characters. This patch changes the approach. Instead of decoding the results of the subcommands, the Python 3 environment now uses text mode to work with subprocesses and files. In most uses of subprocess.Popen has been replaced with the already existing wrapper gpsubprocess.Popen. A number of improvements have been made to this end. It now use a text mode in the Python 3 environment. There is a new wrapper for check_output that also uses text mode for Python 3. The gpoperation mechanism has been reworked. Instead of sending raw pickled serialized data, this data is now sent in base64 format. This was done so that the data could be sent in text mode. In gpssh, the decoding of the result is only for Python 3. In Python 2, outputting unicode strings via print can lead to an error. --- gpMgmt/bin/gpcheckperf | 25 ++++++++-------- gpMgmt/bin/gpload.py | 7 +++-- gpMgmt/bin/gpload_test/gpload/TEST.py | 5 ++-- gpMgmt/bin/gpload_test/gpload2/TEST.py | 7 +++-- gpMgmt/bin/gpload_test/gpload2/TEST_REMOTE.py | 5 ++-- .../gpload_test/gpload2/TEST_local_base.py | 9 +++--- gpMgmt/bin/gpmemreport | 9 +++--- gpMgmt/bin/gpmemwatcher | 3 +- gpMgmt/bin/gppylib/commands/base.py | 8 +++-- gpMgmt/bin/gppylib/commands/gp.py | 3 +- gpMgmt/bin/gppylib/gpsubprocess.py | 30 ++++++++++++++++--- gpMgmt/bin/gppylib/operations/utils.py | 7 +++-- .../test/unit/test_unit_gpinitsystem.py | 15 +++++----- gpMgmt/bin/gppylib/util/ssh_utils.py | 5 +++- gpMgmt/bin/gpscp | 5 ++-- gpMgmt/bin/gpsd | 15 +++++----- gpMgmt/bin/gpssh-exkeys | 17 +++++------ gpMgmt/bin/minirepro | 13 ++++---- gpMgmt/sbin/gpconfig_helper.py | 4 +-- gpMgmt/sbin/gpoperation.py | 7 ++--- gpMgmt/sbin/packcore | 9 +++--- .../test/behave/mgmt_utils/steps/gpstart.py | 5 ++-- .../behave/mgmt_utils/steps/mgmt_utils.py | 11 +++---- .../steps/unreachable_hosts_mgmt_utils.py | 11 +++---- gpMgmt/test/behave_utils/cluster_expand.py | 7 +++-- .../gp_replica_check/gp_replica_check.py | 13 ++++---- src/backend/gporca/scripts/fix_mdps.py | 3 +- .../scripts/get_debug_event_counters.py | 3 +- src/test/isolation2/sql_isolation_testcase.py | 5 ++-- 29 files changed, 152 insertions(+), 114 deletions(-) diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index a9416f09ef73..b688e16a4d94 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -50,6 +50,7 @@ try: from gppylib.util import ssh_utils from gppylib.commands import unix from gppylib.commands.base import * + from gppylib import gpsubprocess except ImportError as e: sys.exit('Error: unable to import module: ' + str(e)) @@ -107,10 +108,10 @@ def gpssh(cmd, verbose): if GV.opt['-v']: print('[Info]', strcmd(c)) - p = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + p = gpsubprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out = p.stdout.read(-1) rc = p.wait() - return not rc, out.decode('utf-8') + return not rc, out def gpscp(src, dst): @@ -128,8 +129,8 @@ def gpscp(src, dst): c.append(dst) if GV.opt['-v']: print('[Info]', strcmd(c)) - p = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - out = p.stdout.read(-1).decode('utf-8') + p = gpsubprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + out = p.stdout.read(-1) rc = p.wait() if rc: sys.exit('[Error] command failed: gpscp {0} =:{1} with output: {2}'.format(src, dst, out)) @@ -139,7 +140,7 @@ def run_cmd(cmd, peer): """ To create subprocess on specified host with given command """ - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = gpsubprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) proc.x_cmd = cmd proc.x_peer = peer return proc @@ -166,7 +167,7 @@ def gpsync(src, dst): for p in proc: for line in p.stdout.readlines(): - print('[Out {0}] {1}' .format(p.x_peer, line.decode('utf-8'))) + print('[Out {0}] {1}' .format(p.x_peer, line)) rc = p.wait() if rc: sys.exit('[Error] command failed for host:{0} cmd:{1} with status:{2} error: "{3}"' @@ -352,8 +353,8 @@ def run(cmd): try: if GV.opt['-v']: print('[Info]', cmd) - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) - out = p.stdout.read(-1).decode('utf-8') + p = gpsubprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + out = p.stdout.read(-1) ok = p.wait() except KeyboardInterrupt: raise @@ -604,7 +605,7 @@ def spawnNetperfTestBetween(x, y, netperf_path, netserver_port, sec=5): try: if GV.opt['-v']: print('[Info]', strcmd(c)) - proc = subprocess.Popen(c, stdout=subprocess.PIPE) + proc = gpsubprocess.Popen(c, stdout=subprocess.PIPE) except KeyboardInterrupt: killProc(proc) raise @@ -621,7 +622,7 @@ def reapNetperfTest(proc, x, y): if proc: try: rc = proc.wait() - out = proc.stdout.read(-1).decode('utf-8') + out = proc.stdout.read(-1) ok = not killProc(proc) proc = None except KeyboardInterrupt as ki: @@ -827,8 +828,8 @@ def get_host_map(hostlist): proc = None try: if GV.opt['-v']: print('[Info]', strcmd(cmd)) - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - out = proc.stdout.read(-1).decode('utf-8') + proc = gpsubprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + out = proc.stdout.read(-1) rc = proc.wait() if rc: raise Exception('ssh error with the following command:\n%s with output: %s' % (cmd, out)) diff --git a/gpMgmt/bin/gpload.py b/gpMgmt/bin/gpload.py index 1a5b9c84b1ba..09657463cd5c 100755 --- a/gpMgmt/bin/gpload.py +++ b/gpMgmt/bin/gpload.py @@ -65,6 +65,7 @@ try: from gppylib.gpversion import GpVersion + from gppylib import gpsubprocess except ImportError: sys.stderr.write("gpload can't import gpversion, will run in GPDB6 compatibility mode.\n") withGpVersion = False @@ -1710,7 +1711,7 @@ def start_gpfdists(self): cmd += ' '.join(popenList) needshell = True - a = subprocess.Popen(cmd, stdout=subprocess.PIPE, + a = gpsubprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=cfds, shell=needshell) self.subprocesses.append(a) @@ -1731,7 +1732,7 @@ def start_gpfdists(self): while 1: readLock.acquire() - line = a.stdout.readline().decode('utf-8') + line = a.stdout.readline() readLock.release() if not line: self.log(self.ERROR,'failed to start gpfdist: ' + @@ -3072,7 +3073,7 @@ def stop_gpfdists(self): if platform.system() in ['Windows', 'Microsoft']: # win32 API is better but hard for us # to install, so we use the crude method - subprocess.Popen("taskkill /F /T /PID %i" % a.pid, + gpsubprocess.Popen("taskkill /F /T /PID %i" % a.pid, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/gpMgmt/bin/gpload_test/gpload/TEST.py b/gpMgmt/bin/gpload_test/gpload/TEST.py index f727fe935e34..1cef75acdc61 100755 --- a/gpMgmt/bin/gpload_test/gpload/TEST.py +++ b/gpMgmt/bin/gpload_test/gpload/TEST.py @@ -17,6 +17,7 @@ except: import subprocess from pygresql import pg +from gppylib import gpsubprocess """ Global Values @@ -149,8 +150,8 @@ def run(cmd): function, so you can theoretically pass any value that is valid for the second parameter of open(). """ - p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) - out = p.communicate()[0].decode('utf-8') + p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + out = p.communicate()[0] ret = [] ret.append(out) rc = False if p.wait() else True diff --git a/gpMgmt/bin/gpload_test/gpload2/TEST.py b/gpMgmt/bin/gpload_test/gpload2/TEST.py index 173d52375bc3..62e41e2ed052 100755 --- a/gpMgmt/bin/gpload_test/gpload2/TEST.py +++ b/gpMgmt/bin/gpload_test/gpload2/TEST.py @@ -16,6 +16,7 @@ except: import subprocess from pygresql import pg +from gppylib import gpsubprocess def get_port_from_conf(): file = os.environ.get('MASTER_DATA_DIRECTORY')+'/postgresql.conf' @@ -305,8 +306,8 @@ def run(cmd): function, so you can theoretically pass any value that is valid for the second parameter of open(). """ - p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) - out = p.communicate()[0].decode('utf-8') + p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + out = p.communicate()[0] ret = [] ret.append(out) rc = False if p.wait() else True @@ -382,7 +383,7 @@ def modify_sql_file(num): def copy_data(source='',target=''): cmd = 'cp '+ mkpath('data/' + source) + ' ' + mkpath(target) - p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) return p.communicate() hostNameAddrs = get_ip(HOST) diff --git a/gpMgmt/bin/gpload_test/gpload2/TEST_REMOTE.py b/gpMgmt/bin/gpload_test/gpload2/TEST_REMOTE.py index 63af6cbf9156..0ab9c2c80c82 100755 --- a/gpMgmt/bin/gpload_test/gpload2/TEST_REMOTE.py +++ b/gpMgmt/bin/gpload_test/gpload2/TEST_REMOTE.py @@ -14,6 +14,7 @@ import subprocess32 as subprocess except: import subprocess +from gppylib import gpsubprocess from shutil import copyfile from pygresql import pg @@ -208,8 +209,8 @@ def run(cmd): function, so you can theoretically pass any value that is valid for the second parameter of open(). """ - p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) - out = p.communicate()[0].decode('utf-8') + p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + out = p.communicate()[0] ret = [] ret.append(out) rc = False if p.wait() else True diff --git a/gpMgmt/bin/gpload_test/gpload2/TEST_local_base.py b/gpMgmt/bin/gpload_test/gpload2/TEST_local_base.py index 3d3738d1e161..96770766f265 100644 --- a/gpMgmt/bin/gpload_test/gpload2/TEST_local_base.py +++ b/gpMgmt/bin/gpload_test/gpload2/TEST_local_base.py @@ -20,7 +20,7 @@ string_types = basestring # from gppylib.commands.gp import get_coordinatordatadir - +from gppylib import gpsubprocess try: import subprocess32 as subprocess except ImportError: @@ -86,8 +86,8 @@ def run(cmd): function, so you can theoretically pass any value that is valid for the second parameter of open(). """ - p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) - out = p.communicate()[0].decode('utf-8') + p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + out = p.communicate()[0] ret = [] ret.append(out) rc = False if p.wait() else True @@ -449,9 +449,8 @@ def modify_sql_file(num): def copy_data(source='',target=''): cmd = 'cp '+ mkpath('data/' + source) + ' ' + mkpath(target) - p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) _, err = p.communicate() - err = err.decode('utf-8') if err: sys.stderr.write(str(err)) sys.exit(2) diff --git a/gpMgmt/bin/gpmemreport b/gpMgmt/bin/gpmemreport index cffb7f57ce48..aaa5c4f6acd3 100755 --- a/gpMgmt/bin/gpmemreport +++ b/gpMgmt/bin/gpmemreport @@ -378,11 +378,11 @@ def parsePSOutput(psLines): def getNextSample(file, start, end): - line = file.readline().decode('utf-8') + line = file.readline() if not line: return None while line == '\n': - line = file.readline().decode('utf-8') + line = file.readline() print(line) if not line.startswith('>>>'): raise Exception("Processing file failed - did not find timestamp where expected") @@ -390,11 +390,10 @@ def getNextSample(file, start, end): process_this_listing = (start is None or timestamp >= start) and (end is None or timestamp <= end) - header = file.readline().decode('utf-8').split() + header = file.readline().split() n_cols = len(header) rows = [] for line in file: - line = line.decode('utf-8') if line == '\n': break if process_this_listing: @@ -428,7 +427,7 @@ if __name__ == '__main__': (options, args) = parseCmdLine() start = datetime.strptime(options.start_time, datetime_format) if options.start_time else None end = datetime.strptime(options.end_time, datetime_format) if options.end_time else None - f = gzip.open(args[0], 'r') + f = gzip.open(args[0], 'rt') try: sample = getNextSample(f, start, end) while sample: diff --git a/gpMgmt/bin/gpmemwatcher b/gpMgmt/bin/gpmemwatcher index ae2a6e1a600f..464df6d86637 100755 --- a/gpMgmt/bin/gpmemwatcher +++ b/gpMgmt/bin/gpmemwatcher @@ -24,6 +24,7 @@ import time from optparse import OptionParser from datetime import datetime from gppylib.commands import gp +from gppylib import gpsubprocess pidfile = 'reswatch.pid' ps_file = 'ps.out.gz' @@ -98,7 +99,7 @@ def runPSProcess(sleepInt=60): open_files.append(outfile) while True: outfile.write(datetime.now().strftime("\n\n>>>%y:%m:%d:%H:%M:%S<<<\n").encode('utf-8')) - cmd = subprocess.Popen(ps_cmd, shell=True, stdout=subprocess.PIPE) + cmd = gpsubprocess.Popen(ps_cmd, shell=True, stdout=subprocess.PIPE) outfile.writelines(cmd.stdout) outfile.flush() time.sleep(sleepInt) diff --git a/gpMgmt/bin/gppylib/commands/base.py b/gpMgmt/bin/gppylib/commands/base.py index d453dc72eb3d..d9087cfde6b9 100755 --- a/gpMgmt/bin/gppylib/commands/base.py +++ b/gpMgmt/bin/gppylib/commands/base.py @@ -460,11 +460,11 @@ def execute(self, cmd, wait=True): cmd.pid = self.proc.pid if wait: (rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin) - assert isinstance(stdout_value, bytes) - assert isinstance(stderr_value, bytes) + assert isinstance(stdout_value, str) + assert isinstance(stderr_value, str) self.completed = True cmd.set_results(CommandResult( - rc, stdout_value.decode('utf-8'), stderr_value.decode('utf-8'), self.completed, self.halt)) + rc, stdout_value, stderr_value, self.completed, self.halt)) def cancel(self): if self.proc: @@ -532,6 +532,8 @@ class Command(object): propagate_env_map = {} # specific environment variables for this command instance def __init__(self, name, cmdStr, ctxt=LOCAL, remoteHost=None, stdin=None, gphome=None): + assert stdin is None or isinstance(stdin, str) + self.name = name self.cmdStr = cmdStr self.exec_context = createExecutionContext(ctxt, remoteHost, stdin=stdin, diff --git a/gpMgmt/bin/gppylib/commands/gp.py b/gpMgmt/bin/gppylib/commands/gp.py index 39f7851aa076..cebc46538495 100644 --- a/gpMgmt/bin/gppylib/commands/gp.py +++ b/gpMgmt/bin/gppylib/commands/gp.py @@ -28,6 +28,7 @@ from .unix import * from gppylib import pgconf from gppylib.utils import writeLinesToFile, createFromSingleHostFile, shellEscape +from gppylib import gpsubprocess logger = get_default_logger() @@ -1679,7 +1680,7 @@ def list_addrs(hostname=None, include_loopback=False): else: args = cmd - result = subprocess.check_output(args).decode('utf-8') + result = gpsubprocess.check_output(args) return result.split('START_CMD_OUTPUT\n')[1].splitlines() if __name__ == '__main__': diff --git a/gpMgmt/bin/gppylib/gpsubprocess.py b/gpMgmt/bin/gppylib/gpsubprocess.py index 0f1dc424fe35..e9a021e36475 100644 --- a/gpMgmt/bin/gppylib/gpsubprocess.py +++ b/gpMgmt/bin/gppylib/gpsubprocess.py @@ -25,6 +25,7 @@ except: import subprocess from gppylib import gplog +import sys logger=gplog.get_default_logger() @@ -35,7 +36,11 @@ class Popen(subprocess.Popen): cancelRequested=False - + def __init__(self, *args, **kwargs): + if sys.version_info[0] == 3: + kwargs['text'] = True + super(Popen, self).__init__(*args, **kwargs) + def communicate2(self, timeout=2,input=None): """ An extension to communicate() that allows for external cancels to abort processing. @@ -70,6 +75,10 @@ def communicate2(self, timeout=2,input=None): self._finish_read_files(timeout,output,error) (resout,reserr)=self._postprocess_outputs(output,error) + if sys.version_info[0] == 3 and isinstance(resout, bytes): + resout = resout.decode('utf-8') + if sys.version_info[0] == 3 and isinstance(reserr, bytes): + reserr = reserr.decode('utf-8') return (self.returncode,resout,reserr) @@ -105,11 +114,19 @@ def _postprocess_outputs(self,output,error): # object do the translation: It is based on stdio, which is # impossible to combine with select (unless forcing no # buffering). - if self.universal_newlines and hasattr(file, 'newlines'): + if self.universal_newlines and (sys.version_info[0] == 3 or hasattr(file, 'newlines')): + if sys.version_info[0] == 3: + kargs = { + "encoding": "utf-8", + "errors": "strict", + } + else: + kargs = {} + if output: - output = self._translate_newlines(output) + output = self._translate_newlines(output, **kargs) if error: - error = self._translate_newlines(error) + error = self._translate_newlines(error, **kargs) return (output,error) @@ -231,3 +248,8 @@ def __select (self, iwtd, owtd, ewtd, timeout=None): return ([],[],[]) else: raise + +def check_output(*popenargs, **kwargs): + if sys.version_info[0] == 3: + kwargs['text'] = True + return subprocess.check_output(*popenargs, **kwargs) diff --git a/gpMgmt/bin/gppylib/operations/utils.py b/gpMgmt/bin/gppylib/operations/utils.py index e84bc4b894cf..5558d2fdfc70 100644 --- a/gpMgmt/bin/gppylib/operations/utils.py +++ b/gpMgmt/bin/gppylib/operations/utils.py @@ -46,10 +46,11 @@ def __init__(self, operation, host, msg_ctx=""): def execute(self): execname = os.path.split(sys.argv[0])[-1] - pickled_execname = pickle.dumps(execname) - pickled_operation = pickle.dumps(self.operation) + pickled_data = base64.urlsafe_b64encode(pickle.dumps((execname, self.operation))) + if sys.version_info[0] == 3: + pickled_data = pickled_data.decode('ascii') cmd = Command('pickling an operation', 'echo "START_CMD_OUTPUT"; $GPHOME/sbin/gpoperation.py', - ctxt=REMOTE, remoteHost=self.host, stdin = pickled_execname + pickled_operation) + ctxt=REMOTE, remoteHost=self.host, stdin = pickled_data) cmd.run(validateAfter=True) msg = "Output on host %s: %s" % (self.host, cmd.get_results().stdout) if self.msg_ctx: diff --git a/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py b/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py index 517e01827211..df4ba75105ee 100644 --- a/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py +++ b/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py @@ -6,7 +6,8 @@ try: from subprocess32 import Popen, PIPE except: - from subprocess import Popen, PIPE + from subprocess import PIPE +from gppylib.gpsubprocess import Popen class GpInitSystemTest(GpTestCase): def setUp(self): @@ -15,37 +16,37 @@ def setUp(self): def test_option_cluster_configfile_and_input_configfile_should_error(self): p = Popen([self.gpinitsystem_path, '-c', 'cluster_configfile', '-I', 'input_configfile'], stdout=PIPE) - output = p.stdout.read().decode('utf-8') + output = p.stdout.read() self.assertIn("[FATAL]:-Options [-c] and [-I] cannot be used at the same time.", output) self.assertNotIn("Creates a new Greengage Database instance", output) self.assertNotIn("Initializes a Greengage Database system by using configuration", output) def test_option_without_neither_cluster_configfile_and_input_configfile_should_error(self): p = Popen([self.gpinitsystem_path], stdout=PIPE) - output = p.stdout.read().decode('utf-8') + output = p.stdout.read() self.assertIn("[FATAL]:-At least one of two options, [-c] or [-I], is required", output) self.assertNotIn("Creates a new Greengage Database instance", output) self.assertNotIn("Initializes a Greengage Database system by using configuration", output) def test_option_with_input_configfile_should_work(self): p = Popen([self.gpinitsystem_path, '-I', 'input_configfile'], stdout=PIPE) - output = p.stdout.read().decode('utf-8') + output = p.stdout.read() self.assertIn("[INFO]:-Checking configuration parameters, please wait...", output) def test_option_with_cluster_configfile_should_work(self): p = Popen([self.gpinitsystem_path, '-c', 'cluster_configfile'], stdout=PIPE) - output = p.stdout.read().decode('utf-8') + output = p.stdout.read() self.assertIn("[INFO]:-Checking configuration parameters, please wait...", output) def test_option_help_prints_docs_usage(self): p = Popen([self.gpinitsystem_path, '--help'], stdout=PIPE) - output = p.stdout.read().decode('utf-8') + output = p.stdout.read() self.assertIn("Initializes a Greengage Database system by using configuration", output) self.assertNotIn("Creates a new Greengage Database instance", output) def test_invalid_option_prints_raw_usage(self): p = Popen([self.gpinitsystem_path, '--unknown-option'], stdout=PIPE) - output = p.stdout.read().decode('utf-8') + output = p.stdout.read() self.assertIn("[ERROR]:-Unknown option --unknown-option", output) self.assertIn("Creates a new Greengage Database instance", output) self.assertNotIn("Initializes a Greengage Database system by using configuration", output) diff --git a/gpMgmt/bin/gppylib/util/ssh_utils.py b/gpMgmt/bin/gppylib/util/ssh_utils.py index 09b562a2c9d1..cee4e8781977 100644 --- a/gpMgmt/bin/gppylib/util/ssh_utils.py +++ b/gpMgmt/bin/gppylib/util/ssh_utils.py @@ -302,7 +302,10 @@ def executeCommand(self, command): for s in self.pxssh_list: # Split the output into an array of lines so that we can add text to the beginning of # each line - output = s.before.decode('utf-8').split('\n') + output_raw = s.before + if sys.version_info[0] == 3: + output_raw = output_raw.decode('utf-8') + output = output_raw.split('\n') output = output[1:-1] commandoutput.append(output) diff --git a/gpMgmt/bin/gpscp b/gpMgmt/bin/gpscp index d9243e660cd8..3200fd6740c8 100755 --- a/gpMgmt/bin/gpscp +++ b/gpMgmt/bin/gpscp @@ -19,6 +19,7 @@ from __future__ import print_function # disable deprecationwarnings from builtins import object import warnings +from gppylib import gpsubprocess warnings.simplefilter('ignore', DeprecationWarning) @@ -108,7 +109,7 @@ def parseCommandLine(): ############# def run(cmd, peer): if GV.opt['-v']: print('[INFO]', cmd) - p = subprocess.Popen(cmd, + p = gpsubprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -151,7 +152,7 @@ try: errmsg = None for p in proc: for out in p.stdout: - print('[OUT %s] %s' % (p.x_peer, out.decode('utf-8'))) + print('[OUT %s] %s' % (p.x_peer, out)) status = p.wait() if status: errmsg = '[ERROR %s] exit %d, cmd - %s' % (p.x_peer, status, p.x_cmd) diff --git a/gpMgmt/bin/gpsd b/gpMgmt/bin/gpsd index 6b376fb2ce3d..d4d058e8be4e 100755 --- a/gpMgmt/bin/gpsd +++ b/gpMgmt/bin/gpsd @@ -13,6 +13,7 @@ import sys from contextlib import closing from optparse import OptionParser from pygresql import pgdb +from gppylib import gpsubprocess if sys.version_info[0] == 3: string_types = str @@ -37,11 +38,11 @@ def ResultIter(cursor, arraysize=1000): def getVersion(envOpts): - cmd = subprocess.Popen('psql --pset footer -Atqc "select version()" template1', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=envOpts) + cmd = gpsubprocess.Popen('psql --pset footer -Atqc "select version()" template1', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=envOpts) if cmd.wait() != 0: - sys.stderr.write('\nError while trying to find GPDB version.\n\n' + cmd.communicate()[1].decode('utf-8') + '\n\n') + sys.stderr.write('\nError while trying to find GPDB version.\n\n' + cmd.communicate()[1] + '\n\n') sys.exit(1) - return cmd.communicate()[0].decode('utf-8') + return cmd.communicate()[0] def get_num_segments(cursor): query = "select count(*) from gp_segment_configuration where role='p' and content >=0;" @@ -55,17 +56,17 @@ def get_num_segments(cursor): def dumpSchema(connectionInfo, envOpts): dmp_cmd = 'pg_dump -h %s -p %s -U %s -s -x --gp-syntax -O %s' % connectionInfo - p = subprocess.Popen(dmp_cmd, shell=True, stderr=subprocess.PIPE, env=envOpts) + p = gpsubprocess.Popen(dmp_cmd, shell=True, stderr=subprocess.PIPE, env=envOpts) if p.wait() != 0: - sys.stderr.write('\nError while dumping schema.\n\n' + p.communicate()[1].decode('utf-8') + '\n\n') + sys.stderr.write('\nError while dumping schema.\n\n' + p.communicate()[1] + '\n\n') sys.exit(1) def dumpGlobals(connectionInfo, envOpts): dmp_cmd = 'pg_dumpall -h %s -p %s -U %s -l %s -g --no-gp-syntax' % connectionInfo - p = subprocess.Popen(dmp_cmd, shell=True, stderr=subprocess.PIPE, env=envOpts) + p = gpsubprocess.Popen(dmp_cmd, shell=True, stderr=subprocess.PIPE, env=envOpts) if p.wait() != 0: - sys.stderr.write('\nError while dumping globals.\n\n' + p.communicate()[1].decode('utf-8') + '\n\n') + sys.stderr.write('\nError while dumping globals.\n\n' + p.communicate()[1] + '\n\n') sys.exit(1) diff --git a/gpMgmt/bin/gpssh-exkeys b/gpMgmt/bin/gpssh-exkeys index 2572627a2d17..e7acbe08aee7 100755 --- a/gpMgmt/bin/gpssh-exkeys +++ b/gpMgmt/bin/gpssh-exkeys @@ -53,6 +53,7 @@ try: from gppylib.commands import unix from gppylib.util import ssh_utils from gppylib.gpparseopts import OptParser + from gppylib import gpsubprocess except ImportError as e: sys.exit('Error: unable to import module: ' + str(e)) @@ -147,10 +148,8 @@ class Host(object): args = ['ssh', self.m_host, '-o', 'BatchMode=yes', '-o', 'StrictHostKeyChecking=yes', '-n', cmd] - p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = gpsubprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() - stdout = stdout.decode('utf-8') - stderr = stderr.decode('utf-8') if GV.opt['-v']: print('[INFO %s]: exit status=%s' % (self.m_host, p.returncode)) if stdout: @@ -174,8 +173,8 @@ class Host(object): # Grab the tar stream from stdout with open(os.path.join(tempDir, '%s.tar' % self.m_host), 'wb') as tarfile: - p = subprocess.Popen(args, stdout=tarfile, stderr=subprocess.PIPE) - stderr = p.communicate()[1].decode('utf-8') + p = gpsubprocess.Popen(args, stdout=tarfile, stderr=subprocess.PIPE) + stderr = p.communicate()[1] if p.returncode: print(('[WARNING %s] cannot fetch existing authentication files: tar rc=%s;' @@ -530,8 +529,8 @@ try: # Ensure the local host can password-less ssh into each remote host for remoteHost in GV.allHosts: cmd = ['ssh', remoteHost.host(), '-o', 'BatchMode=yes', '-o', 'StrictHostKeyChecking=yes', 'true'] - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stderr = p.communicate()[1].decode('utf-8') + p = gpsubprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stderr = p.communicate()[1] if p.returncode: print('[ERROR]: Failed to ssh to %s. %s' % (remoteHost.host(), stderr), file=sys.stderr) print('[ERROR]: Expected passwordless ssh to host %s' % remoteHost.host(), file=sys.stderr) @@ -657,8 +656,8 @@ try: os.mkdir(hostDir) cmd = 'cd %s && tar xf %s' % (hostDir, tarfileName) if GV.opt['-v']: print('[INFO %s]: %s' % (h.host(), cmd)) - tarproc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - tarerr = tarproc.communicate()[1].decode('utf-8') + tarproc = gpsubprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + tarerr = tarproc.communicate()[1] if tarproc.returncode != 0: print('[WARNING %s] cannot extract SSH files;' % h.host(), file=sys.stderr) for line in tarerr.splitlines(): diff --git a/gpMgmt/bin/minirepro b/gpMgmt/bin/minirepro index 5e05c67c1785..e12abe5ff825 100755 --- a/gpMgmt/bin/minirepro +++ b/gpMgmt/bin/minirepro @@ -67,6 +67,7 @@ import os, sys, re, json, platform, subprocess from optparse import OptionParser from pygresql import pgdb from datetime import datetime +from gppylib import gpsubprocess if sys.version_info[0] == 3: string_types = str @@ -155,19 +156,19 @@ def dump_query(connectionInfo, query_file): query_cmd = "psql %s --pset footer --no-psqlrc -Atq -h %s -p %s -U %s -f %s" % (db, host, port, user, toolkit_sql) print(query_cmd) - p = subprocess.Popen(query_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ) + p = gpsubprocess.Popen(query_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ) if p.wait() != 0: - errormsg = p.communicate()[1].decode('utf-8') + errormsg = p.communicate()[1] sys.stderr.writelines('\nError when executing function gp_dump_query_oids.\n\n' + errormsg + '\n\n') sys.exit(1) outmsg, errormsg = p.communicate() if errormsg: - errormsg = errormsg.decode('utf-8') + errormsg = errormsg sys.stderr.writelines('\nError when executing function gp_dump_query_oids.\n\n' + errormsg + '\n\n') sys.exit(1) - return outmsg.decode('utf-8') + return outmsg # relation and function oids will be extracted from the dump string def parse_oids(cursor, json_oids): @@ -204,9 +205,9 @@ def pg_dump_object(mr_query, connectionInfo, envOpts): dmp_cmd = "%s --relation-oids %s --function-oids %s -f %s" % \ (dmp_cmd, mr_query.relids, mr_query.funcids, E(out_file)) print(dmp_cmd) - p = subprocess.Popen(dmp_cmd, shell=True, stderr=subprocess.PIPE, env=envOpts) + p = gpsubprocess.Popen(dmp_cmd, shell=True, stderr=subprocess.PIPE, env=envOpts) if p.wait() != 0: - sys.stderr.write('\nError while dumping schema.\n\n' + p.communicate()[1].decode('utf-8') + '\n\n') + sys.stderr.write('\nError while dumping schema.\n\n' + p.communicate()[1] + '\n\n') sys.exit(1) def dump_tuple_count(cur, oid_str, f_out): diff --git a/gpMgmt/sbin/gpconfig_helper.py b/gpMgmt/sbin/gpconfig_helper.py index 9e25710517ad..4519a811e65a 100755 --- a/gpMgmt/sbin/gpconfig_helper.py +++ b/gpMgmt/sbin/gpconfig_helper.py @@ -135,10 +135,8 @@ def add_parameter(filename, name, value): # NOTE: though apparently not documented, postgresQL returns the last valid value def get_parameter(filename, name): - with open(filename, 'rb') as f: + with open(filename, 'rt') as f: for line in reversed(f.readlines()): - if sys.version_info[0] == 3: - line = line.decode('utf-8') parts = line.split("=", 1) if len(parts) > 1 and parts[0].lstrip().startswith(name): return parts[1].strip() diff --git a/gpMgmt/sbin/gpoperation.py b/gpMgmt/sbin/gpoperation.py index e790fb559f8a..ff59ea5f5203 100755 --- a/gpMgmt/sbin/gpoperation.py +++ b/gpMgmt/sbin/gpoperation.py @@ -7,10 +7,10 @@ import base64 if sys.version_info[0] == 2: - stdin = sys.stdin + stdin = sys.stdin.read() stdout = sys.stdout else: - stdin = sys.stdin.buffer + stdin = sys.stdin.buffer.read() stdout = sys.stdout.buffer class NullDevice(object): @@ -32,11 +32,10 @@ def flush(self): hostname = unix.getLocalHostname() username = unix.getUserName() -execname = pickle.load(stdin) +execname, operation = pickle.loads(base64.urlsafe_b64decode(stdin)) gplog.setup_tool_logging(execname, hostname, username) logger = gplog.get_default_logger() -operation = pickle.load(stdin) try: ret = operation.run() diff --git a/gpMgmt/sbin/packcore b/gpMgmt/sbin/packcore index 98f0374d2a64..1c7a16626084 100755 --- a/gpMgmt/sbin/packcore +++ b/gpMgmt/sbin/packcore @@ -11,8 +11,8 @@ import shutil import stat import sys from optparse import OptionParser -from subprocess import Popen, PIPE, STDOUT - +from subprocess import PIPE, STDOUT +from gppylib.gpsubprocess import Popen def _getPlatformInfo(): if which('lsb_release') is None: @@ -30,7 +30,7 @@ def _getPlatformInfo(): def _getFileInfo(coreFile): cmd = Popen(['/usr/bin/file', '--version'], stdout=PIPE, stderr=STDOUT) - fileVersion = cmd.communicate()[0].split()[0].strip().decode('utf-8') + fileVersion = cmd.communicate()[0].split()[0].strip() # file allow setting parameters from command line from version 5.21, refer: # https://github.com/file/file/commit/6ce24f35cd4a43c4bdd249e8e0c4952c1f8eac67 # Set ELF program sections processed for core files to suppres "too many @@ -40,7 +40,7 @@ def _getFileInfo(coreFile): opts += ['-P', 'elf_phnum=2048'] opts += [coreFile] cmd = Popen(opts, stdout=PIPE) - return cmd.communicate()[0].decode('utf-8') + return cmd.communicate()[0] def _isCore(fileCmdOutput): @@ -147,7 +147,6 @@ def _getLibraryListWithGDB(coreFile, binary): # path strings in following lines. header = False for line in result.splitlines(): - line = line.decode('utf-8') if header: begin = line.find(os.path.sep) if begin >= 0: diff --git a/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py b/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py index 402e075f13e7..e80553be2fd8 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py @@ -9,6 +9,7 @@ from test.behave_utils.utils import wait_for_unblocked_transactions from gppylib.commands.base import Command from gppylib.db import dbconn +from gppylib import gpsubprocess def _run_sql(sql, opts=None): env = None @@ -81,7 +82,7 @@ def impl(context, cmd): Runs `yes | cmd`. """ - p = subprocess.Popen( + p = gpsubprocess.Popen( ["bash", "-c", "yes | %s" % cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -89,8 +90,6 @@ def impl(context, cmd): ) context.stdout_message, context.stderr_message = p.communicate() - context.stdout_message = context.stdout_message.decode('utf-8') - context.stderr_message = context.stderr_message.decode('utf-8') context.ret_code = p.returncode diff --git a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py index 4d30c08ba47c..76e6adf5f29d 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py @@ -21,10 +21,11 @@ import time from contextlib import closing try: - from subprocess32 import check_output, Popen, PIPE + from subprocess32 import PIPE except: - from subprocess import check_output, Popen, PIPE -import subprocess + from subprocess import PIPE +from gppylib import gpsubprocess +from gppylib.gpsubprocess import Popen, check_output from collections import defaultdict import psutil @@ -4101,7 +4102,7 @@ def impl(context, command, input): if input == "no mode but presses enter": input = os.linesep p = Popen(command.split(), stdout=PIPE, stdin=PIPE, stderr=PIPE) - stdout, stderr = p.communicate(input=input.encode('utf-8')) + stdout, stderr = p.communicate(input=input) p.stdin.close() @@ -4117,7 +4118,7 @@ def impl(context, command, input): time.sleep(120) # interrupt the process. p.terminate() - p.communicate(input=input.encode()) + p.communicate(input=input) def are_on_different_subnets(primary_hostname, mirror_hostname): diff --git a/gpMgmt/test/behave/mgmt_utils/steps/unreachable_hosts_mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/unreachable_hosts_mgmt_utils.py index 80d20f7daf73..be20114fdbae 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/unreachable_hosts_mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/unreachable_hosts_mgmt_utils.py @@ -8,6 +8,7 @@ from gppylib.gparray import GpArray from gppylib.db import dbconn from gppylib.commands.base import Command, REMOTE +from gppylib import gpsubprocess # These utilities are intended to help various Behave tests handle disconnecting # and reconnecting hosts from the current GPDB cluster. They are not intended to be @@ -75,8 +76,8 @@ def add_or_remove_blackhole_route(disconnected_hosts, spare_hosts, disconnect=Fa def _blackhole_route_helper(disconnect_host, hosts, disconnect=False): cmd = "cat /etc/hosts | grep {} | head -1 ".format(disconnect_host) cmd += "| awk '{print $1}'" - disconnect_addr = subprocess.check_output(["bash", "-c", cmd]) - disconnect_addr = disconnect_addr.decode('utf-8').strip() + disconnect_addr = gpsubprocess.check_output(["bash", "-c", cmd]) + disconnect_addr = disconnect_addr.strip() for host in hosts: if host == disconnect_host: @@ -87,7 +88,7 @@ def _blackhole_route_helper(disconnect_host, hosts, disconnect=False): subcmd = "delete" cmd = "sudo ip route {} {}".format(subcmd, disconnect_addr) - subprocess.check_output(["ssh", host, cmd]) + gpsubprocess.check_output(["ssh", host, cmd]) @given('all postgres processes are killed on "{disconnected}" hosts') @then('all postgres processes are killed on "{disconnected}" hosts') @@ -103,7 +104,7 @@ def impl(context, disconnected): "rm -fr /data/gpdata/mirror/*"] for host in disconnected_hosts: for cmd in cmds: - subprocess.check_output(["ssh", host, cmd]) + gpsubprocess.check_output(["ssh", host, cmd]) @given('An entry to {action} {env} env var is added on all hosts of cluster') @@ -119,7 +120,7 @@ def impl(context, action, env): hosts = GpArray.initFromCatalog(dbconn.DbURL()).getHostList() for host in hosts: for cmd in cmds: - subprocess.check_output(["ssh", host, cmd]) + gpsubprocess.check_output(["ssh", host, cmd]) # This step is very specific to the CCP CI cluster. diff --git a/gpMgmt/test/behave_utils/cluster_expand.py b/gpMgmt/test/behave_utils/cluster_expand.py index 55da5e51fbff..c1cc1544cdc2 100755 --- a/gpMgmt/test/behave_utils/cluster_expand.py +++ b/gpMgmt/test/behave_utils/cluster_expand.py @@ -3,11 +3,12 @@ import glob from datetime import datetime, timedelta try: - from subprocess32 import Popen, PIPE + from subprocess32 import PIPE except: - from subprocess import Popen, PIPE + from subprocess import PIPE from .utils import run_gpcommand +from gppylib.gpsubprocess import Popen from gppylib.commands.base import Command from gppylib.db import dbconn @@ -65,7 +66,7 @@ def do_interview(self, hosts=None, num_of_segments=1, directory_pairs=None, has_ if mirror: p1.stdin.write(("%s\n" % mirror).encode('utf-8')) - output = p1.communicate()[0].decode('utf-8') + output = p1.communicate()[0] return output, p1.wait() diff --git a/gpcontrib/gp_replica_check/gp_replica_check.py b/gpcontrib/gp_replica_check/gp_replica_check.py index 560f75d7547c..ade3a670b856 100755 --- a/gpcontrib/gp_replica_check/gp_replica_check.py +++ b/gpcontrib/gp_replica_check/gp_replica_check.py @@ -39,6 +39,7 @@ import threading import queue import time +from gppylib import gpsubprocess if sys.version_info[0] == 3: from shlex import quote @@ -74,7 +75,7 @@ def run(self): print("Primary segment for content %d is down" % self.content) else: try: - res = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).decode('utf-8') + res = gpsubprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True) self.result = True if res.strip().split('\n')[-2].strip() == 't' else False with self.lock: print(self) @@ -94,7 +95,7 @@ def create_restartpoint_on_ckpt_record_replay(set): cmd = "gpconfig -r create_restartpoint_on_ckpt_record_replay --skipvalidation && gpstop -u" print(cmd) try: - res = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).decode('utf-8') + res = gpsubprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True) print(res) except subprocess.CalledProcessError as e: print('returncode: (%s), cmd: (%s), output: (%s)' % (e.returncode, e.cmd, e.output)) @@ -114,10 +115,10 @@ def install_extension(databases): database_list = [s.strip() for s in databases.split(',')] print("Creating gp_replica_check extension on databases if needed:") - datnames = subprocess.check_output('psql postgres -t -A -c "%s"' % get_datname_sql, stderr=subprocess.STDOUT, shell=True).decode('utf-8').split('\n') + datnames = gpsubprocess.check_output('psql postgres -t -A -c "%s"' % get_datname_sql, stderr=subprocess.STDOUT, shell=True).split('\n') for datname in datnames: if len(datname) >= 1 and (datname.strip() in database_list or 'all' in database_list): - print(subprocess.check_output('psql %s -t -c "%s"' % (quote(datname), create_ext_sql), stderr=subprocess.STDOUT, shell=True).decode('utf-8')) + print(gpsubprocess.check_output('psql %s -t -c "%s"' % (quote(datname), create_ext_sql), stderr=subprocess.STDOUT, shell=True)) # Get the primary and mirror servers, for each content ID. def get_segments(): @@ -129,7 +130,7 @@ def get_segments(): AND gscp.role = 'p' AND gscm.role = 'm' ''' - seglist = subprocess.check_output('psql postgres -t -c "%s"' % seglist_sql, stderr=subprocess.STDOUT, shell=True).decode('utf-8').split('\n') + seglist = gpsubprocess.check_output('psql postgres -t -c "%s"' % seglist_sql, stderr=subprocess.STDOUT, shell=True).split('\n') segmap = {} for segrow in seglist: segelements = [s.strip() for s in segrow.split('|')] @@ -150,7 +151,7 @@ def get_databases(databases): database_list = [s.strip() for s in databases.split(',')] - dbrawlist = subprocess.check_output('psql postgres -t -A -c "%s"' % dblist_sql, stderr=subprocess.STDOUT, shell=True).decode('utf-8').split('\n') + dbrawlist = gpsubprocess.check_output('psql postgres -t -A -c "%s"' % dblist_sql, stderr=subprocess.STDOUT, shell=True).split('\n') dblist = [] for dbrow in dbrawlist: dbname = dbrow diff --git a/src/backend/gporca/scripts/fix_mdps.py b/src/backend/gporca/scripts/fix_mdps.py index fe640b1a02f8..bf9f49fec686 100755 --- a/src/backend/gporca/scripts/fix_mdps.py +++ b/src/backend/gporca/scripts/fix_mdps.py @@ -14,13 +14,14 @@ from __future__ import print_function import sys import subprocess +from gppylib import gpsubprocess import re import argparse dryrun = False def run_command(command): - p = subprocess.Popen(command, + p = gpsubprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) return iter(p.stdout.readline, b'') diff --git a/src/backend/gporca/scripts/get_debug_event_counters.py b/src/backend/gporca/scripts/get_debug_event_counters.py index 16cc4d5a10a7..4aa6922dcf6e 100755 --- a/src/backend/gporca/scripts/get_debug_event_counters.py +++ b/src/backend/gporca/scripts/get_debug_event_counters.py @@ -3,6 +3,7 @@ from __future__ import print_function import sys import subprocess +from gppylib import gpsubprocess import re import argparse import os @@ -111,7 +112,7 @@ def print_or_insert_header_row(csv): # ----------------------------------------------------------------------------- def run_command(command): - p = subprocess.Popen(command, + p = gpsubprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) return iter(p.stdout.readline, b'') diff --git a/src/test/isolation2/sql_isolation_testcase.py b/src/test/isolation2/sql_isolation_testcase.py index 6106caaaed1d..716fa3703997 100755 --- a/src/test/isolation2/sql_isolation_testcase.py +++ b/src/test/isolation2/sql_isolation_testcase.py @@ -28,6 +28,7 @@ import subprocess32 as subprocess except: import subprocess +from gppylib import gpsubprocess import re import multiprocessing import tempfile @@ -116,7 +117,7 @@ def __init__(self, output_file='', initfile_prefix=''): self.v_cnt = 0 # open pseudo-terminal to interact with subprocess self.master_fd, self.slave_fd = pty.openpty() - self.sh_proc = subprocess.Popen(['/bin/bash', '--noprofile', '--norc', '--noediting', '-i'], + self.sh_proc = gpsubprocess.Popen(['/bin/bash', '--noprofile', '--norc', '--noediting', '-i'], stdin=self.slave_fd, stdout=self.slave_fd, stderr=self.slave_fd, @@ -740,7 +741,7 @@ def process_command(self, command, output_file, global_sh_executor): if mode != '\\retcode': raise Exception('Invalid execution mode: {}'.format(mode)) - cmd_output = subprocess.Popen(sql.strip(), stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True) + cmd_output = gpsubprocess.Popen(sql.strip(), stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True) if not bg_mode: stdout, _ = cmd_output.communicate() print(file=output_file) From 3ef68d7c083ca8b690e608f941de350e246f8dbf Mon Sep 17 00:00:00 2001 From: Viktor Kurilko Date: Thu, 19 Mar 2026 20:06:53 +0700 Subject: [PATCH 2/5] fix --- gpMgmt/bin/gpload.py | 4 ++-- gpMgmt/bin/gppylib/gpsubprocess.py | 11 ++++------- gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py | 1 - 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/gpMgmt/bin/gpload.py b/gpMgmt/bin/gpload.py index 09657463cd5c..58bb2fc036c6 100755 --- a/gpMgmt/bin/gpload.py +++ b/gpMgmt/bin/gpload.py @@ -777,7 +777,7 @@ def run(self): while 1: # Windows select does not support select on non-file fd's, so we can use the lock fix. Deadlock is possible here. # We need to look into the Python windows module to see if there is another way to do this in Windows. - line = self.fd.readline().decode('utf-8') + line = self.fd.readline() if not line: break self.gpload.log(self.gpload.DEBUG, 'gpfdist: ' + line.strip('\n')) @@ -790,7 +790,7 @@ def run(self): ) if retList[0] == [self.fd]: self.theLock.acquire() - line = self.fd.readline().decode('utf-8') + line = self.fd.readline() self.theLock.release() else: continue diff --git a/gpMgmt/bin/gppylib/gpsubprocess.py b/gpMgmt/bin/gppylib/gpsubprocess.py index e9a021e36475..4f88509e484b 100644 --- a/gpMgmt/bin/gppylib/gpsubprocess.py +++ b/gpMgmt/bin/gppylib/gpsubprocess.py @@ -75,10 +75,6 @@ def communicate2(self, timeout=2,input=None): self._finish_read_files(timeout,output,error) (resout,reserr)=self._postprocess_outputs(output,error) - if sys.version_info[0] == 3 and isinstance(resout, bytes): - resout = resout.decode('utf-8') - if sys.version_info[0] == 3 and isinstance(reserr, bytes): - reserr = reserr.decode('utf-8') return (self.returncode,resout,reserr) @@ -114,7 +110,8 @@ def _postprocess_outputs(self,output,error): # object do the translation: It is based on stdio, which is # impossible to combine with select (unless forcing no # buffering). - if self.universal_newlines and (sys.version_info[0] == 3 or hasattr(file, 'newlines')): + # In Python 3, we always use the text mode. Also, decoding happens here. + if sys.version_info[0] == 3 or (self.universal_newlines and hasattr(file, 'newlines')): if sys.version_info[0] == 3: kargs = { "encoding": "utf-8", @@ -123,9 +120,9 @@ def _postprocess_outputs(self,output,error): else: kargs = {} - if output: + if output is not None: output = self._translate_newlines(output, **kargs) - if error: + if error is not None: error = self._translate_newlines(error, **kargs) return (output,error) diff --git a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py index 76e6adf5f29d..e79710019495 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py @@ -24,7 +24,6 @@ from subprocess32 import PIPE except: from subprocess import PIPE -from gppylib import gpsubprocess from gppylib.gpsubprocess import Popen, check_output from collections import defaultdict From 09671487f27cfc62e6e3d491beabc0b58e0c1dec Mon Sep 17 00:00:00 2001 From: Viktor Kurilko Date: Thu, 19 Mar 2026 23:25:53 +0700 Subject: [PATCH 3/5] fix2 --- gpMgmt/bin/gpcheckperf | 13 ++++++++++--- gpMgmt/bin/gpmemwatcher | 4 ++-- gpMgmt/bin/gppylib/commands/base.py | 2 -- gpMgmt/bin/gppylib/gpsubprocess.py | 8 ++++---- gpMgmt/bin/gppylib/operations/utils.py | 4 +--- .../bin/gppylib/test/unit/test_unit_gpinitsystem.py | 2 +- gpMgmt/test/behave/mgmt_utils/steps/gpstart.py | 4 ++-- gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py | 10 ++++++++-- gpMgmt/test/behave_utils/cluster_expand.py | 12 ++++++------ 9 files changed, 34 insertions(+), 25 deletions(-) diff --git a/gpMgmt/bin/gpcheckperf b/gpMgmt/bin/gpcheckperf index b688e16a4d94..471b0e425b80 100755 --- a/gpMgmt/bin/gpcheckperf +++ b/gpMgmt/bin/gpcheckperf @@ -54,6 +54,13 @@ try: except ImportError as e: sys.exit('Error: unable to import module: ' + str(e)) +if sys.version_info[0] == 3: + import io + StringIO = io.StringIO +else: + import StringIO + StringIO = BytesIO = StringIO.StringIO + GPHOME = os.getenv('GPHOME') if GPHOME is None: sys.exit('Please set GPHOME environment variable') @@ -472,7 +479,7 @@ def copyExecOver(fname): def parseMultiDDResult(out): # parse output of "time -p" - out = io.StringIO(out) + out = StringIO(out) result = {} bytes = 0 for line in out: @@ -548,7 +555,7 @@ def runStreamTest(): (ok, out) = gpssh(cmd, GV.opt['-V']) if not ok: sys.exit('[Error] command failed: %s with output: %s' % (cmd, out)) - out = io.StringIO(out) + out = StringIO(out) result = {} for line in out: i = line.find(']') @@ -644,7 +651,7 @@ def reapNetperfTest(proc, x, y): print('[Warning] netperf failed on %s -> %s' % (x, y)) return [] - for line in io.StringIO(out): + for line in StringIO(out): line = line.split() if len(line) != 5: continue diff --git a/gpMgmt/bin/gpmemwatcher b/gpMgmt/bin/gpmemwatcher index 464df6d86637..485fc3755e5c 100755 --- a/gpMgmt/bin/gpmemwatcher +++ b/gpMgmt/bin/gpmemwatcher @@ -95,10 +95,10 @@ def cleanupLogFiles(signum, frame): def runPSProcess(sleepInt=60): ps_cmd = 'ps -ewwopid,ppid,rss,vsz,pmem,pcpu,time,etime,start_time,wchan,stat,psr,args ' try: - outfile = gzip.open(ps_file, 'w') + outfile = gzip.open(ps_file, 'wt') open_files.append(outfile) while True: - outfile.write(datetime.now().strftime("\n\n>>>%y:%m:%d:%H:%M:%S<<<\n").encode('utf-8')) + outfile.write(datetime.now().strftime("\n\n>>>%y:%m:%d:%H:%M:%S<<<\n")) cmd = gpsubprocess.Popen(ps_cmd, shell=True, stdout=subprocess.PIPE) outfile.writelines(cmd.stdout) outfile.flush() diff --git a/gpMgmt/bin/gppylib/commands/base.py b/gpMgmt/bin/gppylib/commands/base.py index d9087cfde6b9..9371a7200c78 100755 --- a/gpMgmt/bin/gppylib/commands/base.py +++ b/gpMgmt/bin/gppylib/commands/base.py @@ -532,8 +532,6 @@ class Command(object): propagate_env_map = {} # specific environment variables for this command instance def __init__(self, name, cmdStr, ctxt=LOCAL, remoteHost=None, stdin=None, gphome=None): - assert stdin is None or isinstance(stdin, str) - self.name = name self.cmdStr = cmdStr self.exec_context = createExecutionContext(ctxt, remoteHost, stdin=stdin, diff --git a/gpMgmt/bin/gppylib/gpsubprocess.py b/gpMgmt/bin/gppylib/gpsubprocess.py index 4f88509e484b..d06b27ed50d6 100644 --- a/gpMgmt/bin/gppylib/gpsubprocess.py +++ b/gpMgmt/bin/gppylib/gpsubprocess.py @@ -113,17 +113,17 @@ def _postprocess_outputs(self,output,error): # In Python 3, we always use the text mode. Also, decoding happens here. if sys.version_info[0] == 3 or (self.universal_newlines and hasattr(file, 'newlines')): if sys.version_info[0] == 3: - kargs = { + kwargs = { "encoding": "utf-8", "errors": "strict", } else: - kargs = {} + kwargs = {} if output is not None: - output = self._translate_newlines(output, **kargs) + output = self._translate_newlines(output, **kwargs) if error is not None: - error = self._translate_newlines(error, **kargs) + error = self._translate_newlines(error, **kwargs) return (output,error) diff --git a/gpMgmt/bin/gppylib/operations/utils.py b/gpMgmt/bin/gppylib/operations/utils.py index 5558d2fdfc70..a8db738833e2 100644 --- a/gpMgmt/bin/gppylib/operations/utils.py +++ b/gpMgmt/bin/gppylib/operations/utils.py @@ -46,9 +46,7 @@ def __init__(self, operation, host, msg_ctx=""): def execute(self): execname = os.path.split(sys.argv[0])[-1] - pickled_data = base64.urlsafe_b64encode(pickle.dumps((execname, self.operation))) - if sys.version_info[0] == 3: - pickled_data = pickled_data.decode('ascii') + pickled_data = base64.urlsafe_b64encode(pickle.dumps((execname, self.operation))).decode('ascii') cmd = Command('pickling an operation', 'echo "START_CMD_OUTPUT"; $GPHOME/sbin/gpoperation.py', ctxt=REMOTE, remoteHost=self.host, stdin = pickled_data) cmd.run(validateAfter=True) diff --git a/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py b/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py index df4ba75105ee..fb7f213f99f4 100644 --- a/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py +++ b/gpMgmt/bin/gppylib/test/unit/test_unit_gpinitsystem.py @@ -4,7 +4,7 @@ from .gp_unittest import * from io import StringIO try: - from subprocess32 import Popen, PIPE + from subprocess32 import PIPE except: from subprocess import PIPE from gppylib.gpsubprocess import Popen diff --git a/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py b/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py index e80553be2fd8..2cf8afc2ca08 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/gpstart.py @@ -89,9 +89,9 @@ def impl(context, cmd): preexec_fn=_handle_sigpipe, ) - context.stdout_message, context.stderr_message = p.communicate() + returncode, context.stdout_message, context.stderr_message = p.communicate2() - context.ret_code = p.returncode + context.ret_code = returncode @given('the host for the {seg_type} on content {content} is made unreachable') def impl(context, seg_type, content): diff --git a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py index e79710019495..d52873f0d63d 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py @@ -1,5 +1,7 @@ from __future__ import print_function from __future__ import division + +import sys from builtins import next from builtins import filter from builtins import map @@ -2119,7 +2121,7 @@ def impl(context, filename, contain, output): cmd = Command(name='Running remote command: %s' % cmd_str, cmdStr=cmd_str) cmd.run(validateAfter=True) - actual = cmd.get_stdout() + actual = cmd.get_stdout().decode('utf-8') if valuesShouldExist and (output not in actual): raise Exception('File %s on host %s does not contain "%s"' % (filepath, host, output)) if (not valuesShouldExist) and (output in actual): @@ -4112,7 +4114,11 @@ def impl(context, command, input): @when('the user runs {command}, selects {input} and interrupt the process') def impl(context, command, input): p = Popen(command.split(), stdout=PIPE, stdin=PIPE, stderr=PIPE) - p.stdin.write(input.encode()) + + if sys.version_info[0] == 2: + input = input.encode('utf-8') + + p.stdin.write(input) p.stdin.flush() time.sleep(120) # interrupt the process. diff --git a/gpMgmt/test/behave_utils/cluster_expand.py b/gpMgmt/test/behave_utils/cluster_expand.py index c1cc1544cdc2..795f20917394 100755 --- a/gpMgmt/test/behave_utils/cluster_expand.py +++ b/gpMgmt/test/behave_utils/cluster_expand.py @@ -47,24 +47,24 @@ def do_interview(self, hosts=None, num_of_segments=1, directory_pairs=None, has_ # Cannot guarantee that this is not flaky either. # Would you like to initiate a new System Expansion Yy|Nn (default=N): - p1.stdin.write(b"y\n") + p1.stdin.write("y\n") # **Enter a blank line to only add segments to existing hosts**[]: - p1.stdin.write(("%s\n" % (",".join(hosts) if hosts else "")).encode('utf-8')) + p1.stdin.write("%s\n" % (",".join(hosts) if hosts else "")) if has_mirrors: #What type of mirroring strategy would you like? spread|grouped (default=grouped): - p1.stdin.write(b"\n") + p1.stdin.write("\n") #How many new primary segments per host do you want to add? (default=0): - p1.stdin.write(("%s\n" % num_of_segments).encode('utf-8')) + p1.stdin.write("%s\n" % num_of_segments) # Enter new primary data directory # for directory in directory_pairs: primary, mirror = directory - p1.stdin.write(("%s\n" % primary).encode('utf-8')) + p1.stdin.write("%s\n" % primary) if mirror: - p1.stdin.write(("%s\n" % mirror).encode('utf-8')) + p1.stdin.write("%s\n" % mirror) output = p1.communicate()[0] From 1a3f473e3f81ba208f45ad81ad046d48272588bc Mon Sep 17 00:00:00 2001 From: Viktor Kurilko Date: Fri, 20 Mar 2026 11:57:21 +0700 Subject: [PATCH 4/5] fix3 --- gpMgmt/bin/lib/pexpect/__init__.py | 12 +++--------- gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py | 4 +++- src/backend/gporca/scripts/fix_mdps.py | 2 +- .../gporca/scripts/get_debug_event_counters.py | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/gpMgmt/bin/lib/pexpect/__init__.py b/gpMgmt/bin/lib/pexpect/__init__.py index 41d8d51bc10d..50ff9f1811d4 100644 --- a/gpMgmt/bin/lib/pexpect/__init__.py +++ b/gpMgmt/bin/lib/pexpect/__init__.py @@ -63,12 +63,6 @@ ''' -from builtins import chr -from builtins import zip -from builtins import bytes -from builtins import range -from past.builtins import basestring -from builtins import object try: import os import sys @@ -1774,9 +1768,9 @@ class spawnu(spawn): linesep = os.linesep crlf = '\r\n' else: - string_type = str - allowed_string_types = (str, ) - _chr = staticmethod(chr) + string_type = unicode + allowed_string_types = (unicode, ) + _chr = staticmethod(unichr) linesep = os.linesep.decode('ascii') crlf = '\r\n'.decode('ascii') # This can handle unicode in both Python 2 and 3 diff --git a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py index d52873f0d63d..6d20fbcaa2c2 100644 --- a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py +++ b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py @@ -2121,7 +2121,9 @@ def impl(context, filename, contain, output): cmd = Command(name='Running remote command: %s' % cmd_str, cmdStr=cmd_str) cmd.run(validateAfter=True) - actual = cmd.get_stdout().decode('utf-8') + actual = cmd.get_stdout() + if sys.version_info[0] == 2: + actual = actual.decode('utf-8') if valuesShouldExist and (output not in actual): raise Exception('File %s on host %s does not contain "%s"' % (filepath, host, output)) if (not valuesShouldExist) and (output in actual): diff --git a/src/backend/gporca/scripts/fix_mdps.py b/src/backend/gporca/scripts/fix_mdps.py index bf9f49fec686..3c2e33fdaf11 100755 --- a/src/backend/gporca/scripts/fix_mdps.py +++ b/src/backend/gporca/scripts/fix_mdps.py @@ -24,7 +24,7 @@ def run_command(command): p = gpsubprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - return iter(p.stdout.readline, b'') + return iter(p.stdout.readline, '') def parseInputFile(inputFile): failed_tests = [] diff --git a/src/backend/gporca/scripts/get_debug_event_counters.py b/src/backend/gporca/scripts/get_debug_event_counters.py index 4aa6922dcf6e..f62cb174b106 100755 --- a/src/backend/gporca/scripts/get_debug_event_counters.py +++ b/src/backend/gporca/scripts/get_debug_event_counters.py @@ -115,7 +115,7 @@ def run_command(command): p = gpsubprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - return iter(p.stdout.readline, b'') + return iter(p.stdout.readline, '') def processLogFile(logFileLines, allruns): From f21bba328a6b3b633f1cf1e98e7cc8d5eba79092 Mon Sep 17 00:00:00 2001 From: Viktor Kurilko Date: Fri, 20 Mar 2026 12:51:11 +0700 Subject: [PATCH 5/5] fix4 --- gpMgmt/bin/gppylib/gpsubprocess.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gpMgmt/bin/gppylib/gpsubprocess.py b/gpMgmt/bin/gppylib/gpsubprocess.py index d06b27ed50d6..d413396d67b9 100644 --- a/gpMgmt/bin/gppylib/gpsubprocess.py +++ b/gpMgmt/bin/gppylib/gpsubprocess.py @@ -39,6 +39,7 @@ class Popen(subprocess.Popen): def __init__(self, *args, **kwargs): if sys.version_info[0] == 3: kwargs['text'] = True + kwargs['encoding'] = 'utf-8' super(Popen, self).__init__(*args, **kwargs) def communicate2(self, timeout=2,input=None): @@ -249,4 +250,5 @@ def __select (self, iwtd, owtd, ewtd, timeout=None): def check_output(*popenargs, **kwargs): if sys.version_info[0] == 3: kwargs['text'] = True + kwargs['encoding'] = 'utf-8' return subprocess.check_output(*popenargs, **kwargs)