Python教你从0搭建微信推送斗鱼直播提醒(单房间简化版)

2021年9月11日 8点热度 0条评论 来源: python可乐编程

重新推出重制版,单房间推送提醒版本(后文有扩展多房间思路),适合没有服务器的小伙伴学习使用。

Gitee文档同步更新:https://gitee.com/LGW_space/dy-live-lintener
语雀文档更为详细(包含成品案例):https://www.yuque.com/books/share/fdc7c120-e4eb-47d5-93de-dcf5d347e5a8?# 《斗鱼直播提醒服务》

斗鱼直播提醒 ——简化版

功能介绍:

当然在学习Python的道路上肯定会困难,没有好的学习资料,怎么去学习呢? 
学习Python中有不明白推荐加入交流Q群号:928946953 
群里有志同道合的小伙伴,互帮互助, 群里有不错的视频学习教程和PDF!
还有大牛解答!

 

  • 主播开播后或更换房间标题后推送提醒到微信(单房间)

    > 理论上可以配置多个房间,可以达到和斗鱼全房间推送的效果一样。

项目截图:

<ignore_js_op>

 

[TOC]

一、项目准备

1. 用到的技术内容

  • 云函数
  • 对象存储(为什么使用对象存储,下文有介绍)
  • python(3.6/3.7)
  • WxPusher

2. 需要的工具

  • 腾讯云(云函数和云对象存储)

二、开通对象存储

这里我们使用对象存储相当于数据库的一个作用,为什么使用对象存储呢?
> 对象存储采用扁平的文件组织方式,所以在文件量上升至千万、亿级别,容量在PB级别的时候,这种文件组织方式下的性能优势就显现出来了,文件不在有目录树深度的问题,历史和近线数据有同样的访问效率。另外,对象存储多采用分布式架构,

简而言之就是便宜效率高

官方入门步骤

1. 注册腾讯云

官方的注册方法

2. 开通 COS 服务

在 腾讯云控制台 中,选择【云产品】>【对象存储】,进入 COS 控制台,按照界面提示开通 COS 服务。(如果您已开通,请跳过该步骤。)

3. 创建存储桶

我们需要创建一个用于存放对象的存储桶:

  1. 在 对象存储控制台 左侧导航栏中单击【存储桶列表】,进入存储桶管理页。

  2. 单击【创建存储桶】,输入以下配置信息,其他配置保持默认即可。

    • 名称:输入存储桶名称。名称设置后不可修改。此处举例输入 examplebucket。
    • 所属地域:存储桶所属地域,选择与您业务最近的一个地区,例如广州地域。
    • 访问权限:存储桶访问权限,此处我们保持默认为“私有读写”。
  3. 单击【确定】,即可创建完成。

三、云函数的开发

云函数我们使用腾讯云函数作为示例,阿里云也同理。(阿里云似乎是不允许触发器的触发周期低于1分钟)

1. 注册腾讯云

官方的注册方法

2. 创建云函数

  • 登录腾讯云,在云产品里找到云函数,在【函数服务】里新建云函数
  • 选择【自定义创建】
  • 修改【函数名称】、【地域】、【运行环境(Python3.6)】、【提交方法(在线编辑)】、【执行方法(index.main_handler)】暂时不用修改默认代码。
  • 点击完成

3. 完善功能

在写代码之前,我们需要用到这几个关键信息:

  • appid :你的APPID(账号信息里有)
  • secret_id : 你的 SecretId (API密钥管理里获取)
  • secret_key :你的 SecretKey(API密钥管理里获取)
  • region :存储桶所在的地域
  • BUCKET:存储桶名称
  • token :没有可为空
  • FILE_NAME :你要上传的文件名 如room_info
  • WxPusherToken :WxPusher的用户appToken

3.1 引入所需包,完成基本配置

 复制代码 隐藏代码
# -*- coding: utf8 -*-
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError
from qcloud_cos_v5 import CosClientError
import datetime
import json
import logging
import requests
from urllib.parse import parse_qs

logger = logging.getLogger()
logger.setLevel(logging.ERROR) # 默认打印 INFO 级别日志,可根据需要调整为 DEBUG、WARNING、ERROR、CRITICAL 级日志

appid = 123123  # Please replace with your APPID. 请替换为您的 APPID
secret_id = u'xxx'  # Please replace with your SecretId. 请替换为您的 SecretId
secret_key = u'xxx'  # Please replace with your SecretKey. 请替换为您的 SecretKey
region = u'ap-shanghai'  # Please replace with the region where COS bucket located. 请替换为您bucket 所在的地域
BUCKET = 'xxx'
FILE_NAME = 'xxx'
token = ''
WxPusherToken = 'xxxxxx'

# 配置存储桶
config = CosConfig(Secret_id=secret_id, Secret_key=secret_key, Region=region, Token=token)
client = CosS3Client(config)

 

