Skip to content

vllm.tool_parsers.deepseekv32_tool_parser

DeepSeekV32ToolParser

Bases: ToolParser

example tool call content: <|DSML|function_calls> <|DSML|invoke name="get_weather"> <|DSML|parameter name="location" string="true">杭州</|DSML|parameter> <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter> </|DSML|invoke> <|DSML|invoke name="get_weather"> <|DSML|parameter name="location" string="true">北京</|DSML|parameter> <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter> </|DSML|invoke> </|DSML|function_calls>

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
class DeepSeekV32ToolParser(ToolParser):
    """Parse DSML-formatted tool calls out of model output.

    example tool call content:
    <|DSML|function_calls>
    <|DSML|invoke name="get_weather">
    <|DSML|parameter name="location" string="true">杭州</|DSML|parameter>
    <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter>
    </|DSML|invoke>
    <|DSML|invoke name="get_weather">
    <|DSML|parameter name="location" string="true">北京</|DSML|parameter>
    <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter>
    </|DSML|invoke>
    </|DSML|function_calls>

    ``extract_tool_calls`` handles a complete response in one pass, while
    ``extract_tool_calls_streaming`` emits each ``invoke`` block as soon as
    it has been received in full.
    """

    def __init__(self, tokenizer: TokenizerLike):
        """Set up sentinel tokens, streaming state and parsing regexes.

        Args:
            tokenizer: Tokenizer forwarded to the ``ToolParser`` base class.

        Raises:
            ValueError: If ``self.model_tokenizer`` is falsy after the base
                class has been initialised.
        """
        super().__init__(tokenizer)

        self.prev_tool_call_arr: list[dict] = []

        # Sentinel token
        self.tool_call_start_token: str = "<|DSML|function_calls>"

        # Streaming state
        self.is_tool_call_started: bool = False
        self.current_tool_index: int = 0

        # Regex patterns for complete parsing.
        # "|" is the regex alternation operator, so every literal "|" inside
        # the DSML tags must be escaped. Left unescaped, each pattern turns
        # into a list of alternatives ("<", "DSML", ...) that matches tag
        # fragments and yields empty captures instead of whole blocks.
        self.tool_call_complete_regex = re.compile(
            r"<\|DSML\|function_calls>(.*?)</\|DSML\|function_calls>", re.DOTALL
        )
        self.invoke_complete_regex = re.compile(
            r'<\|DSML\|invoke\s+name="([^"]+)"\s*>(.*?)</\|DSML\|invoke>',
            re.DOTALL,
        )
        self.parameter_complete_regex = re.compile(
            r'<\|DSML\|parameter\s+name="([^"]+)"\s+string="(?:true|false)"\s*>'
            r"(.*?)</\|DSML\|parameter>",
            re.DOTALL,
        )

        if not self.model_tokenizer:
            raise ValueError(
                "The model tokenizer must be passed to the ToolParser "
                "constructor during construction."
            )

        logger.debug(
            "vLLM Successfully import tool parser %s !", self.__class__.__name__
        )

    def adjust_request(self, request):
        """Disable special-token skipping when tool calls may be produced.

        Args:
            request: The request, first passed through the base class
                ``adjust_request``.

        Returns:
            The adjusted request.
        """
        request = super().adjust_request(request)
        if request.tools and request.tool_choice != "none":
            # Ensure tool call tokens
            # (<|DSML|function_calls>, </|DSML|function_calls>)
            # are not skipped during decoding.
            # Even though they are not marked as special tokens,
            # setting skip_special_tokens=False ensures proper handling in
            # transformers 5.x where decoding behavior may have changed.
            request.skip_special_tokens = False
        return request

    def _generate_tool_call_id(self) -> str:
        """Generate a unique tool call ID."""
        return f"call_{uuid.uuid4().hex[:24]}"

    def _parse_invoke_params(self, invoke_str: str) -> dict:
        """Collect the raw ``name -> text`` pairs of one invoke body.

        Args:
            invoke_str: Text found between an invoke opening and closing tag.

        Returns:
            Mapping from parameter name to its unconverted string value. When
            a name occurs more than once the last occurrence wins.
        """
        return dict(self.parameter_complete_regex.findall(invoke_str))

    def _convert_param_value(self, value: str, param_type: str) -> Any:
        """Convert parameter value to the correct type.

        Args:
            value: Raw text of the parameter.
            param_type: Type name taken from the tool schema. A list of type
                names (JSON Schema allows ``"type": ["integer", "null"]``)
                is also accepted; the first non-``"null"`` entry is used.

        Returns:
            ``None`` for the literal ``null`` (any casing); otherwise the
            value converted according to ``param_type``. Whenever a
            conversion fails the raw string is returned unchanged.
        """
        if value.lower() == "null":
            return None

        # Normalise a list-valued or otherwise non-string schema type instead
        # of failing on ``.lower()`` below.
        if isinstance(param_type, (list, tuple)):
            param_type = next(
                (
                    entry
                    for entry in param_type
                    if isinstance(entry, str) and entry.lower() != "null"
                ),
                "string",
            )
        elif not isinstance(param_type, str):
            param_type = "string"

        param_type = param_type.lower()
        if param_type in ("string", "str", "text"):
            return value
        if param_type in ("integer", "int"):
            try:
                return int(value)
            except (ValueError, TypeError):
                return value
        if param_type in ("number", "float"):
            try:
                val = float(value)
                # int() raises OverflowError for +/-inf and ValueError for
                # NaN; both fall back to the raw string.
                return val if val != int(val) else int(val)
            except (ValueError, TypeError, OverflowError):
                return value
        if param_type in ("boolean", "bool"):
            return value.lower() in ("true", "1")

        # "object", "array" and unknown types: try JSON parse first,
        # fallback to string.
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return value

    def _convert_params_with_schema(
        self,
        function_name: str,
        param_dict: dict[str, str],
        request: ChatCompletionRequest | None,
    ) -> dict[str, Any]:
        """Convert raw string param values using the tool schema types.

        Args:
            function_name: Name of the invoked function; used to locate the
                matching entry in ``request.tools``.
            param_dict: Raw ``name -> text`` parameters.
            request: Request carrying the tool definitions, or ``None``.

        Returns:
            Mapping with the same keys and converted values. Parameters with
            no usable schema entry are treated as type ``"string"``.
        """
        param_config: dict = {}
        if request and request.tools:
            for tool in request.tools:
                if (
                    hasattr(tool, "function")
                    and tool.function.name == function_name
                    and hasattr(tool.function, "parameters")
                ):
                    schema = tool.function.parameters
                    if isinstance(schema, dict) and "properties" in schema:
                        param_config = schema["properties"]
                    # Only the first tool with a matching name is consulted.
                    break

        converted: dict[str, Any] = {}
        for name, value in param_dict.items():
            param_type = "string"
            if name in param_config and isinstance(param_config[name], dict):
                param_type = param_config[name].get("type", "string")
            converted[name] = self._convert_param_value(value, param_type)
        return converted

    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        """Extract tool calls from complete model output (non-streaming).

        Args:
            model_output: Full generated text.
            request: Request whose tool schemas drive argument typing.

        Returns:
            Result with ``tools_called=True`` and the parsed calls when at
            least one complete invoke block is found; the text preceding the
            start token becomes ``content`` (``None`` if empty). Otherwise,
            and on any parsing error, ``tools_called=False`` with the
            original output as ``content``.
        """
        # Quick check
        if self.tool_call_start_token not in model_output:
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )

        try:
            tool_calls = []

            # Find all complete tool_call blocks
            for tool_call_match in self.tool_call_complete_regex.findall(model_output):
                # Find all invokes within this tool_call
                for invoke_name, invoke_content in self.invoke_complete_regex.findall(
                    tool_call_match
                ):
                    param_dict = self._parse_invoke_params(invoke_content)
                    # Apply the same schema-driven typing as the streaming
                    # path so both modes return identical arguments.
                    converted = self._convert_params_with_schema(
                        invoke_name, param_dict, request
                    )
                    tool_calls.append(
                        ToolCall(
                            type="function",
                            function=FunctionCall(
                                name=invoke_name,
                                arguments=json.dumps(converted, ensure_ascii=False),
                            ),
                        )
                    )

            if not tool_calls:
                return ExtractedToolCallInformation(
                    tools_called=False, tool_calls=[], content=model_output
                )

            # Extract content before first tool call
            first_tool_idx = model_output.find(self.tool_call_start_token)
            content = model_output[:first_tool_idx] if first_tool_idx > 0 else None

            return ExtractedToolCallInformation(
                tools_called=True, tool_calls=tool_calls, content=content
            )

        except Exception:
            # Best effort: a parsing failure degrades to plain content.
            logger.exception("Error extracting tool calls")
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )

    def _reset_streaming_state(self):
        """Reset all streaming state."""
        self.current_tool_index = 0
        self.is_tool_call_started = False
        self.prev_tool_call_arr.clear()
        self.streamed_args_for_tool.clear()

    def _extract_delta_tool_calls(
        self,
        current_text: str,
        request: ChatCompletionRequest | None,
    ) -> list[DeltaToolCall]:
        """Extract DeltaToolCalls from newly completed <invoke> blocks.

        Tracks progress via ``current_tool_index`` so each block is
        extracted exactly once across successive streaming calls.

        Args:
            current_text: All text generated so far.
            request: Request carrying the tool schemas, or ``None``.

        Returns:
            One ``DeltaToolCall`` per invoke block completed since the
            previous call; empty when nothing new is complete.
        """
        complete_invokes = self.invoke_complete_regex.findall(current_text)
        delta_tool_calls: list[DeltaToolCall] = []

        while len(complete_invokes) > self.current_tool_index:
            invoke_name, invoke_body = complete_invokes[self.current_tool_index]
            param_dict = self._parse_invoke_params(invoke_body)

            converted = self._convert_params_with_schema(
                invoke_name, param_dict, request
            )
            args_json = json.dumps(converted, ensure_ascii=False)
            idx = self.current_tool_index
            self.current_tool_index += 1

            # Record what has been emitted for this tool index.
            self.prev_tool_call_arr.append(
                {"name": invoke_name, "arguments": converted}
            )
            self.streamed_args_for_tool.append(args_json)

            delta_tool_calls.append(
                DeltaToolCall(
                    index=idx,
                    id=self._generate_tool_call_id(),
                    function=DeltaFunctionCall(
                        name=invoke_name,
                        arguments=args_json,
                    ),
                    type="function",
                )
            )

        return delta_tool_calls

    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],  # pylint: disable=unused-argument
        current_token_ids: Sequence[int],  # pylint: disable=unused-argument
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """Extract tool calls from streaming model output.

        Uses a buffer-until-complete-invoke strategy: tokens are buffered
        until a complete invoke block is available, then parsed and emitted
        in one shot.

        Args:
            previous_text: Text generated before this chunk.
            current_text: Text generated so far, including this chunk.
            delta_text: Text added by this chunk.
            previous_token_ids: Unused.
            current_token_ids: Unused.
            delta_token_ids: Token ids added by this chunk.
            request: Request carrying the tool schemas.

        Returns:
            A ``DeltaMessage`` with plain content and/or newly completed
            tool calls, an empty-content message for an empty trailing
            delta after tool calls were emitted, or ``None`` when there is
            nothing to send yet.
        """

        # First chunk of a new stream — reset state from prior request.
        if not previous_text:
            self._reset_streaming_state()

        # Detect whether we've entered the tool-call region.
        # Use current_text (not delta_text) since the start token may
        # be split across chunks.
        content_before = None
        if self.is_tool_call_started:
            pass
        elif self.tool_call_start_token in current_text:
            # Tool-call region found, capture any plain text before it.
            self.is_tool_call_started = True
            start_idx = current_text.index(self.tool_call_start_token)
            content_before = current_text[len(previous_text) : start_idx] or None
        else:
            # Still in plain-text region, forward as content.
            return DeltaMessage(content=delta_text) if delta_text else None

        # Inside tool-call region: emit any newly completed invokes.
        delta_tool_calls = self._extract_delta_tool_calls(current_text, request)

        if delta_tool_calls or content_before:
            return DeltaMessage(
                content=content_before,
                tool_calls=delta_tool_calls,
            )

        # Empty delta with token ids means EOS or closing tag; return
        # non-None so the serving framework can finalize finish_reason.
        if not delta_text and delta_token_ids and self.prev_tool_call_arr:
            return DeltaMessage(content="")

        return None

