From c02d0c557b6e116fc2cd56d037de9ca3c33d9671 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Tue, 27 Sep 2022 10:21:15 -0700 Subject: [PATCH 01/15] MANIFEST.json is copied --- packages/cli/internal/pkg/cli/workflow/manager.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index d14e9cf2..4f4e5fb5 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -73,6 +73,7 @@ type runProps struct { path string packPath string workflowUrl string + manifestPath string inputsPath string input Input optionFileUrl string @@ -311,6 +312,12 @@ func (m *Manager) uploadWorkflowToS3() { if m.err != nil { return } + err := copyFileRecursivelyToLocation("extra", m.path) + + if err != nil { + m.err = err + return + } objectKey := fmt.Sprintf("%s/%s", m.baseWorkflowKey, workflowZip) log.Debug().Msgf("updloading '%s' to 's3://%s/%s", m.packPath, m.bucketName, objectKey) m.err = m.S3.UploadFile(m.bucketName, objectKey, m.packPath) From 0a4d89f267f90fadcafca65ef4ecf329b9d776d1 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Tue, 27 Sep 2022 13:23:51 -0700 Subject: [PATCH 02/15] added manifest struct for parsing --- packages/cli/internal/pkg/cli/workflow/manager.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index 4f4e5fb5..3fe1d88c 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -119,6 +119,12 @@ type workflowOutputProps struct { workflowRunLogOutputs map[string]interface{} } +type manifestProps struct { + mainWorkFlowURL string `json:"mainWorkFlowURL"` + inputFileURLs []string `json:"inputFileURLs"` + engineOptions string `json:"engineOptions"` +} + type Manager struct { Project storage.ProjectClient Config storage.ConfigClient @@ -312,7 +318,7 @@ func (m *Manager) uploadWorkflowToS3() { if m.err != nil { return } - err := copyFileRecursivelyToLocation("extra", m.path) + err := copyFileRecursivelyToLocation("extra", filepath.Join(m.path, "MANIFEST.json")) if err != nil { m.err = err From bc408733439f3e65d75d0a22a8b17759843a83b8 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Thu, 29 Sep 2022 16:35:09 -0700 Subject: [PATCH 03/15] inputsFile now added to MANIFEST --- .../cli/internal/pkg/cli/workflow/manager.go | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index 3fe1d88c..6e9ca041 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -120,9 +120,9 @@ type workflowOutputProps struct { } type manifestProps struct { - mainWorkFlowURL string `json:"mainWorkFlowURL"` - inputFileURLs []string `json:"inputFileURLs"` - engineOptions string `json:"engineOptions"` + MainWorkFlowURL string `json:"mainWorkFlowURL"` + InputFileURLs []string `json:"inputFileURLs"` + EngineOptions string `json:"engineOptions"` } type Manager struct { @@ -318,7 +318,8 @@ func (m *Manager) uploadWorkflowToS3() { if m.err != nil { return } - err := copyFileRecursivelyToLocation("extra", filepath.Join(m.path, "MANIFEST.json")) + err := copyFileRecursivelyToLocation("extra", m.path) + m.manifestPath = filepath.Join("extra", "MANIFEST.json") if err != nil { m.err = err @@ -338,6 +339,7 @@ func (m *Manager) readInput(inputUrl string) { } log.Debug().Msgf("Input file override URL: %s", inputUrl) m.inputsPath = osutils.StripFileURLPrefix(inputUrl) // We actually support only local files + m.parseAndAddToManifest() bytes, err := m.Storage.ReadAsBytes(inputUrl) log.Debug().Msgf("content is:\n'%s'", string(bytes)) if err != nil { @@ -361,6 +363,30 @@ func (m *Manager) parseInputToArguments() { m.arguments = []string{arguments} } +func (m *Manager) parseAndAddToManifest() { + bytes, err := m.Storage.ReadAsBytes(m.manifestPath) + if err != nil { + m.err = err + return + } + var data manifestProps + if err := json.Unmarshal(bytes, &data); err != nil { + m.err = err + return + } + data.InputFileURLs = append(data.InputFileURLs, m.inputsPath) + bytes, err = json.Marshal(data) + if err != nil { + m.err = err + return + } + err = m.Storage.WriteFromBytes(m.manifestPath, bytes) + if err != nil { + m.err = err + return + } +} + func (m *Manager) uploadInputsToS3() { if m.err != nil || m.input == nil { return From 35189164dd74d1e60374201d55e11ed7a0e4fe6c Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Mon, 10 Oct 2022 16:10:55 -0700 Subject: [PATCH 04/15] code added to test suite for parsing function --- .../pkg/cli/workflow/workflow_run_test.go | 109 +++++++++++++++++- 1 file changed, 104 insertions(+), 5 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go index b6d0aa7b..3db6dbc6 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "os" "path/filepath" "testing" @@ -43,6 +44,8 @@ const ( testWorkflowKey = "project/" + testProjectName + "/userid/" + testUserId + "/context/" + testContext1Name + "/workflow/" + testLocalWorkflowName testWorkflowZipKey = testWorkflowKey + "/workflow.zip" testWorkflowLocalUrl = "workflow/path/file.wdl" + testMANIFESTPath = "extra/MANIFEST.json" + testMANIFEST = `{"mainWorkflowURL": "haplotypecaller-gvcf-gatk4.wdl","inputFileURLs": ["haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json"],"engineOptions": "--no-cache"}` testFullWorkflowLocalUrl = testProjectFileDir + "/" + testWorkflowLocalUrl testTempDir = "/directory/workflow" testWorkflowS3Url = "s3://workflow/path/file.wdl" @@ -88,11 +91,12 @@ type WorkflowRunTestSuite struct { origCompressToTmp func(srcPath string) (string, error) origWriteToTmp func(namePattern, content string) (string, error) - testProjSpec spec.Project - wfInstance ddb.WorkflowInstance - testStackInfo cfn.StackInfo - workAbsDir string - inputsAbsDir string + testProjSpec spec.Project + wfInstance ddb.WorkflowInstance + testStackInfo cfn.StackInfo + workAbsDir string + inputsAbsDir string + testAppendedMANIFEST string manager *Manager } @@ -125,6 +129,7 @@ func (s *WorkflowRunTestSuite) BeforeTest(_, _ string) { s.workAbsDir, err = os.Getwd() require.NoError(s.T(), err) s.inputsAbsDir = filepath.Join(s.workAbsDir, testArgumentsDir) + s.testAppendedMANIFEST = "{\"mainWorkFlowURL\":\"haplotypecaller-gvcf-gatk4.wdl\",\"inputFileURLs\":[\"haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json\",\"" + filepath.Join(testArgumentsDir, testArgsFileName) + "\"],\"engineOptions\":\"--no-cache\"}" s.manager = &Manager{ Project: s.mockProjectClient, @@ -198,6 +203,20 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithS3Args() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + eq := gomock.GotFormatterAdapter( + gomock.GotFormatterFunc( + func(i interface{}) string { + return fmt.Sprintf("%s", i) + }), + gomock.WantFormatter( + gomock.StringerFunc(func() string { + return fmt.Sprintf("%s", s.testAppendedMANIFEST) + }), + gomock.Eq([]byte(s.testAppendedMANIFEST)), + ), + ) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) @@ -207,6 +226,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithS3Args() { s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -228,6 +248,20 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithLocalArgs() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + eq := gomock.GotFormatterAdapter( + gomock.GotFormatterFunc( + func(i interface{}) string { + return fmt.Sprintf("%s", i) + }), + gomock.WantFormatter( + gomock.StringerFunc(func() string { + return fmt.Sprintf("%s", s.testAppendedMANIFEST) + }), + gomock.Eq([]byte(s.testAppendedMANIFEST)), + ), + ) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) @@ -239,6 +273,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithLocalArgs() { s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -263,6 +298,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_NoArgs() { s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().NoError(err) { @@ -280,6 +316,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_OptionsFile() { s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, "", testOptionFilePath) if s.Assert().NoError(err) { s.Assert().Equal(testRun1Id, actualId) @@ -291,7 +328,22 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + fmt.Printf("%s, Jonathan", testArgumentsPath) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + eq := gomock.GotFormatterAdapter( + gomock.GotFormatterFunc( + func(i interface{}) string { + return fmt.Sprintf("%s", i) + }), + gomock.WantFormatter( + gomock.StringerFunc(func() string { + return fmt.Sprintf("%s", s.testAppendedMANIFEST) + }), + gomock.Eq([]byte(s.testAppendedMANIFEST)), + ), + ) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockProjectClient.EXPECT().GetLocation().AnyTimes().Return(testProjectFileDir) @@ -305,6 +357,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -321,6 +374,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, "", "") if s.Assert().NoError(err) { s.Assert().Equal(testRun1Id, actualId) @@ -330,6 +385,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadProjectSpecFailure() { errorMessage := "failed to read project specification" s.mockProjectClient.EXPECT().Read().Return(spec.Project{}, errors.New(errorMessage)) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, "", "", "") if s.Assert().Error(err) { @@ -340,6 +396,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadProjectSpecFailure() { func (s *WorkflowRunTestSuite) TestRunWorkflow_MissingWorkflowSpec() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, "dummy", "", "") if s.Assert().Error(err) { @@ -353,6 +410,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_InvalidWorkflowDefinitionUrl() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testInvalidWorkflowName, "", "") if s.Assert().Error(err) { @@ -371,6 +429,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CompressionFailed() { s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return("", errors.New(errorMessage)) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -385,6 +444,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_SSMClientFailed() { errorMessage := "cannot connect to SSM" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return("", errors.New(errorMessage)) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -404,6 +464,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadToS3Failed() { s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) @@ -432,6 +493,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadArgsFailed() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte{}, errors.New(errorMessage)) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") @@ -456,8 +518,23 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadInputFailed() { s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + eq := gomock.GotFormatterAdapter( + gomock.GotFormatterFunc( + func(i interface{}) string { + return fmt.Sprintf("%s", i) + }), + gomock.WantFormatter( + gomock.StringerFunc(func() string { + return fmt.Sprintf("%s", s.testAppendedMANIFEST) + }), + gomock.Eq([]byte(s.testAppendedMANIFEST)), + ), + ) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) s.mockInputClient.EXPECT().UpdateInputs(s.inputsAbsDir, testInputS3Map, testOutputBucket, testFilePathKey).Return(nil, errors.New(errorMessage)) @@ -480,6 +557,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnFailed() { s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(cfn.StackInfo{}, errors.New(errorMessage)) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) @@ -500,6 +578,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnMissingWesUrlFailed() { s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) stackInfo := cfn.StackInfo{ Id: testStackId, @@ -529,6 +608,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_WesFailed() { s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return("", errors.New(errorMessage)) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) @@ -545,6 +625,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_DeployValidationFailed() { errorMessage := "context 'TestContext1' is not deployed" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, cfn.StackDoesNotExistError) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -558,6 +639,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_DeployValidationCfnFailed() { errorMessage := "some cfn error" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, errors.New(errorMessage)) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -576,6 +658,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CreateTempDir() { s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return("", errors.New(errorMessage)) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -596,6 +679,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CopyError() { s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(errors.New(errorMessage)) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -618,6 +702,20 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_RemoveErrorStillWorks() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) + eq := gomock.GotFormatterAdapter( + gomock.GotFormatterFunc( + func(i interface{}) string { + return fmt.Sprintf("%s", i) + }), + gomock.WantFormatter( + gomock.StringerFunc(func() string { + return fmt.Sprintf("%s", s.testAppendedMANIFEST) + }), + gomock.Eq([]byte(s.testAppendedMANIFEST)), + ), + ) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) @@ -625,6 +723,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_RemoveErrorStillWorks() { s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) + s.mockOs.EXPECT().RemoveAll("extra").Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) From a7a779e720ec3aba0fc67957c6ec13a94900ee0e Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Mon, 10 Oct 2022 16:11:41 -0700 Subject: [PATCH 05/15] added parsing function to workflow run --- packages/cli/internal/pkg/cli/workflow/workflow_run.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run.go b/packages/cli/internal/pkg/cli/workflow/workflow_run.go index 206a4141..a72d7bea 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run.go @@ -20,6 +20,7 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o } m.calculateFinalLocation() m.readInput(inputsFileUrl) + m.parseAndAddToManifest() m.uploadInputsToS3() m.parseInputToArguments() m.readOptionFile(optionFileUrl) From 97f03fe562cb4404c41413b64e85ffa7c2b60784 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Mon, 10 Oct 2022 16:12:51 -0700 Subject: [PATCH 06/15] added debugging logs and refactored parse function --- .../cli/internal/pkg/cli/workflow/manager.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index 6e9ca041..4791cd23 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -318,9 +318,8 @@ func (m *Manager) uploadWorkflowToS3() { if m.err != nil { return } + log.Debug().Msgf("copying %s to extra directory", m.path) err := copyFileRecursivelyToLocation("extra", m.path) - m.manifestPath = filepath.Join("extra", "MANIFEST.json") - if err != nil { m.err = err return @@ -339,7 +338,6 @@ func (m *Manager) readInput(inputUrl string) { } log.Debug().Msgf("Input file override URL: %s", inputUrl) m.inputsPath = osutils.StripFileURLPrefix(inputUrl) // We actually support only local files - m.parseAndAddToManifest() bytes, err := m.Storage.ReadAsBytes(inputUrl) log.Debug().Msgf("content is:\n'%s'", string(bytes)) if err != nil { @@ -364,6 +362,11 @@ func (m *Manager) parseInputToArguments() { } func (m *Manager) parseAndAddToManifest() { + if m.err != nil || m.inputsPath == "" { + return + } + m.manifestPath = filepath.Join("extra", "MANIFEST.json") + log.Debug().Msgf("Reading %s", m.manifestPath) bytes, err := m.Storage.ReadAsBytes(m.manifestPath) if err != nil { m.err = err @@ -573,6 +576,14 @@ func (m *Manager) cleanUpAttachments() { log.Warn().Msgf("Failed to clean up temporary file '%s': %s", attachment, err) } } + log.Debug().Msgf("removing extra directory") + err := removeAll("extra") + if err != nil { + log.Warn().Msgf("Failed to remove extra directory") + m.err = err + return + } + } func (m *Manager) runWorkflow() { From 78d90b8aac31f2738c87075fba007f93d29f299f Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Tue, 11 Oct 2022 12:54:40 -0700 Subject: [PATCH 07/15] refactored manager.go for readability --- .../cli/internal/pkg/cli/workflow/manager.go | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index 4791cd23..ea617b38 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -119,7 +119,7 @@ type workflowOutputProps struct { workflowRunLogOutputs map[string]interface{} } -type manifestProps struct { +type ManifestProps struct { MainWorkFlowURL string `json:"mainWorkFlowURL"` InputFileURLs []string `json:"inputFileURLs"` EngineOptions string `json:"engineOptions"` @@ -318,8 +318,8 @@ func (m *Manager) uploadWorkflowToS3() { if m.err != nil { return } - log.Debug().Msgf("copying %s to extra directory", m.path) - err := copyFileRecursivelyToLocation("extra", m.path) + log.Debug().Msgf("copying %s to temp directory", m.path) + err := copyFileRecursivelyToLocation("temp", m.path) if err != nil { m.err = err return @@ -361,18 +361,20 @@ func (m *Manager) parseInputToArguments() { m.arguments = []string{arguments} } -func (m *Manager) parseAndAddToManifest() { +// writeTempManifest writes the inputsFile included in the command line to the temporary MANIFEST.json located in temp directory +// This function is only called if there is a path included in the command line with the --inputsFile flag +func (m *Manager) writeTempManifest() { if m.err != nil || m.inputsPath == "" { return } - m.manifestPath = filepath.Join("extra", "MANIFEST.json") + m.manifestPath = filepath.Join("temp", "MANIFEST.json") log.Debug().Msgf("Reading %s", m.manifestPath) bytes, err := m.Storage.ReadAsBytes(m.manifestPath) if err != nil { m.err = err return } - var data manifestProps + var data ManifestProps if err := json.Unmarshal(bytes, &data); err != nil { m.err = err return @@ -568,6 +570,16 @@ func (m *Manager) saveAttachments() { } } +func (m *Manager) removeTempManifest() { + log.Debug().Msgf("removing temp directory") + err := removeAll("temp") + if err != nil { + log.Warn().Msgf("Failed to remove temp directory") + m.err = err + return + } +} + func (m *Manager) cleanUpAttachments() { for _, attachment := range m.attachments { log.Debug().Msgf("cleaning up '%s'", attachment) @@ -576,14 +588,7 @@ func (m *Manager) cleanUpAttachments() { log.Warn().Msgf("Failed to clean up temporary file '%s': %s", attachment, err) } } - log.Debug().Msgf("removing extra directory") - err := removeAll("extra") - if err != nil { - log.Warn().Msgf("Failed to remove extra directory") - m.err = err - return - } - + m.removeTempManifest() } func (m *Manager) runWorkflow() { From c1d1a36c3079694af81348d34acc03672137b109 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Tue, 11 Oct 2022 12:54:58 -0700 Subject: [PATCH 08/15] changed name of new function --- packages/cli/internal/pkg/cli/workflow/workflow_run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run.go b/packages/cli/internal/pkg/cli/workflow/workflow_run.go index a72d7bea..5e22adfd 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run.go @@ -20,7 +20,7 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o } m.calculateFinalLocation() m.readInput(inputsFileUrl) - m.parseAndAddToManifest() + m.writeTempManifest() m.uploadInputsToS3() m.parseInputToArguments() m.readOptionFile(optionFileUrl) From cebd1abebfc95b70f71b4bc4e1d54a7da0759cc0 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Tue, 11 Oct 2022 12:55:24 -0700 Subject: [PATCH 09/15] refactored for readability and changed directory name --- .../pkg/cli/workflow/workflow_run_test.go | 132 ++++++------------ 1 file changed, 44 insertions(+), 88 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go index 3db6dbc6..76038472 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go @@ -44,7 +44,7 @@ const ( testWorkflowKey = "project/" + testProjectName + "/userid/" + testUserId + "/context/" + testContext1Name + "/workflow/" + testLocalWorkflowName testWorkflowZipKey = testWorkflowKey + "/workflow.zip" testWorkflowLocalUrl = "workflow/path/file.wdl" - testMANIFESTPath = "extra/MANIFEST.json" + testMANIFESTPath = "temp/MANIFEST.json" testMANIFEST = `{"mainWorkflowURL": "haplotypecaller-gvcf-gatk4.wdl","inputFileURLs": ["haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json"],"engineOptions": "--no-cache"}` testFullWorkflowLocalUrl = testProjectFileDir + "/" + testWorkflowLocalUrl testTempDir = "/directory/workflow" @@ -101,6 +101,22 @@ type WorkflowRunTestSuite struct { manager *Manager } +func (s *WorkflowRunTestSuite) formatManifestBytesToString() gomock.Matcher { + eq := gomock.GotFormatterAdapter( + gomock.GotFormatterFunc( + func(i interface{}) string { + return fmt.Sprintf("%s", i) + }), + gomock.WantFormatter( + gomock.StringerFunc(func() string { + return fmt.Sprintf("%s", s.testAppendedMANIFEST) + }), + gomock.Eq([]byte(s.testAppendedMANIFEST)), + ), + ) + return eq +} + func (s *WorkflowRunTestSuite) BeforeTest(_, _ string) { s.ctrl = gomock.NewController(s.T()) s.mockProjectClient = storagemocks.NewMockProjectClient(s.ctrl) @@ -204,19 +220,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithS3Args() { uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - eq := gomock.GotFormatterAdapter( - gomock.GotFormatterFunc( - func(i interface{}) string { - return fmt.Sprintf("%s", i) - }), - gomock.WantFormatter( - gomock.StringerFunc(func() string { - return fmt.Sprintf("%s", s.testAppendedMANIFEST) - }), - gomock.Eq([]byte(s.testAppendedMANIFEST)), - ), - ) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) @@ -226,7 +230,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithS3Args() { s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -249,19 +253,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithLocalArgs() { uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - eq := gomock.GotFormatterAdapter( - gomock.GotFormatterFunc( - func(i interface{}) string { - return fmt.Sprintf("%s", i) - }), - gomock.WantFormatter( - gomock.StringerFunc(func() string { - return fmt.Sprintf("%s", s.testAppendedMANIFEST) - }), - gomock.Eq([]byte(s.testAppendedMANIFEST)), - ), - ) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) @@ -273,7 +265,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithLocalArgs() { s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -298,7 +290,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_NoArgs() { s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().NoError(err) { @@ -316,7 +308,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_OptionsFile() { s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, "", testOptionFilePath) if s.Assert().NoError(err) { s.Assert().Equal(testRun1Id, actualId) @@ -331,19 +323,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { fmt.Printf("%s, Jonathan", testArgumentsPath) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - eq := gomock.GotFormatterAdapter( - gomock.GotFormatterFunc( - func(i interface{}) string { - return fmt.Sprintf("%s", i) - }), - gomock.WantFormatter( - gomock.StringerFunc(func() string { - return fmt.Sprintf("%s", s.testAppendedMANIFEST) - }), - gomock.Eq([]byte(s.testAppendedMANIFEST)), - ), - ) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockProjectClient.EXPECT().GetLocation().AnyTimes().Return(testProjectFileDir) @@ -357,7 +337,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -374,7 +354,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, "", "") if s.Assert().NoError(err) { @@ -385,7 +365,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadProjectSpecFailure() { errorMessage := "failed to read project specification" s.mockProjectClient.EXPECT().Read().Return(spec.Project{}, errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, "", "", "") if s.Assert().Error(err) { @@ -396,7 +376,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadProjectSpecFailure() { func (s *WorkflowRunTestSuite) TestRunWorkflow_MissingWorkflowSpec() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, "dummy", "", "") if s.Assert().Error(err) { @@ -410,7 +390,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_InvalidWorkflowDefinitionUrl() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testInvalidWorkflowName, "", "") if s.Assert().Error(err) { @@ -429,7 +409,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CompressionFailed() { s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return("", errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -444,7 +424,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_SSMClientFailed() { errorMessage := "cannot connect to SSM" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return("", errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -464,7 +444,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadToS3Failed() { s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) @@ -493,7 +473,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadArgsFailed() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte{}, errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") @@ -518,23 +498,11 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadInputFailed() { s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - eq := gomock.GotFormatterAdapter( - gomock.GotFormatterFunc( - func(i interface{}) string { - return fmt.Sprintf("%s", i) - }), - gomock.WantFormatter( - gomock.StringerFunc(func() string { - return fmt.Sprintf("%s", s.testAppendedMANIFEST) - }), - gomock.Eq([]byte(s.testAppendedMANIFEST)), - ), - ) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) s.mockInputClient.EXPECT().UpdateInputs(s.inputsAbsDir, testInputS3Map, testOutputBucket, testFilePathKey).Return(nil, errors.New(errorMessage)) @@ -557,7 +525,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnFailed() { s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(cfn.StackInfo{}, errors.New(errorMessage)) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) @@ -578,7 +546,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnMissingWesUrlFailed() { s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) stackInfo := cfn.StackInfo{ Id: testStackId, @@ -608,7 +576,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_WesFailed() { s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return("", errors.New(errorMessage)) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) @@ -625,7 +593,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_DeployValidationFailed() { errorMessage := "context 'TestContext1' is not deployed" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, cfn.StackDoesNotExistError) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -639,7 +607,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_DeployValidationCfnFailed() { errorMessage := "some cfn error" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -658,7 +626,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CreateTempDir() { s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return("", errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -679,7 +647,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CopyError() { s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(errors.New(errorMessage)) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -703,19 +671,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_RemoveErrorStillWorks() { uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - eq := gomock.GotFormatterAdapter( - gomock.GotFormatterFunc( - func(i interface{}) string { - return fmt.Sprintf("%s", i) - }), - gomock.WantFormatter( - gomock.StringerFunc(func() string { - return fmt.Sprintf("%s", s.testAppendedMANIFEST) - }), - gomock.Eq([]byte(s.testAppendedMANIFEST)), - ), - ) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, eq).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) @@ -723,7 +679,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_RemoveErrorStillWorks() { s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) - s.mockOs.EXPECT().RemoveAll("extra").Return(nil) + s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) From 9b281ebce9ce5962e95ef5566b742aa045a49342 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Wed, 12 Oct 2022 16:42:44 -0700 Subject: [PATCH 10/15] refactored workflow run to use same temp folder to avoid duplication --- .../cli/internal/pkg/cli/workflow/manager.go | 88 +++++++++++-------- .../internal/pkg/cli/workflow/workflow_run.go | 12 +-- .../pkg/cli/workflow/workflow_run_test.go | 66 ++++---------- 3 files changed, 77 insertions(+), 89 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index ea617b38..9cf89c6c 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -31,6 +31,7 @@ import ( var ( compressToTmp = zipfile.CompressToTmp workflowZip = "workflow.zip" + manifestFilename = "MANIFEST.json" removeFile = os.Remove removeAll = os.RemoveAll osStat = os.Stat @@ -74,6 +75,7 @@ type runProps struct { packPath string workflowUrl string manifestPath string + tempPath string inputsPath string input Input optionFileUrl string @@ -249,41 +251,36 @@ func (m *Manager) packWorkflowPath() { return } - var absoluteWorkflowPath string + // var absoluteWorkflowPath string if fileInfo.IsDir() { - absoluteWorkflowPath, err = createTempDir("", "workflow_*") - log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", absoluteWorkflowPath) - if err != nil { - m.err = err - return - } - defer func() { - err = removeAll(absoluteWorkflowPath) - if err != nil { - log.Warn().Msgf("Failed to delete temporary folder '%s'", m.packPath) - } - }() - - log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, absoluteWorkflowPath) - err = copyFileRecursivelyToLocation(absoluteWorkflowPath, m.path) - if err != nil { - log.Error().Err(err) - m.err = err - return - } + // absoluteWorkflowPath, err = createTempDir("", "workflow_*") + log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", m.tempPath) + // if err != nil { + // m.err = err + // return + // } + defer m.deleteTempDir() + + // log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, m.tempPath) + // err = copyFileRecursivelyToLocation(absoluteWorkflowPath, m.path) + // if err != nil { + // log.Error().Err(err) + // m.err = err + // return + // } log.Debug().Msgf("updating file references and loading packed content to '%s/%s'", m.bucketName, m.baseWorkflowKey) - err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, absoluteWorkflowPath, m.bucketName, m.baseWorkflowKey) + err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey) if err != nil { log.Error().Err(err) m.err = err return } } else { - absoluteWorkflowPath = m.path + m.tempPath = m.path } - m.packPath, m.err = compressToTmp(absoluteWorkflowPath) + m.packPath, m.err = compressToTmp(m.tempPath) } func (m *Manager) setOutputBucket() { @@ -361,13 +358,43 @@ func (m *Manager) parseInputToArguments() { m.arguments = []string{arguments} } +func (m *Manager) initializeTempDir() { + if m.err != nil { + return + } + var err error + m.tempPath, err = createTempDir("", "workflow_*") + log.Debug().Msgf("created temp directory at: '%s'", m.tempPath) + if err != nil { + m.err = err + return + } + log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, m.tempPath) + err = copyFileRecursivelyToLocation(m.tempPath, m.path) + if err != nil { + log.Error().Err(err) + m.err = err + return + } +} + +func (m *Manager) deleteTempDir() { + if m.tempPath == "" { + return + } + err := removeAll(m.tempPath) + if err != nil { + log.Warn().Msgf("Failed to delete temporary folder '%s'", m.tempPath) + } +} + // writeTempManifest writes the inputsFile included in the command line to the temporary MANIFEST.json located in temp directory // This function is only called if there is a path included in the command line with the --inputsFile flag func (m *Manager) writeTempManifest() { if m.err != nil || m.inputsPath == "" { return } - m.manifestPath = filepath.Join("temp", "MANIFEST.json") + m.manifestPath = filepath.Join(m.tempPath, manifestFilename) log.Debug().Msgf("Reading %s", m.manifestPath) bytes, err := m.Storage.ReadAsBytes(m.manifestPath) if err != nil { @@ -570,16 +597,6 @@ func (m *Manager) saveAttachments() { } } -func (m *Manager) removeTempManifest() { - log.Debug().Msgf("removing temp directory") - err := removeAll("temp") - if err != nil { - log.Warn().Msgf("Failed to remove temp directory") - m.err = err - return - } -} - func (m *Manager) cleanUpAttachments() { for _, attachment := range m.attachments { log.Debug().Msgf("cleaning up '%s'", attachment) @@ -588,7 +605,6 @@ func (m *Manager) cleanUpAttachments() { log.Warn().Msgf("Failed to clean up temporary file '%s': %s", attachment, err) } } - m.removeTempManifest() } func (m *Manager) runWorkflow() { diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run.go b/packages/cli/internal/pkg/cli/workflow/workflow_run.go index 5e22adfd..39eb4b4b 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run.go @@ -11,18 +11,20 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o m.validateContextIsDeployed(contextName) m.setOutputBucket() m.parseWorkflowLocation() + m.readInput(inputsFileUrl) // initialize temp folder here, then delete once we get to packWorkflowPath? + m.initializeTempDir() + m.writeTempManifest() + m.uploadInputsToS3() + m.parseInputToArguments() if m.isUploadRequired() { m.setBaseObjectKey(contextName, workflowName) m.setWorkflowPath() - m.packWorkflowPath() + m.packWorkflowPath() // where temp folder is initially created m.uploadWorkflowToS3() m.cleanUpWorkflow() } m.calculateFinalLocation() - m.readInput(inputsFileUrl) - m.writeTempManifest() - m.uploadInputsToS3() - m.parseInputToArguments() + // where everything was m.readOptionFile(optionFileUrl) m.setContextStackInfo(contextName) m.setWesUrl() diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go index 76038472..41295abd 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go @@ -44,8 +44,6 @@ const ( testWorkflowKey = "project/" + testProjectName + "/userid/" + testUserId + "/context/" + testContext1Name + "/workflow/" + testLocalWorkflowName testWorkflowZipKey = testWorkflowKey + "/workflow.zip" testWorkflowLocalUrl = "workflow/path/file.wdl" - testMANIFESTPath = "temp/MANIFEST.json" - testMANIFEST = `{"mainWorkflowURL": "haplotypecaller-gvcf-gatk4.wdl","inputFileURLs": ["haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json"],"engineOptions": "--no-cache"}` testFullWorkflowLocalUrl = testProjectFileDir + "/" + testWorkflowLocalUrl testTempDir = "/directory/workflow" testWorkflowS3Url = "s3://workflow/path/file.wdl" @@ -54,6 +52,9 @@ const ( testArgsFileName = "args.txt" testArgumentsDir = "workflow/path/" testArgumentsPath = testArgumentsDir + testArgsFileName + testMANIFESTPath = testTempDir + "/MANIFEST.json" + testMANIFEST = `{"mainWorkflowURL": "haplotypecaller-gvcf-gatk4.wdl","inputFileURLs": ["haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json"],"engineOptions": "--no-cache"}` + testAppendedMANIFEST = "{\"mainWorkFlowURL\":\"haplotypecaller-gvcf-gatk4.wdl\",\"inputFileURLs\":[\"haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json\",\"" + testArgumentsDir + testArgsFileName + "\"],\"engineOptions\":\"--no-cache\"}" testOptionFileName = "test.json" testOptionFilePath = "file://path/to/" + testOptionFileName testWesUrl = "https://TestWesUrl.com/prod" @@ -109,9 +110,9 @@ func (s *WorkflowRunTestSuite) formatManifestBytesToString() gomock.Matcher { }), gomock.WantFormatter( gomock.StringerFunc(func() string { - return fmt.Sprintf("%s", s.testAppendedMANIFEST) + return fmt.Sprintf("%s", testAppendedMANIFEST) }), - gomock.Eq([]byte(s.testAppendedMANIFEST)), + gomock.Eq([]byte(testAppendedMANIFEST)), ), ) return eq @@ -145,7 +146,6 @@ func (s *WorkflowRunTestSuite) BeforeTest(_, _ string) { s.workAbsDir, err = os.Getwd() require.NoError(s.T(), err) s.inputsAbsDir = filepath.Join(s.workAbsDir, testArgumentsDir) - s.testAppendedMANIFEST = "{\"mainWorkFlowURL\":\"haplotypecaller-gvcf-gatk4.wdl\",\"inputFileURLs\":[\"haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json\",\"" + filepath.Join(testArgumentsDir, testArgsFileName) + "\"],\"engineOptions\":\"--no-cache\"}" s.manager = &Manager{ Project: s.mockProjectClient, @@ -230,7 +230,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithS3Args() { s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -265,7 +264,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithLocalArgs() { s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -290,7 +288,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_NoArgs() { s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().NoError(err) { @@ -303,12 +300,12 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_OptionsFile() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockStorageClient.EXPECT().ReadAsBytes(testOptionFilePath).Return([]byte(testOptionFileLocal), nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, "", testOptionFilePath) if s.Assert().NoError(err) { s.Assert().Equal(testRun1Id, actualId) @@ -320,7 +317,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - fmt.Printf("%s, Jonathan", testArgumentsPath) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) @@ -337,7 +333,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, testArgumentsPath, "") if s.Assert().NoError(err) { @@ -350,11 +345,11 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.wfInstance.WorkflowName = testS3WorkflowName s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testS3WorkflowName, "", "") if s.Assert().NoError(err) { @@ -365,7 +360,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadProjectSpecFailure() { errorMessage := "failed to read project specification" s.mockProjectClient.EXPECT().Read().Return(spec.Project{}, errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, "", "", "") if s.Assert().Error(err) { @@ -376,7 +370,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadProjectSpecFailure() { func (s *WorkflowRunTestSuite) TestRunWorkflow_MissingWorkflowSpec() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, "dummy", "", "") if s.Assert().Error(err) { @@ -390,7 +383,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_InvalidWorkflowDefinitionUrl() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testInvalidWorkflowName, "", "") if s.Assert().Error(err) { @@ -404,12 +396,12 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CompressionFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot compress file" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return("", errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -424,7 +416,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_SSMClientFailed() { errorMessage := "cannot connect to SSM" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return("", errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -444,7 +435,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadToS3Failed() { s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) @@ -463,18 +453,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadArgsFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot read input" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) - s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) - s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) - s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) - s.mockFileInfo.EXPECT().IsDir().Return(true) - s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) - s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte{}, errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) - s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().Error(err) { @@ -491,21 +471,20 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadInputFailed() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockProjectClient.EXPECT().GetLocation().AnyTimes().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) - s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) - s.mockFileInfo.EXPECT().IsDir().Return(true) - s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) - s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) - s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) s.mockInputClient.EXPECT().UpdateInputs(s.inputsAbsDir, testInputS3Map, testOutputBucket, testFilePathKey).Return(nil, errors.New(errorMessage)) + // s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) + // s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) + // s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) + // s.mockFileInfo.EXPECT().IsDir().Return(true) + // s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) + // s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) + // s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().Error(err) { @@ -519,13 +498,13 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot call CFN" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(cfn.StackInfo{}, errors.New(errorMessage)) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) @@ -541,12 +520,12 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CfnMissingWesUrlFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "wes endpoint for workflow type 'TypeLanguage' is missing in engine stack 'TestStackId'" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(false) s.mockZip.EXPECT().CompressToTmp(testFullWorkflowLocalUrl).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) stackInfo := cfn.StackInfo{ Id: testStackId, @@ -567,16 +546,15 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_WesFailed() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot call WES" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) - s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return("", errors.New(errorMessage)) s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) @@ -593,7 +571,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_DeployValidationFailed() { errorMessage := "context 'TestContext1' is not deployed" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, cfn.StackDoesNotExistError) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -607,7 +584,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_DeployValidationCfnFailed() { errorMessage := "some cfn error" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -621,12 +597,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CreateTempDir() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) errorMessage := "cannot dir error" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) - s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) - s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return("", errors.New(errorMessage)) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -647,7 +619,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CopyError() { s.mockFileInfo.EXPECT().IsDir().Return(true) s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(errors.New(errorMessage)) s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") if s.Assert().Error(err) { @@ -679,7 +650,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_RemoveErrorStillWorks() { s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) s.mockDdb.EXPECT().WriteWorkflowInstance(context.Background(), s.wfInstance).Return(nil) - s.mockOs.EXPECT().RemoveAll("temp").Return(nil) s.mockOs.EXPECT().Remove(testCompressedTmpPath).After(uploadCall).Return(nil) s.mockOs.EXPECT().Remove(testTmpAttachmentPath).Return(nil) From 16287a6e50237993d95a71d6156f8ac69150ae7c Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Wed, 12 Oct 2022 16:44:50 -0700 Subject: [PATCH 11/15] removed comments --- packages/cli/internal/pkg/cli/workflow/manager.go | 15 --------------- .../cli/internal/pkg/cli/workflow/workflow_run.go | 5 ++--- .../pkg/cli/workflow/workflow_run_test.go | 7 ------- 3 files changed, 2 insertions(+), 25 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index 9cf89c6c..da461241 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -251,24 +251,9 @@ func (m *Manager) packWorkflowPath() { return } - // var absoluteWorkflowPath string if fileInfo.IsDir() { - // absoluteWorkflowPath, err = createTempDir("", "workflow_*") log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", m.tempPath) - // if err != nil { - // m.err = err - // return - // } defer m.deleteTempDir() - - // log.Debug().Msgf("recursively copying content of '%s' to '%s'", m.path, m.tempPath) - // err = copyFileRecursivelyToLocation(absoluteWorkflowPath, m.path) - // if err != nil { - // log.Error().Err(err) - // m.err = err - // return - // } - log.Debug().Msgf("updating file references and loading packed content to '%s/%s'", m.bucketName, m.baseWorkflowKey) err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey) if err != nil { diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run.go b/packages/cli/internal/pkg/cli/workflow/workflow_run.go index 39eb4b4b..11b51ff4 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run.go @@ -11,7 +11,7 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o m.validateContextIsDeployed(contextName) m.setOutputBucket() m.parseWorkflowLocation() - m.readInput(inputsFileUrl) // initialize temp folder here, then delete once we get to packWorkflowPath? + m.readInput(inputsFileUrl) m.initializeTempDir() m.writeTempManifest() m.uploadInputsToS3() @@ -19,12 +19,11 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o if m.isUploadRequired() { m.setBaseObjectKey(contextName, workflowName) m.setWorkflowPath() - m.packWorkflowPath() // where temp folder is initially created + m.packWorkflowPath() m.uploadWorkflowToS3() m.cleanUpWorkflow() } m.calculateFinalLocation() - // where everything was m.readOptionFile(optionFileUrl) m.setContextStackInfo(contextName) m.setWesUrl() diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go index 41295abd..d9daa8a1 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go @@ -478,13 +478,6 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadInputFailed() { testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) s.mockInputClient.EXPECT().UpdateInputs(s.inputsAbsDir, testInputS3Map, testOutputBucket, testFilePathKey).Return(nil, errors.New(errorMessage)) - // s.mockInputClient.EXPECT().UpdateInputReferencesAndUploadToS3(testFullWorkflowLocalUrl, testTempDir, testOutputBucket, testWorkflowKey).Return(nil) - // s.mockOs.EXPECT().RemoveAll(testTempDir).Return(nil) - // s.mockOs.EXPECT().Stat(testFullWorkflowLocalUrl).Return(s.mockFileInfo, nil) - // s.mockFileInfo.EXPECT().IsDir().Return(true) - // s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) - // s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) - // s.mockOs.EXPECT().Remove(testCompressedTmpPath).Return(nil) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") if s.Assert().Error(err) { From d187776cc7b02fbecd51863ab80b7c1ae7be277b Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Mon, 17 Oct 2022 09:15:51 -0700 Subject: [PATCH 12/15] code refactored for workflow run to work --- .../cli/internal/pkg/cli/workflow/manager.go | 31 +++++++++++++----- .../internal/pkg/cli/workflow/workflow_run.go | 9 +++--- .../pkg/cli/workflow/workflow_run_test.go | 32 +++++++++++++------ 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index da461241..6e1a413e 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -36,6 +36,7 @@ var ( removeAll = os.RemoveAll osStat = os.Stat createTempDir = os.MkdirTemp + copyFile = osutils.CopyFile copyFileRecursivelyToLocation = osutils.CopyFileRecursivelyToLocation writeToTmp = func(namePattern, content string) (string, error) { f, err := os.CreateTemp("", namePattern) @@ -255,7 +256,7 @@ func (m *Manager) packWorkflowPath() { log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", m.tempPath) defer m.deleteTempDir() log.Debug().Msgf("updating file references and loading packed content to '%s/%s'", m.bucketName, m.baseWorkflowKey) - err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey) + // err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey) if err != nil { log.Error().Err(err) m.err = err @@ -300,12 +301,6 @@ func (m *Manager) uploadWorkflowToS3() { if m.err != nil { return } - log.Debug().Msgf("copying %s to temp directory", m.path) - err := copyFileRecursivelyToLocation("temp", m.path) - if err != nil { - m.err = err - return - } objectKey := fmt.Sprintf("%s/%s", m.baseWorkflowKey, workflowZip) log.Debug().Msgf("updloading '%s' to 's3://%s/%s", m.packPath, m.bucketName, objectKey) m.err = m.S3.UploadFile(m.bucketName, objectKey, m.packPath) @@ -334,6 +329,26 @@ func (m *Manager) readInput(inputUrl string) { m.input = input } +func (m *Manager) copyInputToTemp() { + if m.err != nil || m.inputsPath == "" { + return + } + log.Debug().Msgf("Copying input file to temp: %s", m.tempPath) + absInputsPath, err := filepath.Abs(m.inputsPath) + bytes, err := m.Storage.ReadAsBytes(m.inputsPath) + + if err != nil { + m.err = err + return + } + dest := filepath.Join(m.tempPath, filepath.Base(absInputsPath)) + err = m.Storage.WriteFromBytes(dest, bytes) + if err != nil { + m.err = err + return + } +} + func (m *Manager) parseInputToArguments() { if m.err != nil || m.input == nil { return @@ -380,7 +395,7 @@ func (m *Manager) writeTempManifest() { return } m.manifestPath = filepath.Join(m.tempPath, manifestFilename) - log.Debug().Msgf("Reading %s", m.manifestPath) + log.Debug().Msgf("Reading temp manifest %s", m.manifestPath) bytes, err := m.Storage.ReadAsBytes(m.manifestPath) if err != nil { m.err = err diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run.go b/packages/cli/internal/pkg/cli/workflow/workflow_run.go index 11b51ff4..ff291762 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run.go @@ -11,15 +11,16 @@ func (m *Manager) RunWorkflow(contextName, workflowName, inputsFileUrl string, o m.validateContextIsDeployed(contextName) m.setOutputBucket() m.parseWorkflowLocation() - m.readInput(inputsFileUrl) + m.setWorkflowPath() m.initializeTempDir() - m.writeTempManifest() + m.readInput(inputsFileUrl) m.uploadInputsToS3() + m.copyInputToTemp() + m.writeTempManifest() m.parseInputToArguments() if m.isUploadRequired() { m.setBaseObjectKey(contextName, workflowName) - m.setWorkflowPath() - m.packWorkflowPath() + m.packWorkflowPath() m.uploadWorkflowToS3() m.cleanUpWorkflow() } diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go index d9daa8a1..1dbf946f 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go @@ -51,6 +51,7 @@ const ( testCompressedTmpPath = "/tmp/123/workflow_1343535" testArgsFileName = "args.txt" testArgumentsDir = "workflow/path/" + testTempArgPath = testTempDir + "/" + testArgsFileName testArgumentsPath = testArgumentsDir + testArgsFileName testMANIFESTPath = testTempDir + "/MANIFEST.json" testMANIFEST = `{"mainWorkflowURL": "haplotypecaller-gvcf-gatk4.wdl","inputFileURLs": ["haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json"],"engineOptions": "--no-cache"}` @@ -102,7 +103,7 @@ type WorkflowRunTestSuite struct { manager *Manager } -func (s *WorkflowRunTestSuite) formatManifestBytesToString() gomock.Matcher { +func (s *WorkflowRunTestSuite) formatBytesToString(fileContents string) gomock.Matcher { eq := gomock.GotFormatterAdapter( gomock.GotFormatterFunc( func(i interface{}) string { @@ -110,9 +111,9 @@ func (s *WorkflowRunTestSuite) formatManifestBytesToString() gomock.Matcher { }), gomock.WantFormatter( gomock.StringerFunc(func() string { - return fmt.Sprintf("%s", testAppendedMANIFEST) + return fmt.Sprintf("%s", fileContents) }), - gomock.Eq([]byte(testAppendedMANIFEST)), + gomock.Eq([]byte(fileContents)), ), ) return eq @@ -219,8 +220,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithS3Args() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputS3)).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) @@ -251,8 +254,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_WithLocalArgs() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputLocal)).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) @@ -300,6 +305,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_LocalFile_OptionsFile() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockStorageClient.EXPECT().ReadAsBytes(testOptionFilePath).Return([]byte(testOptionFileLocal), nil) @@ -318,8 +324,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_WithLocalArgs() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputLocal)).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputLocalToS3).Return(testTmpAttachmentPath, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockProjectClient.EXPECT().GetLocation().AnyTimes().Return(testProjectFileDir) @@ -345,6 +353,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_S3Object_NoArgs() { s.mockCfn.EXPECT().GetStackStatus(testContext1Stack).Return(types.StackStatusCreateComplete, nil) s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockCfn.EXPECT().GetStackInfo(testContext1Stack).Return(s.testStackInfo, nil) s.mockWes.EXPECT().RunWorkflow(context.Background(), gomock.Any()).Return(testRun1Id, nil) @@ -454,6 +463,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_ReadArgsFailed() { errorMessage := "cannot read input" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) + s.mockTmp.EXPECT().TempDir("", "workflow_*").Return(testTempDir, nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte{}, errors.New(errorMessage)) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, testArgumentsPath, "") @@ -471,10 +482,8 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_UploadInputFailed() { s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockProjectClient.EXPECT().GetLocation().AnyTimes().Return(testProjectFileDir) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) - s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputLocal), nil) - s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) + s.mockTmp.EXPECT().TempDir("", "workflow_*").AnyTimes().Return(testTempDir, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputLocal), &testInputS3Map) s.mockInputClient.EXPECT().UpdateInputs(s.inputsAbsDir, testInputS3Map, testOutputBucket, testFilePathKey).Return(nil, errors.New(errorMessage)) @@ -591,6 +600,7 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_CreateTempDir() { errorMessage := "cannot dir error" s.mockProjectClient.EXPECT().Read().Return(s.testProjSpec, nil) s.mockSsmClient.EXPECT().GetOutputBucket().Return(testOutputBucket, nil) + s.mockProjectClient.EXPECT().GetLocation().Return(testProjectFileDir) s.mockTmp.EXPECT().TempDir("", "workflow_*").Return("", errors.New(errorMessage)) actualId, err := s.manager.RunWorkflow(testContext1Name, testLocalWorkflowName, "", "") @@ -634,8 +644,10 @@ func (s *WorkflowRunTestSuite) TestRunWorkflow_RemoveErrorStillWorks() { s.mockZip.EXPECT().CompressToTmp(testTempDir).Return(testCompressedTmpPath, nil) uploadCall := s.mockS3Client.EXPECT().UploadFile(testOutputBucket, testWorkflowZipKey, testCompressedTmpPath).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().ReadAsBytes(testArgumentsPath).Return([]byte(testInputS3), nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testTempArgPath, s.formatBytesToString(testInputS3)).Return(nil) s.mockStorageClient.EXPECT().ReadAsBytes(testMANIFESTPath).Return([]byte(testMANIFEST), nil) - s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatManifestBytesToString()).Return(nil) + s.mockStorageClient.EXPECT().WriteFromBytes(testMANIFESTPath, s.formatBytesToString(testAppendedMANIFEST)).Return(nil) s.mockTmp.EXPECT().Write(testArgsFileName+"_*", testInputS3).Return(testTmpAttachmentPath, nil) testInputS3Map := make(map[string]interface{}) _ = json.Unmarshal([]byte(testInputS3), &testInputS3Map) From baf33b6ba1314ece788c7be766d0b5590d95f99e Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Mon, 17 Oct 2022 13:29:19 -0700 Subject: [PATCH 13/15] updated code to upload input --- packages/cli/internal/pkg/cli/workflow/manager.go | 5 ++--- packages/cli/internal/pkg/cli/workflow/workflow_run_test.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index 6e1a413e..5512db49 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -36,7 +36,6 @@ var ( removeAll = os.RemoveAll osStat = os.Stat createTempDir = os.MkdirTemp - copyFile = osutils.CopyFile copyFileRecursivelyToLocation = osutils.CopyFileRecursivelyToLocation writeToTmp = func(namePattern, content string) (string, error) { f, err := os.CreateTemp("", namePattern) @@ -256,7 +255,7 @@ func (m *Manager) packWorkflowPath() { log.Debug().Msgf("workflow path '%s' is a directory, packing contents ...", m.tempPath) defer m.deleteTempDir() log.Debug().Msgf("updating file references and loading packed content to '%s/%s'", m.bucketName, m.baseWorkflowKey) - // err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey) + err = m.InputClient.UpdateInputReferencesAndUploadToS3(m.path, m.tempPath, m.bucketName, m.baseWorkflowKey) if err != nil { log.Error().Err(err) m.err = err @@ -406,7 +405,7 @@ func (m *Manager) writeTempManifest() { m.err = err return } - data.InputFileURLs = append(data.InputFileURLs, m.inputsPath) + data.InputFileURLs = append(data.InputFileURLs, filepath.Base(m.inputsPath)) bytes, err = json.Marshal(data) if err != nil { m.err = err diff --git a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go index 1dbf946f..6c78745c 100644 --- a/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go +++ b/packages/cli/internal/pkg/cli/workflow/workflow_run_test.go @@ -55,7 +55,7 @@ const ( testArgumentsPath = testArgumentsDir + testArgsFileName testMANIFESTPath = testTempDir + "/MANIFEST.json" testMANIFEST = `{"mainWorkflowURL": "haplotypecaller-gvcf-gatk4.wdl","inputFileURLs": ["haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json"],"engineOptions": "--no-cache"}` - testAppendedMANIFEST = "{\"mainWorkFlowURL\":\"haplotypecaller-gvcf-gatk4.wdl\",\"inputFileURLs\":[\"haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json\",\"" + testArgumentsDir + testArgsFileName + "\"],\"engineOptions\":\"--no-cache\"}" + testAppendedMANIFEST = "{\"mainWorkFlowURL\":\"haplotypecaller-gvcf-gatk4.wdl\",\"inputFileURLs\":[\"haplotypecaller-gvcf-gatk4.hg38.wgs.inputs.json\",\"" + testArgsFileName + "\"],\"engineOptions\":\"--no-cache\"}" testOptionFileName = "test.json" testOptionFilePath = "file://path/to/" + testOptionFileName testWesUrl = "https://TestWesUrl.com/prod" From b027fa1d00ab7b1ee58465622a1046ffaeac4aaf Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Tue, 18 Oct 2022 09:55:53 -0700 Subject: [PATCH 14/15] updated input required logic --- packages/cli/internal/pkg/cli/workflow/manager.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index 5512db49..fc927c7c 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -31,7 +31,7 @@ import ( var ( compressToTmp = zipfile.CompressToTmp workflowZip = "workflow.zip" - manifestFilename = "MANIFEST.json" + manifestFilename = storage.ManifestFileName removeFile = os.Remove removeAll = os.RemoveAll osStat = os.Stat @@ -227,7 +227,9 @@ func (m *Manager) isUploadRequired() bool { scheme := strings.ToLower(m.parsedSourceURL.Scheme) m.isLocal = scheme == "" || scheme == "file" log.Debug().Msgf("workflow location is local? '%t', upload is required? '%t'", m.isLocal, m.isLocal) - return m.isLocal + inputIncluded := m.inputsPath != "" + log.Debug().Msgf("does input file exist? '%t'", inputIncluded) + return m.isLocal || inputIncluded } func (m *Manager) setWorkflowPath() { From be0877ab70544b87c56b662733227d9511ce0561 Mon Sep 17 00:00:00 2001 From: Jonathan Sandoval Date: Wed, 26 Oct 2022 10:55:39 -0700 Subject: [PATCH 15/15] refactored logic for wes adapter --- .../cli/internal/pkg/cli/workflow/manager.go | 3 +- .../wes/adapters/CromwellWESAdapter.py | 336 +++++++++--------- 2 files changed, 162 insertions(+), 177 deletions(-) diff --git a/packages/cli/internal/pkg/cli/workflow/manager.go b/packages/cli/internal/pkg/cli/workflow/manager.go index fc927c7c..fce412ee 100644 --- a/packages/cli/internal/pkg/cli/workflow/manager.go +++ b/packages/cli/internal/pkg/cli/workflow/manager.go @@ -122,7 +122,7 @@ type workflowOutputProps struct { } type ManifestProps struct { - MainWorkFlowURL string `json:"mainWorkFlowURL"` + MainWorkflowURL string `json:"mainWorkflowURL"` InputFileURLs []string `json:"inputFileURLs"` EngineOptions string `json:"engineOptions"` } @@ -238,6 +238,7 @@ func (m *Manager) setWorkflowPath() { } projectLocation := m.Project.GetLocation() workflowPath := m.parsedSourceURL.Path + log.Debug().Msgf("location ay: %s \n workflow %s", projectLocation, workflowPath) m.path = filepath.Join(projectLocation, workflowPath) log.Debug().Msgf("workflow path is '%s", m.path) } diff --git a/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py b/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py index 8a787430..a782a378 100644 --- a/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py +++ b/packages/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py @@ -206,7 +206,6 @@ def run_workflow( # Check if the WES workflow service is healthy. This will throw an exception if not. self._check_if_wes_service_healthy_() - self.logger.debug(f"RUN_WORKFLOW :: wes service is healthy") if not workflow_params: workflow_params = {} @@ -236,7 +235,9 @@ def run_workflow( f"RUN_WORKFLOW :: retrieving '{workflow_url}' => {tmpdir}" ) try: - props = get_workflow_from_s3(workflow_url, tmpdir, workflow_type) + props = self.get_workflow_from_s3( + workflow_url, tmpdir, workflow_type + ) except RuntimeError as e: raise InvalidRequestError(e) self.logger.debug( @@ -260,15 +261,6 @@ def run_workflow( f"RUN_WORKFLOW :: retrieved workflow attachment : {file.filename}" ) - if workflow_params.get("workflowInputs"): - if file.filename == workflow_params.get("workflowInputs"): - # these are inputs supplied at the command line - # they take highest priority and should be last on the list - if not files.get("workflowInputFiles"): - files["workflowInputFiles"] = [] - - files["workflowInputFiles"] += [file] - # create indexed workflow input keys and files # it should be workflowInputs, workflowInputs_2, ... , workflowInputs_5 if files.get("workflowInputFiles"): @@ -464,219 +456,211 @@ def _server_path(self, *args): args = [str(arg) for arg in args] return "/".join([self.url_prefix] + args) + def get_workflow_from_s3(self, s3_uri: str, localpath: str, workflow_type: str): + """ + Retrieves a workflow from S3 -def get_workflow_from_s3(s3_uri: str, localpath: str, workflow_type: str): - """ - Retrieves a workflow from S3 - - :param s3_uri: The S3 URI to the workflow (e.g. s3://bucketname/path/to/workflow.zip) - :param localpath: The location on the local filesystem to download the workflow - :param workflow_type: Type of workflow to expect (e.g. wdl, cwl, etc) + :param s3_uri: The S3 URI to the workflow (e.g. s3://bucketname/path/to/workflow.zip) + :param localpath: The location on the local filesystem to download the workflow + :param workflow_type: Type of workflow to expect (e.g. wdl, cwl, etc) - :rtype: dict of `data` and `files` + :rtype: dict of `data` and `files` - If the object is a generic file the file is set as `workflowSource` + If the object is a generic file the file is set as `workflowSource` - If the object is a `workflow.zip` file containing a single file, that file is set as `workflowSource` + If the object is a `workflow.zip` file containing a single file, that file is set as `workflowSource` - If the object is a `workflow.zip` file containing multiple files with a MANIFEST.json the MANIFEST is expected to have - * a mainWorkflowURL property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` - * optionally, if an inputFileURLs property exists that provides a list of relative file paths in the zip to input.json, it will be used to set `workflowInputs` - * optionally, if an optionFileURL property exists that provides a relative file path in the zip to an options.json file, it will be used to set `workflowOptions` + If the object is a `workflow.zip` file containing multiple files with a MANIFEST.json the MANIFEST is expected to have + * a mainWorkflowURL property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` + * optionally, if an inputFileURLs property exists that provides a list of relative file paths in the zip to input.json, it will be used to set `workflowInputs` + * optionally, if an optionFileURL property exists that provides a relative file path in the zip to an options.json file, it will be used to set `workflowOptions` - If the object is a `workflow.zip` file containing multiple files without a MANIFEST.json - * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` - * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found - * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` + If the object is a `workflow.zip` file containing multiple files without a MANIFEST.json + * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` + * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found + * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` - If the object is a `workflow.zip` file containing multiple files, the `workflow.zip` file is set as `workflowDependencies` - """ - s3 = boto3.resource("s3") - - u = urlparse(s3_uri) - bucket = s3.Bucket(u.netloc) - key = u.path[1:] + If the object is a `workflow.zip` file containing multiple files, the `workflow.zip` file is set as `workflowDependencies` + """ + s3 = boto3.resource("s3") - data = dict() - files = dict() + u = urlparse(s3_uri) + bucket = s3.Bucket(u.netloc) + key = u.path[1:] - if not key: - raise RuntimeError("invalid or missing S3 object key") + data = dict() + files = dict() - try: - file = path.join(localpath, path.basename(key)) - bucket.download_file(key, file) - except botocore.exceptions.ClientError as e: - raise RuntimeError(f"invalid S3 object: {e}") + if not key: + raise RuntimeError("invalid or missing S3 object key") - if path.basename(file) == "workflow.zip": try: - props = parse_workflow_zip_file(file, workflow_type) - except Exception as e: - raise RuntimeError(f"{s3_uri} is not a valid workflow.zip file: {e}") - - if props.get("data"): - data.update(props.get("data")) - - if props.get("files"): - files.update(props.get("files")) - else: - files["workflowSource"] = open(file, "rb") - - return {"data": data, "files": files} - + file = path.join(localpath, path.basename(key)) + bucket.download_file(key, file) + except botocore.exceptions.ClientError as e: + raise RuntimeError(f"invalid S3 object: {e}") + + if path.basename(file) == "workflow.zip": + try: + props = self.parse_workflow_zip_file(file, workflow_type) + except Exception as e: + raise RuntimeError(f"{s3_uri} is not a valid workflow.zip file: {e}") + + if props.get("data"): + data.update(props.get("data")) + + if props.get("files"): + files.update(props.get("files")) + else: + files["workflowSource"] = open(file, "rb") -def parse_workflow_zip_file(file, workflow_type): - """ - Processes a workflow zip bundle + return {"data": data, "files": files} - :param file: String or Path-like path to a workflow.zip file - :param workflow_type: String, type of workflow to expect (e.g. "wdl") + def parse_workflow_zip_file(self, file, workflow_type): + """ + Processes a workflow zip bundle - :rtype: dict of `data` and `files` + :param file: String or Path-like path to a workflow.zip file + :param workflow_type: String, type of workflow to expect (e.g. "wdl") - If the zip only contains a single file, that file is set as `workflowSource` + :rtype: dict of `data` and `files` - If the zip contains multiple files with a MANIFEST.json file, the MANIFEST is used to determine - appropriate `data` and `file` arguments. (See: parse_workflow_manifest_file()) + If the zip only contains a single file, that file is set as `workflowSource` - If the zip contains multiple files without a MANIFEST.json file: - * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` - * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found - * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` + If the zip contains multiple files with a MANIFEST.json file, the MANIFEST is used to determine + appropriate `data` and `file` arguments. (See: parse_workflow_manifest_file()) - If the zip contains multiple files, the original zip is set as `workflowDependencies` - """ - data = dict() - files = dict() + If the zip contains multiple files without a MANIFEST.json file: + * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource` + * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found + * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions` - wd = path.dirname(file) - with zipfile.ZipFile(file) as zip: - zip.extractall(wd) + If the zip contains multiple files, the original zip is set as `workflowDependencies` + """ + data = dict() + files = dict() - contents = zip.namelist() - if not contents: - raise RuntimeError("empty workflow.zip") + wd = path.dirname(file) + with zipfile.ZipFile(file) as zip: + zip.extractall(wd) - if len(contents) == 1: - # single file workflow - files["workflowSource"] = open(path.join(wd, contents[0]), "rb") + contents = zip.namelist() + if not contents: + raise RuntimeError("empty workflow.zip") - else: - # multifile workflow - if "MANIFEST.json" in contents: - props = parse_workflow_manifest_file(path.join(wd, "MANIFEST.json")) - - if props.get("data"): - data.update(props.get("data")) - - if props.get("files"): - files.update(props.get("files")) + if len(contents) == 1: + # single file workflow + files["workflowSource"] = open(path.join(wd, contents[0]), "rb") else: - if not f"main.{workflow_type.lower()}" in contents: - raise RuntimeError(f"'main.{workflow_type}' file not found") - - files["workflowSource"] = open( - path.join(wd, f"main.{workflow_type.lower()}"), "rb" - ) - - input_files = [f for f in contents if f.startswith("inputs")] - if input_files: - if not files.get("workflowInputFiles"): - files["workflowInputFiles"] = [] + # multifile workflow + if "MANIFEST.json" in contents: + props = self.parse_workflow_manifest_file( + path.join(wd, "MANIFEST.json") + ) - for input_file in input_files: - files[f"workflowInputFiles"] += [ - open(path.join(wd, input_file), "rb") - ] + if props.get("data"): + data.update(props.get("data")) - if "options.json" in contents: - files["workflowOptions"] = open(path.join(wd, "options.json"), "rb") + if props.get("files"): + files.update(props.get("files")) - # add the original zip bundle as a workflow dependencies file - files["workflowDependencies"] = open(file, "rb") + else: + if not f"main.{workflow_type.lower()}" in contents: + raise RuntimeError(f"'main.{workflow_type}' file not found") - return {"data": data, "files": files} + files["workflowSource"] = open( + path.join(wd, f"main.{workflow_type.lower()}"), "rb" + ) + if "options.json" in contents: + files["workflowOptions"] = open( + path.join(wd, "options.json"), "rb" + ) -def parse_workflow_manifest_file(manifest_file): - """ - Reads a MANIFEST.json file for a workflow zip bundle + # add the original zip bundle as a workflow dependencies file + files["workflowDependencies"] = open(file, "rb") - :param manifest_file: String or Path-like path to a MANIFEST.json file + return {"data": data, "files": files} - :rtype: dict of `data` and `files` + def parse_workflow_manifest_file(self, manifest_file): + """ + Reads a MANIFEST.json file for a workflow zip bundle + + :param manifest_file: String or Path-like path to a MANIFEST.json file + + :rtype: dict of `data` and `files` + + MANIFEST.json is expected to be formatted like: + .. code-block:: json + { + "mainWorkflowURL": "relpath/to/workflow", + "inputFileURLs": [ + "relpath/to/input-file-1", + "relpath/to/input-file-2", + ... + ], + "optionsFileURL" "relpath/to/option-file + } - MANIFEST.json is expected to be formatted like: - .. code-block:: json - { - "mainWorkflowURL": "relpath/to/workflow", - "inputFileURLs": [ - "relpath/to/input-file-1", - "relpath/to/input-file-2", - ... - ], - "optionsFileURL" "relpath/to/option-file - } + The `mainWorkflowURL` property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` - The `mainWorkflowURL` property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource` + The inputFileURLs property is optional and provides a list of relative file paths in the zip to input.json files. The list is assumed + to be in the order the inputs should be applied - e.g. higher list index is higher priority. If present, it will be used to set + `workflowInputs(_\d)` arguments. - The inputFileURLs property is optional and provides a list of relative file paths in the zip to input.json files. The list is assumed - to be in the order the inputs should be applied - e.g. higher list index is higher priority. If present, it will be used to set - `workflowInputs(_\d)` arguments. + The optionsFileURL property is optional and provides a relative file path in the zip to an options.json file. If present, it will be + used to set `workflowOptions`. - The optionsFileURL property is optional and provides a relative file path in the zip to an options.json file. If present, it will be - used to set `workflowOptions`. + """ + data = dict() + files = dict() + with open(manifest_file, "rt") as f: + manifest = json.loads(f.read()) + u = urlparse(manifest["mainWorkflowURL"]) - """ - data = dict() - files = dict() - with open(manifest_file, "rt") as f: - manifest = json.loads(f.read()) - - u = urlparse(manifest["mainWorkflowURL"]) - if not u.scheme or u.scheme == "file": - # expect "/path/to/file" or "file:///path/to/file" - # root is relative to the zip root - files["workflowSource"] = open( - workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" - ) + if not u.scheme or u.scheme == "file": + # expect "/path/to/file" or "file:///path/to/file" + # root is relative to the zip root + files["workflowSource"] = open( + workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" + ) - else: - data["workflowUrl"] = manifest["mainWorkflowUrl"] + else: + data["workflowUrl"] = manifest["mainWorkflowUrl"] - if manifest.get("inputFileURLs"): - if not files.get("workflowInputFiles"): + if manifest.get("inputFileURLs"): files["workflowInputFiles"] = [] - for url in manifest["inputFileURLs"]: - u = urlparse(url) - if not u.scheme or u.scheme == "file": - files[f"workflowInputFiles"] += [ - open( - workflow_manifest_url_to_path(u, path.dirname(manifest_file)), - "rb", + for url in manifest["inputFileURLs"]: + u = urlparse(url) + if not u.scheme or u.scheme == "file": + files[f"workflowInputFiles"] += [ + open( + workflow_manifest_url_to_path( + u, path.dirname(manifest_file) + ), + "rb", + ) + ] + + else: + raise InvalidRequestError( + f"unsupported input file url scheme for: '{url}'" ) - ] + if manifest.get("optionsFileURL"): + u = urlparse(manifest["optionsFileURL"]) + if not u.scheme or u.scheme == "file": + files["workflowOptions"] = open( + workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" + ) else: raise InvalidRequestError( - f"unsupported input file url scheme for: '{url}'" + f"unsupported option file url scheme for: '{manifest['optionFileURL']}'" ) - if manifest.get("optionsFileURL"): - u = urlparse(manifest["optionsFileURL"]) - if not u.scheme or u.scheme == "file": - files["workflowOptions"] = open( - workflow_manifest_url_to_path(u, path.dirname(manifest_file)), "rb" - ) - else: - raise InvalidRequestError( - f"unsupported option file url scheme for: '{manifest['optionFileURL']}'" - ) - - return {"data": data, "files": files} + return {"data": data, "files": files} def workflow_manifest_url_to_path(url, parent_dir=None):