什么是微服務(wù)架構(gòu)?

另一方面,微服務(wù)架構(gòu)是一種將應(yīng)用程序劃分為更小、可獨(dú)立部署的服務(wù)的方法。每個(gè)服務(wù)都有自己的業(yè)務(wù)邏輯和數(shù)據(jù)庫,并通過輕量級(jí)協(xié)議與其他服務(wù)通信。這種方法可以縮短開發(fā)周期、簡化維護(hù)并提高可擴(kuò)展性。

整體與微服務(wù)之間的區(qū)別

什么是 RabbitMQ,為什么它用于構(gòu)建微服務(wù)?

RabbitMQ 是一個(gè)實(shí)現(xiàn)高級(jí)消息隊(duì)列協(xié)議 (AMQP) 的消息代理。它充當(dāng)分布式系統(tǒng)各個(gè)組件的中間人,使它們能夠高效地通信和協(xié)調(diào)任務(wù)。以下是 RabbitMQ 在微服務(wù)架構(gòu)中被廣泛使用的原因:

  1. 解耦: RabbitMQ 通過允許系統(tǒng)組件異步通信來幫助解耦系統(tǒng)組件。這意味著服務(wù)可以繼續(xù)獨(dú)立運(yùn)行,而無需等待彼此的響應(yīng),從而實(shí)現(xiàn)更具彈性和可擴(kuò)展性的系統(tǒng)。
  2. 負(fù)載平衡: RabbitMQ 可以將消息分發(fā)到多個(gè)消費(fèi)者實(shí)例,幫助平衡負(fù)載并確保高效的資源利用率。
  3. 容錯(cuò)性: RabbitMQ 支持集群和復(fù)制,即使某個(gè)節(jié)點(diǎn)發(fā)生故障,消息也不會(huì)丟失,系統(tǒng)容錯(cuò)性更強(qiáng),可靠性更高。
  4. 可擴(kuò)展性: RabbitMQ 允許您通過向集群添加更多消費(fèi)者實(shí)例或節(jié)點(diǎn)來擴(kuò)展您的系統(tǒng),從而使您能夠隨著應(yīng)用程序的增長處理增加的消息流量。
  5. 消息路由:RabbitMQ 支持多種消息路由機(jī)制,例如直接、主題和扇出交換,這些機(jī)制允許您根據(jù)路由鍵或模式將消息路由到特定的隊(duì)列。
  6. 消息鳴謝:RabbitMQ 支持消息鳴謝,保證消息只處理一次,不會(huì)在傳輸過程中丟失。
  7. 總體而言,RabbitMQ 是一個(gè)強(qiáng)大而可靠的消息傳遞系統(tǒng),有助于構(gòu)建可擴(kuò)展、解耦和容錯(cuò)的微服務(wù)架構(gòu)。

現(xiàn)在我們已經(jīng)介紹了微服務(wù)的基礎(chǔ)知識(shí),讓我們更深入地研究如何編寫我們的第一個(gè)微服務(wù)。我們將探索如何使用 Python、FastAPI、RabbitMQ 和 PostgreSQL 設(shè)計(jì)和實(shí)現(xiàn)微服務(wù)架構(gòu)。本動(dòng)手指南將引導(dǎo)您設(shè)置每個(gè)組件、設(shè)計(jì)微服務(wù)交互以及部署微服務(wù)以創(chuàng)建功能齊全的應(yīng)用程序。讓我們開始吧!

對(duì)微服務(wù)應(yīng)用程序進(jìn)行編碼

一、項(xiàng)目介紹

