bot_token = 'your_bot_api_key'
url = f"https://api.telegram.org/bot{bot_token}/getUpdates"

response = requests.get(url)
data = response.json()

if data['ok']:
if len(data['result']) > 0:
for update in data['result']:
chat_id = update['message']['chat']['id']
print(f"Chat ID: {chat_id}")
else:
print("No new updates found.")
else:
print("Failed to get updates. Error:", data)

使用金融建模準(zhǔn)備(Financial Modeling Prep,簡(jiǎn)稱FMP)獲取實(shí)時(shí)加密貨幣更新

要獲取實(shí)時(shí)加密貨幣更新并將它們發(fā)送到Telegram,我們需要將金融建模準(zhǔn)備(FMP)API與Python腳本集成。為了清晰起見(jiàn),我們將這個(gè)過(guò)程分為幾個(gè)小標(biāo)題。

  1. 安裝所需的庫(kù)
    首先,確保您已安裝必要的Python庫(kù):
pip install websocket-client requests
  1. 設(shè)置WebSocket客戶端
    我們將創(chuàng)建一個(gè)類來(lái)處理WebSocket連接,獲取實(shí)時(shí)數(shù)據(jù),并將更新發(fā)送到Telegram。FMPWebSocketClient。
class FMPWebSocketClient:
def __init__(self, api_key, symbol, telegram_token, chat_id):
self.api_key = api_key
self.symbol = symbol
self.telegram_token = telegram_token
self.chat_id = chat_id
self.ws_url = "wss://crypto.financialmodelingprep.com"
self.data = []

api_key:您的FMP API密鑰。
symbol:您想要跟蹤的加密貨幣符號(hào)(例如,SOLUSD)。
telegram_token:您的Telegram機(jī)器人API令牌。
chat_id:更新將被發(fā)送到的聊天ID。

  1. 獲取實(shí)時(shí)數(shù)據(jù)
    我們將創(chuàng)建方法來(lái)處理WebSocket事件,處理傳入的消息,并發(fā)送通知。
 def on_message(self, ws, message):
data = json.loads(message)
if 'price' in data and 'volume' in data:
price = float(data['price'])
volume = float(data['volume'])
timestamp = datetime.utcnow()

data_point = {"price": price, "volume": volume, "timestamp": timestamp}
self.data.append(data_point)

# Keep only the data for the last 5 minutes
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
self.data = [x for x in self.data if x['timestamp'] >= five_minutes_ago]

# Simplified condition to send a message for every new data point
message = f"Symbol: {self.symbol}\nPrice: {price}\nVolume: {volume}\nTime: {timestamp}"
self.send_telegram_message(message)

這個(gè)方法處理傳入的WebSocket消息,并將數(shù)據(jù)點(diǎn)添加到列表中。

  1. 發(fā)送通知到Telegram
    我們將創(chuàng)建一個(gè)方法,將消息發(fā)送到Telegram聊天中。
def send_telegram_message(self, message):
url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
payload = {
"chat_id": self.chat_id,
"text": message
}
response = requests.post(url, data=payload)
print("Response from Telegram API:", response.json())

這個(gè)方法構(gòu)建負(fù)載并使用機(jī)器人API發(fā)送消息到Telegram。 它打印出來(lái)自Telegram API的響應(yīng)以供驗(yàn)證。

5. 啟動(dòng)WebSocket客戶端
最后,我們將創(chuàng)建一個(gè)方法來(lái)啟動(dòng)WebSocket客戶端并持續(xù)獲取數(shù)據(jù)。

  def on_error(self, ws, error):
print("WebSocket Error:", error)

def on_close(self, ws, close_status_code, close_msg):
print("WebSocket Closed:", close_msg)

def on_open(self, ws):
login_message = json.dumps({
"event": "login",
"data": {"apiKey": self.api_key}
})
ws.send(login_message)
subscribe_message = json.dumps({
"event": "subscribe",
"data": {"symbol": self.symbol}
})
ws.send(subscribe_message)
print("Subscribed to:", self.symbol)

on_open:向WebSocket服務(wù)器發(fā)送登錄和訂閱消息。
啟動(dòng)客戶端:

 def start(self):
self.ws = websocket.WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.ws.run_forever()

初始化并啟動(dòng)WebSocket客戶端以連接到FMP WebSocket端點(diǎn)。

  1. 運(yùn)行完整流程
    要運(yùn)行完整流程,使用您的憑證實(shí)例化FMPWebSocketClient類并啟動(dòng)客戶端。
import requests
import time
from datetime import datetime, timedelta

