Skip to content

Commit f37ad61

Browse files
authored
Merge pull request #1128 from airweave-ai/fix/mistral-error-handling
Fix: Mistral error handling
2 parents e5f51fe + ed1cb7f commit f37ad61

File tree

2 files changed

+86
-46
lines changed

2 files changed

+86
-46
lines changed

backend/airweave/platform/converters/mistral_converter.py

Lines changed: 74 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import os
77
import tempfile
88
import time
9-
from typing import Dict, List
9+
from typing import Dict, List, Optional
1010

1111
from httpx import HTTPStatusError, ReadTimeout, TimeoutException
1212
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
@@ -940,12 +940,77 @@ async def _download_batch_results( # noqa: C901
940940
logger.debug(f"Parsed {len(upload_key_results)} OCR results from batch")
941941
return upload_key_results
942942

943+
@staticmethod
944+
def _warn_if_empty_markdown(
945+
markdown: Optional[str],
946+
file_name: str,
947+
chunk_idx: Optional[int] = None,
948+
total_chunks: Optional[int] = None,
949+
):
950+
if isinstance(markdown, str) and not markdown.strip():
951+
if chunk_idx is None:
952+
logger.warning(f"OCR returned empty markdown for {file_name}")
953+
else:
954+
logger.warning(
955+
f"OCR returned empty markdown for chunk {chunk_idx}/{total_chunks} "
956+
f"of {file_name}"
957+
)
958+
959+
def _process_single_chunk_result(
960+
self,
961+
upload_key_results: Dict[str, str],
962+
unique_key: str,
963+
chunk_path: str,
964+
file_name: str,
965+
) -> Optional[str]:
966+
upload_key = f"{unique_key}__chunk_{chunk_path}"
967+
markdown = upload_key_results.get(upload_key)
968+
969+
if markdown is None:
970+
logger.error(
971+
f"Conversion failed for {file_name}: Upload or OCR failed "
972+
f"(check logs above for details)"
973+
)
974+
return None
975+
976+
self._warn_if_empty_markdown(markdown, file_name)
977+
return markdown
978+
979+
def _process_multi_chunk_result(
980+
self,
981+
upload_key_results: Dict[str, str],
982+
unique_key: str,
983+
chunk_paths: List[str],
984+
file_name: str,
985+
) -> Optional[str]:
986+
combined = ""
987+
988+
for idx, chunk_path in enumerate(chunk_paths, start=1):
989+
upload_key = f"{unique_key}__chunk_{chunk_path}"
990+
chunk_md = upload_key_results.get(upload_key)
991+
992+
if chunk_md is None:
993+
logger.error(
994+
f"Conversion failed for {file_name}: Chunk {idx}/{len(chunk_paths)} "
995+
f"upload or OCR failed"
996+
)
997+
return None
998+
999+
self._warn_if_empty_markdown(chunk_md, file_name, idx, len(chunk_paths))
1000+
1001+
if idx > 1:
1002+
combined += "\n\n---\n\n"
1003+
combined += chunk_md
1004+
1005+
logger.debug(f"Successfully combined {len(chunk_paths)} chunks for {file_name}")
1006+
return combined
1007+
9431008
async def _combine_chunk_results(
9441009
self,
9451010
upload_key_results: Dict[str, str],
9461011
file_chunks_map: Dict[str, List[str]],
9471012
original_file_paths: List[str],
948-
) -> Dict[str, str]:
1013+
) -> Dict[str, str]: # noqa: C901
9491014
"""Combine chunk markdown back to original files.
9501015
9511016
Args:
@@ -968,53 +1033,16 @@ async def _combine_chunk_results(
9681033
for unique_key, chunk_paths in file_chunks_map.items():
9691034
# Strip __batch_idx_X suffix to get original path
9701035
original_path = unique_key.split("__batch_idx_")[0]
1036+
file_name = os.path.basename(original_path)
9711037

9721038
if len(chunk_paths) == 1:
973-
# No splitting occurred
974-
chunk_path = chunk_paths[0]
975-
upload_key = f"{unique_key}__chunk_{chunk_path}"
976-
markdown = upload_key_results.get(upload_key)
977-
978-
if markdown:
979-
final_results[original_path] = markdown
980-
else:
981-
# Explicit: OCR/upload failed for this file
982-
logger.error(
983-
f"Conversion failed for {os.path.basename(original_path)}: "
984-
f"Upload or OCR failed (check logs above for details)"
985-
)
986-
final_results[original_path] = None
1039+
final_results[original_path] = self._process_single_chunk_result(
1040+
upload_key_results, unique_key, chunk_paths[0], file_name
1041+
)
9871042
else:
988-
# Recombine chunks
989-
combined = ""
990-
all_chunks_ok = True
991-
992-
for i, chunk_path in enumerate(chunk_paths):
993-
upload_key = f"{unique_key}__chunk_{chunk_path}"
994-
chunk_md = upload_key_results.get(upload_key)
995-
996-
if not chunk_md:
997-
# Explicit: This chunk failed
998-
logger.error(
999-
f"Conversion failed for {os.path.basename(original_path)}: "
1000-
f"Chunk {i + 1}/{len(chunk_paths)} upload or OCR failed"
1001-
)
1002-
all_chunks_ok = False
1003-
break
1004-
1005-
if i > 0:
1006-
combined += "\n\n---\n\n" # Chunk separator
1007-
combined += chunk_md
1008-
1009-
if all_chunks_ok:
1010-
final_results[original_path] = combined
1011-
logger.debug(
1012-
f"Successfully combined {len(chunk_paths)} chunks for "
1013-
f"{os.path.basename(original_path)}"
1014-
)
1015-
else:
1016-
# Already logged error above
1017-
final_results[original_path] = None
1043+
final_results[original_path] = self._process_multi_chunk_result(
1044+
upload_key_results, unique_key, chunk_paths, file_name
1045+
)
10181046

10191047
# Mark files that failed during preparation as None
10201048
for path in original_file_paths:

backend/airweave/platform/sync/entity_pipeline.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,18 @@ async def _build_metadata(entity: BaseEntity):
10141014
except SyncFailureError:
10151015
# Infrastructure failure from converter - propagate to fail entire sync
10161016
raise
1017+
except EntityProcessingError as e:
1018+
# Recoverable converter issue - skip sub-batch without error log
1019+
converter_name = converter.__class__.__name__
1020+
sync_context.logger.warning(
1021+
f"Batch conversion skipped for {converter_name} sub-batch: {e}"
1022+
)
1023+
failed_entities.extend(sub_batch)
1024+
for entity in sub_batch:
1025+
sync_context.logger.warning(
1026+
f"Skipping {entity.__class__.__name__}[{entity.entity_id}] "
1027+
f"due to recoverable converter error"
1028+
)
10171029
except Exception as e:
10181030
# Unexpected errors - mark entire sub-batch as failed but continue
10191031
converter_name = converter.__class__.__name__

0 commit comments

Comments
 (0)