Files
dnake/apps/socket/main.py
2025-10-19 17:58:30 +08:00

75 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import time
def send_sip_message():
# 配置参数
target_ip = "192.168.3.3" # 设备eth1的IP转发入口
target_port = 5060 # 转发端口
mac_local_ip = "192.168.3.33"# Mac的本地IP与Via头部一致
sender_sip = "3011302" # 发送方SIP用户
sender_ip = "192.168.9.57" # 设备eth0的IP转发后源IP
receiver_sip = "3019901" # 接收方SIP用户
receiver_ip = "192.168.9.4" # 目标IP
# 生成唯一标识
call_id = f"{int(time.time())}" # 基于时间戳的Call-ID
tag = f"{int(time.time() % 1000000)}" # 随机tag
branch = f"z9hG4bK{int(time.time())}" # Via分支标识
# XML消息体业务数据
xml_body = '''<?xml version="1.0" encoding="UTF-8" ?>
<params>
<to>sip:{receiver_sip}@{receiver_ip}:5060</to>
<elev>1</elev>
<direct>2</direct>
<floor>13</floor>
<family>2</family>
<app>elev</app>
<event>appoint</event>
<event_url>/elev/appoint</event_url>
</params>'''.format(receiver_sip=receiver_sip, receiver_ip=receiver_ip)
# 计算消息体长度(字节数)
content_length = len(xml_body.encode('utf-8'))
# 构造SIP MESSAGE请求严格使用\r\n换行
sip_message = f'''MESSAGE sip:{receiver_sip}@{receiver_ip}:5060 SIP/2.0
Via: SIP/2.0/UDP {mac_local_ip}:5060;rport;branch={branch}
From: <sip:{sender_sip}@{sender_ip}:5060>;tag={tag}
To: <sip:{receiver_sip}@{receiver_ip}:5060>
Call-ID: {call_id}@{mac_local_ip}
CSeq: 20 MESSAGE
Content-Type: text/plain
Max-Forwards: 70
User-Agent: DnakeVoip v1.0
Content-Length: {content_length}
{xml_body}'''
# 替换换行符为\r\n确保符合SIP协议
sip_message = sip_message.replace('\n', '\r\n')
try:
# 创建UDP socket并发送消息
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
# 设置超时时间5秒
sock.settimeout(5)
# 发送消息
sock.sendto(sip_message.encode('utf-8'), (target_ip, target_port))
print(f"已发送SIP消息到 {target_ip}:{target_port}")
print("\n发送的消息内容:")
print(sip_message)
# 尝试接收响应(可选)
try:
response, addr = sock.recvfrom(4096)
print(f"\n收到来自 {addr} 的响应:")
print(response.decode('utf-8', errors='ignore'))
except socket.timeout:
print("\n未收到响应(可能对方未回复或网络延迟)")
except Exception as e:
print(f"发送失败:{str(e)}")
if __name__ == "__main__":
send_sip_message()