#!/usr/bin/env python3
"""Download Maven POMs for the packages listed in ``package-list.txt``.

Each line of the list is ``groupId:artifactId`` or
``groupId:artifactId:version``. For every package, a POM is written to
``poms/<groupId:artifactId:version>/pom.xml``. BOMs (``<packaging>pom``)
are written with their managed dependencies hoisted into real dependencies.
Other packages get a placeholder POM that depends on them, and their
``dependencyManagement`` entries are queued in turn. Finally,
``generate_master_pom.sh`` is run.
"""
import argparse
import asyncio
import copy
import logging
import re
import subprocess
from pathlib import Path
from typing import Optional
from xml.etree import ElementTree as ET

import aiohttp

POM_NS = 'http://maven.apache.org/POM/4.0.0'
# Serialize the Maven namespace as the default xmlns instead of ns0:.
ET.register_namespace('', POM_NS)

logger = logging.getLogger(__name__)

baseurl = 'https://search.maven.org'
base_pom_path = Path('poms')
mirrors = [
    "https://repo.maven.apache.org/maven2",
    "https://repo1.maven.org/maven2",
    "https://oss.sonatype.org/content/repositories/snapshots",
    "https://packages.confluent.io/maven",
    "https://registry.quarkus.io/maven",
    "https://plugins.gradle.org/m2",
]

# String keys (str(Package)) of packages already claimed by a worker.
done: set[str] = set()
done_lock = asyncio.Lock()
num_workers = 50

# Matches a Maven ``${property}`` placeholder; group 1 is the property name.
_PROPERTY_RE = re.compile(r'\$\{([^}]*)\}')


def _local_name(tag) -> str:
    """Return *tag* without its ``{namespace}`` prefix ('' for non-str tags)."""
    return tag.rpartition('}')[2] if isinstance(tag, str) else ''


def _child_text(elem: ET.Element, tag: str) -> Optional[str]:
    """Return the stripped text of *elem*'s first child *tag*, or None.

    The ``{*}`` wildcard matches the child whether or not it is in a
    namespace. Real POMs usually declare the Maven namespace, and a bare
    ``find(tag)`` would miss their elements.
    """
    child = elem.find(f'{{*}}{tag}')
    if child is None or child.text is None:
        return None
    return child.text.strip()


def _sub(parent: ET.Element, tag: str, text: Optional[str] = None) -> ET.Element:
    """Append a Maven-namespaced child *tag* with *text* to *parent*."""
    elem = ET.SubElement(parent, f'{{{POM_NS}}}{tag}')
    elem.text = text
    return elem


