什么是微服務架構?

另一方面,微服務架構是一種將應用程序劃分為更小、可獨立部署的服務的方法。每個服務都有自己的業(yè)務邏輯和數(shù)據(jù)庫,并通過輕量級協(xié)議與其他服務通信。這種方法可以縮短開發(fā)周期、簡化維護并提高可擴展性。

整體與微服務之間的區(qū)別

什么是 RabbitMQ,為什么它用于構建微服務?

RabbitMQ 是一個實現(xiàn)高級消息隊列協(xié)議 (AMQP) 的消息代理。它充當分布式系統(tǒng)各個組件的中間人,使它們能夠高效地通信和協(xié)調任務。以下是 RabbitMQ 在微服務架構中被廣泛使用的原因:

  1. 解耦: RabbitMQ 通過允許系統(tǒng)組件異步通信來幫助解耦系統(tǒng)組件。這意味著服務可以繼續(xù)獨立運行,而無需等待彼此的響應,從而實現(xiàn)更具彈性和可擴展性的系統(tǒng)。
  2. 負載平衡: RabbitMQ 可以將消息分發(fā)到多個消費者實例,幫助平衡負載并確保高效的資源利用率。
  3. 容錯性: RabbitMQ 支持集群和復制,即使某個節(jié)點發(fā)生故障,消息也不會丟失,系統(tǒng)容錯性更強,可靠性更高。
  4. 可擴展性: RabbitMQ 允許您通過向集群添加更多消費者實例或節(jié)點來擴展您的系統(tǒng),從而使您能夠隨著應用程序的增長處理增加的消息流量。
  5. 消息路由:RabbitMQ 支持多種消息路由機制,例如直接、主題和扇出交換,這些機制允許您根據(jù)路由鍵或模式將消息路由到特定的隊列。
  6. 消息鳴謝:RabbitMQ 支持消息鳴謝,保證消息只處理一次,不會在傳輸過程中丟失。
  7. 總體而言,RabbitMQ 是一個強大而可靠的消息傳遞系統(tǒng),有助于構建可擴展、解耦和容錯的微服務架構。

現(xiàn)在我們已經介紹了微服務的基礎知識,讓我們更深入地研究如何編寫我們的第一個微服務。我們將探索如何使用 Python、FastAPI、RabbitMQ 和 PostgreSQL 設計和實現(xiàn)微服務架構。本動手指南將引導您設置每個組件、設計微服務交互以及部署微服務以創(chuàng)建功能齊全的應用程序。讓我們開始吧!

對微服務應用程序進行編碼

一、項目介紹

我們的應用程序包括四項主要服務:

  1. 網(wǎng)關服務: 此服務充當所有傳入請求的入口點。它將請求路由到適當?shù)奈⒎眨⑻幚響贸绦虻恼w編排。
  2. ML 服務:ML 服務負責處理圖像數(shù)據(jù)。它使用 Keras OCR 從圖像中提取文本,并與網(wǎng)關服務通信以接收圖像數(shù)據(jù)并發(fā)回提取的文本。
  3. Auth 服務:Auth 服務處理用戶身份驗證和電子郵件驗證。它包括用于注冊用戶、生成和驗證 OTP 以及確保電子郵件驗證的功能。
  4. 通知服務:此服務負責向用戶發(fā)送電子郵件。它在進程完成時觸發(fā)。

2 . 先決條件

在開始之前,請確保滿足以下先決條件:

3. 設置要求

使用 Docker 安裝 PostgreSQL

要使用 Docker 安裝 PostgreSQL,請運行以下命令:

docker run --name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres--name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres

使用 Docker 安裝 RabbitMQ

要使用 Docker 安裝 RabbitMQ,請運行以下命令:

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management--主機名我的兔子--名稱一些兔子-p  5672 : 5672 -p 15672 : 15672 rabbitmq: 3-管理

4. 設置項目

A. 設置 Project 文件夾

