#!/usr/bin/env python3
"""Download Maven POMs for a list of packages and wrap each in a placeholder project.

Reads ``package-list.txt`` (one ``groupId:artifactId[:version]`` per line),
verifies each package against the Maven Central search API, downloads its POM
from the first mirror that serves it, writes a generated ``pom.xml`` under
``poms/`` and finally runs ``generate_master_pom.sh``.
"""
import argparse
import asyncio
import copy
import logging
import random
import re
import subprocess
from pathlib import Path
from xml.etree import ElementTree as ET

import aiohttp

logger = logging.getLogger(__name__)

ns = {'': 'http://maven.apache.org/POM/4.0.0'}
ET.register_namespace('', ns[''])

baseurl = 'https://search.maven.org'
base_pom_path = Path('poms')
mirrors = [
    "https://repo.maven.apache.org/maven2",
    "https://repo1.maven.org/maven2",
    "https://oss.sonatype.org/content/repositories/snapshots",
    "https://packages.confluent.io/maven",
    "https://registry.quarkus.io/maven",
    "https://plugins.gradle.org/m2",
]

# str(Package) keys of packages that have been claimed for download.
done: set[str] = set()
done_lock = asyncio.Lock()
num_workers = 50

# Matches a Maven property reference such as ``${jackson.version}``.
_PROPERTY_RE = re.compile(r'\$\{([^}]*)\}')


def _q(tag: str) -> str:
    """Return *tag* qualified with the POM namespace, i.e. ``{uri}tag``."""
    return f'{{{ns[""]}}}{tag}'


def _ensure_pom_namespace(root: ET.Element) -> None:
    """Move the elements of a POM that has no ``xmlns`` into the POM namespace.

    Every lookup in this file uses ``find(..., ns)``, which only matches
    namespaced tags; without this, POMs lacking ``xmlns`` would look empty.
    """
    if root.tag.startswith('{'):
        return
    for element in root.iter():
        if isinstance(element.tag, str) and not element.tag.startswith('{'):
            element.tag = _q(element.tag)


def _self_reference_properties(package: 'Package') -> dict[str, str]:
    """Return the built-in ``project.*`` / ``pom.*`` coordinate properties of *package*."""
    coordinates = {
        'groupId': package.groupId,
        'artifactId': package.artifactId,
        'version': package.version,
    }
    return {
        f'{prefix}.{key}': value
        for prefix in ('project', 'pom')
        for key, value in coordinates.items()
        if value is not None
    }


def _substitute_self_references(root: ET.Element, package: 'Package') -> None:
    """Replace ``${project.groupId}``-style references in element text with *package*'s values.

    Unknown properties are left untouched.
    """
    values = _self_reference_properties(package)

    def replace(match: re.Match) -> str:
        return values.get(match.group(1), match.group(0))

    for element in root.iter():
        if element.text and '${' in element.text:
            element.text = _PROPERTY_RE.sub(replace, element.text)


class TooManyRequestsException(Exception):
    """Raised when a server answers HTTP 429; the worker retries after a delay."""


