#!/bin/python3
"""Expand a list of Maven packages into the full list covered by their BOMs.

The script reads ``package-list.txt`` (one ``groupID:artifactID`` or
``groupID:artifactID:version`` entry per line), checks every entry (plus an
implicit ``-jvm`` variant) against the search API at ``baseurl``, downloads
the POM from the first mirror in ``mirrors`` that serves it and, when the
POM's packaging is ``pom``, also records every entry of its
``dependencyManagement`` section.  The sorted result is written to the
output file, one ``group:artifact:version`` per line.
"""

import argparse
import asyncio
import copy
import logging
import random
import re
import subprocess
from pathlib import Path
from xml.etree import ElementTree as ET

import aiohttp

logger = logging.getLogger(__name__)

# Default XML namespace used by POM files; the empty prefix makes it the
# default namespace for ``find``-style lookups.
ns = {'': 'http://maven.apache.org/POM/4.0.0'}
ET.register_namespace('', ns[''])

baseurl = 'https://search.maven.org'
output_path: Path = Path()

# Repositories tried in order when downloading a file for a package.
mirrors = [
    "https://repo.maven.apache.org/maven2",
    "https://repo1.maven.org/maven2",
    "https://oss.sonatype.org/content/repositories/snapshots",
    "https://packages.confluent.io/maven",
    "https://registry.quarkus.io/maven",
    "https://plugins.gradle.org/m2",
]

# String forms of all packages handled so far; guarded by ``done_lock``.
done: set[str] = set()
done_lock = asyncio.Lock()
num_workers = 50

# Matches ``${name}`` placeholders; group 1 is the property name.
_PROPERTY_RE = re.compile(r'\$\{([^\}]*)\}')


class TooManyRequestsException(Exception):
    """Raised when a server answers with HTTP 429 so the caller can retry."""


class PackagePOM:
    """Parsed POM of a package.

    Attributes set by the constructor:
        raw_root: root element of the parsed POM document.
        packaging: text of the ``packaging`` element, or ``'??'`` if absent.
        is_bom: True when ``packaging`` is ``'pom'``.
        packages: the package itself, followed by its ``dependencyManagement``
            entries when the POM is a BOM.
    """

    def __init__(self, package: 'Package', pom: str):
        """Parse *pom* (XML text) belonging to *package*.

        Raises:
            xml.etree.ElementTree.ParseError: if *pom* is not well-formed XML.
        """
        logger.debug(f'{package}: Parsing POM')
        self._package = package
        self.raw_root = ET.fromstring(pom)

        packaging = self.raw_root.find('packaging', ns)
        self.packaging = packaging.text if packaging is not None else '??'
        self.is_bom = self.packaging == 'pom'

        if self.is_bom:
            self.packages = [package, *self.dependency_management]
        else:
            self.packages = [package]
        logger.debug(f'{package}: POM parsed')

    def get_property(self, prop: str) -> str | None:
        """Return the text of ``<properties>/<prop>``, or None if not found.

        None is also returned when *prop* cannot be used as an element path
        (ElementTree reports those with ``SyntaxError``).
        """
        try:
            elem = self.raw_root.find(f'.//properties/{prop}', ns)
        except SyntaxError:
            return None
        return elem.text if elem is not None else None

    def _package_from_xml_dep(self, dep: ET.Element) -> 'Package':
        """Build a Package from a dependency element, expanding ``${...}``.

        ``project.groupId``, ``project.artifactId`` and ``project.version``
        are taken from the owning package; any other name is looked up in the
        POM's ``properties`` and expanded recursively.  A property that is
        undefined, empty or refers back to itself expands to an empty string
        (with a warning) instead of raising.
        """
        project_props = {
            'project.groupId': self._package.groupId,
            'project.artifactId': self._package.artifactId,
            'project.version': self._package.version,
        }
        # Names currently being expanded; used to break reference cycles.
        resolving: set[str] = set()

        def resolve_custom(prop: str) -> str:
            raw = self.get_property(prop)
            if raw is None or prop in resolving:
                logger.warning(f'{self._package}: Cannot resolve property {prop}')
                return ''
            resolving.add(prop)
            try:
                return prop_replace(raw)
            finally:
                resolving.discard(prop)

        def lookup_prop(match: re.Match) -> str:
            prop = match.group(1)
            if prop in project_props:
                value = str(project_props[prop])
            else:
                value = resolve_custom(prop)
            logger.debug(f'{self._package}: Replacing property {prop} with {value}')
            return value

        def prop_replace(text: str) -> str:
            return _PROPERTY_RE.sub(lookup_prop, text)

        return Package(
            *(
                prop_replace(
                    (dep.findtext(tag, default='', namespaces=ns) or '').strip()
                )
                for tag in ('groupId', 'artifactId', 'version')
            )
        )

    @property
    def dependency_management(self) -> list['Package']:
        """Packages listed under ``dependencyManagement/dependencies``."""
        deps = self.raw_root.find('dependencyManagement/dependencies', ns)
        # Explicit None test: truth-testing an Element is deprecated.
        if deps is None:
            return []
        return [self._package_from_xml_dep(dep) for dep in deps]


