Skip to content

Sentence splitter

Node parsers.

HTMLNodeParser #

Bases: NodeParser

HTML node parser.

Splits a document into Nodes using custom HTML splitting logic.

Parameters:

Name Type Description Default
include_metadata bool

whether to include metadata in nodes

required
include_prev_next_rel bool

whether to include prev/next relationships

required
tags List[str]

HTML tags to extract text from.

['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'b', 'i', 'u', 'section']
Source code in llama_index/core/node_parser/file/html.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class HTMLNodeParser(NodeParser):
    """
    HTML node parser.

    Splits a document into Nodes using custom HTML splitting logic.

    Args:
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships

    """

    tags: List[str] = Field(
        default=DEFAULT_TAGS, description="HTML tags to extract text from."
    )

    @classmethod
    def from_defaults(
        cls,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        tags: Optional[List[str]] = DEFAULT_TAGS,
    ) -> "HTMLNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        return cls(
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
            tags=tags,
        )

    @classmethod
    def class_name(cls) -> str:
        """Get class name."""
        return "HTMLNodeParser"

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            nodes = self.get_nodes_from_node(node)
            all_nodes.extend(nodes)

        return all_nodes

    def get_nodes_from_node(self, node: BaseNode) -> List[TextNode]:
        """Get nodes from document."""
        try:
            from bs4 import BeautifulSoup, Tag
        except ImportError:
            raise ImportError("bs4 is required to read HTML files.")

        text = node.get_content(metadata_mode=MetadataMode.NONE)
        soup = BeautifulSoup(text, "html.parser")
        html_nodes = []
        last_tag = None
        current_section = ""

        tags = soup.find_all(self.tags)
        for tag in tags:
            tag_text = self._extract_text_from_tag(tag)
            if isinstance(tag, Tag) and (tag.name == last_tag or last_tag is None):
                last_tag = tag.name
                current_section += f"{tag_text.strip()}\n"
            else:
                html_nodes.append(
                    self._build_node_from_split(
                        current_section.strip(), node, {"tag": last_tag}
                    )
                )
                if isinstance(tag, Tag):
                    last_tag = tag.name
                current_section = f"{tag_text}\n"

        if current_section:
            html_nodes.append(
                self._build_node_from_split(
                    current_section.strip(), node, {"tag": last_tag}
                )
            )

        return html_nodes

    def _extract_text_from_tag(
        self, tag: Union["Tag", "NavigableString", "PageElement"]
    ) -> str:
        from bs4 import NavigableString, Tag, PageElement

        texts = []
        if isinstance(tag, Tag):
            for elem in tag.children:
                if isinstance(elem, NavigableString):
                    if elem.strip():
                        texts.append(elem.strip())
                elif isinstance(elem, Tag):
                    if elem.name in self.tags:
                        continue
                    else:
                        texts.append(elem.get_text().strip())
                elif isinstance(elem, PageElement):
                    texts.append(elem.get_text().strip())
        else:
            texts.append(tag.get_text().strip())
        return "\n".join(texts)

    def _build_node_from_split(
        self,
        text_split: str,
        node: BaseNode,
        metadata: dict,
    ) -> TextNode:
        """Build node from single text split."""
        node = build_nodes_from_splits([text_split], node, id_func=self.id_func)[0]

        if self.include_metadata:
            node.metadata = {**node.metadata, **metadata}

        return node

class_name classmethod #

class_name() -> str

Get class name.

Source code in llama_index/core/node_parser/file/html.py
51
52
53
54
@classmethod
def class_name(cls) -> str:
    """Get class name."""
    return "HTMLNodeParser"

get_nodes_from_node #

get_nodes_from_node(node: BaseNode) -> List[TextNode]

Get nodes from document.

Source code in llama_index/core/node_parser/file/html.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def get_nodes_from_node(self, node: BaseNode) -> List[TextNode]:
    """Get nodes from document."""
    try:
        from bs4 import BeautifulSoup, Tag
    except ImportError:
        raise ImportError("bs4 is required to read HTML files.")

    text = node.get_content(metadata_mode=MetadataMode.NONE)
    soup = BeautifulSoup(text, "html.parser")
    html_nodes = []
    last_tag = None
    current_section = ""

    tags = soup.find_all(self.tags)
    for tag in tags:
        tag_text = self._extract_text_from_tag(tag)
        if isinstance(tag, Tag) and (tag.name == last_tag or last_tag is None):
            last_tag = tag.name
            current_section += f"{tag_text.strip()}\n"
        else:
            html_nodes.append(
                self._build_node_from_split(
                    current_section.strip(), node, {"tag": last_tag}
                )
            )
            if isinstance(tag, Tag):
                last_tag = tag.name
            current_section = f"{tag_text}\n"

    if current_section:
        html_nodes.append(
            self._build_node_from_split(
                current_section.strip(), node, {"tag": last_tag}
            )
        )

    return html_nodes

JSONNodeParser #

Bases: NodeParser

JSON node parser.

Splits a document into Nodes using custom JSON splitting logic.

Parameters:

Name Type Description Default
include_metadata bool

whether to include metadata in nodes

required
include_prev_next_rel bool

whether to include prev/next relationships

required
Source code in llama_index/core/node_parser/file/json.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class JSONNodeParser(NodeParser):
    """
    JSON node parser.

    Splits a document into Nodes using custom JSON splitting logic.

    Args:
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships

    """

    @classmethod
    def from_defaults(
        cls,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
    ) -> "JSONNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        return cls(
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
        )

    @classmethod
    def class_name(cls) -> str:
        """Get class name."""
        return "JSONNodeParser"

    def _parse_nodes(
        self, nodes: Sequence[BaseNode], show_progress: bool = False, **kwargs: Any
    ) -> List[BaseNode]:
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            nodes = self.get_nodes_from_node(node)
            all_nodes.extend(nodes)

        return all_nodes

    def get_nodes_from_node(self, node: BaseNode) -> List[TextNode]:
        """Get nodes from document."""
        text = node.get_content(metadata_mode=MetadataMode.NONE)
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            # Handle invalid JSON input here
            return []

        json_nodes = []
        if isinstance(data, dict):
            lines = [*self._depth_first_yield(data, 0, [])]
            json_nodes.extend(
                build_nodes_from_splits(["\n".join(lines)], node, id_func=self.id_func)
            )
        elif isinstance(data, list):
            for json_object in data:
                lines = [*self._depth_first_yield(json_object, 0, [])]
                json_nodes.extend(
                    build_nodes_from_splits(
                        ["\n".join(lines)], node, id_func=self.id_func
                    )
                )
        else:
            raise ValueError("JSON is invalid")

        return json_nodes

    def _depth_first_yield(
        self, json_data: Dict, levels_back: int, path: List[str]
    ) -> Generator[str, None, None]:
        """
        Do depth first yield of all of the leaf nodes of a JSON.

        Combines keys in the JSON tree using spaces.

        If levels_back is set to 0, prints all levels.

        """
        if isinstance(json_data, dict):
            for key, value in json_data.items():
                new_path = path[:]
                new_path.append(key)
                yield from self._depth_first_yield(value, levels_back, new_path)
        elif isinstance(json_data, list):
            for _, value in enumerate(json_data):
                yield from self._depth_first_yield(value, levels_back, path)
        else:
            new_path = path[-levels_back:]
            new_path.append(str(json_data))
            yield " ".join(new_path)

class_name classmethod #

class_name() -> str

Get class name.

Source code in llama_index/core/node_parser/file/json.py
40
41
42
43
@classmethod
def class_name(cls) -> str:
    """Get class name."""
    return "JSONNodeParser"

get_nodes_from_node #

get_nodes_from_node(node: BaseNode) -> List[TextNode]

Get nodes from document.

Source code in llama_index/core/node_parser/file/json.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def get_nodes_from_node(self, node: BaseNode) -> List[TextNode]:
    """Get nodes from document."""
    text = node.get_content(metadata_mode=MetadataMode.NONE)
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Handle invalid JSON input here
        return []

    json_nodes = []
    if isinstance(data, dict):
        lines = [*self._depth_first_yield(data, 0, [])]
        json_nodes.extend(
            build_nodes_from_splits(["\n".join(lines)], node, id_func=self.id_func)
        )
    elif isinstance(data, list):
        for json_object in data:
            lines = [*self._depth_first_yield(json_object, 0, [])]
            json_nodes.extend(
                build_nodes_from_splits(
                    ["\n".join(lines)], node, id_func=self.id_func
                )
            )
    else:
        raise ValueError("JSON is invalid")

    return json_nodes

MarkdownNodeParser #

Bases: NodeParser

Markdown node parser.

Splits a document into Nodes using Markdown header-based splitting logic. Each node contains its text content and the path of headers leading to it.

Parameters:

Name Type Description Default
include_metadata bool

whether to include metadata in nodes

required
include_prev_next_rel bool

whether to include prev/next relationships

required
header_path_separator str

separator char used for section header path metadata