演示



│env

...

B. 實施網(wǎng)關

在本節(jié)中,我們將重點介紹如何實現(xiàn)網(wǎng)關服務,該服務充當微服務架構中所有傳入請求的入口點。網(wǎng)關服務負責將請求路由到適當?shù)奈⒎詹⑻幚響贸绦虻恼w編排。

現(xiàn)在,讓我們實現(xiàn)網(wǎng)關服務。使用以下代碼在 gateway/ 目錄中創(chuàng)建一個 main.py 文件:

from fastapi import FastAPI, HTTPException ,  File, UploadFile
import fastapi as _fastapi
from fastapi.security import OAuth2PasswordBearer
from dotenv import load_dotenv
from jwt.exceptions import DecodeError
from pydantic import BaseModel
import requests
import base64
import pika
import logging
import os
import jwt
import rpc_client

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Load environment variables
load_dotenv()
logging.basicConfig(level=logging.INFO)

# Retrieve environment variables
JWT_SECRET = os.environ.get("JWT_SECRET")
AUTH_BASE_URL = os.environ.get("AUTH_BASE_URL")
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL)) # add container name in docker
channel = connection.channel()
channel.queue_declare(queue='gatewayservice')
channel.queue_declare(queue='ocr_service')

# JWT token validation
async def jwt_validation(token: str = _fastapi.Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return payload
except DecodeError:
raise HTTPException(status_code=401, detail="Invalid JWT token")

# Pydantic models for request body
class GenerateUserToken(BaseModel):
username: str
password: str

class UserCredentials(BaseModel):
username: str
password: str

class UserRegisteration(BaseModel):
name: str
email: str
password: str

class GenerateOtp(BaseModel):
email: str

class VerifyOtp(BaseModel):
email: str
otp: int

# Authentication routes
@app.post("/auth/login", tags=['Authentication Service'])
async def login(user_data: UserCredentials):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/token", json={"username": user_data.username, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

@app.post("/auth/register", tags=['Authentication Service'])
async def registeration(user_data:UserRegisteration):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users", json={"name":user_data.name,"email": user_data.email, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

@app.post("/auth/generate_otp", tags=['Authentication Service'])
async def generate_otp(user_data:GenerateOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/generate_otp", json={"email":user_data.email})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

@app.post("/auth/verify_otp", tags=['Authentication Service'])
async def verify_otp(user_data:VerifyOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/verify_otp", json={"email":user_data.email ,"otp":user_data.otp})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

# ml microservice route - OCR route
@app.post('/ocr' , tags=['Machine learning Service'] )
def ocr(file: UploadFile = File(...),
payload: dict = _fastapi.Depends(jwt_validation)):

# Save the uploaded file to a temporary location
with open(file.filename, "wb") as buffer:
buffer.write(file.file.read())

ocr_rpc = rpc_client.OcrRpcClient()

with open(file.filename, "rb") as buffer:
file_data = buffer.read()
file_base64 = base64.b64encode(file_data).decode()

request_json = {
'user_name':payload['name'],
'user_email':payload['email'],
'user_id':payload['id'],
'file': file_base64
}

# Call the OCR microservice with the request JSON
response = ocr_rpc.call(request_json)

# Delete the temporary image file
os.remove(file.filename)
return response

if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5001, reload=True)

要為網(wǎng)關設置環(huán)境,請在網(wǎng)關文件夾中創(chuàng)建一個 .env 文件。

AUTH_BASE_URL=http://0.0.0.0:5000
JWT_SECRET=e56623570e0a0152989fd38e13da9cd6eb7031e4e039e939ba845167ee59b496
RABBITMQ_URL=localhost

為了與其他微服務通信,我們將使用 RabbitMQ,這是一個支持服務之間異步消息傳遞的消息代理。我們將在 gateway/ 目錄中創(chuàng)建一個 rpc_client.py 文件來處理與 RabbitMQ 服務器的通信。

import pika
import uuid
import json
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

class OcrRpcClient(object):

def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_URL))

