Background

A netizen happened to recommend 123pan's cheap, oversized storage to me, and with nothing to do over the Dragon Boat Festival holiday I took a look. It really is cheap: 20 TB for a little over 200 RMB for three years. I've been hunting for a low-cost file storage backend for my wenku (document library) site. Its file server has a distinctive profile: a huge number of small files and a large total volume, but low download traffic per individual file. Alibaba Cloud OSS is a bit too expensive for that, and running my own server isn't cheap either. Then I found 123pan, which really does solve my problem.

If you have a similar need, feel free to sign up through my referral link: 2 TB of free space, click to claim and register.

Free users get 2 TB; VIP starts at 20 TB. Pretty good value.

My current solution keeps all the files on Cloudflare R2, at roughly $5-10 a month. That's not expensive overall, but an overage month has pushed the bill past $60.

If I move to 123pan instead, the developer plan costs 228 RMB for three years of storage, plus a 20 RMB/month developer service fee and 80 RMB/TB for traffic, which works out to roughly 396 RMB a year (the traffic part assumes about 1 TB per year). Cloudflare's cost is roughly $7 a month, around 600 RMB a year. By that math 123pan is the better deal, so today I decided to drop Cloudflare and serve files from 123pan instead.
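For the record, the arithmetic behind those estimates looks like this; a quick sketch in which the yearly traffic volume (about 1 TB) and the 7.2 RMB/USD exchange rate are my own assumptions:

# Rough yearly cost comparison. Traffic volume and exchange rate are assumptions.
storage_3y = 228            # RMB, 123pan developer storage, 3 years
dev_fee_per_month = 20      # RMB, developer service fee
traffic_per_tb = 80         # RMB per TB of direct-link traffic
traffic_tb_per_year = 1     # assumed

pan123_per_year = storage_3y / 3 + dev_fee_per_month * 12 + traffic_per_tb * traffic_tb_per_year
print(f"123pan:        ~{pan123_per_year:.0f} RMB/year")   # 76 + 240 + 80 = 396

r2_usd_per_month = 7        # USD, a typical month on R2
usd_to_rmb = 7.2            # assumed
print(f"Cloudflare R2: ~{r2_usd_per_month * 12 * usd_to_rmb:.0f} RMB/year")  # about 600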

One nice thing about Cloudflare R2 and Alibaba Cloud OSS is direct HTTP access. I had never considered netdisks because, as far as I know, none of the domestic ones support direct HTTP links; some third-party tools can hack together HTTP downloads, but their stability is questionable since the protocols aren't official, so netdisks were never on my shortlist. Yesterday, while researching 123pan, I found that it actually supports direct HTTP links and even custom domains, which is completely sufficient for my needs, though direct-link traffic is billed separately. In a test I downloaded a copy of Windows stored in the netdisk and it recorded 4 GB of traffic even though I cancelled the download after a few minutes; the statistics apparently count the full file, which looks a bit off. But my wenku files are all small, so this doesn't worry me.

Implementation

1. Phases

  • 1. Data migration
  • 2. Service deployment and testing
  • 3. Service cutover
  • 4. Usage monitoring

2. Data migration

1. Write a Python script that reads the download URLs from the database and downloads the files one by one, while uploading each file through 123pan's open developer API (or WebDAV) into the direct-link directory 123pan provides, preserving the original paths.

2. Batch-download and batch-upload through Cloudflare R2's S3-compatible API, though this probably incurs extra fees; a rough sketch of what that would look like follows.
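For completeness, option 2 would look something like the following; a minimal sketch using boto3 against R2's S3-compatible endpoint, where the account ID, bucket name and credentials are placeholders rather than values from my setup.

import os
import boto3

# Sketch of option 2: pull every object straight out of R2 over its
# S3-compatible API. All identifiers below are placeholders.
s3 = boto3.client(
    "s3",
    endpoint_url="https://<ACCOUNT_ID>.r2.cloudflarestorage.com",
    aws_access_key_id="<R2_ACCESS_KEY_ID>",
    aws_secret_access_key="<R2_SECRET_ACCESS_KEY>",
)

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="<BUCKET_NAME>"):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        local_path = os.path.join("/tmp/r2_backup", key)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        # Each list/get is a billed operation on R2.
        s3.download_file("<BUCKET_NAME>", key, local_path)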

So I went with option 1. The code is below:

import os
import hashlib
import requests
import pymysql
from urllib.parse import urlparse
import urllib3
import time
from datetime import datetime, timezone
import threading
from concurrent.futures import ThreadPoolExecutor
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
import sys
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# 123pan open API credentials
CLIENT_ID = 'XXXXXXX'  # apply for these on the 123pan open platform
CLIENT_SECRET = 'XXXXX'
PARENT_FILE_ID = '0'  # root directory; must be the string '0'

# Database configuration
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'XXX',
    'database': '39_la',
    'charset': 'utf8mb4'
}

API_BASE = 'https://open-api.123pan.com'
DOWNLOAD_BASE = '/tmp/downloads'  # local download root directory
MAX_WORKERS = 5  # thread count; tune to the server's capacity

# Database connection pool
class DBPool:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._pool = PooledDB(
                    creator=pymysql,
                    maxconnections=MAX_WORKERS * 2,  # maximum pool size
                    mincached=MAX_WORKERS,  # initial idle connections
                    blocking=True,  # block and wait when the pool is exhausted
                    ping=0,  # never ping the server proactively
                    **DB_CONFIG
                )
            return cls._instance

    def get_connection(self):
        return self._pool.connection()

# Retry decorator for database operations
def db_retry(max_retries=3, initial_delay=0.1):
    def decorator(func):
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except pymysql.Error as e:
                    if attempt == max_retries - 1:  # last attempt
                        raise
                    if e.args[0] in (1205, 1213):  # lock wait timeout or deadlock
                        time.sleep(delay + random.random() * delay)
                        delay *= 2  # exponential backoff
                        continue
                    raise
            return None
        return wrapper
    return decorator

db_pool = DBPool()

# Logging setup
def setup_logging():
    logger = logging.getLogger('qianyi')
    logger.setLevel(logging.INFO)
    
    # console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter(
        '%(asctime)s [%(levelname)s] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    console_handler.setFormatter(console_formatter)
    
    # file handler
    file_handler = logging.FileHandler('qianyi.log')
    file_handler.setLevel(logging.INFO)
    file_formatter = logging.Formatter(
        '%(asctime)s [%(levelname)s] [%(threadName)s] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    file_handler.setFormatter(file_formatter)
    
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

logger = setup_logging()

@db_retry()
def fetch_filepaths():
    with db_pool.get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT contentid, filepath FROM wenku_content WHERE isdo=0 LIMIT 100 FOR UPDATE SKIP LOCKED"
            )  # isdo marks processing state (0=pending, 1=done, 2=failed); filepath is the column to migrate
            results = cursor.fetchall()
            if results:
                logger.info(f"获取到 {len(results)} 条待处理记录")
            return results

@db_retry()
def update_status(record_id, status):
    with db_pool.get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE wenku_content SET isdo=%s WHERE contentid=%s",
                (status, record_id)
            )
            conn.commit()
            logger.info(f"更新记录 {record_id} 状态为 {status}")

# Token management
class TokenManager:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._token = None
                cls._instance._expired_at = None
            return cls._instance

    def get_token(self):
        now = datetime.now(timezone.utc)  # use UTC
        if self._token is None or (self._expired_at is not None and now >= self._expired_at):
            with self._lock:
                # Double check
                if self._token is None or (self._expired_at is not None and now >= self._expired_at):
                    self._refresh_token()
        return self._token

    def _refresh_token(self):
        data = {'clientID': CLIENT_ID, 'clientSecret': CLIENT_SECRET}
        headers = {
            'Content-Type': 'application/json',
            'Platform': 'open_platform'
        }
        resp = requests.post(f"{API_BASE}/api/v1/access_token", headers=headers, json=data, verify=False)
        res = resp.json()
        if res.get('code') != 0:
            raise Exception(res.get('message', '获取token失败'))
        self._token = res['data']['accessToken']
        # convert the expiry time returned by the API to UTC
        expiry_time = res['data']['expiredAt'].replace('Z', '+00:00')
        self._expired_at = datetime.fromisoformat(expiry_time)

# API request rate limiting
class RateLimiter:
    def __init__(self, max_per_second=5):
        self.interval = 1.0 / max_per_second
        self.last_request_time = 0
        self._lock = threading.Lock()

    def wait(self):
        with self._lock:
            current_time = time.time()
            elapsed = current_time - self.last_request_time
            if elapsed < self.interval:
                time.sleep(self.interval - elapsed)
            self.last_request_time = time.time()

rate_limiter = RateLimiter()
token_manager = TokenManager()

# Create a requests session with retry support
def create_retry_session():
    session = requests.Session()
    retry_strategy = Retry(
        total=5,  # total retry attempts
        backoff_factor=1,  # backoff factor between retries
        status_forcelist=[429, 500, 502, 503, 504],  # HTTP status codes worth retrying
        allowed_methods=["GET", "POST", "PUT"],  # methods allowed to be retried
        respect_retry_after_header=True  # honor the Retry-After response header
    )
    adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=10, pool_maxsize=20)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

http_session = create_retry_session()

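# Wrapper for 123pan open-platform API calls: rate-limits requests, attaches the token, and unwraps the 'data' field.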
def open_request(path, data=None, method='POST', params=None):
    rate_limiter.wait()
    headers = {
        'Content-Type': 'application/json',
        'Platform': 'open_platform',
        'Authorization': f'Bearer {token_manager.get_token()}'
    }
    url = API_BASE + path
    
    try:
        if method == 'GET':
            resp = http_session.get(url, headers=headers, params=params, verify=False, timeout=30)
        else:
            resp = http_session.post(url, headers=headers, params=params, json=data, verify=False, timeout=30)
        
        try:
            res = resp.json()
        except Exception as e:
            raise Exception(f"API响应解析失败: {e}")
            
        if res.get('code') != 0:
            raise Exception(res.get('message', '网络错误'))
            
        return res.get('data') or res
        
    except requests.exceptions.RequestException as e:
        # network-level error: wait briefly, then let the caller retry
        time.sleep(random.uniform(1, 3))
        raise Exception(f"网络请求失败: {str(e)}")

def get_db_connection():
    return pymysql.connect(**DB_CONFIG)

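# Stream a file from the old file server to local disk.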
def download_file(url, local_path):
    logger.info(f"开始下载文件: {url}")
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    try:
        r = http_session.get(url, stream=True, timeout=60)
        if r.status_code == 200:
            with open(local_path, 'wb') as f:
                for chunk in r.iter_content(1024 * 1024):
                    f.write(chunk)
            logger.info(f"文件下载成功: {local_path}")
            return True
        logger.error(f"文件下载失败,状态码: {r.status_code}")
        return False
    except Exception as e:
        logger.error(f"文件下载出错: {str(e)}")
        time.sleep(random.uniform(1, 3))
        return False

def md5_file(file_path):
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

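# Find a sub-folder by name under parent_id (first page of the listing only), or create it via mkdir.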
def get_or_create_folder(parent_id, folder_name):
    parent_id = str(parent_id)
    res = open_request(
        '/api/v2/file/list',
        method='GET',
        params={'parentFileId': parent_id, 'limit': 100}
    )
    file_list = res.get('fileList', []) if isinstance(res, dict) else []
    for item in file_list:
        name = item.get('filename') or item.get('fileName')
        ftype = item.get('type') if 'type' in item else item.get('fileType')
        trashed = item.get('trashed', 0)
        if name == folder_name and ftype == 1 and trashed == 0:
            return str(item.get('fileId'))
    res = open_request('/upload/v1/file/mkdir', {
        'parentID': parent_id,
        'name': folder_name
    })
    return str(res['dirID'])

def get_final_parent_id(root_id, rel_path):
    parts = rel_path.split('/')
    dirs = parts[:-1]
    parent_id = str(root_id)
    for folder in dirs:
        parent_id = get_or_create_folder(parent_id, folder)
    return parent_id, parts[-1]

def put_part(url, part_bytes):
    try:
        resp = http_session.put(url, data=part_bytes, verify=False, timeout=60)
        if resp.status_code != 200:
            raise Exception(f'分片传输错误: {resp.status_code}')
    except requests.exceptions.RequestException as e:
        raise Exception(f"分片上传失败: {str(e)}")

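# Upload one local file to 123pan: create the upload task, skip if the etag already exists (instant upload),
# otherwise send the missing slices and wait for the server-side merge to finish.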
def upload_file_123pan(parent, file_path, rel_path):
    logger.info(f"开始上传文件: {file_path}")
    try:
        parent_id, filename = get_final_parent_id(str(parent), rel_path)
        file_size = os.path.getsize(file_path)
        file_etag = md5_file(file_path)
        
        logger.info(f"创建上传任务: {filename} (大小: {file_size})")
        res_data = open_request('/upload/v1/file/create', {
            'parentFileID': str(parent_id),
            'filename': filename,
            'etag': file_etag,
            'size': file_size
        })
        
        if res_data['reuse']:
            logger.info(f"文件已存在,跳过上传: {filename}")
            return True
            
        upload_id = res_data['preuploadID']
        slice_size = res_data['sliceSize']
        logger.info(f"获取分片信息: uploadId={upload_id}, sliceSize={slice_size}")
        
        res_data = open_request('/upload/v1/file/list_upload_parts', {'preuploadID': upload_id})
        parts_map = {part['partNumber']: {'size': part['size'], 'etag': part['etag']} for part in res_data['parts']}
        
        total_parts = (file_size + slice_size - 1) // slice_size
        with open(file_path, 'rb') as f:
            for i in range(total_parts):
                part_num = i + 1
                part_bytes = f.read(slice_size)
                if not part_bytes:
                    break
                    
                part_etag = hashlib.md5(part_bytes).hexdigest()
                if part_num in parts_map and parts_map[part_num]['size'] == len(part_bytes) and parts_map[part_num]['etag'] == part_etag:
                    logger.info(f"分片 {part_num}/{total_parts} 已上传,跳过")
                    continue
                    
                logger.info(f"上传分片 {part_num}/{total_parts}")
                res_data = open_request('/upload/v1/file/get_upload_url', {'preuploadID': upload_id, 'sliceNo': part_num})
                put_part(res_data['presignedURL'], part_bytes)

        logger.info("完成分片上传,等待服务器处理")
        res_data = open_request('/upload/v1/file/upload_complete', {'preuploadID': upload_id})
        if res_data['completed']:
            logger.info(f"文件上传成功: {filename}")
            return True
            
        for attempt in range(20):
            time.sleep(5)
            res_data = open_request('/upload/v1/file/upload_async_result', {'preuploadID': upload_id})
            if res_data['completed']:
                logger.info(f"文件处理完成: {filename}")
                return True
            logger.info(f"等待文件处理完成: {attempt + 1}/20")
            
        logger.error(f"文件处理超时: {filename}")
        return False
    except Exception as e:
        logger.error(f"上传过程出错: {str(e)}")
        return False

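# Process one record end-to-end: download from the old host, upload to 123pan under the same relative path,
# update isdo, and clean up the temporary file.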
def process_record(record):
    record_id, url = record
    logger.info(f"开始处理记录 {record_id}: {url}")
    
    if 'file.neimou.com' in url:
        url = url.replace('file.neimou.com', 'files.41112.net')
        logger.info(f"转换下载地址: {url}")
        
    rel_path = urlparse(url).path.lstrip('/')
    local_path = os.path.join(DOWNLOAD_BASE, 'neimou', rel_path)
    
    max_retries = 3
    retry_delay = 1
    
    for attempt in range(max_retries):
        try:
            if not download_file(url, local_path):
                if attempt == max_retries - 1:
                    logger.error(f"记录 {record_id} 下载失败,已重试 {max_retries} 次")
                    update_status(record_id, 2)
                    return
                logger.warning(f"记录 {record_id} 下载失败,将在 {retry_delay} 秒后重试")
                time.sleep(retry_delay)
                retry_delay *= 2
                continue
                
            upload_rel_path = os.path.join('neimou', rel_path)
            if upload_file_123pan(PARENT_FILE_ID, local_path, upload_rel_path):
                update_status(record_id, 1)
                logger.info(f"记录 {record_id} 处理成功")
                break
            else:
                if attempt == max_retries - 1:
                    logger.error(f"记录 {record_id} 上传失败,已重试 {max_retries} 次")
                    update_status(record_id, 2)
        except Exception as e:
            logger.error(f"记录 {record_id} 处理出错 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
            if attempt == max_retries - 1:
                update_status(record_id, 2)
            else:
                time.sleep(retry_delay)
                retry_delay *= 2
        finally:
            if os.path.exists(local_path):
                try:
                    os.remove(local_path)
                    logger.info(f"清理临时文件: {local_path}")
                except OSError:
                    logger.warning(f"清理临时文件失败: {local_path}")

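# Main loop: fetch pending rows in batches of 100 and hand them to the thread pool until the table is drained.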
def main():
    logger.info("启动文件传输程序")
    cycle_count = 0
    
    while True:
        try:
            cycle_count += 1
            logger.info(f"开始第 {cycle_count} 轮处理")
            
            records = fetch_filepaths()
            if not records:
                logger.info("没有待处理的文件,程序退出")
                break
                
            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                logger.info(f"创建线程池,并发数: {MAX_WORKERS}")
                executor.map(process_record, records)
            
            logger.info(f"完成第 {cycle_count} 轮处理")
            time.sleep(1)
        except Exception as e:
            logger.error(f"主循环出错: {str(e)}")
            time.sleep(5)

if __name__ == '__main__':
    main()

I wrote the code above with Cursor's help. It takes the 123pan API rate limits into account, runs multi-threaded, and has fairly thorough error handling, so it can be used as-is.
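If you want to run it, the only third-party dependencies are pymysql, DBUtils and requests (everything else is standard library). Fill in CLIENT_ID, CLIENT_SECRET and DB_CONFIG, tune MAX_WORKERS to your server, and progress is written both to the console and to qianyi.log in the working directory.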

3. Service deployment

Although 123pan supports custom domains and direct HTTP links, it is not an object store like OSS, so the directory path cannot be used as-is.

The direct link you get looks like http://xxx.aaa.com/直连目录/20221022/100002.docx. The domain part can be set to match the one I used with Cloudflare R2, but the extra direct-link directory segment cannot be removed, so every URL gains one more path level than before. That means I still need a VPS running a reverse proxy to strip that path segment. The idea and the setup are simple and can be done straight from the BaoTa (宝塔) panel; a sketch of the rule follows.
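A minimal sketch of that rule, assuming the direct-link directory is called zhilian and the VPS serves the old public domain file.example.com (both placeholders); the BaoTa panel generates an equivalent nginx site when you add a reverse proxy:

# Hypothetical nginx config: strip the extra direct-link directory level.
server {
    listen 80;
    server_name file.example.com;              # placeholder: the old public domain

    location / {
        # /20221022/100002.docx -> http://xxx.aaa.com/zhilian/20221022/100002.docx
        proxy_pass http://xxx.aaa.com/zhilian/;
        proxy_set_header Host xxx.aaa.com;
    }
}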

4. Service cutover

The migration job is still running; I'll switch the service over once it finishes.

By qidian
