Skip to content
Snippets Groups Projects
Commit b9b362cc authored by Alessio Igor Bogani's avatar Alessio Igor Bogani
Browse files

Massive rewrite of the inau-dispatcher

parent f622cbb2
No related branches found
No related tags found
No related merge requests found
......@@ -28,7 +28,9 @@ from lib import db
Session = sessionmaker()
allbuilders = {}
#repositories = []
users = {}
# TODO Build a repository against ALL platforms and avoid emails entirely
def __sendEmail(to_addrs, subject, body):
if to_addrs:
......@@ -40,144 +42,152 @@ def __sendEmail(to_addrs, subject, body):
msg['To'] = ', '.join(to_addrs)
smtpClient.sendmail(from_addr=sender, to_addrs=list(to_addrs), msg=msg.as_string())
# NOTE(review): pre-rewrite version of sendEmail retained by the diff view;
# superseded by the session-less variant that follows.  As captured here it
# is truncated — it builds to_addrs but the final __sendEmail call is not
# part of this span.
def sendEmail(session, recipients, subject, body):
# Collect the address of every user who opted into notifications.
users = set()
for user in session.query(db.Users).filter(db.Users.notify==True).all():
users.add(user.name + "@" + args.smtpdomain)
# Restrict mailing to requested recipients that are also notifiable users.
to_addrs = set(recipients).intersection(users)
def sendEmail(recipients, subject, body):
    """Mail *subject*/*body* to those of *recipients* who enabled notifications.

    The module-level ``users`` list (refreshed by reconcile()) is filtered
    for accounts whose ``notify`` flag is set; only addresses appearing in
    both that set and *recipients* are handed to __sendEmail().
    """
    notifiable = {u.name + "@" + args.smtpdomain for u in users if u.notify == True}
    __sendEmail(set(recipients) & notifiable, subject, body)
# NOTE(review): the diff view interleaved the old and new versions of
# sendEmailAdmins here — both def lines are present and the body mixes the
# session-query variant with the cached global-``users`` variant.  Confirm
# against the repository which lines survived the rewrite.
def sendEmailAdmins(session, subject, body):
def sendEmailAdmins(subject, body):
# Collect the address of every administrator.
to_addrs = set()
# Old body: queried the database for admin users.
for admin in session.query(db.Users).filter(db.Users.admin==True).all():
to_addrs.add(admin.name + "@" + args.smtpdomain)
# New body: filters the cached global ``users`` list instead.
for user in users:
if (user.admin==True):
to_addrs.add(user.name + "@" + args.smtpdomain)
__sendEmail(to_addrs, subject, body)
class JobType(IntEnum):
    """Kind of work item queued to a builder process."""
    # The original wrote ``kill = 0,`` / ``build = 1,`` — the trailing
    # commas turned the member values into 1-tuples, which Enum only
    # accepts by treating them as constructor arguments to int().  Plain
    # ints are the intended, unambiguous form and compare identically.
    kill = 0
    build = 1
    update = 2
# TODO Move follow classes to a separate file
class Die:
    """Sentinel job: a Builder's worker loop exits when it receives one."""
    pass
# NOTE(review): diff interleaving — the removed ``Job`` class and its
# replacement ``Update`` are mixed together in this span.  As captured,
# Update.__init__ assigns repository_type / platform_id / build_id, names
# that are not among its parameters (they belonged to Job.__init__);
# confirm the real post-rewrite body against the repository.
class Job:
def __init__(self, type, repository_name=None, repository_url=None, repository_type=None,
platform_id=None, build_tag=None, build_id=None, emails=None):
self.type = type
class Update:
# Carries what a builder needs to refresh a git checkout of one tag.
def __init__(self, repository_name, repository_url, build_tag):
self.repository_name = repository_name
self.repository_url = repository_url
self.repository_type = repository_type
self.build_tag = build_tag
self.platform_id = platform_id
self.build_id = build_id
class Build(Update):
    """An Update that additionally records the result of compiling it."""

    def __init__(self, repository_name, repository_url, build_tag):
        super().__init__(repository_name, repository_url, build_tag)
        # Filled in by the builder once the remote compilation finishes:
        # exit status and combined stdout/stderr of the build command.
        self.status = ''
        self.output = ''