self.channel = self.connection.channel()

result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, message):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='ocr_service',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(message))
while self.response is None:
self.connection.process_data_events()
response_json = json.loads(self.response)
return response_json

此代碼定義了一個客戶端類 OcrRpcClient,用于使用 RabbitMQ 向 OCR 微服務(ML 微服務)發(fā)送消息。它初始化連接,為響應設置回調隊列,并提供異步發(fā)送消息和接收響應的方法。

  1. 初始化 (__init__):

建立與 RabbitMQ 的連接。創(chuàng)建通道并聲明唯一的回調隊列。設置使用者以偵聽回調隊列上的響應。

2. 發(fā)送請求(呼叫):

向 OCR 微服務(ML 微服務)發(fā)送消息。等待回調隊列上的響應并返回它。

此類使網(wǎng)關服務能夠使用 RabbitMQ 高效地與 OCR 微服務通信。

C . 實現(xiàn) Auth 微服務

在本節(jié)中,我們將重點介紹如何實現(xiàn)身份驗證服務,該服務充當微服務架構中所有與身份驗證相關的請求的入口點。身份驗證服務負責用戶身份驗證、注冊和 OTP 生成。現(xiàn)在,讓我們實現(xiàn)身份驗證服務。

auth/ 目錄下創(chuàng)建一個 main.py 文件,代碼如下:

此代碼使用 FastAPI 實現(xiàn)身份驗證服務,用于用戶注冊、登錄、JWT 令牌生成、使用 OTP 的電子郵件驗證和用戶資料檢索。它使用 SQLAlchemy 進行數(shù)據(jù)庫作,使用 RabbitMQ 發(fā)送 OTP 電子郵件。該服務包括用于創(chuàng)建用戶、生成 JWT 令牌、檢索用戶資料和驗證 OTP 以進行電子郵件驗證的端點。

from typing import List
from fastapi import HTTPException
import fastapi as _fastapi
import schemas as _schemas
import sqlalchemy.orm as _orm
import models as _models
import service as _services
import logging
import database as _database
import pika

# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue='email_notification')

def get_db():
db = _database.SessionLocal()
try:
yield db
finally:
db.close()

app = _fastapi.FastAPI()
logging.basicConfig(level=logging.INFO)
_models.Base.metadata.create_all(_models.engine)