class FMPWebSocketClient:
def __init__(self, api_key, symbol, telegram_token, chat_id):
self.api_key = api_key
self.symbol = symbol
self.telegram_token = telegram_token
self.chat_id = chat_id
self.ws_url = "wss://crypto.financialmodelingprep.com"
self.data = []

def on_message(self, ws, message):
data = json.loads(message)
if 'price' in data and 'volume' in data:
price = float(data['price'])
volume = float(data['volume'])
timestamp = datetime.utcnow()

data_point = {"price": price, "volume": volume, "timestamp": timestamp}
self.data.append(data_point)

# Keep only the data for the last 5 minutes
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
self.data = [x for x in self.data if x['timestamp'] >= five_minutes_ago]

# Simplified condition to send a message for every new data point
message = f"Symbol: {self.symbol}\nPrice: {price}\nVolume: {volume}\nTime: {timestamp}"
self.send_telegram_message(message)

def send_telegram_message(self, message):
url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
payload = {"chat_id": self.chat_id, "text": message}
response = requests.post(url, data=payload)
print("Response from Telegram API:", response.json())

def on_error(self, ws, error):
print("WebSocket Error:", error)

def on_close(self, ws, close_status_code, close_msg):
print("WebSocket Closed:", close_msg)

def on_open(self, ws):
login_message = json.dumps({
"event": "login",
"data": {"apiKey": self.api_key}
})
ws.send(login_message)
subscribe_message = json.dumps({
"event": "subscribe",
"data": {"symbol": self.symbol}
})
ws.send(subscribe_message)
print("Subscribed to:", self.symbol)

def start(self):
self.ws = websocket.WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.ws.run_forever()

# Replace with your actual API key, token, and chat ID
api_key = "your_fmp_api_key"
symbol = "SOLUSD" # Use "ADAUSD" for Cardano or any other supported cryptocurrency symbol
telegram_token = "your_telegram_bot_token"
chat_id = "your_chat_id"

client = FMPWebSocketClient(api_key, symbol, telegram_token, chat_id)
client.start()

結(jié)論

通過(guò)將?Financial Modeling Prep (FMP) API?與 Telegram 機(jī)器人集成,您可以毫不費(fèi)力地隨時(shí)了解實(shí)時(shí)加密貨幣數(shù)據(jù)。本指南向您展示了如何設(shè)置 Telegram 機(jī)器人、從?FMP?獲取實(shí)時(shí)數(shù)據(jù)以及向您的聊天發(fā)送更新。

我們首先設(shè)置 Telegram 機(jī)器人,確保它已準(zhǔn)備好接收消息。接下來(lái),我們?cè)?Python 中創(chuàng)建了一個(gè) WebSocket 客戶端,用于從?FMP?獲取實(shí)時(shí)數(shù)據(jù)。客戶端處理這些數(shù)據(jù)并根據(jù)預(yù)定義的條件向 Telegram 發(fā)送通知。

該系統(tǒng)允許您接收有關(guān)重大市場(chǎng)變化的即時(shí)更新,幫助您做出明智的交易決策。無(wú)論您是跟蹤 Solana、Cardano 還是任何其他加密貨幣,此設(shè)置都能確保您不會(huì)錯(cuò)過(guò)重要的市場(chǎng)走勢(shì)。

隨意自定義條件并擴(kuò)展系統(tǒng)以監(jiān)控多種加密貨幣。這種實(shí)時(shí)數(shù)據(jù)與即時(shí)消息的集成可以顯著增強(qiáng)您的交易策略和市場(chǎng)意識(shí)。

如何找到更多同類API?

冪簡(jiǎn)集成是國(guó)內(nèi)領(lǐng)先的API集成管理平臺(tái),專注于為開(kāi)發(fā)者提供全面、高效、易用的API集成解決方案。冪簡(jiǎn)API平臺(tái)可以通過(guò)以下兩種方式找到所需API:通過(guò)關(guān)鍵詞搜索API、或者從API Hub分類頁(yè)進(jìn)入尋找。

原文鏈接:https://medium.datadriveninvestor.com/how-to-get-instant-cypto-updates-on-telegram-using-fmps-api-1608303501dc

上一篇:

如何記錄 AWS Lambda Go 函數(shù)的 API 調(diào)用

下一篇:

將 GraphQL 單體遷移至 Apollo Federation
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊(cè)

多API并行試用

數(shù)據(jù)驅(qū)動(dòng)選型,提升決策效率

查看全部API→
??

熱門場(chǎng)景實(shí)測(cè),選對(duì)API

#AI文本生成大模型API

對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)

#AI深度推理大模型API

對(duì)比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)