class Package:
    """A Maven coordinate (group, artifact, optional version).

    ``implicit`` marks packages that were not listed by the user (the
    generated ``-jvm`` variants); a failed lookup for those is only logged at
    debug level.
    """

    _pom: PackagePOM | None = None
    _verified: bool = False

    def __init__(self, groupId: str, artifactId: str, version: str | None = None, implicit: bool = False):
        self.groupId = groupId
        self.artifactId = artifactId
        self.version = version
        self.implicit = implicit

    def __str__(self) -> str:
        return f'{self.groupId}:{self.artifactId}:{self.version or "----"}'

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Package):
            return NotImplemented
        return (
            self.groupId == other.groupId
            and self.artifactId == other.artifactId
            and self.version == other.version
        )

    def __hash__(self) -> int:
        return hash((self.groupId, self.artifactId, self.version))

    @property
    def dir_path(self) -> str:
        """Repository-relative directory of this package version."""
        group_path = self.groupId.replace(".", "/")
        return f'{group_path}/{self.artifactId}/{self.version}'

    @property
    def base_filename(self) -> str:
        """File name stem (``artifact-version``) of this package's files."""
        return f'{self.artifactId}-{self.version}'

    async def download_file(self, extension: str) -> str | None:
        """Download ``<artifact>-<version>.<extension>`` as text.

        Mirrors are tried in order; the first HTTP 200 response wins.  A
        mirror that cannot be reached is logged and skipped.

        Returns:
            The response body, or None if no mirror served the file.

        Raises:
            TooManyRequestsException: if a mirror answers with HTTP 429.
        """
        filepath = f'{self.dir_path}/{self.base_filename}.{extension}'
        async with aiohttp.ClientSession() as session:
            for mirror in mirrors:
                url = f'{mirror}/{filepath}'
                logger.debug(f'{self}: Downloading {extension} from {url}')
                try:
                    async with session.get(url) as response:
                        if response.status == 200:
                            text = await response.text()
                            logger.debug(f'{self}: {extension} downloaded')
                            return text
                        if response.status == 429:
                            raise TooManyRequestsException()
                        logger.error(f'{self}: HTTP error {response.status} from mirror {mirror}')
                except (aiohttp.ClientError, asyncio.TimeoutError) as err:
                    # One unreachable mirror must not stop us trying the rest.
                    logger.error(f'{self}: Request to mirror {mirror} failed: {err}')
        logger.warning(f'{self}: File download of {extension} failed for all mirrors')
        return None

    @property
    async def pom(self) -> PackagePOM | None:
        """Awaitable yielding the parsed POM, or None if it is unavailable.

        The POM is downloaded and parsed on first use and cached afterwards.
        None is returned when no version is known, when no mirror serves the
        POM, or when the POM is not well-formed XML.

        Raises:
            TooManyRequestsException: if a server answers with HTTP 429.
        """
        if self._pom is not None:
            return self._pom
        if self.version is None:
            await self._query_maven()
        if self.version is None:
            # Without a version there is no file path to download.
            return None
        raw = await self.download_file('pom')
        if raw is None:
            return None
        try:
            self._pom = PackagePOM(self, raw)
        except ET.ParseError as err:
            logger.error(f'{self}: Could not parse POM: {err}')
            return None
        return self._pom

    @property
    def _urlquery(self) -> str:
        """Search query selecting this group/artifact (and version, if set)."""
        q = f'g:{self.groupId}+AND+a:{self.artifactId}'
        if self.version is not None:
            q += f'+AND+v:{self.version}'
        return q

    async def _query_maven(self) -> None:
        """Look the package up via the search API and update ``_verified``.

        When the package has no version yet, the ``latestVersion`` of the
        first search hit is adopted.

        Raises:
            TooManyRequestsException: if the server answers with HTTP 429.
        """
        url = f'{baseurl}/solrsearch/select?q={self._urlquery}&rows=1&wt=json'
        logger.debug(f'{self}: Querying maven at url {url}')
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 429:
                        raise TooManyRequestsException()
                    if response.status != 200:
                        self._verified = False
                        logger.error(f'{self}: HTTP error {response.status} querying maven')
                        return
                    message = await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as err:
            self._verified = False
            logger.error(f'{self}: Querying maven failed: {err}')
            return

        if not message['response']['numFound']:
            # Implicit packages are guesses, so a miss is not worth a warning.
            if self.implicit:
                logger.debug(f'{self}: No matching packages found')
            else:
                logger.warning(f'{self}: No matching packages found')
            self._verified = False
            return

        logger.debug(f'{self}: Query successful')
        self._verified = True
        if self.version is None:
            version = message['response']['docs'][0]['latestVersion']
            self.version = version
            logger.debug(f'{self}: Using newest version {version}')

    async def verify(self) -> bool:
        """Return True if the search API knows this package."""
        if not self._verified:
            await self._query_maven()
        return self._verified