@app.post("/api/users" , tags = ['User Auth'])
async def create_user(
user: _schemas.UserCreate,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
db_user = await _services.get_user_by_email(email=user.email, db=db)

if db_user:
logging.info('User with that email already exists')
raise _fastapi.HTTPException(
status_code=200,
detail="User with that email already exists")

user = await _services.create_user(user=user, db=db)

return _fastapi.HTTPException(
status_code=201,
detail="User Registered, Please verify email to activate account !")

# Endpoint to check if the API is live
@app.get("/check_api")
async def check_api():
return {"status": "Connected to API Successfully"}

@app.post("/api/token" ,tags = ['User Auth'])
async def generate_token(
#form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
user_data: _schemas.GenerateUserToken,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.authenticate_user(email=user_data.username, password=user_data.password, db=db)

if user == "is_verified_false":
logging.info('Email verification is pending. Please verify your email to proceed. ')
raise _fastapi.HTTPException(
status_code=403, detail="Email verification is pending. Please verify your email to proceed.")

if not user:
logging.info('Invalid Credentials')
raise _fastapi.HTTPException(
status_code=401, detail="Invalid Credentials")

logging.info('JWT Token Generated')
return await _services.create_token(user=user)

@app.get("/api/users/me", response_model=_schemas.User , tags = ['User Auth'])
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
return user

@app.get("/api/users/profile", tags=['User Auth'])
async def get_user(email: str, db: _orm.Session = _fastapi.Depends(_services.get_db)):
return db.query(_models.User and _models.Address).filter_by(id=1).first()

@app.post("/api/users/generate_otp", response_model=str, tags=["User Auth"])
async def send_otp_mail(userdata: _schemas.GenerateOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db)

if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")

if user.is_verified:
raise _fastapi.HTTPException(status_code=400, detail="User is already verified")

# Generate and send OTP
otp = _services.generate_otp()
print(otp)
_services.send_otp(userdata.email, otp, channel)

# Store the OTP in the database
user.otp = otp
db.add(user)
db.commit()

return "OTP sent to your email"

@app.post("/api/users/verify_otp", tags=["User Auth"])
async def verify_otp(userdata: _schemas.VerifyOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db )

if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")

if not user.otp or user.otp != userdata.otp:
raise _fastapi.HTTPException(status_code=400, detail="Invalid OTP")

# Update user's is_verified field
user.is_verified = True
user.otp = None # Clear the OTP
db.add(user)
db.commit()

return "Email verified successfully"

if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)

auth/ 目錄下創(chuàng)建一個 database.py 文件,代碼如下:

此代碼設置用于連接到 PostgreSQL 數(shù)據(jù)庫的 SQLAlchemy 引擎和會話創(chuàng)建器。它使用 dotenv 從環(huán)境變量加載數(shù)據(jù)庫連接詳細信息DATABASE_URL是使用檢索到的環(huán)境變量(包括主機、數(shù)據(jù)庫名稱、用戶名和密碼)構建的。該引擎是使用 create_engine DATABASE_URL 創(chuàng)建的并且 SessionLocal 被定義為綁定到此引擎的會話創(chuàng)建者。Base 變量被初始化為用于定義 ORM 模型的聲明性基。

import sqlalchemy as _sql
import sqlalchemy.ext.declarative as _declarative
import sqlalchemy.orm as _orm
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Retrieve environment variables
postgres_host = os.environ.get("POSTGRES_HOST")
postgres_db = os.environ.get("POSTGRES_DB")
postgres_user = os.environ.get("POSTGRES_USER")
postgres_password = os.environ.get("POSTGRES_PASSWORD")

# Assuming your PostgreSQL server is running locally with a database named 'mydatabase'
DATABASE_URL = f"postgresql://{postgres_user}:{postgres_password}@{postgres_host}/{postgres_db}"

engine = _sql.create_engine(DATABASE_URL)
SessionLocal = _orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = _declarative.declarative_base()

auth/ 目錄下創(chuàng)建一個 models.py 文件,代碼如下:

此代碼為 UserAddress定義 SQLAlchemy 模型 ,存儲用戶信息和地址以及它們之間的關系。它還使用提供的引擎在數(shù)據(jù)庫中創(chuàng)建表。

import datetime as _dt
import sqlalchemy as _sql
import sqlalchemy.orm as _orm
import passlib.hash as _hash
from database import Base , engine
import database as _database

Base.metadata.create_all(engine)

class User(_database.Base):
__tablename__ = "users"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
name = _sql.Column(_sql.String)
email = _sql.Column(_sql.String, unique=True, index=True)
is_verified = _sql.Column(_sql.Boolean , default=False)
otp = _sql.Column(_sql.Integer)
hashed_password = _sql.Column(_sql.String)
addresses = _orm.relationship("Address", back_populates="user")
date_created = _sql.Column(_sql.DateTime, default=_dt.datetime.utcnow)

def verify_password(self, password: str):
return _hash.bcrypt.verify(password, self.hashed_password)

class Address(_database.Base):
__tablename__ = "addresses"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
street = _sql.Column(_sql.String)
landmark = _sql.Column(_sql.String)
city = _sql.Column(_sql.String)
country = _sql.Column(_sql.String)
pincode = _sql.Column(_sql.String)
user_id = _sql.Column(_sql.Integer, _sql.ForeignKey("users.id"))
user = _orm.relationship("User", back_populates="addresses")
latitude = _sql.Column(_sql.Float)
longitude = _sql.Column(_sql.Float)

auth/ 目錄下創(chuàng)建一個 schemas.py 文件,代碼如下:

此代碼為 用戶相關的數(shù)據(jù)結構定義 Pydantic 模型,包括用戶創(chuàng)建、身份驗證和 OTP 驗證。它還包括用于位置信息的地址模型。這些模型配置為從字典屬性自動創(chuàng)建實例。

import datetime
import pydantic

class UserBase(pydantic.BaseModel):
name: str
email: str
class Config:
from_attributes=True

class UserCreate(UserBase):
password: str
class Config:
from_attributes=True

class User(UserBase):
id: int
date_created: datetime.datetime
class Config:
from_attributes=True

class AddressBase(pydantic.BaseModel):
street: str
landmark: str
city: str
country: str
pincode: str
latitude: float
longitude: float
class Config:
from_attributes=True

class GenerateUserToken(pydantic.BaseModel):
username: str
password: str
class Config:
from_attributes=True

class GenerateOtp(pydantic.BaseModel):
email: str

class VerifyOtp(pydantic.BaseModel):
email: str
otp: int

auth/ 目錄下創(chuàng)建一個 service.py 文件,代碼如下:

此代碼定義了用于用戶身份驗證和 OTP(一次性密碼)生成和驗證的各種函數(shù)和依賴項。它使用 FastAPI 來處理 HTTP 請求,使用 SQLAlchemy 進行數(shù)據(jù)庫作,使用 Pydantic 進行數(shù)據(jù)驗證和序列化,使用 JWT 進行身份驗證,使用 RabbitMQ 發(fā)送電子郵件通知。這些功能包括創(chuàng)建數(shù)據(jù)庫、獲取數(shù)據(jù)庫會話、創(chuàng)建新用戶、對用戶進行身份驗證、創(chuàng)建 JWT 令牌、從 JWT 令牌獲取當前用戶、生成隨機 OTP、連接到 RabbitMQ 以及發(fā)送 OTP 電子郵件通知。

import jwt
import sqlalchemy.orm as _orm
import passlib.hash as _hash
import email_validator as _email_check
import fastapi as _fastapi
import fastapi.security as _security
from passlib.hash import bcrypt
import database as _database
import schemas as _schemas
import models as _models
import random
import json
import pika
import time
import os

# Load environment variables
JWT_SECRET = os.getenv("JWT_SECRET")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
oauth2schema = _security.OAuth2PasswordBearer("/api/token")

def create_database():
# Create database tables
return _database.Base.metadata.create_all(bind=_database.engine)

def get_db():
# Dependency to get a database session
db = _database.SessionLocal()
try:
yield db
finally:
db.close()

async def get_user_by_email(email: str, db: _orm.Session):
# Retrieve a user by email from the database
return db.query(_models.User).filter(_models.User.email == email and _models.User.is_verified==True).first()

async def create_user(user: _schemas.UserCreate, db: _orm.Session):
# Create a new user in the database
try:
valid = _email_check.validate_email(user.email)
name = user.name
email = valid.email
except _email_check.EmailNotValidError:
raise _fastapi.HTTPException(status_code=404, detail="Please enter a valid email")

user_obj = _models.User(email=email, name=name, hashed_password=_hash.bcrypt.hash(user.password))
db.add(user_obj)
db.commit()
db.refresh(user_obj)
return user_obj

async def authenticate_user(email: str, password: str, db: _orm.Session):
# Authenticate a user
user = await get_user_by_email(email=email, db=db)

if not user:
return False

if not user.is_verified:
return 'is_verified_false'

if not user.verify_password(password):
return False

return user

async def create_token(user: _models.User):
# Create a JWT token for authentication
user_obj = _schemas.User.from_orm(user)
user_dict = user_obj.model_dump()
del user_dict["date_created"]
token = jwt.encode(user_dict, JWT_SECRET, algorithm="HS256")
return dict(access_token=token, token_type="bearer")

async def get_current_user(db: _orm.Session = _fastapi.Depends(get_db), token: str = _fastapi.Depends(oauth2schema)):
# Get the current authenticated user from the JWT token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user = db.query(_models.User).get(payload["id"])
except:
raise _fastapi.HTTPException(status_code=401, detail="Invalid Email or Password")
return _schemas.User.from_orm(user)

def generate_otp():
# Generate a random OTP
return str(random.randint(100000, 999999))

def connect_to_rabbitmq():
# Connect to RabbitMQ
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL))
return connection
except pika.exceptions.AMQPConnectionError:
print("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
time.sleep(5)

def send_otp(email, otp, channel):
# Send an OTP email notification using RabbitMQ
connection = connect_to_rabbitmq()
channel = connection.channel()
message = {'email': email,
'subject': 'Account Verification OTP Notification',
'other': 'null',
'body': f'Your OTP for account verification is: {otp} \n Please enter this OTP on the verification page to complete your account setup. \n If you did not request this OTP, please ignore this message.\n Thank you '
}

try:
queue_declare_ok = channel.queue_declare(queue='email_notification', passive=True)
current_durable = queue_declare_ok.method.queue

if current_durable:
if queue_declare_ok.method.queue != current_durable:
channel.queue_delete(queue='email_notification')
channel.queue_declare(queue='email_notification', durable=True)
else:
channel.queue_declare(queue='email_notification', durable=True)

channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OTP email notification")
except Exception as err:
print(f"Failed to publish message: {err}")
finally:
channel.close()
connection.close()

D. 實施機器學習微服務

在本節(jié)中,我們將重點介紹如何實現(xiàn)機器學習服務,該服務作為微服務架構中所有 OCR(光學字符識別)相關請求的入口點。機器學習服務負責處理 OCR 請求、從圖像中提取文本以及在完成時發(fā)送通知。

現(xiàn)在,讓我們實現(xiàn)機器學習服務。

ml_services/ 目錄下創(chuàng)建一個 main.py 文件,代碼如下:

此 Python 腳本連接到 RabbitMQ 服務器,并使用來自名為“ocr_service”的隊列中的消息。收到消息后,它使用 OCRService 對象對其進行處理,使用 send_email_notification 函數(shù)發(fā)送電子郵件通知,然后將響應發(fā)布到回復隊列。在處理每條消息后,它確認向 RabbitMQ 傳送消息。該腳本使用預取計數(shù) 1 來限制 RabbitMQ 將傳送的未確認消息的數(shù)量。

import pika
import json
from utils import OCRService
from utils import send_email_notification

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='ocr_service')