我們的應(yīng)用程序包括四項(xiàng)主要服務(wù):

  1. 網(wǎng)關(guān)服務(wù): 此服務(wù)充當(dāng)所有傳入請(qǐng)求的入口點(diǎn)。它將請(qǐng)求路由到適當(dāng)?shù)奈⒎?wù),并處理應(yīng)用程序的整體編排。
  2. ML 服務(wù):ML 服務(wù)負(fù)責(zé)處理圖像數(shù)據(jù)。它使用 Keras OCR 從圖像中提取文本,并與網(wǎng)關(guān)服務(wù)通信以接收?qǐng)D像數(shù)據(jù)并發(fā)回提取的文本。
  3. Auth 服務(wù):Auth 服務(wù)處理用戶身份驗(yàn)證和電子郵件驗(yàn)證。它包括用于注冊(cè)用戶、生成和驗(yàn)證 OTP 以及確保電子郵件驗(yàn)證的功能。
  4. 通知服務(wù):此服務(wù)負(fù)責(zé)向用戶發(fā)送電子郵件。它在進(jìn)程完成時(shí)觸發(fā)。

2 . 先決條件

在開始之前,請(qǐng)確保滿足以下先決條件:

3. 設(shè)置要求

使用 Docker 安裝 PostgreSQL

要使用 Docker 安裝 PostgreSQL,請(qǐng)運(yùn)行以下命令:

docker run --name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres--name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres

使用 Docker 安裝 RabbitMQ

要使用 Docker 安裝 RabbitMQ,請(qǐng)運(yùn)行以下命令:

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management--主機(jī)名我的兔子--名稱一些兔子-p  5672 : 5672 -p 15672 : 15672 rabbitmq: 3-管理

4. 設(shè)置項(xiàng)目

A. 設(shè)置 Project 文件夾

演示



│env

...

B. 實(shí)施網(wǎng)關(guān)

在本節(jié)中,我們將重點(diǎn)介紹如何實(shí)現(xiàn)網(wǎng)關(guān)服務(wù),該服務(wù)充當(dāng)微服務(wù)架構(gòu)中所有傳入請(qǐng)求的入口點(diǎn)。網(wǎng)關(guān)服務(wù)負(fù)責(zé)將請(qǐng)求路由到適當(dāng)?shù)奈⒎?wù)并處理應(yīng)用程序的整體編排。

現(xiàn)在,讓我們實(shí)現(xiàn)網(wǎng)關(guān)服務(wù)。使用以下代碼在 gateway/ 目錄中創(chuàng)建一個(gè) main.py 文件:

from fastapi import FastAPI, HTTPException ,  File, UploadFile
import fastapi as _fastapi
from fastapi.security import OAuth2PasswordBearer
from dotenv import load_dotenv
from jwt.exceptions import DecodeError
from pydantic import BaseModel
import requests
import base64
import pika
import logging
import os
import jwt
import rpc_client

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Load environment variables
load_dotenv()
logging.basicConfig(level=logging.INFO)

# Retrieve environment variables
JWT_SECRET = os.environ.get("JWT_SECRET")
AUTH_BASE_URL = os.environ.get("AUTH_BASE_URL")
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL)) # add container name in docker
channel = connection.channel()
channel.queue_declare(queue='gatewayservice')
channel.queue_declare(queue='ocr_service')