'/'
Source code in llama_index/core/node_parser/file/markdown.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class MarkdownNodeParser(NodeParser):
    """
    Markdown node parser.

    Splits a document into Nodes using Markdown header-based splitting logic.
    Each node contains its text content and the path of headers leading to it.

    Args:
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships
        header_path_separator (str): separator char used for section header path metadata

    """

    header_path_separator: str = Field(
        default="/", description="Separator char used for section header path metadata."
    )

    @classmethod
    def from_defaults(
        cls,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        header_path_separator: str = "/",
        callback_manager: Optional[CallbackManager] = None,
    ) -> "MarkdownNodeParser":
        callback_manager = callback_manager or CallbackManager([])
        return cls(
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            header_path_separator=header_path_separator,
            callback_manager=callback_manager,
        )

    def get_nodes_from_node(self, node: BaseNode) -> List[TextNode]:
        """Get nodes from document by splitting on headers."""
        text = node.get_content(metadata_mode=MetadataMode.NONE)
        markdown_nodes = []
        lines = text.split("\n")
        current_section = ""
        # Keep track of (markdown level, text) for headers
        header_stack: List[tuple[int, str]] = []
        code_block = False

        for line in lines:
            # Track if we're inside a code block to avoid parsing headers in code
            if line.lstrip().startswith("```"):
                code_block = not code_block
                current_section += line + "\n"
                continue

            # Only parse headers if we're not in a code block
            if not code_block:
                header_match = re.match(r"^(#+)\s(.*)", line)
                if header_match:
                    # Save the previous section before starting a new one
                    if current_section.strip():
                        markdown_nodes.append(
                            self._build_node_from_split(
                                current_section.strip(),
                                node,
                                self.header_path_separator.join(
                                    h[1] for h in header_stack[:-1]
                                ),
                            )
                        )

                    header_level = len(header_match.group(1))
                    header_text = header_match.group(2)

                    # Compare against top-of-stack item’s markdown level.
                    # Pop headers of equal or higher markdown level; not necessarily current stack size / depth.
                    # Hierarchy depth gets deeper one level at a time, but markdown headers can jump from H1 to H3, for example.
                    while header_stack and header_stack[-1][0] >= header_level:
                        header_stack.pop()

                    # Add the new header
                    header_stack.append((header_level, header_text))
                    current_section = "#" * header_level + f" {header_text}\n"
                    continue

            current_section += line + "\n"

        # Add the final section
        if current_section.strip():
            markdown_nodes.append(
                self._build_node_from_split(
                    current_section.strip(),
                    node,
                    self.header_path_separator.join(h[1] for h in header_stack[:-1]),
                )
            )

        return markdown_nodes

    def _build_node_from_split(
        self,
        text_split: str,
        node: BaseNode,
        header_path: str,
    ) -> TextNode:
        """Build node from single text split."""
        node = build_nodes_from_splits([text_split], node, id_func=self.id_func)[0]

        if self.include_metadata:
            separator = self.header_path_separator
            node.metadata["header_path"] = (
                # ex: "/header1/header2/" || "/"
                separator + header_path + separator if header_path else separator
            )

        return node

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Parse nodes."""
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            nodes = self.get_nodes_from_node(node)
            all_nodes.extend(nodes)

        return all_nodes

get_nodes_from_node #

get_nodes_from_node(node: BaseNode) -> List[TextNode]

Get nodes from document by splitting on headers.

Source code in llama_index/core/node_parser/file/markdown.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def get_nodes_from_node(self, node: BaseNode) -> List[TextNode]:
    """Get nodes from document by splitting on headers."""
    text = node.get_content(metadata_mode=MetadataMode.NONE)
    markdown_nodes = []
    lines = text.split("\n")
    current_section = ""
    # Keep track of (markdown level, text) for headers
    header_stack: List[tuple[int, str]] = []
    code_block = False

    for line in lines:
        # Track if we're inside a code block to avoid parsing headers in code
        if line.lstrip().startswith("```"):
            code_block = not code_block
            current_section += line + "\n"
            continue

        # Only parse headers if we're not in a code block
        if not code_block:
            header_match = re.match(r"^(#+)\s(.*)", line)
            if header_match:
                # Save the previous section before starting a new one
                if current_section.strip():
                    markdown_nodes.append(
                        self._build_node_from_split(
                            current_section.strip(),
                            node,
                            self.header_path_separator.join(
                                h[1] for h in header_stack[:-1]
                            ),
                        )
                    )

                header_level = len(header_match.group(1))
                header_text = header_match.group(2)

                # Compare against top-of-stack item’s markdown level.
                # Pop headers of equal or higher markdown level; not necessarily current stack size / depth.
                # Hierarchy depth gets deeper one level at a time, but markdown headers can jump from H1 to H3, for example.
                while header_stack and header_stack[-1][0] >= header_level:
                    header_stack.pop()

                # Add the new header
                header_stack.append((header_level, header_text))
                current_section = "#" * header_level + f" {header_text}\n"
                continue

        current_section += line + "\n"

    # Add the final section
    if current_section.strip():
        markdown_nodes.append(
            self._build_node_from_split(
                current_section.strip(),
                node,
                self.header_path_separator.join(h[1] for h in header_stack[:-1]),
            )
        )

    return markdown_nodes

SimpleFileNodeParser #

Bases: NodeParser

Simple file node parser.

Splits a document loaded from a file into Nodes using logic based on the file type automatically detects the NodeParser to use based on file type

Parameters:

Name Type Description Default
include_metadata bool

whether to include metadata in nodes

required
include_prev_next_rel bool

whether to include prev/next relationships

required
Source code in llama_index/core/node_parser/file/simple_file.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class SimpleFileNodeParser(NodeParser):
    """
    Simple file node parser.

    Splits a document loaded from a file into Nodes using logic based on the file type
    automatically detects the NodeParser to use based on file type

    Args:
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships

    """

    @classmethod
    def from_defaults(
        cls,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
    ) -> "SimpleFileNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        return cls(
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
        )

    @classmethod
    def class_name(cls) -> str:
        """Get class name."""
        return "SimpleFileNodeParser"

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """
        Parse document into nodes.

        Args:
            nodes (Sequence[BaseNode]): nodes to parse

        """
        all_nodes: List[BaseNode] = []
        documents_with_progress = get_tqdm_iterable(
            nodes, show_progress, "Parsing documents into nodes"
        )

        for document in documents_with_progress:
            ext = document.metadata.get("extension", "None")
            if ext in FILE_NODE_PARSERS:
                parser = FILE_NODE_PARSERS[ext](
                    include_metadata=self.include_metadata,
                    include_prev_next_rel=self.include_prev_next_rel,
                    callback_manager=self.callback_manager,
                )

                nodes = parser.get_nodes_from_documents([document], show_progress)
                all_nodes.extend(nodes)
            else:
                # What to do when file type isn't supported yet?
                all_nodes.extend(
                    # build node from document
                    build_nodes_from_splits(
                        [document.get_content(metadata_mode=MetadataMode.NONE)],
                        document,
                        id_func=self.id_func,
                    )
                )

        return all_nodes

class_name classmethod #

class_name() -> str

Get class name.

Source code in llama_index/core/node_parser/file/simple_file.py
49
50
51
52
@classmethod
def class_name(cls) -> str:
    """Get class name."""
    return "SimpleFileNodeParser"

MetadataAwareTextSplitter #

Bases: TextSplitter

Source code in llama_index/core/node_parser/interface.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class MetadataAwareTextSplitter(TextSplitter):
    @abstractmethod
    def split_text_metadata_aware(self, text: str, metadata_str: str) -> List[str]: ...

    def split_texts_metadata_aware(
        self, texts: List[str], metadata_strs: List[str]
    ) -> List[str]:
        if len(texts) != len(metadata_strs):
            raise ValueError("Texts and metadata_strs must have the same length")
        nested_texts = [
            self.split_text_metadata_aware(text, metadata)
            for text, metadata in zip(texts, metadata_strs)
        ]
        return [item for sublist in nested_texts for item in sublist]

    def _get_metadata_str(self, node: BaseNode) -> str:
        """Helper function to get the proper metadata str for splitting."""
        embed_metadata_str = node.get_metadata_str(mode=MetadataMode.EMBED)
        llm_metadata_str = node.get_metadata_str(mode=MetadataMode.LLM)

        # use the longest metadata str for splitting
        if len(embed_metadata_str) > len(llm_metadata_str):
            metadata_str = embed_metadata_str
        else:
            metadata_str = llm_metadata_str

        return metadata_str

    def _parse_nodes(
        self, nodes: Sequence[BaseNode], show_progress: bool = False, **kwargs: Any
    ) -> List[BaseNode]:
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            metadata_str = self._get_metadata_str(node)
            splits = self.split_text_metadata_aware(
                node.get_content(metadata_mode=MetadataMode.NONE),
                metadata_str=metadata_str,
            )
            all_nodes.extend(
                build_nodes_from_splits(splits, node, id_func=self.id_func)
            )

        return all_nodes

NodeParser #

Bases: TransformComponent, ABC

Base interface for node parser.

Parameters:

Name Type Description Default
include_metadata bool

Whether or not to consider metadata when splitting.

True
include_prev_next_rel bool

Include prev/next node relationships.

True
callback_manager CallbackManager
<llama_index.core.callbacks.base.CallbackManager object at 0x7f5a821e96d0>
id_func Annotated[Callable, FieldInfo, BeforeValidator, WithJsonSchema, WithJsonSchema, PlainSerializer] | None

Function to generate node IDs.

None
Source code in llama_index/core/node_parser/interface.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class NodeParser(TransformComponent, ABC):
    """Base interface for node parser."""

    model_config = ConfigDict(arbitrary_types_allowed=True)
    include_metadata: bool = Field(
        default=True, description="Whether or not to consider metadata when splitting."
    )
    include_prev_next_rel: bool = Field(
        default=True, description="Include prev/next node relationships."
    )
    callback_manager: CallbackManager = Field(
        default_factory=lambda: CallbackManager([]), exclude=True
    )
    id_func: IdFuncCallable = Field(
        default=default_id_func,
        description="Function to generate node IDs.",
    )

    @abstractmethod
    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]: ...

    async def _aparse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        return self._parse_nodes(nodes, show_progress=show_progress, **kwargs)

    def _postprocess_parsed_nodes(
        self, nodes: List[BaseNode], parent_doc_map: Dict[str, Document]
    ) -> List[BaseNode]:
        # Track search position per document to handle duplicate text correctly
        # Nodes are assumed to be in document order from _parse_nodes
        # We track the START position (not end) to allow for overlapping chunks
        doc_search_positions: Dict[str, int] = {}

        for i, node in enumerate(nodes):
            parent_doc = parent_doc_map.get(node.ref_doc_id or "", None)
            parent_node = node.source_node

            if parent_doc is not None:
                if parent_doc.source_node is not None:
                    node.relationships.update(
                        {
                            NodeRelationship.SOURCE: parent_doc.source_node,
                        }
                    )

                # Get or initialize search position for this document
                doc_id = node.ref_doc_id or ""
                search_start = doc_search_positions.get(doc_id, 0)

                # Search for node content starting from the last found position
                node_content = node.get_content(metadata_mode=MetadataMode.NONE)
                start_char_idx = parent_doc.text.find(node_content, search_start)

                # update start/end char idx
                if start_char_idx >= 0 and isinstance(node, TextNode):
                    node.start_char_idx = start_char_idx
                    node.end_char_idx = start_char_idx + len(node_content)
                    # Update search position to start from next character after this node's START
                    # This allows overlapping chunks to be found correctly
                    doc_search_positions[doc_id] = start_char_idx + 1

                # update metadata
                if self.include_metadata:
                    # Merge parent_doc.metadata into nodes.metadata, giving preference to node's values
                    node.metadata = {**parent_doc.metadata, **node.metadata}

            if parent_node is not None:
                if self.include_metadata:
                    parent_metadata = parent_node.metadata

                    combined_metadata = {**parent_metadata, **node.metadata}

                    # Merge parent_node.metadata into nodes.metadata, giving preference to node's values
                    node.metadata.update(combined_metadata)

            if self.include_prev_next_rel:
                # establish prev/next relationships if nodes share the same source_node
                if (
                    i > 0
                    and node.source_node
                    and nodes[i - 1].source_node
                    and nodes[i - 1].source_node.node_id == node.source_node.node_id  # type: ignore
                ):
                    node.relationships[NodeRelationship.PREVIOUS] = nodes[
                        i - 1
                    ].as_related_node_info()
                if (
                    i < len(nodes) - 1
                    and node.source_node
                    and nodes[i + 1].source_node
                    and nodes[i + 1].source_node.node_id == node.source_node.node_id  # type: ignore
                ):
                    node.relationships[NodeRelationship.NEXT] = nodes[
                        i + 1
                    ].as_related_node_info()

        return nodes

    def get_nodes_from_documents(
        self,
        documents: Sequence[Document],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """
        Parse documents into nodes.

        Args:
            documents (Sequence[Document]): documents to parse
            show_progress (bool): whether to show progress bar

        """
        doc_id_to_document = {doc.id_: doc for doc in documents}

        with self.callback_manager.event(
            CBEventType.NODE_PARSING, payload={EventPayload.DOCUMENTS: documents}
        ) as event:
            nodes = self._parse_nodes(documents, show_progress=show_progress, **kwargs)
            nodes = self._postprocess_parsed_nodes(nodes, doc_id_to_document)

            event.on_end({EventPayload.NODES: nodes})

        return nodes

    async def aget_nodes_from_documents(
        self,
        documents: Sequence[Document],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        doc_id_to_document = {doc.id_: doc for doc in documents}

        with self.callback_manager.event(
            CBEventType.NODE_PARSING, payload={EventPayload.DOCUMENTS: documents}
        ) as event:
            nodes = await self._aparse_nodes(
                documents, show_progress=show_progress, **kwargs
            )
            nodes = self._postprocess_parsed_nodes(nodes, doc_id_to_document)

            event.on_end({EventPayload.NODES: nodes})

        return nodes

    def __call__(self, nodes: Sequence[BaseNode], **kwargs: Any) -> List[BaseNode]:
        return self.get_nodes_from_documents(nodes, **kwargs)  # type: ignore

    async def acall(self, nodes: Sequence[BaseNode], **kwargs: Any) -> List[BaseNode]:
        return await self.aget_nodes_from_documents(nodes, **kwargs)  # type: ignore

get_nodes_from_documents #

get_nodes_from_documents(documents: Sequence[Document], show_progress: bool = False, **kwargs: Any) -> List[BaseNode]

Parse documents into nodes.

Parameters:

Name Type Description Default
documents Sequence[Document]

documents to parse

required
show_progress bool

whether to show progress bar

False
Source code in llama_index/core/node_parser/interface.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def get_nodes_from_documents(
    self,
    documents: Sequence[Document],
    show_progress: bool = False,
    **kwargs: Any,
) -> List[BaseNode]:
    """
    Parse documents into nodes.

    Args:
        documents (Sequence[Document]): documents to parse
        show_progress (bool): whether to show progress bar

    """
    doc_id_to_document = {doc.id_: doc for doc in documents}

    with self.callback_manager.event(
        CBEventType.NODE_PARSING, payload={EventPayload.DOCUMENTS: documents}
    ) as event:
        nodes = self._parse_nodes(documents, show_progress=show_progress, **kwargs)
        nodes = self._postprocess_parsed_nodes(nodes, doc_id_to_document)

        event.on_end({EventPayload.NODES: nodes})

    return nodes

TextSplitter #

Bases: NodeParser

Source code in llama_index/core/node_parser/interface.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class TextSplitter(NodeParser):
    @abstractmethod
    def split_text(self, text: str) -> List[str]: ...

    def split_texts(self, texts: List[str]) -> List[str]:
        nested_texts = [self.split_text(text) for text in texts]
        return [item for sublist in nested_texts for item in sublist]

    def _parse_nodes(
        self, nodes: Sequence[BaseNode], show_progress: bool = False, **kwargs: Any
    ) -> List[BaseNode]:
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")
        for node in nodes_with_progress:
            splits = self.split_text(node.get_content())

            all_nodes.extend(
                build_nodes_from_splits(splits, node, id_func=self.id_func)
            )

        return all_nodes

HierarchicalNodeParser #

Bases: NodeParser

Hierarchical node parser.

Splits a document into a recursive hierarchy Nodes using a NodeParser.

NOTE: this will return a hierarchy of nodes in a flat list, where there will be overlap between parent nodes (e.g. with a bigger chunk size), and child nodes per parent (e.g. with a smaller chunk size).

For instance, this may return a list of nodes like:

  • list of top-level nodes with chunk size 2048
  • list of second-level nodes, where each node is a child of a top-level node, chunk size 512
  • list of third-level nodes, where each node is a child of a second-level node, chunk size 128

Parameters:

Name Type Description Default
chunk_sizes List[int] | None

The chunk sizes to use when splitting documents, in order of level.

None
node_parser_ids List[str]

List of ids for the node parsers to use when splitting documents, in order of level (first id used for first level, etc.).

<dynamic>
node_parser_map Dict[str, NodeParser]

Map of node parser id to node parser.

required
Source code in llama_index/core/node_parser/relational/hierarchical.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class HierarchicalNodeParser(NodeParser):
    """
    Hierarchical node parser.

    Splits a document into a recursive hierarchy Nodes using a NodeParser.

    NOTE: this will return a hierarchy of nodes in a flat list, where there will be
    overlap between parent nodes (e.g. with a bigger chunk size), and child nodes
    per parent (e.g. with a smaller chunk size).

    For instance, this may return a list of nodes like:

    - list of top-level nodes with chunk size 2048
    - list of second-level nodes, where each node is a child of a top-level node,
      chunk size 512
    - list of third-level nodes, where each node is a child of a second-level node,
      chunk size 128
    """

    chunk_sizes: Optional[List[int]] = Field(
        default=None,
        description=(
            "The chunk sizes to use when splitting documents, in order of level."
        ),
    )
    node_parser_ids: List[str] = Field(
        default_factory=list,
        description=(
            "List of ids for the node parsers to use when splitting documents, "
            + "in order of level (first id used for first level, etc.)."
        ),
    )
    node_parser_map: Dict[str, NodeParser] = Field(
        description="Map of node parser id to node parser.",
    )

    @classmethod
    def from_defaults(
        cls,
        chunk_sizes: Optional[List[int]] = None,
        chunk_overlap: int = 20,
        node_parser_ids: Optional[List[str]] = None,
        node_parser_map: Optional[Dict[str, NodeParser]] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
    ) -> "HierarchicalNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        if node_parser_ids is None:
            if chunk_sizes is None:
                chunk_sizes = [2048, 512, 128]

            node_parser_ids = [f"chunk_size_{chunk_size}" for chunk_size in chunk_sizes]
            node_parser_map = {}
            for chunk_size, node_parser_id in zip(chunk_sizes, node_parser_ids):
                node_parser_map[node_parser_id] = SentenceSplitter(
                    chunk_size=chunk_size,
                    callback_manager=callback_manager,
                    chunk_overlap=chunk_overlap,
                    include_metadata=include_metadata,
                    include_prev_next_rel=include_prev_next_rel,
                )
        else:
            if chunk_sizes is not None:
                raise ValueError("Cannot specify both node_parser_ids and chunk_sizes.")
            if node_parser_map is None:
                raise ValueError(
                    "Must specify node_parser_map if using node_parser_ids."
                )

        return cls(
            chunk_sizes=chunk_sizes,
            node_parser_ids=node_parser_ids,
            node_parser_map=node_parser_map,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
        )

    @classmethod
    def class_name(cls) -> str:
        return "HierarchicalNodeParser"

    def _recursively_get_nodes_from_nodes(
        self,
        nodes: List[BaseNode],
        level: int,
        show_progress: bool = False,
    ) -> List[BaseNode]:
        """Recursively get nodes from nodes."""
        if level >= len(self.node_parser_ids):
            raise ValueError(
                f"Level {level} is greater than number of text "
                f"splitters ({len(self.node_parser_ids)})."
            )

        # first split current nodes into sub-nodes
        nodes_with_progress = get_tqdm_iterable(
            nodes, show_progress, "Parsing documents into nodes"
        )
        sub_nodes = []
        for node in nodes_with_progress:
            cur_sub_nodes = self.node_parser_map[
                self.node_parser_ids[level]
            ].get_nodes_from_documents([node])
            # add parent relationship from sub node to parent node
            # add child relationship from parent node to sub node
            # NOTE: Only add relationships if level > 0, since we don't want to add
            # relationships for the top-level document objects that we are splitting
            if level > 0:
                for sub_node in cur_sub_nodes:
                    _add_parent_child_relationship(
                        parent_node=node,
                        child_node=sub_node,
                    )

            sub_nodes.extend(cur_sub_nodes)

        # now for each sub-node, recursively split into sub-sub-nodes, and add
        if level < len(self.node_parser_ids) - 1:
            sub_sub_nodes = self._recursively_get_nodes_from_nodes(
                sub_nodes,
                level + 1,
                show_progress=show_progress,
            )
        else:
            sub_sub_nodes = []

        return sub_nodes + sub_sub_nodes

    def get_nodes_from_documents(
        self,
        documents: Sequence[Document],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Parse document into nodes."""
        with self.callback_manager.event(
            CBEventType.NODE_PARSING, payload={EventPayload.DOCUMENTS: documents}
        ) as event:
            all_nodes: List[BaseNode] = []
            documents_with_progress = get_tqdm_iterable(
                documents, show_progress, "Parsing documents into nodes"
            )

            # TODO: a bit of a hack rn for tqdm
            for doc in documents_with_progress:
                nodes_from_doc = self._recursively_get_nodes_from_nodes([doc], 0)
                all_nodes.extend(nodes_from_doc)

            event.on_end(payload={EventPayload.NODES: all_nodes})

        return all_nodes

    # Unused abstract method
    def _parse_nodes(
        self, nodes: Sequence[BaseNode], show_progress: bool = False, **kwargs: Any
    ) -> List[BaseNode]:
        return list(nodes)

get_nodes_from_documents #

get_nodes_from_documents(documents: Sequence[Document], show_progress: bool = False, **kwargs: Any) -> List[BaseNode]

Parse document into nodes.

Source code in llama_index/core/node_parser/relational/hierarchical.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def get_nodes_from_documents(
    self,
    documents: Sequence[Document],
    show_progress: bool = False,
    **kwargs: Any,
) -> List[BaseNode]:
    """Parse document into nodes."""
    with self.callback_manager.event(
        CBEventType.NODE_PARSING, payload={EventPayload.DOCUMENTS: documents}
    ) as event:
        all_nodes: List[BaseNode] = []
        documents_with_progress = get_tqdm_iterable(
            documents, show_progress, "Parsing documents into nodes"
        )

        # TODO: a bit of a hack rn for tqdm
        for doc in documents_with_progress:
            nodes_from_doc = self._recursively_get_nodes_from_nodes([doc], 0)
            all_nodes.extend(nodes_from_doc)

        event.on_end(payload={EventPayload.NODES: all_nodes})

    return all_nodes

MarkdownElementNodeParser #

Bases: BaseElementNodeParser

Markdown element node parser.

Splits a markdown document into Text Nodes and Index Nodes corresponding to embedded objects (e.g. tables).

