From 96421d4acbfbcc49d9e409c469dcf5fdb22c0137 Mon Sep 17 00:00:00 2001 From: LittleMouse Date: Tue, 18 Feb 2025 20:31:00 +0800 Subject: [PATCH] [perf] perf inference --- api_server.py | 81 +++++++++++++++++++++++++-------------------------- 1 file changed, 39 insertions(+), 42 deletions(-) diff --git a/api_server.py b/api_server.py index d99a3e1..b4000d0 100644 --- a/api_server.py +++ b/api_server.py @@ -168,26 +168,30 @@ class LlmClientBackend(BaseModelBackend): self._inference_executor = ThreadPoolExecutor(max_workers=self.POOL_SIZE) self._active_tasks = weakref.WeakSet() - async def _parse_content(self, content: Union[str, List[ContentItem]]) -> str: + async def _parse_content(self, content: Union[str, List[ContentItem]], base64_images: list) -> str: + text_parts = [] + if isinstance(content, list): - text_parts = [] for item in content: if item.type == "text" and item.text: - text_parts.append(item.text) + text_parts.append(item.text.strip()) elif item.type == "image_url" and item.image_url: url = item.image_url.get("url", "") if url.startswith("data:image"): base64_data = url.split(",", 1)[1] + base64_images.append(base64_data) else: base64_str = await self.download_image(url) if base64_str: - pass - return " ".join(text_parts) - return str(content) + base64_images.append(base64_str) + else: + text_parts.append(str(content).strip()) + + return " ".join(text_parts).strip() async def _get_client(self, request): try: - await asyncio.wait_for(self._pool_lock.acquire(), timeout=5.0) + await asyncio.wait_for(self._pool_lock.acquire(), timeout=30.0) if self._client_pool: client = self._client_pool.pop() @@ -197,7 +201,7 @@ class LlmClientBackend(BaseModelBackend): if len(self._active_clients) >= self.POOL_SIZE: raise RuntimeError("Connection pool exhausted") - self.logger.debug("🆕 Creating new LLM client") + self.logger.debug("Creating new LLM client") client = LLMClient( host=self.config["host"], port=self.config["port"] @@ -208,7 +212,7 @@ class LlmClientBackend(BaseModelBackend): (m.content for m in request.messages if m.role == "system"), self.config.get("system_prompt", "You are a helpful assistant") ) - parsed_prompt = await self._parse_content(system_content) + parsed_prompt = await self._parse_content(system_content, []) loop = asyncio.get_event_loop() await loop.run_in_executor( @@ -223,10 +227,7 @@ class LlmClientBackend(BaseModelBackend): "max_token_len": request.max_tokens, "temperature": request.temperature, "top_p": request.top_p, - "prompt": next( - (m.content for m in request.messages if m.role == "system"), - self.config.get("system_prompt", "You are a helpful assistant") - ) + "prompt": parsed_prompt } ) ) @@ -239,7 +240,7 @@ class LlmClientBackend(BaseModelBackend): self._client_pool.append(client) self.logger.debug(f"Returned client to pool | ID:{id(client)}") - async def inference_stream(self, query: str, request: ChatCompletionRequest): + async def inference_stream(self, query: str, base64_images: list, request: ChatCompletionRequest): client = await self._get_client(request) task = asyncio.current_task() self._active_tasks.add(task) @@ -247,6 +248,8 @@ class LlmClientBackend(BaseModelBackend): self.logger.debug(f"Starting inference | ClientID:{id(client)} Query length:{len(query)}") loop = asyncio.get_event_loop() + message = client.send_jpeg(base64_images[0], object_type="vlm.jpeg.base64") + print(f"jpeg date:{message}") sync_gen = client.inference_stream( query, object_type="llm.utf-8" @@ -319,44 +322,38 @@ class LlmClientBackend(BaseModelBackend): truncated_messages = self._truncate_history(request.messages) query_lines = [] - isJpeg = False - isUrl = False - base64_data = "" - base64_str = "" - + base64_images = [] + system_prompt = "" + for m in truncated_messages: if m.role == "system": + system_content = await self._parse_content(m.content, base64_images) + system_prompt += f"{system_content}\n" continue - if isinstance(m.content, list): - text_parts = [] - for item in m.content: - if item.type == "text": - text_parts.append(item.text) - elif item.type == "image_url": - url = item.image_url.get("url", "") - if url.startswith("data:image"): - base64_data = url.split(",", 1)[1] - else: - base64_str = await self.download_image(url) - if base64_str: - pass - combined_content = " ".join(text_parts) - query_lines.append(f"{m.role}: {combined_content}") - else: - query_lines.append(f"{m.role}: {m.content}") - - query = "\n".join(query_lines) + message_content = await self._parse_content(m.content, base64_images) + if message_content: + query_lines.append(f"{m.role}: {message_content}") + + final_query = [] + if system_prompt: + final_query.append(system_prompt.strip()) + if base64_images: + pass + # final_query.append("\n".join([f"[IMAGE:{img[:20]}...]" for img in base64_images])) + final_query.append("\n".join(query_lines)) + query = "\n\n".join(filter(None, final_query)) + self.logger.debug( - f"Context truncated: Original {len(request.messages)} → Kept {len(truncated_messages)} " - f"Total length:{len(query)} chars" + f"Processed query | System prompt: {len(system_prompt)} chars | " + f"Images: {len(base64_images)} | Dialogue lines: {len(query_lines)}" ) if request.stream: async def chunk_generator(): try: - async for chunk in self.inference_stream(query, request): + async for chunk in self.inference_stream(query, base64_images, request): yield { "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion.chunk", @@ -388,7 +385,7 @@ class LlmClientBackend(BaseModelBackend): return chunk_generator() else: full_response = "" - async for chunk in self.inference_stream(query, request): + async for chunk in self.inference_stream(query, base64_images, request): full_response += chunk return { "id": f"chatcmpl-{uuid.uuid4()}",