# JWT token validation
async def jwt_validation(token: str = _fastapi.Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return payload
except DecodeError:
raise HTTPException(status_code=401, detail="Invalid JWT token")

# Pydantic models for request body
class GenerateUserToken(BaseModel):
username: str
password: str

class UserCredentials(BaseModel):
username: str
password: str

class UserRegisteration(BaseModel):
name: str
email: str
password: str

class GenerateOtp(BaseModel):
email: str

class VerifyOtp(BaseModel):
email: str
otp: int

# Authentication routes
@app.post("/auth/login", tags=['Authentication Service'])
async def login(user_data: UserCredentials):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/token", json={"username": user_data.username, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

@app.post("/auth/register", tags=['Authentication Service'])
async def registeration(user_data:UserRegisteration):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users", json={"name":user_data.name,"email": user_data.email, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

@app.post("/auth/generate_otp", tags=['Authentication Service'])
async def generate_otp(user_data:GenerateOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/generate_otp", json={"email":user_data.email})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

@app.post("/auth/verify_otp", tags=['Authentication Service'])
async def verify_otp(user_data:VerifyOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/verify_otp", json={"email":user_data.email ,"otp":user_data.otp})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")

# ml microservice route - OCR route
@app.post('/ocr' , tags=['Machine learning Service'] )
def ocr(file: UploadFile = File(...),
payload: dict = _fastapi.Depends(jwt_validation)):

# Save the uploaded file to a temporary location
with open(file.filename, "wb") as buffer:
buffer.write(file.file.read())

ocr_rpc = rpc_client.OcrRpcClient()

with open(file.filename, "rb") as buffer:
file_data = buffer.read()
file_base64 = base64.b64encode(file_data).decode()

request_json = {
'user_name':payload['name'],
'user_email':payload['email'],
'user_id':payload['id'],
'file': file_base64
}

# Call the OCR microservice with the request JSON
response = ocr_rpc.call(request_json)

# Delete the temporary image file
os.remove(file.filename)
return response

if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5001, reload=True)

要為網(wǎng)關(guān)設(shè)置環(huán)境,請(qǐng)?jiān)诰W(wǎng)關(guān)文件夾中創(chuàng)建一個(gè) .env 文件。

AUTH_BASE_URL=http://0.0.0.0:5000
JWT_SECRET=e56623570e0a0152989fd38e13da9cd6eb7031e4e039e939ba845167ee59b496
RABBITMQ_URL=localhost

為了與其他微服務(wù)通信,我們將使用 RabbitMQ,這是一個(gè)支持服務(wù)之間異步消息傳遞的消息代理。我們將在 gateway/ 目錄中創(chuàng)建一個(gè) rpc_client.py 文件來處理與 RabbitMQ 服務(wù)器的通信。

import pika
import uuid
import json
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

class OcrRpcClient(object):

def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_URL))

self.channel = self.connection.channel()

result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, message):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='ocr_service',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(message))
while self.response is None:
self.connection.process_data_events()
response_json = json.loads(self.response)
return response_json

此代碼定義了一個(gè)客戶端類 OcrRpcClient,用于使用 RabbitMQ 向 OCR 微服務(wù)(ML 微服務(wù))發(fā)送消息。它初始化連接,為響應(yīng)設(shè)置回調(diào)隊(duì)列,并提供異步發(fā)送消息和接收響應(yīng)的方法。

  1. 初始化 (__init__):

建立與 RabbitMQ 的連接。創(chuàng)建通道并聲明唯一的回調(diào)隊(duì)列。設(shè)置使用者以偵聽回調(diào)隊(duì)列上的響應(yīng)。

2. 發(fā)送請(qǐng)求(呼叫):

向 OCR 微服務(wù)(ML 微服務(wù))發(fā)送消息。等待回調(diào)隊(duì)列上的響應(yīng)并返回它。

此類使網(wǎng)關(guān)服務(wù)能夠使用 RabbitMQ 高效地與 OCR 微服務(wù)通信。

C . 實(shí)現(xiàn) Auth 微服務(wù)

在本節(jié)中,我們將重點(diǎn)介紹如何實(shí)現(xiàn)身份驗(yàn)證服務(wù),該服務(wù)充當(dāng)微服務(wù)架構(gòu)中所有與身份驗(yàn)證相關(guān)的請(qǐng)求的入口點(diǎn)。身份驗(yàn)證服務(wù)負(fù)責(zé)用戶身份驗(yàn)證、注冊(cè)和 OTP 生成。現(xiàn)在,讓我們實(shí)現(xiàn)身份驗(yàn)證服務(wù)。

auth/ 目錄下創(chuàng)建一個(gè) main.py 文件,代碼如下:

此代碼使用 FastAPI 實(shí)現(xiàn)身份驗(yàn)證服務(wù),用于用戶注冊(cè)、登錄、JWT 令牌生成、使用 OTP 的電子郵件驗(yàn)證和用戶資料檢索。它使用 SQLAlchemy 進(jìn)行數(shù)據(jù)庫作,使用 RabbitMQ 發(fā)送 OTP 電子郵件。該服務(wù)包括用于創(chuàng)建用戶、生成 JWT 令牌、檢索用戶資料和驗(yàn)證 OTP 以進(jìn)行電子郵件驗(yàn)證的端點(diǎn)。

from typing import List
from fastapi import HTTPException
import fastapi as _fastapi
import schemas as _schemas
import sqlalchemy.orm as _orm
import models as _models
import service as _services
import logging
import database as _database
import pika

# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue='email_notification')

def get_db():
db = _database.SessionLocal()
try:
yield db
finally:
db.close()

app = _fastapi.FastAPI()
logging.basicConfig(level=logging.INFO)
_models.Base.metadata.create_all(_models.engine)

@app.post("/api/users" , tags = ['User Auth'])
async def create_user(
user: _schemas.UserCreate,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
db_user = await _services.get_user_by_email(email=user.email, db=db)

if db_user:
logging.info('User with that email already exists')
raise _fastapi.HTTPException(
status_code=200,
detail="User with that email already exists")

user = await _services.create_user(user=user, db=db)

return _fastapi.HTTPException(
status_code=201,
detail="User Registered, Please verify email to activate account !")

# Endpoint to check if the API is live
@app.get("/check_api")
async def check_api():
return {"status": "Connected to API Successfully"}

@app.post("/api/token" ,tags = ['User Auth'])
async def generate_token(
#form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
user_data: _schemas.GenerateUserToken,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.authenticate_user(email=user_data.username, password=user_data.password, db=db)

if user == "is_verified_false":
logging.info('Email verification is pending. Please verify your email to proceed. ')
raise _fastapi.HTTPException(
status_code=403, detail="Email verification is pending. Please verify your email to proceed.")

if not user:
logging.info('Invalid Credentials')
raise _fastapi.HTTPException(
status_code=401, detail="Invalid Credentials")

logging.info('JWT Token Generated')
return await _services.create_token(user=user)

@app.get("/api/users/me", response_model=_schemas.User , tags = ['User Auth'])
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
return user

@app.get("/api/users/profile", tags=['User Auth'])
async def get_user(email: str, db: _orm.Session = _fastapi.Depends(_services.get_db)):
return db.query(_models.User and _models.Address).filter_by(id=1).first()

@app.post("/api/users/generate_otp", response_model=str, tags=["User Auth"])
async def send_otp_mail(userdata: _schemas.GenerateOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db)

if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")

if user.is_verified:
raise _fastapi.HTTPException(status_code=400, detail="User is already verified")

# Generate and send OTP
otp = _services.generate_otp()
print(otp)
_services.send_otp(userdata.email, otp, channel)

# Store the OTP in the database
user.otp = otp
db.add(user)
db.commit()

return "OTP sent to your email"

@app.post("/api/users/verify_otp", tags=["User Auth"])
async def verify_otp(userdata: _schemas.VerifyOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db )

if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")

if not user.otp or user.otp != userdata.otp:
raise _fastapi.HTTPException(status_code=400, detail="Invalid OTP")

# Update user's is_verified field
user.is_verified = True
user.otp = None # Clear the OTP
db.add(user)
db.commit()

return "Email verified successfully"

if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)

auth/ 目錄下創(chuàng)建一個(gè) database.py 文件,代碼如下:

此代碼設(shè)置用于連接到 PostgreSQL 數(shù)據(jù)庫的 SQLAlchemy 引擎和會(huì)話創(chuàng)建器。它使用 dotenv 從環(huán)境變量加載數(shù)據(jù)庫連接詳細(xì)信息DATABASE_URL是使用檢索到的環(huán)境變量(包括主機(jī)、數(shù)據(jù)庫名稱、用戶名和密碼)構(gòu)建的。該引擎是使用 create_engine DATABASE_URL 創(chuàng)建的并且 SessionLocal 被定義為綁定到此引擎的會(huì)話創(chuàng)建者。Base 變量被初始化為用于定義 ORM 模型的聲明性基。

import sqlalchemy as _sql
import sqlalchemy.ext.declarative as _declarative
import sqlalchemy.orm as _orm
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Retrieve environment variables
postgres_host = os.environ.get("POSTGRES_HOST")
postgres_db = os.environ.get("POSTGRES_DB")
postgres_user = os.environ.get("POSTGRES_USER")
postgres_password = os.environ.get("POSTGRES_PASSWORD")

# Assuming your PostgreSQL server is running locally with a database named 'mydatabase'
DATABASE_URL = f"postgresql://{postgres_user}:{postgres_password}@{postgres_host}/{postgres_db}"

engine = _sql.create_engine(DATABASE_URL)
SessionLocal = _orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = _declarative.declarative_base()

auth/ 目錄下創(chuàng)建一個(gè) models.py 文件,代碼如下:

此代碼為 UserAddress定義 SQLAlchemy 模型 ,存儲(chǔ)用戶信息和地址以及它們之間的關(guān)系。它還使用提供的引擎在數(shù)據(jù)庫中創(chuàng)建表。

import datetime as _dt
import sqlalchemy as _sql
import sqlalchemy.orm as _orm
import passlib.hash as _hash
from database import Base , engine
import database as _database

Base.metadata.create_all(engine)

class User(_database.Base):
__tablename__ = "users"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
name = _sql.Column(_sql.String)
email = _sql.Column(_sql.String, unique=True, index=True)
is_verified = _sql.Column(_sql.Boolean , default=False)
otp = _sql.Column(_sql.Integer)
hashed_password = _sql.Column(_sql.String)
addresses = _orm.relationship("Address", back_populates="user")
date_created = _sql.Column(_sql.DateTime, default=_dt.datetime.utcnow)

def verify_password(self, password: str):
return _hash.bcrypt.verify(password, self.hashed_password)

class Address(_database.Base):
__tablename__ = "addresses"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
street = _sql.Column(_sql.String)
landmark = _sql.Column(_sql.String)
city = _sql.Column(_sql.String)
country = _sql.Column(_sql.String)
pincode = _sql.Column(_sql.String)
user_id = _sql.Column(_sql.Integer, _sql.ForeignKey("users.id"))
user = _orm.relationship("User", back_populates="addresses")
latitude = _sql.Column(_sql.Float)
longitude = _sql.Column(_sql.Float)

auth/ 目錄下創(chuàng)建一個(gè) schemas.py 文件,代碼如下:

此代碼為 用戶相關(guān)的數(shù)據(jù)結(jié)構(gòu)定義 Pydantic 模型,包括用戶創(chuàng)建、身份驗(yàn)證和 OTP 驗(yàn)證。它還包括用于位置信息的地址模型。這些模型配置為從字典屬性自動(dòng)創(chuàng)建實(shí)例。

import datetime
import pydantic

class UserBase(pydantic.BaseModel):
name: str
email: str
class Config:
from_attributes=True

class UserCreate(UserBase):
password: str
class Config:
from_attributes=True

class User(UserBase):
id: int
date_created: datetime.datetime
class Config:
from_attributes=True

class AddressBase(pydantic.BaseModel):
street: str
landmark: str
city: str
country: str
pincode: str
latitude: float
longitude: float
class Config:
from_attributes=True

class GenerateUserToken(pydantic.BaseModel):
username: str
password: str
class Config:
from_attributes=True

class GenerateOtp(pydantic.BaseModel):
email: str

class VerifyOtp(pydantic.BaseModel):
email: str
otp: int

auth/ 目錄下創(chuàng)建一個(gè) service.py 文件,代碼如下:

此代碼定義了用于用戶身份驗(yàn)證和 OTP(一次性密碼)生成和驗(yàn)證的各種函數(shù)和依賴項(xiàng)。它使用 FastAPI 來處理 HTTP 請(qǐng)求,使用 SQLAlchemy 進(jìn)行數(shù)據(jù)庫作,使用 Pydantic 進(jìn)行數(shù)據(jù)驗(yàn)證和序列化,使用 JWT 進(jìn)行身份驗(yàn)證,使用 RabbitMQ 發(fā)送電子郵件通知。這些功能包括創(chuàng)建數(shù)據(jù)庫、獲取數(shù)據(jù)庫會(huì)話、創(chuàng)建新用戶、對(duì)用戶進(jìn)行身份驗(yàn)證、創(chuàng)建 JWT 令牌、從 JWT 令牌獲取當(dāng)前用戶、生成隨機(jī) OTP、連接到 RabbitMQ 以及發(fā)送 OTP 電子郵件通知。

import jwt
import sqlalchemy.orm as _orm
import passlib.hash as _hash
import email_validator as _email_check
import fastapi as _fastapi
import fastapi.security as _security
from passlib.hash import bcrypt
import database as _database
import schemas as _schemas
import models as _models
import random
import json
import pika
import time
import os

# Load environment variables
JWT_SECRET = os.getenv("JWT_SECRET")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
oauth2schema = _security.OAuth2PasswordBearer("/api/token")

def create_database():
# Create database tables
return _database.Base.metadata.create_all(bind=_database.engine)

def get_db():
# Dependency to get a database session
db = _database.SessionLocal()
try:
yield db
finally:
db.close()

async def get_user_by_email(email: str, db: _orm.Session):
# Retrieve a user by email from the database
return db.query(_models.User).filter(_models.User.email == email and _models.User.is_verified==True).first()

async def create_user(user: _schemas.UserCreate, db: _orm.Session):
# Create a new user in the database
try:
valid = _email_check.validate_email(user.email)
name = user.name
email = valid.email
except _email_check.EmailNotValidError:
raise _fastapi.HTTPException(status_code=404, detail="Please enter a valid email")

user_obj = _models.User(email=email, name=name, hashed_password=_hash.bcrypt.hash(user.password))
db.add(user_obj)
db.commit()
db.refresh(user_obj)
return user_obj

async def authenticate_user(email: str, password: str, db: _orm.Session):
# Authenticate a user
user = await get_user_by_email(email=email, db=db)

if not user:
return False

if not user.is_verified:
return 'is_verified_false'

if not user.verify_password(password):
return False

return user

async def create_token(user: _models.User):
# Create a JWT token for authentication
user_obj = _schemas.User.from_orm(user)
user_dict = user_obj.model_dump()
del user_dict["date_created"]
token = jwt.encode(user_dict, JWT_SECRET, algorithm="HS256")
return dict(access_token=token, token_type="bearer")

async def get_current_user(db: _orm.Session = _fastapi.Depends(get_db), token: str = _fastapi.Depends(oauth2schema)):
# Get the current authenticated user from the JWT token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user = db.query(_models.User).get(payload["id"])
except:
raise _fastapi.HTTPException(status_code=401, detail="Invalid Email or Password")
return _schemas.User.from_orm(user)

def generate_otp():
# Generate a random OTP
return str(random.randint(100000, 999999))

def connect_to_rabbitmq():
# Connect to RabbitMQ
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL))
return connection
except pika.exceptions.AMQPConnectionError:
print("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
time.sleep(5)

def send_otp(email, otp, channel):
# Send an OTP email notification using RabbitMQ
connection = connect_to_rabbitmq()
channel = connection.channel()
message = {'email': email,
'subject': 'Account Verification OTP Notification',
'other': 'null',
'body': f'Your OTP for account verification is: {otp} \n Please enter this OTP on the verification page to complete your account setup. \n If you did not request this OTP, please ignore this message.\n Thank you '
}

try:
queue_declare_ok = channel.queue_declare(queue='email_notification', passive=True)
current_durable = queue_declare_ok.method.queue

if current_durable:
if queue_declare_ok.method.queue != current_durable:
channel.queue_delete(queue='email_notification')
channel.queue_declare(queue='email_notification', durable=True)
else:
channel.queue_declare(queue='email_notification', durable=True)

channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OTP email notification")
except Exception as err:
print(f"Failed to publish message: {err}")
finally:
channel.close()
connection.close()

D. 實(shí)施機(jī)器學(xué)習(xí)微服務(wù)

在本節(jié)中,我們將重點(diǎn)介紹如何實(shí)現(xiàn)機(jī)器學(xué)習(xí)服務(wù),該服務(wù)作為微服務(wù)架構(gòu)中所有 OCR(光學(xué)字符識(shí)別)相關(guān)請(qǐng)求的入口點(diǎn)。機(jī)器學(xué)習(xí)服務(wù)負(fù)責(zé)處理 OCR 請(qǐng)求、從圖像中提取文本以及在完成時(shí)發(fā)送通知。

現(xiàn)在,讓我們實(shí)現(xiàn)機(jī)器學(xué)習(xí)服務(wù)。

ml_services/ 目錄下創(chuàng)建一個(gè) main.py 文件,代碼如下:

此 Python 腳本連接到 RabbitMQ 服務(wù)器,并使用來自名為“ocr_service”的隊(duì)列中的消息。收到消息后,它使用 OCRService 對(duì)象對(duì)其進(jìn)行處理,使用 send_email_notification 函數(shù)發(fā)送電子郵件通知,然后將響應(yīng)發(fā)布到回復(fù)隊(duì)列。在處理每條消息后,它確認(rèn)向 RabbitMQ 傳送消息。該腳本使用預(yù)取計(jì)數(shù) 1 來限制 RabbitMQ 將傳送的未確認(rèn)消息的數(shù)量。

import pika
import json
from utils import OCRService
from utils import send_email_notification

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='ocr_service')