class PackagePOM:
    """A package's downloaded POM (``raw_root``) and the POM to write (``generated_root``)."""

    # Cache for the dependencyManagement property; None until first computed.
    _dependencyManagement: Optional[list['Package']] = None

    def __init__(self, package: 'Package', pom: str):
        logger.debug(f'{package}: Parsing POM')
        self.raw_root = ET.fromstring(pom)
        self.is_bom = _child_text(self.raw_root, 'packaging') == 'pom'
        if self.is_bom:
            self.generated_root = self._flatten_bom()
        else:
            self.generated_root = self._wrapper_pom(package)

    def _flatten_bom(self) -> ET.Element:
        """Copy the raw POM, moving ``dependencyManagement``'s children to the root.

        This turns the managed ``<dependencies>`` into real dependencies of
        the generated POM.
        """
        root_copy = copy.deepcopy(self.raw_root)
        depman = root_copy.find('{*}dependencyManagement')
        if depman is None:
            # Not every pom-packaged artifact manages dependencies; keep it as is.
            return root_copy
        root_copy.extend(depman.findall('*'))
        root_copy.remove(depman)
        return root_copy

    @staticmethod
    def _wrapper_pom(package: 'Package') -> ET.Element:
        """Build a placeholder project POM whose only dependency is *package*.

        Built element by element so coordinate values are XML-escaped.
        """
        project = ET.Element(f'{{{POM_NS}}}project')
        _sub(project, 'modelVersion', '4.0.0')
        _sub(project, 'groupId', f'tmp.{package.groupId}')
        _sub(project, 'artifactId', f'placeholder-{package.artifactId}')
        _sub(project, 'version', package.version)
        _sub(project, 'name', f'Package {package.artifactId}')
        dependency = _sub(_sub(project, 'dependencies'), 'dependency')
        _sub(dependency, 'groupId', package.groupId)
        _sub(dependency, 'artifactId', package.artifactId)
        _sub(dependency, 'version', package.version)
        return project

    def write(self, f) -> None:
        """Pretty-print the generated POM to *f* (a file name or writable file object)."""
        tree = ET.ElementTree(self.generated_root)
        ET.indent(tree)
        tree.write(f)

    def get_property(self, prop: str) -> Optional[str]:
        """Return the text of the first ``<properties>/<prop>`` anywhere in the POM, or None.

        Children are compared by local name rather than by an XPath built
        from *prop*. Property names come from ``${...}`` text and could
        contain path syntax.
        """
        for elem in self.raw_root.iterfind('.//{*}properties/*'):
            if _local_name(elem.tag) == prop:
                return elem.text
        return None

    def _project_value(self, prop: str) -> Optional[str]:
        """Resolve ``project.groupId``/``project.version``, falling back to ``<parent>``."""
        if prop not in ('project.groupId', 'project.version'):
            return None
        field = prop.split('.', 1)[1]
        value = _child_text(self.raw_root, field)
        if value is None:
            parent = self.raw_root.find('{*}parent')
            if parent is not None:
                value = _child_text(parent, field)
        return value

    def _resolve(self, text: Optional[str]) -> Optional[str]:
        """Substitute ``${...}`` placeholders in *text*; unresolvable ones are kept verbatim."""
        if text is None:
            return None

        def replace(match: re.Match) -> str:
            prop = match.group(1)
            value = self.get_property(prop)
            if value is None:
                value = self._project_value(prop)
            if value is None:
                # Returning None here would make re.sub raise TypeError.
                logger.warning(f'Property {prop} not found; leaving it unresolved')
                return match.group(0)
            logger.debug(f'Replacing property {prop} with {value}')
            return value

        return _PROPERTY_RE.sub(replace, text)

    @property
    def dependencyManagement(self) -> list['Package']:
        """Packages under ``<dependencyManagement><dependencies>``, properties substituted.

        Computed once and cached. Entries without a groupId or artifactId are
        skipped. A missing version becomes None.
        """
        if self._dependencyManagement is not None:
            return self._dependencyManagement
        packages = []
        path = '{*}dependencyManagement/{*}dependencies/{*}dependency'
        for dep in self.raw_root.iterfind(path):
            group_id = self._resolve(_child_text(dep, 'groupId'))
            artifact_id = self._resolve(_child_text(dep, 'artifactId'))
            if not group_id or not artifact_id:
                logger.warning('Skipping managed dependency without groupId/artifactId')
                continue
            version = self._resolve(_child_text(dep, 'version'))
            packages.append(Package(group_id, artifact_id, version))
        self._dependencyManagement = packages
        return packages


class Package:
    """Maven coordinates with a lazily verified version and lazily fetched POM."""

    _pom: Optional[PackagePOM] = None
    _verified: bool = False

    def __init__(self, groupId: str, artifactId: str, version: Optional[str] = None):
        self.groupId = groupId
        self.artifactId = artifactId
        self.version = version

    def __str__(self) -> str:
        return f'{self.groupId}:{self.artifactId}:{self.version or "----"}'

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Package):
            return NotImplemented
        return (self.groupId, self.artifactId, self.version) == (
            other.groupId, other.artifactId, other.version)

    def __hash__(self) -> int:
        # NOTE: version can change in _query_maven, which changes the hash.
        return hash((self.groupId, self.artifactId, self.version))

    @property
    async def pom(self) -> Optional[PackagePOM]:
        """Download (once) and parse this package's POM from the first mirror serving it.

        If no version is set, the latest one is looked up first. Returns None
        when no version can be resolved or every mirror fails.
        """
        if self._pom is not None:
            return self._pom
        if self.version is None:
            # Must be awaited: calling the coroutine alone never runs it.
            await self._query_maven()
            if self.version is None:
                logger.warning(f'{self}: Could not resolve a version')
                return None
        group_path = self.groupId.replace('.', '/')
        pom_name = f'{self.artifactId}-{self.version}.pom'
        filepath = f'{group_path}/{self.artifactId}/{self.version}/{pom_name}'
        async with aiohttp.ClientSession() as session:
            for mirror in mirrors:
                pom_url = f'{mirror}/{filepath}'
                logger.debug(f'{self}: Downloading pom from {pom_url}')
                try:
                    async with session.get(pom_url) as response:
                        if response.status != 200:
                            logger.debug(f'{self}: HTTP error {response.status} from mirror {mirror}')
                            continue
                        text = await response.text()
                except (aiohttp.ClientError, asyncio.TimeoutError) as err:
                    # One unreachable mirror should not abort the whole download.
                    logger.debug(f'{self}: {err!r} from mirror {mirror}')
                    continue
                logger.debug(f'{self}: POM downloaded')
                self._pom = PackagePOM(self, text)
                return self._pom
        logger.warning(f'{self}: Failed for all mirrors')
        return None

    @property
    def _urlquery(self) -> str:
        """Solr query string for search.maven.org matching these coordinates."""
        q = f'g:{self.groupId}+AND+a:{self.artifactId}'
        if self.version is not None:
            q += f'+AND+v:{self.version}'
        return q

    async def _query_maven(self) -> None:
        """Look the package up with search.maven.org's solrsearch API.

        Sets ``_verified`` to whether a match was found. When no version was
        given, fills ``version`` from the first hit's ``latestVersion``.
        """
        url = f'{baseurl}/solrsearch/select?q={self._urlquery}&rows=1&wt=json'
        logger.debug(f'{self}: Querying maven at url {url}')
        self._verified = False
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        logger.warning(f'{self}: HTTP error {response.status} querying maven')
                        return
                    message = await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as err:
            logger.warning(f'{self}: Querying maven failed: {err!r}')
            return
        if not message['response']['numFound']:
            logger.warning(f'{self}: No matching packages found')
            return
        logger.debug(f'{self}: Query successful')
        self._verified = True
        if self.version is None:
            version = message['response']['docs'][0]['latestVersion']
            logger.debug(f'{self}: Using newest version {version}')
            self.version = version

    async def verify(self) -> bool:
        """Return whether the package exists, querying maven only until it succeeds."""
        if not self._verified:
            await self._query_maven()
        return self._verified


