PYTHON实现代理服务器并实现代理控制

发布于 2024-02-19  534 次阅读


  • 网上关于这个的代码不少,但是其中很多都是只能代理http请求.能够代理https本就不多,在这之中,我没有找到能够单文件控制http和https的请求的代码,只能自己改一个喽
  • 这里讨论的是http代理对于代理http地址和代理https地址的控制,本文实现的两个代理都是http代理

代理http的控制

  • 那些基本概念就不再赘述,能有这么奇怪的需求(最好单文件,代理控制,代理https)应该不需要过多的前缀🤣

  • 代理是一个中间人,在客户端与服务端之间传输信息,那么想要正确的传输信息就至少需要知道所传输信息的一些基本内容,比如目的地,请求类型等

  • 我们知道,http请求过程实际上是报文的传输(关于报文这些东西很久之前一篇谢过了,这里不再写了),我们的代理通过读报文就能够获得这些基本信息,我们通过建立socket获取客户端的信息并建立连接,然后读客户端传来的报文,获取里面的目的地(服务端),然后转发即可(关于socket的使用,很久之前的一篇写过了,这里不再写了)

def serve(ip, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((ip, port))
    s.listen(10)
    print('proxy start...')
    while True:
        conn, addr = s.accept()
        _thread.start_new_thread(handle, (conn,))

if __name__ == '__main__':
    IP = "0.0.0.0"
    PORT = 9000
    serve(IP, PORT)
  • 这里监听9000端口,与试图连接的客户端建立一个socket,创建一个新线程,调用handle函数开始处理这个socket

  • 对于http代理,我们可以通过套接字传来的请求头直接获取报文的相关信息,这里先定义一个类来存放和处理某个socket的首部字段,这其中的原理当然和报文的构造有关

class Header:
    """
    用于读取和解析头信息
    """

    def __init__(self, conn):
        self._method = None
        header = b''
        try:
            while 1:
                data = conn.recv(4096)
                header = b"%s%s" % (header, data)
                if header.endswith(b'\r\n\r\n') or (not data):
                    break
        except:
            pass
        self._header = header
        self.header_list = header.split(b'\r\n')
        self._host = None
        self._port = None

    def get_method(self):
        """
        获取请求方式
        :return:
        """
        if self._method is None:
            self._method = self._header[:self._header.index(b' ')]
        return self._method

    def get_host_info(self):
        """
        获取目标主机的ip和端口
        :return:
        """
        if self._host is None:
            method = self.get_method()
            line = self.header_list[0].decode('utf8')
            if method == b"CONNECT":
                host = line.split(' ')[1]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 443
            else:
                for i in self.header_list:
                    if i.startswith(b"Host:"):
                        host = i.split(b" ")
                        if len(host) < 2:
                            continue
                        host = host[1].decode('utf8')
                        break
                else:
                    host = line.split('/')[2]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 80
            self._host = host
            self._port = int(port)
        return self._host, self._port

    @property
    def data(self):
        """
        返回头部数据
        :return:
        """
        return self._header

    def is_ssl(self):
        """
        判断是否为 https协议
        :return:
        """
        if self.get_method() == b'CONNECT':
            return True
        return False

    def __repr__(self):
        return str(self._header.decode("utf8"))
  • 直接上嫖的完整的代码
# encoding:utf-8
import socket
import _thread
def is_substring(substring, string):
    """
    判断一个字符串是否被包含在另一个字符串中
    :param substring: 要判断的子字符串
    :param string: 目标字符串
    :return: True(被包含)或 False(未被包含)
    """
    return substring in string

class Header:
    """
    用于读取和解析头信息
    """

    def __init__(self, conn):
        self._method = None
        header = b''
        try:
            while 1:
                data = conn.recv(4096)
                header = b"%s%s" % (header, data)
                if header.endswith(b'\r\n\r\n') or (not data):
                    break
        except:
            pass
        self._header = header
        self.header_list = header.split(b'\r\n')
        self._host = None
        self._port = None

    def get_method(self):
        """
        获取请求方式
        :return:
        """
        if self._method is None:
            self._method = self._header[:self._header.index(b' ')]
        return self._method

    def get_host_info(self):
        """
        获取目标主机的ip和端口
        :return:
        """
        if self._host is None:
            method = self.get_method()
            line = self.header_list[0].decode('utf8')
            if method == b"CONNECT":
                host = line.split(' ')[1]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 443
            else:
                for i in self.header_list:
                    if i.startswith(b"Host:"):
                        host = i.split(b" ")
                        if len(host) < 2:
                            continue
                        host = host[1].decode('utf8')
                        break
                else:
                    host = line.split('/')[2]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 80
            self._host = host
            self._port = int(port)
        return self._host, self._port

    @property
    def data(self):
        """
        返回头部数据
        :return:
        """
        return self._header

    def is_ssl(self):
        """
        判断是否为 https协议
        :return:
        """
        if self.get_method() == b'CONNECT':
            return True
        return False

    def __repr__(self):
        return str(self._header.decode("utf8"))

def communicate(sock1, sock2):
    """
    socket之间的数据交换
    :param sock1:
    :param sock2:
    :return:
    """
    try:
        while 1:
            data = sock1.recv(1024)
            if not data:
                return
            sock2.sendall(data)
    except:
        pass

def handle(client):
    """
    处理连接进来的客户端
    :param client:
    :return:
    """
    timeout = 60
    client.settimeout(timeout)
    header = Header(client)
    if not header.data:
        client.close()
        return

    print(*header.get_host_info(), header.get_method())
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.connect(header.get_host_info())
        server.settimeout(timeout)
        if header.is_ssl():
            data = b"HTTP/1.0 200 Connection Established\r\n\r\n"
            client.sendall(data)
            _thread.start_new_thread(communicate, (client, server))
        else:
            server.sendall(header.data)
        communicate(server, client)
    except:
        server.close()
        client.close()

def serve(ip, port):
    """
    代理服务
    :param ip:
    :param port:
    :return:
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((ip, port))
    s.listen(10)
    print('proxy start...')
    while True:
        conn, addr = s.accept()
        _thread.start_new_thread(handle, (conn,))

if __name__ == '__main__':
    IP = "0.0.0.0"
    PORT = 9000
    serve(IP, PORT)
  • 这其中http代理对于http的处理是简单的对报文解包得到地址,然后转发
  • 那么要实现控制的话可以简单的在handle函数里面解包之后加上一些if的判断,比如这里加上了判定访问的目标地址是不是自己设定的网址(http://cloud.drinkflower.asia/),如果不是的话就不予代理
def handle(client):
    """
    处理连接进来的客户端
    :param client:
    :return:
    """
    timeout = 60
    client.settimeout(timeout)
    header = Header(client)
    if not header.data:
        client.close()
        return
  #控制开始
  url = header.header_list[0].decode('utf8').split(' ')[1]
    if not is_substring(url, 'http://cloud.drinkflower.asia/'):
        client.close()
        return
  #控制结束
    print(*header.get_host_info(), header.get_method())
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.connect(header.get_host_info())
        server.settimeout(timeout)
        if header.is_ssl():
            data = b"HTTP/1.0 200 Connection Established\r\n\r\n"
            client.sendall(data)
            _thread.start_new_thread(communicate, (client, server))
        else:
            server.sendall(header.data)
        communicate(server, client)
    except:
        server.close()
        client.close()

https代理的控制

  • 那么当访问https站点时,上面的控制就失效了,因为http代理服务器无法解https的包,我们可以在handle函数里面把url,打印出来,或者直接把整个报文打印出来(因为过来很久才想着记录一下,懒得弄环境了,就不截图了)
def handle(client):
    """
    处理连接进来的客户端
    :param client:
    :return:
    """
    timeout = 60
    client.settimeout(timeout)
    header = Header(client)
    if not header.data:
        client.close()
        return

  url = header.header_list[0].decode('utf8').split(' ')[1]
  print(url)
  print(header.header_list[0].decode('utf8'))
  #把url和报文全部打印出来看一下
    print(*header.get_host_info(), header.get_method())
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.connect(header.get_host_info())
        server.settimeout(timeout)
        if header.is_ssl():
            data = b"HTTP/1.0 200 Connection Established\r\n\r\n"
            client.sendall(data)
            _thread.start_new_thread(communicate, (client, server))
        else:
            server.sendall(header.data)
        communicate(server, client)
    except:
        server.close()
        client.close()
  • 打印出来之后是只有目标站点的ip的,是没有具体的路径的,代理仅同时与客户端和目的地址建立一个"隧道",然后向客户端发送一个connection establish的标志,不解包直接抓发请求,解包的事是把包丢给目标ip,目标网站解包之后才知道具体房屋内路径的
在处理HTTPS流量时,代理服务器通常无法像处理普通HTTP流量那样直接查看或篡改传输的数据。这是因为HTTPS使用了
SSL/TLS加密协议来加密通信内容,包括请求和响应数据,在传输过程中数据是加密的,代理服务器无法直接解密这些数据。

当客户端与目标服务器之间建立HTTPS连接时,代理服务器只能看到加密的流量,无法直接读取其中的内容。代理服务器只是将
加密的数据包进行转发,而不会解密它们。因此,即使代理服务器可以转发HTTPS流量,但在正常情况下,它无法窃取HTTPS通
信中的敏感信息。
  • 那么如何实现上面的路径控制呢,治理提供一套针对自己网站的代码.因为我们自己手上是肯定有自己域名的公钥和私钥的,可以简单的解一下包,解出地址再决定是否代理
  • 先判断访问目标是不是自己的网站
if header.get_host_info()[0]=='drinkflower.asia':
    #print("访问的具体路径:", get_path(header.header_list))
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.connect(header.get_host_info())
            server.settimeout(timeout)
            if header.is_ssl():
  • 然后建立连接,并且把将客户端 socket 包装成 SSL socket.这里的context.load_cert_chain里面的参数是要填自己域名的证书和密钥
data = b"HTTP/1.0 200 Connection Established\r\n\r\n"
                client.sendall(data)

                context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
                context.load_cert_chain(certfile='drinkflower.asia.crt', keyfile='drinkflower.asia.key')

                client = context.wrap_socket(client, server_side=True)
                # 读取并解析 HTTP 请求
                header1 = Header(client)
                print("访问的具体路径:",header1.data)
                #获取访问信息之后这里就可以加一些控制语句了
                server = context.wrap_socket(server, server_side=False)
                # 将 HTTP 请求转发给目标服务器
                server.sendall(header1.data)
                #_thread.start_new_thread(communicate, (client, server))
# encoding:utf-8
import socket
import _thread
import ssl
def is_substring(substring, string):
    """
    判断一个字符串是否被包含在另一个字符串中
    :param substring: 要判断的子字符串
    :param string: 目标字符串
    :return: True(被包含)或 False(未被包含)
    """
    return substring in string

class Header:
    """
    用于读取和解析头信息
    """

    def __init__(self, conn):
        self._method = None
        header = b''
        try:
            while 1:
                data = conn.recv(4096)
                header = b"%s%s" % (header, data)
                if header.endswith(b'\r\n\r\n') or (not data):
                    break
        except:
            pass
        self._header = header
        self.header_list = header.split(b'\r\n')
        self._host = None
        self._port = None

    def get_method(self):
        """
        获取请求方式
        :return:
        """
        if self._method is None:
            self._method = self._header[:self._header.index(b' ')]
        return self._method

    def get_host_info(self):
        """
        获取目标主机的ip和端口
        :return:
        """
        if self._host is None:
            method = self.get_method()
            line = self.header_list[0].decode('utf8')
            if method == b"CONNECT":
                host = line.split(' ')[1]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 443
            else:
                for i in self.header_list:
                    if i.startswith(b"Host:"):
                        host = i.split(b" ")
                        if len(host) < 2:
                            continue
                        host = host[1].decode('utf8')
                        break
                else:
                    host = line.split('/')[2]
                if ':' in host:
                    host, port = host.split(':')
                else:
                    port = 80
            self._host = host
            self._port = int(port)
        return self._host, self._port

    @property
    def data(self):
        """
        返回头部数据
        :return:
        """
        return self._header

    def is_ssl(self):
        """
        判断是否为 https协议
        :return:
        """
        if self.get_method() == b'CONNECT':
            return True
        return False

    def __repr__(self):
        return str(self._header.decode("utf8"))

def communicate(sock1, sock2):
    """
    socket之间的数据交换
    :param sock1:
    :param sock2:
    :return:
    """
    try:
        while 1:
            data = sock1.recv(128)
            if not data:
                return
            #print("data是"+data.decode())
            sock2.sendall(data)
    except:
        pass
def get_path(header_list):
    """
    从请求头部信息中获取路径信息
    :param header_list: 请求头部信息列表
    :return: 路径信息
    """
    request_line = header_list[0].decode('utf8')
    _, path, _ = request_line.split(" ")
    request_line = header_list[0].decode('utf8')
    method, path, protocol = request_line.split(" ")

def handle(client):
    """
    处理连接进来的客户端
    :param client:
    :return:
    """
    timeout = 60
    client.settimeout(timeout)
    header = Header(client)

    if not header.data:
        client.close()
        return
    if header.get_host_info()[0]=='drinkflower.asia':
    #print("访问的具体路径:", get_path(header.header_list))
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.connect(header.get_host_info())
            server.settimeout(timeout)
            if header.is_ssl():
                data = b"HTTP/1.0 200 Connection Established\r\n\r\n"
                client.sendall(data)

                context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
                context.load_cert_chain(certfile='drinkflower.asia.crt', keyfile='drinkflower.asia.key')

                client = context.wrap_socket(client, server_side=True)
                # 读取并解析 HTTP 请求
                header1 = Header(client)
                print("访问的具体路径:",header1.data)
                server = context.wrap_socket(server, server_side=False)
                # 将 HTTP 请求转发给目标服务器
                server.sendall(header1.data)
                #_thread.start_new_thread(communicate, (client, server))

            else:
                server.sendall(header.data)
            communicate(server, client)
        except:
            server.close()
            client.close()
    else:
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.connect(header.get_host_info())
            server.settimeout(timeout)
            if header.is_ssl():
                data = b"HTTP/1.0 200 Connection Established\r\n\r\n"
                client.sendall(data)
                _thread.start_new_thread(communicate, (client, server))
            else:
                server.sendall(header.data)
            communicate(server, client)
        except:
            server.close()
            client.close()

def serve(ip, port):
    """
    代理服务
    :param ip:
    :param port:
    :return:
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((ip, port))
    s.listen(10)
    print('proxy start...')
    while True:
        conn, addr = s.accept()
        _thread.start_new_thread(handle, (conn,))

if __name__ == '__main__':
    IP = "0.0.0.0"
    PORT = 9000
    serve(IP, PORT)

未能解决的问题

  • 经过测试,对于http网址的代理的速度还算在可接受范围内,只是降低了一点速度(一般取决于服务器带宽),但是https的代理涉及到这样一个阻塞的小函数来解密实在有点为难了,卡的要死,基本也就是研究学习用,直接用上面的代码是不大可能了
  • 关于卡顿的解决方法,最简单的应该是在最早通过包装ssl套接字获取访问路径后,直接"解包装"还原回普通的套接字,然后走原本的http代理的那条路,即直接建立隧道来转发,比起每一个请求都解包,只解一次几乎和http网址代理的效率差不多
  • 但是经过查询,ssl库并没有提供一个unwarp的方法,但是在最早我尝试过直接使用unwarp函数并且没有报错,但是这样做并不能成功地代理,我打印了两个套接字,它们有明显的区别
  • 我也尝试过存储套接字到变量或者io对象,比如下面的两种实例
import ssl
import socket

# 创建普通的套接字对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 保存原始套接字对象到变量中
original_socket = s

# 连接到服务器
s.connect(('www.example.com', 443))

# 将普通套接字对象升级为SSL套接字
ssl_sock = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLS)