Source code in llama_index/core/node_parser/relational/markdown_element.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class MarkdownElementNodeParser(BaseElementNodeParser):
    """
    Markdown element node parser.

    Splits a markdown document into Text Nodes and Index Nodes corresponding to embedded objects
    (e.g. tables).

    """

    @classmethod
    def class_name(cls) -> str:
        return "MarkdownElementNodeParser"

    def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
        """Get nodes from node."""
        elements = self.extract_elements(
            node.get_content(), table_filters=[self.filter_table], node_id=node.node_id
        )
        elements = self.extract_html_tables(elements)
        table_elements = self.get_table_elements(elements)
        # extract summaries over table elements
        self.extract_table_summaries(table_elements)
        # convert into nodes
        # will return a list of Nodes and Index Nodes
        nodes = self.get_nodes_from_elements(
            elements, node, ref_doc_text=node.get_content()
        )
        source_document = node.source_node or node.as_related_node_info()
        for n in nodes:
            n.relationships[NodeRelationship.SOURCE] = source_document
            n.metadata.update(node.metadata)
        return nodes

    async def aget_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
        """Get nodes from node."""
        elements = self.extract_elements(
            node.get_content(), table_filters=[self.filter_table], node_id=node.node_id
        )
        table_elements = self.get_table_elements(elements)
        # extract summaries over table elements
        await self.aextract_table_summaries(table_elements)
        # convert into nodes
        # will return a list of Nodes and Index Nodes
        nodes = self.get_nodes_from_elements(
            elements, node, ref_doc_text=node.get_content()
        )
        source_document = node.source_node or node.as_related_node_info()
        for n in nodes:
            n.relationships[NodeRelationship.SOURCE] = source_document
            n.metadata.update(node.metadata)
        return nodes

    def extract_html_tables(self, elements: List[Element]) -> List[Element]:
        """
        Extract html tables from text.

        Returns:
            List[Element]: text elements split by table_text element

        """
        new_elements = []
        for element in elements:
            if element.type != "text":
                # skip when it is not text
                new_elements.append(element)
                continue
            else:
                text = element.element
                last_pos = 0
                i = 0
                n = len(text)

                while i < n:
                    table_start = text.find("<table>", i)
                    if table_start == -1:
                        break

                    table_end = text.find("</table>", table_start)
                    if table_end - table_start <= 7:
                        # not a valid <table></table>
                        break

                    # extract text before the table
                    pre_text = text[last_pos:table_start].strip()
                    if pre_text:
                        new_elements.append(
                            Element(
                                id=f"{element.id}_{len(new_elements)}",
                                type="text",
                                element=pre_text,
                            )
                        )

                    # extract the html table
                    table_content = text[
                        table_start : table_end + 8
                    ]  # 8 is length of </table>
                    new_elements.append(
                        Element(
                            id=f"{element.id}_{len(new_elements)}",
                            type="table_text",
                            element=table_content,
                        )
                    )

                    last_pos = table_end + 8
                    i = last_pos

                # add the last piece of text
                final_text = text[last_pos:].strip()
                if final_text:
                    new_elements.append(
                        Element(
                            id=f"{element.id}_{len(new_elements)}",
                            type="text",
                            element=final_text,
                        )
                    )

        return new_elements

    def extract_elements(
        self,
        text: str,
        node_id: Optional[str] = None,
        table_filters: Optional[List[Callable]] = None,
        **kwargs: Any,
    ) -> List[Element]:
        # get node id for each node so that we can avoid using the same id for different nodes
        """Extract elements from text."""
        lines = text.split("\n")
        currentElement = None

        elements: List[Element] = []
        # Then parse the lines
        for line in lines:
            if line.startswith("```"):
                # check if this is the end of a code block
                if currentElement is not None and currentElement.type == "code":
                    elements.append(currentElement)
                    currentElement = None
                    # if there is some text after the ``` create a text element with it
                    if len(line) > 3:
                        elements.append(
                            Element(
                                id=f"id_{len(elements)}",
                                type="text",
                                element=line.lstrip("```"),
                            )
                        )

                elif line.count("```") == 2 and line[-3] != "`":
                    # check if inline code block (aka have a second ``` in line but not at the end)
                    if currentElement is not None:
                        elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}",
                        type="code",
                        element=line.lstrip("```"),
                    )
                elif currentElement is not None and currentElement.type == "text":
                    currentElement.element += "\n" + line
                else:
                    if currentElement is not None:
                        elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="text", element=line
                    )
            elif currentElement is not None and currentElement.type == "code":
                currentElement.element += "\n" + line

            elif line.startswith("|"):
                if currentElement is not None and currentElement.type != "table":
                    if currentElement is not None:
                        elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="table", element=line
                    )
                elif currentElement is not None:
                    currentElement.element += "\n" + line
                else:
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="table", element=line
                    )
            elif line.startswith("#"):
                if currentElement is not None:
                    elements.append(currentElement)
                currentElement = Element(
                    id=f"id_{len(elements)}",
                    type="title",
                    element=line.lstrip("#"),
                    title_level=len(line) - len(line.lstrip("#")),
                )
            else:
                if currentElement is not None and currentElement.type != "text":
                    elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="text", element=line
                    )
                elif currentElement is not None:
                    currentElement.element += "\n" + line
                else:
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="text", element=line
                    )
        if currentElement is not None:
            elements.append(currentElement)

        for idx, element in enumerate(elements):
            if element.type == "table":
                should_keep = True
                perfect_table = True

                # verify that the table (markdown) have the same number of columns on each rows
                table_lines = element.element.split("\n")
                table_columns = [len(line.split("|")) for line in table_lines]
                if len(set(table_columns)) > 1:
                    # if the table have different number of columns on each rows, it's not a perfect table
                    # we will store the raw text for such tables instead of converting them to a dataframe
                    perfect_table = False

                # verify that the table (markdown) have at least 2 rows
                if len(table_lines) < 2:
                    should_keep = False

                # apply the table filter, now only filter empty tables
                if should_keep and perfect_table and table_filters is not None:
                    should_keep = all(tf(element) for tf in table_filters)

                # if the element is a table, convert it to a dataframe
                if should_keep:
                    if perfect_table:
                        table = md_to_df(element.element)

                        elements[idx] = Element(
                            id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                            type="table",
                            element=element.element,
                            table=table,
                        )
                    else:
                        # for non-perfect tables, we will store the raw text
                        # and give it a different type to differentiate it from perfect tables
                        elements[idx] = Element(
                            id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                            type="table_text",
                            element=element.element,
                            # table=table
                        )
                else:
                    elements[idx] = Element(
                        id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                        type="text",
                        element=element.element,
                    )
            else:
                # if the element is not a table, keep it as to text
                elements[idx] = Element(
                    id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                    type="text",
                    element=element.element,
                )

        # merge consecutive text elements together for now
        merged_elements: List[Element] = []
        for element in elements:
            if (
                len(merged_elements) > 0
                and element.type == "text"
                and merged_elements[-1].type == "text"
            ):
                merged_elements[-1].element += "\n" + element.element
            else:
                merged_elements.append(element)
        elements = merged_elements
        return merged_elements

    def filter_table(self, table_element: Any) -> bool:
        """Filter tables."""
        table_df = md_to_df(table_element.element)

        # check if table_df is not None, has more than one row, and more than one column
        return table_df is not None and not table_df.empty and len(table_df.columns) > 1

get_nodes_from_node #

get_nodes_from_node(node: TextNode) -> List[BaseNode]

Get nodes from node.

Source code in llama_index/core/node_parser/relational/markdown_element.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
    """Get nodes from node."""
    elements = self.extract_elements(
        node.get_content(), table_filters=[self.filter_table], node_id=node.node_id
    )
    elements = self.extract_html_tables(elements)
    table_elements = self.get_table_elements(elements)
    # extract summaries over table elements
    self.extract_table_summaries(table_elements)
    # convert into nodes
    # will return a list of Nodes and Index Nodes
    nodes = self.get_nodes_from_elements(
        elements, node, ref_doc_text=node.get_content()
    )
    source_document = node.source_node or node.as_related_node_info()
    for n in nodes:
        n.relationships[NodeRelationship.SOURCE] = source_document
        n.metadata.update(node.metadata)
    return nodes

aget_nodes_from_node async #

aget_nodes_from_node(node: TextNode) -> List[BaseNode]

Get nodes from node.

Source code in llama_index/core/node_parser/relational/markdown_element.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def aget_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
    """Get nodes from node."""
    elements = self.extract_elements(
        node.get_content(), table_filters=[self.filter_table], node_id=node.node_id
    )
    table_elements = self.get_table_elements(elements)
    # extract summaries over table elements
    await self.aextract_table_summaries(table_elements)
    # convert into nodes
    # will return a list of Nodes and Index Nodes
    nodes = self.get_nodes_from_elements(
        elements, node, ref_doc_text=node.get_content()
    )
    source_document = node.source_node or node.as_related_node_info()
    for n in nodes:
        n.relationships[NodeRelationship.SOURCE] = source_document
        n.metadata.update(node.metadata)
    return nodes

extract_html_tables #

extract_html_tables(elements: List[Element]) -> List[Element]

Extract html tables from text.

Returns:

Type Description
List[Element]

List[Element]: text elements split by table_text element

Source code in llama_index/core/node_parser/relational/markdown_element.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def extract_html_tables(self, elements: List[Element]) -> List[Element]:
    """
    Extract html tables from text.

    Returns:
        List[Element]: text elements split by table_text element

    """
    new_elements = []
    for element in elements:
        if element.type != "text":
            # skip when it is not text
            new_elements.append(element)
            continue
        else:
            text = element.element
            last_pos = 0
            i = 0
            n = len(text)

            while i < n:
                table_start = text.find("<table>", i)
                if table_start == -1:
                    break

                table_end = text.find("</table>", table_start)
                if table_end - table_start <= 7:
                    # not a valid <table></table>
                    break

                # extract text before the table
                pre_text = text[last_pos:table_start].strip()
                if pre_text:
                    new_elements.append(
                        Element(
                            id=f"{element.id}_{len(new_elements)}",
                            type="text",
                            element=pre_text,
                        )
                    )

                # extract the html table
                table_content = text[
                    table_start : table_end + 8
                ]  # 8 is length of </table>
                new_elements.append(
                    Element(
                        id=f"{element.id}_{len(new_elements)}",
                        type="table_text",
                        element=table_content,
                    )
                )

                last_pos = table_end + 8
                i = last_pos

            # add the last piece of text
            final_text = text[last_pos:].strip()
            if final_text:
                new_elements.append(
                    Element(
                        id=f"{element.id}_{len(new_elements)}",
                        type="text",
                        element=final_text,
                    )
                )

    return new_elements

extract_elements #

extract_elements(text: str, node_id: Optional[str] = None, table_filters: Optional[List[Callable]] = None, **kwargs: Any) -> List[Element]

Extract elements from text.

Source code in llama_index/core/node_parser/relational/markdown_element.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def extract_elements(
    self,
    text: str,
    node_id: Optional[str] = None,
    table_filters: Optional[List[Callable]] = None,
    **kwargs: Any,
) -> List[Element]:
    # get node id for each node so that we can avoid using the same id for different nodes
    """Extract elements from text."""
    lines = text.split("\n")
    currentElement = None

    elements: List[Element] = []
    # Then parse the lines
    for line in lines:
        if line.startswith("```"):
            # check if this is the end of a code block
            if currentElement is not None and currentElement.type == "code":
                elements.append(currentElement)
                currentElement = None
                # if there is some text after the ``` create a text element with it
                if len(line) > 3:
                    elements.append(
                        Element(
                            id=f"id_{len(elements)}",
                            type="text",
                            element=line.lstrip("```"),
                        )
                    )

            elif line.count("```") == 2 and line[-3] != "`":
                # check if inline code block (aka have a second ``` in line but not at the end)
                if currentElement is not None:
                    elements.append(currentElement)
                currentElement = Element(
                    id=f"id_{len(elements)}",
                    type="code",
                    element=line.lstrip("```"),
                )
            elif currentElement is not None and currentElement.type == "text":
                currentElement.element += "\n" + line
            else:
                if currentElement is not None:
                    elements.append(currentElement)
                currentElement = Element(
                    id=f"id_{len(elements)}", type="text", element=line
                )
        elif currentElement is not None and currentElement.type == "code":
            currentElement.element += "\n" + line

        elif line.startswith("|"):
            if currentElement is not None and currentElement.type != "table":
                if currentElement is not None:
                    elements.append(currentElement)
                currentElement = Element(
                    id=f"id_{len(elements)}", type="table", element=line
                )
            elif currentElement is not None:
                currentElement.element += "\n" + line
            else:
                currentElement = Element(
                    id=f"id_{len(elements)}", type="table", element=line
                )
        elif line.startswith("#"):
            if currentElement is not None:
                elements.append(currentElement)
            currentElement = Element(
                id=f"id_{len(elements)}",
                type="title",
                element=line.lstrip("#"),
                title_level=len(line) - len(line.lstrip("#")),
            )
        else:
            if currentElement is not None and currentElement.type != "text":
                elements.append(currentElement)
                currentElement = Element(
                    id=f"id_{len(elements)}", type="text", element=line
                )
            elif currentElement is not None:
                currentElement.element += "\n" + line
            else:
                currentElement = Element(
                    id=f"id_{len(elements)}", type="text", element=line
                )
    if currentElement is not None:
        elements.append(currentElement)

    for idx, element in enumerate(elements):
        if element.type == "table":
            should_keep = True
            perfect_table = True

            # verify that the table (markdown) have the same number of columns on each rows
            table_lines = element.element.split("\n")
            table_columns = [len(line.split("|")) for line in table_lines]
            if len(set(table_columns)) > 1:
                # if the table have different number of columns on each rows, it's not a perfect table
                # we will store the raw text for such tables instead of converting them to a dataframe
                perfect_table = False

            # verify that the table (markdown) have at least 2 rows
            if len(table_lines) < 2:
                should_keep = False

            # apply the table filter, now only filter empty tables
            if should_keep and perfect_table and table_filters is not None:
                should_keep = all(tf(element) for tf in table_filters)

            # if the element is a table, convert it to a dataframe
            if should_keep:
                if perfect_table:
                    table = md_to_df(element.element)

                    elements[idx] = Element(
                        id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                        type="table",
                        element=element.element,
                        table=table,
                    )
                else:
                    # for non-perfect tables, we will store the raw text
                    # and give it a different type to differentiate it from perfect tables
                    elements[idx] = Element(
                        id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                        type="table_text",
                        element=element.element,
                        # table=table
                    )
            else:
                elements[idx] = Element(
                    id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                    type="text",
                    element=element.element,
                )
        else:
            # if the element is not a table, keep it as to text
            elements[idx] = Element(
                id=f"id_{node_id}_{idx}" if node_id else f"id_{idx}",
                type="text",
                element=element.element,
            )

    # merge consecutive text elements together for now
    merged_elements: List[Element] = []
    for element in elements:
        if (
            len(merged_elements) > 0
            and element.type == "text"
            and merged_elements[-1].type == "text"
        ):
            merged_elements[-1].element += "\n" + element.element
        else:
            merged_elements.append(element)
    elements = merged_elements
    return merged_elements

filter_table #

filter_table(table_element: Any) -> bool

Filter tables.

Source code in llama_index/core/node_parser/relational/markdown_element.py
288
289
290
291
292
293
def filter_table(self, table_element: Any) -> bool:
    """Filter tables."""
    table_df = md_to_df(table_element.element)

    # check if table_df is not None, has more than one row, and more than one column
    return table_df is not None and not table_df.empty and len(table_df.columns) > 1

UnstructuredElementNodeParser #

Bases: BaseElementNodeParser

Unstructured element node parser.

Splits a document into Text Nodes and Index Nodes corresponding to embedded objects (e.g. tables).

Parameters:

Name Type Description Default
partitioning_parameters Dict[str, Any] | None

Extra dictionary representing parameters of the partitioning process.