_convert_param_value

_convert_param_value(value: str, param_type: str) -> Any

Convert parameter value to the correct type.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _convert_param_value(self, value: str, param_type: str) -> Any:
    """Convert parameter value to the correct type.

    Args:
        value: Raw text of the parameter.
        param_type: Type name taken from the tool schema. A list of type
            names (JSON Schema allows ``"type": ["integer", "null"]``) is
            also accepted; the first non-``"null"`` entry is used.

    Returns:
        ``None`` for the literal ``null`` (any casing); otherwise the value
        converted according to ``param_type``. Whenever a conversion fails
        the raw string is returned unchanged.
    """
    if value.lower() == "null":
        return None

    # Normalise a list-valued or otherwise non-string schema type instead of
    # failing on ``.lower()`` below.
    if isinstance(param_type, (list, tuple)):
        param_type = next(
            (
                entry
                for entry in param_type
                if isinstance(entry, str) and entry.lower() != "null"
            ),
            "string",
        )
    elif not isinstance(param_type, str):
        param_type = "string"

    param_type = param_type.lower()
    if param_type in ("string", "str", "text"):
        return value
    if param_type in ("integer", "int"):
        try:
            return int(value)
        except (ValueError, TypeError):
            return value
    if param_type in ("number", "float"):
        try:
            val = float(value)
            # int() raises OverflowError for +/-inf and ValueError for NaN;
            # both fall back to the raw string.
            return val if val != int(val) else int(val)
        except (ValueError, TypeError, OverflowError):
            return value
    if param_type in ("boolean", "bool"):
        return value.lower() in ("true", "1")

    # "object", "array" and unknown types: try JSON parse first, fallback
    # to string.
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value