class PackagePOM:
    """A downloaded POM plus the placeholder project generated from it.

    Attributes:
        package: the package this POM belongs to.
        raw_root: parsed root element of the downloaded POM.
        packaging: stripped text of ``<packaging>``, or ``'??'`` when absent/empty.
        is_bom: True when ``packaging`` is ``pom``.
        generated_root: root of the placeholder project written by :meth:`write`.

    Raises:
        xml.etree.ElementTree.ParseError: if *pom* is not well-formed XML.
    """

    def __init__(self, package: 'Package', pom: str):
        logger.debug(f'{package}: Parsing POM')
        self.package = package
        self.raw_root = ET.fromstring(pom)
        _ensure_pom_namespace(self.raw_root)

        packaging = self.raw_root.find('packaging', ns)
        packaging_text = (packaging.text or '').strip() if packaging is not None else ''
        self.packaging = packaging_text or '??'
        self.is_bom = self.packaging == 'pom'

        if self.is_bom:
            self.generated_root = self._generate_bom_project(package)
        else:
            self.generated_root = self._generate_wrapper_project(package)
        logger.debug(f'{package}: POM parsed')

    def _generate_bom_project(self, package: 'Package') -> ET.Element:
        """Turn a ``pom``-packaged POM into a project that depends on everything it manages.

        The ``dependencyManagement`` entries are promoted to regular
        dependencies, the project coordinates are replaced by ``tmp.*``
        placeholders, and a dependency on the POM itself is added.
        """
        root = copy.deepcopy(self.raw_root)
        # Resolve ${project.groupId} & co. before the coordinates are rewritten
        # below; otherwise they would resolve to the tmp.* placeholders.
        _substitute_self_references(root, package)

        # Published parent POMs may list <modules> that do not exist next to
        # the generated pom.xml, which would make the Maven reactor fail.
        for modules in root.findall('modules', ns):
            root.remove(modules)

        depman = root.find('dependencyManagement', ns)
        if depman is not None:
            root.remove(depman)
            for child in list(depman):
                existing = None
                if child.tag == _q('dependencies'):
                    existing = root.find('dependencies', ns)
                if existing is not None:
                    # Merge rather than adding a second <dependencies> tag,
                    # which Maven rejects as a duplicated tag.
                    existing.extend(list(child))
                else:
                    root.append(child)

        placeholders = (
            ('groupId', f'tmp.{package.groupId}'),
            ('artifactId', f'placeholder.{package.artifactId}'),
            ('version', package.version),
        )
        for tag, value in placeholders:
            element = root.find(tag, ns)
            if element is None:
                logger.info(f"{package}: Inserting new {tag} tag in pom")
                element = ET.SubElement(root, _q(tag))
            element.text = value

        # Add a dependency for the pom itself
        dependencies = root.find('dependencies', ns)
        if dependencies is None:
            logger.info(f"{package}: Inserting new dependencies tag in pom")
            dependencies = ET.SubElement(root, _q('dependencies'))
        self_dep = ET.SubElement(dependencies, _q('dependency'))
        ET.SubElement(self_dep, _q('groupId')).text = package.groupId
        ET.SubElement(self_dep, _q('artifactId')).text = package.artifactId
        ET.SubElement(self_dep, _q('version')).text = package.version
        # A pom-packaged artifact has no jar; without <type>pom</type> Maven
        # would try to resolve one and fail.
        ET.SubElement(self_dep, _q('type')).text = 'pom'
        return root

    @staticmethod
    def _generate_wrapper_project(package: 'Package') -> ET.Element:
        """Build a minimal placeholder project whose only dependency is *package*.

        Built with the ElementTree API (not string formatting) so coordinates
        containing XML special characters are escaped.
        """
        project = ET.Element(_q('project'))
        ET.SubElement(project, _q('modelVersion')).text = '4.0.0'
        ET.SubElement(project, _q('groupId')).text = f'tmp.{package.groupId}'
        ET.SubElement(project, _q('artifactId')).text = f'placeholder-{package.artifactId}'
        ET.SubElement(project, _q('version')).text = package.version
        ET.SubElement(project, _q('name')).text = f'Package {package.artifactId}'
        dependencies = ET.SubElement(project, _q('dependencies'))
        dependency = ET.SubElement(dependencies, _q('dependency'))
        ET.SubElement(dependency, _q('groupId')).text = package.groupId
        ET.SubElement(dependency, _q('artifactId')).text = package.artifactId
        ET.SubElement(dependency, _q('version')).text = package.version
        return project

    def write(self, f):
        """Write the indented generated project to *f* (a path or writable file)."""
        tree = ET.ElementTree(self.generated_root)
        ET.indent(tree)
        tree.write(f)

    def get_property(self, prop: str):
        """Return the text of the first ``<properties>/<prop>`` element, or None."""
        elem = self.raw_root.find(f'.//properties/{prop}', ns)
        if elem is not None:
            return elem.text
        else:
            return None

    def _package_from_xml_dep(self, dep: ET.Element) -> 'Package':
        """Build a Package from a ``<dependency>`` element, resolving ``${...}`` properties.

        Properties come from the POM's ``<properties>`` first, then from the
        owning package's own coordinates (``project.version`` etc.). Unresolved
        properties become empty; an empty version becomes None ("latest").
        """
        builtin_properties = _self_reference_properties(self.package)

        def prop_replace(match: re.Match) -> str:
            prop = match.group(1)
            value = self.get_property(prop)
            if value is None:
                value = builtin_properties.get(prop)
            logger.debug(f'Replacing property {prop} with {value}')
            return value or ''

        def field(tag: str) -> str:
            element = dep.find(tag, ns)
            text = element.text if element is not None else None
            return _PROPERTY_RE.sub(prop_replace, (text or '').strip())

        return Package(field('groupId'), field('artifactId'), field('version') or None)

    @property
    def dependency_management(self) -> list['Package']:
        """Packages listed under ``dependencyManagement/dependencies`` (invalid entries skipped)."""
        container = self.raw_root.find('dependencyManagement/dependencies', ns)
        if container is None:
            return []
        dependencies: list[Package] = []
        for dep in container.findall('dependency', ns):
            package = self._package_from_xml_dep(dep)
            if not package.groupId or not package.artifactId:
                logger.warning(f'{self.package}: Skipping managed dependency without coordinates: {package}')
                continue
            dependencies.append(package)
        return dependencies


