
大模型新基座,基于FastAPI,利用Python開發(fā)MCP服務器
另一方面,微服務架構是一種將應用程序劃分為更小、可獨立部署的服務的方法。每個服務都有自己的業(yè)務邏輯和數(shù)據(jù)庫,并通過輕量級協(xié)議與其他服務通信。這種方法可以縮短開發(fā)周期、簡化維護并提高可擴展性。
RabbitMQ 是一個實現(xiàn)高級消息隊列協(xié)議 (AMQP) 的消息代理。它充當分布式系統(tǒng)各個組件的中間人,使它們能夠高效地通信和協(xié)調任務。以下是 RabbitMQ 在微服務架構中被廣泛使用的原因:
現(xiàn)在我們已經介紹了微服務的基礎知識,讓我們更深入地研究如何編寫我們的第一個微服務。我們將探索如何使用 Python、FastAPI、RabbitMQ 和 PostgreSQL 設計和實現(xiàn)微服務架構。本動手指南將引導您設置每個組件、設計微服務交互以及部署微服務以創(chuàng)建功能齊全的應用程序。讓我們開始吧!
我們的應用程序包括四項主要服務:
在開始之前,請確保滿足以下先決條件:
要使用 Docker 安裝 PostgreSQL,請運行以下命令:
docker run --name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres--name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres
要使用 Docker 安裝 RabbitMQ,請運行以下命令:
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management--主機名我的兔子--名稱一些兔子-p 5672 : 5672 -p 15672 : 15672 rabbitmq: 3-管理
演示
│
│
│
│env
│
...
在本節(jié)中,我們將重點介紹如何實現(xiàn)網(wǎng)關服務,該服務充當微服務架構中所有傳入請求的入口點。網(wǎng)關服務負責將請求路由到適當?shù)奈⒎詹⑻幚響贸绦虻恼w編排。
現(xiàn)在,讓我們實現(xiàn)網(wǎng)關服務。使用以下代碼在 gateway/
目錄中創(chuàng)建一個 main.py
文件:
from fastapi import FastAPI, HTTPException , File, UploadFile
import fastapi as _fastapi
from fastapi.security import OAuth2PasswordBearer
from dotenv import load_dotenv
from jwt.exceptions import DecodeError
from pydantic import BaseModel
import requests
import base64
import pika
import logging
import os
import jwt
import rpc_client
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Load environment variables
load_dotenv()
logging.basicConfig(level=logging.INFO)
# Retrieve environment variables
JWT_SECRET = os.environ.get("JWT_SECRET")
AUTH_BASE_URL = os.environ.get("AUTH_BASE_URL")
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL)) # add container name in docker
channel = connection.channel()
channel.queue_declare(queue='gatewayservice')
channel.queue_declare(queue='ocr_service')
# JWT token validation
async def jwt_validation(token: str = _fastapi.Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return payload
except DecodeError:
raise HTTPException(status_code=401, detail="Invalid JWT token")
# Pydantic models for request body
class GenerateUserToken(BaseModel):
username: str
password: str
class UserCredentials(BaseModel):
username: str
password: str
class UserRegisteration(BaseModel):
name: str
email: str
password: str
class GenerateOtp(BaseModel):
email: str
class VerifyOtp(BaseModel):
email: str
otp: int
# Authentication routes
@app.post("/auth/login", tags=['Authentication Service'])
async def login(user_data: UserCredentials):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/token", json={"username": user_data.username, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
@app.post("/auth/register", tags=['Authentication Service'])
async def registeration(user_data:UserRegisteration):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users", json={"name":user_data.name,"email": user_data.email, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
@app.post("/auth/generate_otp", tags=['Authentication Service'])
async def generate_otp(user_data:GenerateOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/generate_otp", json={"email":user_data.email})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
@app.post("/auth/verify_otp", tags=['Authentication Service'])
async def verify_otp(user_data:VerifyOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/verify_otp", json={"email":user_data.email ,"otp":user_data.otp})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
# ml microservice route - OCR route
@app.post('/ocr' , tags=['Machine learning Service'] )
def ocr(file: UploadFile = File(...),
payload: dict = _fastapi.Depends(jwt_validation)):
# Save the uploaded file to a temporary location
with open(file.filename, "wb") as buffer:
buffer.write(file.file.read())
ocr_rpc = rpc_client.OcrRpcClient()
with open(file.filename, "rb") as buffer:
file_data = buffer.read()
file_base64 = base64.b64encode(file_data).decode()
request_json = {
'user_name':payload['name'],
'user_email':payload['email'],
'user_id':payload['id'],
'file': file_base64
}
# Call the OCR microservice with the request JSON
response = ocr_rpc.call(request_json)
# Delete the temporary image file
os.remove(file.filename)
return response
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5001, reload=True)
要為網(wǎng)關設置環(huán)境,請在網(wǎng)關文件夾中創(chuàng)建一個 .env
文件。
AUTH_BASE_URL=http://0.0.0.0:5000
JWT_SECRET=e56623570e0a0152989fd38e13da9cd6eb7031e4e039e939ba845167ee59b496
RABBITMQ_URL=localhost
為了與其他微服務通信,我們將使用 RabbitMQ,這是一個支持服務之間異步消息傳遞的消息代理。我們將在 gateway/
目錄中創(chuàng)建一個 rpc_client.py
文件來處理與 RabbitMQ 服務器的通信。
import pika
import uuid
import json
from dotenv import load_dotenv
import os
# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
class OcrRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_URL))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, message):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='ocr_service',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(message))
while self.response is None:
self.connection.process_data_events()
response_json = json.loads(self.response)
return response_json
此代碼定義了一個客戶端類 OcrRpcClient
,用于使用 RabbitMQ 向 OCR 微服務(ML 微服務)發(fā)送消息。它初始化連接,為響應設置回調隊列,并提供異步發(fā)送消息和接收響應的方法。
__init__
):建立與 RabbitMQ 的連接。創(chuàng)建通道并聲明唯一的回調隊列。設置使用者以偵聽回調隊列上的響應。
2. 發(fā)送請求(呼叫
):
向 OCR 微服務(ML 微服務)發(fā)送消息。等待回調隊列上的響應并返回它。
此類使網(wǎng)關服務能夠使用 RabbitMQ 高效地與 OCR 微服務通信。
在本節(jié)中,我們將重點介紹如何實現(xiàn)身份驗證服務,該服務充當微服務架構中所有與身份驗證相關的請求的入口點。身份驗證服務負責用戶身份驗證、注冊和 OTP 生成。現(xiàn)在,讓我們實現(xiàn)身份驗證服務。
在 auth/ 目錄下創(chuàng)建一個 main.py 文件,代碼如下:
此代碼使用 FastAPI 實現(xiàn)身份驗證服務,用于用戶注冊、登錄、JWT 令牌生成、使用 OTP 的電子郵件驗證和用戶資料檢索。它使用 SQLAlchemy 進行數(shù)據(jù)庫作,使用 RabbitMQ 發(fā)送 OTP 電子郵件。該服務包括用于創(chuàng)建用戶、生成 JWT 令牌、檢索用戶資料和驗證 OTP 以進行電子郵件驗證的端點。
from typing import List
from fastapi import HTTPException
import fastapi as _fastapi
import schemas as _schemas
import sqlalchemy.orm as _orm
import models as _models
import service as _services
import logging
import database as _database
import pika
# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue='email_notification')
def get_db():
db = _database.SessionLocal()
try:
yield db
finally:
db.close()
app = _fastapi.FastAPI()
logging.basicConfig(level=logging.INFO)
_models.Base.metadata.create_all(_models.engine)
@app.post("/api/users" , tags = ['User Auth'])
async def create_user(
user: _schemas.UserCreate,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
db_user = await _services.get_user_by_email(email=user.email, db=db)
if db_user:
logging.info('User with that email already exists')
raise _fastapi.HTTPException(
status_code=200,
detail="User with that email already exists")
user = await _services.create_user(user=user, db=db)
return _fastapi.HTTPException(
status_code=201,
detail="User Registered, Please verify email to activate account !")
# Endpoint to check if the API is live
@app.get("/check_api")
async def check_api():
return {"status": "Connected to API Successfully"}
@app.post("/api/token" ,tags = ['User Auth'])
async def generate_token(
#form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
user_data: _schemas.GenerateUserToken,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.authenticate_user(email=user_data.username, password=user_data.password, db=db)
if user == "is_verified_false":
logging.info('Email verification is pending. Please verify your email to proceed. ')
raise _fastapi.HTTPException(
status_code=403, detail="Email verification is pending. Please verify your email to proceed.")
if not user:
logging.info('Invalid Credentials')
raise _fastapi.HTTPException(
status_code=401, detail="Invalid Credentials")
logging.info('JWT Token Generated')
return await _services.create_token(user=user)
@app.get("/api/users/me", response_model=_schemas.User , tags = ['User Auth'])
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
return user
@app.get("/api/users/profile", tags=['User Auth'])
async def get_user(email: str, db: _orm.Session = _fastapi.Depends(_services.get_db)):
return db.query(_models.User and _models.Address).filter_by(id=1).first()
@app.post("/api/users/generate_otp", response_model=str, tags=["User Auth"])
async def send_otp_mail(userdata: _schemas.GenerateOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db)
if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")
if user.is_verified:
raise _fastapi.HTTPException(status_code=400, detail="User is already verified")
# Generate and send OTP
otp = _services.generate_otp()
print(otp)
_services.send_otp(userdata.email, otp, channel)
# Store the OTP in the database
user.otp = otp
db.add(user)
db.commit()
return "OTP sent to your email"
@app.post("/api/users/verify_otp", tags=["User Auth"])
async def verify_otp(userdata: _schemas.VerifyOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db )
if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")
if not user.otp or user.otp != userdata.otp:
raise _fastapi.HTTPException(status_code=400, detail="Invalid OTP")
# Update user's is_verified field
user.is_verified = True
user.otp = None # Clear the OTP
db.add(user)
db.commit()
return "Email verified successfully"
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)
在 auth/ 目錄下創(chuàng)建一個 database.py 文件,代碼如下:
此代碼設置用于連接到 PostgreSQL 數(shù)據(jù)庫的 SQLAlchemy 引擎和會話創(chuàng)建器。它使用 dotenv
從環(huán)境變量加載數(shù)據(jù)庫連接詳細信息。DATABASE_URL
是使用檢索到的環(huán)境變量(包括主機、數(shù)據(jù)庫名稱、用戶名和密碼)構建的。該引擎是使用 create_engine
和 DATABASE_URL
創(chuàng)建的,并且 SessionLocal
被定義為綁定到此引擎的會話創(chuàng)建者。Base
變量被初始化為用于定義 ORM 模型的聲明性基。
import sqlalchemy as _sql
import sqlalchemy.ext.declarative as _declarative
import sqlalchemy.orm as _orm
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Retrieve environment variables
postgres_host = os.environ.get("POSTGRES_HOST")
postgres_db = os.environ.get("POSTGRES_DB")
postgres_user = os.environ.get("POSTGRES_USER")
postgres_password = os.environ.get("POSTGRES_PASSWORD")
# Assuming your PostgreSQL server is running locally with a database named 'mydatabase'
DATABASE_URL = f"postgresql://{postgres_user}:{postgres_password}@{postgres_host}/{postgres_db}"
engine = _sql.create_engine(DATABASE_URL)
SessionLocal = _orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = _declarative.declarative_base()
在 auth/ 目錄下創(chuàng)建一個 models.py 文件,代碼如下:
此代碼為 User
和 Address
表定義 SQLAlchemy 模型 ,存儲用戶信息和地址以及它們之間的關系。它還使用提供的引擎在數(shù)據(jù)庫中創(chuàng)建表。
import datetime as _dt
import sqlalchemy as _sql
import sqlalchemy.orm as _orm
import passlib.hash as _hash
from database import Base , engine
import database as _database
Base.metadata.create_all(engine)
class User(_database.Base):
__tablename__ = "users"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
name = _sql.Column(_sql.String)
email = _sql.Column(_sql.String, unique=True, index=True)
is_verified = _sql.Column(_sql.Boolean , default=False)
otp = _sql.Column(_sql.Integer)
hashed_password = _sql.Column(_sql.String)
addresses = _orm.relationship("Address", back_populates="user")
date_created = _sql.Column(_sql.DateTime, default=_dt.datetime.utcnow)
def verify_password(self, password: str):
return _hash.bcrypt.verify(password, self.hashed_password)
class Address(_database.Base):
__tablename__ = "addresses"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
street = _sql.Column(_sql.String)
landmark = _sql.Column(_sql.String)
city = _sql.Column(_sql.String)
country = _sql.Column(_sql.String)
pincode = _sql.Column(_sql.String)
user_id = _sql.Column(_sql.Integer, _sql.ForeignKey("users.id"))
user = _orm.relationship("User", back_populates="addresses")
latitude = _sql.Column(_sql.Float)
longitude = _sql.Column(_sql.Float)
在 auth/ 目錄下創(chuàng)建一個 schemas.py 文件,代碼如下:
此代碼為 用戶相關的數(shù)據(jù)結構定義 Pydantic 模型,包括用戶創(chuàng)建、身份驗證和 OTP 驗證。它還包括用于位置信息的地址模型。這些模型配置為從字典屬性自動創(chuàng)建實例。
import datetime
import pydantic
class UserBase(pydantic.BaseModel):
name: str
email: str
class Config:
from_attributes=True
class UserCreate(UserBase):
password: str
class Config:
from_attributes=True
class User(UserBase):
id: int
date_created: datetime.datetime
class Config:
from_attributes=True
class AddressBase(pydantic.BaseModel):
street: str
landmark: str
city: str
country: str
pincode: str
latitude: float
longitude: float
class Config:
from_attributes=True
class GenerateUserToken(pydantic.BaseModel):
username: str
password: str
class Config:
from_attributes=True
class GenerateOtp(pydantic.BaseModel):
email: str
class VerifyOtp(pydantic.BaseModel):
email: str
otp: int
在 auth/ 目錄下創(chuàng)建一個 service.py 文件,代碼如下:
此代碼定義了用于用戶身份驗證和 OTP(一次性密碼)生成和驗證的各種函數(shù)和依賴項。它使用 FastAPI 來處理 HTTP 請求,使用 SQLAlchemy 進行數(shù)據(jù)庫作,使用 Pydantic 進行數(shù)據(jù)驗證和序列化,使用 JWT 進行身份驗證,使用 RabbitMQ 發(fā)送電子郵件通知。這些功能包括創(chuàng)建數(shù)據(jù)庫、獲取數(shù)據(jù)庫會話、創(chuàng)建新用戶、對用戶進行身份驗證、創(chuàng)建 JWT 令牌、從 JWT 令牌獲取當前用戶、生成隨機 OTP、連接到 RabbitMQ 以及發(fā)送 OTP 電子郵件通知。
import jwt
import sqlalchemy.orm as _orm
import passlib.hash as _hash
import email_validator as _email_check
import fastapi as _fastapi
import fastapi.security as _security
from passlib.hash import bcrypt
import database as _database
import schemas as _schemas
import models as _models
import random
import json
import pika
import time
import os
# Load environment variables
JWT_SECRET = os.getenv("JWT_SECRET")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
oauth2schema = _security.OAuth2PasswordBearer("/api/token")
def create_database():
# Create database tables
return _database.Base.metadata.create_all(bind=_database.engine)
def get_db():
# Dependency to get a database session
db = _database.SessionLocal()
try:
yield db
finally:
db.close()
async def get_user_by_email(email: str, db: _orm.Session):
# Retrieve a user by email from the database
return db.query(_models.User).filter(_models.User.email == email and _models.User.is_verified==True).first()
async def create_user(user: _schemas.UserCreate, db: _orm.Session):
# Create a new user in the database
try:
valid = _email_check.validate_email(user.email)
name = user.name
email = valid.email
except _email_check.EmailNotValidError:
raise _fastapi.HTTPException(status_code=404, detail="Please enter a valid email")
user_obj = _models.User(email=email, name=name, hashed_password=_hash.bcrypt.hash(user.password))
db.add(user_obj)
db.commit()
db.refresh(user_obj)
return user_obj
async def authenticate_user(email: str, password: str, db: _orm.Session):
# Authenticate a user
user = await get_user_by_email(email=email, db=db)
if not user:
return False
if not user.is_verified:
return 'is_verified_false'
if not user.verify_password(password):
return False
return user
async def create_token(user: _models.User):
# Create a JWT token for authentication
user_obj = _schemas.User.from_orm(user)
user_dict = user_obj.model_dump()
del user_dict["date_created"]
token = jwt.encode(user_dict, JWT_SECRET, algorithm="HS256")
return dict(access_token=token, token_type="bearer")
async def get_current_user(db: _orm.Session = _fastapi.Depends(get_db), token: str = _fastapi.Depends(oauth2schema)):
# Get the current authenticated user from the JWT token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user = db.query(_models.User).get(payload["id"])
except:
raise _fastapi.HTTPException(status_code=401, detail="Invalid Email or Password")
return _schemas.User.from_orm(user)
def generate_otp():
# Generate a random OTP
return str(random.randint(100000, 999999))
def connect_to_rabbitmq():
# Connect to RabbitMQ
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL))
return connection
except pika.exceptions.AMQPConnectionError:
print("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
time.sleep(5)
def send_otp(email, otp, channel):
# Send an OTP email notification using RabbitMQ
connection = connect_to_rabbitmq()
channel = connection.channel()
message = {'email': email,
'subject': 'Account Verification OTP Notification',
'other': 'null',
'body': f'Your OTP for account verification is: {otp} \n Please enter this OTP on the verification page to complete your account setup. \n If you did not request this OTP, please ignore this message.\n Thank you '
}
try:
queue_declare_ok = channel.queue_declare(queue='email_notification', passive=True)
current_durable = queue_declare_ok.method.queue
if current_durable:
if queue_declare_ok.method.queue != current_durable:
channel.queue_delete(queue='email_notification')
channel.queue_declare(queue='email_notification', durable=True)
else:
channel.queue_declare(queue='email_notification', durable=True)
channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OTP email notification")
except Exception as err:
print(f"Failed to publish message: {err}")
finally:
channel.close()
connection.close()
在本節(jié)中,我們將重點介紹如何實現(xiàn)機器學習服務,該服務作為微服務架構中所有 OCR(光學字符識別)相關請求的入口點。機器學習服務負責處理 OCR 請求、從圖像中提取文本以及在完成時發(fā)送通知。
現(xiàn)在,讓我們實現(xiàn)機器學習服務。
在 ml_services/ 目錄下創(chuàng)建一個 main.py 文件,代碼如下:
此 Python 腳本連接到 RabbitMQ 服務器,并使用來自名為“ocr_service”的隊列中的消息。收到消息后,它使用 OCRService 對象對其進行處理,使用 send_email_notification 函數(shù)發(fā)送電子郵件通知,然后將響應發(fā)布到回復隊列。在處理每條消息后,它確認向 RabbitMQ 傳送消息。該腳本使用預取計數(shù) 1 來限制 RabbitMQ 將傳送的未確認消息的數(shù)量。
import pika
import json
from utils import OCRService
from utils import send_email_notification
# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='ocr_service')
# Callback function to process OCR requests
def on_request(ch, method, props, body):
# Initialize OCR service
ocr_service = OCRService()
# Process OCR request
response = ocr_service.process_request(body)
# Send email notification
send_email_notification(response['user_email'], response['ocr_text'], channel)
# Publish response to the reply queue
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=json.dumps(response))
# Acknowledge the message delivery
ch.basic_ack(delivery_tag=method.delivery_tag)
# Set prefetch count to 1
channel.basic_qos(prefetch_count=1)
# Consume messages from the 'ocr_service' queue
channel.basic_consume(queue='ocr_service', on_message_callback=on_request)
# Start consuming messages
print(" [x] Awaiting RPC requests")
channel.start_consuming()
在 ml_services/ 目錄下創(chuàng)建一個 utils.py 文件,代碼如下:
import json
import base64
import pandas as pd
#keras ocr pipeline and imports
import keras_ocr
import pika
class OCRService:
def __init__(self):
self.keras_pipeline = keras_ocr.pipeline.Pipeline()
def keras_ocr(self, image_path):
results = self.keras_pipeline.recognize([image_path])
df = pd.DataFrame(results[0], columns=['text', 'bbox'])
words = df['text'].tolist()
sentence = ' '.join(words)
return sentence
def process_request(self, message):
message_body = json.loads(message)
user_name = message_body['user_name']
user_email = message_body['user_email']
user_id = message_body['user_id']
file_base64 = message_body['file']
print(f" [x]user_id: {user_id} request recieved from gateway..")
print(f" [x]processing request for {user_name}")
# Assuming file_base64 contains the base64-encoded string
file_data = base64.b64decode(file_base64.encode())
# Write the decoded file data to a new file
with open('artifacts/decoded_file.png', 'wb') as f:
f.write(file_data)
image_path = "artifacts/decoded_file.png"
ocr_text = self.keras_ocr(image_path)
print(" [^]OCR processing done !!!")
response = {
"user_id": user_id,
"user_name": user_name,
"user_email": user_email,
"ocr_text": ocr_text
}
return response
def send_email_notification(email, ocr_text, channel):
# Send an email notification using RabbitMQ
message = {
'email': email,
'subject':'OCR Process Completed !!',
'body':f'We are pleased to inform you that the OCR (Optical Character Recognition) process for your image has been successfully completed.\n The extracted text has been processed and is now ready for use.\n\n OCR text : {ocr_text}',
'other': 'null',
}
try:
channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OCR Process Completed email notification")
except Exception as err:
print(f"Failed to publish message: {err}")
在本節(jié)中,我們將重點介紹如何實現(xiàn)通知服務,該服務向用戶發(fā)送電子郵件通知。
在 notification_service/ 目錄下創(chuàng)建一個 main.py 文件,代碼如下:
此腳本設置一個 RabbitMQ 使用者,用于偵聽“email_notification”隊列中的消息。收到消息時,它會 從 email_service
模塊調用通知
函數(shù)來處理通知過程。如果成功,它將確認消息;否則,它將拒絕該消息并打印錯誤消息。
import pika
import sys
import os
import time
import email_service
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
def main():
# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_URL))
channel = connection.channel()
def callback(ch, method, properties, body):
try:
err = email_service.notification(body)
if err:
ch.basic_nack(delivery_tag=method.delivery_tag)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue="email_notification", on_message_callback=callback
)
print("Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)
在 notification_service/ 目錄下創(chuàng)建一個 email_service.py 文件,代碼如下:
import smtplib, os, json
from email.message import EmailMessage
from dotenv import load_dotenv
from email.mime.text import MIMEText
load_dotenv()
def notification(message):
try:
message = json.loads(message)
receiver_address = message["email"]
subject = message["subject"]
body = message["body"]
other = message["other"]
sender_address = os.environ.get("GMAIL_ADDRESS")
sender_password = os.environ.get("GMAIL_PASSWORD")
# Gmail SMTP server settings
smtp_server = 'smtp.gmail.com'
smtp_port = 587
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_address, sender_password)
# Compose the email message
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = sender_address
msg['To'] = receiver_address
server.sendmail(sender_address, receiver_address, msg.as_string())
server.quit()
print("Mail Sent")
except Exception as e:
print(f"Failed to send email: {e}")
綜上所述,我們成功地使用 FastAPI 和 RabbitMQ 實現(xiàn)了端到端的微服務架構。我們演示了如何創(chuàng)建用戶身份驗證服務、用于 OCR 處理的機器學習服務以及用于電子郵件通知的通知服務。
通過本博客,我們了解了微服務的關鍵概念,例如服務隔離、通過消息隊列進行通信以及使用異步處理實現(xiàn)可擴展性和性能的好處。