Application Release System: Practical Interview Questions
I. System Architecture Design
Q1: How would you design a highly available architecture for a Flask-based application release system?
A: A production-grade release system architecture should include the following components:
Architecture:
Client -> Nginx -> Flask App Cluster -> Redis/MySQL -> Jenkins -> K8s API
                        -> Celery Workers
                        -> RabbitMQ

Concrete implementation:
# config.py
class Config:
    # Database
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:pass@localhost/deploy_sys'
    # Redis
    REDIS_URL = 'redis://localhost:6379/0'
    # Celery
    CELERY_BROKER_URL = 'amqp://localhost:5672'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
    # Jenkins
    JENKINS_URL = 'http://jenkins.example.com'
    JENKINS_USER = 'admin'
    JENKINS_TOKEN = 'your-token'
    # Kubernetes
    KUBERNETES_CONFIG = '/path/to/kubeconfig'
# app/__init__.py
from flask import Flask
from celery import Celery
from kubernetes import client, config
import jenkins

from config import Config

# db and cache are assumed to be module-level extension objects
# (e.g. Flask-SQLAlchemy and Flask-Caching instances) defined elsewhere in app/

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)
    # Initialize extensions
    db.init_app(app)
    cache.init_app(app)
    # Initialize Celery (a sketch of make_celery follows this block)
    celery = make_celery(app)
    # Initialize the Kubernetes client
    config.load_kube_config(app.config['KUBERNETES_CONFIG'])
    # Register blueprints
    from app.api import bp as api_bp
    app.register_blueprint(api_bp, url_prefix='/api')
    return app
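The factory above calls make_celery, which the original does not show. A minimal sketch of the standard Flask/Celery integration pattern (the helper name comes from the code above; the body is an assumption):

def make_celery(app):
    # Bind Celery to the Flask config and run every task inside an app context
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND'],
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery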
High availability measures:
Multi-process serving with Gunicorn (a sample config is sketched after this list)
Redis primary/replica replication
MySQL primary/replica replication
A RabbitMQ cluster
Multiple application replicas on Kubernetes
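For the Gunicorn point, a minimal gunicorn.conf.py sketch; the bind address, worker class, and counts are illustrative values, not ones from the original:

# gunicorn.conf.py -- illustrative values
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1  # common sizing rule of thumb
worker_class = "gthread"
threads = 4
timeout = 60

Started with, e.g., gunicorn -c gunicorn.conf.py "app:create_app()".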
Q2: How do you handle concurrent deployment requests, and how do you prevent the same project from being released twice at the same time?
A: This calls for a distributed lock combined with the task queue:
from flask import current_app
from redis import Redis
from functools import wraps

class DistributedLock:
    def __init__(self, lock_name, expire=60):
        self.redis_client = Redis.from_url(current_app.config['REDIS_URL'])
        self.lock_name = f"deploy_lock:{lock_name}"
        self.expire = expire
        self.acquired = False

    def acquire(self):
        # SET NX EX: succeeds only if the key does not already exist
        self.acquired = bool(self.redis_client.set(
            self.lock_name, 1, ex=self.expire, nx=True
        ))
        return self.acquired

    def release(self):
        # Only delete the key if we actually hold the lock
        if self.acquired:
            self.redis_client.delete(self.lock_name)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

def require_deploy_lock(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        project_name = kwargs.get('project_name')
        # NOTE: the lock is released when the request handler returns; to guard
        # the whole asynchronous deployment, the Celery task should hold (and
        # renew) the lock until it finishes.
        with DistributedLock(project_name) as lock:
            if not lock.acquired:
                return {'error': 'Project is being deployed'}, 409
            return f(*args, **kwargs)
    return decorated_function
@api_bp.route('/deploy/<project_name>', methods=['POST'])
@require_deploy_lock
def deploy_project(project_name):
    # Enqueue the deployment task (a sketch of tasks.deploy follows below)
    task = celery.send_task('tasks.deploy', args=[project_name])
    return {'task_id': task.id}, 202
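The endpoint only enqueues the work; the tasks.deploy task itself never appears in the original. A minimal sketch under the assumption that it records a Deployment row and hands the build to the JenkinsClient from question 4 (the version and environment values are placeholders):

@celery.task(name='tasks.deploy')
def deploy(project_name):
    # Hypothetical task body: record the run, then hand the build to Jenkins
    project = Project.query.filter_by(name=project_name).first()
    if project is None:
        return None
    deployment = Deployment(project_id=project.id, version='latest',
                            environment='production', status='BUILDING')
    db.session.add(deployment)
    db.session.commit()
    JenkinsClient(
        current_app.config['JENKINS_URL'],
        current_app.config['JENKINS_USER'],
        current_app.config['JENKINS_TOKEN'],
    ).build_job(project_name)
    return deployment.id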
II. Data Model Design
Q3: How would you design the data model for the release system? What key fields need to be considered?
A: The data model needs to cover project metadata, deployment records, the approval workflow, and so on:
from datetime import datetime
from app import db

class Project(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, index=True)
    git_repo = db.Column(db.String(256))
    owner = db.Column(db.String(64))
    team = db.Column(db.String(64))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    deployments = db.relationship('Deployment', backref='project', lazy='dynamic')

class Deployment(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
    version = db.Column(db.String(64))
    environment = db.Column(db.String(32))
    status = db.Column(db.String(32))
    started_at = db.Column(db.DateTime, default=datetime.utcnow)
    finished_at = db.Column(db.DateTime)
    deployed_by = db.Column(db.String(64))

    def to_dict(self):
        return {
            'id': self.id,
            'project': self.project.name,
            'version': self.version,
            'environment': self.environment,
            'status': self.status,
            'started_at': self.started_at.isoformat(),
            'finished_at': self.finished_at.isoformat() if self.finished_at else None,
            'deployed_by': self.deployed_by
        }
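A quick usage sketch of the models (all values illustrative):

# Illustrative: create a project and record a deployment for it
project = Project(name='demo-api', git_repo='git@example.com:demo/demo-api.git',
                  owner='alice', team='platform')
deployment = Deployment(project=project, version='v1.2.3',
                        environment='staging', status='BUILDING',
                        deployed_by='alice')
db.session.add_all([project, deployment])
db.session.commit()
print(deployment.to_dict())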
III. Jenkins Pipeline Integration
Q4: How do you dynamically generate and trigger a Jenkins Pipeline, and how do you handle the Pipeline's execution result?
A: This requires integrating with the Jenkins API and managing Pipeline templates:
import jenkins
from jinja2 import Template

class JenkinsClient:
    def __init__(self, url, username, token):
        self.server = jenkins.Jenkins(url, username=username, password=token)

    def create_pipeline_job(self, project_name, pipeline_script):
        """Create (or update) a Pipeline job."""
        config_xml = self._generate_pipeline_config(pipeline_script)
        if not self.server.job_exists(project_name):
            self.server.create_job(project_name, config_xml)
        else:
            self.server.reconfig_job(project_name, config_xml)

    def _generate_pipeline_config(self, pipeline_script):
        # The XML declaration must be the very first character of the document
        template = """<?xml version='1.1' encoding='UTF-8'?>
<flow-definition plugin="workflow-job">
  <definition class="org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition">
    <script>{{pipeline_script}}</script>
    <sandbox>true</sandbox>
  </definition>
</flow-definition>
"""
        return Template(template).render(pipeline_script=pipeline_script)

    def build_job(self, project_name, parameters=None):
        """Trigger a build."""
        if parameters:
            return self.server.build_job(project_name, parameters=parameters)
        return self.server.build_job(project_name)

    def get_build_info(self, project_name, build_number):
        """Fetch build information."""
        return self.server.get_build_info(project_name, build_number)
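A usage sketch for the client; the pipeline script is an illustrative Jenkinsfile, not one from the original:

# Illustrative: register a pipeline job, then trigger it
jenkins_client = JenkinsClient('http://jenkins.example.com', 'admin', 'your-token')
pipeline_script = """
pipeline {
    agent any
    stages {
        stage('Build') {
            steps { sh 'docker build -t demo-api:latest .' }
        }
    }
}
"""
jenkins_client.create_pipeline_job('demo-api', pipeline_script)
jenkins_client.build_job('demo-api')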
@celery.task
def handle_jenkins_build(project_name, build_number):
    """Handle a Jenkins build result."""
    jenkins_client = JenkinsClient(
        current_app.config['JENKINS_URL'],
        current_app.config['JENKINS_USER'],
        current_app.config['JENKINS_TOKEN']
    )
    build_info = jenkins_client.get_build_info(project_name, build_number)
    if build_info['result'] == 'SUCCESS':
        # Update the deployment record; Deployment has no project_name column,
        # so join through Project to filter by name
        deployment = Deployment.query.join(Project).filter(
            Project.name == project_name,
            Deployment.status == 'BUILDING'
        ).first()
        if deployment:
            deployment.status = 'SUCCESS'
            deployment.finished_at = datetime.utcnow()
            db.session.commit()
            # Trigger the Kubernetes deployment
            deploy_to_kubernetes.delay(project_name, deployment.id)
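handle_jenkins_build hands off to deploy_to_kubernetes, which the original never defines. A minimal sketch wiring it to the KubernetesDeployer from question 5; the registry host and the environment-as-namespace convention are assumptions:

@celery.task
def deploy_to_kubernetes(project_name, deployment_id):
    # Hypothetical glue task between the Jenkins callback and Kubernetes
    deployment = Deployment.query.get(deployment_id)
    deployer = KubernetesDeployer(current_app.config['KUBERNETES_CONFIG'])
    image = f"registry.example.com/{project_name}:{deployment.version}"
    ok = deployer.deploy(
        namespace=deployment.environment,  # assumes one namespace per environment
        deployment_name=project_name,
        container_image=image,
    )
    deployment.status = 'DEPLOYED' if ok else 'FAILED'
    db.session.commit()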
IV. Kubernetes Deployment
Q5: How do you implement safe and reliable Kubernetes deployments? How do you handle rolling updates and rollbacks?
A: Wrap the Kubernetes API and manage the deployment strategy:
import time

from kubernetes import client, config
from kubernetes.client.rest import ApiException

class KubernetesDeployer:
    def __init__(self, kubeconfig_path):
        config.load_kube_config(kubeconfig_path)
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()

    def deploy(self, namespace, deployment_name, container_image, replicas=3):
        try:
            # Build the Deployment manifest (the Python client accepts plain dicts)
            deployment = {
                'apiVersion': 'apps/v1',
                'kind': 'Deployment',
                'metadata': {
                    'name': deployment_name,
                    'namespace': namespace
                },
                'spec': {
                    'replicas': replicas,
                    'selector': {
                        'matchLabels': {
                            'app': deployment_name
                        }
                    },
                    'template': {
                        'metadata': {
                            'labels': {
                                'app': deployment_name
                            }
                        },
                        'spec': {
                            'containers': [{
                                'name': deployment_name,
                                'image': container_image,
                                'imagePullPolicy': 'Always',
                                'resources': {
                                    'requests': {
                                        'memory': '64Mi',
                                        'cpu': '250m'
                                    },
                                    'limits': {
                                        'memory': '128Mi',
                                        'cpu': '500m'
                                    }
                                },
                                'readinessProbe': {
                                    'httpGet': {
                                        'path': '/health',
                                        'port': 80
                                    },
                                    'initialDelaySeconds': 5,
                                    'periodSeconds': 10
                                }
                            }]
                        }
                    }
                }
            }
            # Create the Deployment; for an existing one, patching the same
            # body triggers a rolling update instead
            self.apps_v1.create_namespaced_deployment(
                namespace=namespace,
                body=deployment
            )
            # Wait for the rollout to complete
            self._wait_for_deployment(namespace, deployment_name)
            return True
        except ApiException as e:
            current_app.logger.error(f"Exception when calling Kubernetes API: {e}")
            return False
    def _wait_for_deployment(self, namespace, deployment_name, timeout=300):
        """Block until the rollout completes or the timeout expires."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = self.apps_v1.read_namespaced_deployment_status(
                name=deployment_name,
                namespace=namespace
            )
            if (response.status.available_replicas == response.status.replicas and
                    response.status.replicas == response.status.ready_replicas):
                return True
            time.sleep(10)
        raise TimeoutError("Deployment did not complete within timeout")
    def rollback(self, namespace, deployment_name):
        """Roll back to the previous version."""
        try:
            # Read the current Deployment
            deployment = self.apps_v1.read_namespaced_deployment(
                name=deployment_name,
                namespace=namespace
            )
            # Repoint the image at the previous version; this assumes a
            # ':previous' tag is maintained in the image registry
            deployment.spec.template.spec.containers[0].image = (
                deployment.spec.template.spec.containers[0].image.split(':')[0] +
                ':previous'
            )
            self.apps_v1.patch_namespaced_deployment(
                name=deployment_name,
                namespace=namespace,
                body=deployment
            )
            return True
        except ApiException as e:
            current_app.logger.error(f"Exception when rolling back: {e}")
            return False
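A usage sketch tying deploy and rollback together (names and paths illustrative):

# Illustrative: deploy a new image, roll back if the rollout fails
deployer = KubernetesDeployer('/path/to/kubeconfig')
ok = deployer.deploy(
    namespace='staging',
    deployment_name='demo-api',
    container_image='registry.example.com/demo-api:v1.2.3',
)
if not ok:
    deployer.rollback(namespace='staging', deployment_name='demo-api')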
V. Monitoring and Logging
Q6: How do you implement monitoring and log collection for the release process?
A: Build out a complete monitoring and logging pipeline:
from elasticsearch import Elasticsearch
from prometheus_client import Counter, Histogram
import structlog

# Prometheus metrics
deployment_counter = Counter(
    'deployment_total',
    'Total number of deployments',
    ['project', 'environment', 'status']
)
deployment_duration = Histogram(
    'deployment_duration_seconds',
    'Time spent in deployment',
    ['project', 'environment']
)

# Structured logging
logger = structlog.get_logger()

class DeploymentMonitor:
    def __init__(self, elasticsearch_url):
        self.es = Elasticsearch([elasticsearch_url])

    def record_deployment(self, deployment_data):
        """Record a finished deployment."""
        # Update the Prometheus counter
        deployment_counter.labels(
            project=deployment_data['project'],
            environment=deployment_data['environment'],
            status=deployment_data['status']
        ).inc()
        # Emit a structured log event
        logger.info(
            "deployment_completed",
            project=deployment_data['project'],
            environment=deployment_data['environment'],
            status=deployment_data['status'],
            duration=deployment_data['duration']
        )
        # Index the record in Elasticsearch
        self.es.index(
            index="deployments",
            document=deployment_data
        )

    def get_deployment_stats(self, project_name, time_range):
        """Aggregate deployment statistics for a project."""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"project": project_name}},
                        {"range": {"timestamp": {"gte": time_range}}}
                    ]
                }
            },
            "aggs": {
                "status_stats": {
                    "terms": {"field": "status"}
                },
                "avg_duration": {
                    "avg": {"field": "duration"}
                }
            }
        }
        return self.es.search(index="deployments", body=query)
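Querying the stats, assuming Elasticsearch date math for the time range (values illustrative):

# Illustrative: status breakdown and average duration over the last week
monitor = DeploymentMonitor('http://localhost:9200')
stats = monitor.get_deployment_stats('demo-api', 'now-7d')
print(stats['aggregations']['status_stats']['buckets'])
print(stats['aggregations']['avg_duration']['value'])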
@celery.task
def monitor_deployment(deployment_id):
    """Time a deployment run and export the duration as a Prometheus metric."""
    deployment = Deployment.query.get(deployment_id)
    with deployment_duration.labels(
        project=deployment.project.name,
        environment=deployment.environment
    ).time():
        # Run the deployment inside the timed context
        deploy_to_kubernetes(deployment.project.name, deployment.id)
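For Prometheus to scrape these metrics, the Flask app has to expose them. A minimal sketch using prometheus_client's text exposition; the /metrics path is the usual convention, not something the original specifies:

from flask import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@api_bp.route('/metrics')
def metrics():
    # Serve all registered metrics in the Prometheus text format
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)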