Shortcuts

collie.data.dataloader 源代码

from torch.utils.data import DistributedSampler, DataLoader
from deepspeed.runtime.data_pipeline.data_sampling.data_sampler import DeepSpeedDataSampler
from deepspeed.accelerator import get_accelerator
from deepspeed.runtime.data_pipeline.constants import CURRICULUM_LEARNING, \
    DATA_SAMPLING_NUM_WORKERS, DATA_SAMPLING, CURRICULUM_LEARNING_ENABLED

from collie.utils import env
from .batch_sampler import CollieBatchSampler

class CollieDataLoader(object):
    """DataLoader wrapper provided by **CoLLiE**.

    When pipeline parallelism is active (``env.pp_size > 1``) every iteration
    yields ``batch_size * accumulation_steps`` samples.

    :param dataset: Dataset handed to the underlying
        :class:`torch.utils.data.DataLoader`.
    :param batch_size: Number of samples per batch. Multiplied by
        ``accumulation_steps`` when ``env.pp_size > 1``.
    :param pin_memory: Stored on the instance and forwarded to the underlying
        ``DataLoader`` when curriculum learning is enabled.
    :param shuffle: Passed to the :class:`DistributedSampler` that is created
        when curriculum learning is disabled and no ``sampler`` is given.
    :param accumulation_steps: Gradient accumulation steps; scales the batch
        size under pipeline parallelism and is passed to
        ``DeepSpeedDataSampler`` when curriculum learning is enabled.
    :param collate_fn: Forwarded to the underlying ``DataLoader``.
    :param num_workers: Number of worker processes. Defaults to
        ``2 * device_count``. When curriculum learning is enabled the value is
        taken from ``data_efficiency_config`` instead.
    :param sampler: Sampler to draw indices from. Replaced by a
        ``DeepSpeedDataSampler`` when curriculum learning is enabled; defaults
        to a :class:`DistributedSampler` otherwise.
    :param drop_last: Whether to drop the last batch when it has fewer samples
        than ``batch_size``. Under pipeline parallelism, ``False`` means the
        last batch is filled up instead.
    :param data_efficiency_config: The ``Data Efficiency`` section of the
        DeepSpeed configuration. ``None`` is treated as an empty dict.
    """

    def __init__(self, dataset, batch_size, accumulation_steps=1,
                 shuffle=False, pin_memory=True, collate_fn=None,
                 num_workers=None, sampler=None, drop_last=False,
                 data_efficiency_config=None):
        # A ``{}`` default would be one dict object shared by every call;
        # use ``None`` as the sentinel and build a fresh dict here.
        if data_efficiency_config is None:
            data_efficiency_config = {}

        self.batch_size = batch_size
        if env.pp_size > 1:
            # One iteration must provide all micro batches of a pipeline step.
            self.batch_size *= accumulation_steps

        # Curriculum learning is on only if the nested config key exists.
        try:
            self.curriculum_learning_enabled = data_efficiency_config[
                DATA_SAMPLING][CURRICULUM_LEARNING][
                CURRICULUM_LEARNING_ENABLED]
        except KeyError:
            self.curriculum_learning_enabled = False

        if self.curriculum_learning_enabled:
            # The DeepSpeed sampler replaces any user supplied sampler.
            sampler = DeepSpeedDataSampler(
                data_efficiency_config,
                len(dataset),
                self.batch_size,
                env.dp_rank,
                env.dp_size,
                env.dp_group,
                accumulation_steps,
                env.rank,
                drop_last=drop_last
            )
            device_count = get_accelerator().device_count()
            num_workers = data_efficiency_config[DATA_SAMPLING][
                DATA_SAMPLING_NUM_WORKERS]
        else:
            if sampler is None:
                sampler = DistributedSampler(
                    dataset=dataset,
                    num_replicas=env.dp_size,
                    rank=env.dp_rank,
                    shuffle=shuffle
                )
            device_count = 1

        if num_workers is None:
            num_workers = 2 * device_count

        self.num_workers = num_workers
        self.sampler = sampler
        self.dataset = dataset
        self.collate_fn = collate_fn
        self.device_count = device_count
        self.pin_memory = pin_memory
        # Batch iterator for the non-curriculum path; set in
        # ``_create_dataloader``.
        self.data = None
        self.drop_last = drop_last
        # Optional callable ``(data, sampler_state_dict) -> data`` applied to
        # each batch on the curriculum-learning path.
        self.post_process_func = None

        if self.drop_last:
            self.len = len(self.sampler) // self.batch_size
        else:
            from math import ceil
            self.len = ceil(len(self.sampler) / self.batch_size)

    def __iter__(self):
        """Build a fresh underlying ``DataLoader`` and return ``self``."""
        # TODO why DeepSpeedDataLoader do so?
        self._create_dataloader()
        return self

    def __len__(self):
        """Return the number of batches computed in ``__init__``."""
        return self.len

    def __next__(self):
        """Return the next batch.

        On the curriculum-learning path the batch is passed through
        ``post_process_func`` (together with the sampler's state dict) when
        that hook is set.
        """
        if self.curriculum_learning_enabled:
            data = next(self.data_iterator)
            if self.post_process_func is not None:
                data = self.post_process_func(data, self.sampler.state_dict())
            return data
        else:
            return next(self.data)

    def _create_dataloader(self):
        """Create the underlying ``DataLoader`` and its iterator.

        :return: The newly created :class:`torch.utils.data.DataLoader`.
        """
        if self.curriculum_learning_enabled:
            # The DeepSpeed sampler is used directly as the batch sampler.
            self.dataloader = DataLoader(
                self.dataset,
                pin_memory=self.pin_memory,
                batch_sampler=self.sampler,
                num_workers=self.num_workers,
                collate_fn=self.collate_fn
            )
            self.data_iterator = iter(self.dataloader)
            return self.dataloader
        else:
            # Choose how CollieBatchSampler treats an incomplete last batch.
            if self.drop_last:
                last_batch = "drop"
            elif env.pp_size > 1:
                last_batch = "fill"
            else:
                last_batch = "normal"
            batch_sampler = CollieBatchSampler(self.sampler, self.batch_size,
                                               last_batch)
            # NOTE(review): ``self.pin_memory`` is not forwarded on this path,
            # unlike the curriculum-learning branch above - confirm whether
            # that is intentional before changing it.
            self.dataloader = DataLoader(self.dataset,
                                         batch_sampler=batch_sampler,
                                         collate_fn=self.collate_fn,
                                         num_workers=self.num_workers)
            self.data = (x for x in self.dataloader)
            return self.dataloader