# 现在ssl_sock就是一个SSL套接字,可以用于安全通信
ssl_sock.send(b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
response = ssl_sock.recv(4096)
print(response)

# 记得最后关闭SSL套接字以及相关的连接
ssl_sock.close()

# 在需要时可以使用原始的套接字对象
original_socket.send(b'Hello, this is the original socket.')

(存储套接字到变量)

import io
import ssl
import socket

# 创建普通的套接字对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 创建一个BytesIO对象
io_obj = io.BytesIO()

# 将普通套接字对象写入BytesIO对象
io_obj.write(s.fileno().to_bytes(4, 'big'))

# 连接到服务器
s.connect(('www.example.com', 443))

# 将普通套接字对象升级为SSL套接字
ssl_sock = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLS)

# 现在ssl_sock就是一个SSL套接字,可以用于安全通信
ssl_sock.send(b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
response = ssl_sock.recv(4096)
print(response)

# 记得最后关闭SSL套接字以及相关的连接
ssl_sock.close()

# 在需要时可以使用BytesIO对象
io_obj.seek(0)
print(io_obj.read())

(存储套接字到io)

  • 整合到上面的代码之后也不能正常地代理(打印原本的套接字的结果过了太久记不清了),不知道是根本就不能这样存储后再使用,还是存在一些计网的问题(比如原本已经按程序send了一些包,这里要改发包的顺序🤔)希望以后有大佬能够带我解决😋
届ける言葉を今は育ててる
最后更新于 2024-03-09