Skip to content

Commit 91c689a

Browse files
committed
Add icon uploading to objectstore
1 parent 7e25670 commit 91c689a

File tree

7 files changed

+514
-16
lines changed

7 files changed

+514
-16
lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ sentry-kafka-schemas==2.1.2
1919
sentry-sdk>=2.36.0
2020
sortedcontainers>=2.4.0
2121
typing-extensions>=4.15.0
22+
zstandard>=0.18.0

src/launchpad/api/update_api_models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class UpdateData(BaseModel):
5757
artifact_type: int
5858
apple_app_info: Optional[AppleAppInfo] = None
5959
dequeued_at: Optional[datetime] = Field(None, description="Timestamp when message was dequeued from Kafka")
60+
app_icon_id: Optional[str] = None
6061

6162
@field_serializer("dequeued_at")
6263
def serialize_datetime(self, dt: datetime | None) -> str | None:

src/launchpad/artifact_processor.py

Lines changed: 82 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
import sentry_sdk
1515

16-
from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import PreprodArtifactEvents
16+
from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import (
17+
PreprodArtifactEvents,
18+
)
1719

1820
from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel
1921
from launchpad.api.update_api_models import PutSizeFailed, UpdateData
@@ -41,6 +43,12 @@
4143
from launchpad.size.models.common import BaseAppInfo
4244
from launchpad.tracing import request_context
4345
from launchpad.utils.logging import get_logger
46+
from launchpad.utils.objectstore.service import (
47+
Client as ObjectstoreClient,
48+
)
49+
from launchpad.utils.objectstore.service import (
50+
ClientBuilder as ObjectstoreClientBuilder,
51+
)
4452
from launchpad.utils.statsd import StatsdInterface, get_statsd
4553

4654
logger = get_logger(__name__)
@@ -49,12 +57,23 @@
4957
class ArtifactProcessor:
5058
"""Handles the processing of artifacts including download, analysis, and upload."""
5159

52-
def __init__(self, sentry_client: SentryClient, statsd: StatsdInterface) -> None:
60+
def __init__(
61+
self,
62+
sentry_client: SentryClient,
63+
statsd: StatsdInterface,
64+
objectstore_client: ObjectstoreClient,
65+
) -> None:
5366
self._sentry_client = sentry_client
5467
self._statsd = statsd
68+
self._objectstore_client = objectstore_client
5569