class Store(Build):
    """A Build whose artifacts must be archived and whose authors notified."""

    def __init__(self, repository_name, repository_url, build_tag, repository_id, repository_type, emails):
        super().__init__(repository_name, repository_url, build_tag)
        # Database identity and kind of the repository being built.
        self.repository_id = repository_id
        self.repository_type = repository_type
        # Addresses to mail with the build outcome.
        self.emails = emails
class Builder:
def __init__(self, name):
def __init__(self, name, platform_id):
self.name = name
self.platform_id = platform_id
self.platdir = args.repo + '/' + str(platform_id)
self.queue = Queue()
self.process = Process(target=self.build, name=name)
self.process = Process(target=self.handler)
self.process.start()
def build(self):
print("Parte buidler di " + self.name) # FIXME Debug
def update(self, job):
    """Clone or refresh the repository checkout and hard-reset it to job.build_tag.

    The checkout lives under <platdir>/<repository_name>.  Any failing git
    command raises subprocess.CalledProcessError, which the caller reports.
    """
    logger.info("Checkouting " + job.build_tag + " from " + job.repository_url + "...")
    builddir = self.platdir + "/" + job.repository_name
    # exist_ok avoids the isdir/mkdir race between concurrent builder processes.
    os.makedirs(self.platdir, exist_ok=True)
    # Argument lists with shell=False: repository name, URL and tag originate
    # from a webhook payload and must never be interpolated into a shell string.
    if os.path.isdir(builddir):
        subprocess.run(["git", "-C", builddir, "remote", "update"], check=True)
        subprocess.run(["git", "-C", builddir, "submodule", "update", "--remote", "--force", "--recursive"], check=True)
    else:
        subprocess.run(["git", "clone", "--recurse-submodule", job.repository_url, builddir], check=True)
    subprocess.run(["git", "-C", builddir, "reset", "--hard", job.build_tag], check=True)
def build(self, job):
    """Compile the checked-out tree on this builder host over SSH.

    Stores the remote exit status in job.status and the combined
    stdout/stderr in job.output; a failed compilation does not raise.
    """
    import shlex  # local import: quoting for the remote command line below
    # Use the module logger (the original called logging.info here, which
    # bypasses the configured 'inau-dispatcher' logger used elsewhere).
    logger.info("Building " + job.build_tag + " from " + job.repository_url + "...")
    builddir = self.platdir + "/" + job.repository_name
    with paramiko.SSHClient() as sshClient:
        # Builder hosts are provisioned by us; accept their keys on first contact.
        sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        sshClient.connect(hostname=self.name, port=22, username="inau",
                          key_filename="/home/inau/.ssh/id_rsa.pub")
        # Quote builddir: the repository name comes from a webhook payload
        # and is interpolated into a remote shell command line.
        _, raw, _ = sshClient.exec_command("(source /etc/profile; cd " + shlex.quote(builddir)
                                           + " && (test -f *.pro && qmake && cuuimake --plain-text-output);"
                                           + " make -j`getconf _NPROCESSORS_ONLN`) 2>&1")
        job.status = raw.channel.recv_exit_status()
        job.output = raw.read().decode('latin-1')  # utf-8 is rejected by Mysql despite it is properly configured
