Skip to content
Snippets Groups Projects
inau-dispatcher.py 16.2 KiB
Newer Older
#!/usr/bin/env python3

from http.server import BaseHTTPRequestHandler, HTTPServer
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, exc
from multiprocessing import Process, Queue
from enum import Enum, IntEnum
import os
import signal
import json
import sys
import logging
import logging.handlers
import argparse
import datetime
import subprocess
import paramiko
import hashlib
import shutil
#import requests
#import urllib.parse
from smtplib import SMTP
from email.mime.text import MIMEText
#from distutils.version import StrictVersion

from lib import db

Session = sessionmaker()

allbuilders = {}
#repositories = []

def __sendEmail(to_addrs, subject, body):
    """Send a plain-text e-mail to ``to_addrs`` through the configured SMTP host.

    Nothing is sent (and no connection is opened) when ``to_addrs`` is empty.
    The SMTP host is ``<args.smtpserver>.<args.smtpdomain>`` on port 25 and
    the sender is ``<args.smtpsender>@<args.smtpdomain>``.

    Args:
        to_addrs: Iterable of recipient addresses.
        subject: Subject text; it is prefixed with "INAU. ".
        body: Message body.
    """
    if not to_addrs:
        return

    smtp_host = args.smtpserver + "." + args.smtpdomain
    with SMTP(smtp_host, port=25) as client:
        from_addr = args.smtpsender + "@" + args.smtpdomain
        message = MIMEText(body)
        message['Subject'] = "INAU. " + subject
        message['From'] = from_addr
        message['To'] = ', '.join(to_addrs)
        client.sendmail(from_addr=from_addr, to_addrs=list(to_addrs),
                msg=message.as_string())

def sendEmail(session, recipients, subject, body):
    """Mail ``recipients``, restricted to users who opted in to notifications.

    The addresses of all ``db.Users`` rows with ``notify`` set are built as
    ``<name>@<args.smtpdomain>``; only recipients found in that set are mailed.

    Args:
        session: Database session used to query ``db.Users``.
        recipients: Iterable of e-mail addresses, or None (treated as empty;
            ``Job.emails`` defaults to None, which previously raised TypeError).
        subject: Subject text, passed through to ``__sendEmail``.
        body: Message body, passed through to ``__sendEmail``.
    """
    # '== True' is intentional: it builds the SQL filter expression.
    subscribed = {
        user.name + "@" + args.smtpdomain
        for user in session.query(db.Users).filter(db.Users.notify==True).all()
    }
    to_addrs = subscribed.intersection(recipients or ())
    __sendEmail(to_addrs, subject, body)

def sendEmailAdmins(session, subject, body):
    """Mail every user flagged as admin in ``db.Users``.

    Args:
        session: Database session used to query ``db.Users``.
        subject: Subject text, passed through to ``__sendEmail``.
        body: Message body, passed through to ``__sendEmail``.
    """
    # '== True' is intentional: it builds the SQL filter expression.
    admins = session.query(db.Users).filter(db.Users.admin==True).all()
    to_addrs = {admin.name + "@" + args.smtpdomain for admin in admins}
    __sendEmail(to_addrs, subject, body)

class JobType(IntEnum):
    """Kind of work item placed on a builder queue.

    The members are plain integers. The original definitions carried stray
    trailing commas (``kill = 0,``) which made the assigned values one-element
    tuples; they only resolved to 0 and 1 because IntEnum unpacks a tuple
    value into ``int(*args)``.
    """
    kill = 0    # tells the builder loop to stop
    build = 1   # sync the repository, then build it on the remote host
    update = 2  # sync the repository only

class Job:
    """Work item handed to a builder through its queue.

    Only ``type`` is mandatory; every other field defaults to None (a kill
    job, for instance, is created with the type alone).
    """

    def __init__(self, type, repository_name=None, repository_url=None, repository_type=None,
            platform_id=None, build_tag=None, build_id=None, emails=None):
        # What to do with the job (compared against JobType members).
        self.type = type

        # Repository the job refers to.
        self.repository_name = repository_name
        self.repository_url = repository_url
        self.repository_type = repository_type

        # What to build and where the result is recorded.
        self.build_tag = build_tag
        self.platform_id = platform_id
        self.build_id = build_id

        # Addresses to notify about the outcome.
        self.emails = emails