def load_package_list(list_path: Path, queue: asyncio.Queue) -> None:
    """Queue a Package for every ``groupId:artifactId[:version]`` line of *list_path*.

    Blank lines are ignored. Malformed lines are logged and skipped.
    """
    logger.info(f'Parsing {list_path}')
    with list_path.open('r', encoding='utf-8') as f:
        for line in f:
            entry = line.strip()
            if not entry:
                continue
            sections = [part.strip() for part in entry.split(':')]
            if not 2 <= len(sections) <= 3 or not all(sections):
                logger.warning(f'Invalid package format "{entry}". It should be "groupID:artifactID" or "groupID:artifactID:version"')
                continue
            queue.put_nowait(Package(*sections))


async def _claim(key: str) -> bool:
    """Atomically mark *key* as handled; return False if it already was."""
    async with done_lock:
        if key in done:
            return False
        done.add(key)
        return True


async def download(package: Package, queue: asyncio.Queue) -> None:
    """Write *package*'s POM to ``poms/<package>/pom.xml`` and queue managed dependencies.

    The done-check and the insertion happen together, so two workers holding
    the same package cannot both process it.
    """
    requested = str(package)
    if not await _claim(requested):
        logger.info(f'{package}: Already downloaded. Skipping.')
        return
    if not await package.verify():
        logger.warning(f'{package}: Package not found. Check package name and internet connection')
        return
    # verify() may have filled in the version, which changes the key; claim
    # that too so "g:a" and "g:a:<latest>" are not fetched twice.
    resolved = str(package)
    if resolved != requested and not await _claim(resolved):
        logger.info(f'{package}: Already downloaded. Skipping.')
        return
    pom = await package.pom
    if pom is None:
        return
    # Create the directory only once there is a POM to put in it.
    pom_dir = base_pom_path / resolved
    pom_dir.mkdir(parents=True, exist_ok=True)
    pom.write(pom_dir / 'pom.xml')
    logger.info(f'{package}: Downloaded')
    if not pom.is_bom:
        for dep in pom.dependencyManagement:
            logger.info(f'{package}: Handling transitive dependency {dep}')
            await queue.put(dep)


async def worker(queue: asyncio.Queue) -> None:
    """Process packages from *queue* until cancelled.

    Per-package errors are logged, not propagated. An escaping exception
    would end the worker without ``task_done()`` and ``queue.join()`` would
    wait forever.
    """
    while True:
        package = await queue.get()
        try:
            await download(package, queue)
        except Exception:
            logger.exception(f'{package}: Unexpected error')
        finally:
            queue.task_done()


async def main() -> None:
    """Download everything in package-list.txt with ``num_workers`` workers, then build the master POM."""
    queue: asyncio.Queue = asyncio.Queue()
    load_package_list(Path('package-list.txt'), queue)
    logger.debug(f'Starting {num_workers} workers')
    tasks = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]
    await queue.join()
    logger.debug('Queue is empty. Cancelling workers')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    logger.info('Generating master POM')
    status = subprocess.call(['sh', 'generate_master_pom.sh'])
    if status != 0:
        logger.error(f'generate_master_pom.sh exited with status {status}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-w', '--workers', type=int, default=num_workers)
    parser.add_argument('-v', '--verbose', dest='verbosity', action='count', default=0)
    args = parser.parse_args()
    if args.workers < 1:
        # With no workers, queue.join() in main() never returns once anything is queued.
        parser.error('--workers must be at least 1')
    log_level = ('WARNING', 'INFO', 'DEBUG')[min(args.verbosity, 2)]
    logging.basicConfig(level=log_level)
    num_workers = args.workers
    asyncio.run(main())