# Callback function to process OCR requests
def on_request(ch, method, props, body):
# Initialize OCR service
ocr_service = OCRService()
# Process OCR request
response = ocr_service.process_request(body)

# Send email notification
send_email_notification(response['user_email'], response['ocr_text'], channel)

# Publish response to the reply queue
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=json.dumps(response))
# Acknowledge the message delivery
ch.basic_ack(delivery_tag=method.delivery_tag)
# Set prefetch count to 1
channel.basic_qos(prefetch_count=1)
# Consume messages from the 'ocr_service' queue
channel.basic_consume(queue='ocr_service', on_message_callback=on_request)
# Start consuming messages
print(" [x] Awaiting RPC requests")
channel.start_consuming()

ml_services/ 目錄下創(chuàng)建一個 utils.py 文件,代碼如下:

import json
import base64
import pandas as pd
#keras ocr pipeline and imports
import keras_ocr
import pika

class OCRService:

def __init__(self):
self.keras_pipeline = keras_ocr.pipeline.Pipeline()

def keras_ocr(self, image_path):
results = self.keras_pipeline.recognize([image_path])
df = pd.DataFrame(results[0], columns=['text', 'bbox'])
words = df['text'].tolist()
sentence = ' '.join(words)
return sentence

def process_request(self, message):
message_body = json.loads(message)
user_name = message_body['user_name']
user_email = message_body['user_email']
user_id = message_body['user_id']
file_base64 = message_body['file']
print(f" [x]user_id: {user_id} request recieved from gateway..")
print(f" [x]processing request for {user_name}")