def store(self, job):
    """Record the build in the DB, archive produced artifacts, and notify.

    A Builds row is committed first (so failed builds are recorded too),
    then every file under bin/ (code repositories) or etc/ (configuration
    repositories) is hashed with SHA-256, copied into the content-addressed
    file store when absent, and registered as an Artifacts row.  Finally
    the job's authors are emailed the outcome and the build output.
    """
    # Module logger instead of the original's bare logging.info, for
    # consistency with the rest of the dispatcher.
    logger.info("Storing " + job.build_tag + " from " + job.repository_url + "...")
    build = db.Builds(repository_id=job.repository_id, platform_id=self.platform_id, tag=os.path.basename(job.build_tag),
                      status=job.status, output=job.output)
    self.session.add(build)
    self.session.commit()
    builddir = self.platdir + "/" + job.repository_name
    outcome = job.repository_name + " " + os.path.basename(job.build_tag)
    if job.status != 0:
        outcome += ": built failed on " + self.name
    else:
        outcome += ": built successfully on " + self.name
    if job.repository_type == db.RepositoryType.cplusplus or job.repository_type == db.RepositoryType.python \
            or job.repository_type == db.RepositoryType.shellscript:
        basedir = builddir + "/bin/"
    elif job.repository_type == db.RepositoryType.configuration:
        basedir = builddir + "/etc/"
    else:
        # The original read ``raiseException('Invalid type')`` — a NameError
        # at runtime instead of the intended exception.
        raise Exception('Invalid type')
    artifacts = []
    # NOTE(review): only one directory level below basedir survives in the
    # stored filename (os.path.basename of the walk root); deeper nesting
    # would collapse — confirm artifacts are at most one level deep.
    for root, _, names in os.walk(basedir):
        subdir = ""
        if root != basedir:
            subdir = os.path.basename(root) + "/"
        for name in names:
            with open(basedir + subdir + name, "rb") as fd:
                digest = hashlib.sha256(fd.read()).hexdigest()
            # Content-addressed store: copy only hashes not seen before.
            if not os.path.isfile(args.store + digest):
                shutil.copyfile(basedir + subdir + name, args.store + digest, follow_symlinks=False)
            artifacts.append(db.Artifacts(build_id=build.id, hash=digest, filename=subdir + name))
    self.session.add_all(artifacts)
    self.session.commit()
    sendEmail(job.emails, outcome, job.output)