# Callback function to process OCR requests
def on_request(ch, method, props, body):
# Initialize OCR service
ocr_service = OCRService()
# Process OCR request
response = ocr_service.process_request(body)

# Send email notification
send_email_notification(response['user_email'], response['ocr_text'], channel)

# Publish response to the reply queue
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=json.dumps(response))
# Acknowledge the message delivery
ch.basic_ack(delivery_tag=method.delivery_tag)
# Set prefetch count to 1
channel.basic_qos(prefetch_count=1)
# Consume messages from the 'ocr_service' queue
channel.basic_consume(queue='ocr_service', on_message_callback=on_request)
# Start consuming messages
print(" [x] Awaiting RPC requests")
channel.start_consuming()

ml_services/ 目錄下創(chuàng)建一個(gè) utils.py 文件,代碼如下:

import json
import base64
import pandas as pd
#keras ocr pipeline and imports
import keras_ocr
import pika

class OCRService:

def __init__(self):
self.keras_pipeline = keras_ocr.pipeline.Pipeline()

def keras_ocr(self, image_path):
results = self.keras_pipeline.recognize([image_path])
df = pd.DataFrame(results[0], columns=['text', 'bbox'])
words = df['text'].tolist()
sentence = ' '.join(words)
return sentence

def process_request(self, message):
message_body = json.loads(message)
user_name = message_body['user_name']
user_email = message_body['user_email']
user_id = message_body['user_id']
file_base64 = message_body['file']
print(f" [x]user_id: {user_id} request recieved from gateway..")
print(f" [x]processing request for {user_name}")