class Builder:
    """Build worker bound to a single build host.

    Creating an instance immediately starts a child process that runs
    :meth:`build`, which consumes ``Job`` objects from ``self.queue``.

    Attributes:
        name: Host name of the build machine; used as the process name and as
            the SSH host the build is executed on.
        queue: ``multiprocessing.Queue`` on which jobs are placed.
        process: The ``multiprocessing.Process`` running :meth:`build`.
    """

    def __init__(self, name):
        self.name = name
        self.queue = Queue()
        self.process = Process(target=self.build, name=name)
        self.process.start()

    def build(self):
        """Process queued jobs until a kill job or KeyboardInterrupt arrives.

        Failures while handling one job are printed and the loop moves on to
        the next job.
        """
        print("Parte buidler di " + self.name) # FIXME Debug
        while True:
            try:
                print("buidler di " + self.name + " in attesa...") # FIXME Debug
                job = self.queue.get()

                if job.type == JobType.kill:
                    print("Si ferma buodler di " + self.name) # FIXME Debug
                    break

                print("buidler di " + self.name + " in azione... su: ") # FIXME Debug
                print(job.type) # FIXME Debug
                print(job.repository_name) # FIXME Debug
                print(job.repository_url) # FIXME Debug
                print(job.repository_type) # FIXME Debug
                print(job.build_tag) # FIXME Debug
                print(job.platform_id) # FIXME Debug
                print(job.build_id) # FIXME Debug
                print(job.emails) # FIXME Debug

                # Drop pooled connections before opening this job's session.
                engine.dispose()
                session = Session()

                try:
                    self._run_job(session, job)
                except subprocess.CalledProcessError as c:
                    print("C 1:", c)    # TODO
                except Exception as e:
                    session.rollback()
                    print("E 1:", e, type(e))    # TODO
                finally:
                    session.close()

            # TODO: how does this behave when running in background?
            except KeyboardInterrupt:
                break
            except Exception as e:
                print("E 2: ", e)    # TODO

    def _run_job(self, session, job):
        """Sync the repository and, unless it is an update job, build and record it."""
        builddir = self._sync_repository(job)

        if job.type == JobType.update:
            # Update jobs only refresh the working copy.
            return

        status, output = self._remote_build(builddir)

        if job.build_id:
            build = session.query(db.Builds).filter(db.Builds.id==job.build_id).one()
            build.date = datetime.datetime.now()
            build.status = status
            build.output = output
            session.commit()

        outcome = job.repository_name + " " + os.path.basename(job.build_tag)
        if status != 0:
            outcome += ": built failed on " + self.name
        else:
            outcome += ": built successfully on " + self.name
            if job.build_id:
                self._store_artifacts(session, job, builddir)

        sendEmail(session, job.emails, outcome, output)

    def _sync_repository(self, job):
        """Clone or refresh the working copy, reset it to the job's tag, return its path.

        Raises:
            subprocess.CalledProcessError: if any git command fails.
        """
        platdir = args.repo + '/' + str(job.platform_id)
        builddir = platdir + "/" + job.repository_name
        if not os.path.isdir(platdir):
            os.mkdir(platdir)

        # Argument lists (no shell): repository_url and build_tag originate
        # from the webhook payload and must never be interpreted by a shell.
        if os.path.isdir(builddir):
            subprocess.run(["git", "-C", builddir, "remote", "update"], check=True)
            subprocess.run(["git", "-C", builddir, "submodule", "update",
                "--remote", "--force", "--recursive"], check=True)
        else:
            subprocess.run(["git", "clone", "--recurse-submodule",
                job.repository_url, builddir], check=True)
        subprocess.run(["git", "-C", builddir, "reset", "--hard", job.build_tag], check=True)
        return builddir

    def _remote_build(self, builddir):
        """Run the build in ``builddir`` on the build host over SSH.

        Returns:
            Tuple ``(status, output)``: the remote exit status and the combined
            stdout/stderr text of the build command.
        """
        with paramiko.SSHClient() as sshClient:
            sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # NOTE(review): key_filename points at the *public* key file;
            # paramiko documents this parameter as private key(s)/certs.
            # Left unchanged - confirm which file is intended.
            sshClient.connect(hostname=self.name, port=22, username="inau",
                    key_filename="/home/inau/.ssh/id_rsa.pub")
            _, raw, _ = sshClient.exec_command("(source /etc/profile; cd " + builddir
                                + " && (test -f *.pro && qmake && cuuimake --plain-text-output);"
                                + " make -j`getconf _NPROCESSORS_ONLN`) 2>&1")
            # Drain the output before asking for the exit status: waiting for
            # the status first can hang when the command produces more output
            # than the channel window holds.
            # latin-1: utf-8 is rejected by MySQL even though the right
            # character set is configured.
            output = raw.read().decode('latin-1')
            status = raw.channel.recv_exit_status()
        return status, output

    def _store_artifacts(self, session, job, builddir):
        """Copy build products into the hash-addressed store and record them.

        Raises:
            ValueError: if the job's repository type is not a known one.
        """
        if job.repository_type in (db.RepositoryType.cplusplus,
                db.RepositoryType.python, db.RepositoryType.shellscript):
            basedir = builddir + "/bin/"
        elif job.repository_type == db.RepositoryType.configuration:
            basedir = builddir + "/etc/"
        else:
            # Was 'raiseException(...)', an undefined name.
            raise ValueError('Invalid type')

        artifacts = []
        for root, _subdirs, filenames in os.walk(basedir):
            # Full path relative to basedir, so files nested more than one
            # level deep resolve correctly (basename() dropped parent dirs).
            relative = os.path.relpath(root, basedir)
            prefix = "" if relative == "." else relative + "/"
            for filename in filenames:
                source = basedir + prefix + filename
                with open(source, "rb") as fd:
                    digest = hashlib.sha256(fd.read()).hexdigest()
                target = args.store + digest
                if not os.path.isfile(target):
                    shutil.copyfile(source, target, follow_symlinks=False)
                artifacts.append(db.Artifacts(build_id=job.build_id, hash=digest,
                    filename=prefix + filename))
        session.add_all(artifacts)
        session.commit()