# Assuming file_base64 contains the base64-encoded string
file_data = base64.b64decode(file_base64.encode())
# Write the decoded file data to a new file
with open('artifacts/decoded_file.png', 'wb') as f:
f.write(file_data)

image_path = "artifacts/decoded_file.png"
ocr_text = self.keras_ocr(image_path)
print(" [^]OCR processing done !!!")

response = {
"user_id": user_id,
"user_name": user_name,
"user_email": user_email,
"ocr_text": ocr_text
}

return response

def send_email_notification(email, ocr_text, channel):
# Send an email notification using RabbitMQ
message = {
'email': email,
'subject':'OCR Process Completed !!',
'body':f'We are pleased to inform you that the OCR (Optical Character Recognition) process for your image has been successfully completed.\n The extracted text has been processed and is now ready for use.\n\n OCR text : {ocr_text}',
'other': 'null',
}

try:
channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OCR Process Completed email notification")
except Exception as err:
print(f"Failed to publish message: {err}")

D. 實施通知微服務

在本節(jié)中,我們將重點介紹如何實現(xiàn)通知服務,該服務向用戶發(fā)送電子郵件通知。

notification_service/ 目錄下創(chuàng)建一個 main.py 文件,代碼如下:

此腳本設置一個 RabbitMQ 使用者,用于偵聽“email_notification”隊列中的消息。收到消息時,它會 從 email_service 模塊調用通知函數(shù)來處理通知過程。如果成功,它將確認消息;否則,它將拒絕該消息并打印錯誤消息。