5670
@staticmethod
57-
def process_message(payload: PreprodArtifactEvents, service_config=None, artifact_processor=None, statsd=None):
71+
def process_message(
72+
payload: PreprodArtifactEvents,
73+
service_config=None,
74+
artifact_processor=None,
75+
statsd=None,
76+
):
5877
"""Process an artifact message with proper context and metrics.
5978
This is used by the Kafka workers and so has to set up the context from scratch.
6079
If components are not provided, they will be created.
@@ -66,15 +85,18 @@ def process_message(payload: PreprodArtifactEvents, service_config=None, artifac
6685

6786
initialize_sentry_sdk()
6887

88+
organization_id = payload["organization_id"]
89+
project_id = payload["project_id"]
90+
artifact_id = payload["artifact_id"]
91+
6992
if statsd is None:
7093
statsd = get_statsd()
7194
if artifact_processor is None:
7295
sentry_client = SentryClient(base_url=service_config.sentry_base_url)
73-
artifact_processor = ArtifactProcessor(sentry_client, statsd)
74-
75-
organization_id = payload["organization_id"]
76-
project_id = payload["project_id"]
77-
artifact_id = payload["artifact_id"]
96+
objectstore_client = ObjectstoreClientBuilder(
97+
usecase="app-icons", options={"base_url": "http://localhost:8888/"}
98+
).for_project(organization_id, project_id)
99+
artifact_processor = ArtifactProcessor(sentry_client, statsd, objectstore_client)
78100

79101
requested_features = []
80102
for feature in payload.get("requested_features", []):
@@ -92,7 +114,10 @@ def process_message(payload: PreprodArtifactEvents, service_config=None, artifac
92114
stack.enter_context(
93115
statsd.timed(
94116
"artifact.processing.duration",
95-
tags=[f"project_id:{project_id}", f"organization_id:{organization_id}"],
117+
tags=[
118+
f"project_id:{project_id}",
119+
f"organization_id:{organization_id}",
120+
],
96121
)
97122
)
98123
scope = stack.enter_context(sentry_sdk.new_scope())
@@ -116,7 +141,11 @@ def process_message(payload: PreprodArtifactEvents, service_config=None, artifac
116141
)
117142

118143
def process_artifact(
119-
self, organization_id: str, project_id: str, artifact_id: str, requested_features: list[PreprodFeature]
144+
self,
145+
organization_id: str,
146+
project_id: str,
147+
artifact_id: str,
148+
requested_features: list[PreprodFeature],
120149
) -> None:
121150
"""Process an artifact with the requested features."""
122151
dequeued_at = datetime.now()
@@ -125,8 +154,16 @@ def process_artifact(
125154
path = stack.enter_context(self._download_artifact(organization_id, project_id, artifact_id))
126155
artifact = self._parse_artifact(organization_id, project_id, artifact_id, path)
127156
analyzer = self._create_analyzer(artifact)
128-
129-
info = self._preprocess_artifact(organization_id, project_id, artifact_id, artifact, analyzer, dequeued_at)
157+
app_icon_object_id = self._process_app_icon(organization_id, project_id, artifact_id, artifact)
158+
info = self._preprocess_artifact(
159+
organization_id,
160+
project_id,
161+
artifact_id,
162+
artifact,
163+
analyzer,
164+
dequeued_at,
165+
app_icon_object_id,
166+
)
130167

131168
if PreprodFeature.SIZE_ANALYSIS in requested_features:
132169
self._do_size(organization_id, project_id, artifact_id, artifact, analyzer)
@@ -145,7 +182,8 @@ def _download_artifact(
145182

146183
with tempfile.NamedTemporaryFile(suffix=".zip") as tf:
147184
with self._statsd.timed(
148-
"artifact.download.duration", tags=[f"project_id:{project_id}", f"organization_id:{organization_id}"]
185+
"artifact.download.duration",
186+
tags=[f"project_id:{project_id}", f"organization_id:{organization_id}"],
149187
):
150188
size = self._sentry_client.download_artifact(
151189
org=organization_id,
@@ -189,14 +227,15 @@ def _preprocess_artifact(
189227
artifact: Artifact,
190228
analyzer: AndroidAnalyzer | AppleAppAnalyzer,
191229
dequeued_at: datetime,
230+
app_icon_id: str | None,
192231
) -> AppleAppInfo | BaseAppInfo:
193232
logger.info(f"Preprocessing for {artifact_id} (project: {project_id}, org: {organization_id})")
194233
try:
195234
info = self._retry_operation(
196235
lambda: analyzer.preprocess(cast(Any, artifact)),
197236
OperationName.PREPROCESSING,
198237
)
199-
update_data = self._prepare_update_data(info, artifact, dequeued_at)
238+
update_data = self._prepare_update_data(info, artifact, dequeued_at, app_icon_id)
200239
self._sentry_client.update_artifact(
201240
org=organization_id,
202241
project=project_id,
@@ -217,6 +256,22 @@ def _preprocess_artifact(
217256
else:
218257
return info
219258

259+
def _process_app_icon(
260+
self,
261+
organization_id: str,
262+
project_id: str,
263+
artifact_id: str,
264+
artifact: Artifact,
265+
) -> str | None:
266+
logger.info(f"Processing app icon for {artifact_id} (project: {project_id}, org: {organization_id})")
267+
app_icon = artifact.get_app_icon()
268+
if app_icon is None:
269+
logger.info(f"No app icon found for {artifact_id} (project: {project_id}, org: {organization_id})")
270+
return None
271+
272+
app_icon_id = self._objectstore_client.put(app_icon, compression="none")
273+
return app_icon_id
274+
220275
def _do_distribution(
221276
self,
222277
organization_id: str,
@@ -376,7 +431,13 @@ def _update_size_error_from_exception(
376431
if error_message == ProcessingErrorMessage.UNKNOWN_ERROR:
377432
error_message = _guess_message(error_code, e)
378433
self._update_size_error(
379-
organization_id, project_id, artifact_id, error_code, error_message, str(e), identifier=identifier
434+
organization_id,
435+
project_id,
436+
artifact_id,
437+
error_code,
438+
error_message,
439+
str(e),
440+
identifier=identifier,
380441
)
381442

382443
def _update_size_error(
@@ -413,7 +474,11 @@ def _update_size_error(
413474
logger.exception(f"Failed to update artifact with error {message}")
414475

415476
def _prepare_update_data(
416-
self, app_info: AppleAppInfo | BaseAppInfo, artifact: Artifact, dequeued_at: datetime
477+
self,
478+
app_info: AppleAppInfo | BaseAppInfo,
479+
artifact: Artifact,
480+
dequeued_at: datetime,
481+
app_icon_id: str | None,
417482
) -> Dict[str, Any]:
418483
def _get_artifact_type(artifact: Artifact) -> ArtifactType:
419484
if isinstance(artifact, ZippedXCArchive):
@@ -450,6 +515,7 @@ def _get_artifact_type(artifact: Artifact) -> ArtifactType:
450515
artifact_type=_get_artifact_type(artifact).value,
451516
apple_app_info=apple_app_info,
452517
dequeued_at=dequeued_at,
518+
app_icon_id=app_icon_id,
453519
)
454520

455521
return update_data.model_dump(exclude_none=True)

src/launchpad/service.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,20 +127,23 @@ class ServiceConfig:
127127

128128
sentry_base_url: str
129129
projects_to_skip: list[str]
130+
objectstore_base_url: str
130131

131132

132133
def get_service_config() -> ServiceConfig:
133134
"""Get service configuration from environment."""
134135
sentry_base_url = os.getenv("SENTRY_BASE_URL")
135136
projects_to_skip_str = os.getenv("PROJECT_IDS_TO_SKIP")
136137
projects_to_skip = projects_to_skip_str.split(",") if projects_to_skip_str else []
138+
objectstore_base_url = os.getenv("OBJECTSTORE_BASE_URL")
137139

138140
if sentry_base_url is None:
139141
sentry_base_url = "http://getsentry.default"
140142

141143
return ServiceConfig(
142144
sentry_base_url=sentry_base_url,
143145
projects_to_skip=projects_to_skip,
146+
objectstore_base_url=objectstore_base_url,
144147
)
145148

146149

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
from __future__ import annotations
2+
3+
import itertools
4+
import re
5+
6+
from collections.abc import Mapping
7+
from dataclasses import dataclass
8+
from datetime import timedelta
9+
from typing import Literal, cast
10+
11+
# Content-encoding value the store understands for compressed payloads.
# NOTE(review): the artifact processor calls put(..., compression="none"),
# which this Literal does not admit — confirm the intended value set.
Compression = Literal["zstd"]

# Header carrying the expiration policy ("tti:<duration>" / "ttl:<duration>").
HEADER_EXPIRATION = "x-sn-expiration"
# Prefix marking headers that hold caller-supplied custom metadata.
HEADER_META_PREFIX = "x-snme-"
15+
16+
17+
@dataclass
18+
class TimeToIdle:
19+
delta: timedelta
20+
21+
22+
@dataclass
23+
class TimeToLive:
24+
delta: timedelta
25+
26+
27+
ExpirationPolicy = TimeToIdle | TimeToLive
28+
29+
30+
@dataclass
class Metadata:
    """Object metadata decoded from an objectstore response's headers."""

    # Content encoding of the stored payload, if any was advertised.
    compression: "Compression | None"
    # Expiration policy parsed from HEADER_EXPIRATION, if present.
    expiration_policy: "ExpirationPolicy | None"
    # Caller-supplied entries, with the HEADER_META_PREFIX stripped.
    custom: dict[str, str]

    @classmethod
    def from_headers(cls, headers: "Mapping[str, str]") -> "Metadata":
        """Decode response headers into a ``Metadata`` instance.

        NOTE(review): header names are compared case-sensitively; assumes the
        HTTP layer normalizes them to lowercase — confirm against callers.
        """
        compression: "Compression | None" = None
        policy: "ExpirationPolicy | None" = None
        extra: dict[str, str] = {}
        for name, value in headers.items():
            if name == "content-encoding":
                compression = cast("Compression | None", value)
            elif name == HEADER_EXPIRATION:
                policy = parse_expiration(value)
            elif name.startswith(HEADER_META_PREFIX):
                extra[name.removeprefix(HEADER_META_PREFIX)] = value
        return Metadata(compression, policy, extra)
