From 46cb4f780c91d270d69cfc86516c1e8a6cc2ec42 Mon Sep 17 00:00:00 2001
From: Mattia Rizzolo
Date: Sat, 8 Aug 2015 11:45:49 +0000
Subject: reproducible: scheduler: slight refactor to enable multiple architecture support

This moves all the limits handling into a dedicated class and the data into a
dedicated object, instead of embedding them in the scheduling code, where they
are hard to find and edit.
---
 TODO                          |   1 -
 bin/reproducible_scheduler.py | 260 ++++++++++++++++++++++++++++++------------
 2 files changed, 188 insertions(+), 73 deletions(-)

diff --git a/TODO b/TODO
index bb3dcecd..d1e5b0fa 100644
--- a/TODO
+++ b/TODO
@@ -281,7 +281,6 @@ properties:
 
 ==== reproducible Debian armhf
 * then: include armhf in index_scheduled
-* then: armhf scheduling (only for testing for now, introduce "arch-factor", eg. amd64=10 and armhf=3, to schedule less on some archs, also limits (when to schedule) should be different for different archs)
 ** call_apt_update() needs to be moved from _scheduler.py to jobs so its run on all build nodes
 * then: armhf building - run the job on jenkins.d.n and build1 on one host and build2 on another and then run debbindiff on jenkins.d.n…
 * monitor their temperatures via munin via ssh:
diff --git a/bin/reproducible_scheduler.py b/bin/reproducible_scheduler.py
index 9a317eaa..9dc75ed8 100755
--- a/bin/reproducible_scheduler.py
+++ b/bin/reproducible_scheduler.py
@@ -26,6 +26,124 @@ from reproducible_html_indexes import generate_schedule
 from reproducible_html_packages import gen_packages_html
 from reproducible_html_packages import purge_old_pages
 
+
+LIMITS = {
+    'untested': {
+        'amd64': {
+            'testing': {'*': 44},
+            'unstable': {'*': 44},
+            'experimental': {'*': 44},
+        },
+        'armhf': {
+            'testing': {'*': 44},
+            'unstable': {'*': 44},
+            'experimental': {'*': 44},
+        },
+    },
+    'new': {
+        'amd64': {
+            'testing': {1: (100, 25), 2: (200, 20), '*': 15},
+            'unstable': {1: (100, 25), 2: (200, 20), '*': 15},
+            'experimental': {1: (100, 25), 2: (200, 20), '*': 15},
+        },
+        'armhf': {
+            'testing': {1: (100, 25), 2: (200, 20), '*': 15},
+            'unstable': {1: (100, 25), 2: (200, 20), '*': 15},
+            'experimental': {1: (100, 25), 2: (200, 20), '*': 15},
+        },
+    },
+    'ftbfs': {
+        'amd64': {
+            'testing': {1: (250, 4), 2: (350, 2), '*': 0},
+            'unstable': {1: (250, 4), 2: (350, 2), '*': 0},
+            'experimental': {1: (250, 4), 2: (350, 2), '*': 0},
+        },
+        'armhf': {
+            'testing': {1: (250, 4), 2: (350, 2), '*': 0},
+            'unstable': {1: (250, 4), 2: (350, 2), '*': 0},
+            'experimental': {1: (250, 4), 2: (350, 2), '*': 0},
+        }
+    },
+    'old': {
+        'amd64': {
+            'testing': {1: (300, 87.5), 2: (400, 62.5), '*': 0},
+            'unstable': {1: (300, 17.5), 2: (400, 12.5), '*': 0},
+            'experimental': {1: (300, 3.5), 2: (400, 2.5), '*': 0},
+        },
+        'armhf': {
+            'testing': {1: (300, 87.5), 2: (400, 62.5), '*': 0},
+            'unstable': {1: (300, 17.5), 2: (400, 12.5), '*': 0},
+            'experimental': {1: (300, 3.5), 2: (400, 2.5), '*': 0},
+        }
+    }
+}
+
+# define an "arch factor", so the scheduling limits differ between archs
+ARCH_SHARE = {
+    'amd64': 10,
+    'armhf': 3,
+}
+
+
+class Limit:
+    def __init__(self, arch, queue):
+        self.arch = arch
+        self.queue = queue
+
+    def _get_arch_multiplier(self):
+        try:
+            return ARCH_SHARE[self.arch]
+        except KeyError:
+            log.error('No arch factor defined for %s, returning 1', self.arch)
+            return 1
+
+    def _get_level(self, stage):
+        try:
+            return LIMITS[self.queue][self.arch][self.suite][stage][0]
+        except KeyError:
+            log.error('No limit defined for the %s queue on %s/%s stage %s. '
+                      'Returning 1', self.queue, self.suite, self.arch, stage)
+            return 1
+        except IndexError:
+            log.critical('The limit is not in the format "(level, limit)". '
+                         'I can\'t guess what you want, giving up')
+            sys.exit(1)
+
+    def get_level(self, stage):
+        return int(self._get_level(stage) * self._get_arch_multiplier())
+
+    def _get_limit(self, stage):
+        try:
+            limit = LIMITS[self.queue][self.arch][self.suite][stage]
+            limit = limit[1]
+        except KeyError:
+            log.error('No limit defined for the %s queue on %s/%s stage %s. '
+                      'Returning 1', self.queue, self.suite, self.arch, stage)
+            return 1
+        except IndexError:
+            log.critical('The limit is not in the format "(level, limit)". '
+                         'I can\'t guess what you want, giving up')
+            sys.exit(1)
+        except TypeError:
+            # this is the case of the default target
+            if isinstance(limit, int):
+                pass
+            else:
+                raise
+        return limit
+
+    def get_limit(self, stage):
+        return int(self._get_limit(stage) * self._get_arch_multiplier())
+
+    def get_staged_limit(self, current_total):
+        if current_total <= self.get_level(1):
+            return self.get_limit(1)
+        elif current_total <= self.get_level(2):
+            return self.get_limit(2)
+        else:
+            return self.get_limit('*')
+
+
 def call_apt_update(suite):
     # try three times, before failing the job
     for i in [1, 2, 3]:
@@ -178,7 +296,7 @@ def queue_packages(all_pkgs, packages, date):
 
 def schedule_packages(packages):
     pkgs = ((x, packages[x]) for x in packages)
-    log.debug('IDs about to be scheduled: ' + str(x[0] for x in pkgs))
+    log.debug('IDs about to be scheduled: ' + str(packages.keys()))
     query = 'INSERT INTO schedule ' + \
             '(package_id, date_scheduled, date_build_started) ' + \
             'VALUES (?, ?, "")'
@@ -194,32 +312,32 @@ def add_up_numbers(package_type):
     return package_type_sum
 
 
-def query_untested_packages(suite, limit):
+def query_untested_packages(suite, arch, limit):
     criteria = 'not tested before, randomly sorted'
     query = """SELECT DISTINCT sources.id, sources.name FROM sources
-               WHERE sources.suite='{suite}'
+               WHERE sources.suite='{suite}' AND sources.architecture='{arch}'
               AND sources.id NOT IN (SELECT schedule.package_id FROM schedule)
              AND sources.id NOT IN (SELECT results.package_id FROM results)
              ORDER BY random()
-             LIMIT {limit}""".format(suite=suite, limit=limit)
+             LIMIT {limit}""".format(suite=suite, arch=arch, limit=limit)
     packages = query_db(query)
     print_schedule_result(suite, criteria, packages)
     return packages
 
 
-def query_new_versions(suite, limit):
+def query_new_versions(suite, arch, limit):
     criteria = 'tested before, new version available, sorted by last build date'
     query = """SELECT DISTINCT s.id, s.name, s.version, r.version
                FROM sources AS s JOIN results AS r ON s.id = r.package_id
-               WHERE s.suite='{suite}'
+               WHERE s.suite='{suite}' AND s.architecture='{arch}'
               AND s.version != r.version
              AND r.status != 'blacklisted'
             AND s.id IN (SELECT package_id FROM results)
           AND s.id NOT IN (SELECT schedule.package_id FROM schedule)
         ORDER BY r.build_date
-        LIMIT {limit}""".format(suite=suite, limit=limit)
+        LIMIT {limit}""".format(suite=suite, arch=arch, limit=limit)
     pkgs = query_db(query)
     # this is to avoid costant rescheduling of packages in our exp repository
     packages = [(x[0], x[1]) for x in pkgs if version_compare(x[2], x[3]) > 0]
@@ -227,45 +345,49 @@ def query_new_versions(suite, limit):
     return packages
 
 
-def query_old_ftbfs_versions(suite, limit):
+def query_old_ftbfs_versions(suite, arch, limit):
     criteria = 'status ftbfs, no bug filed, tested at least ten days ago, ' + \
                'no new version available, sorted by last build date'
     query = """SELECT DISTINCT s.id, s.name FROM sources AS s
                JOIN results AS r ON s.id = r.package_id
                JOIN notes AS n ON n.package_id=s.id
-               WHERE s.suite='{suite}'
+               WHERE s.suite='{suite}' AND s.architecture='{arch}'
              AND r.status = 'FTBFS' AND ( n.bugs = '[]' OR n.bugs IS NULL )
            AND r.build_date < datetime('now', '-10 day')
           AND s.id NOT IN (SELECT schedule.package_id FROM schedule)
          ORDER BY r.build_date
-        LIMIT {limit}""".format(suite=suite, limit=limit)
+        LIMIT {limit}""".format(suite=suite, arch=arch, limit=limit)
     packages = query_db(query)
     print_schedule_result(suite, criteria, packages)
     return packages
 
 
-def query_old_versions(suite, limit):
+def query_old_versions(suite, arch, limit):
    criteria = 'tested at least two weeks ago, no new version available, ' + \
               'sorted by last build date'
    query = """SELECT DISTINCT s.id, s.name FROM sources AS s
               JOIN results AS r ON s.id = r.package_id
-              WHERE s.suite='{suite}'
+              WHERE s.suite='{suite}' AND s.architecture='{arch}'
             AND r.status != 'blacklisted'
           AND r.build_date < datetime('now', '-14 day')
         AND s.id NOT IN (SELECT schedule.package_id FROM schedule)
       ORDER BY r.build_date
-      LIMIT {limit}""".format(suite=suite, limit=limit)
+      LIMIT {limit}""".format(suite=suite, arch=arch, limit=limit)
     packages = query_db(query)
     print_schedule_result(suite, criteria, packages)
     return packages
 
 
-def schedule_untested_packages(total):
+def schedule_untested_packages(arch, total):
     packages = {}
+    limit = Limit(arch, 'untested')
     for suite in SUITES:
-        log.info('Requesting 444 untested packages in ' + suite + '...')
-        packages[suite] = query_untested_packages(suite, 444)
+        limit.suite = suite
+        many_untested = limit.get_limit('*')
+        log.info('Requesting %s untested packages in %s/%s...',
+                 many_untested, suite, arch)
+        packages[suite] = query_untested_packages(suite, arch, many_untested)
         log.info('Received ' + str(len(packages[suite])) +
                  ' untested packages in ' + suite + ' to schedule.')
     log.info('==============================================================')
@@ -276,18 +398,17 @@
     return packages, msg
 
 
-def schedule_new_versions(total):
+def schedule_new_versions(arch, total):
     packages = {}
-    if total <= 100:
-        many_new = 250
-    elif total <= 200:
-        many_new = 200
-    else:
-        many_new = 150
+    limit = Limit(arch, 'new')
     for suite in SUITES:
-        log.info('Requesting ' + str(many_new) + ' new versions in ' + suite + '...')
-        packages[suite] = query_new_versions(suite, many_new)
-        log.info('Received ' + str(len(packages[suite])) + ' new packages in ' + suite + ' to schedule.')
+        limit.suite = suite
+        many_new = limit.get_staged_limit(total)
+        log.info('Requesting %s new versions in %s/%s...',
+                 many_new, suite, arch)
+        packages[suite] = query_new_versions(suite, arch, many_new)
+        log.info('Received ' + str(len(packages[suite])) +
+                 ' new packages in ' + suite + ' to schedule.')
     log.info('==============================================================')
     if add_up_numbers(packages) != '0':
         msg = add_up_numbers(packages) + ' with new versions'
@@ -296,20 +417,17 @@
     return packages, msg
 
 
-def schedule_old_ftbfs_versions(total):
+def schedule_old_ftbfs_versions(arch, total):
     packages = {}
-    if total <= 250:
-        old_ftbfs = 42
-    elif total <= 350:
-        old_ftbfs = 23
-    else:
-        old_ftbfs = 0
+    limit = Limit(arch, 'ftbfs')
     for suite in SUITES:
-        if suite == 'experimental':
-            old_ftbfs = 0  # experiemental rarely get's fixed over time...
-        log.info('Requesting ' + str(old_ftbfs) + ' old ftbfs packages in ' + suite + '...')
-        packages[suite] = query_old_ftbfs_versions(suite, old_ftbfs)
-        log.info('Received ' + str(len(packages[suite])) + ' old ftbfs packages in ' + suite + ' to schedule.')
+        limit.suite = suite
+        old_ftbfs = limit.get_staged_limit(total)
+        log.info('Requesting %s old ftbfs packages in %s/%s...', old_ftbfs,
+                 suite, arch)
+        packages[suite] = query_old_ftbfs_versions(suite, arch, old_ftbfs)
+        log.info('Received ' + str(len(packages[suite])) +
+                 ' old ftbfs packages in ' + suite + ' to schedule.')
     log.info('==============================================================')
     if add_up_numbers(packages) != '0':
         msg = add_up_numbers(packages) + ' ftbfs versions without bugs filed'
@@ -318,24 +436,17 @@
     return packages, msg
 
 
-def schedule_old_versions(total):
+def schedule_old_versions(arch, total):
     packages = {}
-    if total <= 300:
-        many_old_base = 35  # multiplied by 20 or 10 or 1, see below
-    elif total <= 400:
-        many_old_base = 25  # also...
-    else:
-        many_old_base = 0  # ...
+    limit = Limit(arch, 'old')
     for suite in SUITES:
-        if suite == 'unstable':
-            suite_many_old = int(many_old_base*5)  # unstable changes the most and is most relevant ### was 20, lowered due to gcc5 transition
-        elif suite == 'testing':
-            suite_many_old = int(many_old_base*25)  # re-schedule testing less than unstable as we care more more about unstable (atm) ### was 10, raised due to gcc5...
-        else:
-            suite_many_old = int(many_old_base)  # experimental is roughly one twentieth of the size of the other suites
-        log.info('Requesting ' + str(suite_many_old) + ' old packages in ' + suite + '...')
-        packages[suite] = query_old_versions(suite, suite_many_old)
-        log.info('Received ' + str(len(packages[suite])) + ' old packages in ' + suite + ' to schedule.')
+        limit.suite = suite
+        many_old = limit.get_staged_limit(total)
+        log.info('Requesting %s old packages in %s/%s...', many_old,
+                 suite, arch)
+        packages[suite] = query_old_versions(suite, arch, many_old)
+        log.info('Received ' + str(len(packages[suite])) +
+                 ' old packages in ' + suite + ' to schedule.')
     log.info('==============================================================')
     if add_up_numbers(packages) != '0':
         msg = add_up_numbers(packages) + ' known versions'
@@ -344,10 +455,11 @@
     return packages, msg
 
 
-def scheduler():
+def scheduler(arch):
     query = 'SELECT count(*) ' + \
-            'FROM schedule AS p JOIN sources AS s ON p.package_id=s.id '
-    total = int(query_db(query)[0][0])
+            'FROM schedule AS p JOIN sources AS s ON p.package_id=s.id ' + \
+            'WHERE s.architecture="{arch}"'
+    total = int(query_db(query.format(arch=arch))[0][0])
     log.info('Currently scheduled packages in all suites: ' + str(total))
     if total > 750:
         generate_schedule()  # from reproducible_html_indexes
@@ -358,10 +470,10 @@
     log.info(str(total) + ' packages already scheduled' +
              ', scheduling some more...')
     log.info('==============================================================')
-    untested, msg_untested = schedule_untested_packages(total)
-    new, msg_new = schedule_new_versions(total+len(untested))
-    old_ftbfs, msg_old_ftbfs = schedule_old_ftbfs_versions(total+len(untested)+len(new))
-    old, msg_old = schedule_old_versions(total+len(untested)+len(new)+len(old_ftbfs))
+    untested, msg_untested = schedule_untested_packages(arch, total)
+    new, msg_new = schedule_new_versions(arch, total+len(untested))
+    old_ftbfs, msg_old_ftbfs = schedule_old_ftbfs_versions(arch, total+len(untested)+len(new))
+    old, msg_old = schedule_old_versions(arch, total+len(untested)+len(new)+len(old_ftbfs))
 
     now_queued_here = {}
     # make sure to schedule packages in unstable first
@@ -373,7 +485,8 @@
     for suite in priotized_suite_order:
         query = 'SELECT count(*) ' + \
                 'FROM schedule AS p JOIN sources AS s ON p.package_id=s.id ' + \
-                'WHERE s.suite="{suite}"'.format(suite=suite)
+                'WHERE s.suite="{suite}" AND s.architecture="{arch}"'
+        query = query.format(suite=suite, arch=arch)
         now_queued_here[suite] = int(query_db(query)[0][0]) + \
             len(untested[suite]+new[suite]+old[suite])
     # schedule packages differently in the queue...
@@ -388,7 +501,7 @@
     # update the scheduled page
     generate_schedule()  # from reproducible_html_indexes
     # build the kgb message text
-    message = 'Scheduled in ' + '+'.join(SUITES) + ': '
+    message = 'Scheduled in ' + '+'.join(SUITES) + ' (' + arch + '): '
     if msg_untested:
         message += msg_untested
    message += ' and ' if msg_new and not msg_old_ftbfs and not msg_old else ''
@@ -420,12 +533,15 @@
         call_apt_update(suite)
         update_sources(suite)
     purge_old_pages()
-    try:
-        overall = int(query_db('SELECT count(*) FROM schedule')[0][0])
-    except:
-        overall = 9999
-    if overall > 750:
-        log.info(str(overall) + ' packages already scheduled, nothing to do.')
-        sys.exit()
-    log.info(str(overall) + ' packages already scheduled, scheduling some more...')
-    scheduler()
+    query = 'SELECT count(*) ' + \
+            'FROM schedule AS p JOIN sources AS s ON s.id=p.package_id ' + \
+            'WHERE s.architecture="{}"'
+    for arch in ARCHS:
+        log.info('Scheduling for %s...', arch)
+        overall = int(query_db(query.format(arch))[0][0])
+        if overall > 750:
+            log.info('%s packages already scheduled, nothing to do.', overall)
+            continue
+        log.info('%s packages already scheduled, scheduling some more...',
+                 overall)
+        scheduler(arch)
--
cgit v1.2.3-70-g09d2
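
For context, here is a minimal, standalone sketch of how the staged limits
introduced by this patch resolve in practice. It does not import
reproducible_scheduler.py: the tables below are abbreviated copies of the
LIMITS and ARCH_SHARE values above, and the helper name staged_limit plus the
example backlog figures are made up purely for illustration.

    # Standalone illustration of Limit.get_staged_limit() from the patch above.
    # Abbreviated copy of the 'old' queue limits; each stage maps to a
    # (level, limit) tuple, and '*' is the fallback once the backlog exceeds
    # every level.
    LIMITS = {
        'old': {
            'amd64': {'unstable': {1: (300, 17.5), 2: (400, 12.5), '*': 0}},
            'armhf': {'unstable': {1: (300, 17.5), 2: (400, 12.5), '*': 0}},
        },
    }
    ARCH_SHARE = {'amd64': 10, 'armhf': 3}  # the "arch factor"


    def staged_limit(queue, arch, suite, current_total):
        """Return how many packages to schedule, given the current backlog."""
        stages = LIMITS[queue][arch][suite]
        factor = ARCH_SHARE.get(arch, 1)
        # both the level thresholds and the resulting limits are scaled by the
        # arch factor, mirroring Limit.get_level()/get_limit() in the patch
        if current_total <= stages[1][0] * factor:
            limit = stages[1][1]
        elif current_total <= stages[2][0] * factor:
            limit = stages[2][1]
        else:
            limit = stages['*']
        return int(limit * factor)


    if __name__ == '__main__':
        # e.g. armhf with a backlog of 1000 gets int(12.5 * 3) = 37 packages,
        # while amd64 with the same backlog is still in stage 1 and gets 175
        for arch in ('amd64', 'armhf'):
            for backlog in (100, 1000, 3500, 5000):
                print(arch, backlog,
                      staged_limit('old', arch, 'unstable', backlog))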