diff --git a/inau-dispatcher.py b/inau-dispatcher.py
index 6401c8cca141047fcee716a19fa0e372c336a3f9..970a8648376e8ff0d76ca959736878a74edc2429 100755
--- a/inau-dispatcher.py
+++ b/inau-dispatcher.py
@@ -28,7 +28,9 @@ from lib import db
 Session = sessionmaker()
 
 allbuilders = {}
-#repositories = []
+users = []
+
+# TODO Build a repository against ALL platforms and avoid emails entirely
 
 def __sendEmail(to_addrs, subject, body):
     if to_addrs:
@@ -40,144 +42,152 @@ def __sendEmail(to_addrs, subject, body):
         msg['To'] = ', '.join(to_addrs)
         smtpClient.sendmail(from_addr=sender, to_addrs=list(to_addrs), msg=msg.as_string())
 
-def sendEmail(session, recipients, subject, body):
-    users = set()
-    for user in session.query(db.Users).filter(db.Users.notify==True).all():
-        users.add(user.name + "@" + args.smtpdomain)
-    to_addrs = set(recipients).intersection(users)
+def sendEmail(recipients, subject, body):
+    notifiable = set()
+    for user in users:
+        if user.notify:
+            notifiable.add(user.name + "@" + args.smtpdomain)
+    to_addrs = set(recipients).intersection(notifiable)
     __sendEmail(to_addrs, subject, body)
 
-def sendEmailAdmins(session, subject, body):
+def sendEmailAdmins(subject, body):
     to_addrs = set()
-    for admin in session.query(db.Users).filter(db.Users.admin==True).all():
-        to_addrs.add(admin.name + "@" + args.smtpdomain)
+    for user in users:
+        if user.admin:
+            to_addrs.add(user.name + "@" + args.smtpdomain)
     __sendEmail(to_addrs, subject, body)
 
-class JobType(IntEnum):
-    kill = 0,
-    build = 1,
-    update = 2
+# TODO Move the following classes to a separate file
+class Die:
+    pass
 
-class Job:
-    def __init__(self, type, repository_name=None, repository_url=None, repository_type=None,
-            platform_id=None, build_tag=None, build_id=None, emails=None):
-        self.type = type
+class Update:
+    def __init__(self, repository_name, repository_url, build_tag):
         self.repository_name = repository_name
         self.repository_url = repository_url
-        self.repository_type = repository_type
         self.build_tag = build_tag
-        self.platform_id = platform_id
-        self.build_id = build_id
+
+class Build(Update):
+    def __init__(self, repository_name, repository_url, build_tag):
+        Update.__init__(self, repository_name, repository_url, build_tag)
+        self.status = ''
+        self.output = ''
+
+class Store(Build):
+    def __init__(self, repository_name, repository_url, build_tag, repository_id, repository_type, emails):
+        Build.__init__(self, repository_name, repository_url, build_tag)
+        self.repository_id = repository_id
+        self.repository_type = repository_type
         self.emails = emails
 
 class Builder:
-    def __init__(self, name):
+    def __init__(self, name, platform_id):
         self.name = name
+        self.platform_id = platform_id
+        self.platdir = args.repo + '/' + str(platform_id)
         self.queue = Queue()
-        self.process = Process(target=self.build, name=name)
+        self.process = Process(target=self.handler)
         self.process.start()
-    def build(self):
-        print("Parte buidler di " + self.name) # FIXME Debug
+
+    def update(self, job):
+        logger.info("Checking out " + job.build_tag + " from " + job.repository_url + "...")
+        builddir = self.platdir + "/" + job.repository_name
+        if not os.path.isdir(self.platdir):
+            os.mkdir(self.platdir)
+        if os.path.isdir(builddir):
+            subprocess.run(["git -C " + builddir + " remote update"], shell=True, check=True)
+            subprocess.run(["git -C " + builddir + " submodule update --remote --force --recursive"], shell=True, check=True)
+        else:
+            subprocess.run(["git clone --recurse-submodules " + job.repository_url + " " + builddir], shell=True, check=True)
+        subprocess.run(["git -C " + builddir + " reset --hard " + job.build_tag], shell=True, check=True)
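+
+    # TODO Prefer argument lists to shell=True for the git calls above; a
+    # possible (untested) sketch of the same reset, without shell quoting risks:
+    #   subprocess.run(["git", "-C", builddir, "reset", "--hard", job.build_tag], check=True)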
subprocess.run(["git -C " + builddir + " reset --hard " + job.build_tag], shell=True, check=True) + + def build(self, job): + logging.info("Building " + job.build_tag + " from " + job.repository_url + "...") + builddir = self.platdir + "/" + job.repository_name + with paramiko.SSHClient() as sshClient: + sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + sshClient.connect(hostname=self.name, port=22, username="inau", + key_filename="/home/inau/.ssh/id_rsa.pub") + _, raw, _ = sshClient.exec_command("(source /etc/profile; cd " + builddir + + " && (test -f *.pro && qmake && cuuimake --plain-text-output);" + + " make -j`getconf _NPROCESSORS_ONLN`) 2>&1") + job.status = raw.channel.recv_exit_status() + job.output = raw.read().decode('latin-1') # utf-8 is rejected by Mysql despite it is properly configured + + def store(self, job): + logging.info("Storing " + job.build_tag + " from " + job.repository_url + "...") + + build = db.Builds(repository_id=job.repository_id, platform_id=self.platform_id, tag=os.path.basename(job.build_tag), + status=job.status, output=job.output) + self.session.add(build) + self.session.commit() + + builddir = self.platdir + "/" + job.repository_name + outcome = job.repository_name + " " + os.path.basename(job.build_tag) + if job.status != 0: + outcome += ": built failed on " + self.name + else: + outcome += ": built successfully on " + self.name + if job.repository_type == db.RepositoryType.cplusplus or job.repository_type == db.RepositoryType.python \ + or job.repository_type == db.RepositoryType.shellscript: + basedir = builddir + "/bin/" + elif job.repository_type == db.RepositoryType.configuration: + basedir = builddir + "/etc/" + else: + raiseException('Invalid type') + + artifacts = [] + for r, d, f in os.walk(basedir): + dir = "" + if r != basedir: + dir = os.path.basename(r) + "/" + for file in f: + hashFile = "" + with open(basedir + dir + file,"rb") as fd: + bytes = fd.read() + hashFile = hashlib.sha256(bytes).hexdigest(); + if not os.path.isfile(args.store + hashFile): + shutil.copyfile(basedir + dir + file, args.store + hashFile, follow_symlinks=False) + artifacts.append(db.Artifacts(build_id=build.id, hash=hashFile, filename=dir+file)) + self.session.add_all(artifacts) + self.session.commit() + sendEmail(job.emails, outcome, job.output) + + def handler(self): + logger.info("Starting process for builder " + self.name + "...") + + engine.dispose() + self.session = Session() while True: try: - print("buidler di " + self.name + " in attesa...") # FIXME Debug job = self.queue.get() - if job.type == JobType.kill: - print("Si ferma buodler di " + self.name) # FIXME Debug + if isinstance(job, Die): + logger.info("Stopping process for builder " + self.name + "...") break - print("buidler di " + self.name + " in azione... 
su: ") # FIXME Debug - print(job.type) # FIXME Debug - print(job.repository_name) # FIXME Debug - print(job.repository_url) # FIXME Debug - print(job.repository_type) # FIXME Debug - print(job.build_tag) # FIXME Debug - print(job.platform_id) # FIXME Debug - print(job.build_id) # FIXME Debug - print(job.emails) # FIXME Debug - - engine.dispose() - session = Session() - - try: - platdir = args.repo + '/' + str(job.platform_id) - builddir = platdir + "/" + job.repository_name - if not os.path.isdir(platdir): - os.mkdir(platdir) - if os.path.isdir(builddir): - subprocess.run(["git -C " + builddir + " remote update"], shell=True, check=True) - subprocess.run(["git -C " + builddir + " submodule update --remote --force --recursive"], shell=True, check=True) - else: - ret = subprocess.run(["git clone --recurse-submodule " + job.repository_url + " " + builddir], shell=True, check=True) - subprocess.run(["git -C " + builddir + " reset --hard " + job.build_tag], shell=True, check=True) - - if job.type == JobType.update: - continue - - with paramiko.SSHClient() as sshClient: - sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - sshClient.connect(hostname=self.name, port=22, username="inau", - key_filename="/home/inau/.ssh/id_rsa.pub") - _, raw, _ = sshClient.exec_command("(source /etc/profile; cd " + builddir - + " && (test -f *.pro && qmake && cuuimake --plain-text-output);" - + " make -j`getconf _NPROCESSORS_ONLN`) 2>&1") - status = raw.channel.recv_exit_status() - output = raw.read().decode('latin-1') # utf-8 is rejected by Mysql despite the right character set is configured - - if job.build_id: - build = session.query(db.Builds).filter(db.Builds.id==job.build_id).one() - build.date = datetime.datetime.now() - build.status = status - build.output = output - session.commit() - - outcome = job.repository_name + " " + os.path.basename(job.build_tag) - if status != 0: - outcome += ": built failed on " + self.name - else: - outcome += ": built successfully on " + self.name - if job.build_id: - if job.repository_type == db.RepositoryType.cplusplus or job.repository_type == db.RepositoryType.python \ - or job.repository_type == db.RepositoryType.shellscript: - basedir = builddir + "/bin/" - elif job.repository_type == db.RepositoryType.configuration: - basedir = builddir + "/etc/" - else: - raiseException('Invalid type') - - artifacts = [] - for r, d, f in os.walk(basedir): - dir = "" - if r != basedir: - dir = os.path.basename(r) + "/" - for file in f: - hashFile = "" - with open(basedir + dir + file,"rb") as fd: - bytes = fd.read() - hashFile = hashlib.sha256(bytes).hexdigest(); - if not os.path.isfile(args.store + hashFile): - shutil.copyfile(basedir + dir + file, args.store + hashFile, follow_symlinks=False) - artifacts.append(db.Artifacts(build_id=job.build_id, hash=hashFile, filename=dir+file)) - session.add_all(artifacts) - session.commit() - - sendEmail(session, job.emails, outcome, output) - - except subprocess.CalledProcessError as c: - print("C 1:", c) # TODO - except Exception as e: - session.rollback() - print("E 1:", e, type(e)) # TODO - finally: - session.close() - - # TODO Come funzione in background????? 
+                if isinstance(job, Update):
+                    self.update(job)
+
+                if isinstance(job, Build):
+                    self.build(job)
+
+                if isinstance(job, Store):
+                    self.store(job)
+
+            except subprocess.CalledProcessError as c:
+                sendEmailAdmins("Subprocess failed", str(c))
+                logger.error("Subprocess failed: %s", c)
+                self.session.rollback()
+            except Exception as e:
+                sendEmailAdmins("Generic error", str(e))
+                logger.error("Generic error: %s", e)
+                self.session.rollback()
             except KeyboardInterrupt as k:
+                self.session.rollback()
                 break
-            except Exception as e:
-                print("E 2: ", e) # TODO
-
+            finally:
+                self.session.close()
 
 def signalHandler(signalNumber, frame):
     reconcile()
@@ -186,164 +196,151 @@ def reconcile():
     logger.info('Reconciling...')
 
     session = Session()
-
     try:
-#        global allbuilders, repositories
         global allbuilders
+        global users
+
+        users = session.query(db.Users).all()
 
         newbuilders = {}
-        oldbuilders = {}
+        oldbuilders = allbuilders
        for b in session.query(db.Builders).all():
             try:
-                newbuilders[b.platform_id].append(Builder(b.name))
+                newbuilders[b.platform_id].append(Builder(b.name, b.platform_id))
             except KeyError:
-                newbuilders[b.platform_id] = [Builder(b.name)]
-        oldbuilders = allbuilders
+                newbuilders[b.platform_id] = [Builder(b.name, b.platform_id)]
         allbuilders = newbuilders
 
         for oldbuilder in oldbuilders.values():
             for b in oldbuilder:
-                b.queue.put(Job(type=JobType.kill))
+                b.queue.put(Die())
                 b.process.join()
-
-#    newrepositories = []
-#    for repository in session.query(db.Repositories2).all():
-#        newrepositories.append(repository)
-#    repositories = newrepositories
-#
-#    for repo in session.query(db.Repositories2).join(db.Providers). \
-#            with_entities(db.Repositories2.id, db.Repositories2.name, db.Repositories2.type, db.Providers.url).all():
+# TODO Build missing tags
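+# The commented-out sketch below assumes the GitLab tags API
+# (GET /projects/:id/repository/tags) returns entries of the form
+#   {"name": ..., "target": <tag object SHA>, "commit": {"id": <commit SHA>, "author_email": ...}}
+# For annotated tags target differs from commit.id, which is how lightweight
+# tags are skipped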
+#    for repo in session.query(db.Repositories).join(db.Providers). \
+#            filter(db.Repositories.name == "cs/ds/fake"). \
+#            with_entities(db.Repositories.id, db.Repositories.name,
+#                db.Repositories.type, db.Repositories.platform_id, db.Providers.url).all():
 #        req = requests.get('https://gitlab.elettra.eu/api/v4/projects/'
 #                + urllib.parse.quote(repo.name, safe='') + '/repository/tags')
 #        data = req.json()
-#        if req.status_code == 200:
-#            # Retrieve commited tags
+#        if req.status_code != 200:
+#            logger.error('Error looking for repository ' + repo.name)
+#        else:
+#            # Retrieve committed tags
 #            ctags = []
 #            for tag in data:
 #                if tag['target'] != tag['commit']['id']:
 #                    ctags.append(tag['name'])
 #            ctags.sort(key=StrictVersion)
+#            print(ctags)
 #
-#            for platform_id, builders in allbuilders.items():
-#                builds = session.query(db.Builds).filter(db.Builds.repository_id==repo.id,
-#                        db.Builds.platform_id==platform_id).all()
-#                # Retrieve builded tags
-#                btags = []
-#                for build in builds:
-#                    btags.append(build.tag)
-#                btags.sort(key=StrictVersion)
+#            # Retrieve built tags
+#            btags = []
+#            for build in session.query(db.Builds).filter(db.Builds.repository_id==repo.id).all():
+#                btags.append(build.tag)
+#            btags.sort(key=StrictVersion)
+#            print(btags)
 #
-#                mtags = list(set(ctags).difference(set(btags)))
-#                mtags.sort(key=StrictVersion)
-#
-#                if mtags:
-#                    i = ctags.index(mtags[0])
-#                    if i:
-#                        # Re-build the previous built version
-#                        idx = builders.index(min(builders, key=lambda x:x.queue.qsize()))
-#                        builders[idx].queue.put(Job(type=JobType.build, repository_name = repo.name,
-#                                repository_url = repo.url + ":" + repo.name, repository_type = repo.type,
-#                                platform_id = platform_id, build_tag = ctags[i-1]))
+#            # Calculate missing tags
+#            mtags = list(set(ctags).difference(set(btags)))
+#            mtags.sort(key=StrictVersion)
+#            print(mtags)
+#
+#            if mtags:
+#                i = ctags.index(mtags[0])
+#                if i:
+#                    # Re-build the previously built version
+#                    idx = allbuilders[repo.platform_id].index(min(allbuilders[repo.platform_id],
+#                        key=lambda x:x.queue.qsize()))
+#                    allbuilders[repo.platform_id][idx].queue.put(Build(
+#                        repository_name = repo.name, repository_url = repo.url + "/" + repo.name + ".git",
+#                        build_tag='refs/tags/' + ctags[i-1]))
-#                        # Build missing tags
+#                # Build missing tags
+#                for mtag in mtags:
 #                    emails = []
-#                    for mtag in mtags:
-#                        idx = builders.index(min(builders, key=lambda x:x.queue.qsize()))
-#                        emails.clear()
-#                        for tag in data:
-#                            if tag['name'] == mtag:
-#                                emails = [tag['commit']['author_email']]
-#                                break
-#                        build = db.Builds(repository_id=repo.id, platform_id=platform_id, tag=mtag)
-#                        session.add(build)
-#                        session.commit()
-#
-#                        builders[idx].queue.put(Job(type=JobType.build, repository_name = repo.name,
-#                                repository_url = repo.url + ":" + repo.name, repository_type = repo.type,
-#                                platform_id = platform_id, build_tag = mtag, build_id = build.id, emails=emails))
+#                    for tag in data:
+#                        if tag['name'] == mtag:
+#                            emails = [tag['commit']['author_email']]
+#                            break
+#
+#                    idx = allbuilders[repo.platform_id].index(min(allbuilders[repo.platform_id],
+#                        key=lambda x:x.queue.qsize()))
+#                    allbuilders[repo.platform_id][idx].queue.put(Store(
+#                        repository_name = repo.name, repository_url = repo.url + "/" + repo.name + ".git",
+#                        build_tag='refs/tags/' + mtag, repository_id = repo.id,
+#                        repository_type = repo.type, emails = emails))
     except Exception as e:
+        sendEmailAdmins("Reconciliation failed", str(e))
+        logger.error("Reconciliation failed: %s", e)
session.rollback() - print("E 3: ", e) # TODO finally: session.close() class Server(BaseHTTPRequestHandler): def do_POST(self): - content_length = int(self.headers['Content-Length']) - post_data = self.rfile.read(content_length) - - if self.headers['Content-Type'] != 'application/json': - self.send_response(415) - self.end_headers() - return - - post_json = json.loads(post_data.decode('utf-8')) - print(post_json) # FIXME DEBUG - - # Tag deletion - if post_json['after'] == '0000000000000000000000000000000000000000': - self.send_response(415) - self.end_headers() - return + engine.dispose() + session = Session() + try: + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) - # Check if the tag is lightweight - if post_json['after'] == post_json['commits'][0]['id']: - self.send_response(400) - self.end_headers() - return + if self.headers['Content-Type'] != 'application/json': + self.send_response(415) + self.end_headers() + return - builds = [] - rn = '' - rt = '' + post_json = json.loads(post_data.decode('utf-8')) + logger.debug(post_json) - session = Session() - for r in session.query(db.Repositories).filter(db.Repositories.name==post_json['project']['path_with_namespace']).all(): - rn = r.name - rt = r.type - if r.name == "cs/ds/makefiles" and self.headers['X-Gitlab-Event'] == 'Push Hook' and post_json['event_name'] == 'push': - jt = JobType.update - elif self.headers['X-Gitlab-Event'] == 'Tag Push Hook' and post_json['event_name'] == 'tag_push': - jt = JobType.build - else: - self.send_response(400) + # Tag deletion + if post_json['after'] == '0000000000000000000000000000000000000000': + self.send_response(200) self.end_headers() - session.close() return - builds.append(db.Builds(repository_id=r.id, platform_id=r.platform_id, tag=os.path.basename(post_json['ref']))) - - if not builds: - self.send_response(404) - self.end_headers() - session.close() - return - - if jt == JobType.build: - try: - session.add_all(builds) - session.commit() - except: - session.rollback() - session.close() - self.send_response(500) + # Check if the tag is lightweight + if post_json['after'] == post_json['commits'][0]['id']: + self.send_response(200) self.end_headers() return - - for build in builds: - print('Assign the job to the builder with shortest queue length...') - idx = allbuilders[build.platform_id].index(min(allbuilders[build.platform_id], - key=lambda x:x.queue.qsize())) - allbuilders[build.platform_id][idx].queue.put(Job(type=jt, repository_name = rn, - repository_url = post_json['project']['http_url'], repository_type = rt, - platform_id = build.platform_id, build_tag=post_json['ref'], build_id=build.id, - emails=[post_json['commits'][0]['author']['email'], post_json['user_email']])) - - self.send_response(200) - self.end_headers() - session.close() + for r in session.query(db.Repositories).filter(db.Repositories.name==post_json['project']['path_with_namespace']).all(): + if r.name == "cs/ds/makefiles" and self.headers['X-Gitlab-Event'] == 'Push Hook' and post_json['event_name'] == 'push': + job = Update(repository_name = r.name, repository_url = post_json['project']['http_url'], build_tag=post_json['ref']) + elif self.headers['X-Gitlab-Event'] == 'Tag Push Hook' and post_json['event_name'] == 'tag_push': + job = Store(repository_name = r.name, repository_url = post_json['project']['http_url'], build_tag=post_json['ref'], + repository_id = r.id, repository_type = r.type, emails=[post_json['commits'][0]['author']['email'], post_json['user_email']]) + 
else:
+                    continue
+
+                # Assign the job to the builder with the shortest queue
+                idx = allbuilders[r.platform_id].index(min(allbuilders[r.platform_id],
+                    key=lambda x:x.queue.qsize()))
+                allbuilders[r.platform_id][idx].queue.put(job)
+
+# TODO Enable protected_tags
+#            req = requests.post('https://gitlab.elettra.eu/api/v4/projects/' + urllib.parse.quote(rn, safe='')
+#                    + '/protected_tags?name=' + post_json['ref'] + '&create_access_level=40')
+
+            self.send_response(200)
+            self.end_headers()
+        except Exception as e:
+            sendEmailAdmins("Receiving new tag failed", str(e))
+            logger.error("Receiving new tag failed: %s", e)
+            session.rollback()
+            self.send_response(500)
+            self.end_headers()
+        finally:
+            session.close()
+
+# TODO Migrate to a multi-threaded HTTP server?
 def run(address, port, server_class=HTTPServer, handler_class=Server):
     logger.info('Starting...')
     server_address = (address, port)
@@ -356,39 +353,38 @@ def run(address, port, server_class=HTTPServer, handler_class=Server):
     logger.info('Stopping...')
 
 if __name__ == '__main__':
+    # TODO Migrate to a configuration file to avoid hard-coded defaults
+
     # Retrieve arguments
     parser = argparse.ArgumentParser()
-    parser.add_argument("--db", type=str, help='Database URI to connect to', required=True)
+    parser.add_argument("--db", type=str, help='Database URI to connect to', required=True) # TODO Enable password-less access to the database
     parser.add_argument('--bind', type=str, default='localhost', help='IP Address or hostname to bind to')
     parser.add_argument('--port', type=int, default=443, help='Port to listen to')
-    parser.add_argument("--store", type=str, default='/scratch/build/files-store/')
-    parser.add_argument("--repo", type=str, default='/scratch/build/repositories/')
-    parser.add_argument("--smtpserver", type=str, default="smtp")
-    parser.add_argument("--smtpsender", type=str, default="noreply")
-    parser.add_argument("--smtpdomain", type=str, default="elettra.eu")
+    parser.add_argument("--store", type=str, default='/scratch/build/files-store/', help='Directory where produced binaries are stored')
+    parser.add_argument("--repo", type=str, default='/scratch/build/repositories/', help='Directory where git repositories are checked out')
+    parser.add_argument("--smtpserver", type=str, default="smtp", help='Hostname of the SMTP server')
+    parser.add_argument("--smtpsender", type=str, default="noreply", help='Email sender')
+    parser.add_argument("--smtpdomain", type=str, default="elettra.eu", help='Email domain')
     args = parser.parse_args()
 
+    print("Starting inau-dispatcher using", args.db, "on interface", args.bind, "and port",
+            args.port, "file store directory", args.store, "repositories clone directory", args.repo,
+            "SMTP server", args.smtpserver, "SMTP sender", args.smtpsender, "SMTP domain", args.smtpdomain)
+
     if os.getpgrp() == os.tcgetpgrp(sys.stdout.fileno()):
-        # Executed in foreground (Development)
+        # Executed in the foreground, so log to the terminal and enable SQL echoing (Development)
         logging.basicConfig(level=logging.INFO)
         engine = create_engine(args.db, pool_pre_ping=True, echo=True)
     else:
-        # Executed in background (Production)
+        # Executed in the background, so log to syslog and disable SQL echoing (Production)
         syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
         logging.basicConfig(level=logging.INFO, handlers=[syslog_handler])
         engine = create_engine(args.db, pool_pre_ping=True, echo=False)
+    logger = logging.getLogger('inau-dispatcher')
 
-    logger = logging.getLogger('inauDispatcher')
-
     Session.configure(bind=engine)
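+    # Reconcile once at startup; signalHandler() re-runs reconcile() on SIGHUP (registered below)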
reconcile() - - signal.signal(signal.SIGUSR1, signalHandler) + signal.signal(signal.SIGHUP, signalHandler) if args.bind: run(args.bind,args.port) - - # FIXME It is necessary? - for platform_id, builders in allbuilders.items(): - for builder in builders: - builder.process.join() diff --git a/lib/db.py b/lib/db.py index f8a24d7677586061552fe482b6db5b1ea72fb158..d475344f68b31ddc8c746b9585e70ecdfd1437a5 100644 --- a/lib/db.py +++ b/lib/db.py @@ -1,6 +1,6 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey, func -#from sqlalchemy.orm import relationship +from sqlalchemy.orm import relationship from enum import Enum, IntEnum import datetime @@ -17,22 +17,22 @@ class Architectures(Base): __tablename__ = 'architectures' id = Column(Integer, primary_key=True) name = Column(String(255), unique=True, nullable=False) -# platforms = relationship('Platforms', back_populates='architecture') + platforms = relationship('Platforms', back_populates='architecture') class Distributions(Base): __tablename__ = 'distributions' id = Column(Integer, primary_key=True) name = Column(String(255), nullable=False) version = Column(String(255), nullable=False) -# platforms = relationship('Platforms', back_populates='distribution') -# + platforms = relationship('Platforms', back_populates='distribution') + class Platforms(Base): __tablename__ = 'platforms' id = Column(Integer, primary_key=True) distribution_id = Column(Integer, ForeignKey('distributions.id'), nullable=False) architecture_id = Column(Integer, ForeignKey('architectures.id'), nullable=False) -# architecture = relationship('Architectures', back_populates='platforms') -# distribution = relationship('Distributions', back_populates='platforms') + architecture = relationship('Architectures', back_populates='platforms') + distribution = relationship('Distributions', back_populates='platforms') # servers = relationship('Servers', back_populates='platform') # #class Facilities(Base):