{}
Source code in llama_index/core/node_parser/relational/unstructured_element.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class UnstructuredElementNodeParser(BaseElementNodeParser):
    """
    Unstructured element node parser.

    Splits a document into Text Nodes and Index Nodes corresponding to embedded objects
    (e.g. tables).

    """

    partitioning_parameters: Optional[Dict[str, Any]] = Field(
        default={},
        description="Extra dictionary representing parameters of the partitioning process.",
    )

    def __init__(
        self,
        callback_manager: Optional[CallbackManager] = None,
        llm: Optional[Any] = None,
        summary_query_str: str = DEFAULT_SUMMARY_QUERY_STR,
        partitioning_parameters: Optional[Dict[str, Any]] = {},
    ) -> None:
        """Initialize."""
        try:
            import lxml  # noqa  # pants: no-infer-dep
            import unstructured  # noqa  # pants: no-infer-dep
        except ImportError:
            raise ImportError(
                "You must install the `unstructured` and `lxml` "
                "package to use this node parser."
            )
        callback_manager = callback_manager or CallbackManager([])

        return super().__init__(
            callback_manager=callback_manager,
            llm=llm,
            summary_query_str=summary_query_str,
            partitioning_parameters=partitioning_parameters,
        )

    @classmethod
    def class_name(cls) -> str:
        return "UnstructuredElementNodeParser"

    def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
        """Get nodes from node."""
        elements = self.extract_elements(
            node.get_content(), table_filters=[self.filter_table]
        )
        table_elements = self.get_table_elements(elements)
        # extract summaries over table elements
        self.extract_table_summaries(table_elements)
        # convert into nodes
        # will return a list of Nodes and Index Nodes
        nodes = self.get_nodes_from_elements(
            elements, node, ref_doc_text=node.get_content()
        )

        source_document = node.source_node or node.as_related_node_info()
        for n in nodes:
            n.relationships[NodeRelationship.SOURCE] = source_document
            n.metadata.update(node.metadata)
        return nodes

    async def aget_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
        """Get nodes from node."""
        elements = self.extract_elements(
            node.get_content(), table_filters=[self.filter_table]
        )
        table_elements = self.get_table_elements(elements)
        # extract summaries over table elements
        await self.aextract_table_summaries(table_elements)
        # convert into nodes
        # will return a list of Nodes and Index Nodes
        nodes = self.get_nodes_from_elements(
            elements, node, ref_doc_text=node.get_content()
        )

        source_document = node.source_node or node.as_related_node_info()
        for n in nodes:
            n.relationships[NodeRelationship.SOURCE] = source_document
            n.metadata.update(node.metadata)
        return nodes

    def extract_elements(
        self, text: str, table_filters: Optional[List[Callable]] = None, **kwargs: Any
    ) -> List[Element]:
        """Extract elements from text."""
        from unstructured.partition.html import partition_html  # pants: no-infer-dep

        table_filters = table_filters or []
        partitioning_parameters = self.partitioning_parameters or {}
        elements = partition_html(text=text, **partitioning_parameters)
        output_els = []
        for idx, element in enumerate(elements):
            if "unstructured.documents.elements.Table" in str(type(element)):
                should_keep = all(tf(element) for tf in table_filters)
                if should_keep:
                    table_df = html_to_df(str(element.metadata.text_as_html))
                    output_els.append(
                        Element(
                            id=f"id_{idx}",
                            type="table",
                            element=element,
                            table=table_df,
                        )
                    )
                else:
                    # if not a table, keep it as Text as we don't want to lose context
                    from unstructured.documents.elements import Text

                    new_element = Text(str(element))
                    output_els.append(
                        Element(id=f"id_{idx}", type="text", element=new_element)
                    )
            else:
                output_els.append(Element(id=f"id_{idx}", type="text", element=element))
        return output_els

    def filter_table(self, table_element: Any) -> bool:
        """Filter tables."""
        table_df = html_to_df(table_element.metadata.text_as_html)

        # check if table_df is not None, has more than one row, and more than one column
        return table_df is not None and not table_df.empty and len(table_df.columns) > 1

get_nodes_from_node #

get_nodes_from_node(node: TextNode) -> List[BaseNode]

Get nodes from node.

Source code in llama_index/core/node_parser/relational/unstructured_element.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
    """Get nodes from node."""
    elements = self.extract_elements(
        node.get_content(), table_filters=[self.filter_table]
    )
    table_elements = self.get_table_elements(elements)
    # extract summaries over table elements
    self.extract_table_summaries(table_elements)
    # convert into nodes
    # will return a list of Nodes and Index Nodes
    nodes = self.get_nodes_from_elements(
        elements, node, ref_doc_text=node.get_content()
    )

    source_document = node.source_node or node.as_related_node_info()
    for n in nodes:
        n.relationships[NodeRelationship.SOURCE] = source_document
        n.metadata.update(node.metadata)
    return nodes

aget_nodes_from_node async #

aget_nodes_from_node(node: TextNode) -> List[BaseNode]

Get nodes from node.

Source code in llama_index/core/node_parser/relational/unstructured_element.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def aget_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
    """Get nodes from node."""
    elements = self.extract_elements(
        node.get_content(), table_filters=[self.filter_table]
    )
    table_elements = self.get_table_elements(elements)
    # extract summaries over table elements
    await self.aextract_table_summaries(table_elements)
    # convert into nodes
    # will return a list of Nodes and Index Nodes
    nodes = self.get_nodes_from_elements(
        elements, node, ref_doc_text=node.get_content()
    )

    source_document = node.source_node or node.as_related_node_info()
    for n in nodes:
        n.relationships[NodeRelationship.SOURCE] = source_document
        n.metadata.update(node.metadata)
    return nodes

extract_elements #

extract_elements(text: str, table_filters: Optional[List[Callable]] = None, **kwargs: Any) -> List[Element]

Extract elements from text.

Source code in llama_index/core/node_parser/relational/unstructured_element.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def extract_elements(
    self, text: str, table_filters: Optional[List[Callable]] = None, **kwargs: Any
) -> List[Element]:
    """Extract elements from text."""
    from unstructured.partition.html import partition_html  # pants: no-infer-dep

    table_filters = table_filters or []
    partitioning_parameters = self.partitioning_parameters or {}
    elements = partition_html(text=text, **partitioning_parameters)
    output_els = []
    for idx, element in enumerate(elements):
        if "unstructured.documents.elements.Table" in str(type(element)):
            should_keep = all(tf(element) for tf in table_filters)
            if should_keep:
                table_df = html_to_df(str(element.metadata.text_as_html))
                output_els.append(
                    Element(
                        id=f"id_{idx}",
                        type="table",
                        element=element,
                        table=table_df,
                    )
                )
            else:
                # if not a table, keep it as Text as we don't want to lose context
                from unstructured.documents.elements import Text

                new_element = Text(str(element))
                output_els.append(
                    Element(id=f"id_{idx}", type="text", element=new_element)
                )
        else:
            output_els.append(Element(id=f"id_{idx}", type="text", element=element))
    return output_els

filter_table #

filter_table(table_element: Any) -> bool

Filter tables.

Source code in llama_index/core/node_parser/relational/unstructured_element.py
136
137
138
139
140
141
def filter_table(self, table_element: Any) -> bool:
    """Filter tables."""
    table_df = html_to_df(table_element.metadata.text_as_html)

    # check if table_df is not None, has more than one row, and more than one column
    return table_df is not None and not table_df.empty and len(table_df.columns) > 1

LlamaParseJsonNodeParser #

Bases: BaseElementNodeParser

Llama Parse Json format element node parser.

Splits a json format document from LlamaParse into Text Nodes and Index Nodes corresponding to embedded objects (e.g. tables).

Source code in llama_index/core/node_parser/relational/llama_parse_json_element.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
class LlamaParseJsonNodeParser(BaseElementNodeParser):
    """
    Llama Parse Json format element node parser.

    Splits a json format document from LlamaParse into Text Nodes and Index Nodes
    corresponding to embedded objects (e.g. tables).

    """

    @classmethod
    def class_name(cls) -> str:
        return "LlamaParseJsonNodeParser"

    def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
        """Get nodes from node."""
        elements = self.extract_elements(
            node.get_content(),
            table_filters=[self.filter_table],
            node_id=node.id_,
            node_metadata=node.metadata,
        )
        table_elements = self.get_table_elements(elements)
        # extract summaries over table elements
        self.extract_table_summaries(table_elements)
        # convert into nodes
        # will return a list of Nodes and Index Nodes
        return self.get_nodes_from_elements(
            elements, node, ref_doc_text=node.get_content()
        )

    async def aget_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
        """Get nodes from node."""
        elements = self.extract_elements(
            node.get_content(),
            table_filters=[self.filter_table],
            node_id=node.id_,
            node_metadata=node.metadata,
        )
        table_elements = self.get_table_elements(elements)
        # extract summaries over table elements
        await self.aextract_table_summaries(table_elements)
        # convert into nodes
        # will return a list of Nodes and Index Nodes
        return self.get_nodes_from_elements(
            elements, node, ref_doc_text=node.get_content()
        )

    def extract_elements(
        self,
        text: str,
        mode: Optional[str] = "json",
        node_id: Optional[str] = None,
        node_metadata: Optional[Dict[str, Any]] = None,
        table_filters: Optional[List[Callable]] = None,
        **kwargs: Any,
    ) -> List[Element]:
        # get node id for each node so that we can avoid using the same id for different nodes
        """
        Extract elements from json based nodes.

        Args:
            text: node's text content
            mode: different modes for returning different types of elements based on the selected mode
            node_id: unique id for the node
            node_metadata: metadata for the node. the json output for the nodes contains a lot of fields for elements

        """
        elements: List[Element] = []
        currentElement = None
        page_number = node_metadata.get("page") if node_metadata is not None else 0

        if mode == "json" and node_metadata is not None:
            json_items = node_metadata.get("items") or []
            for element_idx, json_item in enumerate(json_items):
                ele_type = json_item.get("type")
                if ele_type == "heading":
                    elements.append(
                        Element(
                            id=f"id_page_{page_number}_heading_{element_idx}",
                            type="heading",
                            title_level=json_item.get("lvl"),
                            element=json_item.get("value"),
                            markdown=json_item.get("md"),
                            page_number=page_number,
                        )
                    )
                elif ele_type == "text":
                    elements.append(
                        Element(
                            id=f"id_page_{page_number}_text_{element_idx}",
                            type="text",
                            element=json_item.get("value"),
                            markdown=json_item.get("md"),
                            page_number=page_number,
                        )
                    )
                elif ele_type == "table":
                    elements.append(
                        Element(
                            id=f"id_page_{page_number}_table_{element_idx}",
                            type="table",
                            element=json_item.get("rows"),
                            markdown=json_item.get("md"),
                            page_number=page_number,
                        )
                    )
        elif mode == "images" and node_metadata is not None:
            # only get images from json metadata
            images = node_metadata.get("images") or []
            for idx, image in enumerate(images):
                elements.append(
                    Element(
                        id=f"id_page_{page_number}_image_{idx}",
                        type="image",
                        element=image,
                    )
                )
        else:
            lines = text.split("\n")
            # Then parse the lines from raw text of json
            for line in lines:
                if line.startswith("```"):
                    # check if this is the end of a code block
                    if currentElement is not None and currentElement.type == "code":
                        elements.append(currentElement)
                        currentElement = None
                        # if there is some text after the ``` create a text element with it
                        if len(line) > 3:
                            elements.append(
                                Element(
                                    id=f"id_{len(elements)}",
                                    type="text",
                                    element=line.lstrip("```"),
                                )
                            )

                    elif line.count("```") == 2 and line[-3] != "`":
                        # check if inline code block (aka have a second ``` in line but not at the end)
                        if currentElement is not None:
                            elements.append(currentElement)
                        currentElement = Element(
                            id=f"id_{len(elements)}",
                            type="code",
                            element=line.lstrip("```"),
                        )
                    elif currentElement is not None and currentElement.type == "text":
                        currentElement.element += "\n" + line
                    else:
                        if currentElement is not None:
                            elements.append(currentElement)
                        currentElement = Element(
                            id=f"id_{len(elements)}", type="text", element=line
                        )

                elif currentElement is not None and currentElement.type == "code":
                    currentElement.element += "\n" + line

                elif line.startswith("|"):
                    if currentElement is not None and currentElement.type != "table":
                        if currentElement is not None:
                            elements.append(currentElement)
                        currentElement = Element(
                            id=f"id_{len(elements)}", type="table", element=line
                        )
                    elif currentElement is not None:
                        currentElement.element += "\n" + line
                    else:
                        currentElement = Element(
                            id=f"id_{len(elements)}", type="table", element=line
                        )
                elif line.startswith("#"):
                    if currentElement is not None:
                        elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}",
                        type="title",
                        element=line.lstrip("#"),
                        title_level=len(line) - len(line.lstrip("#")),
                    )
                else:
                    if currentElement is not None and currentElement.type != "text":
                        elements.append(currentElement)
                        currentElement = Element(
                            id=f"id_{len(elements)}", type="text", element=line
                        )
                    elif currentElement is not None:
                        currentElement.element += "\n" + line
                    else:
                        currentElement = Element(
                            id=f"id_{len(elements)}", type="text", element=line
                        )
        if currentElement is not None:
            elements.append(currentElement)

        for idx, element in enumerate(elements):
            if element.type == "table":
                assert element.markdown is not None

                should_keep = True
                perfect_table = True

                # verify that the table (markdown) have the same number of columns on each rows
                table_lines = element.markdown.split("\n")
                table_columns = [len(line.split("|")) for line in table_lines]
                if len(set(table_columns)) > 1:
                    # if the table have different number of columns on each rows, it's not a perfect table
                    # we will store the raw text for such tables instead of converting them to a dataframe
                    perfect_table = False

                # verify that the table (markdown) have at least 2 rows
                if len(table_lines) < 2:
                    should_keep = False

                # apply the table filter, now only filter empty tables
                if should_keep and perfect_table and table_filters is not None:
                    should_keep = all(tf(element) for tf in table_filters)

                # if the element is a table, convert it to a dataframe
                if should_keep:
                    if perfect_table:
                        assert element.markdown is not None
                        table = md_to_df(element.markdown)

                        elements[idx] = Element(
                            id=(
                                f"id_page_{page_number}_{node_id}_{idx}"
                                if node_id
                                else f"id_{idx}"
                            ),
                            type="table",
                            element=element,
                            table=table,
                        )
                    else:
                        # for non-perfect tables, we will store the raw text
                        # and give it a different type to differentiate it from perfect tables
                        elements[idx] = Element(
                            id=(
                                f"id_page_{page_number}_{node_id}_{idx}"
                                if node_id
                                else f"id_{idx}"
                            ),
                            type="table_text",
                            element=element.element,
                            # table=table
                        )
                else:
                    elements[idx] = Element(
                        id=(
                            f"id_page_{page_number}_{node_id}_{idx}"
                            if node_id
                            else f"id_page_{page_number}_{idx}"
                        ),
                        type="text",
                        element=element.element,
                    )
            else:
                # if the element is not a table, keep it as to text
                elements[idx] = Element(
                    id=(
                        f"id_page_{page_number}_{node_id}_{idx}"
                        if node_id
                        else f"id_page_{page_number}_{idx}"
                    ),
                    type="text",
                    element=element.element,
                )

        # merge consecutive text elements together for now
        merged_elements: List[Element] = []
        for element in elements:
            if (
                len(merged_elements) > 0
                and element.type == "text"
                and merged_elements[-1].type == "text"
            ):
                if isinstance(element.element, list):
                    merged_elements[-1].element += "\n" + " ".join(
                        str(e) for e in element.element
                    )
                else:
                    merged_elements[-1].element += "\n" + element.element
            else:
                merged_elements.append(element)
        elements = merged_elements
        return merged_elements

    def filter_table(self, table_element: Any) -> bool:
        """Filter tables."""
        # convert markdown of the table to df
        table_df = md_to_df(table_element.markdown)

        # check if table_df is not None, has more than one row, and more than one column
        return table_df is not None and not table_df.empty and len(table_df.columns) > 1

get_nodes_from_node #

get_nodes_from_node(node: TextNode) -> List[BaseNode]

Get nodes from node.

Source code in llama_index/core/node_parser/relational/llama_parse_json_element.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
    """Get nodes from node."""
    elements = self.extract_elements(
        node.get_content(),
        table_filters=[self.filter_table],
        node_id=node.id_,
        node_metadata=node.metadata,
    )
    table_elements = self.get_table_elements(elements)
    # extract summaries over table elements
    self.extract_table_summaries(table_elements)
    # convert into nodes
    # will return a list of Nodes and Index Nodes
    return self.get_nodes_from_elements(
        elements, node, ref_doc_text=node.get_content()
    )

aget_nodes_from_node async #

aget_nodes_from_node(node: TextNode) -> List[BaseNode]

Get nodes from node.

Source code in llama_index/core/node_parser/relational/llama_parse_json_element.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def aget_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
    """Get nodes from node."""
    elements = self.extract_elements(
        node.get_content(),
        table_filters=[self.filter_table],
        node_id=node.id_,
        node_metadata=node.metadata,
    )
    table_elements = self.get_table_elements(elements)
    # extract summaries over table elements
    await self.aextract_table_summaries(table_elements)
    # convert into nodes
    # will return a list of Nodes and Index Nodes
    return self.get_nodes_from_elements(
        elements, node, ref_doc_text=node.get_content()
    )

extract_elements #

extract_elements(text: str, mode: Optional[str] = 'json', node_id: Optional[str] = None, node_metadata: Optional[Dict[str, Any]] = None, table_filters: Optional[List[Callable]] = None, **kwargs: Any) -> List[Element]

Extract elements from json based nodes.

Parameters:

Name Type Description Default
text str

node's text content

required
mode Optional[str]

different modes for returning different types of elements based on the selected mode

'json'
node_id Optional[str]

unique id for the node

None
node_metadata Optional[Dict[str, Any]]

metadata for the node. the json output for the nodes contains a lot of fields for elements