def signalHandler(signalNumber, frame):
    """Signal callback: re-run reconcile(); both arguments are ignored."""
    reconcile()

def reconcile():
    """Replace the global builder pool with one built from ``db.Builders``.

    A ``Builder`` is started for every row and grouped by ``platform_id``.
    The new mapping is published as ``allbuilders`` first; then each builder
    of the previous mapping is sent a kill job and joined.
    Any exception is printed after rolling the session back.
    """
    global allbuilders

    logger.info('Reconciling...')

    session = Session()

    try:
        fresh = {}
        for row in session.query(db.Builders).all():
            fresh.setdefault(row.platform_id, []).append(Builder(row.name))

        # Publish the new pool before retiring the old one.
        retired, allbuilders = allbuilders, fresh

        for group in retired.values():
            for worker in group:
                worker.queue.put(Job(type=JobType.kill))
                worker.process.join()

        # NOTE: a disabled (commented-out) section used to follow here. It
        # listed each repository's tags through the GitLab API, compared them
        # with the tags recorded in db.Builds per platform and queued build
        # jobs for the missing ones (re-building the previous tag first).
    except Exception as e:
        session.rollback()
        print("E 3: ", e)     # TODO
    finally:
        session.close()

class Server(BaseHTTPRequestHandler):
    """HTTP handler that turns GitLab webhook POSTs into builder jobs."""

    # Value of 'after' that marks a tag deletion.
    _NULL_SHA = '0000000000000000000000000000000000000000'

    def _reply(self, status):
        """Send a header-only response with the given status code."""
        self.send_response(status)
        self.end_headers()

    def do_POST(self):
        """Validate the webhook request and queue one job per matching repository.

        Response codes:
            200: jobs queued.
            400: bad Content-Length, malformed JSON, missing payload field,
                 lightweight tag, or unexpected event type.
            404: no repository matches the project path.
            415: wrong Content-Type, or tag deletion.
            500: the new build rows could not be committed.
            503: no builder is available for a target platform.
        """
        try:
            content_length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            # Header missing or not a number.
            self._reply(400)
            return
        if content_length < 0:
            # A negative size would make read() wait for EOF.
            self._reply(400)
            return
        post_data = self.rfile.read(content_length)

        if self.headers.get('Content-Type') != 'application/json':
            self._reply(415)
            return

        try:
            post_json = json.loads(post_data.decode('utf-8'))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON.
            self._reply(400)
            return
        print(post_json) # FIXME DEBUG

        try:
            status = self._handle_hook(post_json)
        except (KeyError, IndexError, TypeError):
            # The payload lacks a field this handler relies on.
            status = 400
        self._reply(status)

    def _handle_hook(self, post_json):
        """Process a decoded webhook payload and return the HTTP status to send."""
        # Tag deletion
        if post_json['after'] == self._NULL_SHA:
            return 415

        # Check if the tag is lightweight
        if post_json['after'] == post_json['commits'][0]['id']:
            return 400

        event_header = self.headers.get('X-Gitlab-Event')
        builds = []
        rn = ''
        rt = ''
        jt = None

        session = Session()
        try:
            for r in session.query(db.Repositories).filter(
                    db.Repositories.name==post_json['project']['path_with_namespace']).all():
                rn = r.name
                rt = r.type
                if r.name == "cs/ds/makefiles" and event_header == 'Push Hook' \
                        and post_json['event_name'] == 'push':
                    jt = JobType.update
                elif event_header == 'Tag Push Hook' and post_json['event_name'] == 'tag_push':
                    jt = JobType.build
                else:
                    return 400

                builds.append(db.Builds(repository_id=r.id, platform_id=r.platform_id,
                    tag=os.path.basename(post_json['ref'])))

            if not builds:
                return 404

            # Read every field the jobs need, and make sure a builder exists
            # for each platform, BEFORE committing: otherwise a failure here
            # would leave build rows in the database with no job behind them.
            repository_url = post_json['project']['http_url']
            build_tag = post_json['ref']
            emails = [post_json['commits'][0]['author']['email'], post_json['user_email']]
            for build in builds:
                if not allbuilders.get(build.platform_id):
                    return 503

            if jt == JobType.build:
                try:
                    session.add_all(builds)
                    session.commit()
                except Exception:
                    session.rollback()
                    return 500

            for build in builds:
                print('Assign the job to the builder with shortest queue length...')
                target = min(allbuilders[build.platform_id], key=lambda x:x.queue.qsize())
                target.queue.put(Job(type=jt, repository_name = rn,
                    repository_url = repository_url, repository_type = rt,
                    platform_id = build.platform_id, build_tag=build_tag, build_id=build.id,
                    emails=emails))

            return 200
        finally:
            # Always release the session, including on early returns and errors.
            session.close()