49+
50+
51+
def format_expiration(expiration_policy: "ExpirationPolicy") -> str:
    """Serialize an expiration policy to its header form.

    ``TimeToIdle`` becomes ``"tti:<duration>"`` and ``TimeToLive`` becomes
    ``"ttl:<duration>"``, with ``<duration>`` rendered by format_timedelta.
    Any other input falls through to an implicit None despite the ``str``
    annotation.
    """
    if isinstance(expiration_policy, TimeToIdle):
        tag = "tti"
    elif isinstance(expiration_policy, TimeToLive):
        tag = "ttl"
    else:
        # Unreachable for the declared union; kept for exact parity.
        return None
    return f"{tag}:{format_timedelta(expiration_policy.delta)}"
56+
57+
58+
def parse_expiration(value: str) -> "ExpirationPolicy | None":
    """Parse a header value of the form ``"tti:<duration>"`` or ``"ttl:<duration>"``.

    Returns None when the value carries neither recognized prefix.
    """
    for prefix, policy_cls in (("tti:", TimeToIdle), ("ttl:", TimeToLive)):
        if value.startswith(prefix):
            return policy_cls(parse_timedelta(value[len(prefix):]))
    return None
65+
66+
67+
def format_timedelta(delta: timedelta) -> str:
    """Render a timedelta as ``"<D> days <S> seconds"``.

    Zero components are omitted, so a zero delta yields ``""``. Only the
    ``days`` and ``seconds`` fields are read; sub-second precision is dropped.
    """
    parts: list[str] = []
    if delta.days:
        parts.append(f"{delta.days} days")
    if delta.seconds:
        parts.append(f"{delta.seconds} seconds")
    return " ".join(parts)