None
Source code in llama_index/core/node_parser/relational/llama_parse_json_element.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def extract_elements(
    self,
    text: str,
    mode: Optional[str] = "json",
    node_id: Optional[str] = None,
    node_metadata: Optional[Dict[str, Any]] = None,
    table_filters: Optional[List[Callable]] = None,
    **kwargs: Any,
) -> List[Element]:
    # get node id for each node so that we can avoid using the same id for different nodes
    """
    Extract elements from json based nodes.

    Args:
        text: node's text content
        mode: different modes for returning different types of elements based on the selected mode
        node_id: unique id for the node
        node_metadata: metadata for the node. the json output for the nodes contains a lot of fields for elements

    """
    elements: List[Element] = []
    currentElement = None
    page_number = node_metadata.get("page") if node_metadata is not None else 0

    if mode == "json" and node_metadata is not None:
        json_items = node_metadata.get("items") or []
        for element_idx, json_item in enumerate(json_items):
            ele_type = json_item.get("type")
            if ele_type == "heading":
                elements.append(
                    Element(
                        id=f"id_page_{page_number}_heading_{element_idx}",
                        type="heading",
                        title_level=json_item.get("lvl"),
                        element=json_item.get("value"),
                        markdown=json_item.get("md"),
                        page_number=page_number,
                    )
                )
            elif ele_type == "text":
                elements.append(
                    Element(
                        id=f"id_page_{page_number}_text_{element_idx}",
                        type="text",
                        element=json_item.get("value"),
                        markdown=json_item.get("md"),
                        page_number=page_number,
                    )
                )
            elif ele_type == "table":
                elements.append(
                    Element(
                        id=f"id_page_{page_number}_table_{element_idx}",
                        type="table",
                        element=json_item.get("rows"),
                        markdown=json_item.get("md"),
                        page_number=page_number,
                    )
                )
    elif mode == "images" and node_metadata is not None:
        # only get images from json metadata
        images = node_metadata.get("images") or []
        for idx, image in enumerate(images):
            elements.append(
                Element(
                    id=f"id_page_{page_number}_image_{idx}",
                    type="image",
                    element=image,
                )
            )
    else:
        lines = text.split("\n")
        # Then parse the lines from raw text of json
        for line in lines:
            if line.startswith("```"):
                # check if this is the end of a code block
                if currentElement is not None and currentElement.type == "code":
                    elements.append(currentElement)
                    currentElement = None
                    # if there is some text after the ``` create a text element with it
                    if len(line) > 3:
                        elements.append(
                            Element(
                                id=f"id_{len(elements)}",
                                type="text",
                                element=line.lstrip("```"),
                            )
                        )

                elif line.count("```") == 2 and line[-3] != "`":
                    # check if inline code block (aka have a second ``` in line but not at the end)
                    if currentElement is not None:
                        elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}",
                        type="code",
                        element=line.lstrip("```"),
                    )
                elif currentElement is not None and currentElement.type == "text":
                    currentElement.element += "\n" + line
                else:
                    if currentElement is not None:
                        elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="text", element=line
                    )

            elif currentElement is not None and currentElement.type == "code":
                currentElement.element += "\n" + line

            elif line.startswith("|"):
                if currentElement is not None and currentElement.type != "table":
                    if currentElement is not None:
                        elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="table", element=line
                    )
                elif currentElement is not None:
                    currentElement.element += "\n" + line
                else:
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="table", element=line
                    )
            elif line.startswith("#"):
                if currentElement is not None:
                    elements.append(currentElement)
                currentElement = Element(
                    id=f"id_{len(elements)}",
                    type="title",
                    element=line.lstrip("#"),
                    title_level=len(line) - len(line.lstrip("#")),
                )
            else:
                if currentElement is not None and currentElement.type != "text":
                    elements.append(currentElement)
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="text", element=line
                    )
                elif currentElement is not None:
                    currentElement.element += "\n" + line
                else:
                    currentElement = Element(
                        id=f"id_{len(elements)}", type="text", element=line
                    )
    if currentElement is not None:
        elements.append(currentElement)

    for idx, element in enumerate(elements):
        if element.type == "table":
            assert element.markdown is not None

            should_keep = True
            perfect_table = True

            # verify that the table (markdown) have the same number of columns on each rows
            table_lines = element.markdown.split("\n")
            table_columns = [len(line.split("|")) for line in table_lines]
            if len(set(table_columns)) > 1:
                # if the table have different number of columns on each rows, it's not a perfect table
                # we will store the raw text for such tables instead of converting them to a dataframe
                perfect_table = False

            # verify that the table (markdown) have at least 2 rows
            if len(table_lines) < 2:
                should_keep = False

            # apply the table filter, now only filter empty tables
            if should_keep and perfect_table and table_filters is not None:
                should_keep = all(tf(element) for tf in table_filters)

            # if the element is a table, convert it to a dataframe
            if should_keep:
                if perfect_table:
                    assert element.markdown is not None
                    table = md_to_df(element.markdown)

                    elements[idx] = Element(
                        id=(
                            f"id_page_{page_number}_{node_id}_{idx}"
                            if node_id
                            else f"id_{idx}"
                        ),
                        type="table",
                        element=element,
                        table=table,
                    )
                else:
                    # for non-perfect tables, we will store the raw text
                    # and give it a different type to differentiate it from perfect tables
                    elements[idx] = Element(
                        id=(
                            f"id_page_{page_number}_{node_id}_{idx}"
                            if node_id
                            else f"id_{idx}"
                        ),
                        type="table_text",
                        element=element.element,
                        # table=table
                    )
            else:
                elements[idx] = Element(
                    id=(
                        f"id_page_{page_number}_{node_id}_{idx}"
                        if node_id
                        else f"id_page_{page_number}_{idx}"
                    ),
                    type="text",
                    element=element.element,
                )
        else:
            # if the element is not a table, keep it as to text
            elements[idx] = Element(
                id=(
                    f"id_page_{page_number}_{node_id}_{idx}"
                    if node_id
                    else f"id_page_{page_number}_{idx}"
                ),
                type="text",
                element=element.element,
            )

    # merge consecutive text elements together for now
    merged_elements: List[Element] = []
    for element in elements:
        if (
            len(merged_elements) > 0
            and element.type == "text"
            and merged_elements[-1].type == "text"
        ):
            if isinstance(element.element, list):
                merged_elements[-1].element += "\n" + " ".join(
                    str(e) for e in element.element
                )
            else:
                merged_elements[-1].element += "\n" + element.element
        else:
            merged_elements.append(element)
    elements = merged_elements
    return merged_elements

filter_table #

filter_table(table_element: Any) -> bool

Filter tables.

Source code in llama_index/core/node_parser/relational/llama_parse_json_element.py
298
299
300
301
302
303
304
def filter_table(self, table_element: Any) -> bool:
    """Filter tables."""
    # convert markdown of the table to df
    table_df = md_to_df(table_element.markdown)

    # check if table_df is not None, has more than one row, and more than one column
    return table_df is not None and not table_df.empty and len(table_df.columns) > 1

CodeSplitter #

Bases: TextSplitter

Split code using a AST parser.

Thank you to Kevin Lu / SweepAI for suggesting this elegant code splitting solution. https://docs.sweep.dev/blogs/chunking-2m-files

Parameters:

Name Type Description Default
language str

The programming language of the code being split.

required
chunk_lines int

The number of lines to include in each chunk.

40
chunk_lines_overlap int

How many lines of code each chunk overlaps with.

15
max_chars int

Maximum number of characters per chunk.

