在IOS双开微信的时候,由于是对原版微信砸壳后更改包名,再签名的方式,会导致双开微信的推送失效。大多数情况下只能使用常驻内存的插件让微信常驻IOS内存,但是这样会加速手机费电,皮卡车插件有后台利用bark进行推送转发的功能,但是这样就必须要有一台ipad通电长开负责转发,没有闲置ipad或者不想长通电的就很难解决。
github上有一个python库wcferry,能够获取PC版的微信消息,并处理。https://github.com/lich0821/WeChatFerry
wcferry的帮助文件,https://wechatferry.readthedocs.io/zh/latest/index.html
用wcferry,可以很好的解决离线推送问题。
一、先说没有解决的问题
- 本解决方案并不是收到推送转发推送,而是收到消息利用bark来通知提醒,所以还是没办法和微信原版推送保持一致的,比如微信通话就没办法用bark来转发
- wcferry的消息类型规则中并没有群被屏蔽的区别,因此关于群消息的推送逻辑很难和原版一致,私信消息也不是独立的类型,而是分成图片、视频、文字等类型,因此在推送转发时,我只设置了常用类型
- bark的操作方式我就不做介绍了,需要可以给我留言单独问
- 因为用的PC微信,所以不能再登录PC版了,会顶掉
二、关于微信版本和处理
wcferry专用版本
wcferry只能拦截3.9.2.23版本以前的微信,这边预留一个,据说已经支持到3.9.10.27了
版本更新处理方式
限制微信更新的方法有很多,最简单的方式就是更改hosts文件
127.0.0.1 dldir1.qq.com #屏蔽腾讯软件升级。缺点:仍有升级提示。
127.0.0.1 dldir1v6.qq.com #屏蔽腾讯软件升级。缺点:仍有升级提示。
真的搞不定就用这个:禁止升级
旧版本登录问题
关于旧版本登录,可以使用pymem,来修改版本标识绕过微信验证的。在pip install pymem之后,使用代码如下:
from pymem import Pymem
ADDRS = [0x2FFEAF8, 0x3020E1C, 0x3021AEC, 0x303C4D8, 0x303FEF4, 0x3040FA4, 0x30416EC]
def fix_version(pm: Pymem):
WeChatWindll_base = 0
for m in list(pm.list_modules()):
path = m.filename
if path.endswith("WeChatWin.dll"):
WeChatWindll_base = m.lpBaseOfDll
break
for offset in ADDRS:
addr = WeChatWindll_base + offset
v = pm.read_uint(addr)
if v == 0x63090A13: # 已经修复过了
continue
elif v != 0x63090217: # 不是 3.9.2.23 修复也没用
raise Exception("别修了,版本不对,修了也没啥用。")
pm.write_uint(addr, 0x63090A13)
print("好了,可以扫码登录了")
if __name__ == "__main__":
try:
pm = Pymem("WeChat.exe")
fix_version(pm)
except Exception as e:
print(f"{e},请确认微信程序已经打开!")
注意:需要先打开微信,进入登录界面后,再运行该py
三、wcferry的介绍
展开看函数说明
wcferry
Submodules
Classes
Package Contents
- classwcferry.Wcf(host: str = None, port: int = 10086, debug: bool = True, block: bool = True)
- WeChatFerry, 一个玩微信的工具。
- 参数:
- download_attach(id: int, thumb: str, extra: str)int
- 下载附件(图片、视频、文件)。这方法别直接调用,下载图片使用 download_image。
- enable_recv_msg(callback: Callable[[wcferry.wxmsg.WxMsg], None] = None)bool
- (不建议使用)设置接收消息回调,消息量大时可能会丢失消息
自 3.7.0.30.13 版本弃用.
- get_audio_msg(id: int, dir: str, timeout: int = 3)str
- 获取语音消息并转成 MP3 :param id: 语音消息 id :type id: int :param dir: MP3 保存目录(目录不存在会出错) :type dir: str :param timeout: 超时时间(秒) :type timeout: int
- 返回:
- 成功返回存储路径;空字符串为失败,原因见日志。
- 返回类型:
- str
- get_chatroom_members(roomid: str)Dict
- 获取群成员
- 参数:
- roomid (str) — 群的 id
- 返回:
- 群成员列表: {wxid1: 昵称1, wxid2: 昵称2, …}
- 返回类型:
- Dict
- get_contacts()List[Dict]
- 获取完整通讯录
- get_friends()List[Dict]
- 获取好友列表
- get_msg(block=True)wcferry.wxmsg.WxMsg
- 从消息队列中获取消息
- get_msg_types()Dict
- 获取所有消息类型
- get_tables(db: str)List[Dict]
- 获取 db 中所有表
- 参数:
- db (str) — 数据库名(可通过 get_dbs 查询)
- 返回:
- db 下的所有表名及对应建表语句
- 返回类型:
- List[Dict]
- get_user_info()Dict
- 获取登录账号个人信息
- keep_running()
- 阻塞进程,让 RPC 一直维持连接
- send_rich_text(name: str, account: str, title: str, digest: str, url: str, thumburl: str, receiver: str)int
- 发送富文本消息 卡片样式:
- LOG
- cmd_socket
- cmd_url
- contacts= []
- host
- msgQ
- msg_socket
- msg_url
- port
- recv_timeout= 5000
- sdk= None
- self_wxid= ”
- send_timeout= 5000
函数名称 | 描述 | 返回类型 |
---|---|---|
cleanup |
关闭连接 回收资源 | None |
keep_running |
阻塞进程(让 RPC 一直维持连接) | |
is_receiving_msg |
是否已启动接收消息功能 | bool |
get_qrcode |
获取登录二维码(已经登录则返回空字符串) | str |
is_login |
检查登录状态 | bool |
get_self_wxid |
获取登录账号的 wxid | str |
get_msg_types |
获取所有消息类型 | Dict |
get_contacts |
获取所有联系人 | List[Dict] |
get_friends |
获取所有好友 | List[Dict] |
get_dbs |
获取数据库 | List[str] |
get_tables |
获取某数据库下的表 | List[Dict] |
get_user_info |
获取登录账号个人信息 | Dict |
get_audio_msg |
取语音消息并转成 MP3 | str |
send_text |
发送文本消息(可 @) | int |
_download_file |
下载文件 | str |
_process_path |
处理路径(如果是网络路径则下载文件) | str |
send_image |
发送图片(非线程安全) | int |
send_file |
发送文件(非线程安全) | int |
send_xml |
发送 XML | int |
send_emotion |
发送表情 | int |
send_rich_text |
发送富文本消息 | int |
send_pat_msg |
拍一拍群友 | int |
forward_msg |
转发消息 | int |
get_msg |
从消息队列中获取消息 | WxMsg |
enable_receiving_msg |
允许接收消息 | bool |
enable_recv_msg |
允许接收消息(旧接口) | bool |
disable_recv_msg |
停止接收消息 | bool |
query_sql |
执行 SQL 查询 | List[Dict] |
accept_new_friend |
接受好友申请 | int |
refresh_pyq |
刷新朋友圈 | int |
download_attach |
下载附件 | int |
get_info_by_wxid |
通过 wxid 查询微信号昵称等信息 | dict |
revoke_msg |
撤回消息 | int |
decrypt_image |
解密图片 | str |
get_ocr_result |
获取 OCR 结果 | str |
download_image |
下载图片 | str |
add_chatroom_members |
添加群成员 | int |
del_chatroom_members |
删除群成员 | int |
invite_chatroom_members |
邀请群成员 | int |
get_chatroom_members |
获取群成员 | Dict |
get_alias_in_chatroom |
获取群名片 | str |
receive_transfer |
接收转账 | int |
电脑端检测登录微信
from wcferry import Wcf
wcf = Wcf()
检测微信登陆状态
检查当前 PC 端微信登陆状态?
from wcferry import Wcf
wcf = Wcf()
print(wcf.is_login())
获取登录账号信息
获取当前 PC 端微信账号信息?
from wcferry import Wcf
wcf = Wcf()
print(wcf.get_user_info())
运行结果
{'wxid': 'wxid_***', 'name': '字里行间', 'mobile': '195********', 'home': 'C:\\Users\\Administrator\\Documents\\WeChat Files\\'}
开辟线程监听群消息
开启线程监听消息:判断是否是群消息?
from queue import Empty
from threading import Thread
from wcferry import Wcf, WxMsg
wcf = Wcf()
def processMsg(msg: WxMsg):
if msg.from_group():
print(msg.content)
def enableReceivingMsg():
def innerWcFerryProcessMsg():
while wcf.is_receiving_msg():
try:
msg = wcf.get_msg()
processMsg(msg)
except Empty:
continue
except Exception as e:
print(f"ERROR: {e}")
wcf.enable_receiving_msg()
Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()
enableReceivingMsg()
wcf.keep_running()
微信消息属性说明
class WxMsg() 微信消息属性说明
属性说明
字段名 | 类型 | 描述 |
---|---|---|
type | int | 消息类型 可通过 get_msg_types 获取 |
id | str | 消息 id |
xml | str | 消息 xml 部分 |
sender | str | 消息发送人 |
roomid | str | (仅群消息有)群 id |
content | str | 消息内容 |
thumb | str | 视频或图片消息的缩略图路径 |
extra | str | 视频或图片消息的路径 |
消息类型
from wcferry import Wcf
wcf = Wcf()
print(wcf.get_msg_types())
消息类型编号 | 消息类型描述 | 属性 |
---|---|---|
0 | 朋友圈消息 | int |
1 | 文字 | int |
3 | 图片 | int |
34 | 语音 | int |
37 | 好友确认 | int |
40 | POSSIBLEFRIEND_MSG | int |
42 | 名片 | int |
43 | 视频 | int |
47 | 石头剪刀布 | 表情图片 |
48 | 位置 | int |
49 | 共享实时位置、文件、转账、链接 | int |
50 | VOIPMSG | int |
51 | 微信初始化 | int |
52 | VOIPNOTIFY | int |
53 | VOIPINVITE | int |
62 | 小视频 | int |
66 | 微信红包 | int |
9999 | SYSNOTICE | int |
10000 | 红包、系统消息 | int |
10002 | 撤回消息 | int |
1048625 | 搜狗表情 | int |
16777265 | 链接 | int |
436207665 | 微信红包 | int |
536936497 | 红包封面 | int |
754974769 | 视频号视频 | int |
771751985 | 视频号名片 | int |
822083633 | 引用消息 | int |
922746929 | 拍一拍 | int |
973078577 | 视频号直播 | int |
974127153 | 商品链接 | int |
975175729 | 视频号直播 | int |
1040187441 | 音乐链接 | int |
1090519089 | 文件 | int |
根据群名称查询群 wxid
特别注意:Wcf 没有提供根据群名称查询群 wxid 功能。我们可以先获取全部联系人数据(微信好友、微信群等等),基于 wxid 进行区分,因为微信群 wxid 后缀都是 “chatroom” 结尾。
from wcferry import Wcf, WxMsg
wcf = Wcf()
wcf_rooms = []
for contact in wcf.get_contacts():
if contact['wxid'].endswith("chatroom"):
wcf_rooms.append(contact)
def get_chatroom_roomid(wcf_rooms: list, room_name: str):
for room in wcf_rooms:
if room['name'] == room_name:
return room['wxid']
return None
room_id = get_chatroom_roomid(wcf_rooms=wcf_rooms, room_name="测试群")
定时发送群文件
如何进行定时发送群文件?通过 aspschedule 第三方库实现定时任务,然后调用 wcf.send_file 函数执行发送文件的消息。
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from wcferry import Wcf
wcf = Wcf()
wcf_rooms = []
for contact in wcf.get_contacts():
if contact['wxid'].endswith("chatroom"):
wcf_rooms.append(contact)
def get_chatroom_roomid(wcf_rooms: list, room_name: str):
for room in wcf_rooms:
if room['name'] == room_name:
return room['wxid']
return None
def schedule_task_job(room_id: str, wcf: Wcf):
wcf.send_file(path="test.txt", receiver=room_id)
customize_time = "2024-05-09 09:10:10"
customize_room = "唤醒手腕测试群"
run_date = datetime.strptime(customize_time, "%Y-%m-%d %H:%M:%S")
room_id = get_chatroom_roomid(wcf_rooms, customize_room)
scheduler = BackgroundScheduler()
scheduler.add_job(schedule_task_job, args=(room_id, wcf), run_date=run_date)
scheduler.start()
wcf.keep_running()
监听保存语音消息
from queue import Empty
from threading import Thread
from wcferry import Wcf, WxMsg
wcf = Wcf()
def processMsg(msg: WxMsg):
if msg.from_group():
response = wcf.get_audio_msg(id=msg.id, dir=f"audio")
print("语音地址:" + response)
def enableReceivingMsg():
def innerWcFerryProcessMsg():
while wcf.is_receiving_msg():
try:
msg = wcf.get_msg()
processMsg(msg)
except Empty:
continue
except Exception as e:
print(f"ERROR: {e}")
wcf.enable_receiving_msg()
Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()
enableReceivingMsg()
wcf.keep_running()
四、开始制作推送的代码
pip install wcferry 安装wcferry,然后使用以下代码:
from queue import Empty
from threading import Thread
import requests, urllib3
from wcferry import Wcf, WxMsg
wcf = Wcf()
def processMsg(msg: WxMsg):
bl = "这里填自己的bark推送链接"
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
numbers = [1,3,34,42,43,47,48,49,50,53,52,66]
if msg.from_group():
if msg.is_at(wcf.get_self_wxid()):
try:
requests.get(bl + '有人at你')
except Exception as e:
print('Reason:', e)
elif not msg.from_self():
if msg.type in numbers:
try:
requests.get(bl + '有人私聊你')
except Exception as e:
print('Reason:', e)
def enableReceivingMsg():
def innerWcFerryProcessMsg():
while wcf.is_receiving_msg():
try:
msg = wcf.get_msg()
processMsg(msg)
except Empty:
continue
except Exception as e:
print(f"ERROR: {e}")
wcf.enable_receiving_msg()
Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()
enableReceivingMsg()
wcf.keep_running()
注意把中间的bark推送链接改成自己的,就是bark里红圈位置的链接。这个py是用wcferry,对收到的群at消息,私信的图片消息、文字消息、视频消息等,进行bark推送,由于wcferry没有区分公众号消息和私信消息的差别,所以公众号如果只推文字、视频时也会bark提醒。
原创文章,作者:熊阿初,如若转载,请注明出处:https://www.guofc.com/1176.html