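Microservice architecture, on the other hand, is an approach that splits an application into smaller, independently deployable services. Each service has its own business logic and database and communicates with the other services over lightweight protocols. This approach can shorten development cycles, simplify maintenance, and improve scalability.
RabbitMQ is a message broker that implements the Advanced Message Queuing Protocol (AMQP). It acts as an intermediary between the components of a distributed system, letting them communicate and coordinate tasks efficiently, which is why it is widely used in microservice architectures.
Now that we have covered the basics of microservices, let's take a closer look at writing our first one. We will explore how to design and implement a microservice architecture using Python, FastAPI, RabbitMQ, and PostgreSQL. This hands-on guide walks you through setting up each component, designing the interactions between the services, and deploying them to create a fully functional application. Let's get started!
Our application consists of four main services: the gateway service, the authentication service, the machine learning (OCR) service, and the notification service.
Before you begin, make sure the following prerequisites are met: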
To install PostgreSQL using Docker, run the following command:
docker run --name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres
To install RabbitMQ using Docker, run the following command:
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
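Before moving on, it can help to confirm that the RabbitMQ container is accepting connections on the two ports published above. The following is a minimal sketch using only the standard library; the helper name port_open is hypothetical and not part of the project.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolved hosts
        return False


if __name__ == "__main__":
    # 5672 is the AMQP port and 15672 the management UI, as published by the command above
    for name, port in [("RabbitMQ AMQP", 5672), ("RabbitMQ management UI", 15672)]:
        state = "reachable" if port_open("localhost", port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```

The PostgreSQL command above does not publish a port, so this check only applies to it if you add a port mapping yourself.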
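The demo project contains one directory per service (gateway/, auth/, ml_services/, notification_service/), each holding that service's source files and, where used, a .env file.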
In this section we focus on implementing the gateway service, which acts as the entry point for all incoming requests in the microservice architecture. The gateway service routes requests to the appropriate microservice and handles the overall orchestration of the application.
Now let's implement the gateway service. Create a main.py file in the gateway/ directory with the following code:
from fastapi import FastAPI, HTTPException, File, UploadFile
import fastapi as _fastapi
from fastapi.security import OAuth2PasswordBearer
from dotenv import load_dotenv
from jwt.exceptions import DecodeError
from pydantic import BaseModel
import requests
import base64
import pika
import logging
import os
import jwt
import rpc_client

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Load environment variables
load_dotenv()
logging.basicConfig(level=logging.INFO)

# Retrieve environment variables
JWT_SECRET = os.environ.get("JWT_SECRET")
AUTH_BASE_URL = os.environ.get("AUTH_BASE_URL")
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

# Connect to RabbitMQ (use the container name as the host when running in Docker)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue='gatewayservice')
channel.queue_declare(queue='ocr_service')


# JWT token validation
async def jwt_validation(token: str = _fastapi.Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        return payload
    except DecodeError:
        raise HTTPException(status_code=401, detail="Invalid JWT token")


# Pydantic models for request bodies
class GenerateUserToken(BaseModel):
    username: str
    password: str


class UserCredentials(BaseModel):
    username: str
    password: str


class UserRegisteration(BaseModel):
    name: str
    email: str
    password: str


class GenerateOtp(BaseModel):
    email: str


class VerifyOtp(BaseModel):
    email: str
    otp: int


# Authentication routes: each one forwards the request to the auth service
@app.post("/auth/login", tags=['Authentication Service'])
async def login(user_data: UserCredentials):
    try:
        response = requests.post(f"{AUTH_BASE_URL}/api/token", json={"username": user_data.username, "password": user_data.password})
        if response.status_code == 200:
            return response.json()
        else:
            raise HTTPException(status_code=response.status_code, detail=response.json())
    except requests.exceptions.ConnectionError:
        raise HTTPException(status_code=503, detail="Authentication service is unavailable")


@app.post("/auth/register", tags=['Authentication Service'])
async def registeration(user_data: UserRegisteration):
    try:
        response = requests.post(f"{AUTH_BASE_URL}/api/users", json={"name": user_data.name, "email": user_data.email, "password": user_data.password})
        if response.status_code == 200:
            return response.json()
        else:
            raise HTTPException(status_code=response.status_code, detail=response.json())
    except requests.exceptions.ConnectionError:
        raise HTTPException(status_code=503, detail="Authentication service is unavailable")


@app.post("/auth/generate_otp", tags=['Authentication Service'])
async def generate_otp(user_data: GenerateOtp):
    try:
        response = requests.post(f"{AUTH_BASE_URL}/api/users/generate_otp", json={"email": user_data.email})
        if response.status_code == 200:
            return response.json()
        else:
            raise HTTPException(status_code=response.status_code, detail=response.json())
    except requests.exceptions.ConnectionError:
        raise HTTPException(status_code=503, detail="Authentication service is unavailable")


@app.post("/auth/verify_otp", tags=['Authentication Service'])
async def verify_otp(user_data: VerifyOtp):
    try:
        response = requests.post(f"{AUTH_BASE_URL}/api/users/verify_otp", json={"email": user_data.email, "otp": user_data.otp})
        if response.status_code == 200:
            return response.json()
        else:
            raise HTTPException(status_code=response.status_code, detail=response.json())
    except requests.exceptions.ConnectionError:
        raise HTTPException(status_code=503, detail="Authentication service is unavailable")


# ML microservice route: OCR (requires a valid JWT)
@app.post('/ocr', tags=['Machine learning Service'])
def ocr(file: UploadFile = File(...),
        payload: dict = _fastapi.Depends(jwt_validation)):
    # Save the uploaded file to a temporary location
    with open(file.filename, "wb") as buffer:
        buffer.write(file.file.read())

    ocr_rpc = rpc_client.OcrRpcClient()

    # Read the file back and base64-encode it so it fits in a JSON message
    with open(file.filename, "rb") as buffer:
        file_data = buffer.read()
        file_base64 = base64.b64encode(file_data).decode()

    request_json = {
        'user_name': payload['name'],
        'user_email': payload['email'],
        'user_id': payload['id'],
        'file': file_base64
    }

    # Call the OCR microservice with the request JSON
    response = ocr_rpc.call(request_json)

    # Delete the temporary image file
    os.remove(file.filename)
    return response


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=5001, reload=True)
To configure the gateway's environment, create a .env file in the gateway folder:
AUTH_BASE_URL=http://localhost:5000
JWT_SECRET=e56623570e0a0152989fd38e13da9cd6eb7031e4e039e939ba845167ee59b496
RABBITMQ_URL=localhost
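Because os.environ.get returns None for a variable that is not set, a typo in the .env file tends to surface later as a confusing connection or token error. One way to fail early is to check the three variables at startup. This is a minimal sketch; the names REQUIRED_VARS and load_settings are hypothetical and do not appear in the project code.

```python
import os

# The three variables the gateway reads from its .env file
REQUIRED_VARS = ("AUTH_BASE_URL", "JWT_SECRET", "RABBITMQ_URL")


def load_settings(environ=os.environ) -> dict:
    """Return the required settings, raising if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}
```

In the gateway this would be called once, after load_dotenv(), so that a missing value stops the service before it tries to connect to RabbitMQ.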
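To communicate with the other microservices we will use RabbitMQ, a message broker that supports asynchronous messaging between services. We will create an rpc_client.py file in the gateway/ directory to handle communication with the RabbitMQ server.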
import pika
import uuid
import json
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")


class OcrRpcClient(object):
    def __init__(self):
        self.response = None
        self.corr_id = None
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=RABBITMQ_URL))
        self.channel = self.connection.channel()
        # An empty queue name asks the broker for a unique, exclusive reply queue
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        # Accept only the reply whose correlation id matches the pending request
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, message):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='ocr_service',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=json.dumps(message))
        # Block until on_response has stored the matching reply
        while self.response is None:
            self.connection.process_data_events()
        response_json = json.loads(self.response)
        return response_json
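This code defines a client class, OcrRpcClient, that sends messages to the OCR microservice (the ML microservice) over RabbitMQ. It opens the connection, sets up a callback queue for responses, and provides a method that publishes a request and waits for the matching response.
1. Initialization (__init__): establishes the connection to RabbitMQ, creates a channel, declares a unique callback queue, and sets up a consumer that listens for responses on that queue.
2. Sending a request (call): publishes a message to the OCR microservice (the ML microservice), waits for the response on the callback queue, and returns it.
This class lets the gateway service communicate with the OCR microservice through RabbitMQ.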
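The request/reply pattern behind OcrRpcClient can be tried without a broker. The sketch below replaces RabbitMQ with in-memory queues from the standard library, purely to illustrate how reply_to and correlation_id tie a response back to its request. The names request_queue, worker, and InMemoryRpcClient are hypothetical, and the worker simply upper-cases its input instead of running OCR.

```python
import json
import queue
import threading
import uuid

request_queue = queue.Queue()  # stands in for the 'ocr_service' queue


def worker():
    """Stand-in for the OCR service: answer each request on its reply queue."""
    while True:
        item = request_queue.get()
        if item is None:  # sentinel: stop the worker
            break
        props, body = item
        payload = json.loads(body)
        reply = json.dumps({"ocr_text": payload["file"].upper()})
        # Copy the correlation id so the caller can match the reply to its request
        props["reply_to"].put(({"correlation_id": props["correlation_id"]}, reply))


class InMemoryRpcClient:
    def __init__(self):
        self.callback_queue = queue.Queue()  # private reply queue for this client

    def call(self, message: dict, timeout: float = 5.0) -> dict:
        corr_id = str(uuid.uuid4())
        props = {"reply_to": self.callback_queue, "correlation_id": corr_id}
        request_queue.put((props, json.dumps(message)))
        while True:
            # Raises queue.Empty if no reply arrives within the timeout
            reply_props, body = self.callback_queue.get(timeout=timeout)
            if reply_props["correlation_id"] == corr_id:  # skip unrelated replies
                return json.loads(body)


thread = threading.Thread(target=worker, daemon=True)
thread.start()
result = InMemoryRpcClient().call({"file": "abc"})
request_queue.put(None)
thread.join()
print(result)
```

Unlike the loop in OcrRpcClient.call, this sketch gives up after a timeout, which is one way to keep a gateway request from hanging if the worker never answers.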
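In this section we focus on implementing the authentication service, which acts as the entry point for all authentication-related requests in the microservice architecture. The authentication service is responsible for user authentication, registration, and OTP generation. Now let's implement it.
The main.py file in the auth/ directory uses FastAPI to implement the authentication service: user registration, login, JWT token generation, email verification with an OTP, and user profile retrieval. It uses SQLAlchemy for database operations and RabbitMQ to send OTP emails. The service exposes endpoints for creating users, generating JWT tokens, retrieving user profiles, and verifying OTPs for email verification. Create the file with the following code: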
from typing import List
from fastapi import HTTPException
import fastapi as _fastapi
import schemas as _schemas
import sqlalchemy.orm as _orm
import models as _models
import service as _services
import logging
import database as _database
import pika

# RabbitMQ connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue='email_notification')


def get_db():
    db = _database.SessionLocal()
    try:
        yield db
    finally:
        db.close()


app = _fastapi.FastAPI()
logging.basicConfig(level=logging.INFO)
_models.Base.metadata.create_all(_models.engine)


@app.post("/api/users", tags=['User Auth'])
async def create_user(
        user: _schemas.UserCreate,
        db: _orm.Session = _fastapi.Depends(_services.get_db)):
    db_user = await _services.get_user_by_email(email=user.email, db=db)
    if db_user:
        logging.info('User with that email already exists')
        # 409 Conflict, so callers do not mistake a duplicate for a success
        raise _fastapi.HTTPException(
            status_code=409,
            detail="User with that email already exists")
    user = await _services.create_user(user=user, db=db)
    # HTTPException is meant to be raised, not returned; return a plain body instead
    return {"detail": "User Registered, Please verify email to activate account !"}


# Endpoint to check if the API is live
@app.get("/check_api")
async def check_api():
    return {"status": "Connected to API Successfully"}


@app.post("/api/token", tags=['User Auth'])
async def generate_token(
        user_data: _schemas.GenerateUserToken,
        db: _orm.Session = _fastapi.Depends(_services.get_db)):
    user = await _services.authenticate_user(email=user_data.username, password=user_data.password, db=db)
    if user == "is_verified_false":
        logging.info('Email verification is pending. Please verify your email to proceed. ')
        raise _fastapi.HTTPException(
            status_code=403, detail="Email verification is pending. Please verify your email to proceed.")
    if not user:
        logging.info('Invalid Credentials')
        raise _fastapi.HTTPException(
            status_code=401, detail="Invalid Credentials")
    logging.info('JWT Token Generated')
    return await _services.create_token(user=user)


@app.get("/api/users/me", response_model=_schemas.User, tags=['User Auth'])
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
    return user


@app.get("/api/users/profile", tags=['User Auth'])
async def get_user_profile(email: str, db: _orm.Session = _fastapi.Depends(_services.get_db)):
    # Look the user up by the email query parameter; return non-sensitive fields only
    user = await _services.get_user_by_email(email=email, db=db)
    if not user:
        raise _fastapi.HTTPException(status_code=404, detail="User not found")
    return {"id": user.id, "name": user.name, "email": user.email, "is_verified": user.is_verified}


@app.post("/api/users/generate_otp", response_model=str, tags=["User Auth"])
async def send_otp_mail(userdata: _schemas.GenerateOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
    user = await _services.get_user_by_email(email=userdata.email, db=db)
    if not user:
        raise _fastapi.HTTPException(status_code=404, detail="User not found")
    if user.is_verified:
        raise _fastapi.HTTPException(status_code=400, detail="User is already verified")

    # Generate and send OTP (the OTP itself is not logged)
    otp = _services.generate_otp()
    _services.send_otp(userdata.email, otp, channel)

    # Store the OTP in the database; the otp column is an Integer
    user.otp = int(otp)
    db.add(user)
    db.commit()
    return "OTP sent to your email"


@app.post("/api/users/verify_otp", tags=["User Auth"])
async def verify_otp(userdata: _schemas.VerifyOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
    user = await _services.get_user_by_email(email=userdata.email, db=db)
    if not user:
        raise _fastapi.HTTPException(status_code=404, detail="User not found")
    if not user.otp or user.otp != userdata.otp:
        raise _fastapi.HTTPException(status_code=400, detail="Invalid OTP")

    # Update user's is_verified field
    user.is_verified = True
    user.otp = None  # Clear the OTP
    db.add(user)
    db.commit()
    return "Email verified successfully"


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)
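The token issued by /api/token and later checked by the gateway is an HS256-signed JWT. To make clear what the shared JWT_SECRET is used for, here is a standard-library sketch of HS256 signing and verification. It is an illustration only, not a replacement for the jwt library the services use, which performs additional checks; the function names sign_hs256 and verify_hs256 and the sample payload are hypothetical.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped when encoding
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_hs256(token: str, secret: str) -> dict:
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    # Constant-time comparison of the recomputed and supplied signatures
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("signature mismatch")
    return json.loads(_b64url_decode(payload_b64))


token = sign_hs256({"id": 1, "name": "Ada", "email": "ada@example.com"}, "demo-secret")
print(verify_hs256(token, "demo-secret"))
```

Note that the payload is only base64-encoded, not encrypted: anyone holding the token can read the id, name, and email, while only a holder of the secret can produce a valid signature.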
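The database.py file sets up the SQLAlchemy engine and session factory used to connect to the PostgreSQL database. It uses dotenv to load the connection details from environment variables. DATABASE_URL is built from the retrieved variables (host, database name, username, and password). The engine is created with create_engine and DATABASE_URL, and SessionLocal is defined as a session factory bound to that engine. Base is initialized as the declarative base used to define the ORM models. Create the file in the auth/ directory with the following code: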
import sqlalchemy as _sql
import sqlalchemy.ext.declarative as _declarative
import sqlalchemy.orm as _orm
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Retrieve environment variables
postgres_host = os.environ.get("POSTGRES_HOST")
postgres_db = os.environ.get("POSTGRES_DB")
postgres_user = os.environ.get("POSTGRES_USER")
postgres_password = os.environ.get("POSTGRES_PASSWORD")
# Assuming your PostgreSQL server is running locally with a database named 'mydatabase'
DATABASE_URL = f"postgresql://{postgres_user}:{postgres_password}@{postgres_host}/{postgres_db}"
engine = _sql.create_engine(DATABASE_URL)
SessionLocal = _orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = _declarative.declarative_base()
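One caveat with building DATABASE_URL through plain string interpolation: a password containing characters such as @ or / would change how the URL is parsed. A hedged sketch of one way around this, escaping the credentials with the standard library, is shown below; the helper name build_database_url and the sample values are hypothetical.

```python
from urllib.parse import quote_plus


def build_database_url(user: str, password: str, host: str, db: str) -> str:
    """Build a PostgreSQL URL, escaping characters that are special in URLs."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}/{db}"


# '@' becomes %40 and '/' becomes %2F, so the host part stays unambiguous
print(build_database_url("postgres", "p@ss/word", "localhost", "postgres"))
```

For credentials made only of letters and digits, the result is the same as the f-string used in database.py.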
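The models.py file defines the SQLAlchemy models for the User and Address tables, which store user information and addresses along with the relationship between them. It also uses the provided engine to create the tables in the database. Create the file in the auth/ directory with the following code: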
import datetime as _dt
import sqlalchemy as _sql
import sqlalchemy.orm as _orm
import passlib.hash as _hash
from database import Base, engine
import database as _database


class User(_database.Base):
    __tablename__ = "users"
    id = _sql.Column(_sql.Integer, primary_key=True, index=True)
    name = _sql.Column(_sql.String)
    email = _sql.Column(_sql.String, unique=True, index=True)
    is_verified = _sql.Column(_sql.Boolean, default=False)
    otp = _sql.Column(_sql.Integer)
    hashed_password = _sql.Column(_sql.String)
    addresses = _orm.relationship("Address", back_populates="user")
    date_created = _sql.Column(_sql.DateTime, default=_dt.datetime.utcnow)

    def verify_password(self, password: str):
        # Compare a plaintext password against the stored bcrypt hash
        return _hash.bcrypt.verify(password, self.hashed_password)


class Address(_database.Base):
    __tablename__ = "addresses"
    id = _sql.Column(_sql.Integer, primary_key=True, index=True)
    street = _sql.Column(_sql.String)
    landmark = _sql.Column(_sql.String)
    city = _sql.Column(_sql.String)
    country = _sql.Column(_sql.String)
    pincode = _sql.Column(_sql.String)
    user_id = _sql.Column(_sql.Integer, _sql.ForeignKey("users.id"))
    user = _orm.relationship("User", back_populates="addresses")
    latitude = _sql.Column(_sql.Float)
    longitude = _sql.Column(_sql.Float)


# Create the tables only after both models are registered on Base
Base.metadata.create_all(engine)
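The schemas.py file defines the Pydantic models for the user-related data structures, covering user creation, authentication, and OTP verification. It also includes an address model for location information. The models are configured with from_attributes=True so that instances can be created from object attributes, such as those of ORM rows. Create the file in the auth/ directory with the following code: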
import datetime
import pydantic


class UserBase(pydantic.BaseModel):
    name: str
    email: str

    class Config:
        from_attributes = True


class UserCreate(UserBase):
    password: str

    class Config:
        from_attributes = True


class User(UserBase):
    id: int
    date_created: datetime.datetime

    class Config:
        from_attributes = True


class AddressBase(pydantic.BaseModel):
    street: str
    landmark: str
    city: str
    country: str
    pincode: str
    latitude: float
    longitude: float

    class Config:
        from_attributes = True


class GenerateUserToken(pydantic.BaseModel):
    username: str
    password: str

    class Config:
        from_attributes = True


class GenerateOtp(pydantic.BaseModel):
    email: str


class VerifyOtp(pydantic.BaseModel):
    email: str
    otp: int
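The service.py file defines the functions and dependencies used for user authentication and for generating and verifying OTPs (one-time passwords). It uses FastAPI to handle HTTP requests, SQLAlchemy for database operations, Pydantic for data validation and serialization, JWT for authentication, and RabbitMQ to send email notifications. The functions cover creating the database tables, getting a database session, creating a new user, authenticating a user, creating a JWT token, getting the current user from a JWT token, generating a random OTP, connecting to RabbitMQ, and sending the OTP email notification. Create the file in the auth/ directory with the following code: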
import jwt
import sqlalchemy.orm as _orm
import passlib.hash as _hash
import email_validator as _email_check
import fastapi as _fastapi
import fastapi.security as _security
from passlib.hash import bcrypt
import database as _database
import schemas as _schemas
import models as _models
import random
import json
import pika
import time
import os

# Load environment variables (database.py has already called load_dotenv())
JWT_SECRET = os.getenv("JWT_SECRET")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
oauth2schema = _security.OAuth2PasswordBearer("/api/token")


def create_database():
    # Create database tables
    return _database.Base.metadata.create_all(bind=_database.engine)


def get_db():
    # Dependency to get a database session
    db = _database.SessionLocal()
    try:
        yield db
    finally:
        db.close()


async def get_user_by_email(email: str, db: _orm.Session):
    # Retrieve a user by email, verified or not; callers check is_verified themselves.
    # Python's `and` does not combine SQLAlchemy conditions, so filter on the email alone.
    return db.query(_models.User).filter(_models.User.email == email).first()


async def create_user(user: _schemas.UserCreate, db: _orm.Session):
    # Create a new user in the database
    try:
        valid = _email_check.validate_email(user.email)
        name = user.name
        email = valid.email
    except _email_check.EmailNotValidError:
        # An invalid email is a client error, not a missing resource
        raise _fastapi.HTTPException(status_code=400, detail="Please enter a valid email")

    user_obj = _models.User(email=email, name=name, hashed_password=_hash.bcrypt.hash(user.password))
    db.add(user_obj)
    db.commit()
    db.refresh(user_obj)
    return user_obj


async def authenticate_user(email: str, password: str, db: _orm.Session):
    # Authenticate a user
    user = await get_user_by_email(email=email, db=db)
    if not user:
        return False
    if not user.is_verified:
        return 'is_verified_false'
    if not user.verify_password(password):
        return False
    return user


async def create_token(user: _models.User):
    # Create a JWT token for authentication
    user_obj = _schemas.User.model_validate(user)
    user_dict = user_obj.model_dump()
    del user_dict["date_created"]  # datetime is not JSON serializable
    token = jwt.encode(user_dict, JWT_SECRET, algorithm="HS256")
    return dict(access_token=token, token_type="bearer")


async def get_current_user(db: _orm.Session = _fastapi.Depends(get_db), token: str = _fastapi.Depends(oauth2schema)):
    # Get the current authenticated user from the JWT token
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        user = db.query(_models.User).get(payload["id"])
    except Exception:
        raise _fastapi.HTTPException(status_code=401, detail="Invalid Email or Password")
    if user is None:
        raise _fastapi.HTTPException(status_code=401, detail="Invalid Email or Password")
    return _schemas.User.model_validate(user)


def generate_otp():
    # Generate a random six-digit OTP
    return str(random.randint(100000, 999999))


def connect_to_rabbitmq():
    # Connect to RabbitMQ, retrying every 5 seconds until the broker is reachable
    while True:
        try:
            connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL))
            return connection
        except pika.exceptions.AMQPConnectionError:
            print("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
            time.sleep(5)


def send_otp(email, otp, channel):
    # Send an OTP email notification using RabbitMQ.
    # A fresh connection is opened here, so the `channel` argument is not used.
    connection = connect_to_rabbitmq()
    channel = connection.channel()
    message = {'email': email,
               'subject': 'Account Verification OTP Notification',
               'other': 'null',
               'body': f'Your OTP for account verification is: {otp} \n Please enter this OTP on the verification page to complete your account setup. \n If you did not request this OTP, please ignore this message.\n Thank you '
               }
    try:
        # Passive declare only checks that the queue already exists
        channel.queue_declare(queue='email_notification', passive=True)
        channel.basic_publish(
            exchange="",
            routing_key='email_notification',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
            ),
        )
        print("Sent OTP email notification")
    except Exception as err:
        print(f"Failed to publish message: {err}")
    finally:
        # The broker closes the channel if the passive declare fails
        if channel.is_open:
            channel.close()
        if connection.is_open:
            connection.close()
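The generate_otp function above draws from the random module, which is not designed for security-sensitive values. As a hedged alternative, the standard library's secrets module can produce the same six-digit range. In this sketch, generate_secure_otp and otp_matches are hypothetical names, not functions from the project.

```python
import secrets


def generate_secure_otp(digits: int = 6) -> int:
    """Return a random integer with exactly `digits` decimal digits."""
    low = 10 ** (digits - 1)
    # randbelow(9 * low) is in [0, 9 * low), so the result is in [low, 10 * low)
    return low + secrets.randbelow(9 * low)


def otp_matches(stored, submitted) -> bool:
    """Compare a stored OTP with a submitted one in constant time."""
    if stored is None:
        return False
    return secrets.compare_digest(str(stored), str(submitted))


otp = generate_secure_otp()
print(otp, otp_matches(otp, otp))
```

Returning an integer matches the Integer type of the otp column and the int field in the VerifyOtp schema, so no conversion is needed before storing or comparing it.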
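In this section we focus on implementing the machine learning service, which acts as the entry point for all OCR (optical character recognition) requests in the microservice architecture. The machine learning service handles OCR requests, extracts text from images, and sends a notification when processing is complete.
Now let's implement the machine learning service.
The main.py script in the ml_services/ directory connects to the RabbitMQ server and consumes messages from a queue named 'ocr_service'. When a message arrives, the script processes it with an OCRService object, sends an email notification with the send_email_notification function, and publishes the response to the reply queue. After processing each message it acknowledges delivery to RabbitMQ. The script uses a prefetch count of 1 to limit the number of unacknowledged messages RabbitMQ will deliver to it. Create the file with the following code: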
import pika
import json
from utils import OCRService
from utils import send_email_notification

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='ocr_service')

# Initialize the OCR service once, so the model is not reloaded for every message
ocr_service = OCRService()


# Callback function to process OCR requests
def on_request(ch, method, props, body):
    # Process OCR request
    response = ocr_service.process_request(body)

    # Send email notification
    send_email_notification(response['user_email'], response['ocr_text'], channel)

    # Publish response to the reply queue named by the caller
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=json.dumps(response))

    # Acknowledge the message delivery
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Set prefetch count to 1
channel.basic_qos(prefetch_count=1)

# Consume messages from the 'ocr_service' queue
channel.basic_consume(queue='ocr_service', on_message_callback=on_request)

# Start consuming messages
print(" [x] Awaiting RPC requests")
channel.start_consuming()
Create a utils.py file in the ml_services/ directory with the following code:
import json
import base64
import os
import pandas as pd
# keras ocr pipeline and imports
import keras_ocr
import pika


class OCRService:
    def __init__(self):
        self.keras_pipeline = keras_ocr.pipeline.Pipeline()

    def keras_ocr(self, image_path):
        # Each result is a (text, bounding box) pair; join the words into one sentence
        results = self.keras_pipeline.recognize([image_path])
        df = pd.DataFrame(results[0], columns=['text', 'bbox'])
        words = df['text'].tolist()
        sentence = ' '.join(words)
        return sentence

    def process_request(self, message):
        message_body = json.loads(message)
        user_name = message_body['user_name']
        user_email = message_body['user_email']
        user_id = message_body['user_id']
        file_base64 = message_body['file']
        print(f" [x]user_id: {user_id} request received from gateway..")
        print(f" [x]processing request for {user_name}")

        # file_base64 contains the base64-encoded image sent by the gateway
        file_data = base64.b64decode(file_base64.encode())

        # Write the decoded file data to a new file
        os.makedirs('artifacts', exist_ok=True)
        image_path = "artifacts/decoded_file.png"
        with open(image_path, 'wb') as f:
            f.write(file_data)

        ocr_text = self.keras_ocr(image_path)
        print(" [^]OCR processing done !!!")

        response = {
            "user_id": user_id,
            "user_name": user_name,
            "user_email": user_email,
            "ocr_text": ocr_text
        }
        return response


def send_email_notification(email, ocr_text, channel):
    # Send an email notification using RabbitMQ
    message = {
        'email': email,
        'subject': 'OCR Process Completed !!',
        'body': f'We are pleased to inform you that the OCR (Optical Character Recognition) process for your image has been successfully completed.\n The extracted text has been processed and is now ready for use.\n\n OCR text : {ocr_text}',
        'other': 'null',
    }
    try:
        channel.basic_publish(
            exchange="",
            routing_key='email_notification',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
            ),
        )
        print("Sent OCR Process Completed email notification")
    except Exception as err:
        print(f"Failed to publish message: {err}")
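The gateway base64-encodes the uploaded file so that it can travel inside a JSON message, and process_request decodes it again on the other side. The round trip can be checked in isolation with the short sketch below; encode_request and decode_request are hypothetical helper names that mirror the field names used in the two services.

```python
import base64
import json


def encode_request(file_bytes: bytes, user: dict) -> str:
    """Build the JSON body the gateway sends, with the file as base64 text."""
    return json.dumps({
        "user_name": user["name"],
        "user_email": user["email"],
        "user_id": user["id"],
        "file": base64.b64encode(file_bytes).decode("ascii"),
    })


def decode_request(body: str) -> tuple:
    """Split a request body into its metadata and the original file bytes."""
    message = json.loads(body)
    file_bytes = base64.b64decode(message.pop("file"))
    return message, file_bytes


raw = b"\x89PNG\r\n\x1a\n" + bytes(range(256))  # arbitrary binary data
body = encode_request(raw, {"name": "Ada", "email": "ada@example.com", "id": 1})
meta, data = decode_request(body)
print(meta, data == raw)
```

Base64 turns every 3 bytes into 4 characters, so the message is roughly a third larger than the file itself, which is worth keeping in mind for large images.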
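In this section we focus on implementing the notification service, which sends email notifications to users.
The main.py script in the notification_service/ directory sets up a RabbitMQ consumer that listens for messages on the 'email_notification' queue. When a message arrives, it calls the notification function from the email_service module to handle the notification. If that succeeds, it acknowledges the message; otherwise, it rejects the message and prints an error. Create the file with the following code: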
import pika
import sys
import os
import time
import email_service
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")


def main():
    # RabbitMQ connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_URL))
    channel = connection.channel()

    def callback(ch, method, properties, body):
        try:
            # notification() returns an error value on failure, None on success
            err = email_service.notification(body)
            if err:
                ch.basic_nack(delivery_tag=method.delivery_tag)
            else:
                ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Error processing message: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag)

    channel.basic_consume(
        queue="email_notification", on_message_callback=callback
    )
    print("Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Interrupted")
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
Create an email_service.py file in the notification_service/ directory with the following code:
import smtplib, os, json
from email.message import EmailMessage
from dotenv import load_dotenv
from email.mime.text import MIMEText

load_dotenv()


def notification(message):
    # Returns None on success, or an error string so the consumer can reject the message
    try:
        message = json.loads(message)
        receiver_address = message["email"]
        subject = message["subject"]
        body = message["body"]
        other = message["other"]

        sender_address = os.environ.get("GMAIL_ADDRESS")
        sender_password = os.environ.get("GMAIL_PASSWORD")

        # Gmail SMTP server settings
        smtp_server = 'smtp.gmail.com'
        smtp_port = 587

        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()
        server.login(sender_address, sender_password)

        # Compose the email message
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = sender_address
        msg['To'] = receiver_address

        server.sendmail(sender_address, receiver_address, msg.as_string())
        server.quit()
        print("Mail Sent")
        return None
    except Exception as e:
        print(f"Failed to send email: {e}")
        return str(e)
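Composing the message can be separated from sending it, which makes that part easy to check without an SMTP server. The sketch below uses the standard library's EmailMessage class, which the file above already imports; the helper name build_email and the sample addresses are hypothetical.

```python
from email.message import EmailMessage


def build_email(sender: str, receiver: str, subject: str, body: str) -> EmailMessage:
    """Compose a plain-text email without sending it."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = receiver
    msg.set_content(body)
    return msg


msg = build_email(
    "sender@example.com",
    "user@example.com",
    "Account Verification OTP Notification",
    "Your OTP for account verification is: 123456",
)
print(msg["To"], msg["Subject"])
```

A message built this way could be handed to the send_message method of smtplib.SMTP in place of the sendmail call, assuming the same login steps as in notification.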
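In summary, we have implemented an end-to-end microservice architecture using FastAPI and RabbitMQ. We demonstrated how to create a user authentication service, a machine learning service for OCR processing, and a notification service for email notifications.
Through this blog post we covered key microservice concepts such as service isolation, communication through message queues, and the scalability and performance benefits of asynchronous processing.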