# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random

import numpy as np
import torch

from nemo.utils import AppState, logging

try:
    from apex.transformer.log_util import set_logging_level

    HAVE_APEX = True

except (ImportError, ModuleNotFoundError):

    HAVE_APEX = False

try:
    from megatron.core import tensor_parallel
    from megatron.core.parallel_state import (
        RankGenerator,
        get_pipeline_model_parallel_rank,
        set_expert_model_parallel_rank,
        set_expert_model_parallel_world_size,
        set_pipeline_model_parallel_rank,
        set_pipeline_model_parallel_world_size,
        set_tensor_model_parallel_rank,
        set_tensor_model_parallel_world_size,
    )

    HAVE_MEGATRON_CORE = True

except (ImportError, ModuleNotFoundError):

    HAVE_MEGATRON_CORE = False

try:
    from megatron.core.num_microbatches_calculator import (
        ConstantNumMicroBatchesCalculator,
        get_current_global_batch_size,
        get_micro_batch_size,
        get_num_microbatches,
        init_num_microbatches_calculator,
    )

    MCORE_MB_CALCULATOR = True

except (ImportError, ModuleNotFoundError):
    logging.warning("Megatron num_microbatches_calculator not found, using Apex version.")
    if HAVE_APEX:
        from apex.transformer.microbatches import ConstantNumMicroBatches as ConstantNumMicroBatchesCalculator
        from apex.transformer.pipeline_parallel.utils import (
            get_current_global_batch_size,
            get_micro_batch_size,
            get_num_microbatches,
        )
        from apex.transformer.pipeline_parallel.utils import (
            setup_microbatch_calculator as init_num_microbatches_calculator,
        )

    MCORE_MB_CALCULATOR = False


def initialize_model_parallel_for_nemo(
    world_size,
    global_rank,
    local_rank,
    tensor_model_parallel_size=1,
    expert_model_parallel_size=1,
    expert_tensor_parallel_size=None,
    pipeline_model_parallel_size=1,
    virtual_pipeline_model_parallel_size=None,
    pipeline_model_parallel_split_rank=None,
    pipeline_model_parallel_comm_backend=None,
    context_parallel_size=1,
    encoder_tensor_model_parallel_size=0,
    encoder_pipeline_model_parallel_size=0,
    micro_batch_size=None,
    global_batch_size=None,
    rampup_batch_size=None,
    use_fp8=False,
    init_mpi_proc_group=False,
    seed=1234,
    apex_transformer_log_level=30,
    use_tp_pp_dp_mapping=False,
    use_te_rng_tracker=False,
    num_distributed_optimizer_instances=1,
    nccl_communicator_config_path=None,
    use_sharp=False,
    use_gloo_process_groups: bool = True,
):
    """Initialize model parallel groups in NeMo."""
    assert (
        pipeline_model_parallel_split_rank is None or pipeline_model_parallel_split_rank == 0
    ), "pipeline_model_parallel_split_rank is deprecated."
    assert encoder_pipeline_model_parallel_size == 0 and (
        encoder_tensor_model_parallel_size == 0
        or encoder_tensor_model_parallel_size == tensor_model_parallel_size
    ), (
        "encoder_pipeline_model_parallel_size is temporarily "
        "unavailable. We are working on a refactoring to add it back."
    )

    # updating NeMo globals
    app_state = AppState()
    app_state.global_rank = global_rank
    app_state.world_size = world_size
    app_state.local_rank = local_rank
    app_state.use_tp_pp_dp_mapping = use_tp_pp_dp_mapping
    app_state.expert_model_parallel_size = expert_model_parallel_size
    app_state.tensor_model_parallel_size = tensor_model_parallel_size
    app_state.pipeline_model_parallel_size = pipeline_model_parallel_size
    app_state.virtual_pipeline_model_parallel_size = virtual_pipeline_model_parallel_size
    app_state.context_parallel_size = context_parallel_size
    app_state.encoder_tensor_model_parallel_size = encoder_tensor_model_parallel_size
    app_state.encoder_pipeline_model_parallel_size = encoder_pipeline_model_parallel_size
    app_state.pipeline_model_parallel_comm_backend = pipeline_model_parallel_comm_backend
    app_state.use_fp8 = use_fp8
    app_state.use_sharp = use_sharp
    app_state.init_mpi_proc_group = init_mpi_proc_group
    app_state.expert_tensor_parallel_size = expert_tensor_parallel_size
    app_state.num_distributed_optimizer_instances = num_distributed_optimizer_instances
    app_state.nccl_communicator_config_path = nccl_communicator_config_path
    app_state.use_gloo_process_groups = use_gloo_process_groups

    (
        app_state.tensor_model_parallel_rank,
        app_state.pipeline_model_parallel_rank,
        app_state.expert_model_parallel_rank,
        app_state.expert_tensor_parallel_rank,
        app_state.model_parallel_size,
        app_state.data_parallel_size,
        app_state.pipeline_model_parallel_split_rank,
        app_state.virtual_pipeline_model_parallel_rank,
    ) = fake_initialize_model_parallel(
        world_size=world_size,
        rank=global_rank,
        tensor_model_parallel_size_=tensor_model_parallel_size,
        pipeline_model_parallel_size_=pipeline_model_parallel_size,
        virtual_pipeline_model_parallel_size_=virtual_pipeline_model_parallel_size,
        pipeline_model_parallel_split_rank_=pipeline_model_parallel_split_rank,
        context_parallel_size_=context_parallel_size,
        expert_model_parallel_size_=expert_model_parallel_size,
        expert_tensor_parallel_size_=expert_tensor_parallel_size,
        encoder_tensor_model_parallel_size_=encoder_tensor_model_parallel_size,
        encoder_pipeline_model_parallel_size_=encoder_pipeline_model_parallel_size,
        use_tp_pp_dp_mapping=use_tp_pp_dp_mapping,
    )

    # update megatron.core parallel-state globals
    set_tensor_model_parallel_world_size(app_state.tensor_model_parallel_size)
    set_tensor_model_parallel_rank(app_state.tensor_model_parallel_rank)

    set_expert_model_parallel_world_size(app_state.expert_model_parallel_size)
    set_expert_model_parallel_rank(app_state.expert_model_parallel_rank)

    set_pipeline_model_parallel_world_size(
        app_state.pipeline_model_parallel_size + app_state.encoder_pipeline_model_parallel_size
    )
    set_pipeline_model_parallel_rank(app_state.pipeline_model_parallel_rank)

    tensor_parallel.random.initialize_rng_tracker(use_te_rng_tracker=use_te_rng_tracker)
    if seed is not None:
        # @chcui not setting seed is for model conversion. always set seed for training/inference.
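        # Note: _set_random_seed (below) offsets the seed by the pipeline model parallel rank,
        # so each pipeline stage draws from a distinct random stream.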
        _set_random_seed(seed)

    if global_batch_size and micro_batch_size is not None:
        # TODO: add rampup_batch_size here when we have it implemented
        if MCORE_MB_CALCULATOR:
            from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR

            if _GLOBAL_NUM_MICROBATCHES_CALCULATOR is None:
                init_num_microbatches_calculator(
                    rank=global_rank,
                    global_batch_size=global_batch_size,
                    micro_batch_size=micro_batch_size,
                    data_parallel_size=app_state.data_parallel_size,
                    rampup_batch_size=rampup_batch_size,
                    decrease_batch_size_if_needed=False,
                )
            else:
                if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatchesCalculator):
                    assert get_current_global_batch_size() == global_batch_size
                    assert get_micro_batch_size() == micro_batch_size
                    assert get_num_microbatches() == global_batch_size // (
                        micro_batch_size * app_state.data_parallel_size
                    )
                else:
                    raise Exception("Microbatch calculator already initialized.")
        else:
            from apex.transformer.pipeline_parallel.utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR

            if _GLOBAL_NUM_MICROBATCHES_CALCULATOR is None:
                init_num_microbatches_calculator(
                    rank=global_rank,
                    global_batch_size=global_batch_size,
                    micro_batch_size=micro_batch_size,
                    data_parallel_size=app_state.data_parallel_size,
                    rampup_batch_size=rampup_batch_size,
                    decrease_batch_size_if_needed=False,
                )
            else:
                if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatchesCalculator):
                    assert get_current_global_batch_size() == global_batch_size
                    assert get_micro_batch_size() == micro_batch_size
                    assert get_num_microbatches() == global_batch_size // (
                        micro_batch_size * app_state.data_parallel_size
                    )
                else:
                    raise Exception("Microbatch calculator already initialized.")

    app_state._is_megatron_initialized = True

    if HAVE_APEX:
        set_logging_level(apex_transformer_log_level)


def _set_random_seed(seed_):
    """Set random seed for reproducibility."""
    if seed_ is not None and seed_ > 0:
        # Ensure that different pipeline MP stages get different seeds.
        seed = seed_ + (100 * get_pipeline_model_parallel_rank())
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.device_count() > 0:
            tensor_parallel.model_parallel_cuda_manual_seed(seed)
    else:
        raise ValueError('Seed ({}) should be a positive integer.'.format(seed_))


def set_jit_fusion_options():
    """Set PyTorch JIT layer fusion options."""
    # set flags if we are using the 21.10 container
    if torch.__version__ == "1.10.0a0+0aef44c":
        # nvfuser
        torch._C._jit_set_profiling_executor(True)
        torch._C._jit_set_profiling_mode(True)
        torch._C._jit_override_can_fuse_on_cpu(False)
        torch._C._jit_override_can_fuse_on_gpu(False)
        torch._C._jit_set_texpr_fuser_enabled(False)
        torch._C._jit_set_nvfuser_enabled(True)
        torch._C._debug_set_autodiff_subgraph_inlining(False)


def fake_initialize_model_parallel(
    world_size,
    rank,
    tensor_model_parallel_size_,
    pipeline_model_parallel_size_,
    pipeline_model_parallel_split_rank_=None,
    virtual_pipeline_model_parallel_size_=None,
    expert_model_parallel_size_=1,
    expert_tensor_parallel_size_=None,
    context_parallel_size_=1,
    encoder_tensor_model_parallel_size_=0,
    encoder_pipeline_model_parallel_size_=0,
    use_tp_pp_dp_mapping=False,
):
    """
    Fake initialize model data parallel groups so that we can instantiate model parallel models
    before DDP is initialized. This is needed because the PTL execution flow is: init model,
    init trainer -> call trainer.fit(model), and DDP is initialized during .fit.
    This function is taken from megatron.core.parallel_state and modified so that the
    distributed groups are not created.
    We only need the tensor parallel and pipeline parallel ranks to instantiate the model.

    Arguments:
        tensor_model_parallel_size: number of GPUs used to parallelize the model tensors.
        pipeline_model_parallel_size: number of GPUs used to parallelize the model pipeline.
        context_parallel_size: number of GPUs used to parallelize the tokens of each input.

    Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
    and 8 data-parallel groups as:
        8 data_parallel groups:
            [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
        8 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
        4 pipeline model-parallel groups:
            [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, ranks 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    """

    assert pipeline_model_parallel_split_rank_ is None, "pipeline_model_parallel_split_rank is deprecated."
    assert encoder_pipeline_model_parallel_size_ == 0 and (
        encoder_tensor_model_parallel_size_ == 0
        or encoder_tensor_model_parallel_size_ == tensor_model_parallel_size_
    ), (
        "encoder_pipeline_model_parallel_size is temporarily "
        "unavailable. We are working on a refactoring to add it back."
    )

    # Get world size and rank. Ensure some consistencies.
    tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
    pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
    model_parallel_size = tensor_model_parallel_size * pipeline_model_parallel_size
    context_parallel_size = min(context_parallel_size_, world_size)

    if encoder_pipeline_model_parallel_size_ is None:
        encoder_pipeline_model_parallel_size = 0
    else:
        encoder_pipeline_model_parallel_size = encoder_pipeline_model_parallel_size_

    if encoder_tensor_model_parallel_size_ == 0 and encoder_pipeline_model_parallel_size_ > 0:
        encoder_tensor_model_parallel_size = tensor_model_parallel_size
    else:
        encoder_tensor_model_parallel_size = encoder_tensor_model_parallel_size_

    if encoder_tensor_model_parallel_size > 0:
        assert encoder_pipeline_model_parallel_size > 0
        assert (
            encoder_tensor_model_parallel_size <= tensor_model_parallel_size
        ), "We do not support encoders with more TP than the decoder."
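    # Worked example (illustrative values, mirroring the docstring above): with world_size=16,
    # tensor_model_parallel_size=2, pipeline_model_parallel_size=4, context_parallel_size=1, and
    # no encoder, decoder_model_size = 2 * 4 * 1 = 8, so data_parallel_size = 16 // 8 = 2.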
    encoder_model_size = (
        encoder_tensor_model_parallel_size * encoder_pipeline_model_parallel_size * context_parallel_size
    )
    decoder_model_size = tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size
    total_model_size = encoder_model_size + decoder_model_size

    assert world_size % total_model_size == 0, (
        f'world_size: {world_size} must be divisible by the total model size: '
        f'(decoder_)tensor_model_parallel_size {tensor_model_parallel_size} '
        f'* (decoder_)pipeline_model_parallel_size {pipeline_model_parallel_size} '
        f'* (decoder_)context_parallel_size {context_parallel_size} + '
        f'encoder_tensor_model_parallel_size {encoder_tensor_model_parallel_size} '
        f'* encoder_pipeline_model_parallel_size {encoder_pipeline_model_parallel_size} '
        f'* context_parallel_size {context_parallel_size}'
    )

    data_parallel_size = world_size // total_model_size

    encoder_world_size = encoder_model_size * data_parallel_size
    decoder_world_size = decoder_model_size * data_parallel_size

    assert encoder_world_size + decoder_world_size == world_size

    virtual_pipeline_model_parallel_rank = None
    if virtual_pipeline_model_parallel_size_ is not None:
        virtual_pipeline_model_parallel_rank = 0

    if encoder_world_size > 0:
        encoder_rank_generator = RankGenerator(
            tp=encoder_tensor_model_parallel_size,
            ep=1,
            dp=data_parallel_size,
            pp=encoder_pipeline_model_parallel_size,
            cp=context_parallel_size,
            order='tp-cp-ep-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
            rank_offset=0,
        )
    else:
        encoder_rank_generator = None

    decoder_rank_generator = RankGenerator(
        tp=tensor_model_parallel_size,
        ep=1,
        dp=data_parallel_size,
        pp=pipeline_model_parallel_size,
        cp=context_parallel_size,
        order='tp-cp-ep-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
        rank_offset=encoder_world_size,
    )

    # Build expert rank generator
    if expert_tensor_parallel_size_ is None:
        expert_tensor_parallel_size_ = tensor_model_parallel_size

    expert_tensor_model_pipeline_parallel_size = (
        expert_tensor_parallel_size_ * expert_model_parallel_size_ * pipeline_model_parallel_size
    )
    expert_data_parallel_size = decoder_world_size // expert_tensor_model_pipeline_parallel_size
    if decoder_world_size % expert_tensor_model_pipeline_parallel_size != 0:
        raise RuntimeError(
            f"decoder world_size ({decoder_world_size}) is not divisible by "
            f"expert_tensor_model_pipeline_parallel size ({expert_tensor_model_pipeline_parallel_size})"
        )

    expert_decoder_rank_generator = RankGenerator(
        tp=expert_tensor_parallel_size_,
        ep=expert_model_parallel_size_,
        dp=expert_data_parallel_size,
        pp=pipeline_model_parallel_size,
        cp=1,
        order='tp-cp-ep-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
        rank_offset=encoder_world_size,
    )

    assert (
        not use_tp_pp_dp_mapping
        or pipeline_model_parallel_size == 1
        or expert_data_parallel_size == data_parallel_size
    ), (
        "When not using pp-last rank ordering, the data parallel size of the attention and MoE "
        "layers must be the same."
    )

    assert decoder_rank_generator.get_ranks("pp") == expert_decoder_rank_generator.get_ranks("pp"), (
        f"Pipeline parallel groups are expected to be the same for the non-expert and expert parts, "
        f"but got {decoder_rank_generator.get_ranks('pp')} and {expert_decoder_rank_generator.get_ranks('pp')}"
    )

    def generator_wrapper(group_type, is_expert=False, **kwargs):
        """The `RankGenerator` class produces a hyper-rectangle for a given set of
        tensor, pipeline, data, expert, and context parallelism.
        If we have an encoder, in addition to the default decoder, we essentially
        instantiate two `RankGenerator` classes to construct the parallelism for each
        module separately, and we then have to stitch them together for the right
        groups. For now, this means pp and tp-pp."""
        from itertools import cycle

        if is_expert:
            d_ranks = expert_decoder_rank_generator.get_ranks(group_type, **kwargs)
        else:
            d_ranks = decoder_rank_generator.get_ranks(group_type, **kwargs)

        if encoder_rank_generator is None:
            for x in d_ranks:
                yield x
            return

        e_ranks = encoder_rank_generator.get_ranks(group_type, **kwargs)
        if group_type == 'pp':
            # Map 1 encoder tp rank to several decoder tp ranks, because
            # these won't be the same size.
            for x, y in zip(cycle(e_ranks), d_ranks):
                yield x + y
        elif group_type == 'tp-pp':
            # For this group, we can just return the concatenated
            # groups together, because their sizes are the same.
            assert len(e_ranks) == len(d_ranks)
            for x, y in zip(e_ranks, d_ranks):
                yield x + y
        else:
            for x in e_ranks:
                yield x
            for x in d_ranks:
                yield x

    # Build the data-parallel groups.
    all_data_parallel_group_ranks_with_cp = []
    for ranks in generator_wrapper('dp'):
        if rank in ranks:
            data_parallel_group = list(ranks)
            logging.info(f'Rank {rank} has data parallel group: {data_parallel_group}')

    for ranks_with_cp in generator_wrapper('dp-cp'):
        all_data_parallel_group_ranks_with_cp.append(ranks_with_cp)
        if rank in ranks_with_cp:
            data_parallel_group_with_cp = ranks_with_cp
            logging.info(
                f'Rank {rank} has combined group of data parallel and context parallel: '
                f'{data_parallel_group_with_cp}'
            )

    data_parallel_rank = data_parallel_group.index(rank)
    logging.info(
        f'All data parallel group ranks with context parallel combined: {all_data_parallel_group_ranks_with_cp}'
    )
    logging.info(f'Rank {rank} has data parallel rank: {data_parallel_rank}')

    # Build the context-parallel groups.
    all_context_parallel_group_ranks = []
    for ranks in generator_wrapper('cp'):
        all_context_parallel_group_ranks.append(ranks)
        if rank in ranks:
            context_parallel_group = ranks
            logging.info(f'Rank {rank} has context parallel group: {context_parallel_group}')

    context_parallel_rank = context_parallel_group.index(rank)
    logging.info(f'All context parallel group ranks: {all_context_parallel_group_ranks}')
    logging.info(f'Rank {rank} has context parallel rank: {context_parallel_rank}')

    # Build the model-parallel groups.
    all_model_parallel_group_ranks = []
    for ranks in generator_wrapper('tp-pp'):
        all_model_parallel_group_ranks.append(ranks)
        if rank in ranks:
            logging.info(f'Rank {rank} has model parallel group: {list(ranks)}')
    logging.info(f'All model parallel group ranks: {all_model_parallel_group_ranks}')

    # Build the tensor model-parallel groups.
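    # For the 16-GPU docstring example (TP=2, PP=4, DP=2), generator_wrapper('tp') yields
    # consecutive pairs [g0, g1], [g2, g3], ..., [g14, g15]; a rank's tensor model parallel
    # rank is simply its index within the group that contains it.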
    all_tensor_model_parallel_group_ranks = []
    tensor_model_parallel_group = None
    for ranks in generator_wrapper('tp'):
        all_tensor_model_parallel_group_ranks.append(ranks)
        if rank in ranks:
            tensor_model_parallel_group = ranks
            logging.info(f'Rank {rank} has tensor model parallel group: {tensor_model_parallel_group}')

    tensor_model_parallel_rank = tensor_model_parallel_group.index(rank)

    logging.info(f'All tensor model parallel group ranks: {all_tensor_model_parallel_group_ranks}')
    logging.info(f'Rank {rank} has tensor model parallel rank: {tensor_model_parallel_rank}')

    # EP rank
    expert_model_parallel_rank = 0
    if expert_model_parallel_size_ is not None and expert_model_parallel_size_ > 1:
        all_expert_model_parallel_ranks = []
        for ranks in generator_wrapper('ep', is_expert=True):
            all_expert_model_parallel_ranks.append(ranks)
            if rank in ranks:
                expert_model_parallel_rank = list(ranks).index(rank)
        logging.info(f'All expert model parallel group ranks: {all_expert_model_parallel_ranks}')
        logging.info(f'Rank {rank} has expert model parallel rank: {expert_model_parallel_rank}')

    # ETP rank
    expert_tensor_parallel_rank = 0
    if expert_tensor_parallel_size_ is not None and expert_tensor_parallel_size_ > 1:
        all_expert_tensor_parallel_ranks = []
        for ranks in generator_wrapper('tp', is_expert=True):
            all_expert_tensor_parallel_ranks.append(ranks)
            if rank in ranks:
                expert_tensor_parallel_rank = list(ranks).index(rank)
        logging.info(f'All expert tensor parallel group ranks: {all_expert_tensor_parallel_ranks}')
        logging.info(f'Rank {rank} has expert tensor parallel rank: {expert_tensor_parallel_rank}')

    # Build the pipeline model-parallel groups and embedding groups
    # (first and last rank in each pipeline model-parallel group).
    all_pipeline_model_parallel_group_ranks = []
    all_embedding_group_ranks = []
    pipeline_model_parallel_group = None
    embedding_group = None
    embedding_rank = None
    for ranks in generator_wrapper('pp'):
        all_pipeline_model_parallel_group_ranks.append(ranks)
        if rank in ranks:
            pipeline_model_parallel_group = ranks
            logging.info(f'Rank {rank} has pipeline model parallel group: {pipeline_model_parallel_group}')

        # Setup embedding group (to exchange gradients between
        # first and last stages).
        if len(ranks) > 1:
            embedding_ranks = [ranks[0], ranks[-1]]
            all_embedding_group_ranks.append(embedding_ranks)
        else:
            embedding_ranks = ranks
            all_embedding_group_ranks.append(list(embedding_ranks))
        if rank in embedding_ranks:
            embedding_group = list(embedding_ranks)
            logging.info(f'Rank {rank} has embedding group: {embedding_group}')

    pipeline_model_parallel_rank = pipeline_model_parallel_group.index(rank)
    if embedding_group is not None:
        embedding_rank = embedding_group.index(rank)

    logging.info(f'All pipeline model parallel group ranks: {all_pipeline_model_parallel_group_ranks}')
    logging.info(f'Rank {rank} has pipeline model parallel rank {pipeline_model_parallel_rank}')
    logging.info(f'All embedding group ranks: {all_embedding_group_ranks}')
    logging.info(f'Rank {rank} has embedding rank: {embedding_rank}')

    return (
        tensor_model_parallel_rank,
        pipeline_model_parallel_rank,
        expert_model_parallel_rank,
        expert_tensor_parallel_rank,
        model_parallel_size,
        data_parallel_size,
        pipeline_model_parallel_split_rank_,
        virtual_pipeline_model_parallel_rank,
    )
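

# Minimal usage sketch (illustrative only, not part of the NeMo API surface): running this
# module directly exercises fake_initialize_model_parallel for the 16-GPU example from the
# docstring (TP=2, PP=4, so DP=2). It assumes megatron-core is installed; no distributed
# process groups are created, which is the point of the "fake" initialization.
if __name__ == "__main__" and HAVE_MEGATRON_CORE:
    (
        tp_rank,
        pp_rank,
        ep_rank,
        etp_rank,
        mp_size,
        dp_size,
        _split_rank,
        vpp_rank,
    ) = fake_initialize_model_parallel(
        world_size=16,  # illustrative value, matches the docstring example
        rank=0,  # illustrative value; in a real launch this is the global rank
        tensor_model_parallel_size_=2,
        pipeline_model_parallel_size_=4,
    )
    logging.info(
        f'example: tp_rank={tp_rank}, pp_rank={pp_rank}, '
        f'model_parallel_size={mp_size}, data_parallel_size={dp_size}'
    )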