
Using Tencent Cloud Functions as an OpenAI Proxy: a Python Implementation

Published: 2023-08-11

For a programmer ranking the Eight Sufferings of Buddhism, "not getting what you seek" would surely come first.

Cloud Function Configuration

A cloud function is not a great solution, but it is definitely the simplest one.

This is the Tencent Cloud Functions console:

https://console.cloud.tencent.com/scf/list?rid=18&ns=default

Configuration steps:

  • Create from a template
  • Choose the Flask framework template
  • Accept the defaults until the deployment finishes
  • In the function settings, set the timeout to 60 seconds
  • In the function code, replace app.py with the code below
  • Deploy again, and you're done
  • After deployment, the generated access path shown below the function is the address clients should use
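Note that the proxy itself stores no secret: the client sends its own OpenAI key in the Authorization header, and the proxy simply forwards it upstream. A minimal sketch of that header handling (the sample key below is made up):

```python
def extract_key(auth_header):
    """Pull the API key out of a 'Bearer <key>' Authorization
    header, returning None when the header is missing or malformed."""
    if not auth_header:
        return None
    parts = auth_header.split(' ')
    return parts[1] if len(parts) == 2 else None

print(extract_key("Bearer sk-test123"))  # -> sk-test123
```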

There are already plenty of write-ups for other languages, so here is a simple Python implementation that supports SSE (Server-Sent Events).

Python proxy program

from flask import Flask, request, abort, Response
import requests
import os
import json

IS_SERVERLESS = bool(os.environ.get('SERVERLESS'))

app = Flask(__name__)
timeout = 60  # request timeout in seconds

@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(path):
    url = f"https://api.openai.com/{path}"
    token = request.headers.get('Authorization', None)
    if token is None:
        abort(403, 'No Token')
    parts = token.split(' ')
    openai_key = parts[1] if len(parts) > 1 else None
    if openai_key is None:
        abort(403, 'No API Key')
    options = {
        'headers': {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': f"Bearer {openai_key}"
        }
    }
    # Forward the body as JSON when possible, as raw bytes otherwise
    if request.is_json:
        options['json'] = request.json
    else:
        options['data'] = request.get_data()
    try:
        # Chat/text completion requests with "stream": true are relayed as SSE
        if options.get('json', {}).get('stream', False):
            response = requests.request(request.method, url, **options, timeout=timeout, stream=True)
            if response.ok:
                def generate():
                    for line in response.iter_lines():
                        if line:
                            yield f"{line.decode('utf-8')}\n\n"
                    yield "data: [DONE]\n\n"
                return Response(generate(), mimetype='text/event-stream')
            # Pass upstream errors straight through
            return Response(response.content, status=response.status_code,
                            content_type=response.headers.get('Content-Type', 'application/json'))
        else:
            response = requests.request(request.method, url, **options, timeout=timeout)
            return Response(response.content, content_type=response.headers['Content-Type'])
    except requests.exceptions.RequestException as e:
        print(e)
        return Response(json.dumps({"error": str(e)}), status=500, content_type='application/json')

# Start the server on 0.0.0.0:9000
app.run(debug=IS_SERVERLESS != True, port=9000, host='0.0.0.0', threaded=True)
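The streaming branch above re-emits each upstream line as one SSE event: the line itself, a blank separator line, and a final data: [DONE] sentinel. The framing can be sketched in isolation (the sample lines are made up):

```python
def sse_frames(upstream_lines):
    """Frame raw upstream lines as Server-Sent Events: each
    non-empty line becomes one event followed by a blank line,
    and the stream is closed with a [DONE] sentinel."""
    for line in upstream_lines:
        if line:
            yield f"{line}\n\n"
    yield "data: [DONE]\n\n"

frames = list(sse_frames(['data: {"id":"a"}', '', 'data: {"id":"b"}']))
# -> ['data: {"id":"a"}\n\n', 'data: {"id":"b"}\n\n', 'data: [DONE]\n\n']
```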

A few caveats:

  • Reportedly, some keys have been banned after going through Hong Kong servers, so play it safe and pick Tokyo or Seoul
  • The free lunch ended after Children's Day (June 1)
  • To support SSE, don't use the official openai library on the client side
  • This is not a long-term solution, but it is fine for testing
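Since the official openai client is ruled out, the consumer has to parse the event stream itself: strip the "data: " prefix, stop at [DONE], and pull the content out of each delta. A minimal sketch of that parsing, using a made-up sample chunk:

```python
import json

def parse_sse_line(raw: bytes):
    """Extract the delta content from one SSE data line,
    returning None for blanks, non-data lines, and the
    [DONE] sentinel."""
    if not raw:
        return None
    line = raw.decode('utf-8')
    if not line.startswith('data: '):
        return None
    payload = line[len('data: '):]
    if payload == '[DONE]':
        return None
    delta = json.loads(payload)['choices'][0].get('delta', {})
    return delta.get('content')

sample = b'data: {"choices":[{"delta":{"content":"Hi"}}]}'
print(parse_sse_line(sample))  # -> Hi
```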


Python client

Reference source for the API calls:

https://github.com/acheong08/ChatGPT/blob/main/src/revChatGPT/V3.py

For easier debugging I copied that code over and made some changes (I no longer remember exactly what). The modified version is posted below; with it you don't need the official openai library, since it talks to the API directly through requests, mainly to support SSE mode.

Official API replacement (supports SSE): chatapi.py

import json
import os

import requests
import tiktoken

ENGINE = os.environ.get("GPT_ENGINE") or "gpt-3.5-turbo"


class Chatbot:
    """
    Official ChatGPT API
    """

    def __init__(
        self,
        api_key: str,
        engine: str = None,
        proxy: str = None,
        max_tokens: int = 3000,
        temperature: float = 0.5,
        top_p: float = 1.0,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        reply_count: int = 1,
        system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally",
        api_base = 'https://api.openai.com/v1/chat/completions'
    ) -> None:
        """
        Initialize Chatbot with API key (from https://platform.openai.com/account/api-keys)
        """
        self.engine = engine or ENGINE
        self.session = requests.Session()
        self.api_key = api_key
        self.proxy = proxy
        if self.proxy:
            proxies = {
                "http": self.proxy,
                "https": self.proxy,
            }
            self.session.proxies = proxies
        self.conversation: dict = {
            "default": [
                {
                    "role": "system",
                    "content": system_prompt,
                },
            ],
        }
        self.system_prompt = system_prompt
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.top_p = top_p
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self.reply_count = reply_count
        self.api_base = api_base
        if self.get_token_count("default") > self.max_tokens:
            raise Exception("System prompt is too long")

    def add_to_conversation(
        self,
        message: str,
        role: str,
        convo_id: str = "default",
    ) -> None:
        """
        Add a message to the conversation
        """
        self.conversation[convo_id].append({"role": role, "content": message})
    def __truncate_conversation(self, convo_id: str = "default") -> None:
        """
        Truncate the conversation
        """
        while True:
            if (
                self.get_token_count(convo_id) > self.max_tokens
                and len(self.conversation[convo_id]) > 1
            ):
                # Don't remove the first message
                self.conversation[convo_id].pop(1)
            else:
                break
    # https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
    def get_token_count(self, convo_id: str = "default") -> int:
        """
        Get token count
        """
        if self.engine not in ["gpt-3.5-turbo", "gpt-3.5-turbo-0301"]:
            raise NotImplementedError(f"Unsupported engine {self.engine}")
        encoding = tiktoken.encoding_for_model(self.engine)
        num_tokens = 0
        for message in self.conversation[convo_id]:
            # every message follows <im_start>{role/name}\n{content}<im_end>\n
            num_tokens += 4
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":  # if there's a name, the role is omitted
                    num_tokens += -1  # role is always required and always 1 token
        num_tokens += 2  # every reply is primed with assistant
        return num_tokens
    def get_max_tokens(self, convo_id: str) -> int:
        """
        Get max tokens
        """
        return 4000 - self.get_token_count(convo_id)
    def ask_stream(
        self,
        prompt: str,
        role: str = "user",
        convo_id: str = "default",
        **kwargs,
    ) -> str:
        """
        Ask a question
        """
        # Make conversation if it doesn't exist
        if convo_id not in self.conversation:
            self.reset(convo_id=convo_id, system_prompt=self.system_prompt)
        self.add_to_conversation(prompt, "user", convo_id=convo_id)
        self.__truncate_conversation(convo_id=convo_id)
        # Get response
        response = self.session.post(
            self.api_base,
            headers={"Authorization": f"Bearer {kwargs.get('api_key', self.api_key)}"},
            json={
                "model": self.engine,
                "messages": self.conversation[convo_id],
                "stream": True,
                # kwargs
                "temperature": kwargs.get("temperature", self.temperature),
                "top_p": kwargs.get("top_p", self.top_p),
                "presence_penalty": kwargs.get("presence_penalty", self.presence_penalty),
                "frequency_penalty": kwargs.get("frequency_penalty", self.frequency_penalty),
                "n": kwargs.get("n", self.reply_count),
                "user": role,
                # "max_tokens": self.get_max_tokens(convo_id=convo_id),
            },
            stream=True,
        )
        if response.status_code != 200:
            raise Exception(
                f"Error: {response.status_code} {response.reason} {response.text}",
            )
        response_role: str = None
        full_response: str = ""
        for line in response.iter_lines():
            if not line:
                continue
            # Remove "data: "
            line = line.decode("utf-8")[6:]
            # print('line:',line)
            if line == "[DONE]":
                break
            resp: dict = json.loads(line)
            choices = resp.get("choices")
            if not choices:
                continue
            delta = choices[0].get("delta")
            if not delta:
                continue
            if "role" in delta:
                response_role = delta["role"]
            if "content" in delta:
                content = delta["content"]
                full_response += content
                yield content
        self.add_to_conversation(full_response, response_role, convo_id=convo_id)
    def ask(
        self,
        prompt: str,
        role: str = "user",
        convo_id: str = "default",
        **kwargs,
    ) -> str:
        """
        Non-streaming ask
        """
        response = self.ask_stream(
            prompt=prompt,
            role=role,
            convo_id=convo_id,
            **kwargs,
        )
        full_response: str = "".join(response)
        return full_response
    def rollback(self, n: int = 1, convo_id: str = "default") -> None:
        """
        Rollback the conversation
        """
        for _ in range(n):
            self.conversation[convo_id].pop()
    def reset(self, convo_id: str = "default", system_prompt: str = None) -> None:
        """
        Reset the conversation
        """
        self.conversation[convo_id] = [
            {"role": "system", "content": system_prompt or self.system_prompt},
        ]
    def save(self, file: str, *convo_ids: str) -> bool:
        """
        Save the conversation to a JSON file
        """
        try:
            with open(file, "w", encoding="utf-8") as f:
                if convo_ids:
                    json.dump({k: self.conversation[k] for k in convo_ids}, f, indent=2)
                else:
                    json.dump(self.conversation, f, indent=2)
        except (FileNotFoundError, KeyError):
            return False
        return True
        # print(f"Error: {file} could not be created")
    def load(self, file: str, *convo_ids: str) -> bool:
        """
        Load the conversation from a JSON file
        """
        try:
            with open(file, encoding="utf-8") as f:
                if convo_ids:
                    convos = json.load(f)
                    self.conversation.update({k: convos[k] for k in convo_ids})
                else:
                    self.conversation = json.load(f)
        except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError):
            return False
        return True


After all this tinkering, what you actually use in the end is the client below.

Client code: chat_client.py

import config
from chatapi import Chatbot

api_key = config.OPENAI_API_KEY
api_base = '<your cloud function URL>/v1/chat/completions'

bot = Chatbot(api_key=api_key, system_prompt='', api_base=api_base)

while True:
    prompt = input('Prompt:')
    if len(prompt) <= 0 or prompt == 'quit':
        break
    res = bot.ask_stream(prompt)
    print('ChatGPT:')
    print('-' * 80)
    for content in res:
        print(content, end='')
    print('\n', '-' * 80, end='\n\n')

Usage notes:

  • api_key: your own OpenAI key
  • Replace the cloud function URL placeholder with the path generated after deployment
  • Put chatapi.py and chat_client.py in the same directory
  • Run chat_client.py