def load_package_list(list_path: Path, queue: asyncio.Queue) -> None:
    """Read *list_path* and enqueue a Package for every valid line.

    Each line must be ``groupID:artifactID`` or ``groupID:artifactID:version``.
    Blank lines are skipped; other malformed lines are reported and skipped.
    For every artifact whose name does not end in ``-jvm`` an implicit
    ``<artifact>-jvm`` package is enqueued as well.
    """
    logger.info(f'Parsing {list_path}')
    with list_path.open('r', encoding='utf-8') as f:
        for line in f:
            entry = line.strip()
            if not entry:
                continue
            sections = entry.split(':')
            if not 2 <= len(sections) <= 3:
                logger.warning(f'Invalid package format "{entry}". It should be "groupID:artifactID" or "groupID:artifactID:version"')
                continue
            group_id, artifact_id, *rest = sections
            # An empty version field ("g:a:") means "no version given".
            version = rest[0] if rest and rest[0] else None
            package = Package(group_id, artifact_id, version)
            queue.put_nowait(package)
            if not package.artifactId.endswith('-jvm'):
                queue.put_nowait(
                    Package(
                        package.groupId,
                        f'{package.artifactId}-jvm',
                        package.version,
                        True,
                    )
                )


async def download(package: Package, queue: asyncio.Queue) -> None:
    """Verify *package*, fetch its POM and record everything it provides.

    All packages exposed by the POM (the package itself and, for BOMs, the
    managed dependencies) are added to ``done``.  *queue* is accepted for
    interface compatibility and is not used.

    Raises:
        TooManyRequestsException: if a server answers with HTTP 429.
    """
    async with done_lock:
        skip = str(package) in done
    if skip:
        logger.info(f'{package}: Already downloaded. Skipping.')
        return
    if not await package.verify():
        return

    pom = await package.pom
    if pom is None:
        logger.warning(f'{package}: No POM for package')
        return

    logger.info(f'{package}: Done')
    async with done_lock:
        for p in pom.packages:
            if not p.version:
                logger.warning(f'{p}: No version found!')
            logger.debug(f'{p}: Adding from BOM')
            done.add(str(p))


async def worker(queue: asyncio.Queue) -> None:
    """Process packages from *queue* forever (until the task is cancelled).

    A package is retried after a random delay of up to three seconds whenever
    a server reports too many requests.  Any other error is logged and the
    package is dropped, so one bad package can neither kill the worker nor
    leave ``queue.join()`` waiting forever.
    """
    while True:
        package = await queue.get()
        try:
            while True:
                try:
                    await download(package, queue)
                    break
                except TooManyRequestsException:
                    logger.debug('Too many requests. Delaying next attempt')
                    await asyncio.sleep(3*random.random())
        except Exception:
            logger.exception(f'{package}: Unexpected error, skipping package')
        finally:
            # Always balance queue.get(), otherwise queue.join() never returns.
            queue.task_done()


async def main() -> None:
    """Load the package list, process it concurrently and write the result."""
    queue: asyncio.Queue = asyncio.Queue()

    load_package_list(Path('package-list.txt'), queue)

    logger.debug(f'Starting {num_workers} workers')
    tasks = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]
    await queue.join()

    logger.debug('Queue is empty. Cancelling workers')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    logger.info('Generating list of all packages')
    async with done_lock:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.writelines(p + '\n' for p in sorted(done))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-w', '--workers', type=int, default=num_workers)
    parser.add_argument('-v', '--verbose', dest='verbosity', action='count', default=0)
    parser.add_argument('-o', '--output', type=Path, default=Path('full-package-list.txt'))
    args = parser.parse_args()

    # With no workers nothing would drain the queue and main() would hang.
    if args.workers < 1:
        parser.error('--workers must be at least 1')

    if args.verbosity == 0:
        log_level = 'WARNING'
    elif args.verbosity == 1:
        log_level = 'INFO'
    else:
        log_level = 'DEBUG'
    logging.basicConfig(level=log_level)

    num_workers = args.workers
    output_path = args.output

    asyncio.run(main())