Skip to main content
Version: devel

Control nested MongoDB data

info

The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/nested_data

About this Example

In this example, you'll find a Python script that demonstrates how to control nested data using the dlt library.

We'll learn how to:

Full source code

# NOTE: this line is only for dlt CI purposes, you may delete it if you are using this example
__source_name__ = "mongodb"

from itertools import islice
from typing import Any, Dict, Iterator, Optional

from bson.decimal128 import Decimal128
from bson.objectid import ObjectId
from pendulum import _datetime # noqa: I251
from pymongo import MongoClient

import dlt
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TDataItem
from dlt.common.utils import map_nested_in_place

CHUNK_SIZE = 10000


# You can limit how deep dlt goes when generating child tables.
# By default, the library will descend and generate child tables
# for all nested lists, without a limit.
# In this example, we specify that we only want to generate child tables up to level 2,
# so there will be only one level of child tables within child tables.
@dlt.source(max_table_nesting=2)
def mongodb_collection(
connection_url: str = dlt.secrets.value,
database: Optional[str] = dlt.config.value,
collection: str = dlt.config.value,
incremental: Optional[dlt.sources.incremental] = None, # type: ignore[type-arg]
write_disposition: Optional[str] = dlt.config.value,
) -> Any:
# set up mongo client
client: Any = MongoClient(connection_url, uuidRepresentation="standard", tz_aware=True)
mongo_database = client.get_default_database() if not database else client[database]
collection_obj = mongo_database[collection]

def collection_documents(
client: Any,
collection: Any,
incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> Iterator[TDataItem]:
LoaderClass = CollectionLoader

loader = LoaderClass(client, collection, incremental=incremental)
yield from loader.load_documents()

return dlt.resource( # type: ignore
collection_documents,
name=collection_obj.name,
primary_key="_id",
write_disposition=write_disposition,
)(client, collection_obj, incremental=incremental)


class CollectionLoader:
def __init__(
self,
client: Any,
collection: Any,
incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> None:
self.client = client
self.collection = collection
self.incremental = incremental
if incremental:
self.cursor_field = incremental.cursor_path
self.last_value = incremental.last_value
else:
self.cursor_column = None
self.last_value = None

@property
def _filter_op(self) -> Dict[str, Any]:
if not self.incremental or not self.last_value:
return {}
if self.incremental.last_value_func is max:
return {self.cursor_field: {"$gte": self.last_value}}
elif self.incremental.last_value_func is min:
return {self.cursor_field: {"$lt": self.last_value}}
return {}

def load_documents(self) -> Iterator[TDataItem]:
cursor = self.collection.find(self._filter_op)
while docs_slice := list(islice(cursor, CHUNK_SIZE)):
yield map_nested_in_place(convert_mongo_objs, docs_slice)


def convert_mongo_objs(value: Any) -> Any:
if isinstance(value, (ObjectId, Decimal128)):
return str(value)
if isinstance(value, _datetime.datetime):
return ensure_pendulum_datetime(value)
return value


if __name__ == "__main__":
# When we created the source, we set max_table_nesting to 2.
# This ensures that the generated tables do not have more than two
# levels of nesting, even if the original data structure is more deeply nested.
pipeline = dlt.pipeline(
pipeline_name="mongodb_pipeline",
destination="duckdb",
dataset_name="unpacked_data",
)
source_data = mongodb_collection(collection="movies", write_disposition="replace")
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 7, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()

# The second method involves setting the max_table_nesting attribute directly
# on the source data object.
# This allows for dynamic control over the maximum nesting
# level for a specific data source.
# Here the nesting level is adjusted before running the pipeline.
pipeline = dlt.pipeline(
pipeline_name="mongodb_pipeline",
destination="duckdb",
dataset_name="not_unpacked_data",
)
source_data = mongodb_collection(collection="movies", write_disposition="replace")
source_data.max_table_nesting = 0
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 1, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()

# The third method involves applying data type hints to specific columns in the data.
# In this case, we tell dlt that column 'cast' (containing a list of actors)
# in 'movies' table should have type complex which means
# that it will be loaded as JSON/struct and not as child table.
pipeline = dlt.pipeline(
pipeline_name="mongodb_pipeline",
destination="duckdb",
dataset_name="unpacked_data_without_cast",
)
source_data = mongodb_collection(collection="movies", write_disposition="replace")
source_data.movies.apply_hints(columns={"cast": {"data_type": "complex"}})
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 6, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.