注:qcloud_cos_v5 为 COS的SDK

3.2 创建方法 根据房间号获取直播状态

斗鱼第三方API,通过房间号获取房间信息,自行百度获取。
最后返回 房间信息

 复制代码 隐藏代码
def getRoomInfo():
url = '斗鱼官方API/房间号'
try:
r = requests.get(url, timeout=5)
except requests.exceptions.RequestException as e:
print(str(e))
return 'timeout'
else:
if r.status_code == 404:
return '404'
json_str = json.loads(r.text)
if isinstance(json_str, dict):
if 'error' in json_str.keys() and json_str['error'] == 0:
room_info = json_str['data']
return room_info

 

3.3 创建方法 发送WxPusher消息

复制代码 隐藏代码
def send_WxPusher_msg(c):
    headers = {
        'Content-Type': 'application/json'
    }
    url = 'http://wxpusher.zjiecode.com/api/send/message'
    parameter = {
        "appToken": WxPusherToken,
        "content": c,
        "contentType": 1,    # 内容类型 1表示文字  2表示html(只发送body标签内部的数据即可,不包括body标签) 3表示markdown 
        "topicIds":[ 
            你的主题号
        ]
    }
    r = requests.post(url=url, headers=headers,
                      data=json.dumps(parameter, ensure_ascii=False).encode('utf-8'))
    json_str = json.loads(r.text)
    datas = json_str['data']
    for data in datas:
        if data['code'] == 1001:
            logger.error(data['status'])

 

3.4 创建方法 更新(上传)文件

设置更新时间字段,用于锁定更新,在指定时间内不允许更新/重复推送(我猜是腾讯云多线程的问题,会导致重复更新、重复推送)

 复制代码 隐藏代码
def updateLocalInfo(roominfo):
DIC = {
"room_id": roominfo["room_id"],
"room_status": roominfo["room_status"],
"room_name": roominfo["room_name"],
"update_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
response = client.put_object(
Bucket=BUCKET,
Body=DIC,
Key=FILE_NAME
)
print("更新成功:",response['ETag'])

 

3.5 创建方法 获取文件

url获取:上传文件后,在存储桶内点击文件的详情基本信息对象地址信息
如 https://存储桶名.cos.地域.myqcloud.com/文件名

 复制代码 隐藏代码
def getLocalInfo():
url = '文件的对象地址'
r = requests.get(url)
info_str = r.text
params = parse_qs(info_str)
result = {key: params[key][0] for k
ey in params} return result

 


3.6 创建方法 信息整合

 复制代码 隐藏代码
def time_last(time_str): # 计算时间差
    t_r = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    now_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    t_n = datetime.datetime.strptime(now_time_str, "%Y-%m-%d %H:%M:%S")
    return (t_n - t_r).seconds

def checkRoomInfo():
    room = getLocalInfo() # 获取历史房间信息
    curr_info = getRoomInfo() # 获取当前真实房间信息
    time_l = time_last(room['update_time']) # 计算当前时间和上次更新时间的时间差
    # 直播开播提醒
    if curr_info['room_status'] == "1": # 正在直播
        if room['room_status'] == "2":  # 之前状态为 未开播
            if time_l > 300:  # 300秒内无法重复推送
                content = '【开播】您关注的主播:' + curr_info['owner_name'] + '开始直播啦' + \
                            '\n房间标题:' + curr_info['room_name'] + \
                            '\n正在直播:' + curr_info['cate_name']
                send_WxPusher_msg(content)
                updateLocalInfo(roominfo=curr_info)
            else:
                logger.error(
                        '【开播】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(
                            time_l))
    else:
        if room['room_status'] == "1":  # 关闭了直播
            updateLocalInfo(roominfo=curr_info)

    # 更换房间标题提醒
    if curr_info['room_name'] != room['room_name']:
        if time_l > 300:
            content1 = '【更换标题】您关注的主播:' + curr_info['owner_name'] + '改变了房间标题' + \
                        '\n当前房间标题:' + curr_info['room_name']
            send_WxPusher_msg(content1)
            updateLocalInfo(roominfo=curr_info)
        else:
                logger.error(
                        '【更换标题】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(
                            time_l))

 

到这所有工能就基本全部完成啦~

开发扩展思路

多房间支持:支持一个房间就一定支持多个房间。

  • 下载文件:文件内容为字典/json数组,每一个字典记录房间的信息。
  • 遍历:获取到存储桶内的房间信息后,用斗鱼API遍历得到每一个房间的真实信息。
  • 更新/上传:通过比对真实信息,更新字典数组,最后再上传到COS。
    原文作者:python可乐编程
    原文地址: https://www.cnblogs.com/pythonQqun200160592/p/15241292.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。