背景
最近有一个业务模块需要对大量手机号的MD5值进行解析,这种场景大家应该见的比较少,在dsp系统系统中使用的比较多。
MD5本身不可解密,但是对于固定范围的数据源,可以通过碰撞、彩虹表来解密。
碰撞法一般的做法是将数据存储到数据库中(一般是是非关系型数据库),通过查询匹配返回碰撞结果。这种方法消耗大量的内存、硬盘空间。
彩虹表发本质上也是碰撞,但是相比于碰撞发节省了空间,增加了解析时间。 时间换空间
解决思路
彩虹表:
提供一篇在网上查询资料发现的文章50亿加密手机号md5快速存储及检索,rocksDB、redis等探索
一开始想用彩虹表解决这个需求,但是去网上了搜了一下,资料很少,并且彩虹表的论文网站也404了。。
使用了生成彩虹表的工具,生成了六个彩虹表,使用彩虹表对手机号的MD5进行解析,发现解析速度在解析量小的时候还行,但是碰到1K这种级别的数据就不行了,也有可能是那个解析工具的问题。并且破解的个数总是差几个。
ps:这个工具是老外在13年就搞出来的。
参考链接:密码破解的利器——彩虹表(rainbow table),彩虹表生成,RainbowCrack工具下载 后两个是一起的
工具类生成彩虹链的命令,文档中也有说到
rtgen hash_algorithm charset plaintext_len_min plaintext_len_max table_index chain_len chain_num part_index
由于目标是手机号,加密形式是md5 所以第二个参数md5 第三个参数选择numeric,第四第五个参数选择 11 数据位数范围,其余参数博主看了一下没什么影响就默认一致
rtgen md5 numeric 11 11 0 3800 33554432 0
还记得前面说的嘛 彩虹表的本质也是碰撞,所以多生成几个彩虹表,破解的概率也会变高,但是生成的彩虹表的数据无法自定义,并且是随机生成的,可能会导致垃圾数据的产生从而拖慢破解速度,这也是博主放弃的主要原因。
官方的实例:
An example to generate a rainbow table set with 6 rainbow tables:
主要变动的是第六个参数
rtgen md5 loweralpha-numeric 1 7 0 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 1 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 2 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 3 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 4 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 5 3800 33554432 0
生成完了之后输入命令
rtsort .
将生成的彩虹链转换成彩虹桌
碰撞法:
由于数据量大,都丢进内存不现实,公司现有环境也不支持。尝试使用了mongodb存储但还是内存占用过大,最终采用MySQL存储。
使用MySQL存储就面临一个问题,数据量和查询效率。一个表的数据量越大,查询效率就降低。
为了降低查询效率,有以下方案:
1.根据MD5前三位进行分类,一位有16种可能(09,af) 三位就是4096,4096张表 52亿数据 平均每张表一百万的数据(1,269,531条),MySQL一百万数据配合索引的加持查询效率还是可以的。
2.第二种方案是基于实现第一种方案过程中出现的问题而提出的(请看下文详细说明),只根据MD5的第一位进行分类,一共16张表。接着对16张表进行水平分表,分表依据是MD5值的排序。类似于分页,将导入前的MD5值进行排序,每100万
配置说明
博主配置:
CPU: i5-12400F
内存: 32G 2666MHz
硬盘: 256G固态 + 1TB机械
⭐CPU关系着排序速度,只不过这个不是必要条件。
⭐在运行的过程中一定要关注自己的存储空间够不够。
⭐内存一定要大,最起码32G,推荐48G以上。如果内存为32G的话,系统盘的空间要有60G的可有空间(最后一步需要将系统盘的一部分空间作为虚拟内存空间)。
代码
1. 生成手机号和对应的MD5值
手机号一共11位,前三位是号段一共有52个号段(不含虚拟号段),后8位随机。
略,这方面大家怎么写都可以,最简单的就是八个for循环嵌套。
2 对手机号和MD5值进行筛选排序
2.1 对文件进行分片
# 分片
def sliver(phoneNumber):
startTime = datetime.now()
# 改成自己的路径
# 每次读取100W数据
df = pd.read_csv('D:/phone/' + phoneNumber + '.csv', sep=',', chunksize=1000000, header=None)
df.columns = ['data', 'md5']
i = 0
for chunk in df:
# 改成自己的路径
chunk.to_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', index=False)
i = i + 1
# 改成自己的路径
os.remove('D:/phone/' + phoneNumber + '.csv')
print('耗时', datetime.now() - startTime)
2.2 根据MD5值的首位字符对文件进行筛选
# 将52亿数据按MD5的首位进行筛选 首位字符一共有16种可能:0~9,a~f phoneNumber是手机号段 如134
def sliverWithMD5(phoneNumber, i):
# 改成自己的路径
df = pd.read_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', sep=',')
print(phoneNumber + '_' + str(i) + '.csv 开始时间===>', datetime.now())
df.columns = ['data', 'md5']
# for循环 循环值为 0~9 a~f
for j in range(0, 16):
# 拼接字符串
md5 = str(hex(j))[2:]
# 改成自己的路径
df[df['md5'].str[:1] == md5].to_csv('E:/sliverByMD5/' + md5 + '.csv', index=False, mode='a', header=None)
print(phoneNumber + '_' + str(i) + '.csv 结束时间===>', datetime.now())
# 改成自己的路径
os.remove('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv')
2.3 对筛选后的MD5值进行排序
# 按照MD5值进行排序后追加保存 i的值为:[0,16) str(hex(i))[2:]后为0~9,a~f
def sortByMD5(i):
startTime = datetime.now()
print(str(hex(i))[2:] + '.csv 开始时间===>', datetime.now())
# 改成自己的路径
df = pd.read_csv('E:/sliverByMD5' + str(hex(i))[2:] + '.csv', sep=',', header=None)
df.columns = ['data', 'md5']
# 根据MD5进行排序
df.sort_values(by='md5', inplace=True)
# 改成自己的路径
df.to_csv('E:/final/' + str(hex(i))[2:] + '.csv', index=False, mode='a', header=None)
print(str(hex(i))[2:] + '.csv 结束时间===>', datetime.now())
print('耗时:', datetime.now() - startTime)
2.4 对追加保存的文件进行排序
# 最终排序 value是MD5值的首字母
def sortByMD5V3(value):
startTime = datetime.now()
print(value + '.csv 开始时间===>', datetime.now())
# 改成自己的路径
df = pd.read_csv('E:/final/' + value + '.csv', sep=',', header=None)
df.columns = ['data', 'md5']
# 根据MD5进行排序
df.sort_values(by='md5', inplace=True)
# 改成自己的路径
df.to_csv('E:/f/' + value + '.csv', index=False, mode='a', header=None)
print(value + '.csv 结束时间===>', datetime.now())
print('耗时:', datetime.now() - startTime)
2.5 完整代码
import os
from datetime import datetime
from multiprocessing import Pool
import numpy as np
# 降低精度 防止最后一步排序时OOM
array_ = np.zeros((10000,10000), dtype='float32')
import pandas as pd
# 分片
def sliver(phoneNumber):
startTime = datetime.now()
# 改成自己的路径
# 每次读取100W数据
df = pd.read_csv('D:/phone/' + phoneNumber + '.csv', sep=',', chunksize=1000000, header=None)
df.columns = ['data', 'md5']
i = 0
for chunk in df:
# 改成自己的路径
chunk.to_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', index=False)
i = i + 1
# 改成自己的路径
os.remove('D:/phone/' + phoneNumber + '.csv')
print('耗时', datetime.now() - startTime)
# 将52亿数据按MD5的首位进行筛选 首位字符一共有16种可能:0~9,a~f phoneNumber是手机号段 如134
def sliverWithMD5(phoneNumber, i):
# 改成自己的路径
df = pd.read_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', sep=',')
print(phoneNumber + '_' + str(i) + '.csv 开始时间===>', datetime.now())
df.columns = ['data', 'md5']
# for循环 循环值为 0~9 a~f
for j in range(0, 16):
# 拼接字符串
md5 = str(hex(j))[2:]
# 改成自己的路径
df[df['md5'].str[:1] == md5].to_csv('E:/sliverByMD5/' + md5 + '.csv', index=False, mode='a', header=None)
print(phoneNumber + '_' + str(i) + '.csv 结束时间===>', datetime.now())
# 改成自己的路径
os.remove('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv')
# 按照MD5值进行排序后追加保存 i的值为:[0,16) str(hex(i))[2:]后为0~9,a~f
def sortByMD5(i):
startTime = datetime.now()
print(str(hex(i))[2:] + '.csv 开始时间===>', datetime.now())
# 改成自己的路径
df = pd.read_csv('E:/sliverByMD5' + str(hex(i))[2:] + '.csv', sep=',', header=None)
df.columns = ['data', 'md5']
# 根据MD5进行排序
df.sort_values(by='md5', inplace=True)
# 改成自己的路径
df.to_csv('E:/final/' + str(hex(i))[2:] + '.csv', index=False, mode='a', header=None)
print(str(hex(i))[2:] + '.csv 结束时间===>', datetime.now())
print('耗时:', datetime.now() - startTime)
# 最终排序 value是MD5值的首字母
def sortByMD5Final(value):
startTime = datetime.now()
print(value + '.csv 开始时间===>', datetime.now())
# 改成自己的路径
df = pd.read_csv('E:/final/' + value + '.csv', sep=',', header=None)
df.columns = ['data', 'md5']
# 根据MD5进行排序
df.sort_values(by='md5', inplace=True)
# 改成自己的路径
df.to_csv('E:/f/' + value + '.csv', index=False, mode='a', header=None)
print(value + '.csv 结束时间===>', datetime.now())
print('耗时:', datetime.now() - startTime)
# 分布运行,避免OOM,同时也可以监控数据是否紊乱
if __name__ == '__main__':
# 第一步 分片
pool = Pool(8)
# 所有的手机号段 这是只是演示 实际使用时可以将52个手机号段分成几组依次跑
phoneList = ["177","180","181","189","191","193","199"]
# 先把一亿数据分成100个一百万小文件,一个号段一亿数据
for phoneNumber in phoneList:
pool.apply_async(sliver,args=(phoneNumber,))
pool.close()
pool.join()
# 第二步 根据MD5值的首位字符对文件进行筛选
pool = Pool(8)
# 所有的手机号段 这是只是演示 实际使用时可以将52个手机号段分成几组依次跑
phoneList = ["177","180","181","189","191","193","199"]
for phoneNumber in phoneList:
# 分片时 100W一个文件 一个号段一亿 所以100次循环
for i in range(0, 100):
pool.apply_async(sliverWithMD5, args=(phoneNumber, i))
pool.close()
pool.join()
# 第三步 按照MD5值进行排序后追加保存
pool = Pool(8)
for i in range(0, 16):
pool.apply_async(sortByMD5, args=(i,))
pool.close()
pool.join()
# 第四步 对追加保存的文件进行排序
# 这里不要使用多线程或者多进程 这里占用内存极大 应为将52/16亿的数据读到内存中进行排序了
# 推荐内存48G+、64G+ 32G内存也能跑但记得把虚拟内存调大 实际内存占用大概在50G左右 , 跑完记得检查自己硬盘空间够不够
for j in range(0, 16):
# 拼接字符串
md5 = str(hex(j))[2:]
sortByMD5Final(md5)
3.将筛选排序后的手机和MD5值导入数据库
3.1 公共方法
# 创建数据库连接池
def connectionPool(connectionList):
for i in range(0, 4):
connectionList.append(
pymysql.connect(host="127.0.0.1",
port=3306,
user="root",
password="root",
database="t_md5",
autocommit=False))
return connectionList
# 从连接池获取连接
def getConnection(connectionList):
# 从连接池中获取一个连接 如果没有连接可用 进程等待
while True:
if len(connectionList) > 0:
return connectionList.pop()
else:
# 等待一秒再重新获取
time.sleep(1)
# 释放连接到连接池
def releaseConnection(connection, connectionList):
connectionList.append(connection)
# 生成表名数组
def generateTableNameList():
tableNameList = []
for i in range(0, 16):
fileName = str(hex(i))[2:]
# 经过计算 16个文件 每个文件有163个表 能够覆盖52亿数据
for j in range(0, 163):
tableName = 't_md5_' + fileName + '_' + str(j)
tableNameList.append(tableName)
return tableNameList
3.2 创建表
# 在数据库中创建表:
def createTableInDB(connectionList, tableName):
connection = getConnection(connectionList)
try:
cursor = connection.cursor()
# MD5值32位,手机号11位,MD5值为主键索引
cursor.execute("CREATE TABLE IF NOT EXISTS " + tableName +
"(`md5` char(32) PRIMARY KEY NOT NULL, `data` char(11) NOT NULL)")
connection.commit()
print('创建表:' + tableName + '成功')
cursor.close()
releaseConnection(connection, connectionList)
except Exception as e:
print('创建表:' + tableName + '失败')
print(e)
releaseConnection(connection, connectionList)
if __name__ == '__main__':
connectionList = []
connectionPool(connectionList)
tableNameList = generateTableNameList()
# 根据个人需求选择是否开启多线程、多进程
for tableName in tableNameList:
createTableInDB(connectionList, tableName)
3.3 插入数据
# 批量插入
def batchInsert(connection, connectionList, data, tableName):
cursor = connection.cursor()
sql = "INSERT INTO " + 't_md5_' + tableName + "(`data`, `md5`) VALUES(%s, %s)"
try:
cursor.executemany(sql, data)
except Exception as e:
connection.rollback()
releaseConnection(connection, connectionList)
print(e)
connection.commit()
releaseConnection(connection, connectionList)
if __name__ == '__main__':
connectionList = []
# 创建连接池
connectionPool(connectionList)
# 0~9,a~f 一共16个文件
for i in range(0, 16):
fileName = str(hex(i))[2:]
startTime = datetime.now()
print('开始处理文件:' + fileName + '.csv,开始时间', startTime)
# 批量读取 每次读取两百万条 存到一张表中
df = pd.read_csv('E:/final/' + fileName + '.csv', chunksize=2000000)
j = 0
for chunk in df:
connection = getConnection(connectionList)
tableName = fileName + '_' + str(j)
tableStartTime = datetime.now()
print('开始处理表:t_md5_' + tableName)
batchInsert(connection, connectionList, chunk.values.tolist(), tableName)
j += 1
print('处理表:t_md5_' + tableName + '完成,' + '耗时:' + str(datetime.now() - tableStartTime) )
print('-----------------------------------------------------------------------')
print('处理文件:' + fileName + ',结束时间', datetime.now())
print('处理文件:' + fileName + ',耗时', datetime.now() - startTime)
3.4 查询每张表最后一条数据,检查插入是否准确
# 批量查询每张表最后一条的MD5值
def getLastMD5List(connectionList, tableNameList):
lastMD5List = []
for tableName in tableNameList:
connection = getConnection(connectionList)
lastMD5 = getLastMD5(connection, connectionList, tableName)
lastMD5List.append(lastMD5)
return lastMD5List
# 获取指定表的最后一条数据
def getLastMD5(connection, connectionList, tableName):
cursor = connection.cursor()
sql = "SELECT md5 FROM " + tableName + " ORDER BY md5 DESC LIMIT 1"
try:
cursor.execute(sql)
result = cursor.fetchone()
if result is None:
print('表:' + tableName + '没有数据')
return None
else:
return result[0]
except Exception as e:
print('查询表:' + tableName + '最后一条MD5失败')
print(e)
finally:
releaseConnection(connection, connectionList)
# 这个检查其实没有多大意义 主要还是为了下一步的查询做准备
# 在能保证每张表都插入200W数据的前提下,直接看最后一张表的数据量就知道有没有漏了
if __name__ == '__main__':
# 创建数据库连接池
connectionList = []
connectionPool(connectionList)
# 获取所有表名
tableNameList = generateTableNameList()
# 根据自己的需求决定是否开启多线程、多进程
# 默认不开启
result = getLastMD5List(connectionList, tableNameList)
print(result)
3.5 完整代码
数据库名为:t_md5
表名格式为:t_md5_x_y, x为016的16进制值, y为0163
import time
from datetime import datetime
from multiprocessing import Pool
import pandas as pd
import pymysql
pymysql.install_as_MySQLdb()
# 创建数据库连接池
def connectionPool(connectionList):
for i in range(0, 4):
connectionList.append(
pymysql.connect(host="127.0.0.1",
port=3306,
user="root",
password="root",
database="t_md5",
autocommit=False))
return connectionList
# 从连接池获取连接
def getConnection(connectionList):
# 从连接池中获取一个连接 如果没有连接可用 线程等待
while True:
if len(connectionList) > 0:
return connectionList.pop()
else:
# 等待一秒再重新获取
time.sleep(1)
# 释放连接到连接池
def releaseConnection(connection, connectionList):
# 释放连接并通知等待线程
connectionList.append(connection)
# 生成表名列表
def generateTableNameList():
tableNameList = []
for i in range(0, 16):
fileName = str(hex(i))[2:]
# 经过计算 16个文件 每个文件有163个表 能够覆盖52亿数据
for j in range(0, 163):
tableName = 't_md5_' + fileName + '_' + str(j)
tableNameList.append(tableName)
return tableNameList
# 在数据库中创建表:
def createTableInDB(connectionList, tableName):
connection = getConnection(connectionList)
try:
cursor = connection.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS " + tableName +
"(`md5` char(32) PRIMARY KEY NOT NULL, `data` char(11) NOT NULL)")
connection.commit()
print('创建表:' + tableName + '成功')
cursor.close()
releaseConnection(connection, connectionList)
except Exception as e:
print('创建表:' + tableName + '失败')
print(e)
releaseConnection(connection, connectionList)
def start():
connectionList = []
# 创建连接池
connectionPool(connectionList)
# 0~9,a~f 一共16个文件
for i in range(0, 16):
fileName = str(hex(i))[2:]
startTime = datetime.now()
print('开始处理文件:' + fileName + ',开始时间', startTime)
# 批量读取 每次读取两百万条 存到一张表中
df = pd.read_csv('E:/final/' + fileName + '.csv', chunksize=2000000)
i = 0
for chunk in df:
connection = getConnection(connectionList)
tableName = fileName + '_' + str(i)
tableStartTime = datetime.now()
print('开始处理表:' + tableName)
batchInsert(connection, connectionList, chunk.values.tolist(), tableName)
i += 1
print('处理表:' + tableName + '完成,' + '耗时:' + str(datetime.now() - tableStartTime) )
print('-----------------------------------------------------------------------')
print('处理文件:' + fileName + ',结束时间', datetime.now())
print('处理文件:' + fileName + ',耗时', datetime.now() - startTime)
if __name__ == '__main__':
# 第一步创建表
connectionList = []
connectionPool(connectionList)
tableNameList = generateTableNameList()
pool = Pool(8)
for tableName in tableNameList:
# 开启多进程创建表
pool.apply_async(createTableInDB, args=(connectionList, tableName))
# 第二步 批量插入数据
start()
4 运行耗时
博主采用的是分组对手机号段进行分片、排序,最终一起导入数据库的方案
由于写这篇博客时已经在导入数据库阶段了,所以就只截图了导入数据库耗时。但其他步骤耗时大约都在十几分钟左右一组(一组9个手机号段,固态硬盘的前提下)
导入数据库耗时:
⭐说明:博主数据库使用的是机械硬盘,所以插入速度慢(硬件瓶颈),硬盘写入速度在3m/s到9m/s,如果使用固态硬盘速度将大幅提升写入速度起码在200m/s+
5 根据手机号MD5值查询手机号
略,提供思路。此时以及完成数据库的导入。数据库中有16个类型,每个类型163张表。
根据要查询的MD5首位,可以确定要查询的手机号在哪一个类型的163表中。此时查询范围缩小到163张表了。
那么如何在这163张表中找出需求数据所在表呢?还记得将数据入库的方式嘛?按照排序入库,每张表200万数据。
我们只需要将每张表的最后一条数据的MD5值提取出来存起来(list、set、数组都可以)。
一个简单的查询算法:二分查找,查询时将要查询的MD5值与提取出来的MD5值进行比较(equals),查找出目标MD5值在那两个值中间。