Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/bert/pretrain_bert.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,6 @@ def train_valid_test_datasets_provider(train_val_test_num_samples, vp_stage=None

args = parse_and_validate_args(args_defaults={'tokenizer_type': 'BertWordPieceLowerCase'})
full_config = pretrain_cfg_container_from_args(args)
pretrain(full_config, train_valid_test_datasets_provider, model_provider,
pretrain(full_config, train_valid_test_datasets_provider,
ModelType.encoder_or_decoder,
forward_step)
forward_step, model_provider)
2 changes: 1 addition & 1 deletion examples/mimo/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def model_provider(
pretrain(
full_config,
train_valid_test_datasets_provider,
model_provider,
ModelType.encoder_or_decoder,
forward_step,
model_provider,
)
2 changes: 1 addition & 1 deletion examples/multimodal/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,9 @@ def write_online_eval_to_tensorboard(data, iteration, writer, walltime=None):

pretrain(
train_valid_test_dataloaders_provider,
model_provider,
ModelType.encoder_or_decoder,
forward_step,
model_provider,
args_defaults={'tokenizer_type': 'GPT2BPETokenizer'},
extra_args_provider=add_multimodal_extra_args,
process_non_loss_data_func=write_online_eval_to_tensorboard,
Expand Down
2 changes: 1 addition & 1 deletion examples/t5/pretrain_t5.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ def t5_position_embedding_ranks(pp_ranks):
pretrain(
full_config,
train_valid_test_datasets_provider,
model_provider,
ModelType.encoder_or_decoder,
forward_step,
model_provider,
get_embedding_ranks=t5_embedding_ranks,
get_position_embedding_ranks=t5_position_embedding_ranks,
)
84 changes: 0 additions & 84 deletions gpt_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,90 +49,6 @@ def _set_if_missing(attr: str, value) -> None:
_set_if_missing('yarn_correction_range_round_to_int', args.yarn_correction_range_round_to_int)


def gpt_builder(args, pre_process, post_process, vp_stage=None, config=None, pg_collection=None):
    """Build a ``GPTModel`` from the parsed command-line arguments.

    Args:
        args: Parsed argument namespace. Read for layer-spec selection, the
            multi-token-prediction (MTP) settings and the ``GPTModel``
            constructor options.
        pre_process: Forwarded to ``GPTModel`` as ``pre_process``.
        post_process: Forwarded to ``GPTModel`` as ``post_process``.
        vp_stage: Forwarded to the layer-spec helpers and to ``GPTModel``.
        config: Transformer config. When ``None`` it is derived from ``args``
            (from the YAML config if ``args.yaml_cfg`` is set).
        pg_collection: Forwarded to ``GPTModel`` as ``pg_collection``.

    Returns:
        The constructed ``GPTModel`` instance.
    """
    print_rank_0('building GPT model ...')
    if config is None:
        if args.yaml_cfg is not None:
            config = core_transformer_config_from_yaml(args, "language_model")
        else:
            config = core_transformer_config_from_args(args)
    # NOTE(review): applied to caller-supplied configs as well as freshly built
    # ones -- confirm this matches the intended nesting.
    _apply_yarn_config_from_args(config, args)

    # Computed up front: the MTP section below needs it even when a custom
    # ``args.spec`` is used, where it was previously left unbound.
    use_te = args.transformer_impl == "transformer_engine"

    if args.spec is not None:
        transformer_layer_spec = import_module(args.spec)
    else:
        if args.experimental_attention_variant is not None:
            transformer_layer_spec = get_transformer_block_with_experimental_attention_variant_spec(
                config=config, vp_stage=vp_stage
            )
        elif args.num_experts:
            # Define the decoder block spec
            transformer_layer_spec = get_gpt_decoder_block_spec(
                config,
                use_transformer_engine=use_te,
                normalization=args.normalization,
                qk_l2_norm=args.qk_l2_norm,
                vp_stage=vp_stage,
            )
        elif args.heterogeneous_layers_config_path is not None:
            assert not (config.transformer_impl == "inference_optimized")
            transformer_layer_spec = get_gpt_heterogeneous_layer_spec(config, use_te)
        else:
            # Define the decoder layer spec
            transformer_layer_spec = _get_transformer_layer_spec(use_te, config)

    mtp_block_spec = None
    if args.mtp_num_layers is not None:
        assert not (config.transformer_impl == "inference_optimized")
        if (
            hasattr(transformer_layer_spec, 'layer_specs')
            and len(transformer_layer_spec.layer_specs) == 0
        ):
            # Get the decoder layer spec explicitly if no decoder layer in the last stage,
            # Only happens with block spec (TransformerBlockSubmodules) when using MoE.
            transformer_layer_spec_for_mtp = _get_transformer_layer_spec(use_te, config)
        else:
            # Define the decoder block spec
            if args.experimental_attention_variant is not None:
                decoder_layer_specs = (
                    get_transformer_layer_with_experimental_attention_variant_spec(config=config)
                )
            else:
                decoder_layer_specs = get_gpt_decoder_layer_specs(
                    config,
                    use_transformer_engine=use_te,
                    normalization=args.normalization,
                    qk_l2_norm=args.qk_l2_norm,
                    vp_stage=vp_stage,
                )
            # Use spec of the last layer in decoder block as spec of the transformer layer in MTP
            transformer_layer_spec_for_mtp = decoder_layer_specs[-1]
        mtp_block_spec = get_gpt_mtp_block_spec(
            config, transformer_layer_spec_for_mtp, use_transformer_engine=use_te, vp_stage=vp_stage
        )

    model = GPTModel(
        config=config,
        transformer_layer_spec=transformer_layer_spec,
        vocab_size=args.padded_vocab_size,
        max_sequence_length=args.max_position_embeddings,
        pre_process=pre_process,
        post_process=post_process,
        fp16_lm_cross_entropy=args.fp16_lm_cross_entropy,
        parallel_output=True,
        share_embeddings_and_output_weights=not args.untie_embeddings_and_output_weights,
        position_embedding_type=args.position_embedding_type,
        rotary_percent=args.rotary_percent,
        rotary_base=args.rotary_base,
        rope_scaling=args.use_rope_scaling,
        mtp_block_spec=mtp_block_spec,
        vp_stage=vp_stage,
        pg_collection=pg_collection,
    )

    return model


def _get_transformer_layer_spec(use_te, config):
"""Get transformer layer specification based on configuration.

Expand Down
53 changes: 0 additions & 53 deletions hybrid_builders.py

This file was deleted.

15 changes: 0 additions & 15 deletions mamba_builders.py

This file was deleted.

14 changes: 11 additions & 3 deletions megatron/training/models/gpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,15 @@ def default_layer_spec(config: "GPTModelConfig", vp_stage: int) -> ModuleSpec:
)
elif isinstance(transformer_cfg, HeterogeneousTransformerConfig):
return get_gpt_heterogeneous_layer_spec(transformer_cfg, use_te)
elif use_te:
else:
return _te_or_local_layer_spec(config, vp_stage)

def _te_or_local_layer_spec(config: "GPTModelConfig", vp_stage: int) -> ModuleSpec:
"""Need to be able to call just these branches for mtp transformer layer spec."""

transformer_cfg = config.transformer
use_te = transformer_cfg.transformer_impl == "transformer_engine"
if use_te:
if "use_te_op_fuser" in inspect.signature(get_gpt_layer_with_transformer_engine_spec).parameters:
kwargs = {"use_te_op_fuser": config.use_transformer_engine_op_fuser}
else:
Expand All @@ -105,6 +113,7 @@ def default_layer_spec(config: "GPTModelConfig", vp_stage: int) -> ModuleSpec:
use_kitchen_attention=config.transformer.use_kitchen_attention,
kitchen_attention_backend=config.transformer.kitchen_attention_backend,
mla_down_proj_fusion=getattr(config.transformer, "mla_down_proj_fusion", False),
use_grouped_gemm_for_dense_mlp=config.transformer.use_grouped_gemm_for_dense_mlp,
**kwargs,
)
else:
Expand Down Expand Up @@ -169,7 +178,6 @@ class GPTModelConfig(ModelConfig):
"""Config file when tp_comm_overlap is enabled."""

### settings for default layer spec options ###
use_transformer_engine_op_fuser: bool = False
use_arbitrary_attention_mask: bool | None = None

@override
Expand Down Expand Up @@ -396,7 +404,7 @@ def mtp_block_spec(
if hasattr(transformer_layer_spec, "layer_specs") and len(transformer_layer_spec.layer_specs) == 0:
# Get the decoder layer spec explicitly if no decoder layer in the last stage,
# Only happens with block spec (TransformerBlockSubmodules) when using MoE.
spec = default_layer_spec(config, vp_stage)
spec = _te_or_local_layer_spec(config, vp_stage)
else:
decoder_specs = get_gpt_decoder_layer_specs(transformer_cfg, use_transformer_engine=use_te, normalization=transformer_cfg.normalization, qk_l2_norm=transformer_cfg.qk_l2_norm, vp_stage=vp_stage)
spec = decoder_specs[-1]
Expand Down
8 changes: 0 additions & 8 deletions megatron/training/models/hybrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,6 @@ def build_model(
else:
hybrid_stack_spec = default_hybrid_stack_spec

assert (
getattr(self._model_config.transformer, "virtual_pipeline_model_parallel_size", None) is None
and vp_stage is None
), (
"Virtual pipeline model parallelism is temporarily unsupported in Hybrid "
"models due to upstream MCore HybridModel API dependency"
)

assert self._model_config.vocab_size is not None, "vocab_size must be configured before calling build_model()"
if self._model_config.should_pad_vocab:
padded_vocab_size = calculate_padded_vocab_size(
Expand Down
50 changes: 36 additions & 14 deletions megatron/training/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,6 @@ def reorder_inner_param_groups(optimizer_state_dict):
def pretrain(
cfg_container: PretrainConfigContainer,
train_valid_test_dataset_provider,
model_provider,
model_type,
forward_step_func,
process_non_loss_data_func=None,
Expand Down Expand Up @@ -1094,6 +1093,8 @@ def pretrain(
seed_ep_group=getattr(init_pg_collection, "ep", None),
seed_etp_group=getattr(init_pg_collection, "expt_tp", None),
)
# TODO (@maanug): temporary until initialize.py is refactored to build pgcollection as bridge does
pg_collection = ProcessGroupCollection.use_mpu_process_groups()

timestamp_after_initialize_megatron = time.time()

Expand Down Expand Up @@ -1221,7 +1222,10 @@ def pretrain(
# Model, optimizer, and learning rate.
timers('model-and-optimizer-setup', log_level=0).start(barrier=True)
model, optimizer, opt_param_scheduler = setup_model_and_optimizer(
model_provider, model_type, checkpointing_context=checkpointing_context
model_type,
checkpointing_context=checkpointing_context,
cfg_container=cfg_container,
pg_collection=pg_collection,
)

timers('model-and-optimizer-setup').stop()
Expand Down Expand Up @@ -1262,7 +1266,7 @@ def pretrain(
)

# Build an isolated inference config so training config remains unchanged
inference_config = copy.deepcopy(model_cfg)
inference_config = copy.deepcopy(cfg_container.model)
if args.rl_inference_tensor_model_parallel_size is not None:
inference_config.tensor_model_parallel_size = args.rl_inference_tensor_model_parallel_size
if args.rl_inference_pipeline_model_parallel_size is not None:
Expand Down Expand Up @@ -1306,12 +1310,11 @@ def pretrain(
model_alloc_ctx = nullcontext()

with model_alloc_ctx:
inference_model = get_model(
model_provider,
model_type,
wrap_with_ddp=False,
builder_cls = inference_config.get_builder_cls()
builder = builder_cls(inference_config)
inference_model = builder.build_distributed_models(
pg_collection=inference_pg_collection,
config=inference_config,
wrap_with_ddp=False,
)
inference_model[0].eval()

Expand Down Expand Up @@ -1990,10 +1993,11 @@ def get_megatron_ddp_config(args: argparse.Namespace) -> DistributedDataParallel


def setup_model_and_optimizer(
model_provider_func,
model_type,
checkpointing_context=None,
pg_collection=None,
*,
cfg_container: PretrainConfigContainer,
pg_collection: ProcessGroupCollection,
):
"""Setup model and optimizer."""
args = get_args()
Expand All @@ -2006,9 +2010,27 @@ def setup_model_and_optimizer(
has_rl_optimizer = args.perform_rl_step and not args.no_load_optim
skip_optimizer = not (has_normal_optimizer or has_rl_optimizer)
wrap_with_ddp = not skip_optimizer
model = get_model(
model_provider_func, model_type, wrap_with_ddp=wrap_with_ddp, pg_collection=pg_collection
)

def _build_model_wrapper(wrap_with_ddp: bool):
    """Build the distributed model(s) described by ``cfg_container.model``.

    Reads ``cfg_container`` and ``pg_collection`` from the enclosing scope.
    Memory-history recording is started (driven by ``cfg_container.profiling``)
    before the builder is created.

    Args:
        wrap_with_ddp: Forwarded to ``build_distributed_models``.

    Returns:
        Whatever the builder's ``build_distributed_models`` returns.
    """
    from megatron.training.utils import start_memory_history_recording

    start_memory_history_recording(cfg_container.profiling)

    model_settings = cfg_container.model
    # The model config selects its own builder class.
    model_builder = model_settings.get_builder_cls()(model_settings)
    return model_builder.build_distributed_models(
        pg_collection=pg_collection,
        ddp_config=cfg_container.ddp,
        overlap_param_gather_with_optimizer_step=(
            cfg_container.optimizer.overlap_param_gather_with_optimizer_step
        ),
        use_megatron_fsdp=cfg_container.dist.use_megatron_fsdp,
        use_torch_fsdp2=cfg_container.dist.use_torch_fsdp2,
        wrap_with_ddp=wrap_with_ddp,
        data_parallel_random_init=cfg_container.rng.data_parallel_random_init,
    )

model = _build_model_wrapper(wrap_with_ddp)
unwrapped_model = unwrap_model(model)

if args.logits_save_dir is not None:
Expand Down Expand Up @@ -2081,7 +2103,7 @@ def setup_model_and_optimizer(
args.ffn_hidden_size = moe_ffn_hidden_size * args.moe_upcycling_granularity

# get dense model
dense_model_for_upcycling = get_model(model_provider_func, model_type)
dense_model_for_upcycling = _build_model_wrapper(wrap_with_ddp=True)

# recover moe upcycling related args in global args before executing upcycling
args.num_experts = num_experts
Expand Down
1 change: 1 addition & 0 deletions megatron/training/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@
)

from megatron.training.utils.log_utils import append_to_progress_log
from megatron.training.utils.utils import start_memory_history_recording
Loading
Loading