def handler(self):
logger.info("Starting process for builder " + self.name + "...")
engine.dispose()
self.session = Session()
while True:
try:
print("buidler di " + self.name + " in attesa...") # FIXME Debug
job = self.queue.get()
if job.type == JobType.kill:
print("Si ferma buodler di " + self.name) # FIXME Debug
if isinstance(job, Die):
logger.info("Stopping process for builder " + self.name + "...")
break
print("buidler di " + self.name + " in azione... su: ") # FIXME Debug
print(job.type) # FIXME Debug
print(job.repository_name) # FIXME Debug
print(job.repository_url) # FIXME Debug
print(job.repository_type) # FIXME Debug
print(job.build_tag) # FIXME Debug
print(job.platform_id) # FIXME Debug
print(job.build_id) # FIXME Debug
print(job.emails) # FIXME Debug
engine.dispose()
session = Session()
try:
platdir = args.repo + '/' + str(job.platform_id)
builddir = platdir + "/" + job.repository_name
if not os.path.isdir(platdir):
os.mkdir(platdir)
if os.path.isdir(builddir):
subprocess.run(["git -C " + builddir + " remote update"], shell=True, check=True)
subprocess.run(["git -C " + builddir + " submodule update --remote --force --recursive"], shell=True, check=True)
else:
ret = subprocess.run(["git clone --recurse-submodule " + job.repository_url + " " + builddir], shell=True, check=True)
subprocess.run(["git -C " + builddir + " reset --hard " + job.build_tag], shell=True, check=True)
if job.type == JobType.update:
continue
with paramiko.SSHClient() as sshClient:
sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
sshClient.connect(hostname=self.name, port=22, username="inau",
key_filename="/home/inau/.ssh/id_rsa.pub")
_, raw, _ = sshClient.exec_command("(source /etc/profile; cd " + builddir
+ " && (test -f *.pro && qmake && cuuimake --plain-text-output);"
+ " make -j`getconf _NPROCESSORS_ONLN`) 2>&1")
status = raw.channel.recv_exit_status()
output = raw.read().decode('latin-1') # utf-8 is rejected by Mysql despite the right character set is configured
if job.build_id:
build = session.query(db.Builds).filter(db.Builds.id==job.build_id).one()
build.date = datetime.datetime.now()
build.status = status
build.output = output
session.commit()
outcome = job.repository_name + " " + os.path.basename(job.build_tag)
if status != 0:
outcome += ": built failed on " + self.name
else:
outcome += ": built successfully on " + self.name
if job.build_id:
if job.repository_type == db.RepositoryType.cplusplus or job.repository_type == db.RepositoryType.python \
or job.repository_type == db.RepositoryType.shellscript:
basedir = builddir + "/bin/"
elif job.repository_type == db.RepositoryType.configuration:
basedir = builddir + "/etc/"
else:
raiseException('Invalid type')
artifacts = []
for r, d, f in os.walk(basedir):
dir = ""
if r != basedir:
dir = os.path.basename(r) + "/"
for file in f:
hashFile = ""
with open(basedir + dir + file,"rb") as fd:
bytes = fd.read()
hashFile = hashlib.sha256(bytes).hexdigest();
if not os.path.isfile(args.store + hashFile):
shutil.copyfile(basedir + dir + file, args.store + hashFile, follow_symlinks=False)
artifacts.append(db.Artifacts(build_id=job.build_id, hash=hashFile, filename=dir+file))
session.add_all(artifacts)
session.commit()
sendEmail(session, job.emails, outcome, output)
except subprocess.CalledProcessError as c:
print("C 1:", c) # TODO
except Exception as e:
session.rollback()
print("E 1:", e, type(e)) # TODO
finally:
session.close()
# TODO Come funzione in background?????
if isinstance(job, Update):
self.update(job)
if isinstance(job, Build):
self.build(job)
if isinstance(job, Store):
self.store(job)
except subprocess.CalledProcessError as c:
sendEmailAdmins("Subprocess failed", str(c))
logger.error("Subprocess failed: ", str(c))
session.rollback()
except Exception as e:
sendEmailAdmins("Generic error", str(e))
logger.error("Generic error: ", str(e))
session.rollback()
except KeyboardInterrupt as k:
self.session.rollback()
break
except Exception as e:
print("E 2: ", e) # TODO
finally:
self.session.close()
def signalHandler(signalNumber, frame):
    """Handle SIGUSR1/SIGHUP (registered in __main__) by re-running reconcile()."""
    reconcile()
