Skip to content

Commit c752f55

Browse files
authored
Merge pull request #1131 from airweave-ai/feat/cloud-agnostic-storage
feat: replace Azure storage with cloud-agnostic filesystem backend
2 parents 25199c3 + 1b95926 commit c752f55

File tree

2 files changed

+41
-292
lines changed

2 files changed

+41
-292
lines changed

backend/airweave/core/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ class Settings(BaseSettings):
122122
QDRANT_PORT: Optional[int] = None
123123
TEXT2VEC_INFERENCE_URL: str = "http://localhost:9878"
124124

125+
# Storage configuration (filesystem-based, cloud-agnostic)
126+
STORAGE_PATH: str = "./local_storage" # In K8s: /data/airweave-storage (PVC mount)
127+
125128
OPENAI_API_KEY: Optional[str] = None
126129
ANTHROPIC_API_KEY: Optional[str] = None
127130
MISTRAL_API_KEY: Optional[str] = None

backend/airweave/platform/storage/storage_client.py

Lines changed: 38 additions & 292 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1-
"""Azure Storage client with environment-aware configuration."""
1+
"""Cloud-agnostic storage client using filesystem (local or PVC-mounted).
2+
3+
The storage client uses a simple filesystem backend that works everywhere:
4+
- Local development: ./local_storage directory
5+
- Kubernetes with PVC: /data/airweave-storage (mounted from PVC)
6+
- Any environment: Configured via settings.STORAGE_PATH
7+
8+
No cloud-specific SDKs required - pure filesystem operations.
9+
"""
210

3-
import os
411
from abc import ABC, abstractmethod
512
from pathlib import Path
613
from typing import BinaryIO, List, Optional
714

8-
from azure.core.exceptions import ClientAuthenticationError, ResourceNotFoundError
9-
from azure.identity import DefaultAzureCredential
10-
from azure.storage.blob import BlobServiceClient
11-
1215
from airweave.core.config import settings
1316
from airweave.core.logging import ContextualLogger, logger
1417