_convert_params_with_schema

_convert_params_with_schema(
    function_name: str,
    param_dict: dict[str, str],
    request: ChatCompletionRequest | None,
) -> dict[str, Any]

Convert raw string param values using the tool schema types.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _convert_params_with_schema(
    self,
    function_name: str,
    param_dict: dict[str, str],
    request: ChatCompletionRequest | None,
) -> dict[str, Any]:
    """Type raw parameter strings according to the matching tool schema.

    The first tool in ``request.tools`` whose function name equals
    ``function_name`` supplies the ``properties`` table; each parameter is
    then converted with ``_convert_param_value`` using the declared
    ``type``, defaulting to ``"string"`` when no declaration is available.
    """
    available_tools = request.tools if request else None

    properties: dict = {}
    for candidate in available_tools or ():
        if not hasattr(candidate, "function"):
            continue
        if candidate.function.name != function_name:
            continue
        if not hasattr(candidate.function, "parameters"):
            continue
        declared = candidate.function.parameters
        if isinstance(declared, dict) and "properties" in declared:
            properties = declared["properties"]
        # Stop at the first matching tool, whether or not it had a schema.
        break

    typed: dict[str, Any] = {}
    for key, raw in param_dict.items():
        spec = properties[key] if key in properties else None
        type_name = spec.get("type", "string") if isinstance(spec, dict) else "string"
        typed[key] = self._convert_param_value(raw, type_name)
    return typed