# Assuming file_base64 contains the base64-encoded string
file_data = base64.b64decode(file_base64.encode())
# Write the decoded file data to a new file
with open('artifacts/decoded_file.png', 'wb') as f:
f.write(file_data)

image_path = "artifacts/decoded_file.png"
ocr_text = self.keras_ocr(image_path)
print(" [^]OCR processing done !!!")

response = {
"user_id": user_id,
"user_name": user_name,
"user_email": user_email,
"ocr_text": ocr_text
}

return response

def send_email_notification(email, ocr_text, channel):
# Send an email notification using RabbitMQ
message = {
'email': email,
'subject':'OCR Process Completed !!',
'body':f'We are pleased to inform you that the OCR (Optical Character Recognition) process for your image has been successfully completed.\n The extracted text has been processed and is now ready for use.\n\n OCR text : {ocr_text}',
'other': 'null',
}

try:
channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OCR Process Completed email notification")
except Exception as err:
print(f"Failed to publish message: {err}")

D. 實(shí)施通知微服務(wù)

在本節(jié)中,我們將重點(diǎn)介紹如何實(shí)現(xiàn)通知服務(wù),該服務(wù)向用戶發(fā)送電子郵件通知。

notification_service/ 目錄下創(chuàng)建一個(gè) main.py 文件,代碼如下:

