From 5175ae206aa1f02700ce7f38422c4718dcbfafa0 Mon Sep 17 00:00:00 2001 From: Avik Datta Date: Tue, 7 Apr 2026 23:23:12 +0100 Subject: [PATCH 1/4] format change --- .../utils/dag22_bclconvert_demult_utils.py | 68 ++++++++++++------- 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/igf_airflow/utils/dag22_bclconvert_demult_utils.py b/igf_airflow/utils/dag22_bclconvert_demult_utils.py index 45816867..42f9da1a 100644 --- a/igf_airflow/utils/dag22_bclconvert_demult_utils.py +++ b/igf_airflow/utils/dag22_bclconvert_demult_utils.py @@ -4318,10 +4318,12 @@ def bclconvert_singularity_wrapper( bcl_num_decompression_threads: int = 1, bcl_num_parallel_tiles: int = 1, lane_id : int = 0, - tile_id_list: tuple = (), + tiles: str|None = None, + exclude_tiles: str|None = None, first_tile_only: bool = False, - dry_run: bool = False) \ - -> str: + dry_run: bool = False, + additional_params: dict[str, str]| None = None + ) -> str: try: check_file_path(image_path) check_file_path(input_dir) @@ -4331,39 +4333,55 @@ def bclconvert_singularity_wrapper( temp_dir = get_temp_dir(use_ephemeral_space=True) bclconvert_cmd = [ "bcl-convert", - "--bcl-input-directory", input_dir, - "--output-directory", output_dir, - "--sample-sheet", samplesheet_file, - "--bcl-num-conversion-threads", str(bcl_num_conversion_threads), - "--bcl-num-compression-threads", str(bcl_num_compression_threads), - "--bcl-num-decompression-threads", str(bcl_num_decompression_threads), - "--bcl-num-parallel-tiles", str(bcl_num_parallel_tiles), - "--bcl-sampleproject-subdirectories", "true", - "--strict-mode", "true"] + f"--bcl-input-directory {str(input_dir)}", + f"--output-directory {str(output_dir)}", + f"--sample-sheet {str(samplesheet_file)}", + f"--bcl-num-conversion-threads {str(bcl_num_conversion_threads)}", + f"--bcl-num-compression-threads {str(bcl_num_compression_threads)}", + f"--bcl-num-decompression-threads {str(bcl_num_decompression_threads)}", + f"--bcl-num-parallel-tiles {str(bcl_num_parallel_tiles)}", + "--bcl-sampleproject-subdirectories true", + "--strict-mode true" + ] if first_tile_only: - bclconvert_cmd.\ - extend(["--first-tile-only", "true"]) + bclconvert_cmd.append( + "--first-tile-only true" + ) if lane_id > 0: - bclconvert_cmd.\ - extend(["--bcl-only-lane", str(lane_id)]) - if len(tile_id_list) > 0: - bclconvert_cmd.\ - extend(["--tiles", ",".join(tile_id_list)]) - bclconvert_cmd = \ - ' '.join(bclconvert_cmd) + bclconvert_cmd.append( + f"--bcl-only-lane {str(lane_id)}", + ) + if tiles is not None: + bclconvert_cmd.append( + f"--tiles {str(tiles)}" + ) + if exclude_tiles is not None: + bclconvert_cmd.append( + f"--exclude-tiles {str(exclude_tiles)}" + ) + if additional_params is not None: + for key, val in additional_params.items(): + bclconvert_cmd.append( + f"{str(key)} {str(val)}" + ) + bclconvert_cmd = ' '.join(bclconvert_cmd) bind_paths = [ f'{temp_dir}:/var/log', os.path.dirname(samplesheet_file), input_dir, - os.path.dirname(output_dir)] + os.path.dirname(output_dir) + ] cmd = execute_singuarity_cmd( image_path=image_path, command_string=bclconvert_cmd, bind_dir_list=bind_paths, - dry_run=dry_run) + dry_run=dry_run + ) return cmd - except: - raise + except Exception as e: + raise ValueError( + f"Failed with error: {e}" + ) def run_bclconvert_func(**context): try: From 0e8f8e4b6f1381ab81f92e81822d26ad10f88d41 Mon Sep 17 00:00:00 2001 From: Avik Datta Date: Tue, 7 Apr 2026 23:23:23 +0100 Subject: [PATCH 2/4] change for test barcodes --- .../dag23_test_bclconvert_demult_utils.py | 172 ++++++++++-------- 1 file changed, 99 insertions(+), 73 deletions(-) diff --git a/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py b/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py index 64e887d8..62f9ce13 100644 --- a/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py +++ b/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py @@ -564,103 +564,127 @@ def bcl_convert_run_func(**context): try: ti = context.get('ti') dag_run = context.get('dag_run') - demult_dir_key = \ - context['params'].\ - get( + demult_dir_key = ( + context['params'] + .get( 'demult_dir_key', 'demult_dir') - demult_info_key = \ - context['params'].\ - get( + ) + demult_info_key = ( + context['params'] + .get( 'demult_info_key', 'demult_info') - formatted_samplesheet_xcom_key = \ - context['params'].\ - get( + ) + formatted_samplesheet_xcom_key = ( + context['params'] + .get( 'formatted_samplesheet_xcom_key', 'formatted_samplesheet_data') - formatted_samplesheet_xcom_task = \ - context['params'].\ - get( + ) + formatted_samplesheet_xcom_task = ( + context['params'] + .get( 'formatted_samplesheet_xcom_task', 'get_formatted_samplesheets') - samplesheet_index = \ - context['params'].\ - get('samplesheet_index') - index_column = \ - context['params'].\ - get('index_column', 'index') - lane_column = \ - context['params'].\ - get('lane_column', 'lane') - filtered_df = \ - _fetch_formatted_samplesheet_info_from_task_instance( - ti=ti, - samplesheet_index=samplesheet_index, - index_column=index_column, - samplesheet_key=formatted_samplesheet_xcom_key, - samplesheet_task=formatted_samplesheet_xcom_task) + ) + samplesheet_index = ( + context['params'] + .get('samplesheet_index') + ) + index_column = ( + context['params'] + .get('index_column', 'index') + ) + lane_column = ( + context['params'] + .get('lane_column', 'lane') + ) + filtered_df = _fetch_formatted_samplesheet_info_from_task_instance( + ti=ti, + samplesheet_index=samplesheet_index, + index_column=index_column, + samplesheet_key=formatted_samplesheet_xcom_key, + samplesheet_task=formatted_samplesheet_xcom_task + ) lane_id = filtered_df[lane_column].values[0] if str(lane_id) != 'all': lane_id = int(lane_id) else: lane_id = 0 - mod_samplesheet_xcom_key = \ - context['params'].\ - get( + mod_samplesheet_xcom_key = ( + context['params'] + .get( 'mod_samplesheet_xcom_key', 'mod_samplesheet') - mod_samplesheet_xcom_task = \ - context['params'].\ - get('mod_samplesheet_xcom_task') - samplesheet_file = \ - ti.xcom_pull( - task_ids=mod_samplesheet_xcom_task, - key=mod_samplesheet_xcom_key) - check_file_path(samplesheet_file) + ) + mod_samplesheet_xcom_task = ( + context['params'] + .get('mod_samplesheet_xcom_task') + ) + samplesheet_file = ti.xcom_pull( + task_ids=mod_samplesheet_xcom_task, + key=mod_samplesheet_xcom_key + ) + check_file_path( + samplesheet_file + ) seqrun_id = None - if dag_run is not None and \ - dag_run.conf is not None and \ - dag_run.conf.get('seqrun_id') is not None: - seqrun_id = \ - dag_run.conf.get('seqrun_id') + if ( + dag_run is not None and + dag_run.conf is not None and + dag_run.conf.get('seqrun_id') is not None + ): + seqrun_id = dag_run.conf.get( + 'seqrun_id' + ) if seqrun_id is None: - raise ValueError('seqrun_id is not found in dag_run.conf') + raise ValueError( + 'seqrun_id is not found in dag_run.conf' + ) # seqrun path - seqrun_path = \ - os.path.join(HPC_SEQRUN_BASE_PATH, seqrun_id) - temp_dir = \ - get_temp_dir(use_ephemeral_space=True) - demult_dir = \ - os.path.join( - temp_dir, - 'demult') - cmd = \ - bclconvert_singularity_wrapper( - image_path=BCLCONVERT_IMAGE, - input_dir=seqrun_path, - output_dir=demult_dir, - lane_id=lane_id, - samplesheet_file=samplesheet_file, - bcl_num_conversion_threads=1, - bcl_num_compression_threads=1, - bcl_num_decompression_threads=1, - bcl_num_parallel_tiles=1, - first_tile_only=True) + seqrun_path = os.path.join( + HPC_SEQRUN_BASE_PATH, + seqrun_id + ) + temp_dir = get_temp_dir( + use_ephemeral_space=True + ) + demult_dir = os.path.join( + temp_dir, + 'demult' + ) + cmd = bclconvert_singularity_wrapper( + image_path=BCLCONVERT_IMAGE, + input_dir=seqrun_path, + output_dir=demult_dir, + lane_id=lane_id, + samplesheet_file=samplesheet_file, + bcl_num_conversion_threads=1, + bcl_num_compression_threads=1, + bcl_num_decompression_threads=1, + bcl_num_parallel_tiles=1, + tiles='110[1-5]' + ) check_file_path( os.path.join( demult_dir, 'Reports', - 'Demultiplex_Stats.csv')) + 'Demultiplex_Stats.csv' + ) + ) ti.xcom_push( key=demult_dir_key, - value=demult_dir) + value=demult_dir + ) demult_info = { index_column: samplesheet_index, - 'demult_dir': demult_dir} + 'demult_dir': demult_dir + } ti.xcom_push( key=demult_info_key, - value=demult_info) + value=demult_info + ) except Exception as e: log.error(e) ti = context.get('ti') @@ -669,16 +693,18 @@ def bcl_convert_run_func(**context): f"dag_id={ti.dag_id}", f"run_id={ti.run_id}", f"task_id={ti.task_id}", - f"attempt={ti.try_number}.log"] - message = \ - f"Error: {e}, Log: {os.path.join(*log_file_path)}" + f"attempt={ti.try_number}.log" + ] + message = f"Error: {e}," + \ + f"Log: {os.path.join(*log_file_path)}" send_log_to_channels( slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, task_id=context['task'].task_id, dag_id=context['task'].dag_id, comment=message, - reaction='fail') + reaction='fail' + ) raise def _fetch_formatted_samplesheet_info_from_task_instance( From c1015856d376171ea13f48e9a53d37499ba01983 Mon Sep 17 00:00:00 2001 From: Avik Datta Date: Tue, 7 Apr 2026 23:27:26 +0100 Subject: [PATCH 3/4] change the regexp --- igf_airflow/utils/dag23_test_bclconvert_demult_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py b/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py index 62f9ce13..206f1ef1 100644 --- a/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py +++ b/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py @@ -664,7 +664,7 @@ def bcl_convert_run_func(**context): bcl_num_compression_threads=1, bcl_num_decompression_threads=1, bcl_num_parallel_tiles=1, - tiles='110[1-5]' + tiles='110[1-3]' ) check_file_path( os.path.join( From 3190f033959f428ddb897ef69e5d269d85616ec9 Mon Sep 17 00:00:00 2001 From: Avik Datta Date: Tue, 7 Apr 2026 23:35:24 +0100 Subject: [PATCH 4/4] bugfix --- igf_airflow/utils/dag23_test_bclconvert_demult_utils.py | 2 +- test/igf_airflow/dag22_bclconvert_demult_utils_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py b/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py index 206f1ef1..6c8f1588 100644 --- a/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py +++ b/igf_airflow/utils/dag23_test_bclconvert_demult_utils.py @@ -654,7 +654,7 @@ def bcl_convert_run_func(**context): temp_dir, 'demult' ) - cmd = bclconvert_singularity_wrapper( + _ = bclconvert_singularity_wrapper( image_path=BCLCONVERT_IMAGE, input_dir=seqrun_path, output_dir=demult_dir, diff --git a/test/igf_airflow/dag22_bclconvert_demult_utils_test.py b/test/igf_airflow/dag22_bclconvert_demult_utils_test.py index 08d48e4e..e40ccc91 100644 --- a/test/igf_airflow/dag22_bclconvert_demult_utils_test.py +++ b/test/igf_airflow/dag22_bclconvert_demult_utils_test.py @@ -330,7 +330,7 @@ def test_bclconvert_singularity_wrapper(self): output_dir=self.output_dir, samplesheet_file=self.samplesheet_file, lane_id=1, - tile_id_list=('s_1_1102', 's_1_1103'), + tiles='s_1_1102,s_1_1103', dry_run=True) self.assertTrue('--tiles s_1_1102,s_1_1103' in cmd)