76+
77+
78+
# Tokenizes a duration string into alternating number / unit words,
# e.g. "2 days 30 seconds" -> ["2", "days", "30", "seconds"].
TIME_SPLIT = re.compile(r"[^\W\d_]+|\d+")


def parse_timedelta(delta: str) -> timedelta:
    """Parse a duration like ``"1 days 30 seconds"`` into a timedelta.

    Inverse of format_timedelta. Units are matched by prefix
    (w/d/h/m/s for weeks/days/hours/minutes/seconds); ``"ms"`` and any
    unrecognized unit contribute zero. An empty string parses to
    ``timedelta(0)``.

    Raises:
        ValueError: if the token count is odd (a number without a unit or
            vice versa), or a number token is not a valid integer.
    """
    words = TIME_SPLIT.findall(delta)
    # Explicit pairing instead of itertools.batched(..., strict=True): the
    # `strict` keyword only exists on Python 3.13+, while slicing + zip is
    # behaviorally identical here and runs on every supported version.
    if len(words) % 2:
        raise ValueError(f"incomplete number/unit pair in duration: {delta!r}")

    total_seconds = 0
    for count, unit in zip(words[::2], words[1::2]):
        if unit.startswith("w"):
            factor = 7 * 86400
        elif unit.startswith("d"):
            factor = 86400
        elif unit.startswith("h"):
            factor = 3600
        elif unit.startswith("m") and not unit.startswith("ms"):
            factor = 60
        elif unit.startswith("s"):
            factor = 1
        else:
            # "ms" and unknown units are deliberately dropped.
            factor = 0
        total_seconds += int(count) * factor

    return timedelta(seconds=total_seconds)

0 commit comments

Comments
 (0)