此腳本設(shè)置一個(gè) RabbitMQ 使用者,用于偵聽“email_notification”隊(duì)列中的消息。收到消息時(shí),它會(huì) 從 email_service 模塊調(diào)用通知函數(shù)來處理通知過程。如果成功,它將確認(rèn)消息;否則,它將拒絕該消息并打印錯(cuò)誤消息。

import pika
import sys
import os
import time
import email_service
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

def main():
# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_URL))
channel = connection.channel()

def callback(ch, method, properties, body):
try:
err = email_service.notification(body)
if err:
ch.basic_nack(delivery_tag=method.delivery_tag)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag)

channel.basic_consume(
queue="email_notification", on_message_callback=callback
)

print("Waiting for messages. To exit press CTRL+C")

channel.start_consuming()

if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)

notification_service/ 目錄下創(chuàng)建一個(gè) email_service.py 文件,代碼如下:

import smtplib, os, json
from email.message import EmailMessage
from dotenv import load_dotenv
from email.mime.text import MIMEText

load_dotenv()

def notification(message):
try:
message = json.loads(message)
receiver_address = message["email"]
subject = message["subject"]
body = message["body"]
other = message["other"]

sender_address = os.environ.get("GMAIL_ADDRESS")
sender_password = os.environ.get("GMAIL_PASSWORD")

