#!/bin/python3 import re import argparse import logging import subprocess import json import copy from urllib import request from urllib.error import HTTPError from pathlib import Path from xml.etree import ElementTree as ET ET.register_namespace('', 'http://maven.apache.org/POM/4.0.0') baseurl = 'https://search.maven.org' class PackagePOM: _dependencyManagement: list['Package'] = None def __init__(self, package: 'Package', pom: str): logger.debug(f'{package}: Parsing POM') self.raw_root = ET.fromstring(pom) packaging = self.raw_root.find('packaging') self.is_bom = True if packaging is not None and packaging.text == 'pom' else False if self.is_bom: root_copy = copy.deepcopy(self.raw_root) depman = root_copy.find('dependencyManagement') root_copy.extend(depman.findall('*')) root_copy.remove(depman) self.generated_root = root_copy else: self.generated_root = ET.fromstring( f""" 4.0.0 tmp.{package.groupId} placeholder-{package.artifactId} {package.version} Package {package.artifactId} {package.groupId} {package.artifactId} {package.version} """ ) def write(self, f): tree = ET.ElementTree(self.generated_root) ET.indent(tree) tree.write(f) def get_property(self, prop: str): elem = self.raw_root.find(f'.//properties/{prop}') import pdb; pdb.set_trace() if elem is not None: return elem.text else: return None @property def dependencyManagement(self) -> list['Package']: if self._dependencyManagement is not None: return self._dependencyManagement self._dependencyManagement = [] def prop_replace(match): prop = match.group(1) value = self.get_property(match.group(1)) logger.debug(f'Replacing property {prop} with {value}') return value for dep in self.raw_root.find('dependencyManagement/dependencies') or []: package = Package( *[ re.sub( r'\$\{([^\}]*)\}', prop_replace, dep.find(tag).text, ) for tag in [ 'groupId', 'artifactId', 'version', ] ] ) self._dependencyManagement.append(package) return self._dependencyManagement class Package: _pom: PackagePOM = None _verified: bool = False def __init__(self, groupId: str, artifactId: str, version: str = None): self.groupId = groupId self.artifactId = artifactId self.version = version def __str__(self) -> str: return f'{self.groupId}:{self.artifactId}:{self.version or "----"}' def __eq__(self, other) -> bool: return ( self.groupId == other.groupId and self.artifactId == other.artifactId and self.version == other.version ) def __hash__(self) -> str: return hash((self.groupId, self.artifactId, self.version)) @property def pom(self) -> ET: if self._pom is not None: return self._pom if self.version is None: self._query_maven() group_path = self.groupId.replace(".", "/") pom_path = f'{self.artifactId}-{self.version}.pom' filepath = f'{group_path}/{self.artifactId}/{self.version}/{pom_path}' pom_url = f'{baseurl}/remotecontent?filepath={filepath}' logger.debug(f'{self}: Downloading pom from {pom_url}') try: response = request.urlopen(pom_url) except HTTPError as e: logger.warning(f'{self}: HTTP error downloading pom') logger.debug(e) return None status = response.status if status == 200: logger.debug(f'{self}: POM downloaded') self._pom = PackagePOM(self, response.read()) else: logger.warning(f'{self}: HTTP error {status} downloading pom') return self._pom @property def _urlquery(self) -> str: q = f'g:{self.groupId}+AND+a:{self.artifactId}' if self.version is not None: q += f'+AND+v:{self.version}' return q def _query_maven(self) -> None: url = f'{baseurl}/solrsearch/select?q={self._urlquery}&rows=1&wt=json' logger.debug(f'{self}: Querying maven at url {url}') response = request.urlopen(url) status = response.status if status == 200: message = json.loads(response.read()) num = message['response']['numFound'] if num: logger.debug(f'{self}: Query successful') self._verified = True if self.version is None: version = message['response']['docs'][0]['latestVersion'] logger.info(f'{self}: Using newest version {version}') self.version = version else: logger.warning(f'{self}: No matching packages found') self._verified = False else: self._verified = False logger.warning(f'{self}: HTTP error {status} downloading pom') def verify(self) -> bool: if not self._verified: self._query_maven() return self._verified def load_package_list(list_path: Path) -> list[Package]: packages = [] logger.info(f'Parsing {list_path}') with list_path.open('r') as f: for line in f.readlines(): sections = line.strip().split(':') if len(sections) < 2 or len(sections) > 3: logger.warning(f'Invalid package format "{line}". It should be "groupID:artifactID" or "groupID:artifactID:version"') continue query = Package( sections[0], sections[1], sections[2] if len(sections) == 3 else None, ) packages.append(query) return packages def download(base_path: Path, package: Package, done: [str]) -> None: if str(package) in done: logger.info(f'{package}: Already downloaded. Skipping.') elif package.verify(): pom_dir = base_path / str(package) pom_path = pom_dir / 'pom.xml' pom_dir.mkdir(exist_ok=True) if not package.pom: return package.pom.write(pom_path) done.append(str(package)) logger.info(f'{package}: Downloaded') if not package.pom.is_bom: for dep in package.pom.dependencyManagement: logger.info(f'{package}: Handling transitive dependency {dep}') download(base_path, dep, done) else: logger.warning(f'{package}: Package not found. Check package name and internet connection') def main() -> None: packages = load_package_list(Path('package-list.txt')) base_pom_path = Path('poms') done = [] for package in packages: download(base_pom_path, package, done) subprocess.call(['sh', 'generate_master_pom.sh']) logger = logging.getLogger(__name__) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('-v', '--verbose', dest='verbosity', action='count', default=0) args = parser.parse_args() if args.verbosity == 0: log_level = 'WARNING' elif args.verbosity == 1: log_level = 'INFO' else: log_level = 'DEBUG' logging.basicConfig(level=log_level) main()