import pika
import sys
import os
import time
import email_service
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

def main():
# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_URL))
channel = connection.channel()

def callback(ch, method, properties, body):
try:
err = email_service.notification(body)
if err:
ch.basic_nack(delivery_tag=method.delivery_tag)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag)

channel.basic_consume(
queue="email_notification", on_message_callback=callback
)

print("Waiting for messages. To exit press CTRL+C")

channel.start_consuming()

if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)

notification_service/ 目錄下創(chuàng)建一個 email_service.py 文件,代碼如下:

import smtplib, os, json
from email.message import EmailMessage
from dotenv import load_dotenv
from email.mime.text import MIMEText

load_dotenv()

def notification(message):
try:
message = json.loads(message)
receiver_address = message["email"]
subject = message["subject"]
body = message["body"]
other = message["other"]

sender_address = os.environ.get("GMAIL_ADDRESS")
sender_password = os.environ.get("GMAIL_PASSWORD")

# Gmail SMTP server settings
smtp_server = 'smtp.gmail.com'
smtp_port = 587

server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_address, sender_password)

# Compose the email message
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = sender_address
msg['To'] = receiver_address

server.sendmail(sender_address, receiver_address, msg.as_string())
server.quit()

print("Mail Sent")
except Exception as e:
print(f"Failed to send email: {e}")

結論

綜上所述,我們成功地使用 FastAPI 和 RabbitMQ 實現(xiàn)了端到端的微服務架構。我們演示了如何創(chuàng)建用戶身份驗證服務、用于 OCR 處理的機器學習服務以及用于電子郵件通知的通知服務。

通過本博客,我們了解了微服務的關鍵概念,例如服務隔離、通過消息隊列進行通信以及使用異步處理實現(xiàn)可擴展性和性能的好處。

上一篇:

Google Gemini API使用教程:提升SEO的終極指南
最后一篇
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數(shù)據(jù)驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費