update
This commit is contained in:
144
docs/src/pages/python.mdx
Normal file
144
docs/src/pages/python.mdx
Normal file
@@ -0,0 +1,144 @@
|
||||
# python 代码示例
|
||||
|
||||
```sh
|
||||
# 安装 python 环境 ,安装项目
|
||||
uv sync
|
||||
```
|
||||
|
||||
## 运行 python 脚本
|
||||
|
||||
main.py 主模块
|
||||
|
||||
```python
|
||||
import socket
|
||||
import time
|
||||
|
||||
def send_sip_message():
|
||||
# 配置参数
|
||||
target_ip = "192.168.3.3" # 设备eth1的IP(转发入口)
|
||||
target_port = 5060 # 转发端口
|
||||
mac_local_ip = "192.168.3.33"# Mac的本地IP(与Via头部一致)
|
||||
sender_sip = "3011302" # 发送方SIP用户
|
||||
sender_ip = "192.168.9.57" # 设备eth0的IP(转发后源IP)
|
||||
receiver_sip = "3019901" # 接收方SIP用户
|
||||
receiver_ip = "192.168.9.4" # 目标IP
|
||||
|
||||
# 生成唯一标识
|
||||
call_id = f"{int(time.time())}" # 基于时间戳的Call-ID
|
||||
tag = f"{int(time.time() % 1000000)}" # 随机tag
|
||||
branch = f"z9hG4bK{int(time.time())}" # Via分支标识
|
||||
|
||||
# XML消息体(业务数据)
|
||||
xml_body = '''<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<params>
|
||||
<to>sip:{receiver_sip}@{receiver_ip}:5060</to>
|
||||
<elev>1</elev>
|
||||
<direct>2</direct>
|
||||
<floor>13</floor>
|
||||
<family>2</family>
|
||||
<app>elev</app>
|
||||
<event>appoint</event>
|
||||
<event_url>/elev/appoint</event_url>
|
||||
</params>'''.format(receiver_sip=receiver_sip, receiver_ip=receiver_ip)
|
||||
|
||||
# 计算消息体长度(字节数)
|
||||
content_length = len(xml_body.encode('utf-8'))
|
||||
|
||||
# 构造SIP MESSAGE请求(严格使用\r\n换行)
|
||||
sip_message = f'''MESSAGE sip:{receiver_sip}@{receiver_ip}:5060 SIP/2.0
|
||||
Via: SIP/2.0/UDP {mac_local_ip}:5060;rport;branch={branch}
|
||||
From: <sip:{sender_sip}@{sender_ip}:5060>;tag={tag}
|
||||
To: <sip:{receiver_sip}@{receiver_ip}:5060>
|
||||
Call-ID: {call_id}@{mac_local_ip}
|
||||
CSeq: 20 MESSAGE
|
||||
Content-Type: text/plain
|
||||
Max-Forwards: 70
|
||||
User-Agent: DnakeVoip v1.0
|
||||
Content-Length: {content_length}
|
||||
|
||||
{xml_body}'''
|
||||
|
||||
# 替换换行符为\r\n(确保符合SIP协议)
|
||||
sip_message = sip_message.replace('\n', '\r\n')
|
||||
|
||||
try:
|
||||
# 创建UDP socket并发送消息
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
|
||||
# 设置超时时间(5秒)
|
||||
sock.settimeout(5)
|
||||
# 发送消息
|
||||
sock.sendto(sip_message.encode('utf-8'), (target_ip, target_port))
|
||||
print(f"已发送SIP消息到 {target_ip}:{target_port}")
|
||||
print("\n发送的消息内容:")
|
||||
print(sip_message)
|
||||
|
||||
# 尝试接收响应(可选)
|
||||
try:
|
||||
response, addr = sock.recvfrom(4096)
|
||||
print(f"\n收到来自 {addr} 的响应:")
|
||||
print(response.decode('utf-8', errors='ignore'))
|
||||
except socket.timeout:
|
||||
print("\n未收到响应(可能对方未回复或网络延迟)")
|
||||
|
||||
except Exception as e:
|
||||
print(f"发送失败:{str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
send_sip_message()
|
||||
```
|
||||
|
||||
app.py
|
||||
|
||||
```python
|
||||
from fastapi import FastAPI, Query
|
||||
import httpx
|
||||
import asyncio
|
||||
import uvicorn
|
||||
from main import send_sip_message
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
async def main():
|
||||
"""主函数内容"""
|
||||
print("执行main函数...")
|
||||
# 执行发送SIP消息
|
||||
send_sip_message()
|
||||
return {"message": "main function executed - SIP message sent"}
|
||||
|
||||
async def test():
|
||||
"""测试函数内容"""
|
||||
print("执行test函数...")
|
||||
# 在这里添加你的test函数逻辑
|
||||
return {"message": "test function executed"}
|
||||
|
||||
@app.api_route("/api/router", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
|
||||
async def router(path: str = Query(...)):
|
||||
"""路由处理器"""
|
||||
try:
|
||||
# 根据path参数执行不同的函数
|
||||
if path == "floor":
|
||||
result = await main()
|
||||
elif path == "test":
|
||||
result = await test()
|
||||
else:
|
||||
return {"error": f"Unknown path: {path}"}
|
||||
|
||||
return {"success": True, "data": result}
|
||||
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
@app.api_route("/", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
|
||||
async def root():
|
||||
"""健康检查端点"""
|
||||
return {"message": "FastAPI server is running"}
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 监听所有IPv6地址的3001端口
|
||||
uvicorn.run(
|
||||
"app:app",
|
||||
host="::", # 监听所有IPv6地址
|
||||
port=3001,
|
||||
reload=True
|
||||
)
|
||||
```
|
||||
Reference in New Issue
Block a user