#!/bin/python3
"""Download Maven POMs for a list of packages and write placeholder POMs.

Packages are read from ``package-list.txt`` (``groupId:artifactId`` or
``groupId:artifactId:version`` per line).  For every package the POM is
fetched from the configured mirrors, its parent chain and properties are
resolved, and a generated POM that depends on the package is written to
``poms/<groupId>-<artifactId>-<version>/pom.xml``.  When the queue is
drained, ``generate_master_pom.sh`` is executed.
"""
import re
import copy
import random
import argparse
import logging
import asyncio
import subprocess
from pathlib import Path
from xml.etree import ElementTree as ET

import aiohttp

logger = logging.getLogger(__name__)

ns = {'': 'http://maven.apache.org/POM/4.0.0'}
ET.register_namespace('', ns[''])

baseurl = 'https://search.maven.org'
base_pom_path = Path('poms')
mirrors = [
    "https://repo.maven.apache.org/maven2",
    "https://repo1.maven.org/maven2",
    "https://oss.sonatype.org/content/repositories/snapshots",
    "https://packages.confluent.io/maven",
    "https://registry.quarkus.io/maven",
    "https://plugins.gradle.org/m2",
]

# Keys are ``str(Package)``; both sets are shared between all workers.
done: set[str] = set()
done_lock = asyncio.Lock()
in_progress: set[str] = set()
in_progress_lock = asyncio.Lock()
num_workers = 50

# Resolved properties of every parsed POM, keyed by ``str(Package)``.
global_properties: dict[str, dict[str, str]] = {}

# Matches ``${name}`` property references.
_PROP_RE = re.compile(r'\$\{([^\}]*)\}')

# Upper bound on substitution passes so a self-referential property
# (e.g. ``a = x${a}``) cannot make resolve_props() loop forever.
_MAX_PROPERTY_PASSES = 20

# Pause before re-queueing a package that waits for its parent, so the
# worker yields to the event loop instead of spinning on the queue.
_PARENT_WAIT_DELAY = 0.05


def _qualified(tag: str) -> str:
    """Return *tag* qualified with the POM namespace (``{uri}tag``)."""
    return f'{{{ns[""]}}}{tag}'


class TooManyRequestsException(Exception):
    """A server answered with HTTP 429; the request should be retried later."""


class PackageError(Exception):
    """A package cannot be processed (invalid parent, POM not downloadable)."""


class WaitForPackage(Exception):
    """Processing must be postponed until ``package`` has been handled."""

    def __init__(self, package):
        super().__init__(str(package))
        self.package = package