class Package:
    """A Maven coordinate ``groupId:artifactId[:version]`` plus lazily fetched metadata.

    ``version`` may be None, meaning "latest"; it is filled in by the first
    Maven search query. ``implicit`` marks automatically added packages (the
    ``-jvm`` variants), whose absence is only logged at debug level.
    """

    _pom: PackagePOM | None = None
    _verified: bool = False

    def __init__(self, groupId: str, artifactId: str, version: str | None = None, implicit: bool = False):
        self.groupId = groupId
        self.artifactId = artifactId
        self.version = version
        self.implicit = implicit

    def __str__(self) -> str:
        return f'{self.groupId}:{self.artifactId}:{self.version or "----"}'

    def __repr__(self) -> str:
        return f'Package({self.groupId!r}, {self.artifactId!r}, {self.version!r})'

    def __eq__(self, other) -> bool:
        if not isinstance(other, Package):
            return NotImplemented
        return (
            self.groupId == other.groupId
            and self.artifactId == other.artifactId
            and self.version == other.version
        )

    def __hash__(self) -> int:
        return hash((self.groupId, self.artifactId, self.version))

    @property
    def dir_path(self):
        """Repository-relative directory, e.g. ``org/example/lib/1.0``."""
        group_path = self.groupId.replace(".", "/")
        return f'{group_path}/{self.artifactId}/{self.version}'

    @property
    def base_filename(self):
        """File name stem, e.g. ``lib-1.0``."""
        return f'{self.artifactId}-{self.version}'

    async def download_file(self, extension):
        """Download ``<artifactId>-<version>.<extension>`` from the first mirror that has it.

        Mirrors that answer with an error status or fail at the network level
        are skipped.

        Returns:
            The response body as text, or None if every mirror failed.

        Raises:
            TooManyRequestsException: a mirror answered HTTP 429.
        """
        filepath = f'{self.dir_path}/{self.base_filename}.{extension}'
        async with aiohttp.ClientSession() as session:
            for mirror in mirrors:
                url = f'{mirror}/{filepath}'
                logger.debug(f'{self}: Downloading {extension} from {url}')
                try:
                    async with session.get(url) as response:
                        if response.status == 200:
                            logger.debug(f'{self}: {extension} downloaded')
                            return await response.text()
                        if response.status == 429:
                            raise TooManyRequestsException()
                        logger.debug(f'{self}: HTTP error {response.status} from mirror {mirror}')
                except (aiohttp.ClientError, asyncio.TimeoutError) as err:
                    logger.debug(f'{self}: {err!r} from mirror {mirror}')
        logger.warning(f'{self}: File download of {extension} failed for all mirrors')
        return None

    @property
    async def pom(self) -> PackagePOM | None:
        """The parsed POM (cached), or None if it could not be downloaded or parsed."""
        if self._pom is not None:
            return self._pom
        if self.version is None:
            await self._query_maven()
        text = await self.download_file('pom')
        if text is None:
            return None
        try:
            self._pom = PackagePOM(self, text)
        except ET.ParseError as err:
            logger.warning(f'{self}: Could not parse POM: {err}')
            return None
        return self._pom

    @property
    def _urlquery(self) -> str:
        """Solr query string for the Maven Central search API."""
        q = f'g:{self.groupId}+AND+a:{self.artifactId}'
        if self.version is not None:
            q += f'+AND+v:{self.version}'
        return q

    async def _query_maven(self) -> None:
        """Look the package up via the Maven Central search API.

        Sets ``_verified`` and, when no version was given, fills ``version``
        with the ``latestVersion`` reported by the search.

        Raises:
            TooManyRequestsException: the search endpoint answered HTTP 429.
        """
        url = f'{baseurl}/solrsearch/select?q={self._urlquery}&rows=1&wt=json'
        logger.debug(f'{self}: Querying maven at url {url}')
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 429:
                    raise TooManyRequestsException()
                if response.status != 200:
                    self._verified = False
                    logger.warning(f'{self}: HTTP error {response.status} querying Maven search')
                    return
                message = await response.json()

        if message['response']['numFound']:
            logger.debug(f'{self}: Query successful')
            self._verified = True
            if self.version is None:
                version = message['response']['docs'][0]['latestVersion']
                logger.debug(f'{self}: Using newest version {version}')
                self.version = version
        else:
            log = logger.debug if self.implicit else logger.warning
            log(f'{self}: No matching packages found')
            self._verified = False

    async def verify(self) -> bool:
        """Return True if Maven search knows this package (queried at most once on success)."""
        if not self._verified:
            await self._query_maven()
        return self._verified