_extract_delta_tool_calls

_extract_delta_tool_calls(
    current_text: str, request: ChatCompletionRequest | None
) -> list[DeltaToolCall]

Extract DeltaToolCalls from newly completed `<invoke>` blocks.

Tracks progress via current_tool_index so each block is extracted exactly once across successive streaming calls.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _extract_delta_tool_calls(
    self,
    current_text: str,
    request: ChatCompletionRequest | None,
) -> list[DeltaToolCall]:
    """Build DeltaToolCalls for invoke blocks completed since the last call.

    ``current_tool_index`` counts the blocks already emitted, so only the
    complete blocks beyond that position are converted and returned; the
    counter and the bookkeeping lists are advanced one block at a time.
    """
    finished = self.invoke_complete_regex.findall(current_text)
    emitted: list[DeltaToolCall] = []

    # Everything before current_tool_index was sent by an earlier call.
    for name, body in finished[self.current_tool_index :]:
        raw_params = self._parse_invoke_params(body)
        typed_args = self._convert_params_with_schema(name, raw_params, request)
        serialized = json.dumps(typed_args, ensure_ascii=False)

        position = self.current_tool_index
        self.current_tool_index = position + 1

        self.prev_tool_call_arr.append({"name": name, "arguments": typed_args})
        self.streamed_args_for_tool.append(serialized)

        call = DeltaToolCall(
            index=position,
            id=self._generate_tool_call_id(),
            function=DeltaFunctionCall(name=name, arguments=serialized),
            type="function",
        )
        emitted.append(call)

    return emitted

_generate_tool_call_id

_generate_tool_call_id() -> str

Generate a unique tool call ID.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _generate_tool_call_id(self) -> str:
    """Return ``call_`` followed by the first 24 hex digits of a UUID4."""
    suffix = uuid.uuid4().hex[:24]
    return "call_" + suffix

_reset_streaming_state

_reset_streaming_state()