......@@ -186,164 +196,151 @@ def reconcile():
logger.info('Reconciling...')
session = Session()
try:
# global allbuilders, repositories
global allbuilders
global users
users = session.query(db.Users).all()
newbuilders = {}
oldbuilders = {}
oldbuilders = allbuilders
for b in session.query(db.Builders).all():
try:
newbuilders[b.platform_id].append(Builder(b.name))
newbuilders[b.platform_id].append(Builder(b.name, b.platform_id))
except KeyError:
newbuilders[b.platform_id] = [Builder(b.name)]
oldbuilders = allbuilders
newbuilders[b.platform_id] = [Builder(b.name, b.platform_id)]
allbuilders = newbuilders
for oldbuilder in oldbuilders.values():
for b in oldbuilder:
b.queue.put(Job(type=JobType.kill))
b.queue.put(Die())
b.process.join()
# newrepositories = []
# for repository in session.query(db.Repositories2).all():
# newrepositories.append(repository)
# repositories = newrepositories
#
# for repo in session.query(db.Repositories2).join(db.Providers). \
# with_entities(db.Repositories2.id, db.Repositories2.name, db.Repositories2.type, db.Providers.url).all():
# TODO build missing tags
# Build missing tags
# for repo in session.query(db.Repositories).join(db.Providers). \
# filter(db.Repositories.name == "cs/ds/fake"). \
# with_entities(db.Repositories.id, db.Repositories.name,
# db.Repositories.type, db.Repositories.platform_id, db.Providers.url).all():
# req = requests.get('https://gitlab.elettra.eu/api/v4/projects/'
# + urllib.parse.quote(repo.name, safe='') + '/repository/tags')
# data = req.json()
# if req.status_code == 200:
# # Retrieve commited tags
# if req.status_code != 200:
# logger.error('Error looking for repository ' + repo.name);
# else:
# # Retrieve committed tags
# ctags = []
# for tag in data:
# if tag['target'] != tag['commit']['id']:
# ctags.append(tag['name'])
# ctags.sort(key=StrictVersion)
# print(ctags)
#
# for platform_id, builders in allbuilders.items():
# builds = session.query(db.Builds).filter(db.Builds.repository_id==repo.id,
# db.Builds.platform_id==platform_id).all()
# # Retrieve builded tags
# btags = []
# for build in builds:
# btags.append(build.tag)
# btags.sort(key=StrictVersion)
# # Retrieve built tags
# btags = []
# for build in session.query(db.Builds).filter(db.Builds.repository_id==repo.id).all():
# btags.append(build.tag)
# btags.sort(key=StrictVersion)
# print(btags)
#
# mtags = list(set(ctags).difference(set(btags)))
# mtags.sort(key=StrictVersion)
#
# if mtags:
# i = ctags.index(mtags[0])
# if i:
# # Re-build the previous built version
# idx = builders.index(min(builders, key=lambda x:x.queue.qsize()))
# builders[idx].queue.put(Job(type=JobType.build, repository_name = repo.name,
# repository_url = repo.url + ":" + repo.name, repository_type = repo.type,
# platform_id = platform_id, build_tag = ctags[i-1]))
# # Calculate missing tags
# mtags = list(set(ctags).difference(set(btags)))
# mtags.sort(key=StrictVersion)
# print(mtags)
#
# if mtags:
# i = ctags.index(mtags[0])
# if i:
# # Re-build the previous built version
# idx = allbuilders[repo.platform_id].index(min(allbuilders[repo.platform_id],
# key=lambda x:x.queue.qsize()))
# allbuilders[repo.platform_id][idx].queue.put(Job(type=JobType.build,
# repository_name = repo.name, repository_url = repo.url+ "/" + repo.name + ".git",
# repository_type = repo.type,
# build_tag='refs/tags/' + ctags[i-1]))
#
# # Build missing tags
# # Build missing tags
# for mtag in mtags:
# emails = []
# for mtag in mtags:
# idx = builders.index(min(builders, key=lambda x:x.queue.qsize()))
# emails.clear()
# for tag in data:
# if tag['name'] == mtag:
# emails = [tag['commit']['author_email']]
# break
# build = db.Builds(repository_id=repo.id, platform_id=platform_id, tag=mtag)
# session.add(build)
# session.commit()
#
# builders[idx].queue.put(Job(type=JobType.build, repository_name = repo.name,
# repository_url = repo.url + ":" + repo.name, repository_type = repo.type,
# platform_id = platform_id, build_tag = mtag, build_id = build.id, emails=emails))
# for tag in data:
# if tag['name'] == mtag:
# emails = [tag['commit']['author_email']]
# break
# build = db.Builds(repository_id=repo.id, platform_id=repo.platform_id, tag=mtag)
# session.add(build)
# session.commit()
#
# idx = allbuilders[repo.platform_id].index(min(allbuilders[repo.platform_id],
# key=lambda x:x.queue.qsize()))
# allbuilders[repo.platform_id][idx].queue.put(Job(type=JobType.build,
# repository_name = repo.name, repository_url = repo.url+ "/" + repo.name + ".git",
# repository_type = repo.type, build_tag='refs/tags/' + mtag,
# build_id = build.id, emails = emails))
except Exception as e:
sendEmailAdmins("Reconcilation failed", str(e))
logger.error("Reconciliation failed: ", str(e))
session.rollback()
print("E 3: ", e) # TODO
finally:
session.close()
class Server(BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
if self.headers['Content-Type'] != 'application/json':
self.send_response(415)
self.end_headers()
return
post_json = json.loads(post_data.decode('utf-8'))
print(post_json) # FIXME DEBUG
# Tag deletion
if post_json['after'] == '0000000000000000000000000000000000000000':
self.send_response(415)
self.end_headers()
return
engine.dispose()
session = Session()
try:
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
# Check if the tag is lightweight
if post_json['after'] == post_json['commits'][0]['id']:
self.send_response(400)
self.end_headers()
return
if self.headers['Content-Type'] != 'application/json':
self.send_response(415)
self.end_headers()
return
builds = []
rn = ''
rt = ''
post_json = json.loads(post_data.decode('utf-8'))
logger.debug(post_json)
session = Session()
for r in session.query(db.Repositories).filter(db.Repositories.name==post_json['project']['path_with_namespace']).all():
rn = r.name
rt = r.type
if r.name == "cs/ds/makefiles" and self.headers['X-Gitlab-Event'] == 'Push Hook' and post_json['event_name'] == 'push':
jt = JobType.update
elif self.headers['X-Gitlab-Event'] == 'Tag Push Hook' and post_json['event_name'] == 'tag_push':
jt = JobType.build
else:
self.send_response(400)
# Tag deletion
if post_json['after'] == '0000000000000000000000000000000000000000':
self.send_response(200)
self.end_headers()
session.close()
return
builds.append(db.Builds(repository_id=r.id, platform_id=r.platform_id, tag=os.path.basename(post_json['ref'])))
if not builds:
self.send_response(404)
self.end_headers()
session.close()
return
if jt == JobType.build:
try:
session.add_all(builds)
session.commit()
except:
session.rollback()
session.close()
self.send_response(500)
# Check if the tag is lightweight
if post_json['after'] == post_json['commits'][0]['id']:
self.send_response(200)
self.end_headers()
return
for build in builds:
print('Assign the job to the builder with shortest queue length...')
idx = allbuilders[build.platform_id].index(min(allbuilders[build.platform_id],
key=lambda x:x.queue.qsize()))
allbuilders[build.platform_id][idx].queue.put(Job(type=jt, repository_name = rn,
repository_url = post_json['project']['http_url'], repository_type = rt,
platform_id = build.platform_id, build_tag=post_json['ref'], build_id=build.id,
emails=[post_json['commits'][0]['author']['email'], post_json['user_email']]))
self.send_response(200)
self.end_headers()
session.close()
for r in session.query(db.Repositories).filter(db.Repositories.name==post_json['project']['path_with_namespace']).all():
if r.name == "cs/ds/makefiles" and self.headers['X-Gitlab-Event'] == 'Push Hook' and post_json['event_name'] == 'push':
job = Update(repository_name = r.name, repository_url = post_json['project']['http_url'], build_tag=post_json['ref'])
elif self.headers['X-Gitlab-Event'] == 'Tag Push Hook' and post_json['event_name'] == 'tag_push':
job = Store(repository_name = r.name, repository_url = post_json['project']['http_url'], build_tag=post_json['ref'],
repository_id = r.id, repository_type = r.type, emails=[post_json['commits'][0]['author']['email'], post_json['user_email']])
else:
continue
# Assign the job to the builder with shortest queue length
idx = allbuilders[r.platform_id].index(min(allbuilders[r.platform_id],
key=lambda x:x.queue.qsize()))
allbuilders[r.platform_id][idx].queue.put(job)
# TODO Enable protected_tags
# req = requests.post('https://gitlab.elettra.eu/api/v4/projects/' + urllib.parse.quote(rn, safe='')
# + '/protected_tags?name=' + post_json['ref'] + '&create_access_level=40')
self.send_response(200)
self.end_headers()
except Exception as e:
sendEmailAdmins("Receive new tag failed", str(e))
logger.error("Receive new tag failed: ", str(e))
session.rollback()
self.send_response(500)
self.end_headers()
finally:
session.close()
# TODO Migrate to multi-thread HTTP server?
def run(address, port, server_class=HTTPServer, handler_class=Server):
logger.info('Starting...')
server_address = (address, port)
......@@ -356,39 +353,38 @@ def run(address, port, server_class=HTTPServer, handler_class=Server):
logger.info('Stopping...')
if __name__ == '__main__':
# TODO Migrate to configuration file to avoid hard-coded defaults
# Retrieve arguments
parser = argparse.ArgumentParser()
parser.add_argument("--db", type=str, help='Database URI to connect to', required=True)
parser.add_argument("--db", type=str, help='Database URI to connect to', required=True) # TODO Enable password-less access to database
parser.add_argument('--bind', type=str, default='localhost', help='IP Address or hostname to bind to')
parser.add_argument('--port', type=int, default=443, help='Port to listen to')
parser.add_argument("--store", type=str, default='/scratch/build/files-store/')
parser.add_argument("--repo", type=str, default='/scratch/build/repositories/')
parser.add_argument("--smtpserver", type=str, default="smtp")
parser.add_argument("--smtpsender", type=str, default="noreply")
parser.add_argument("--smtpdomain", type=str, default="elettra.eu")
parser.add_argument("--store", type=str, default='/scratch/build/files-store/', help='Directory where store produced binaries')
parser.add_argument("--repo", type=str, default='/scratch/build/repositories/', help='Directory where checkout git repositories')
parser.add_argument("--smtpserver", type=str, default="smtp", help='Hostname of the SMTP server')
parser.add_argument("--smtpsender", type=str, default="noreply", help='Email sender')
parser.add_argument("--smtpdomain", type=str, default="elettra.eu", help='Email domain')
args = parser.parse_args()
print("Start inau-dispatcher using", args.db, "on interface", args.bind, "and port",
args.port, "file store directory", args.store, "repositories clone directory", args.repo,
"SMTP server", args.smtpserver, "SMTP sender", args.smtpsender, "SMTP domain", args.smtpdomain)
if os.getpgrp() == os.tcgetpgrp(sys.stdout.fileno()):
# Executed in foreground (Development)
# Executed in foreground so redirect log to terminal and enable SQL echoing (Development)
logging.basicConfig(level=logging.INFO)
engine = create_engine(args.db, pool_pre_ping=True, echo=True)
else:
# Executed in background (Production)
# Executed in background so redirect log to syslog and disable SQL echoing (Production)
syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
logging.basicConfig(level=logging.INFO, handlers=[syslog_handler])
engine = create_engine(args.db, pool_pre_ping=True, echo=False)
logger = logging.getLogger('inau-dispatcher')
logger = logging.getLogger('inauDispatcher')
Session.configure(bind=engine)
reconcile()
signal.signal(signal.SIGUSR1, signalHandler)
signal.signal(signal.SIGHUP, signalHandler)
if args.bind:
run(args.bind,args.port)
# FIXME It is necessary?
for platform_id, builders in allbuilders.items():
for builder in builders:
builder.process.join()
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey, func
#from sqlalchemy.orm import relationship
from sqlalchemy.orm import relationship
from enum import Enum, IntEnum
import datetime
......@@ -17,22 +17,22 @@ class Architectures(Base):
__tablename__ = 'architectures'
id = Column(Integer, primary_key=True)
name = Column(String(255), unique=True, nullable=False)
# platforms = relationship('Platforms', back_populates='architecture')
platforms = relationship('Platforms', back_populates='architecture')
class Distributions(Base):
__tablename__ = 'distributions'
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False)
version = Column(String(255), nullable=False)
# platforms = relationship('Platforms', back_populates='distribution')
#
platforms = relationship('Platforms', back_populates='distribution')
class Platforms(Base):
__tablename__ = 'platforms'
id = Column(Integer, primary_key=True)
distribution_id = Column(Integer, ForeignKey('distributions.id'), nullable=False)
architecture_id = Column(Integer, ForeignKey('architectures.id'), nullable=False)
# architecture = relationship('Architectures', back_populates='platforms')
# distribution = relationship('Distributions', back_populates='platforms')
architecture = relationship('Architectures', back_populates='platforms')
distribution = relationship('Distributions', back_populates='platforms')
# servers = relationship('Servers', back_populates='platform')
#
#class Facilities(Base):
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment