|
- #!/bin/python3
-
- import re
- import copy
- import random
- import argparse
- import logging
- import asyncio
- import subprocess
- import copy
- import aiohttp
- from pathlib import Path
- from xml.etree import ElementTree as ET
-
-
# Default XML namespace of Maven POM documents. It is registered with the empty
# prefix so ElementTree serializes POM elements without a generated "ns0:" prefix.
_POM_NAMESPACE = 'http://maven.apache.org/POM/4.0.0'
ns = {'': _POM_NAMESPACE}
ET.register_namespace('', _POM_NAMESPACE)

# Maven Central search API, used to verify coordinates and look up latest versions.
baseurl = 'https://search.maven.org'
# Output directory for the generated per-package POMs.
base_pom_path = Path('poms')
# Repositories tried, in this order, when downloading a POM.
mirrors = [
    "https://repo.maven.apache.org/maven2",
    "https://repo1.maven.org/maven2",
    "https://oss.sonatype.org/content/repositories/snapshots",
    "https://packages.confluent.io/maven",
    "https://registry.quarkus.io/maven",
    "https://plugins.gradle.org/m2",
]

# Crawl bookkeeping shared by all workers, keyed by str(Package).
done: set[str] = set()
in_progress: set[str] = set()
done_lock = asyncio.Lock()
in_progress_lock = asyncio.Lock()

# Default number of concurrent workers (overridden by --workers).
num_workers = 50

# Resolved POM properties per package, keyed by str(Package); a child POM
# starts from its parent's entry.
global_properties: dict[str, dict[str, str]] = {}
-
-
class TooManyRequestsException(Exception):
    """Raised when a remote server answers with HTTP 429 (Too Many Requests)."""
-
-
class PackageError(Exception):
    """Raised when a package's POM cannot be processed (e.g. an incomplete <parent> element)."""
-
class WaitForPackage(Exception):
    """Raised when a POM cannot be processed until another package has been handled.

    Attributes:
        package: the package (the POM's parent) that has to be processed first.
    """

    def __init__(self, package):
        # Forward to Exception so e.args / str(e) describe the awaited package
        # instead of being empty.
        super().__init__(package)
        self.package = package
-
-
class PackagePOM:
    """A package's parsed POM together with the placeholder POM generated from it.

    Construction parses the raw POM, resolves its properties (starting from the
    ones recorded for the parent in ``global_properties``) and builds
    ``generated_root``: a POM for a ``tmp.<groupId>`` placeholder project that
    depends on the package itself. For ``pom``-packaged artifacts (treated as
    BOMs here) the original POM is copied and its managed dependencies are
    turned into regular dependencies.

    Raises:
        WaitForPackage: the ``<parent>`` package is not in ``done`` yet.
        PackageError: the ``<parent>`` element lacks groupId, artifactId or version.
    """

    # Matches ${name} property references.
    _PROP_REF = re.compile(r'\$\{([^\}]*)\}')
    # ${project.parent.X} references and the Package attribute they map to.
    _PARENT_FIELDS = {
        'project.parent.groupId': 'groupId',
        'project.parent.artifactId': 'artifactId',
        'project.parent.version': 'version',
    }

    def __init__(self, package: 'Package', pom: str):
        self._package = package

        logger.debug(f'{package}: Parsing POM')
        self.raw_root = ET.fromstring(pom)

        self.parent: 'Package | None' = None
        self._parse_parent()

        logger.debug(f'{package}: Parsing properties')
        self.properties = self.resolve_props(self._parent_properties())
        global_properties[str(package)] = self.properties

        logger.debug(f'{package}: Parsing packaging')
        packaging_tag = self.raw_root.find('packaging', ns)
        self.packaging = packaging_tag.text if packaging_tag is not None else '??'

        self.is_bom = self.packaging == 'pom'
        if self.is_bom:
            self.generated_root = self._generate_bom_root()
        else:
            self.generated_root = self._generate_wrapper_root()

        logger.debug(f'{package}: POM parsed')

    def _parse_parent(self) -> None:
        """Set ``self.parent`` from the ``<parent>`` element, if there is one."""
        package = self._package
        parent_tag = self.raw_root.find('parent', ns)
        if parent_tag is None:
            return

        def child_text(tag: str) -> str | None:
            elem = parent_tag.find(tag, ns)
            return elem.text if elem is not None else None

        parent_group = child_text('groupId')
        parent_artifact = child_text('artifactId')
        parent_version = child_text('version')

        logger.debug(f'{package}: Parsing parent {parent_group}:{parent_artifact}:{parent_version}')

        if parent_group is None or parent_artifact is None or parent_version is None:
            raise PackageError(f'Invalid parent {parent_group}:{parent_artifact}:{parent_version}')

        parent = Package(parent_group, parent_artifact, parent_version)
        if str(parent) not in done:
            raise WaitForPackage(parent)
        self.parent = parent

    def _parent_properties(self) -> dict[str, str]:
        """Return the resolved properties recorded for the parent ({} without a parent)."""
        if self.parent is None:
            return {}

        props = global_properties.get(str(self.parent))
        if props is None:
            # A parent can presumably be marked done without a parsed POM (e.g. it
            # failed verification); continue with no inherited properties.
            logger.warning(f'{self._package}: No properties recorded for parent {self.parent}')
            return {}
        return props

    def _generate_bom_root(self) -> ET.Element:
        """Copy the POM, move managed dependencies into <dependencies> and add the package itself."""
        package = self._package
        root_copy = copy.deepcopy(self.raw_root)

        # Explicit None check: an Element without children is falsy, so
        # `find(...) or SubElement(...)` would add a second <dependencies>
        # next to an existing empty one.
        dependencies = root_copy.find('dependencies', ns)
        if dependencies is None:
            dependencies = ET.SubElement(root_copy, 'dependencies')

        depman = root_copy.find('dependencyManagement', ns)
        if depman is not None:
            dependencies.extend(depman.findall('dependencies/*', ns))
            root_copy.remove(depman)

        coordinates = (
            ('groupId', f'tmp.{package.groupId}'),
            ('artifactId', f'placeholder.{package.artifactId}'),
            ('version', package.version),
        )
        for tag, value in coordinates:
            if (elem := root_copy.find(tag, ns)) is not None:
                elem.text = value
            else:
                logger.info(f"{package}: Inserting new {tag} tag in pom")
                ET.SubElement(root_copy, tag).text = value

        # Add a dependency for the pom itself.
        # NOTE(review): no <type>pom</type> is set, so Maven presumably resolves
        # this as a jar dependency although the artifact has pom packaging — confirm.
        self_dep = ET.SubElement(dependencies, 'dependency')
        ET.SubElement(self_dep, 'groupId').text = package.groupId
        ET.SubElement(self_dep, 'artifactId').text = package.artifactId
        ET.SubElement(self_dep, 'version').text = package.version

        return root_copy

    def _generate_wrapper_root(self) -> ET.Element:
        """Build a minimal placeholder POM whose only dependency is the package itself."""
        package = self._package
        # NOTE(review): this artifactId uses "placeholder-" while the BOM branch
        # uses "placeholder."; kept as-is in case downstream tooling relies on it.
        return ET.fromstring(
            f"""<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

    <modelVersion>4.0.0</modelVersion>
    <groupId>tmp.{package.groupId}</groupId>
    <artifactId>placeholder-{package.artifactId}</artifactId>
    <version>{package.version}</version>
    <name>Package {package.artifactId}</name>

    <dependencies>
        <dependency>
            <groupId>{package.groupId}</groupId>
            <artifactId>{package.artifactId}</artifactId>
            <version>{package.version}</version>
        </dependency>
    </dependencies>
</project>
"""
        )

    def write(self, f):
        """Indent the generated POM and write it via ElementTree.write (file name or file object)."""
        tree = ET.ElementTree(self.generated_root)
        ET.indent(tree)
        tree.write(f)

    def resolve_props(self, initial: dict[str, str], max_passes: int = 100) -> dict[str, str]:
        """Return *initial* overlaid with this POM's properties, with ${...} references expanded.

        *initial* (normally the parent's resolved properties) is copied, never
        modified. Properties found anywhere in the POM (profiles, plugin
        configuration, ...) are collected best-effort. The project's own
        top-level <properties> are applied last, so they take precedence.
        Expansion repeats until nothing changes, at most *max_passes* times, so
        self-growing definitions such as ``a=x${a}`` cannot loop forever.
        """
        # Copy: *initial* is the parent's entry in global_properties and must not
        # pick up this child's definitions (or be shared between siblings).
        props = dict(initial)

        top_level = self.raw_root.findall('properties/*', ns)
        top_level_ids = {id(tag) for tag in top_level}
        nested = [tag for tag in self.raw_root.findall('.//properties/*', ns) if id(tag) not in top_level_ids]

        for prop_tag in nested + top_level:
            prop = prop_tag.tag.replace(f'{{{ns[""]}}}', '')
            value = prop_tag.text if prop_tag.text is not None else ''
            logger.debug(f'{self._package}: Setting prop {prop}={value}')
            props[prop] = value

        for _ in range(max_passes):
            changed = False
            # Only existing keys are reassigned, so mutating during items() is safe.
            for prop, value in props.items():
                new_value = self.prop_replace(value, props)
                if new_value != value:
                    changed = True
                    logger.debug(f'{self._package}: Setting prop {prop}={new_value}')
                    props[prop] = new_value
            if not changed:
                break
        else:
            logger.warning(f'{self._package}: Properties still changing after {max_passes} passes; stopping expansion')

        return props

    def prop_replace(self, text, props: dict[str, str] | None = None) -> str:
        """Substitute every ``${name}`` reference in *text*.

        The values used are:
        - ``project.groupId``, ``project.artifactId`` and ``project.version``:
          the package's own coordinates.
        - ``project.parent.*``: the parent's coordinates, when a parent is known.
        - ``project.build*``, ``env.*`` and ``maven.*``: ''.
        - Anything else: looked up in *props*, or in ``self.properties`` when
          *props* is None. Unknown names are logged and replaced with ''.
        """
        parent = getattr(self, 'parent', None)

        def lookup_prop(match: re.Match) -> str:
            prop = match.group(1)

            if prop == 'project.groupId':
                value = str(self._package.groupId)
            elif prop == 'project.artifactId':
                value = str(self._package.artifactId)
            elif prop == 'project.version':
                value = str(self._package.version)
            elif prop in self._PARENT_FIELDS and parent is not None:
                value = str(getattr(parent, self._PARENT_FIELDS[prop]))
            elif prop.startswith(('project.build', 'env.', 'maven.')):
                value = ''
            else:
                try:
                    value = props[prop] if props is not None else self.properties[prop]
                except KeyError:
                    logger.error(f'{self._package}: Could not find property {prop}. Setting it to ""')
                    value = ''

            logger.debug(f'{self._package}: Replacing property {prop} with {value}')
            return value

        return self._PROP_REF.sub(lookup_prop, text)

    def _package_from_xml_dep(self, dep: ET.Element) -> 'Package':
        """Build a Package from a <dependency> element, expanding property references.

        A missing or empty version becomes None ("latest"), not ''.
        """
        def prop_replace_tag(tag: str) -> str:
            elem = dep.find(tag, ns)
            return self.prop_replace(elem.text or '' if elem is not None else '')

        return Package(
            groupId=prop_replace_tag('groupId'),
            artifactId=prop_replace_tag('artifactId'),
            version=prop_replace_tag('version') or None,
        )

    @property
    def dependency_management(self) -> list['Package']:
        """Packages listed under <dependencyManagement><dependencies> in the raw POM."""
        managed = self.raw_root.find('dependencyManagement/dependencies', ns)
        # Explicit None check instead of relying on (deprecated) Element truthiness.
        if managed is None:
            return []
        return [self._package_from_xml_dep(dep) for dep in managed]
-
-
- class Package:
- _pom: PackagePOM | None = None
- _verified: bool = False
-
- def __init__(self, groupId: str, artifactId: str, version: str | None = None, implicit: bool = False):
- self.groupId = groupId
- self.artifactId = artifactId
- self.version = version
- self.implicit = implicit
-
- def __str__(self) -> str:
- return f'{self.groupId}:{self.artifactId}:{self.version or "----"}'
-
- def __eq__(self, other) -> bool:
- return (
- self.groupId == other.groupId
- and self.artifactId == other.artifactId
- and self.version == other.version
- )
-
- def __hash__(self) -> int:
- return hash((self.groupId, self.artifactId, self.version))
-
- @property
- def dir_path(self):
- group_path = self.groupId.replace(".", "/")
- return f'{group_path}/{self.artifactId}/{self.version}'
-
- @property
- def base_filename(self):
- return f'{self.artifactId}-{self.version}'
-
- async def download_file(self, extension):
- filepath = f'{self.dir_path}/{self.base_filename}.{extension}'
-
- async with aiohttp.ClientSession() as session:
- for mirror in mirrors:
- pom_url = f'{mirror}/{filepath}'
- logger.debug(f'{self}: Downloading {extension} from {pom_url}')
-
- async with session.get(pom_url) as response:
- if response.status == 200:
- logger.debug(f'{self}: {extension} downloaded')
- return await response.text()
- break
- elif response.status == 429:
- raise TooManyRequestsException()
- else:
- logger.debug(f'{self}: HTTP error {response.status} from mirror {mirror}')
- else:
- logger.warning(f'{self}: File download of {extension} failed for all mirrors')
- return None
-
- @property
- async def pom(self) -> PackagePOM:
- if self._pom is not None:
- return self._pom
-
- if self.version is None:
- await self._query_maven()
-
- self._pom = PackagePOM(self, await self.download_file('pom'))
-
- return self._pom
-
- @property
- def _urlquery(self) -> str:
- q = f'g:{self.groupId}+AND+a:{self.artifactId}'
-
- if self.version is not None:
- q += f'+AND+v:{self.version}'
-
- return q
-
- async def _query_maven(self) -> None:
- url = f'{baseurl}/solrsearch/select?q={self._urlquery}&rows=1&wt=json'
- logger.debug(f'{self}: Querying maven at url {url}')
-
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as response:
- if response.status == 200:
- message = await response.json()
- num = message['response']['numFound']
-
- if num:
- logger.debug(f'{self}: Query successful')
- self._verified = True
- if self.version is None:
- version = message['response']['docs'][0]['latestVersion']
- logger.debug(f'{self}: Using newest version {version}')
- self.version = version
- else:
- if self.implicit:
- logger.debug(f'{self}: No matching packages found')
- else:
- logger.warning(f'{self}: No matching packages found')
-
- self._verified = False
- elif response.status == 429:
- raise TooManyRequestsException()
- else:
- self._verified = False
- logger.warning(f'{self}: HTTP error {response.status} downloading pom')
-
- async def verify(self) -> bool:
- if not self._verified:
- await self._query_maven()
- return self._verified
-
-
def load_package_list(list_path: Path, queue: asyncio.Queue) -> None:
    """Queue the packages listed in *list_path*.

    Each non-blank line must be ``groupId:artifactId`` or
    ``groupId:artifactId:version``; lines starting with ``#`` are comments.
    Invalid lines are logged and skipped. For every entry whose artifactId does
    not already end in ``-jvm``, an implicit ``<artifactId>-jvm`` variant with
    the same groupId and version is queued too.
    """
    logger.info(f'Parsing {list_path}')

    with list_path.open('r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            # Blank lines used to be reported as invalid packages.
            if not line or line.startswith('#'):
                continue

            sections = [section.strip() for section in line.split(':')]
            if not 2 <= len(sections) <= 3 or not sections[0] or not sections[1]:
                logger.warning(f'Invalid package format "{line}". It should be "groupID:artifactID" or "groupID:artifactID:version"')
                continue

            # An empty version ("group:artifact:") means "latest", like an omitted one.
            version = (sections[2] or None) if len(sections) == 3 else None
            package = Package(sections[0], sections[1], version)

            queue.put_nowait(package)

            if not package.artifactId.endswith('-jvm'):
                queue.put_nowait(
                    Package(
                        package.groupId,
                        f'{package.artifactId}-jvm',
                        package.version,
                        True,
                    )
                )
-
async def _retry_rate_limited(package: 'Package', action, what: str, attempts: int = 50):
    """Return ``await action()``, retrying after a random delay on HTTP 429.

    Exits the program with status 1 after *attempts* rate-limited tries.
    """
    for _ in range(attempts):
        try:
            return await action()
        except TooManyRequestsException:
            logger.info('Too many requests. Delaying next attempt')
            await asyncio.sleep(3*random.random() + 0.2)

    logger.error(f'{package}: {what} failed after {attempts} tries')
    # Same effect as the builtin exit(1), without depending on the site module.
    raise SystemExit(1)


async def _fetch_and_store(package: 'Package', queue: asyncio.Queue) -> None:
    """Verify *package*, write its generated POM and queue its managed dependencies."""
    verified = await _retry_rate_limited(package, package.verify, 'Verification')
    if not verified:
        return

    pom = await _retry_rate_limited(package, lambda: package.pom, 'POM download')
    if pom is None:
        logger.warning(f'{package}: No pom')
        return

    pom_dir = base_pom_path / f'{package.groupId}-{package.artifactId}-{package.version}'
    # parents=True: the base output directory may not exist yet.
    pom_dir.mkdir(parents=True, exist_ok=True)
    pom.write(pom_dir / 'pom.xml')
    logger.info(f'{package}: Downloaded')

    if not pom.is_bom:
        for dep in pom.dependency_management:
            logger.info(f'{package}: Handling transitive dependency {dep}')
            await queue.put(dep)


async def _mark_done(package: 'Package', claim_key: str) -> None:
    """Record *package* (with and without its version) as done and release its claim."""
    async with done_lock:
        logger.debug(f'{package}: Marking done')
        unversioned = copy.copy(package)
        unversioned.version = None
        done.add(str(package))
        done.add(str(unversioned))

    async with in_progress_lock:
        # Release exactly the key that was claimed; str(package) may have gained a
        # version since then and could be another worker's claim.
        if claim_key in in_progress:
            in_progress.remove(claim_key)
        else:
            logger.warning(f'{package}: Package is done, but not marked as in progress')


async def download(package: Package, queue: asyncio.Queue) -> None:
    """Process one package: verify it, write its generated POM, queue its managed dependencies.

    The package is skipped when it is already done or claimed by another
    worker. When it cannot be verified or has no POM, it is still marked done.

    Raises:
        WaitForPackage: its parent is not done yet. The claim is released first,
            so the re-queued package is really processed later.
        Exception: any other failure, re-raised after the package is marked done,
            so packages waiting on it as a parent do not wait forever.
    """
    claim_key = str(package)

    async with done_lock:
        is_done = claim_key in done

    if is_done:
        logger.info(f'{package}: Already downloaded. Skipping.')
        return

    # Check and claim in a single critical section.
    async with in_progress_lock:
        is_in_progress = claim_key in in_progress
        if not is_in_progress:
            in_progress.add(claim_key)

    if is_in_progress:
        logger.info(f'{package}: Already in progress. Skipping.')
        return

    try:
        await _fetch_and_store(package, queue)
    except WaitForPackage:
        # Without this, every later attempt would be skipped as "already in progress".
        async with in_progress_lock:
            in_progress.discard(claim_key)
        raise
    except Exception:
        await _mark_done(package, claim_key)
        raise

    await _mark_done(package, claim_key)
-
async def worker(queue: asyncio.Queue) -> None:
    """Process packages from *queue* until cancelled (main() cancels workers once it drains).

    A package whose parent is not processed yet is put back on the queue
    behind that parent. Other errors are logged and the package is dropped.
    """
    while True:
        package = await queue.get()
        try:
            await download(package, queue)
        except WaitForPackage as e:
            logger.info(f'{package}: Waiting for {e.package}')
            # Short pause so a package whose parent is still in flight does not
            # spin through the queue re-fetching its own POM as fast as possible.
            await asyncio.sleep(0.1)
            # Re-queue before task_done() so queue.join() cannot finish early.
            await queue.put(e.package)
            await queue.put(package)
        except PackageError:
            logger.exception(f'{package}: Error while processing package')
        except Exception:
            logger.exception(f'{package}: Unknown error while processing package')
            logger.error(global_properties)
        finally:
            # Always balance the queue.get() above, whatever happened.
            queue.task_done()
-
-
async def main() -> None:
    """Crawl every package in package-list.txt, then run generate_master_pom.sh.

    Starts ``num_workers`` worker tasks, waits until the queue is drained,
    cancels the workers and finally invokes the master-POM script.

    Raises:
        ValueError: ``num_workers`` is below 1 (the queue could never drain).
    """
    if num_workers < 1:
        raise ValueError(f'num_workers must be at least 1, got {num_workers}')

    # Make sure the output directory exists before any worker writes into it.
    base_pom_path.mkdir(parents=True, exist_ok=True)

    queue: asyncio.Queue = asyncio.Queue()
    load_package_list(Path('package-list.txt'), queue)

    logger.debug(f'Starting {num_workers} workers')
    tasks = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]

    await queue.join()

    logger.debug('Queue is empty. Cancelling workers')
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    logger.info('Generating master POM')
    returncode = subprocess.call(['sh', 'generate_master_pom.sh'])
    if returncode != 0:
        # The exit status used to be ignored silently.
        logger.error(f'generate_master_pom.sh exited with status {returncode}')
-
-
- logger = logging.getLogger(__name__)
-
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('-w', '--workers', type=int, default=num_workers)
- parser.add_argument('-v', '--verbose', dest='verbosity', action='count', default=0)
- args = parser.parse_args()
-
- if args.verbosity == 0:
- log_level = 'WARNING'
- elif args.verbosity == 1:
- log_level = 'INFO'
- else:
- log_level = 'DEBUG'
-
- logging.basicConfig(level=log_level)
-
- num_workers = args.workers
-
- asyncio.run(main())
|