@@ -50,171 +53,17 @@ async def file_exists(
5053
pass
5154

5255

53-
class AzureStorageBackend(StorageBackend):
54-
"""Azure Blob Storage backend implementation."""
55-
56-
def __init__(self, blob_service_client: BlobServiceClient):
57-
"""Initialize Azure storage backend.
58-
59-
Args:
60-
blob_service_client: Azure BlobServiceClient instance
61-
"""
62-
self.client = blob_service_client
63-
64-
async def list_containers(self, logger: ContextualLogger) -> List[str]:
65-
"""List all containers in the storage account.
66-
67-
Returns:
68-
List of container names
69-
70-
Raises:
71-
Exception: If listing fails
72-
"""
73-
try:
74-
containers = [c.name for c in self.client.list_containers()]
75-
logger.with_context(containers=containers).debug(f"Listed {len(containers)} containers")
76-
return containers
77-
except Exception as e:
78-
logger.error(f"Failed to list containers: {e}")
79-
raise
80-
81-
async def upload_file(
82-
self, logger: ContextualLogger, container_name: str, blob_name: str, data: BinaryIO
83-
) -> bool:
84-
"""Upload a file to Azure Blob Storage.
85-
86-
Args:
87-
logger: The logger to use
88-
container_name: Name of the container
89-
blob_name: Name of the blob
90-
data: File data to upload
56+
class FilesystemStorageBackend(StorageBackend):
57+
"""Filesystem storage backend (local disk or PVC-mounted).
9158
92-
Returns:
93-
True if successful
94-
95-
Raises:
96-
Exception: If upload fails
97-
"""
98-
try:
99-
container_client = self.client.get_container_client(container_name)
100-
blob_client = container_client.get_blob_client(blob_name)
101-
blob_client.upload_blob(data, overwrite=True)
102-
logger.with_context(
103-
container=container_name,
104-
blob=blob_name,
105-
).info("Uploaded blob successfully")
106-
return True
107-
except Exception as e:
108-
logger.with_context(
109-
container=container_name,
110-
blob=blob_name,
111-
).error(f"Failed to upload blob: {e}")
112-
raise
113-
114-
async def download_file(
115-
self, logger: ContextualLogger, container_name: str, blob_name: str
116-
) -> Optional[bytes]:
117-
"""Download a file from Azure Blob Storage.
118-
119-
Args:
120-
logger: The logger to use
121-
container_name: Name of the container
122-
blob_name: Name of the blob
123-
124-
Returns:
125-
File content as bytes, or None if not found
126-
127-
Raises:
128-
Exception: If download fails (except for not found)
129-
"""
130-
try:
131-
container_client = self.client.get_container_client(container_name)
132-
blob_client = container_client.get_blob_client(blob_name)
133-
data = blob_client.download_blob().readall()
134-
logger.with_context(
135-
container=container_name,
136-
blob=blob_name,
137-
size=len(data),
138-
).info("Downloaded blob successfully")
139-
return data
140-
except ResourceNotFoundError:
141-
logger.with_context(container=container_name, blob=blob_name).warning("Blob not found")
142-
return None
143-
except Exception as e:
144-
logger.with_context(
145-
container=container_name,
146-
blob=blob_name,
147-
).error(f"Failed to download blob: {e}")
148-
raise
149-
150-
async def delete_file(
151-
self, logger: ContextualLogger, container_name: str, blob_name: str
152-
) -> bool:
153-
"""Delete a file from Azure Blob Storage.
154-
155-
Args:
156-
logger: The logger to use
157-
container_name: Name of the container
158-
blob_name: Name of the blob
159-
160-
Returns:
161-
True if successful
162-
163-
Raises:
164-
Exception: If deletion fails
165-
"""
166-
try:
167-
container_client = self.client.get_container_client(container_name)
168-
blob_client = container_client.get_blob_client(blob_name)
169-
blob_client.delete_blob()
170-
logger.with_context(
171-
container=container_name,
172-
blob=blob_name,
173-
).info("Deleted blob successfully")
174-
return True
175-
except ResourceNotFoundError:
176-
logger.with_context(container=container_name, blob=blob_name).warning("Blob not found")
177-
return False
178-
except Exception as e:
179-
logger.with_context(
180-
container=container_name,
181-
blob=blob_name,
182-
).error(f"Failed to delete blob: {e}")
183-
raise
184-
185-
async def file_exists(
186-
self, logger: ContextualLogger, container_name: str, blob_name: str
187-
) -> bool:
188-
"""Check if a file exists in Azure Blob Storage.
189-
190-
Args:
191-
logger: The logger to use
192-
container_name: Name of the container
193-
blob_name: Name of the blob
194-
195-
Returns:
196-
True if the blob exists
197-
"""
198-
try:
199-
container_client = self.client.get_container_client(container_name)
200-
blob_client = container_client.get_blob_client(blob_name)
201-
return blob_client.exists()
202-
except Exception as e:
203-
logger.with_context(
204-
container=container_name,
205-
blob=blob_name,
206-
).error(f"Failed to check blob existence: {e}")
207-
return False
208-
209-
210-
class LocalStorageBackend(StorageBackend):
211-
"""Local filesystem storage backend implementation."""
59+
Works everywhere - local development, Kubernetes with PVC, any platform.
60+
"""
21261

21362
def __init__(self, base_path: Path):
214-
"""Initialize local storage backend.
63+
"""Initialize filesystem storage backend.
21564
21665
Args:
217-
base_path: Base directory for local storage
66+
base_path: Base directory for storage (from settings.STORAGE_PATH)
21867
"""
21968
self.base_path = base_path
22069
self.base_path.mkdir(parents=True, exist_ok=True)
@@ -385,157 +234,54 @@ def __init__(self, backend: Optional[StorageBackend] = None):
385234
self._log_configuration()
386235

387236
def _configure_backend(self) -> StorageBackend:
388-
"""Configure storage backend based on environment.
389-
390-
Returns:
391-
Configured storage backend
392-
393-
Raises:
394-
RuntimeError: If configuration fails
395-
"""
396-
if settings.ENVIRONMENT == "local":
397-
return self._configure_local_backend()
398-
else:
399-
return self._configure_azure_backend()
237+
"""Configure filesystem storage backend.
400238
401-
def _configure_local_backend(self) -> StorageBackend:
402-
"""Configure backend for local development.
403-
404-
Tries Azure first, falls back to local disk.
239+
Uses settings.STORAGE_PATH for the storage location:
240+
- Local dev: ./local_storage (default)
241+
- Kubernetes: /data/airweave-storage (PVC mount)
405242
406243
Returns:
407-
Configured storage backend
244+
Configured filesystem storage backend
408245
"""
409-
# Check if we should skip Azure and use local storage directly
410-
if os.getenv("SKIP_AZURE_STORAGE", "false").lower() == "true":
411-
logger.debug("SKIP_AZURE_STORAGE is set, using local disk storage")
412-
local_path = Path("./local_storage")
413-
self._ensure_default_containers(local_path)
414-
return LocalStorageBackend(local_path)
415-
416-
# Try Azure connection first
417-
try:
418-
credential = DefaultAzureCredential()
419-
storage_account = os.getenv(
420-
"AZURE_STORAGE_ACCOUNT_NAME", self._get_default_storage_account()
421-
)
422-
423-
if not storage_account:
424-
raise ValueError("No storage account name available")
425-
426-
blob_client = BlobServiceClient(
427-
account_url=f"https://{storage_account}.blob.core.windows.net",
428-
credential=credential,
429-
)
430-
431-
# Test connection
432-
try:
433-
# Just try to get the first container to test connection
434-
next(iter(blob_client.list_containers()), None)
435-
except Exception:
436-
# If we can't list containers, connection has failed
437-
raise
246+
storage_path = Path(settings.STORAGE_PATH)
438247

439-
logger.with_context(
440-
storage_account=storage_account,
441-
).info("Connected to Azure Storage via Azure CLI")
442-
return AzureStorageBackend(blob_client)
248+
logger.info(f"Configuring filesystem storage", extra={"storage_path": str(storage_path)})
443249

444-
except (ClientAuthenticationError, Exception) as e:
445-
logger.warning(
446-
f"Azure connection failed, using local disk: {e} (error type: {type(e).__name__})"
447-
)
250+
# Ensure base directory exists
251+
storage_path.mkdir(parents=True, exist_ok=True)
448252

449-
# Fall back to local disk
450-
local_path = Path("./local_storage")
451-
self._ensure_default_containers(local_path)
452-
return LocalStorageBackend(local_path)
253+
# Create default containers (subdirectories)
254+
self._ensure_default_containers(storage_path)
453255

454-
def _configure_azure_backend(self) -> StorageBackend:
455-
"""Configure Azure backend for dev/prod environments.
256+
logger.info(
257+
"Filesystem storage configured successfully", extra={"storage_path": str(storage_path)}
258+
)
456259

457-
Returns:
458-
Configured Azure storage backend
459-
460-
Raises:
461-
RuntimeError: If Azure connection fails
462-
"""
463-
try:
464-
credential = DefaultAzureCredential()
465-
storage_account = os.getenv(
466-
"AZURE_STORAGE_ACCOUNT_NAME", self._get_default_storage_account()
467-
)
468-
469-
if not storage_account:
470-
raise ValueError("No storage account name configured")
471-
472-
blob_client = BlobServiceClient(
473-
account_url=f"https://{storage_account}.blob.core.windows.net",
474-
credential=credential,
475-
)
476-
477-
# Test connection
478-
try:
479-
# Just try to get the first container to test connection
480-
next(iter(blob_client.list_containers()), None)
481-
except Exception:
482-
# If we can't list containers, connection has failed
483-
raise
484-
485-
logger.with_context(
486-
environment=settings.ENVIRONMENT,
487-
storage_account=storage_account,
488-
).info("Connected to Azure Storage using managed identity")
489-
return AzureStorageBackend(blob_client)
490-
491-
except Exception as e:
492-
logger.with_context(
493-
environment=settings.ENVIRONMENT,
494-
error_type=type(e).__name__,
495-
).error(f"Failed to connect to Azure Storage: {e}")
496-
raise RuntimeError(f"Azure Storage connection failed: {e}") from e
497-
498-
def _get_default_storage_account(self) -> str:
499-
"""Get default storage account name based on environment.
500-
501-
Returns:
502-
Storage account name
503-
"""
504-
env_map = {
505-
"local": "airweavecoredevstorage", # Use dev storage when running locally
506-
"dev": "airweavecoredevstorage",
507-
"prd": "airweavecoreprdsstorage",
508-
}
509-
return env_map.get(settings.ENVIRONMENT, "")
260+
return FilesystemStorageBackend(storage_path)
510261

511262
def _ensure_default_containers(self, base_path: Path) -> None:
512-
"""Ensure default containers exist for local storage.
263+
"""Ensure default storage containers (subdirectories) exist.
513264
514265
Args:
515-
base_path: Base directory for local storage
266+
base_path: Base directory for storage
516267
"""
517268
default_containers = ["sync-data", "sync-metadata", "processed-files", "backup"]
518269
for container in default_containers:
519270
(base_path / container).mkdir(parents=True, exist_ok=True)
520271

272+
logger.debug(
273+
f"Ensured {len(default_containers)} storage containers exist",
274+
extra={"containers": default_containers},
275+
)
276+
521277
def _log_configuration(self) -> None:
522278
"""Log the current storage configuration."""
523279
backend_type = type(self.backend).__name__
524280
logger.with_context(
525-
environment=settings.ENVIRONMENT,
281+
storage_path=settings.STORAGE_PATH,
526282
backend_type=backend_type,
527-
is_local_disk=isinstance(self.backend, LocalStorageBackend),
528283
).info("Storage client configured")
529284

530-
@property
531-
def is_local_disk(self) -> bool:
532-
"""Check if using local disk storage.
533-
534-
Returns:
535-
True if using local disk storage
536-
"""
537-
return isinstance(self.backend, LocalStorageBackend)
538-
539285
# Delegate all storage operations to the backend
540286
async def list_containers(self, logger: ContextualLogger) -> List[str]:
541287
"""List all containers/directories."""

0 commit comments

Comments
 (0)