class PackagePOM:
    """Parsed POM of a package plus the generated placeholder POM.

    Attributes set by the constructor:
        raw_root:        root element of the downloaded POM.
        parent:          parent ``Package`` or ``None``.
        properties:      resolved properties (parent's first, then own).
        packaging:       text of ``<packaging>`` or ``'??'`` when absent.
        is_bom:          ``True`` when packaging is ``pom``.
        generated_root:  root element of the POM written by ``write()``.
    """

    def __init__(self, package: 'Package', pom: str):
        """Parse *pom* (XML text) for *package*.

        Raises:
            WaitForPackage: the parent POM has not been processed yet.
            PackageError: the ``<parent>`` tag is incomplete.
        """
        self._package = package
        logger.debug(f'{package}: Parsing POM')
        self.raw_root = ET.fromstring(pom)
        self.parent: Package | None = self._parse_parent()

        logger.debug(f'{package}: Parsing properties')
        # A parent can be marked done without having a parsed POM (e.g. it
        # could not be verified); fall back to no inherited properties.
        parent_props: dict[str, str] = (
            {} if self.parent is None
            else global_properties.get(str(self.parent), {})
        )
        self.properties = self.resolve_props(parent_props)
        global_properties[str(package)] = self.properties

        logger.debug(f'{package}: Parsing packaging')
        if (packaging := self.raw_root.find('packaging', ns)) is not None:
            self.packaging = packaging.text
        else:
            self.packaging = '??'
        self.is_bom = self.packaging == 'pom'

        if self.is_bom:
            self.generated_root = self._build_bom_root()
        else:
            self.generated_root = self._build_placeholder_root()
        logger.debug(f'{package}: POM parsed')

    def _parse_parent(self) -> 'Package | None':
        """Return the parent package, or ``None`` if there is no ``<parent>``."""
        package = self._package
        parent_tag = self.raw_root.find('parent', ns)
        if parent_tag is None:
            return None

        def text_of(tag: str) -> str | None:
            elem = parent_tag.find(tag, ns)
            return elem.text if elem is not None else None

        parent_group = text_of('groupId')
        parent_artifact = text_of('artifactId')
        parent_version = text_of('version')
        logger.debug(
            f'{package}: Parsing parent {parent_group}:{parent_artifact}:{parent_version}'
        )
        if parent_group is None or parent_artifact is None or parent_version is None:
            raise PackageError(
                f'Invalid parent {parent_group}:{parent_artifact}:{parent_version}'
            )
        parent = Package(parent_group, parent_artifact, parent_version)
        if str(parent) not in done:
            raise WaitForPackage(parent)
        return parent

    def _set_or_insert(self, root: ET.Element, tag: str, value) -> None:
        """Set the text of the direct child *tag* of *root*, creating it if absent."""
        elem = root.find(tag, ns)
        if elem is None:
            logger.info(f"{self._package}: Inserting new {tag} tag in pom")
            elem = ET.SubElement(root, _qualified(tag))
        elem.text = value

    def _add_self_dependency(self, dependencies: ET.Element) -> None:
        """Append a ``<dependency>`` on the package itself to *dependencies*."""
        package = self._package
        self_dep = ET.SubElement(dependencies, _qualified('dependency'))
        ET.SubElement(self_dep, _qualified('groupId')).text = package.groupId
        ET.SubElement(self_dep, _qualified('artifactId')).text = package.artifactId
        ET.SubElement(self_dep, _qualified('version')).text = package.version

    def _build_bom_root(self) -> ET.Element:
        """Build the generated POM for a ``pom``-packaged package.

        The original POM is copied, the children of ``<dependencyManagement>``
        are hoisted to the root, the coordinates are replaced by placeholder
        coordinates and a dependency on the package itself is added.
        """
        package = self._package
        root_copy = copy.deepcopy(self.raw_root)
        depman = root_copy.find('dependencyManagement', ns)
        if depman is not None:
            # NOTE(review): if the POM also has its own <dependencies>, the
            # root ends up with two <dependencies> elements - confirm that
            # the downstream tooling accepts this.
            root_copy.extend(depman.findall('*'))
            root_copy.remove(depman)

        self._set_or_insert(root_copy, 'groupId', f'tmp.{package.groupId}')
        self._set_or_insert(root_copy, 'artifactId', f'placeholder.{package.artifactId}')
        self._set_or_insert(root_copy, 'version', package.version)

        # Add a dependency for the pom itself.  Compare against None: an
        # Element without children is falsy, so ``find(...) or SubElement``
        # would append a second <dependencies> next to an empty one.
        dependencies = root_copy.find('dependencies', ns)
        if dependencies is None:
            dependencies = ET.SubElement(root_copy, _qualified('dependencies'))
        self._add_self_dependency(dependencies)
        return root_copy

    def _build_placeholder_root(self) -> ET.Element:
        """Build a minimal POM whose only dependency is the package itself."""
        package = self._package
        project = ET.Element(_qualified('project'))
        ET.SubElement(project, _qualified('modelVersion')).text = '4.0.0'
        ET.SubElement(project, _qualified('groupId')).text = f'tmp.{package.groupId}'
        ET.SubElement(project, _qualified('artifactId')).text = (
            f'placeholder-{package.artifactId}'
        )
        ET.SubElement(project, _qualified('version')).text = package.version
        ET.SubElement(project, _qualified('name')).text = f'Package {package.artifactId}'
        dependencies = ET.SubElement(project, _qualified('dependencies'))
        self._add_self_dependency(dependencies)
        return project

    def write(self, f):
        """Write the generated POM, indented, to *f* (path or file object)."""
        tree = ET.ElementTree(self.generated_root)
        ET.indent(tree)
        tree.write(f)

    def resolve_props(self, initial: dict[str, str]):
        """Return *initial* extended by this POM's properties, fully substituted.

        *initial* is copied, never modified: it is the parent's entry in
        ``global_properties`` and must not pick up the child's values.
        Substitution is repeated until nothing changes, at most
        ``_MAX_PROPERTY_PASSES`` times.
        """
        props = dict(initial)
        namespace_prefix = _qualified('')
        for prop_tag in self.raw_root.findall('.//properties/*', ns):
            prop = prop_tag.tag.replace(namespace_prefix, '')
            value = prop_tag.text if prop_tag.text is not None else ''
            logger.debug(f'{self._package}: Setting prop {prop}={value}')
            props[prop] = value

        for _ in range(_MAX_PROPERTY_PASSES):
            changed = False
            # Only values of existing keys are replaced, so iterating the
            # live dict is safe and later entries see earlier substitutions.
            for prop, value in props.items():
                new_value = self.prop_replace(value, props)
                if new_value != value:
                    changed = True
                    logger.debug(f'{self._package}: Setting prop {prop}={new_value}')
                    props[prop] = new_value
            if not changed:
                break
        else:
            logger.warning(
                f'{self._package}: Properties still changing after '
                f'{_MAX_PROPERTY_PASSES} passes; giving up (recursive property?)'
            )
        return props

    def prop_replace(self, text, props: dict[str, str] | None = None) -> str:
        """Replace every ``${name}`` in *text* and return the result.

        ``project.groupId`` / ``project.artifactId`` / ``project.version``
        come from the package; ``project.build*``, ``env.*`` and ``maven.*``
        become the empty string; everything else is looked up in *props*
        (``self.properties`` when *props* is ``None``).  Unknown properties
        are logged and replaced by the empty string.
        """
        def lookup_prop(match) -> str:
            prop = match.group(1)
            if prop == 'project.groupId':
                value = str(self._package.groupId)
            elif prop == 'project.artifactId':
                value = str(self._package.artifactId)
            elif prop == 'project.version':
                value = str(self._package.version)
            elif prop.startswith(('project.build', 'env.', 'maven.')):
                value = ''
            else:
                try:
                    value = props[prop] if props is not None else self.properties[prop]
                except KeyError:
                    logger.error(
                        f'{self._package}: Could not find property {prop}. Setting it to ""'
                    )
                    value = ''
            logger.debug(f'{self._package}: Replacing property {prop} with {value}')
            return value

        return _PROP_RE.sub(lookup_prop, text)

    def _package_from_xml_dep(self, dep: ET.Element) -> 'Package':
        """Build a ``Package`` from a ``<dependency>`` element.

        Missing or empty child tags yield an empty string.
        """
        def prop_replace_tag(tag) -> str:
            elem = dep.find(tag, ns)
            raw = (elem.text or '') if elem is not None else ''
            return self.prop_replace(raw)

        return Package(
            groupId=prop_replace_tag('groupId'),
            artifactId=prop_replace_tag('artifactId'),
            version=prop_replace_tag('version'),
        )

    @property
    def dependency_management(self) -> list['Package']:
        """Packages listed under ``dependencyManagement/dependencies``."""
        deps_tag = self.raw_root.find('dependencyManagement/dependencies', ns)
        if deps_tag is None:
            return []
        return [self._package_from_xml_dep(dep) for dep in deps_tag]


