Datasets streaming bottlenecks: storage client memory/GC pressure vs sharding/cache/request amplification?
Hmm… This isn’t based on personal experience, but since the problem isn’t simple, it seems best to sort things out first. It looks like a fairly layered problem:
Direct answer
Yes, I think this is a real pain point, but I would not frame it as a single bottleneck or as one generic “storage client problem”.
For large-scale training, the bottleneck can absolutely come from the data access layer rather than raw network bandwidth. But in practice, “data access layer” usually means several interacting pieces:
dataset layout
-> sharding / rank-node-worker assignment
-> DataLoader prefetch / multiprocessing / pinned memory
-> Hub/Xet or S3/GCS/Azure/fsspec/FUSE transport
-> local cache placement: NVMe/SSD vs NFS/shared filesystem
-> training topology: world_size, nodes, workers, epochs
So my short answer would be:
| Question | Practical answer |
|---|---|
| Is this a real pain point? | Yes, especially for multi-worker or multi-node training, large Parquet/WebDataset-style corpora, object storage, and shared filesystems. |
| Is it just raw bandwidth? | Usually no. Request patterns, shard layout, cache locality, DataLoader behavior, and memory pressure can dominate. |
| Is request amplification still the main issue? | Not always. HF has already improved the Hub streaming path significantly, including data-file resolution, request reduction, Parquet prefetching, and buffering. See Streaming datasets: 100x More Efficient. |
| Is memory/RSS growth real? | There are reports of it, but I would not immediately blame only datasets. PyTorch DataLoader workers, prefetching, pinned memory, transforms, decoding, Arrow/Parquet readers, and fsspec caches can all contribute. |
| Is sharding important? | Very important. Bad alignment between shard count and world_size can cause skipped samples, duplicated reads, poor locality, or uneven work. |
| Would a Rust streaming/cache layer help? | Maybe, but only if it solves topology-aware shard scheduling, bounded prefetch/memory, local cache locality, and observability. “Rust” alone is not the product; the useful thing would be a clearer data-plane contract. |
| What should users do today? | Use topology-friendly shards, split by node/rank explicitly, keep cache on local NVMe/SSD, tune DataLoader conservatively, and materialize heavy preprocessing when needed. |
So I would describe the minimum useful integration not as “a faster downloader”, but as something closer to:
a topology-aware streaming data plane for training.
That would include:
- shard planning aware of world_size, ranks, nodes, and DataLoader workers
- node-local cache locality across epochs
- bounded-memory prefetching
- clear interaction with PyTorch DataLoader
- support for Hub/Xet and object-storage-like backends
- diagnostics for skipped samples, duplicate reads, cache hits, remote bytes, request count, worker RSS, and per-rank throughput
Without those pieces, a lower-level cache or storage client might improve one workload while leaving the main distributed-training failure mode unresolved.
1. Why this is not just “streaming is slow”
For small or exploratory use, streaming=True is often exactly the right tool: it lets you iterate over large datasets without downloading everything first. The official docs describe dataset streaming as a way to work with a dataset as you iterate over it, instead of fully downloading it first:
- Datasets streaming docs
- Dataset vs IterableDataset
But for large-scale training, especially with many DataLoader workers, multiple ranks, multiple nodes, remote object storage, or shared filesystems, the bottleneck is often not simply “network bandwidth”.
A useful first-pass model is:
streaming performance = layout + sharding + DataLoader + cache + storage + topology
This is why the same dataset can feel fine in one setup and painful in another:
| Setup | Likely behavior |
|---|---|
| single process, local cache, simple iteration | usually easy to reason about |
| one GPU, remote Parquet, moderate workers | often workable, but DataLoader tuning matters |
| multi-GPU single node | shard splitting and batch consistency start to matter |
| multi-node DDP | rank/node-aware sharding and cache locality become first-class |
| object storage + many small files | request count and metadata/listing overhead can dominate |
| NFS/shared cache | cache contention and filesystem behavior can dominate |
| heavy transform/decode in workers | CPU/RSS/DataLoader behavior can dominate |
2. Important context: some older request-amplification issues have changed
One reason this topic is delicate is that Hugging Face has already improved part of the streaming stack.
The official post Streaming datasets: 100x More Efficient describes improvements such as:
- 100x fewer requests
- 10x faster data resolution
- 2x sample/sec
- 0 worker crashes at 256 concurrent workers
- better data-files caching across DataLoader workers
- Parquet prefetching
- more configurable buffering
So I would avoid a blanket claim like:
“HF streaming causes request storms.”
A safer statement is:
Some request-amplification paths have been improved, but large-scale training can still suffer when dataset layout, sharding, cache placement, and DataLoader concurrency are not aligned.
That distinction matters. The remaining pain is probably less about “plain download is slow” and more about distributed coordination.
Relevant links:
- Streaming datasets: 100x More Efficient
- Datasets streaming docs
- IterableDataset / main classes docs
3. Dataset layout matters more than it first appears
A streaming dataset is not just “a dataset stored remotely”.
The physical layout can determine:
- how many remote requests are needed
- how many shards each rank must open
- whether shards can be assigned cleanly to nodes
- whether cache locality is possible
- how much data is skipped
- whether prefetch helps or hurts
- whether one worker can keep a GPU fed
Important layout variables:
| Variable | Why it matters |
|---|---|
| Number of shards | Too few shards can limit parallelism; too many tiny shards can increase request and metadata overhead |
| Shard size | Huge shards can reduce scheduling flexibility; tiny shards can increase overhead |
| Parquet row group size | Affects range-read granularity and filtering/projection behavior |
| Compression | Can move the bottleneck from network to CPU |
| File count | Many small files can be painful on object storage |
| Balance across shards | Uneven shards can cause per-rank imbalance |
| Divisibility by world_size | Important for avoiding wasteful skip behavior |
| Format | Parquet, Arrow, WebDataset tar shards, MDS, raw JSONL, image/audio files all behave differently |
A practical rule:
If you control the dataset layout, design shards for the training topology, not only for storage convenience.
For large training jobs, I would prefer:
- enough shards to distribute across ranks/workers
- shard counts divisible by common world_size values
- reasonably balanced shard sizes
- Parquet/WebDataset/MDS-style layouts over many tiny raw files
- predictable row-group or shard boundaries
- avoiding a single huge file unless the reader can split it well
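If you want shard counts that divide several world sizes at once, the arithmetic is a least common multiple. A minimal sketch, assuming the candidate world sizes below are hypothetical examples of the GPU counts people might train with, not a recommendation:

```python
from functools import reduce
from math import gcd


def lcm(a: int, b: int) -> int:
    return a * b // gcd(a, b)


def shard_count_for(world_sizes: list[int], min_shards: int) -> int:
    """Smallest shard count >= min_shards that every candidate world_size divides evenly."""
    base = reduce(lcm, world_sizes)
    # Round min_shards up to the next multiple of the LCM
    return -(-min_shards // base) * base


def divisibility_report(num_shards: int, world_sizes: list[int]) -> dict[int, bool]:
    """For each world_size, True if num_shards splits into equal whole-shard groups per rank."""
    return {ws: num_shards % ws == 0 for ws in world_sizes}


candidates = [8, 16, 24, 32, 64]  # hypothetical world sizes
print(shard_count_for(candidates, min_shards=1000))  # 1152
print(divisibility_report(1000, candidates))
```

Including 24 in the candidate set pushes the LCM from 64 to 192. That is the tradeoff: supporting more topologies cleanly can require many more, and therefore smaller, shards.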
Relevant links:
- HF Datasets streaming docs
- HF Hub dataset streaming guide
- WebDataset multinode docs
- MosaicML StreamingDataset
- StreamingDataset API docs
4. The num_shards vs world_size issue is probably central
One of the most important practical questions is:
Does the dataset shard cleanly across the distributed training topology?
The HF docs for iterable datasets describe a distinction that is easy to miss:
- if the number of shards is divisible by world_size (num_shards % world_size == 0), the shards are assigned evenly across nodes
- otherwise, each node keeps only 1 example out of every world_size examples and skips the others
That means a bad shard topology can turn into:
- unnecessary reads
- wasted remote traffic
- low cache hit rate
- poor locality
- uneven work across ranks
- slow epoch boundaries
A practical mental model:
good case:
rank/node reads mostly the shards it will actually train on
bad case:
rank/node touches lots of data, then discards/skips much of it
For small datasets, this may be acceptable.
For 10 TB, 100 TB, or multi-node training, this can become the whole problem.
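To make the difference concrete, here is a toy model of the two modes. It is a simplified sketch of the behavior described above, not the library's actual assignment code; the round-robin shard order and the layouts in the usage lines are assumptions:

```python
def simulate_split(num_shards: int, examples_per_shard: int, world_size: int) -> list[dict]:
    """Toy model of shard-level assignment vs example-level skipping.

    Shard mode (num_shards % world_size == 0): rank r opens shards r, r + world_size, ...
    Skip mode (otherwise): every rank reads every shard and keeps 1 example in world_size.
    """
    total = num_shards * examples_per_shard
    stats = []
    for rank in range(world_size):
        if num_shards % world_size == 0:
            opened = len(range(rank, num_shards, world_size))
            read = kept = opened * examples_per_shard
        else:
            opened, read = num_shards, total
            kept = len(range(rank, total, world_size))  # examples rank, rank + ws, ...
        stats.append({"rank": rank, "shards_opened": opened, "read": read, "kept": kept})
    return stats


for shards in (16, 15):  # hypothetical layouts: 1,000 examples per shard, world_size=8
    rank0 = simulate_split(shards, 1_000, world_size=8)[0]
    print(shards, rank0, "read amplification:", rank0["read"] / rank0["kept"])
```

In this model, going from 16 to 15 shards on 8 ranks changes per-rank read amplification from 1.0 to 8.0: every rank reads the whole dataset, so total remote reads grow roughly by a factor of world_size.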
Relevant links:
- HF Datasets main classes / IterableDataset docs
- Issue: IterableDataset distributed/DataLoader sharding performance traps
- Issue: streaming datasets doesn’t work properly with multi-node
- Issue: Problem in training iterable dataset
- Forum: Loading webdatasets across multiple nodes
5. split_dataset_by_node() helps, but it is not a universal fix
For multi-node training, split_dataset_by_node() is an important tool.
The forum answer in Loading webdatasets across multiple nodes is especially useful because it states the practical distinction clearly:
- by default, each node may stream the full dataset and skip samples to avoid duplicates
- split_dataset_by_node() can assign shards to each node so each node streams only what it uses
That is a big difference.
But it does not mean every distributed streaming problem disappears.
Things that still need care:
| Issue | Why it matters |
|---|---|
| Shard count not divisible by world size | Can still cause unevenness or skipping |
| Shuffle behavior | Shuffling can fight cache locality |
| Epoch boundaries | Some reports mention hangs or crashes after an epoch |
| Batch size consistency | DDP/gradient sync can dislike uneven batches |
| Worker-level splitting | Node-level splitting is not always enough |
| Resumption | Mid-epoch restart can be expensive if ordering is not deterministic |
| Cache reuse | A split that changes every epoch can reduce cache locality |
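The worker-level row is easy to underestimate. As far as I understand, an IterableDataset's shards are further divided among DataLoader workers, so a rank with fewer shards than workers leaves some workers with nothing to do. A hypothetical round-robin plan (not the library's exact algorithm) shows the arithmetic:

```python
def assign_shards(num_shards: int, world_size: int, num_workers: int) -> dict[tuple[int, int], list[int]]:
    """Hypothetical two-level round-robin plan: shards -> ranks, then -> DataLoader workers."""
    plan = {}
    for rank in range(world_size):
        rank_shards = list(range(rank, num_shards, world_size))
        for worker in range(num_workers):
            plan[(rank, worker)] = rank_shards[worker::num_workers]
    return plan


def idle_slots(plan: dict[tuple[int, int], list[int]]) -> list[tuple[int, int]]:
    """(rank, worker) pairs that received no shards and will produce no data."""
    return sorted(slot for slot, shards in plan.items() if not shards)


plan = assign_shards(num_shards=32, world_size=8, num_workers=8)
print(len(idle_slots(plan)), "of", len(plan), "worker slots are idle")  # 32 of 64
```

So a layout that divides cleanly by world_size can still waste half the workers if each rank ends up with fewer shards than num_workers.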
Useful links:
- Forum: How to use split_dataset_by_node and shuffle on IterableDataset
- Forum: Keeping IterableDataset node-wise split fixed during DDP
- Forum: Loading webdatasets across multiple nodes
- Issue: streaming datasets doesn’t work properly with multi-node
6. DataLoader settings can make a streaming issue look like a memory leak
If RSS grows during streaming, I would not immediately conclude:
“HF Datasets has a memory leak.”
It might be true in some cases, but it is not the first conclusion I would jump to.
A streaming pipeline often involves:
remote read
-> decompression / decoding
-> Python objects
-> Arrow / Parquet reader
-> map/filter/transform
-> DataLoader worker process
-> prefetch queue
-> collate_fn
-> pinned memory
-> GPU transfer
Any of these can hold references or buffer data.
Important DataLoader knobs:
| Knob | Why it matters |
|---|---|
| num_workers | More worker processes can increase parallelism and memory use |
| prefetch_factor | Number of batches each worker loads ahead, so num_workers * prefetch_factor batches can be in flight per rank |
| pin_memory | Copies fetched tensors into pinned (page-locked) host memory for faster GPU transfer |
| persistent_workers | Keeps worker processes alive across epochs |
| custom collate_fn | Can accidentally retain objects or create large temporary structures |
| decode/transform in workers | Can move CPU and memory pressure into worker processes |
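A quick back-of-the-envelope check is often enough to see whether prefetching alone explains a memory jump. A sketch, assuming dense fixed-size batches (the float32 image shape below is a hypothetical example) and counting only prefetched batches, not worker baseline RSS, decode buffers, shuffle buffers, or Arrow/Parquet readers:

```python
def batch_bytes(batch_size: int, sample_shape: tuple[int, ...], bytes_per_element: int) -> int:
    """Size of one dense collated batch."""
    n = batch_size * bytes_per_element
    for dim in sample_shape:
        n *= dim
    return n


def in_flight_bytes(num_workers: int, prefetch_factor: int, batch_nbytes: int, ranks_per_node: int = 1) -> int:
    """Batches prefetched by DataLoader workers: num_workers * prefetch_factor per rank."""
    return num_workers * prefetch_factor * batch_nbytes * ranks_per_node


MiB = 1 << 20
b = batch_bytes(32, (3, 224, 224), bytes_per_element=4)  # hypothetical float32 image batch
print(b / MiB)  # 18.375
print(in_flight_bytes(8, 4, b) / MiB)  # 588.0 per rank
print(in_flight_bytes(8, 4, b, ranks_per_node=8) / MiB)  # 4704.0 per node
```

With 8 ranks per node, a seemingly modest prefetch_factor=4 already holds about 4.6 GiB of host memory in queued batches.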
Relevant links:
- PyTorch DataLoader docs
- TorchData Stateful DataLoader docs
- PyTorch issue: pin_memory / prefetch / persistent_workers memory behavior
- Forum: A streaming dataset’s memory footprint continually grows
- Issue: Memory leak when streaming
A good debugging baseline:
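```python
from torch.utils.data import DataLoader

# Debugging baseline: everything runs in the main process, no pinning
loader = DataLoader(
    dataset,                  # your streaming dataset
    batch_size=<batch_size>,  # placeholder: per-rank batch size
    num_workers=0,
    pin_memory=False,
    persistent_workers=False,
)
```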
Then add concurrency back one variable at a time:
```python
# Step 1: no multiprocessing
num_workers = 0

# Step 2: one worker
num_workers = 1

# Step 3: several workers, low prefetch
num_workers = 4
prefetch_factor = 1

# Step 4: only then test pinning/persistence
pin_memory = True
persistent_workers = True
```
If memory only grows after num_workers > 0, it may be DataLoader multiprocessing, prefetch, transform, or worker lifetime.
If memory grows even with num_workers=0, the issue is more likely in dataset iteration, decoding, transformation, Arrow/Parquet, Python object retention, or user code.
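The ladder above can be automated so each step is measured the same way. A minimal sketch: make_loader is a hypothetical factory you supply, and the RSS probe reads /proc/self/status, so it is Linux-only and sees the main process only. Worker processes are separate, so for them you would need something like psutil's Process().children(recursive=True) with memory_info().rss:

```python
import time
from typing import Callable, Iterable, Optional

# The four steps above as DataLoader keyword arguments
LADDER = [
    {"num_workers": 0, "pin_memory": False, "persistent_workers": False},
    {"num_workers": 1, "pin_memory": False, "persistent_workers": False},
    {"num_workers": 4, "prefetch_factor": 1, "pin_memory": False, "persistent_workers": False},
    {"num_workers": 4, "prefetch_factor": 1, "pin_memory": True, "persistent_workers": True},
]


def main_rss_mib() -> Optional[float]:
    """Current RSS of this process only (Linux /proc); worker processes are not included."""
    try:
        with open("/proc/self/status") as f:
            for line in f:
                if line.startswith("VmRSS:"):
                    return int(line.split()[1]) / 1024  # VmRSS is reported in kB
    except OSError:
        pass
    return None


def run_ladder(make_loader: Callable[..., Iterable], max_batches: int,
               rss: Callable[[], Optional[float]] = main_rss_mib) -> list[dict]:
    results = []
    for kwargs in LADDER:
        loader = make_loader(**kwargs)
        rss_before = rss()
        start = time.perf_counter()
        n = 0
        for n, _batch in enumerate(loader, start=1):
            if n >= max_batches:
                break
        results.append({**kwargs, "batches": n, "seconds": time.perf_counter() - start,
                        "rss_before_mib": rss_before, "rss_after_mib": rss()})
    return results


# Usage sketch (assumes `ds` is your streaming dataset; 32 is a placeholder batch size):
# from torch.utils.data import DataLoader
# for row in run_ladder(lambda **kw: DataLoader(ds, batch_size=32, **kw), max_batches=500):
#     print(row)
```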
7. Cache placement is part of the runtime contract
For large jobs, cache is not just a convenience.
It is part of the runtime design.
Relevant Hugging Face cache variables:
| Variable | Role |
|---|---|
| HF_HOME | Base directory for Hugging Face local state |
| HF_HUB_CACHE | Hub repository/file cache |
| HF_XET_CACHE | Xet chunk cache |
| HF_DATASETS_CACHE | Datasets cache files, not the same as the Hub file cache |
Links:
- huggingface_hub environment variables
- huggingface_hub cache management
- Datasets cache management
- Issue: HF_DATASETS_CACHE ignored?
A common cluster problem is that $HOME points to NFS or another shared filesystem.
That can make defaults bad:
```text
# maybe bad on clusters if $HOME is on NFS
~/.cache/huggingface
```
A safer pattern for large training jobs is usually:
```bash
export HF_HOME=/local_nvme/hf
export HF_HUB_CACHE=/local_nvme/hf/hub
export HF_XET_CACHE=/local_nvme/hf/xet
export HF_DATASETS_CACHE=/local_nvme/hf/datasets
```
The exact paths depend on the cluster, but the principle is:
Put high-churn caches on node-local SSD/NVMe when possible.
HF’s upload guide explicitly recommends putting HF_XET_CACHE on a local disk such as NVMe/SSD when uploading from distributed filesystems like NFS, EBS, Lustre, or FSx.
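Before tuning anything else, it can help to confirm which filesystem each cache actually lands on. A Linux-only sketch that parses /proc/mounts; the set of "shared" filesystem types is an assumption to adjust for your cluster, and unset variables are approximated by the default ~/.cache/huggingface root. As far as I know, these variables are read when huggingface_hub and datasets are imported, so they should be set before those imports:

```python
import os

# Filesystem types that are usually shared or network-backed (non-exhaustive; adjust for your cluster)
SHARED_FS = {"nfs", "nfs4", "lustre", "gpfs", "beegfs", "cifs", "smb3", "ceph", "fuse.gcsfuse", "fuse.s3fs"}


def fs_type(path: str, mounts_text: str) -> str:
    """Filesystem type of the longest mount point containing `path` (/proc/mounts format)."""
    best_mount, best_type = "", "unknown"
    for line in mounts_text.splitlines():
        parts = line.split()
        if len(parts) < 3:
            continue
        mount_point, fstype = parts[1].replace("\\040", " "), parts[2]  # \040 encodes a space
        inside = mount_point == "/" or path == mount_point or path.startswith(mount_point.rstrip("/") + "/")
        if inside and len(mount_point) > len(best_mount):
            best_mount, best_type = mount_point, fstype
    return best_type


def report_hf_cache_placement(mounts_file: str = "/proc/mounts") -> None:
    with open(mounts_file) as f:
        mounts = f.read()
    # Simplification: an unset variable is shown as the default ~/.cache/huggingface root
    default_root = os.path.expanduser("~/.cache/huggingface")
    for var in ("HF_HOME", "HF_HUB_CACHE", "HF_XET_CACHE", "HF_DATASETS_CACHE"):
        path = os.path.realpath(os.environ.get(var) or default_root)
        kind = fs_type(path, mounts)
        warning = "  <- shared/network filesystem?" if kind in SHARED_FS else ""
        print(f"{var:18} {path} [{kind}]{warning}")
```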
Relevant links:
- HF upload guide: local Xet cache on NVMe/SSD
- Environment variables: HF_XET_CACHE
- Hub local cache docs
8. Hub/Xet is a separate layer from Datasets iteration
It is also useful to separate:
Datasets iteration problem
vs
Hub transfer/cache problem
The Hub has moved toward Xet-backed storage and hf_xet for file transfer and chunk caching.
That means a dataset load can involve more than “plain HTTP file download”.
But not every Datasets streaming problem is an Xet problem.
I would separate these cases:
| Symptom | More likely layer |
|---|---|
| CLI download stalls | Hub/Xet/network/cache |
| load_dataset(..., streaming=True) stalls but CLI download works | Datasets iteration / reader / transform / DataLoader |
| disabling Xet changes the behavior | Hub/Xet/cache path |
| only S3/GCS/FUSE path is slow | object storage client / filesystem abstraction |
| only multi-node training is slow | sharding/topology/cache locality |
| memory grows only with num_workers > 0 | DataLoader / multiprocessing / prefetch |
Potential diagnostic toggles for Hub/Xet transfer paths:
```bash
# Current default path
hf download <repo_id> --repo-type dataset

# Xet high-performance mode
HF_XET_HIGH_PERFORMANCE=1 hf download <repo_id> --repo-type dataset

# Xet-disabled diagnostic path
HF_HUB_DISABLE_XET=1 hf download <repo_id> --repo-type dataset
```
I would treat HF_HUB_DISABLE_XET=1 as a diagnostic switch, not a universal permanent fix.
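One trap when comparing these variants: if they share a cache, the second run is mostly cache hits and looks artificially fast. A sketch that gives each variant fresh HF_HUB_CACHE and HF_XET_CACHE directories while leaving HF_HOME (and therefore the stored token) untouched; the variant names and scratch directory are assumptions, and scratch_dir should sit on the disk you actually train from:

```python
import os
import subprocess
import tempfile
import time

VARIANTS = {
    "default": {},
    "xet_high_performance": {"HF_XET_HIGH_PERFORMANCE": "1"},
    "xet_disabled": {"HF_HUB_DISABLE_XET": "1"},
}


def build_env(extra: dict, cache_root: str) -> dict:
    """Current environment plus one variant, with caches redirected to a fresh directory."""
    env = dict(os.environ)
    env.pop("HF_HUB_DISABLE_XET", None)  # start each variant from a clean toggle state
    env.pop("HF_XET_HIGH_PERFORMANCE", None)
    env.update(extra)
    env["HF_HUB_CACHE"] = os.path.join(cache_root, "hub")
    env["HF_XET_CACHE"] = os.path.join(cache_root, "xet")
    return env


def time_download(repo_id: str, extra: dict, scratch_dir: str) -> float:
    """Wall-clock seconds for one cold `hf download` under the given variant."""
    with tempfile.TemporaryDirectory(dir=scratch_dir) as cache_root:
        env = build_env(extra, cache_root)
        start = time.perf_counter()
        subprocess.run(["hf", "download", repo_id, "--repo-type", "dataset"], env=env, check=True)
        return time.perf_counter() - start


# Usage sketch (repo id and scratch path are placeholders):
# for name, extra in VARIANTS.items():
#     print(name, time_download("<repo_id>", extra, "/local_nvme/scratch"))
```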
Relevant links:
- Using Xet storage
- huggingface_hub environment variables
- HF upload guide
9. S3/fsspec/FUSE/object storage can be a hidden bottleneck
If the dataset is not on the Hub, there is another layer:
S3 / GCS / Azure Blob / custom object storage
-> fsspec / s3fs / gcsfs / adlfs
-> range reads / block cache / retry policy
-> Python reader / Parquet reader / DataLoader worker
A path that looks like a normal filesystem may not behave like a normal filesystem.
FUSE/object-store mounts can hide:
- range-request amplification
- poor random-read behavior
- slow stat/list operations
- file handle pressure
- local block cache growth
- retry storms
- credential refresh issues
- differences between sequential and random reads
For S3/fsspec specifically, block size and cache type can strongly affect both speed and memory.
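fsspec-style file objects accept block_size and cache_type when a file is opened, and the tradeoff between them is easier to see in a toy model. This is a simplified LRU block cache, not fsspec's actual implementation (no request coalescing, no end-of-file truncation), and the read patterns are hypothetical:

```python
from collections import OrderedDict


def simulate_block_cache(reads, block_size: int, max_blocks: int) -> tuple[int, int, int]:
    """Toy LRU block cache in front of an object store.

    reads: (offset, length) byte ranges the reader asks for.
    Each missing block costs one ranged GET of block_size bytes.
    Returns (requests, bytes_fetched, peak_cached_bytes).
    """
    cache = OrderedDict()
    requests = fetched = peak_blocks = 0
    for offset, length in reads:
        for block in range(offset // block_size, (offset + length - 1) // block_size + 1):
            if block in cache:
                cache.move_to_end(block)  # mark as most recently used
                continue
            requests += 1
            fetched += block_size
            cache[block] = None
            if len(cache) > max_blocks:
                cache.popitem(last=False)  # evict least recently used
            peak_blocks = max(peak_blocks, len(cache))
    return requests, fetched, peak_blocks * block_size


MiB = 1 << 20
sequential = [(off, 64 * 1024) for off in range(0, 64 * MiB, 64 * 1024)]
random_small = [(i * 100 * MiB, 4096) for i in range(20)]
for block in (1 * MiB, 16 * MiB):
    print(block // MiB, "MiB blocks:",
          simulate_block_cache(sequential, block, max_blocks=4),
          simulate_block_cache(random_small, block, max_blocks=4))
```

In this model, larger blocks cut sequential requests from 64 to 4 but raise cache memory from 4 MiB to 64 MiB, and for small random reads they fetch 320 MiB to serve 80 KiB of useful data.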
Relevant links:
- s3fs issue: large file read, block size, cache type
- fsspec documentation
- s3fs documentation
A practical rule:
If changing only the storage backend changes the symptom, do not debug only datasets.
10. Direct remote streaming is not always the safest production path
For exploration, streaming=True is excellent.
For production-scale training, I would be more cautious.
If the job is expensive, multi-node, or long-running, it may be safer to stage data explicitly:
remote dataset
-> node-local NVMe / local shard cache
-> train from local shards
or:
remote raw data
-> preprocessing job
-> materialized Parquet/WebDataset/MDS shards
-> training job
This is not as elegant as direct streaming, but it is often easier to debug and reproduce.
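The staging step mostly needs to be restartable and to never expose half-written shards. A minimal sketch, assuming fetch is a hypothetical callback you write on top of your Hub/S3/fsspec client (huggingface_hub's snapshot_download with allow_patterns and local_dir is another option for Hub datasets). Each shard is written under a temporary name and renamed only when complete:

```python
import os
from pathlib import Path
from typing import Callable, Iterable


def stage_shards(shard_names: Iterable[str], fetch: Callable[[str, Path], None], dest_dir: str) -> list[Path]:
    """Copy shards to node-local disk once; safe to re-run after a crash.

    fetch(name, tmp_path) is a hypothetical user callback that writes the whole shard to tmp_path.
    """
    dest = Path(dest_dir)
    staged = []
    for name in shard_names:
        final = dest / name
        if not final.exists():  # only complete shards ever get the final name
            final.parent.mkdir(parents=True, exist_ok=True)
            tmp = final.with_name(final.name + ".partial")
            fetch(name, tmp)
            os.replace(tmp, final)  # atomic rename within one filesystem on POSIX
        staged.append(final)
    return staged


# Usage sketch (assumes torch.distributed is initialized and torchrun set LOCAL_RANK):
# if int(os.environ["LOCAL_RANK"]) == 0:
#     stage_shards(node_shards, my_fetch, "/local_nvme/shards")
# torch.distributed.barrier()  # all ranks wait until every node has finished staging
```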
Workaround options:
| If you see… | Consider… |
|---|---|
| low GPU utilization | increase DataLoader concurrency carefully; profile CPU/decode/storage |
| high RSS | reduce workers/prefetch/pinning; remove transforms; test num_workers=0 |
| repeated remote reads | local cache or pre-stage to NVMe |
| skip-heavy distributed reads | redesign shard count or use explicit node/rank splitting |
| NFS stalls | move cache to node-local SSD/NVMe |
| object-store request pressure | larger shards, better row groups, fewer tiny files |
| heavy preprocessing | materialize processed data |
| mid-epoch failures | local staging or deterministic/resumable streaming format |
11. Role-based responsibilities
I would not put all responsibility on one party.
Different actors can improve different parts of the stack.
| Role | Practical action |
|---|---|
| Dataset publisher | publish topology-friendly shard layouts; document shard count, shard size, row group size |
| Training user | choose split_dataset_by_node(), tune DataLoader, control cache paths |
| Infra/platform owner | provide node-local SSD/NVMe cache space; avoid hot caches on NFS |
| HF Datasets maintainers | improve topology-aware sharding, diagnostics, docs, world_size guidance |
| Hub/Xet maintainers | improve transfer observability, cache behavior, failure diagnostics |
| PyTorch/DataLoader maintainers | clarify/optimize worker, prefetch, pinned-memory behavior |
| Storage-client maintainers | improve object-store read/cache semantics and visibility |
The important point is:
A “streaming problem” may require a dataset-layout fix, not a streaming-library fix.
12. What diagnostics would make this much easier?
A lot of confusion comes from not knowing which layer is doing extra work.
Useful diagnostics would include:
| Metric | Why it helps |
|---|---|
| per-rank samples consumed | detect imbalance |
| per-rank shards opened | detect unnecessary shard touching |
| skipped samples / skipped bytes | detect wasteful sharding |
| cache hit/miss rate | detect poor locality |
| remote requests per worker | detect request amplification |
| bytes downloaded per rank | detect duplication |
| DataLoader queue depth | detect input bottleneck |
| worker RSS | detect worker memory growth |
| pinned memory use | detect host memory pressure |
| time per shard transition | detect metadata or open overhead |
| retry count / HTTP status | detect backend instability |
| row groups read | detect Parquet granularity issues |
Even a small debug mode that reports some of these would help users avoid guessing.
A useful log format might look like:
```text
rank=3 node=0 worker=5
samples=120000
shards_opened=18
shards_assigned=18
remote_bytes=42GB
skipped_samples=0
cache_hits=1532
cache_misses=12
rss_peak=7.8GB
avg_batch_wait_ms=41
```
This kind of visibility would make it much easier to tell whether the problem is layout, sharding, cache, DataLoader, or storage.
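Until something like this exists upstream, part of it can be approximated in user code. A sketch of hypothetical per-rank counters (not an existing datasets or PyTorch feature): it measures how long the training loop waits for each batch, while the shard, byte, and cache counters are left for whatever storage layer you can instrument:

```python
import time
from dataclasses import dataclass


def default_batch_len(batch) -> int:
    """Samples in a batch; dict batches (from default collation of dict examples) use the first field."""
    if isinstance(batch, dict):
        return len(next(iter(batch.values())))
    return len(batch)


@dataclass
class RankInputStats:
    rank: int
    node: int
    samples: int = 0
    batches: int = 0
    batch_wait_s: float = 0.0
    shards_opened: int = 0  # fill in from your own reader/storage instrumentation
    remote_bytes: int = 0
    cache_hits: int = 0
    cache_misses: int = 0

    def timed(self, loader, batch_len=default_batch_len):
        """Wrap a batch iterator, recording how long each next() call blocks."""
        it = iter(loader)
        while True:
            start = time.perf_counter()
            try:
                batch = next(it)
            except StopIteration:
                return
            self.batch_wait_s += time.perf_counter() - start
            self.batches += 1
            self.samples += batch_len(batch)
            yield batch

    def log_line(self) -> str:
        avg_ms = 1000 * self.batch_wait_s / self.batches if self.batches else 0.0
        return (f"rank={self.rank} node={self.node} samples={self.samples} "
                f"shards_opened={self.shards_opened} remote_bytes={self.remote_bytes} "
                f"cache_hits={self.cache_hits} cache_misses={self.cache_misses} "
                f"avg_batch_wait_ms={avg_ms:.0f}")


# Usage sketch: for batch in stats.timed(loader): train_step(batch)
```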
13. Practical checklist
If I had to debug a large training job today, I would use this order.
Step 1: Record versions and environment
```bash
python - <<'PY'
import importlib
import os
import platform

print("python", platform.python_version())
print("platform", platform.platform())

# Cache locations and Xet toggles that change runtime behavior
for name in [
    "HF_HOME",
    "HF_HUB_CACHE",
    "HF_XET_CACHE",
    "HF_DATASETS_CACHE",
    "HF_HUB_DISABLE_XET",
    "HF_XET_HIGH_PERFORMANCE",
]:
    print(name, os.environ.get(name))

# Library versions; a missing package is reported instead of aborting the script
for module_name in ["datasets", "huggingface_hub", "torch", "pyarrow", "hf_xet"]:
    try:
        module = importlib.import_module(module_name)
        print(module_name, getattr(module, "__version__", "unknown"))
    except Exception as e:
        print(module_name, "unavailable", repr(e))
PY
```
Step 2: Inspect layout
Record:
dataset format:
total size:
file count:
shard count:
average shard size:
largest shard:
smallest shard:
Parquet row group size:
compression:
remote backend:
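Most of these fields can be filled from a local or staged copy of the data. A sketch, assuming the shards are Parquet files under one directory (the glob pattern is an assumption); the row-group helper needs pyarrow, while the size summary uses only the standard library:

```python
import statistics
from pathlib import Path


def summarize_layout(root: str, pattern: str = "*.parquet") -> dict:
    """File-count and size stats for shards under `root`."""
    sizes = sorted(p.stat().st_size for p in Path(root).rglob(pattern) if p.is_file())
    if not sizes:
        return {"file_count": 0}
    return {
        "file_count": len(sizes),
        "total_bytes": sum(sizes),
        "avg_bytes": sum(sizes) // len(sizes),
        "smallest_bytes": sizes[0],
        "largest_bytes": sizes[-1],
        # Large values flag the shard imbalance that turns into per-rank imbalance
        "max_over_median": sizes[-1] / max(statistics.median(sizes), 1),
    }


def parquet_row_groups(path: str) -> dict:
    """Row-group count, rows per group, and codecs for one Parquet file (requires pyarrow)."""
    import pyarrow.parquet as pq

    meta = pq.ParquetFile(path).metadata
    rows = [meta.row_group(i).num_rows for i in range(meta.num_row_groups)]
    codecs = {meta.row_group(0).column(j).compression for j in range(meta.num_columns)} if rows else set()
    return {"num_row_groups": meta.num_row_groups, "rows_per_group": rows, "compression": sorted(codecs)}
```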
Step 3: Check topology
Record:
nodes:
ranks per node:
world_size:
DataLoader workers per rank:
batch size per rank:
prefetch_factor:
pin_memory:
persistent_workers:
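Under torchrun, the topology part can be read from the environment variables it exports to every process (RANK, LOCAL_RANK, WORLD_SIZE, LOCAL_WORLD_SIZE) and checked against the shard count. A sketch, assuming every node runs the same number of ranks; the warning thresholds simply restate the num_shards % world_size rule and the one-shard-per-worker idea:

```python
import os
from typing import Mapping


def topology_from_env(env: Mapping[str, str] = os.environ) -> dict:
    """Launch topology as exported by torchrun; defaults describe a single process."""
    world_size = int(env.get("WORLD_SIZE", "1"))
    ranks_per_node = int(env.get("LOCAL_WORLD_SIZE", "1"))
    return {
        "rank": int(env.get("RANK", "0")),
        "local_rank": int(env.get("LOCAL_RANK", "0")),
        "world_size": world_size,
        "ranks_per_node": ranks_per_node,
        "nodes": world_size // ranks_per_node,  # assumes homogeneous nodes
    }


def check_against_shards(topo: dict, num_shards: int, num_workers: int) -> list[str]:
    """Warnings about shard/topology mismatches; an empty list means no obvious problem."""
    warnings = []
    if num_shards % topo["world_size"]:
        warnings.append(f"{num_shards} shards not divisible by world_size={topo['world_size']}")
    per_rank = num_shards // topo["world_size"]
    if per_rank < num_workers:
        warnings.append(f"~{per_rank} shards per rank < num_workers={num_workers}; some workers may idle")
    return warnings
```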
Step 4: Test minimal iteration
```python
from datasets import load_dataset

ds = load_dataset("<dataset>", split="train", streaming=True)

# Plain iteration over the first 10,000 examples
for i, x in zip(range(10_000), ds):
    if i % 1000 == 0:
        print(i)
```
No DataLoader, no transforms, no Trainer.
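It also helps to separate the time to the first example (resolving and opening the first shard) from steady-state throughput, since they point at different layers. A small sketch that works with any iterable, including a streaming dataset or a DataLoader:

```python
import time
from itertools import islice
from typing import Iterable

_MISSING = object()


def measure(iterable: Iterable, n: int, report_every: int = 1000) -> dict:
    """Time to first item, plus throughput over the following items (at most n in total)."""
    start = time.perf_counter()
    it = islice(iter(iterable), n)
    first = next(it, _MISSING)
    if first is _MISSING:
        return {"items": 0}
    t_first = time.perf_counter() - start
    count = 1
    for count, _ in enumerate(it, start=2):
        if count % report_every == 0:
            print(count)
    total = time.perf_counter() - start
    steady = (count - 1) / (total - t_first) if count > 1 and total > t_first else float("nan")
    return {"items": count, "time_to_first_s": t_first, "steady_items_per_s": steady}


# Usage sketch: print(measure(ds, 10_000))
```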
Step 5: Add DataLoader with no multiprocessing
```python
from torch.utils.data import DataLoader

loader = DataLoader(
    ds,
    batch_size=<batch_size>,
    num_workers=0,
    pin_memory=False,
)

for i, batch in enumerate(loader):
    if i % 100 == 0:
        print(i)
```
Step 6: Add workers gradually
```python
loader = DataLoader(
    ds,
    batch_size=<batch_size>,
    num_workers=1,
    pin_memory=False,
    persistent_workers=False,
)
```
Then try:
```python
loader = DataLoader(
    ds,
    batch_size=<batch_size>,
    num_workers=4,
    prefetch_factor=1,
    pin_memory=False,
    persistent_workers=False,
)
```
Only after that, test:
```python
pin_memory=True
persistent_workers=True
```
Step 7: Test distributed split explicitly
```python
from datasets import load_dataset
from datasets.distributed import split_dataset_by_node

ds = load_dataset("<dataset>", split="train", streaming=True)
ds = split_dataset_by_node(ds, rank=<rank>, world_size=<world_size>)
```
Then verify that each rank sees the expected data volume and does not duplicate work.
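One way to do that check is to hash a bounded prefix of examples on each rank, gather the keys (for example with torch.distributed.all_gather_object), and compare. A sketch, assuming each example has some field that uniquely identifies it; "id" below is a hypothetical field name:

```python
import hashlib
from collections import Counter


def sample_key(example: dict, fields=("id",)) -> str:
    """Stable key for one example; replace the hypothetical 'id' with your unique field(s)."""
    raw = "\x1f".join(str(example.get(f)) for f in fields)
    return hashlib.sha1(raw.encode()).hexdigest()


def check_split(per_rank_keys: list[list[str]], tolerance: float = 0.01) -> dict:
    """Per-rank counts, keys seen on more than one rank, and relative count imbalance."""
    counts = [len(keys) for keys in per_rank_keys]
    seen = Counter(k for keys in per_rank_keys for k in set(keys))
    duplicated = sum(1 for c in seen.values() if c > 1)
    mean = sum(counts) / len(counts)
    imbalance = (max(counts) - min(counts)) / mean if mean else 0.0
    return {"counts": counts, "cross_rank_duplicates": duplicated,
            "imbalance": imbalance, "ok": duplicated == 0 and imbalance <= tolerance}


# Usage sketch (on every rank, after a bounded number of examples):
# gathered = [None] * world_size
# torch.distributed.all_gather_object(gathered, my_keys)
# if rank == 0: print(check_split(gathered))
```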
Step 8: Move cache to local disk
```bash
export HF_HOME=/local_nvme/hf
export HF_HUB_CACHE=/local_nvme/hf/hub
export HF_XET_CACHE=/local_nvme/hf/xet
export HF_DATASETS_CACHE=/local_nvme/hf/datasets
```
Then rerun the smallest reproduction.
Step 9: Decide whether to stream, stage, or materialize
| Situation | Safer choice |
|---|---|
| quick exploration | direct streaming |
| single-node training | streaming may be fine if layout is good |
| multi-node production training | consider pre-staging or topology-aware shard layout |
| heavy preprocessing | materialize processed data |
| unstable object-store path | local staging/cache |
| strict restart/resume needs | deterministic/resumable streaming format |
14. Current practical conclusion
I would summarize it this way:
streaming=True is a useful interface, but large-scale training needs more than lazy iteration.
It needs a data access plan that is aware of:
- dataset layout
- shard count
- world size
- node/rank/worker assignment
- cache locality
- DataLoader memory behavior
- storage backend semantics
- restart/resume behavior
So the safest practical recommendation is not:
“Just turn on streaming.”
It is closer to:
Design the dataset layout and cache path for the training topology, then use streaming as one component of that data plane.
For many production-scale jobs, the best workaround may be:
1. preprocess/materialize if transforms are heavy
2. publish balanced shards
3. choose shard counts compatible with expected world sizes
4. split explicitly by node/rank
5. keep cache on local NVMe/SSD
6. tune DataLoader concurrency conservatively
7. monitor skipped samples, cache hits, remote bytes, worker RSS
That does not mean direct streaming is bad.
It means direct remote streaming is only one point in the design space.
For small jobs, it is simple and convenient.
For large distributed training, topology-aware sharding and cache locality become first-class concerns.
15. Link collection
Hugging Face official docs / posts
- Streaming datasets: 100x More Efficient
- Datasets streaming docs
- Dataset vs IterableDataset
- Datasets main classes / IterableDataset reference
- Datasets cache management
- Hub local cache
- huggingface_hub environment variables
- huggingface_hub cache management
- Upload files to the Hub / Xet cache guidance
- Using Xet storage
Hugging Face forum / issues
- Forum: Loading webdatasets across multiple nodes
- Forum: How to use split_dataset_by_node and shuffle on IterableDataset
- Forum: Keeping IterableDataset node-wise split fixed during DDP
- Forum: A streaming dataset’s memory footprint continually grows
- Forum: Speeding up Streaming of Large Datasets / FineWeb
- Issue: IterableDataset distributed/DataLoader sharding performance traps
- Issue: streaming datasets doesn’t work properly with multi-node
- Issue: Problem in training iterable dataset
- Issue: Memory leak when streaming
- Issue: More easily support streaming local files
- Issue: Make Dataset streaming queries retryable
- Issue: HF_DATASETS_CACHE ignored?
PyTorch / DataLoader
- PyTorch DataLoader docs
- TorchData Stateful DataLoader docs
- PyTorch issue: pin_memory / prefetch / persistent_workers memory behavior
Alternative / adjacent streaming systems
- WebDataset multinode docs
- WebDataset GitHub
- WebDataset PyPI notes
- MosaicML StreamingDataset GitHub
- MosaicML StreamingDataset API docs
- Databricks/MosaicML blog: StreamingDataset
Object storage / filesystem layer
- fsspec docs
- s3fs docs
- s3fs issue: large file read, block size, cache type