def run(address, port, server_class=HTTPServer, handler_class=Server):
    """Serve HTTP requests on ``(address, port)`` until interrupted.

    Args:
        address: Address or host name to bind to.
        port: TCP port to listen on.
        server_class: Server type to instantiate.
        handler_class: Request handler type passed to the server.
    """
    logger.info('Starting...')
    httpd = server_class((address, port), handler_class)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # An interrupt simply ends the serving loop.
        pass
    httpd.server_close()
    logger.info('Stopping...')

if __name__ == '__main__':
    # Script entry point: parse options, set up logging and the database
    # engine, start the builders, then serve webhooks until interrupted.
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", type=str, help='Database URI to connect to', required=True)
    parser.add_argument('--bind', type=str, default='localhost', help='IP Address or hostname to bind to')
    parser.add_argument('--port', type=int, default=443, help='Port to listen to')
    parser.add_argument("--store", type=str, default='/scratch/build/files-store/')
    parser.add_argument("--repo", type=str, default='/scratch/build/repositories/')
    parser.add_argument("--smtpserver", type=str, default="smtp")
    parser.add_argument("--smtpsender", type=str, default="noreply")
    parser.add_argument("--smtpdomain", type=str, default="elettra.eu")
    args = parser.parse_args()

    # Foreground means: our process group owns the terminal behind stdout.
    # tcgetpgrp() raises OSError when stdout is not a terminal at all (e.g.
    # redirected to a file or started by a service manager); that case is a
    # background run, not a reason to crash.
    try:
        foreground = os.getpgrp() == os.tcgetpgrp(sys.stdout.fileno())
    except (OSError, ValueError):
        foreground = False

    if foreground:
        # Executed in foreground (Development)
        logging.basicConfig(level=logging.INFO)
        engine = create_engine(args.db, pool_pre_ping=True, echo=True)
    else:
        # Executed in background (Production)
        syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
        logging.basicConfig(level=logging.INFO, handlers=[syslog_handler])
        engine = create_engine(args.db, pool_pre_ping=True, echo=False)

    logger = logging.getLogger('inauDispatcher')

    Session.configure(bind=engine)

    # Start the initial set of builders.
    reconcile()

    # SIGUSR1 triggers a new reconcile().
    signal.signal(signal.SIGUSR1, signalHandler)

    if args.bind:
        run(args.bind, args.port)

    # FIXME It is necessary?
    for platform_id, builders in allbuilders.items():
        for builder in builders:
            builder.process.join()