class Package:
    """Maven coordinates plus lazily fetched metadata.

    ``version`` may be ``None``; ``_query_maven()`` then fills in the
    latest version reported by the search API.  ``implicit`` marks
    packages that were not listed explicitly (missing ones are only
    logged at debug level).
    """

    _pom: PackagePOM | None = None
    _pom_text: str | None = None  # downloaded POM, kept across WaitForPackage retries
    _verified: bool = False

    def __init__(self, groupId: str, artifactId: str, version: str | None = None,
                 implicit: bool = False):
        self.groupId = groupId
        self.artifactId = artifactId
        self.version = version
        self.implicit = implicit

    def __str__(self) -> str:
        return f'{self.groupId}:{self.artifactId}:{self.version or "----"}'

    def __repr__(self) -> str:
        return f'{type(self).__name__}({self.groupId!r}, {self.artifactId!r}, {self.version!r})'

    def __eq__(self, other) -> bool:
        if not isinstance(other, Package):
            return NotImplemented
        return (
            self.groupId == other.groupId
            and self.artifactId == other.artifactId
            and self.version == other.version
        )

    def __hash__(self) -> int:
        return hash((self.groupId, self.artifactId, self.version))

    @property
    def dir_path(self):
        """Repository-relative directory of this package version."""
        group_path = self.groupId.replace(".", "/")
        return f'{group_path}/{self.artifactId}/{self.version}'

    @property
    def base_filename(self):
        """File name stem used in the repository (``artifactId-version``)."""
        return f'{self.artifactId}-{self.version}'

    async def download_file(self, extension) -> str | None:
        """Download ``<artifactId>-<version>.<extension>`` from the first mirror that has it.

        Returns:
            The response body as text, or ``None`` if no mirror delivered it.

        Raises:
            TooManyRequestsException: a mirror answered with HTTP 429.
        """
        filepath = f'{self.dir_path}/{self.base_filename}.{extension}'
        async with aiohttp.ClientSession() as session:
            for mirror in mirrors:
                url = f'{mirror}/{filepath}'
                logger.debug(f'{self}: Downloading {extension} from {url}')
                try:
                    async with session.get(url) as response:
                        if response.status == 200:
                            logger.debug(f'{self}: {extension} downloaded')
                            return await response.text()
                        if response.status == 429:
                            raise TooManyRequestsException()
                        logger.debug(
                            f'{self}: HTTP error {response.status} from mirror {mirror}'
                        )
                except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                    # One unreachable mirror must not prevent trying the others.
                    logger.debug(f'{self}: Connection error from mirror {mirror}: {exc}')
        logger.warning(f'{self}: File download of {extension} failed for all mirrors')
        return None

    @property
    async def pom(self) -> PackagePOM:
        """Awaitable: the parsed POM, downloaded and parsed on first use.

        Raises:
            PackageError: no version could be determined or the POM could
                not be downloaded from any mirror.
            WaitForPackage: the parent POM has not been processed yet.
            TooManyRequestsException: a server answered with HTTP 429.
        """
        if self._pom is not None:
            return self._pom
        if self.version is None:
            await self._query_maven()
            if self.version is None:
                raise PackageError(f'{self}: No version available')
        if self._pom_text is None:
            text = await self.download_file('pom')
            if text is None:
                raise PackageError(f'{self}: Could not download POM from any mirror')
            self._pom_text = text
        self._pom = PackagePOM(self, self._pom_text)
        return self._pom

    @property
    def _urlquery(self) -> str:
        """Search query selecting this package (and version, if set)."""
        q = f'g:{self.groupId}+AND+a:{self.artifactId}'
        if self.version is not None:
            q += f'+AND+v:{self.version}'
        return q

    async def _query_maven(self) -> None:
        """Ask the search API whether the package exists; update ``_verified``.

        When ``version`` is ``None`` and the package is found, ``version``
        is set to the reported ``latestVersion``.

        Raises:
            TooManyRequestsException: the server answered with HTTP 429.
        """
        url = f'{baseurl}/solrsearch/select?q={self._urlquery}&rows=1&wt=json'
        logger.debug(f'{self}: Querying maven at url {url}')
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    message = await response.json()
                    if message['response']['numFound']:
                        logger.debug(f'{self}: Query successful')
                        self._verified = True
                        if self.version is None:
                            version = message['response']['docs'][0]['latestVersion']
                            logger.debug(f'{self}: Using newest version {version}')
                            self.version = version
                    else:
                        if self.implicit:
                            logger.debug(f'{self}: No matching packages found')
                        else:
                            logger.warning(f'{self}: No matching packages found')
                        self._verified = False
                elif response.status == 429:
                    raise TooManyRequestsException()
                else:
                    self._verified = False
                    logger.warning(f'{self}: HTTP error {response.status} downloading pom')

    async def verify(self) -> bool:
        """Return whether the package exists, querying the search API if needed."""
        if not self._verified:
            await self._query_maven()
        return self._verified