def load_package_list(list_path: Path, queue: asyncio.Queue) -> None:
    """Queue every package listed in *list_path*.

    Each non-blank line is ``groupId:artifactId`` or
    ``groupId:artifactId:version``; lines starting with ``#`` are ignored.
    For every package whose artifactId does not end in ``-jvm``, an implicit
    ``<artifactId>-jvm`` variant is queued as well.
    """
    logger.info(f'Parsing {list_path}')
    with list_path.open('r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            sections = [section.strip() for section in line.split(':')]
            if not 2 <= len(sections) <= 3 or not sections[0] or not sections[1]:
                logger.warning(f'Invalid package format "{line}". It should be "groupID:artifactID" or "groupID:artifactID:version"')
                continue
            version = sections[2] if len(sections) == 3 and sections[2] else None
            package = Package(sections[0], sections[1], version)
            queue.put_nowait(package)
            if not package.artifactId.endswith('-jvm'):
                queue.put_nowait(
                    Package(
                        package.groupId,
                        f'{package.artifactId}-jvm',
                        package.version,
                        True,
                    )
                )


async def download(package: Package, queue: asyncio.Queue) -> None:
    """Verify *package*, write its generated POM and queue its managed dependencies.

    Each package is processed at most once. A failure after the package has
    been claimed (for example HTTP 429) releases the claim, so a retry is not
    skipped as "already downloaded".

    Raises:
        TooManyRequestsException: propagated so the worker can back off and retry.
    """
    async with done_lock:
        already_done = str(package) in done
    if already_done:
        logger.info(f'{package}: Already downloaded. Skipping.')
        return
    if not await package.verify():
        return

    # verify() may have filled in a missing version, so the key can differ
    # from the one checked above; check again and claim atomically.
    key = str(package)
    async with done_lock:
        claimed = key not in done
        done.add(key)
    if not claimed:
        logger.info(f'{package}: Already downloaded. Skipping.')
        return

    try:
        pom = await package.pom
        if pom is None:
            return
        pom_dir = base_pom_path / f'{package.groupId}-{package.artifactId}-{package.version}'
        pom_dir.mkdir(parents=True, exist_ok=True)
        pom.write(pom_dir / 'pom.xml')
    except BaseException:
        # Synchronous set update: cannot interleave with other coroutines.
        done.discard(key)
        raise

    logger.info(f'{package}: Downloaded')
    if not pom.is_bom:
        for dep in pom.dependency_management:
            logger.info(f'{package}: Handling transitive dependency {dep}')
            await queue.put(dep)


async def worker(queue: asyncio.Queue) -> None:
    """Process packages from *queue* forever, retrying on HTTP 429.

    Any other error is logged and the package dropped. ``task_done()`` is
    always called so ``queue.join()`` cannot hang.
    """
    while True:
        package = await queue.get()
        try:
            while True:
                try:
                    await download(package, queue)
                    break
                except TooManyRequestsException:
                    logger.info('Too many requests. Delaying next attempt')
                    await asyncio.sleep(3*random.random() + 0.2)
        except Exception:
            logger.exception(f'{package}: Download failed')
        finally:
            queue.task_done()


async def main() -> None:
    """Download every listed package with ``num_workers`` workers, then build the master POM."""
    queue: asyncio.Queue = asyncio.Queue()
    load_package_list(Path('package-list.txt'), queue)

    logger.debug(f'Starting {num_workers} workers')
    tasks = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]

    await queue.join()
    logger.debug('Queue is empty. Cancelling workers')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    logger.info('Generating master POM')
    result = subprocess.run(['sh', 'generate_master_pom.sh'], check=False)
    if result.returncode != 0:
        logger.error(f'generate_master_pom.sh exited with status {result.returncode}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-w', '--workers', type=int, default=num_workers)
    parser.add_argument('-v', '--verbose', dest='verbosity', action='count', default=0)
    args = parser.parse_args()
    if args.workers < 1:
        # With zero workers queue.join() would wait forever.
        parser.error('--workers must be at least 1')

    if args.verbosity == 0:
        log_level = 'WARNING'
    elif args.verbosity == 1:
        log_level = 'INFO'
    else:
        log_level = 'DEBUG'
    logging.basicConfig(level=log_level)

    num_workers = args.workers
    asyncio.run(main())