Reset all streaming state.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _reset_streaming_state(self):
    """Return the parser to its pre-stream state.

    The tool counter and the started flag are reset, and both bookkeeping
    lists are emptied in place.
    """
    self.current_tool_index, self.is_tool_call_started = 0, False
    # Clear in place so the existing list objects are kept.
    self.prev_tool_call_arr.clear()
    self.streamed_args_for_tool.clear()

extract_tool_calls

extract_tool_calls(
    model_output: str, request: ChatCompletionRequest
) -> ExtractedToolCallInformation

Extract tool calls from complete model output (non-streaming).

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def extract_tool_calls(
    self,
    model_output: str,
    request: ChatCompletionRequest,
) -> ExtractedToolCallInformation:
    """Extract tool calls from complete model output (non-streaming).

    Args:
        model_output: Full generated text.
        request: Request whose tool schemas drive argument typing.

    Returns:
        Result with ``tools_called=True`` and the parsed calls when at
        least one complete invoke block is found; the text preceding the
        start token becomes ``content`` (``None`` if empty). Otherwise, and
        on any parsing error, ``tools_called=False`` with the original
        output as ``content``.
    """
    # Quick check
    if self.tool_call_start_token not in model_output:
        return ExtractedToolCallInformation(
            tools_called=False, tool_calls=[], content=model_output
        )

    try:
        tool_calls = []

        # Find all complete tool_call blocks
        for tool_call_match in self.tool_call_complete_regex.findall(model_output):
            # Find all invokes within this tool_call
            for invoke_name, invoke_content in self.invoke_complete_regex.findall(
                tool_call_match
            ):
                param_dict = self._parse_invoke_params(invoke_content)
                # Apply the same schema-driven typing as the streaming path
                # so both modes return identical arguments.
                converted = self._convert_params_with_schema(
                    invoke_name, param_dict, request
                )
                tool_calls.append(
                    ToolCall(
                        type="function",
                        function=FunctionCall(
                            name=invoke_name,
                            arguments=json.dumps(converted, ensure_ascii=False),
                        ),
                    )
                )

        if not tool_calls:
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )

        # Extract content before first tool call
        first_tool_idx = model_output.find(self.tool_call_start_token)
        content = model_output[:first_tool_idx] if first_tool_idx > 0 else None

        return ExtractedToolCallInformation(
            tools_called=True, tool_calls=tool_calls, content=content
        )

    except Exception:
        # Best effort: a parsing failure degrades to plain content.
        logger.exception("Error extracting tool calls")
        return ExtractedToolCallInformation(
            tools_called=False, tool_calls=[], content=model_output
        )

extract_tool_calls_streaming

extract_tool_calls_streaming(
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],
    current_token_ids: Sequence[int],
    delta_token_ids: Sequence[int],
    request: ChatCompletionRequest,
) -> DeltaMessage | None

Extract tool calls from streaming model output.

Uses a buffer-until-complete-invoke strategy: tokens are buffered until a complete invoke block is available, then parsed and emitted in one shot.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def extract_tool_calls_streaming(
    self,
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],  # pylint: disable=unused-argument
    current_token_ids: Sequence[int],  # pylint: disable=unused-argument
    delta_token_ids: Sequence[int],
    request: ChatCompletionRequest,
) -> DeltaMessage | None:
    """Turn one streaming chunk into a DeltaMessage, if there is one to send.

    Plain text is forwarded until the tool-call start token shows up in the
    accumulated text. From then on nothing is emitted until an invoke block
    is complete, at which point it is parsed and sent in a single delta.
    """
    # An empty previous_text marks the first chunk of a new stream, so any
    # state left over from an earlier request is discarded.
    if not previous_text:
        self._reset_streaming_state()

    leading_content = None
    if not self.is_tool_call_started:
        # Search the accumulated text rather than the delta: the start
        # token may arrive split across several chunks.
        marker_pos = current_text.find(self.tool_call_start_token)
        if marker_pos == -1:
            # Still ordinary content; pass the delta straight through.
            if delta_text:
                return DeltaMessage(content=delta_text)
            return None

        self.is_tool_call_started = True
        # Text from this chunk that precedes the start token.
        leading_content = current_text[len(previous_text) : marker_pos] or None

    new_calls = self._extract_delta_tool_calls(current_text, request)
    if new_calls or leading_content:
        return DeltaMessage(content=leading_content, tool_calls=new_calls)

    # Token ids without text after tool calls were emitted: answer with an
    # empty, non-None message so the caller can still finalize the stream.
    if self.prev_tool_call_arr and delta_token_ids and not delta_text:
        return DeltaMessage(content="")

    return None