1500
Source code in llama_index/core/node_parser/text/code.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class CodeSplitter(TextSplitter):
    """
    Split code using a AST parser.

    Thank you to Kevin Lu / SweepAI for suggesting this elegant code splitting solution.
    https://docs.sweep.dev/blogs/chunking-2m-files
    """

    language: str = Field(
        description="The programming language of the code being split."
    )
    chunk_lines: int = Field(
        default=DEFAULT_CHUNK_LINES,
        description="The number of lines to include in each chunk.",
        gt=0,
    )
    chunk_lines_overlap: int = Field(
        default=DEFAULT_LINES_OVERLAP,
        description="How many lines of code each chunk overlaps with.",
        gt=0,
    )
    max_chars: int = Field(
        default=DEFAULT_MAX_CHARS,
        description="Maximum number of characters per chunk.",
        gt=0,
    )
    _parser: Any = PrivateAttr()

    def __init__(
        self,
        language: str,
        chunk_lines: int = DEFAULT_CHUNK_LINES,
        chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
        max_chars: int = DEFAULT_MAX_CHARS,
        parser: Any = None,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> None:
        """Initialize a CodeSplitter."""
        from tree_sitter import Parser  # pants: no-infer-dep

        callback_manager = callback_manager or CallbackManager([])
        id_func = id_func or default_id_func

        super().__init__(
            language=language,
            chunk_lines=chunk_lines,
            chunk_lines_overlap=chunk_lines_overlap,
            max_chars=max_chars,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )

        if parser is None:
            try:
                import tree_sitter_language_pack  # pants: no-infer-dep

                parser = tree_sitter_language_pack.get_parser(language)  # type: ignore
            except ImportError:
                raise ImportError(
                    "Please install tree_sitter_language_pack to use CodeSplitter."
                    "Or pass in a parser object."
                )
            except Exception:
                print(
                    f"Could not get parser for language {language}. Check "
                    "https://github.com/Goldziher/tree-sitter-language-pack?tab=readme-ov-file#available-languages "
                    "for a list of valid languages."
                )
                raise
        if not isinstance(parser, Parser):
            raise ValueError("Parser must be a tree-sitter Parser object.")

        self._parser = parser

    @classmethod
    def from_defaults(
        cls,
        language: str,
        chunk_lines: int = DEFAULT_CHUNK_LINES,
        chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
        max_chars: int = DEFAULT_MAX_CHARS,
        callback_manager: Optional[CallbackManager] = None,
        parser: Any = None,
    ) -> "CodeSplitter":
        """Create a CodeSplitter with default values."""
        return cls(
            language=language,
            chunk_lines=chunk_lines,
            chunk_lines_overlap=chunk_lines_overlap,
            max_chars=max_chars,
            callback_manager=callback_manager,
            parser=parser,
        )

    @classmethod
    def class_name(cls) -> str:
        return "CodeSplitter"

    def _chunk_node(self, node: Any, text_bytes: bytes, last_end: int = 0) -> List[str]:
        """
        Recursively chunk a node into smaller pieces based on character limits.

        Args:
            node (Any): The AST node to chunk.
            text_bytes (bytes): The original source code text as bytes.
            last_end (int, optional): The ending position of the last processed chunk. Defaults to 0.

        Returns:
            List[str]: A list of code chunks that respect the max_chars limit.

        """
        new_chunks = []
        current_chunk = ""
        for child in node.children:
            if child.end_byte - child.start_byte > self.max_chars:
                # Child is too big, recursively chunk the child
                if len(current_chunk) > 0:
                    new_chunks.append(current_chunk)
                current_chunk = ""
                new_chunks.extend(self._chunk_node(child, text_bytes, last_end))
            elif (
                len(current_chunk) + child.end_byte - child.start_byte > self.max_chars
            ):
                # Child would make the current chunk too big, so start a new chunk
                new_chunks.append(current_chunk)
                current_chunk = text_bytes[last_end : child.end_byte].decode("utf-8")
            else:
                current_chunk += text_bytes[last_end : child.end_byte].decode("utf-8")
            last_end = child.end_byte
        if len(current_chunk) > 0:
            new_chunks.append(current_chunk)
        return new_chunks

    def split_text(self, text: str) -> List[str]:
        """
        Split incoming code into chunks using the AST parser.

        This method parses the input code into an AST and then chunks it while preserving
        syntactic structure. It handles error cases and ensures the code can be properly parsed.

        Args:
            text (str): The source code text to split.

        Returns:
            List[str]: A list of code chunks.

        Raises:
            ValueError: If the code cannot be parsed for the specified language.

        """
        """Split incoming code and return chunks using the AST."""
        with self.callback_manager.event(
            CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
        ) as event:
            text_bytes = bytes(text, "utf-8")
            tree = self._parser.parse(text_bytes)

            if (
                not tree.root_node.children
                or tree.root_node.children[0].type != "ERROR"
            ):
                chunks = [
                    chunk.strip()
                    for chunk in self._chunk_node(tree.root_node, text_bytes)
                ]
                event.on_end(
                    payload={EventPayload.CHUNKS: chunks},
                )

                return chunks
            else:
                raise ValueError(f"Could not parse code with language {self.language}.")

from_defaults classmethod #

from_defaults(language: str, chunk_lines: int = DEFAULT_CHUNK_LINES, chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP, max_chars: int = DEFAULT_MAX_CHARS, callback_manager: Optional[CallbackManager] = None, parser: Any = None) -> CodeSplitter

Create a CodeSplitter with default values.

Source code in llama_index/core/node_parser/text/code.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@classmethod
def from_defaults(
    cls,
    language: str,
    chunk_lines: int = DEFAULT_CHUNK_LINES,
    chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
    max_chars: int = DEFAULT_MAX_CHARS,
    callback_manager: Optional[CallbackManager] = None,
    parser: Any = None,
) -> "CodeSplitter":
    """Create a CodeSplitter with default values."""
    return cls(
        language=language,
        chunk_lines=chunk_lines,
        chunk_lines_overlap=chunk_lines_overlap,
        max_chars=max_chars,
        callback_manager=callback_manager,
        parser=parser,
    )

split_text #

split_text(text: str) -> List[str]

Split incoming code into chunks using the AST parser.

This method parses the input code into an AST and then chunks it while preserving syntactic structure. It handles error cases and ensures the code can be properly parsed.

Parameters:

Name Type Description Default
text str

The source code text to split.

required

Returns:

Type Description
List[str]

List[str]: A list of code chunks.

Raises:

Type Description
ValueError

If the code cannot be parsed for the specified language.

Source code in llama_index/core/node_parser/text/code.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def split_text(self, text: str) -> List[str]:
    """
    Split incoming code into chunks using the AST parser.

    This method parses the input code into an AST and then chunks it while preserving
    syntactic structure. It handles error cases and ensures the code can be properly parsed.

    Args:
        text (str): The source code text to split.

    Returns:
        List[str]: A list of code chunks.

    Raises:
        ValueError: If the code cannot be parsed for the specified language.

    """
    """Split incoming code and return chunks using the AST."""
    with self.callback_manager.event(
        CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
    ) as event:
        text_bytes = bytes(text, "utf-8")
        tree = self._parser.parse(text_bytes)

        if (
            not tree.root_node.children
            or tree.root_node.children[0].type != "ERROR"
        ):
            chunks = [
                chunk.strip()
                for chunk in self._chunk_node(tree.root_node, text_bytes)
            ]
            event.on_end(
                payload={EventPayload.CHUNKS: chunks},
            )

            return chunks
        else:
            raise ValueError(f"Could not parse code with language {self.language}.")

LangchainNodeParser #

Bases: TextSplitter

Basic wrapper around langchain's text splitter.

TODO: Figure out how to make this metadata aware.

Source code in llama_index/core/node_parser/text/langchain.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class LangchainNodeParser(TextSplitter):
    """
    Basic wrapper around langchain's text splitter.

    TODO: Figure out how to make this metadata aware.
    """

    _lc_splitter: "LC_TextSplitter" = PrivateAttr()

    def __init__(
        self,
        lc_splitter: "LC_TextSplitter",
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ):
        """Initialize with parameters."""
        id_func = id_func or default_id_func

        super().__init__(
            callback_manager=callback_manager or CallbackManager(),
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )
        self._lc_splitter = lc_splitter

    def split_text(self, text: str) -> List[str]:
        """Split text into sentences."""
        return self._lc_splitter.split_text(text)

split_text #

split_text(text: str) -> List[str]

Split text into sentences.

Source code in llama_index/core/node_parser/text/langchain.py
43
44
45
def split_text(self, text: str) -> List[str]:
    """Split text into sentences."""
    return self._lc_splitter.split_text(text)

SemanticSplitterNodeParser #

Bases: NodeParser

Semantic node parser.

Splits a document into Nodes, with each node being a group of semantically related sentences.

Parameters:

Name Type Description Default
buffer_size int

number of sentences to group together when evaluating semantic similarity

1
embed_model BaseEmbedding

(BaseEmbedding): embedding model to use

required
sentence_splitter Optional[Callable]

splits text into sentences

<function split_by_sentence_tokenizer.<locals>.<lambda> at 0x7f5a7dd35b20>
breakpoint_percentile_threshold int

dissimilarity threshold for creating semantic breakpoints, lower value will generate more nodes

95
include_metadata bool

whether to include metadata in nodes

required
include_prev_next_rel bool

whether to include prev/next relationships

required
Source code in llama_index/core/node_parser/text/semantic_splitter.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
class SemanticSplitterNodeParser(NodeParser):
    """
    Semantic node parser.

    Splits a document into Nodes, with each node being a group of semantically related sentences.

    Args:
        buffer_size (int): number of sentences to group together when evaluating semantic similarity
        embed_model: (BaseEmbedding): embedding model to use
        sentence_splitter (Optional[Callable]): splits text into sentences
        breakpoint_percentile_threshold (int): dissimilarity threshold for creating semantic breakpoints, lower value will generate more nodes
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships

    """

    sentence_splitter: SentenceSplitterCallable = Field(
        default_factory=split_by_sentence_tokenizer,
        description="The text splitter to use when splitting documents.",
        exclude=True,
    )

    embed_model: SerializeAsAny[BaseEmbedding] = Field(
        description="The embedding model to use to for semantic comparison",
    )

    buffer_size: int = Field(
        default=1,
        description=(
            "The number of sentences to group together when evaluating semantic similarity. "
            "Set to 1 to consider each sentence individually. "
            "Set to >1 to group sentences together."
        ),
    )

    breakpoint_percentile_threshold: int = Field(
        default=95,
        description=(
            "The percentile of cosine dissimilarity that must be exceeded between a "
            "group of sentences and the next to form a node.  The smaller this "
            "number is, the more nodes will be generated"
        ),
    )

    @classmethod
    def class_name(cls) -> str:
        return "SemanticSplitterNodeParser"

    @classmethod
    def from_defaults(
        cls,
        embed_model: Optional[BaseEmbedding] = None,
        breakpoint_percentile_threshold: Optional[int] = 95,
        buffer_size: Optional[int] = 1,
        sentence_splitter: Optional[Callable[[str], List[str]]] = None,
        original_text_metadata_key: str = DEFAULT_OG_TEXT_METADATA_KEY,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> "SemanticSplitterNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        sentence_splitter = sentence_splitter or split_by_sentence_tokenizer()
        if embed_model is None:
            try:
                from llama_index.embeddings.openai import (
                    OpenAIEmbedding,
                )  # pants: no-infer-dep

                embed_model = embed_model or OpenAIEmbedding()
            except ImportError:
                raise ImportError(
                    "`llama-index-embeddings-openai` package not found, "
                    "please run `pip install llama-index-embeddings-openai`"
                )

        id_func = id_func or default_id_func

        return cls(
            embed_model=embed_model,
            breakpoint_percentile_threshold=breakpoint_percentile_threshold,
            buffer_size=buffer_size,
            sentence_splitter=sentence_splitter,
            original_text_metadata_key=original_text_metadata_key,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
            id_func=id_func,
        )

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Parse document into nodes."""
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            nodes = self.build_semantic_nodes_from_documents([node], show_progress)
            all_nodes.extend(nodes)

        return all_nodes

    async def _aparse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Asynchronously parse document into nodes."""
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            nodes = await self.abuild_semantic_nodes_from_documents(
                [node], show_progress
            )
            all_nodes.extend(nodes)

        return all_nodes

    def build_semantic_nodes_from_documents(
        self,
        documents: Sequence[Document],
        show_progress: bool = False,
    ) -> List[BaseNode]:
        """Build window nodes from documents."""
        all_nodes: List[BaseNode] = []
        for doc in documents:
            text = doc.text
            text_splits = self.sentence_splitter(text)

            sentences = self._build_sentence_groups(text_splits)

            combined_sentence_embeddings = self.embed_model.get_text_embedding_batch(
                [s["combined_sentence"] for s in sentences],
                show_progress=show_progress,
            )

            for i, embedding in enumerate(combined_sentence_embeddings):
                sentences[i]["combined_sentence_embedding"] = embedding

            distances = self._calculate_distances_between_sentence_groups(sentences)

            chunks = self._build_node_chunks(sentences, distances)

            nodes = build_nodes_from_splits(
                chunks,
                doc,
                id_func=self.id_func,
            )

            all_nodes.extend(nodes)

        return all_nodes

    async def abuild_semantic_nodes_from_documents(
        self,
        documents: Sequence[Document],
        show_progress: bool = False,
    ) -> List[BaseNode]:
        """Asynchronously build window nodes from documents."""
        all_nodes: List[BaseNode] = []
        for doc in documents:
            text = doc.text
            text_splits = self.sentence_splitter(text)

            sentences = self._build_sentence_groups(text_splits)

            combined_sentence_embeddings = (
                await self.embed_model.aget_text_embedding_batch(
                    [s["combined_sentence"] for s in sentences],
                    show_progress=show_progress,
                )
            )

            for i, embedding in enumerate(combined_sentence_embeddings):
                sentences[i]["combined_sentence_embedding"] = embedding

            distances = self._calculate_distances_between_sentence_groups(sentences)

            chunks = self._build_node_chunks(sentences, distances)

            nodes = build_nodes_from_splits(
                chunks,
                doc,
                id_func=self.id_func,
            )

            all_nodes.extend(nodes)

        return all_nodes

    def _build_sentence_groups(
        self, text_splits: List[str]
    ) -> List[SentenceCombination]:
        sentences: List[SentenceCombination] = [
            {
                "sentence": x,
                "index": i,
                "combined_sentence": "",
                "combined_sentence_embedding": [],
            }
            for i, x in enumerate(text_splits)
        ]

        # Group sentences and calculate embeddings for sentence groups
        for i in range(len(sentences)):
            combined_sentence = ""

            for j in range(i - self.buffer_size, i):
                if j >= 0:
                    combined_sentence += sentences[j]["sentence"]

            combined_sentence += sentences[i]["sentence"]

            for j in range(i + 1, i + 1 + self.buffer_size):
                if j < len(sentences):
                    combined_sentence += sentences[j]["sentence"]

            sentences[i]["combined_sentence"] = combined_sentence

        return sentences

    def _calculate_distances_between_sentence_groups(
        self, sentences: List[SentenceCombination]
    ) -> List[float]:
        distances = []
        for i in range(len(sentences) - 1):
            embedding_current = sentences[i]["combined_sentence_embedding"]
            embedding_next = sentences[i + 1]["combined_sentence_embedding"]

            similarity = self.embed_model.similarity(embedding_current, embedding_next)

            distance = 1 - similarity

            distances.append(distance)

        return distances

    def _build_node_chunks(
        self, sentences: List[SentenceCombination], distances: List[float]
    ) -> List[str]:
        chunks = []
        if len(distances) > 0:
            breakpoint_distance_threshold = np.percentile(
                distances, self.breakpoint_percentile_threshold
            )

            indices_above_threshold = [
                i for i, x in enumerate(distances) if x > breakpoint_distance_threshold
            ]

            # Chunk sentences into semantic groups based on percentile breakpoints
            start_index = 0

            for index in indices_above_threshold:
                group = sentences[start_index : index + 1]
                combined_text = "".join([d["sentence"] for d in group])
                chunks.append(combined_text)

                start_index = index + 1

            if start_index < len(sentences):
                combined_text = "".join(
                    [d["sentence"] for d in sentences[start_index:]]
                )
                chunks.append(combined_text)
        else:
            # If, for some reason we didn't get any distances (i.e. very, very small documents) just
            # treat the whole document as a single node
            chunks = [" ".join([s["sentence"] for s in sentences])]

        return chunks

build_semantic_nodes_from_documents #

build_semantic_nodes_from_documents(documents: Sequence[Document], show_progress: bool = False) -> List[BaseNode]

Build window nodes from documents.

Source code in llama_index/core/node_parser/text/semantic_splitter.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def build_semantic_nodes_from_documents(
    self,
    documents: Sequence[Document],
    show_progress: bool = False,
) -> List[BaseNode]:
    """Build window nodes from documents."""
    all_nodes: List[BaseNode] = []
    for doc in documents:
        text = doc.text
        text_splits = self.sentence_splitter(text)

        sentences = self._build_sentence_groups(text_splits)

        combined_sentence_embeddings = self.embed_model.get_text_embedding_batch(
            [s["combined_sentence"] for s in sentences],
            show_progress=show_progress,
        )

        for i, embedding in enumerate(combined_sentence_embeddings):
            sentences[i]["combined_sentence_embedding"] = embedding

        distances = self._calculate_distances_between_sentence_groups(sentences)

        chunks = self._build_node_chunks(sentences, distances)

        nodes = build_nodes_from_splits(
            chunks,
            doc,
            id_func=self.id_func,
        )

        all_nodes.extend(nodes)

    return all_nodes

abuild_semantic_nodes_from_documents async #

abuild_semantic_nodes_from_documents(documents: Sequence[Document], show_progress: bool = False) -> List[BaseNode]

Asynchronously build window nodes from documents.

Source code in llama_index/core/node_parser/text/semantic_splitter.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
async def abuild_semantic_nodes_from_documents(
    self,
    documents: Sequence[Document],
    show_progress: bool = False,
) -> List[BaseNode]:
    """Asynchronously build window nodes from documents."""
    all_nodes: List[BaseNode] = []
    for doc in documents:
        text = doc.text
        text_splits = self.sentence_splitter(text)

        sentences = self._build_sentence_groups(text_splits)

        combined_sentence_embeddings = (
            await self.embed_model.aget_text_embedding_batch(
                [s["combined_sentence"] for s in sentences],
                show_progress=show_progress,
            )
        )

        for i, embedding in enumerate(combined_sentence_embeddings):
            sentences[i]["combined_sentence_embedding"] = embedding

        distances = self._calculate_distances_between_sentence_groups(sentences)

        chunks = self._build_node_chunks(sentences, distances)

        nodes = build_nodes_from_splits(
            chunks,
            doc,
            id_func=self.id_func,
        )

        all_nodes.extend(nodes)

    return all_nodes

SemanticDoubleMergingSplitterNodeParser #

Bases: NodeParser

Semantic double merging text splitter.

Splits a document into Nodes, with each node being a group of semantically related sentences.

Parameters:

Name Type Description Default
language_config LanguageConfig

chooses language and spacy language model to be used

<llama_index.core.node_parser.text.semantic_double_merging_splitter.LanguageConfig object at 0x7f5a84d1e0f0>
initial_threshold float

sets threshold for initializing new chunk

0.6
appending_threshold float

sets threshold for appending new sentences to chunk

0.8
merging_threshold float

sets threshold for merging whole chunks

0.8
max_chunk_size int

maximum size of chunk (in characters)

1000
merging_range int

How many chunks 'ahead' beyond the nearest neighbor to be merged if similar (1 or 2 available)

1
merging_separator str

The separator to use when merging chunks. Defaults to a single space.

' '
sentence_splitter Optional[Callable]

splits text into sentences

<function split_by_sentence_tokenizer.<locals>.<lambda> at 0x7f5a7dd34680>
Source code in llama_index/core/node_parser/text/semantic_double_merging_splitter.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
class SemanticDoubleMergingSplitterNodeParser(NodeParser):
    """
    Semantic double merging text splitter.

    Splits a document into Nodes, with each node being a group of semantically related sentences.

    Args:
        language_config (LanguageConfig): chooses language and spacy language model to be used
        initial_threshold (float): sets threshold for initializing new chunk
        appending_threshold (float): sets threshold for appending new sentences to chunk
        merging_threshold (float): sets threshold for merging whole chunks
        max_chunk_size (int): maximum size of chunk (in characters)
        merging_range (int): How many chunks 'ahead' beyond the nearest neighbor to be merged if similar (1 or 2 available)
        merging_separator (str): The separator to use when merging chunks. Defaults to a single space.
        sentence_splitter (Optional[Callable]): splits text into sentences

    """

    language_config: LanguageConfig = Field(
        default=LanguageConfig(),
        description="Config that selects language and spacy model for chunking",
    )

    initial_threshold: float = Field(
        default=0.6,
        description=(
            "The value of semantic similarity that must be exceeded between two"
            "sentences to create a new chunk.  The bigger this "
            "value is, the more nodes will be generated. Range is from 0 to 1."
        ),
    )

    appending_threshold: float = Field(
        default=0.8,
        description=(
            "The value of semantic similarity that must be exceeded between a "
            "chunk and new sentence to add this sentence to existing chunk.  The bigger this "
            "value is, the more nodes will be generated. Range is from 0 to 1."
        ),
    )

    merging_threshold: float = Field(
        default=0.8,
        description=(
            "The value of semantic similarity that must be exceeded between two chunks "
            "to form a bigger chunk.  The bigger this value is,"
            "the more nodes will be generated. Range is from 0 to 1."
        ),
    )

    max_chunk_size: int = Field(
        default=1000,
        description="Maximum length of chunk that can be subjected to verification (number of characters)",
    )

    merging_range: int = Field(
        default=1,
        description=(
            "How many chunks 'ahead' beyond the nearest neighbor"
            "should the algorithm check during the second pass"
            "(possible options are 1 or 2"
        ),
    )

    merging_separator: str = Field(
        default=" ",
        description="The separator to use when merging chunks. Defaults to a single space.",
    )

    sentence_splitter: Callable[[str], List[str]] = Field(
        default_factory=split_by_sentence_tokenizer,
        description="The text splitter to use when splitting documents.",
        exclude=True,
    )

    @classmethod
    def class_name(cls) -> str:
        return "SemanticDoubleMergingSplitterNodeParser"

    @classmethod
    def from_defaults(
        cls,
        language_config: Optional[LanguageConfig] = LanguageConfig(),
        initial_threshold: Optional[float] = 0.6,
        appending_threshold: Optional[float] = 0.8,
        merging_threshold: Optional[float] = 0.8,
        max_chunk_size: Optional[int] = 1000,
        merging_range: Optional[int] = 1,
        merging_separator: Optional[str] = " ",
        sentence_splitter: Optional[Callable[[str], List[str]]] = None,
        original_text_metadata_key: str = DEFAULT_OG_TEXT_METADATA_KEY,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> "SemanticDoubleMergingSplitterNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        sentence_splitter = sentence_splitter or split_by_sentence_tokenizer()

        id_func = id_func or default_id_func

        return cls(
            language_config=language_config,
            initial_threshold=initial_threshold,
            appending_threshold=appending_threshold,
            merging_threshold=merging_threshold,
            max_chunk_size=max_chunk_size,
            merging_range=merging_range,
            merging_separator=merging_separator,
            sentence_splitter=sentence_splitter,
            original_text_metadata_key=original_text_metadata_key,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
            id_func=id_func,
        )

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Parse document into nodes."""
        # Load model
        self.language_config.load_model()

        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            nodes = self.build_semantic_nodes_from_nodes([node])
            all_nodes.extend(nodes)
        return all_nodes

    def build_semantic_nodes_from_documents(
        self,
        documents: Sequence[Document],
    ) -> List[BaseNode]:
        """Build window nodes from documents."""
        return self.build_semantic_nodes_from_nodes(documents)

    def build_semantic_nodes_from_nodes(
        self,
        nodes: Sequence[BaseNode],
    ) -> List[BaseNode]:
        """Build window nodes from nodes."""
        all_nodes: List[BaseNode] = []

        for node in nodes:
            text = node.get_content()
            sentences = self.sentence_splitter(text)
            sentences = [s.strip() for s in sentences]
            initial_chunks = self._create_initial_chunks(sentences)
            chunks = self._merge_initial_chunks(initial_chunks)

            split_nodes = build_nodes_from_splits(
                chunks,
                node,
                id_func=self.id_func,
            )

            previous_node: Optional[BaseNode] = None
            for split_node in split_nodes:
                if previous_node:
                    split_node.relationships[NodeRelationship.PREVIOUS] = (
                        previous_node.as_related_node_info()
                    )
                    previous_node.relationships[NodeRelationship.NEXT] = (
                        split_node.as_related_node_info()
                    )
                previous_node = split_node
            all_nodes.extend(split_nodes)

        return all_nodes

    def _create_initial_chunks(self, sentences: List[str]) -> List[str]:
        initial_chunks: List[str] = []
        chunk = sentences[0]  # ""
        new = True

        assert self.language_config.nlp is not None

        for sentence in sentences[1:]:
            if new:
                # check if 2 sentences got anything in common

                if (
                    self.language_config.nlp(
                        self._clean_text_advanced(chunk)
                    ).similarity(
                        self.language_config.nlp(self._clean_text_advanced(sentence))
                    )
                    < self.initial_threshold
                    and len(chunk) + len(sentence) + 1 <= self.max_chunk_size
                ):
                    # if not then leave first sentence as separate chunk
                    initial_chunks.append(chunk)
                    chunk = sentence
                    continue

                chunk_sentences = [chunk]
                if len(chunk) + len(sentence) + 1 <= self.max_chunk_size:
                    chunk_sentences.append(sentence)
                    chunk = self.merging_separator.join(chunk_sentences)
                    new = False
                else:
                    new = True
                    initial_chunks.append(chunk)
                    chunk = sentence
                    continue
                last_sentences = self.merging_separator.join(chunk_sentences[-2:])
                # new = False

            elif (
                self.language_config.nlp(
                    self._clean_text_advanced(last_sentences)
                ).similarity(
                    self.language_config.nlp(self._clean_text_advanced(sentence))
                )
                > self.appending_threshold
                and len(chunk) + len(sentence) + 1 <= self.max_chunk_size
            ):
                # elif nlp(last_sentences).similarity(nlp(sentence)) > self.threshold:
                chunk_sentences.append(sentence)
                last_sentences = self.merging_separator.join(chunk_sentences[-2:])
                chunk += self.merging_separator + sentence
            else:
                initial_chunks.append(chunk)
                chunk = sentence  # ""
                new = True
        initial_chunks.append(chunk)

        return initial_chunks

    def _merge_initial_chunks(self, initial_chunks: List[str]) -> List[str]:
        chunks: List[str] = []
        skip = 0
        current = initial_chunks[0]

        assert self.language_config.nlp is not None

        # TODO avoid connecting 1st chunk with 3rd if 2nd one is above some value, or if its length is above some value

        for i in range(1, len(initial_chunks)):
            # avoid connecting same chunk multiple times
            if skip > 0:
                skip -= 1
                continue

            current_nlp = self.language_config.nlp(self._clean_text_advanced(current))

            if len(current) >= self.max_chunk_size:
                chunks.append(current)
                current = initial_chunks[i]

            # check if 1st and 2nd chunk should be connected
            elif (
                current_nlp.similarity(
                    self.language_config.nlp(
                        self._clean_text_advanced(initial_chunks[i])
                    )
                )
                > self.merging_threshold
                and len(current) + len(initial_chunks[i]) + 1 <= self.max_chunk_size
            ):
                current += self.merging_separator + initial_chunks[i]

            # check if 1st and 3rd chunk are similar, if yes then merge 1st, 2nd, 3rd together
            elif (
                i <= len(initial_chunks) - 2
                and current_nlp.similarity(
                    self.language_config.nlp(
                        self._clean_text_advanced(initial_chunks[i + 1])
                    )
                )
                > self.merging_threshold
                and len(current)
                + len(initial_chunks[i])
                + len(initial_chunks[i + 1])
                + 2
                <= self.max_chunk_size
            ):
                current += (
                    self.merging_separator
                    + initial_chunks[i]
                    + self.merging_separator
                    + initial_chunks[i + 1]
                )
                skip = 1

            # check if 1st and 4th chunk are smilar, if yes then merge 1st, 2nd, 3rd and 4th together
            elif (
                i < len(initial_chunks) - 2
                and current_nlp.similarity(
                    self.language_config.nlp(
                        self._clean_text_advanced(initial_chunks[i + 2])
                    )
                )
                > self.merging_threshold
                and self.merging_range == 2
                and len(current)
                + len(initial_chunks[i])
                + len(initial_chunks[i + 1])
                + len(initial_chunks[i + 2])
                + 3
                <= self.max_chunk_size
            ):
                current += (
                    self.merging_separator
                    + initial_chunks[i]
                    + self.merging_separator
                    + initial_chunks[i + 1]
                    + self.merging_separator
                    + initial_chunks[i + 2]
                )
                skip = 2

            else:
                chunks.append(current)
                current = initial_chunks[i]

        chunks.append(current)
        return chunks

    def _clean_text_advanced(self, text: str) -> str:
        text = text.lower()
        # Remove urls
        text = re.sub(r"http\S+|www\S+|https\S+", "", text, flags=re.MULTILINE)
        # Remove punctuations
        text = text.translate(str.maketrans("", "", string.punctuation))
        # Remove stopwords
        tokens = globals_helper.punkt_tokenizer.tokenize(text)
        filtered_words = [w for w in tokens if w not in self.language_config.stopwords]

        return " ".join(filtered_words)

build_semantic_nodes_from_documents #

build_semantic_nodes_from_documents(documents: Sequence[Document]) -> List[BaseNode]

Build window nodes from documents.

Source code in llama_index/core/node_parser/text/semantic_double_merging_splitter.py
198
199
200
201
202
203
def build_semantic_nodes_from_documents(
    self,
    documents: Sequence[Document],
) -> List[BaseNode]:
    """Build window nodes from documents."""
    return self.build_semantic_nodes_from_nodes(documents)

build_semantic_nodes_from_nodes #

build_semantic_nodes_from_nodes(nodes: Sequence[BaseNode]) -> List[BaseNode]

Build window nodes from nodes.

Source code in llama_index/core/node_parser/text/semantic_double_merging_splitter.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def build_semantic_nodes_from_nodes(
    self,
    nodes: Sequence[BaseNode],
) -> List[BaseNode]:
    """Build window nodes from nodes."""
    all_nodes: List[BaseNode] = []

    for node in nodes:
        text = node.get_content()
        sentences = self.sentence_splitter(text)
        sentences = [s.strip() for s in sentences]
        initial_chunks = self._create_initial_chunks(sentences)
        chunks = self._merge_initial_chunks(initial_chunks)

        split_nodes = build_nodes_from_splits(
            chunks,
            node,
            id_func=self.id_func,
        )

        previous_node: Optional[BaseNode] = None
        for split_node in split_nodes:
            if previous_node:
                split_node.relationships[NodeRelationship.PREVIOUS] = (
                    previous_node.as_related_node_info()
                )
                previous_node.relationships[NodeRelationship.NEXT] = (
                    split_node.as_related_node_info()
                )
            previous_node = split_node
        all_nodes.extend(split_nodes)

    return all_nodes

SentenceSplitter #

Bases: MetadataAwareTextSplitter

Parse text with a preference for complete sentences.

In general, this class tries to keep sentences and paragraphs together. Therefore compared to the original TokenTextSplitter, there are less likely to be hanging sentences or parts of sentences at the end of the node chunk.

Parameters:

Name Type Description Default
chunk_size int

The token chunk size for each chunk.

1024
chunk_overlap int

The token overlap of each chunk when splitting.

200
separator str

Default separator for splitting into words

' '
paragraph_separator str

Separator between paragraphs.

'\n\n\n'
secondary_chunking_regex str | None

Backup regex for splitting into sentences.

'[^,.;。?!]+[,.;。?!]?|[,.;。?!]'
Source code in llama_index/core/node_parser/text/sentence.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
class SentenceSplitter(MetadataAwareTextSplitter):
    """
    Parse text with a preference for complete sentences.

    In general, this class tries to keep sentences and paragraphs together. Therefore
    compared to the original TokenTextSplitter, there are less likely to be
    hanging sentences or parts of sentences at the end of the node chunk.
    """

    chunk_size: int = Field(
        default=DEFAULT_CHUNK_SIZE,
        description="The token chunk size for each chunk.",
        gt=0,
    )
    chunk_overlap: int = Field(
        default=SENTENCE_CHUNK_OVERLAP,
        description="The token overlap of each chunk when splitting.",
        ge=0,
    )
    separator: str = Field(
        default=" ", description="Default separator for splitting into words"
    )
    paragraph_separator: str = Field(
        default=DEFAULT_PARAGRAPH_SEP, description="Separator between paragraphs."
    )
    secondary_chunking_regex: Optional[str] = Field(
        default=CHUNKING_REGEX, description="Backup regex for splitting into sentences."
    )

    _chunking_tokenizer_fn: Callable[[str], List[str]] = PrivateAttr()
    _tokenizer: Callable = PrivateAttr()
    _split_fns: List[Callable] = PrivateAttr()
    _sub_sentence_split_fns: List[Callable] = PrivateAttr()

    def __init__(
        self,
        separator: str = " ",
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = SENTENCE_CHUNK_OVERLAP,
        tokenizer: Optional[Callable] = None,
        paragraph_separator: str = DEFAULT_PARAGRAPH_SEP,
        chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None,
        secondary_chunking_regex: Optional[str] = CHUNKING_REGEX,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable] = None,
    ):
        """Initialize with parameters."""
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )
        id_func = id_func or default_id_func
        callback_manager = callback_manager or CallbackManager([])
        super().__init__(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            secondary_chunking_regex=secondary_chunking_regex,
            separator=separator,
            paragraph_separator=paragraph_separator,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )
        self._chunking_tokenizer_fn = (
            chunking_tokenizer_fn or split_by_sentence_tokenizer()
        )
        self._tokenizer = tokenizer or get_tokenizer()

        self._split_fns = [
            split_by_sep(paragraph_separator),
            self._chunking_tokenizer_fn,
        ]

        if secondary_chunking_regex:
            self._sub_sentence_split_fns = [
                split_by_regex(secondary_chunking_regex),
                split_by_sep(separator),
                split_by_char(),
            ]
        else:
            self._sub_sentence_split_fns = [
                split_by_sep(separator),
                split_by_char(),
            ]

    @classmethod
    def from_defaults(
        cls,
        separator: str = " ",
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = SENTENCE_CHUNK_OVERLAP,
        tokenizer: Optional[Callable] = None,
        paragraph_separator: str = DEFAULT_PARAGRAPH_SEP,
        chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None,
        secondary_chunking_regex: str = CHUNKING_REGEX,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
    ) -> "SentenceSplitter":
        """Initialize with parameters."""
        callback_manager = callback_manager or CallbackManager([])
        return cls(
            separator=separator,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            tokenizer=tokenizer,
            paragraph_separator=paragraph_separator,
            chunking_tokenizer_fn=chunking_tokenizer_fn,
            secondary_chunking_regex=secondary_chunking_regex,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
        )

    @classmethod
    def class_name(cls) -> str:
        return "SentenceSplitter"

    def split_text_metadata_aware(self, text: str, metadata_str: str) -> List[str]:
        metadata_len = len(self._tokenizer(metadata_str))
        effective_chunk_size = self.chunk_size - metadata_len
        if effective_chunk_size <= 0:
            raise ValueError(
                f"Metadata length ({metadata_len}) is longer than chunk size "
                f"({self.chunk_size}). Consider increasing the chunk size or "
                "decreasing the size of your metadata to avoid this."
            )
        elif effective_chunk_size < 50:
            print(
                f"Metadata length ({metadata_len}) is close to chunk size "
                f"({self.chunk_size}). Resulting chunks are less than 50 tokens. "
                "Consider increasing the chunk size or decreasing the size of "
                "your metadata to avoid this.",
                flush=True,
            )

        return self._split_text(text, chunk_size=effective_chunk_size)

    def split_text(self, text: str) -> List[str]:
        return self._split_text(text, chunk_size=self.chunk_size)

    def _split_text(self, text: str, chunk_size: int) -> List[str]:
        """
        _Split incoming text and return chunks with overlap size.

        Has a preference for complete sentences, phrases, and minimal overlap.
        """
        if text == "":
            return [text]

        with self.callback_manager.event(
            CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
        ) as event:
            splits = self._split(text, chunk_size)
            chunks = self._merge(splits, chunk_size)

            event.on_end(payload={EventPayload.CHUNKS: chunks})

        return chunks

    def _split(self, text: str, chunk_size: int) -> List[_Split]:
        r"""
        Break text into splits that are smaller than chunk size.

        The order of splitting is:
        1. split by paragraph separator
        2. split by chunking tokenizer (default is nltk sentence tokenizer)
        3. split by second chunking regex (default is "[^,\.;]+[,\.;]?")
        4. split by default separator (" ")

        """
        token_size = self._token_size(text)
        if token_size <= chunk_size:
            return [_Split(text, is_sentence=True, token_size=token_size)]

        text_splits_by_fns, is_sentence = self._get_splits_by_fns(text)

        text_splits = []
        for text_split_by_fns in text_splits_by_fns:
            token_size = self._token_size(text_split_by_fns)
            if token_size <= chunk_size:
                text_splits.append(
                    _Split(
                        text_split_by_fns,
                        is_sentence=is_sentence,
                        token_size=token_size,
                    )
                )
            else:
                recursive_text_splits = self._split(
                    text_split_by_fns, chunk_size=chunk_size
                )
                text_splits.extend(recursive_text_splits)
        return text_splits

    def _merge(self, splits: List[_Split], chunk_size: int) -> List[str]:
        """Merge splits into chunks."""
        chunks: List[str] = []
        cur_chunk: List[Tuple[str, int]] = []  # list of (text, length)
        last_chunk: List[Tuple[str, int]] = []
        cur_chunk_len = 0
        new_chunk = True

        def close_chunk() -> None:
            nonlocal chunks, cur_chunk, last_chunk, cur_chunk_len, new_chunk

            chunks.append("".join([text for text, length in cur_chunk]))
            last_chunk = cur_chunk
            cur_chunk = []
            cur_chunk_len = 0
            new_chunk = True

            # add overlap to the next chunk using the last one first
            if len(last_chunk) > 0:
                last_index = len(last_chunk) - 1
                while (
                    last_index >= 0
                    and cur_chunk_len + last_chunk[last_index][1] <= self.chunk_overlap
                ):
                    overlap_text, overlap_length = last_chunk[last_index]
                    cur_chunk_len += overlap_length
                    cur_chunk.insert(0, (overlap_text, overlap_length))
                    last_index -= 1

        split_idx = 0
        while split_idx < len(splits):
            cur_split = splits[split_idx]
            if cur_split.token_size > chunk_size:
                raise ValueError("Single token exceeded chunk size")
            if cur_chunk_len + cur_split.token_size > chunk_size and not new_chunk:
                # if adding split to current chunk exceeds chunk size: close out chunk
                close_chunk()
            else:
                # If this is a new chunk with overlap, and adding the split would
                # exceed chunk_size, remove overlap to make room
                if new_chunk and cur_chunk_len + cur_split.token_size > chunk_size:
                    # Remove overlap from the beginning until split fits
                    while (
                        len(cur_chunk) > 0
                        and cur_chunk_len + cur_split.token_size > chunk_size
                    ):
                        _, length = cur_chunk.pop(0)
                        cur_chunk_len -= length

                if (
                    cur_split.is_sentence
                    or cur_chunk_len + cur_split.token_size <= chunk_size
                    or new_chunk  # new chunk, always add at least one split
                ):
                    # add split to chunk
                    cur_chunk_len += cur_split.token_size
                    cur_chunk.append((cur_split.text, cur_split.token_size))
                    split_idx += 1
                    new_chunk = False
                else:
                    # close out chunk
                    close_chunk()

        # handle the last chunk
        if not new_chunk:
            chunk = "".join([text for text, length in cur_chunk])
            chunks.append(chunk)

        # run postprocessing to remove blank spaces
        return self._postprocess_chunks(chunks)

    def _postprocess_chunks(self, chunks: List[str]) -> List[str]:
        """
        Post-process chunks.
        Remove whitespace only chunks and remove leading and trailing whitespace.
        """
        new_chunks = []
        for chunk in chunks:
            stripped_chunk = chunk.strip()
            if stripped_chunk == "":
                continue
            new_chunks.append(stripped_chunk)
        return new_chunks

    def _token_size(self, text: str) -> int:
        return len(self._tokenizer(text))

    def _get_splits_by_fns(self, text: str) -> Tuple[List[str], bool]:
        for split_fn in self._split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                return splits, True

        for split_fn in self._sub_sentence_split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                break

        return splits, False

from_defaults classmethod #

from_defaults(separator: str = ' ', chunk_size: int = DEFAULT_CHUNK_SIZE, chunk_overlap: int = SENTENCE_CHUNK_OVERLAP, tokenizer: Optional[Callable] = None, paragraph_separator: str = DEFAULT_PARAGRAPH_SEP, chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None, secondary_chunking_regex: str = CHUNKING_REGEX, callback_manager: Optional[CallbackManager] = None, include_metadata: bool = True, include_prev_next_rel: bool = True) -> SentenceSplitter

Initialize with parameters.

Source code in llama_index/core/node_parser/text/sentence.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
def from_defaults(
    cls,
    separator: str = " ",
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = SENTENCE_CHUNK_OVERLAP,
    tokenizer: Optional[Callable] = None,
    paragraph_separator: str = DEFAULT_PARAGRAPH_SEP,
    chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None,
    secondary_chunking_regex: str = CHUNKING_REGEX,
    callback_manager: Optional[CallbackManager] = None,
    include_metadata: bool = True,
    include_prev_next_rel: bool = True,
) -> "SentenceSplitter":
    """Initialize with parameters."""
    callback_manager = callback_manager or CallbackManager([])
    return cls(
        separator=separator,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        tokenizer=tokenizer,
        paragraph_separator=paragraph_separator,
        chunking_tokenizer_fn=chunking_tokenizer_fn,
        secondary_chunking_regex=secondary_chunking_regex,
        callback_manager=callback_manager,
        include_metadata=include_metadata,
        include_prev_next_rel=include_prev_next_rel,
    )

SentenceWindowNodeParser #

Bases: NodeParser

Sentence window node parser.

Splits a document into Nodes, with each node being a sentence. Each node contains a window from the surrounding sentences in the metadata.

Parameters:

Name Type Description Default
sentence_splitter Optional[Callable]

splits text into sentences

<function split_by_sentence_tokenizer.<locals>.<lambda> at 0x7f5a7dd34680>
include_metadata bool

whether to include metadata in nodes

required
include_prev_next_rel bool

whether to include prev/next relationships

required
window_size int

The number of sentences on each side of a sentence to capture.

3
window_metadata_key str

The metadata key to store the sentence window under.

'window'
original_text_metadata_key str

The metadata key to store the original sentence in.

'original_text'
Source code in llama_index/core/node_parser/text/sentence_window.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class SentenceWindowNodeParser(NodeParser):
    """
    Sentence window node parser.

    Splits a document into Nodes, with each node being a sentence.
    Each node contains a window from the surrounding sentences in the metadata.

    Args:
        sentence_splitter (Optional[Callable]): splits text into sentences
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships

    """

    sentence_splitter: Callable[[str], List[str]] = Field(
        default_factory=split_by_sentence_tokenizer,
        description="The text splitter to use when splitting documents.",
        exclude=True,
    )
    window_size: int = Field(
        default=DEFAULT_WINDOW_SIZE,
        description="The number of sentences on each side of a sentence to capture.",
        gt=0,
    )
    window_metadata_key: str = Field(
        default=DEFAULT_WINDOW_METADATA_KEY,
        description="The metadata key to store the sentence window under.",
    )
    original_text_metadata_key: str = Field(
        default=DEFAULT_OG_TEXT_METADATA_KEY,
        description="The metadata key to store the original sentence in.",
    )

    @classmethod
    def class_name(cls) -> str:
        return "SentenceWindowNodeParser"

    @classmethod
    def from_defaults(
        cls,
        sentence_splitter: Optional[Callable[[str], List[str]]] = None,
        window_size: int = DEFAULT_WINDOW_SIZE,
        window_metadata_key: str = DEFAULT_WINDOW_METADATA_KEY,
        original_text_metadata_key: str = DEFAULT_OG_TEXT_METADATA_KEY,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> "SentenceWindowNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        sentence_splitter = sentence_splitter or split_by_sentence_tokenizer()

        id_func = id_func or default_id_func

        return cls(
            sentence_splitter=sentence_splitter,
            window_size=window_size,
            window_metadata_key=window_metadata_key,
            original_text_metadata_key=original_text_metadata_key,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
            id_func=id_func,
        )

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Parse document into nodes."""
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            nodes = self.build_window_nodes_from_documents([node])
            all_nodes.extend(nodes)

        return all_nodes

    def build_window_nodes_from_documents(
        self, documents: Sequence[Document]
    ) -> List[BaseNode]:
        """Build window nodes from documents."""
        all_nodes: List[BaseNode] = []
        for doc in documents:
            text = doc.text
            text_splits = self.sentence_splitter(text)
            nodes = build_nodes_from_splits(
                text_splits,
                doc,
                id_func=self.id_func,
            )

            # add window to each node
            for i, node in enumerate(nodes):
                window_nodes = nodes[
                    max(0, i - self.window_size) : min(
                        i + self.window_size + 1, len(nodes)
                    )
                ]

                node.metadata[self.window_metadata_key] = " ".join(
                    [n.text for n in window_nodes]
                )
                node.metadata[self.original_text_metadata_key] = node.text

                # exclude window metadata from embed and llm
                node.excluded_embed_metadata_keys.extend(
                    [self.window_metadata_key, self.original_text_metadata_key]
                )
                node.excluded_llm_metadata_keys.extend(
                    [self.window_metadata_key, self.original_text_metadata_key]
                )

            all_nodes.extend(nodes)

        return all_nodes

build_window_nodes_from_documents #

build_window_nodes_from_documents(documents: Sequence[Document]) -> List[BaseNode]

Build window nodes from documents.

Source code in llama_index/core/node_parser/text/sentence_window.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def build_window_nodes_from_documents(
    self, documents: Sequence[Document]
) -> List[BaseNode]:
    """Build window nodes from documents."""
    all_nodes: List[BaseNode] = []
    for doc in documents:
        text = doc.text
        text_splits = self.sentence_splitter(text)
        nodes = build_nodes_from_splits(
            text_splits,
            doc,
            id_func=self.id_func,
        )

        # add window to each node
        for i, node in enumerate(nodes):
            window_nodes = nodes[
                max(0, i - self.window_size) : min(
                    i + self.window_size + 1, len(nodes)
                )
            ]

            node.metadata[self.window_metadata_key] = " ".join(
                [n.text for n in window_nodes]
            )
            node.metadata[self.original_text_metadata_key] = node.text

            # exclude window metadata from embed and llm
            node.excluded_embed_metadata_keys.extend(
                [self.window_metadata_key, self.original_text_metadata_key]
            )
            node.excluded_llm_metadata_keys.extend(
                [self.window_metadata_key, self.original_text_metadata_key]
            )

        all_nodes.extend(nodes)

    return all_nodes

TokenTextSplitter #

Bases: MetadataAwareTextSplitter

Implementation of splitting text that looks at word tokens.

Parameters:

Name Type Description Default
chunk_size int

The token chunk size for each chunk.

1024
chunk_overlap int

The token overlap of each chunk when splitting.

20
separator str

Default separator for splitting into words

' '
backup_separators List

Additional separators for splitting.

<dynamic>
keep_whitespaces bool

Whether to keep leading/trailing whitespaces in the chunk.

False
Source code in llama_index/core/node_parser/text/token.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class TokenTextSplitter(MetadataAwareTextSplitter):
    """Implementation of splitting text that looks at word tokens."""

    chunk_size: int = Field(
        default=DEFAULT_CHUNK_SIZE,
        description="The token chunk size for each chunk.",
        gt=0,
    )
    chunk_overlap: int = Field(
        default=DEFAULT_CHUNK_OVERLAP,
        description="The token overlap of each chunk when splitting.",
        ge=0,
    )
    separator: str = Field(
        default=" ", description="Default separator for splitting into words"
    )
    backup_separators: List = Field(
        default_factory=list, description="Additional separators for splitting."
    )

    keep_whitespaces: bool = Field(
        default=False,
        description="Whether to keep leading/trailing whitespaces in the chunk.",
    )

    _tokenizer: Callable = PrivateAttr()
    _split_fns: List[Callable] = PrivateAttr()

    def __init__(
        self,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
        tokenizer: Optional[Callable] = None,
        callback_manager: Optional[CallbackManager] = None,
        separator: str = " ",
        backup_separators: Optional[List[str]] = ["\n"],
        keep_whitespaces: bool = False,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ):
        """Initialize with parameters."""
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )
        callback_manager = callback_manager or CallbackManager([])
        id_func = id_func or default_id_func
        super().__init__(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separator=separator,
            backup_separators=backup_separators,
            keep_whitespaces=keep_whitespaces,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )
        self._tokenizer = tokenizer or get_tokenizer()
        all_seps = [separator] + (backup_separators or [])
        self._split_fns = [split_by_sep(sep) for sep in all_seps] + [split_by_char()]

    @classmethod
    def from_defaults(
        cls,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
        separator: str = " ",
        backup_separators: Optional[List[str]] = ["\n"],
        callback_manager: Optional[CallbackManager] = None,
        keep_whitespaces: bool = False,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> "TokenTextSplitter":
        """Initialize with default parameters."""
        callback_manager = callback_manager or CallbackManager([])
        return cls(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separator=separator,
            backup_separators=backup_separators,
            keep_whitespaces=keep_whitespaces,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )

    @classmethod
    def class_name(cls) -> str:
        return "TokenTextSplitter"

    def split_text_metadata_aware(self, text: str, metadata_str: str) -> List[str]:
        """Split text into chunks, reserving space required for metadata str."""
        metadata_len = len(self._tokenizer(metadata_str)) + DEFAULT_METADATA_FORMAT_LEN
        effective_chunk_size = self.chunk_size - metadata_len
        if effective_chunk_size <= 0:
            raise ValueError(
                f"Metadata length ({metadata_len}) is longer than chunk size "
                f"({self.chunk_size}). Consider increasing the chunk size or "
                "decreasing the size of your metadata to avoid this."
            )
        elif effective_chunk_size < 50:
            print(
                f"Metadata length ({metadata_len}) is close to chunk size "
                f"({self.chunk_size}). Resulting chunks are less than 50 tokens. "
                "Consider increasing the chunk size or decreasing the size of "
                "your metadata to avoid this.",
                flush=True,
            )

        return self._split_text(text, chunk_size=effective_chunk_size)

    def split_text(self, text: str) -> List[str]:
        """Split text into chunks."""
        return self._split_text(text, chunk_size=self.chunk_size)

    def _split_text(self, text: str, chunk_size: int) -> List[str]:
        """Split text into chunks up to chunk_size."""
        if text == "":
            return [text]

        with self.callback_manager.event(
            CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
        ) as event:
            splits = self._split(text, chunk_size)
            chunks = self._merge(splits, chunk_size)

            event.on_end(
                payload={EventPayload.CHUNKS: chunks},
            )

        return chunks

    def _split(self, text: str, chunk_size: int) -> List[str]:
        """
        Break text into splits that are smaller than chunk size.

        The order of splitting is:
        1. split by separator
        2. split by backup separators (if any)
        3. split by characters

        NOTE: the splits contain the separators.
        """
        if len(self._tokenizer(text)) <= chunk_size:
            return [text]

        for split_fn in self._split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                break

        new_splits = []
        for split in splits:
            split_len = len(self._tokenizer(split))
            if split_len <= chunk_size:
                new_splits.append(split)
            else:
                # recursively split
                new_splits.extend(self._split(split, chunk_size=chunk_size))
        return new_splits

    def _merge(self, splits: List[str], chunk_size: int) -> List[str]:
        """
        Merge splits into chunks.

        The high-level idea is to keep adding splits to a chunk until we
        exceed the chunk size, then we start a new chunk with overlap.

        When we start a new chunk, we pop off the first element of the previous
        chunk until the total length is less than the chunk size.
        """
        chunks: List[str] = []

        cur_chunk: List[str] = []
        cur_len = 0
        for split in splits:
            split_len = len(self._tokenizer(split))
            if split_len > chunk_size:
                _logger.warning(
                    f"Got a split of size {split_len}, ",
                    f"larger than chunk size {chunk_size}.",
                )

            # if we exceed the chunk size after adding the new split, then
            # we need to end the current chunk and start a new one
            if cur_len + split_len > chunk_size:
                # end the previous chunk
                chunk = (
                    "".join(cur_chunk)
                    if self.keep_whitespaces
                    else "".join(cur_chunk).strip()
                )
                if chunk:
                    chunks.append(chunk)

                # start a new chunk with overlap
                # keep popping off the first element of the previous chunk until:
                #   1. the current chunk length is less than chunk overlap
                #   2. the total length is less than chunk size
                while cur_len > self.chunk_overlap or cur_len + split_len > chunk_size:
                    # pop off the first element
                    first_chunk = cur_chunk.pop(0)
                    cur_len -= len(self._tokenizer(first_chunk))

            cur_chunk.append(split)
            cur_len += split_len

        # handle the last chunk
        chunk = (
            "".join(cur_chunk) if self.keep_whitespaces else "".join(cur_chunk).strip()
        )
        if chunk:
            chunks.append(chunk)

        return chunks

from_defaults classmethod #

from_defaults(chunk_size: int = DEFAULT_CHUNK_SIZE, chunk_overlap: int = DEFAULT_CHUNK_OVERLAP, separator: str = ' ', backup_separators: Optional[List[str]] = ['\n'], callback_manager: Optional[CallbackManager] = None, keep_whitespaces: bool = False, include_metadata: bool = True, include_prev_next_rel: bool = True, id_func: Optional[Callable[[int, Document], str]] = None) -> TokenTextSplitter

Initialize with default parameters.

Source code in llama_index/core/node_parser/text/token.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@classmethod
def from_defaults(
    cls,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
    separator: str = " ",
    backup_separators: Optional[List[str]] = ["\n"],
    callback_manager: Optional[CallbackManager] = None,
    keep_whitespaces: bool = False,
    include_metadata: bool = True,
    include_prev_next_rel: bool = True,
    id_func: Optional[Callable[[int, Document], str]] = None,
) -> "TokenTextSplitter":
    """Initialize with default parameters."""
    callback_manager = callback_manager or CallbackManager([])
    return cls(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separator=separator,
        backup_separators=backup_separators,
        keep_whitespaces=keep_whitespaces,
        callback_manager=callback_manager,
        include_metadata=include_metadata,
        include_prev_next_rel=include_prev_next_rel,
        id_func=id_func,
    )

split_text_metadata_aware #

split_text_metadata_aware(text: str, metadata_str: str) -> List[str]

Split text into chunks, reserving space required for metadata str.

Source code in llama_index/core/node_parser/text/token.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def split_text_metadata_aware(self, text: str, metadata_str: str) -> List[str]:
    """Split text into chunks, reserving space required for metadata str."""
    metadata_len = len(self._tokenizer(metadata_str)) + DEFAULT_METADATA_FORMAT_LEN
    effective_chunk_size = self.chunk_size - metadata_len
    if effective_chunk_size <= 0:
        raise ValueError(
            f"Metadata length ({metadata_len}) is longer than chunk size "
            f"({self.chunk_size}). Consider increasing the chunk size or "
            "decreasing the size of your metadata to avoid this."
        )
    elif effective_chunk_size < 50:
        print(
            f"Metadata length ({metadata_len}) is close to chunk size "
            f"({self.chunk_size}). Resulting chunks are less than 50 tokens. "
            "Consider increasing the chunk size or decreasing the size of "
            "your metadata to avoid this.",
            flush=True,
        )

    return self._split_text(text, chunk_size=effective_chunk_size)

split_text #

split_text(text: str) -> List[str]

Split text into chunks.

Source code in llama_index/core/node_parser/text/token.py
138
139
140
def split_text(self, text: str) -> List[str]:
    """Split text into chunks."""
    return self._split_text(text, chunk_size=self.chunk_size)

get_leaf_nodes #

get_leaf_nodes(nodes: List[BaseNode]) -> List[BaseNode]

Get leaf nodes.

Source code in llama_index/core/node_parser/relational/hierarchical.py
25
26
27
28
29
30
31
def get_leaf_nodes(nodes: List[BaseNode]) -> List[BaseNode]:
    """Get leaf nodes."""
    leaf_nodes = []
    for node in nodes:
        if NodeRelationship.CHILD not in node.relationships:
            leaf_nodes.append(node)
    return leaf_nodes

get_root_nodes #

get_root_nodes(nodes: List[BaseNode]) -> List[BaseNode]

Get root nodes.

Source code in llama_index/core/node_parser/relational/hierarchical.py
34
35
36
37
38
39
40
def get_root_nodes(nodes: List[BaseNode]) -> List[BaseNode]:
    """Get root nodes."""
    root_nodes = []
    for node in nodes:
        if NodeRelationship.PARENT not in node.relationships:
            root_nodes.append(node)
    return root_nodes

get_child_nodes #

get_child_nodes(nodes: List[BaseNode], all_nodes: List[BaseNode]) -> List[BaseNode]

Get child nodes of nodes from given all_nodes.

Source code in llama_index/core/node_parser/relational/hierarchical.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def get_child_nodes(nodes: List[BaseNode], all_nodes: List[BaseNode]) -> List[BaseNode]:
    """Get child nodes of nodes from given all_nodes."""
    children_ids = []
    for node in nodes:
        if NodeRelationship.CHILD not in node.relationships:
            continue

        children_ids.extend([r.node_id for r in (node.child_nodes or [])])

    child_nodes = []
    for candidate_node in all_nodes:
        if candidate_node.node_id not in children_ids:
            continue
        child_nodes.append(candidate_node)

    return child_nodes

get_deeper_nodes #

get_deeper_nodes(nodes: List[BaseNode], depth: int = 1) -> List[BaseNode]

Get children of root nodes in given nodes that have given depth.

Source code in llama_index/core/node_parser/relational/hierarchical.py
61
62
63
64
65
66
67
68
69
70
71
72
73
def get_deeper_nodes(nodes: List[BaseNode], depth: int = 1) -> List[BaseNode]:
    """Get children of root nodes in given nodes that have given depth."""
    if depth < 0:
        raise ValueError("Depth cannot be a negative number!")
    root_nodes = get_root_nodes(nodes)
    if not root_nodes:
        raise ValueError("There is no root nodes in given nodes!")

    deeper_nodes = root_nodes
    for _ in range(depth):
        deeper_nodes = get_child_nodes(deeper_nodes, nodes)

    return deeper_nodes

options: members: - SentenceSplitter