def load_package_list(list_path: Path, queue: asyncio.Queue) -> None:
    """Queue every package listed in *list_path*.

    For each package whose artifactId does not end in ``-jvm`` an implicit
    ``<artifactId>-jvm`` variant is queued as well.  Blank lines are
    skipped; malformed lines are logged and skipped.
    """
    logger.info(f'Parsing {list_path}')
    with list_path.open('r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            sections = line.split(':')
            if len(sections) < 2 or len(sections) > 3:
                logger.warning(
                    f'Invalid package format "{line}". It should be '
                    '"groupID:artifactID" or "groupID:artifactID:version"'
                )
                continue
            package = Package(
                sections[0],
                sections[1],
                sections[2] if len(sections) == 3 else None,
            )
            queue.put_nowait(package)
            if not package.artifactId.endswith('-jvm'):
                queue.put_nowait(
                    Package(
                        package.groupId,
                        f'{package.artifactId}-jvm',
                        package.version,
                        True,
                    )
                )


async def _mark_done(package: Package) -> None:
    """Record *package* as done, both with and without its version."""
    async with done_lock:
        logger.debug(f'{package}: Marking done')
        versionless = copy.copy(package)
        versionless.version = None
        done.add(str(package))
        done.add(str(versionless))


async def _fetch_and_write(package: Package, queue: asyncio.Queue) -> None:
    """Verify *package*, write its generated POM and queue follow-up packages."""
    if not await package.verify():
        return
    pom = await package.pom
    pom_dir = base_pom_path / f'{package.groupId}-{package.artifactId}-{package.version}'
    pom_dir.mkdir(parents=True, exist_ok=True)
    pom.write(pom_dir / 'pom.xml')
    logger.info(f'{package}: Downloaded')
    if not pom.is_bom:
        for dep in pom.dependency_management:
            logger.info(f'{package}: Handling transitive dependency {dep}')
            await queue.put(dep)


async def download(package: Package, queue: asyncio.Queue) -> None:
    """Process *package* unless it is already done or being processed.

    The package is claimed in ``in_progress`` for the duration of the call
    and the claim is always released, so a retry after
    ``TooManyRequestsException`` or ``WaitForPackage`` is not mistaken for
    a duplicate.  Permanent failures are marked done as well, so packages
    waiting for this one as their parent do not wait forever.

    Raises:
        TooManyRequestsException, WaitForPackage: retry later.
        PackageError and others: the package cannot be processed.
    """
    # str(package) changes once a missing version is filled in, so remember
    # the key under which the claim was made.
    claim = str(package)
    async with done_lock:
        skip = claim in done
    async with in_progress_lock:
        skip = skip or (claim in in_progress)
        if not skip:
            in_progress.add(claim)
    if skip:
        logger.info(f'{package}: Already downloaded. Skipping.')
        return

    try:
        await _fetch_and_write(package, queue)
    except Exception as exc:
        if not isinstance(exc, (TooManyRequestsException, WaitForPackage)):
            await _mark_done(package)
        raise
    else:
        await _mark_done(package)
    finally:
        async with in_progress_lock:
            in_progress.discard(claim)


async def worker(queue: asyncio.Queue) -> None:
    """Process packages from *queue* forever (until the task is cancelled)."""
    while True:
        package = await queue.get()
        try:
            while True:
                try:
                    await download(package, queue)
                    break
                except TooManyRequestsException:
                    logger.info('Too many requests. Delaying next attempt')
                    await asyncio.sleep(3*random.random() + 0.2)
                except WaitForPackage as e:
                    logger.info(f'{package}: Waiting for {e.package}')
                    # Yield to the event loop; the parent may be in progress
                    # in another worker that needs to run to finish it.
                    await asyncio.sleep(_PARENT_WAIT_DELAY)
                    await queue.put(e.package)
                    await queue.put(package)
                    break
                except PackageError:
                    logger.exception(f'{package}: Error while processing package')
                    break
                except Exception:
                    logger.exception(f'{package}: Unknown error while processing package')
                    logger.error(global_properties)
                    break
        finally:
            queue.task_done()


async def main() -> None:
    """Load the package list, run the workers and generate the master POM."""
    queue: asyncio.Queue = asyncio.Queue()
    load_package_list(Path('package-list.txt'), queue)

    logger.debug(f'Starting {num_workers} workers')
    tasks = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]

    await queue.join()
    logger.debug('Queue is empty. Cancelling workers')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    logger.info('Generating master POM')
    returncode = subprocess.call(['sh', 'generate_master_pom.sh'])
    if returncode != 0:
        logger.warning(f'generate_master_pom.sh exited with status {returncode}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-w', '--workers', type=int, default=num_workers)
    parser.add_argument('-v', '--verbose', dest='verbosity', action='count', default=0)
    args = parser.parse_args()

    if args.verbosity == 0:
        log_level = 'WARNING'
    elif args.verbosity == 1:
        log_level = 'INFO'
    else:
        log_level = 'DEBUG'
    logging.basicConfig(level=log_level)

    num_workers = args.workers
    asyncio.run(main())