# Gmail SMTP server settings
smtp_server = 'smtp.gmail.com'
smtp_port = 587

server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_address, sender_password)

# Compose the email message
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = sender_address
msg['To'] = receiver_address

server.sendmail(sender_address, receiver_address, msg.as_string())
server.quit()

print("Mail Sent")
except Exception as e:
print(f"Failed to send email: {e}")

結(jié)論

綜上所述,我們成功地使用 FastAPI 和 RabbitMQ 實(shí)現(xiàn)了端到端的微服務(wù)架構(gòu)。我們演示了如何創(chuàng)建用戶身份驗(yàn)證服務(wù)、用于 OCR 處理的機(jī)器學(xué)習(xí)服務(wù)以及用于電子郵件通知的通知服務(wù)。

通過本博客,我們了解了微服務(wù)的關(guān)鍵概念,例如服務(wù)隔離、通過消息隊(duì)列進(jìn)行通信以及使用異步處理實(shí)現(xiàn)可擴(kuò)展性和性能的好處。

上一篇:

法律實(shí)踐管理開放API的重要性

下一篇:

9個(gè)最佳Text2Sql開源項(xiàng)目:自然語言到SQL的高效轉(zhuǎn)換工具
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊(cè)

多API并行試用

數(shù)據(jù)驅(qū)動(dòng)選型,提升決策效率

查看全部API→
??

熱門場(chǎng)景實(shí)測(cè),選對(duì)API

#AI文本生成大模型API

對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)

#AI深度推理大模型API

對(duì)比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)