背景

最近有一个业务模块需要对大量手机号的MD5值进行解析,这种场景大家应该见的比较少,在dsp系统系统中使用的比较多。

MD5本身不可解密,但是对于固定范围的数据源,可以通过碰撞、彩虹表来解密。

碰撞法一般的做法是将数据存储到数据库中(一般是是非关系型数据库),通过查询匹配返回碰撞结果。这种方法消耗大量的内存、硬盘空间。

彩虹表发本质上也是碰撞,但是相比于碰撞发节省了空间,增加了解析时间。 时间换空间

解决思路

彩虹表:

提供一篇在网上查询资料发现的文章50亿加密手机号md5快速存储及检索,rocksDB、redis等探索

一开始想用彩虹表解决这个需求,但是去网上了搜了一下,资料很少,并且彩虹表的论文网站也404了。。

使用了生成彩虹表的工具,生成了六个彩虹表,使用彩虹表对手机号的MD5进行解析,发现解析速度在解析量小的时候还行,但是碰到1K这种级别的数据就不行了,也有可能是那个解析工具的问题。并且破解的个数总是差几个。

ps:这个工具是老外在13年就搞出来的。

参考链接:密码破解的利器——彩虹表(rainbow table),彩虹表生成,RainbowCrack工具下载 后两个是一起的

工具类生成彩虹链的命令,文档中也有说到

rtgen hash_algorithm charset plaintext_len_min plaintext_len_max table_index chain_len chain_num part_index

由于目标是手机号,加密形式是md5 所以第二个参数md5 第三个参数选择numeric,第四第五个参数选择 11 数据位数范围,其余参数博主看了一下没什么影响就默认一致

rtgen md5 numeric 11 11 0 3800 33554432 0

还记得前面说的嘛 彩虹表的本质也是碰撞,所以多生成几个彩虹表,破解的概率也会变高,但是生成的彩虹表的数据无法自定义,并且是随机生成的,可能会导致垃圾数据的产生从而拖慢破解速度,这也是博主放弃的主要原因。

官方的实例:

An example to generate a rainbow table set with 6 rainbow tables:

主要变动的是第六个参数

rtgen md5 loweralpha-numeric 1 7 0 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 1 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 2 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 3 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 4 3800 33554432 0
rtgen md5 loweralpha-numeric 1 7 5 3800 33554432 0

生成完了之后输入命令

rtsort .

将生成的彩虹链转换成彩虹桌

碰撞法:

由于数据量大,都丢进内存不现实,公司现有环境也不支持。尝试使用了mongodb存储但还是内存占用过大,最终采用MySQL存储。

使用MySQL存储就面临一个问题,数据量和查询效率。一个表的数据量越大,查询效率就降低。

为了降低查询效率,有以下方案:

1.根据MD5前三位进行分类,一位有16种可能(09,af) 三位就是4096,4096张表 52亿数据 平均每张表一百万的数据(1,269,531条),MySQL一百万数据配合索引的加持查询效率还是可以的。

2.第二种方案是基于实现第一种方案过程中出现的问题而提出的(请看下文详细说明),只根据MD5的第一位进行分类,一共16张表。接着对16张表进行水平分表,分表依据是MD5值的排序。类似于分页,将导入前的MD5值进行排序,每100万

配置说明

博主配置:

CPU: i5-12400F

内存: 32G 2666MHz

硬盘: 256G固态 + 1TB机械

⭐CPU关系着排序速度,只不过这个不是必要条件。

⭐在运行的过程中一定要关注自己的存储空间够不够。

⭐内存一定要大,最起码32G,推荐48G以上。如果内存为32G的话,系统盘的空间要有60G的可有空间(最后一步需要将系统盘的一部分空间作为虚拟内存空间)。

代码

1. 生成手机号和对应的MD5值

手机号一共11位,前三位是号段一共有52个号段(不含虚拟号段),后8位随机。

略,这方面大家怎么写都可以,最简单的就是八个for循环嵌套。

2 对手机号和MD5值进行筛选排序

2.1 对文件进行分片

# 分片
def sliver(phoneNumber):
    startTime = datetime.now()
    # 改成自己的路径
    # 每次读取100W数据
    df = pd.read_csv('D:/phone/' + phoneNumber + '.csv', sep=',', chunksize=1000000, header=None)
    df.columns = ['data', 'md5']
    i = 0
    for chunk in df:
        # 改成自己的路径
        chunk.to_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', index=False)
        i = i + 1
    # 改成自己的路径
    os.remove('D:/phone/' + phoneNumber + '.csv')
    print('耗时', datetime.now() - startTime)

2.2 根据MD5值的首位字符对文件进行筛选

# 将52亿数据按MD5的首位进行筛选 首位字符一共有16种可能:0~9,a~f phoneNumber是手机号段 如134
def sliverWithMD5(phoneNumber, i):
    # 改成自己的路径
    df = pd.read_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', sep=',')
    print(phoneNumber + '_' + str(i) + '.csv 开始时间===>', datetime.now())
    df.columns = ['data', 'md5']
    # for循环 循环值为 0~9 a~f
    for j in range(0, 16):
        # 拼接字符串
        md5 = str(hex(j))[2:]
        # 改成自己的路径
        df[df['md5'].str[:1] == md5].to_csv('E:/sliverByMD5/' + md5 + '.csv', index=False, mode='a', header=None)
    print(phoneNumber + '_' + str(i) + '.csv 结束时间===>', datetime.now())
    # 改成自己的路径
    os.remove('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv')

2.3 对筛选后的MD5值进行排序

# 按照MD5值进行排序后追加保存 i的值为:[0,16) str(hex(i))[2:]后为0~9,a~f
def sortByMD5(i):
    startTime = datetime.now()
    print(str(hex(i))[2:] + '.csv 开始时间===>', datetime.now())
    # 改成自己的路径
    df = pd.read_csv('E:/sliverByMD5' + str(hex(i))[2:] + '.csv', sep=',', header=None)
    df.columns = ['data', 'md5']
    # 根据MD5进行排序
    df.sort_values(by='md5', inplace=True)
    # 改成自己的路径
    df.to_csv('E:/final/' + str(hex(i))[2:] + '.csv', index=False, mode='a', header=None)
    print(str(hex(i))[2:] + '.csv 结束时间===>', datetime.now())
    print('耗时:', datetime.now() - startTime)

2.4 对追加保存的文件进行排序

# 最终排序 value是MD5值的首字母
def sortByMD5V3(value):
    startTime = datetime.now()
    print(value + '.csv 开始时间===>', datetime.now())
    # 改成自己的路径
    df = pd.read_csv('E:/final/' + value + '.csv', sep=',', header=None)
    df.columns = ['data', 'md5']
    # 根据MD5进行排序
    df.sort_values(by='md5', inplace=True)
    # 改成自己的路径
    df.to_csv('E:/f/' + value + '.csv', index=False, mode='a', header=None)
    print(value + '.csv 结束时间===>', datetime.now())
    print('耗时:', datetime.now() - startTime)

2.5 完整代码

import os
from datetime import datetime
from multiprocessing import Pool

import numpy as np
# 降低精度 防止最后一步排序时OOM
array_ = np.zeros((10000,10000), dtype='float32')

import pandas as pd

# 分片
def sliver(phoneNumber):
    startTime = datetime.now()
    # 改成自己的路径
    # 每次读取100W数据
    df = pd.read_csv('D:/phone/' + phoneNumber + '.csv', sep=',', chunksize=1000000, header=None)
    df.columns = ['data', 'md5']
    i = 0
    for chunk in df:
        # 改成自己的路径
        chunk.to_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', index=False)
        i = i + 1
    # 改成自己的路径
    os.remove('D:/phone/' + phoneNumber + '.csv')
    print('耗时', datetime.now() - startTime)
    
# 将52亿数据按MD5的首位进行筛选 首位字符一共有16种可能:0~9,a~f phoneNumber是手机号段 如134
def sliverWithMD5(phoneNumber, i):
    # 改成自己的路径
    df = pd.read_csv('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv', sep=',')
    print(phoneNumber + '_' + str(i) + '.csv 开始时间===>', datetime.now())
    df.columns = ['data', 'md5']
    # for循环 循环值为 0~9 a~f
    for j in range(0, 16):
        # 拼接字符串
        md5 = str(hex(j))[2:]
        # 改成自己的路径
        df[df['md5'].str[:1] == md5].to_csv('E:/sliverByMD5/' + md5 + '.csv', index=False, mode='a', header=None)
    print(phoneNumber + '_' + str(i) + '.csv 结束时间===>', datetime.now())
    # 改成自己的路径
    os.remove('temp/sliver/' + phoneNumber + '_' + str(i) + '.csv')
    
# 按照MD5值进行排序后追加保存 i的值为:[0,16) str(hex(i))[2:]后为0~9,a~f
def sortByMD5(i):
    startTime = datetime.now()
    print(str(hex(i))[2:] + '.csv 开始时间===>', datetime.now())
    # 改成自己的路径
    df = pd.read_csv('E:/sliverByMD5' + str(hex(i))[2:] + '.csv', sep=',', header=None)
    df.columns = ['data', 'md5']
    # 根据MD5进行排序
    df.sort_values(by='md5', inplace=True)
    # 改成自己的路径
    df.to_csv('E:/final/' + str(hex(i))[2:] + '.csv', index=False, mode='a', header=None)
    print(str(hex(i))[2:] + '.csv 结束时间===>', datetime.now())
    print('耗时:', datetime.now() - startTime)
    
# 最终排序 value是MD5值的首字母
def sortByMD5Final(value):
    startTime = datetime.now()
    print(value + '.csv 开始时间===>', datetime.now())
    # 改成自己的路径
    df = pd.read_csv('E:/final/' + value + '.csv', sep=',', header=None)
    df.columns = ['data', 'md5']
    # 根据MD5进行排序
    df.sort_values(by='md5', inplace=True)
    # 改成自己的路径
    df.to_csv('E:/f/' + value + '.csv', index=False, mode='a', header=None)
    print(value + '.csv 结束时间===>', datetime.now())
    print('耗时:', datetime.now() - startTime)

# 分布运行,避免OOM,同时也可以监控数据是否紊乱
if __name__ == '__main__':
    # 第一步 分片
    pool = Pool(8)
    # 所有的手机号段 这是只是演示 实际使用时可以将52个手机号段分成几组依次跑
    phoneList = ["177","180","181","189","191","193","199"]
    # 先把一亿数据分成100个一百万小文件,一个号段一亿数据
    for phoneNumber in phoneList:
        pool.apply_async(sliver,args=(phoneNumber,))
    pool.close()
    pool.join()
    # 第二步 根据MD5值的首位字符对文件进行筛选
    pool = Pool(8)
	# 所有的手机号段 这是只是演示 实际使用时可以将52个手机号段分成几组依次跑
    phoneList = ["177","180","181","189","191","193","199"]
    for phoneNumber in phoneList:
    # 分片时 100W一个文件 一个号段一亿 所以100次循环
    for i in range(0, 100):
        pool.apply_async(sliverWithMD5, args=(phoneNumber, i))
    pool.close()
    pool.join()
    # 第三步 按照MD5值进行排序后追加保存
    pool = Pool(8)
    for i in range(0, 16):
        pool.apply_async(sortByMD5, args=(i,))
    pool.close()
    pool.join()
    # 第四步 对追加保存的文件进行排序
    # 这里不要使用多线程或者多进程 这里占用内存极大 应为将52/16亿的数据读到内存中进行排序了
    # 推荐内存48G+、64G+ 32G内存也能跑但记得把虚拟内存调大 实际内存占用大概在50G左右 , 跑完记得检查自己硬盘空间够不够
    for j in range(0, 16):
        # 拼接字符串
        md5 = str(hex(j))[2:]
        sortByMD5Final(md5)

3.将筛选排序后的手机和MD5值导入数据库

3.1 公共方法

# 创建数据库连接池
def connectionPool(connectionList):
    for i in range(0, 4):
        connectionList.append(
            pymysql.connect(host="127.0.0.1",
                            port=3306,
                            user="root",
                            password="root",
                            database="t_md5",
                            autocommit=False))
    return connectionList

# 从连接池获取连接
def getConnection(connectionList):
    # 从连接池中获取一个连接 如果没有连接可用 进程等待
    while True:
        if len(connectionList) > 0:
            return connectionList.pop()
        else:
            # 等待一秒再重新获取
            time.sleep(1)

# 释放连接到连接池
def releaseConnection(connection, connectionList):
    connectionList.append(connection)

# 生成表名数组
def generateTableNameList():
    tableNameList = []
    for i in range(0, 16):
        fileName = str(hex(i))[2:]
        # 经过计算 16个文件 每个文件有163个表 能够覆盖52亿数据
        for j in range(0, 163):
            tableName = 't_md5_' + fileName + '_' + str(j)
            tableNameList.append(tableName)
    return tableNameList

3.2 创建表

# 在数据库中创建表:
def createTableInDB(connectionList, tableName):
    connection = getConnection(connectionList)
    try:
        cursor = connection.cursor()
        # MD5值32位,手机号11位,MD5值为主键索引
        cursor.execute("CREATE TABLE IF NOT EXISTS " + tableName +
                       "(`md5` char(32) PRIMARY KEY NOT NULL, `data` char(11) NOT NULL)")
        connection.commit()
        print('创建表:' + tableName + '成功')
        cursor.close()
        releaseConnection(connection, connectionList)
    except Exception as e:
        print('创建表:' + tableName + '失败')
        print(e)
        releaseConnection(connection, connectionList)

if __name__ == '__main__':
    connectionList = []
    connectionPool(connectionList)
    tableNameList = generateTableNameList()
    # 根据个人需求选择是否开启多线程、多进程
    for tableName in tableNameList:
        createTableInDB(connectionList, tableName)

3.3 插入数据

# 批量插入
def batchInsert(connection, connectionList, data, tableName):
    cursor = connection.cursor()
    sql = "INSERT INTO " + 't_md5_' + tableName + "(`data`, `md5`) VALUES(%s, %s)"
    try:
        cursor.executemany(sql, data)
    except Exception as e:
        connection.rollback()
        releaseConnection(connection, connectionList)
        print(e)
    connection.commit()
    releaseConnection(connection, connectionList)
    
if __name__ == '__main__':
    connectionList = []
    # 创建连接池
    connectionPool(connectionList)
    # 0~9,a~f 一共16个文件
    for i in range(0, 16):
        fileName = str(hex(i))[2:]
        startTime = datetime.now()
        print('开始处理文件:' + fileName + '.csv,开始时间', startTime)
        # 批量读取 每次读取两百万条 存到一张表中
        df = pd.read_csv('E:/final/' + fileName + '.csv', chunksize=2000000)
        j = 0
        for chunk in df:
            connection = getConnection(connectionList)
            tableName = fileName + '_' + str(j)
            tableStartTime = datetime.now()
            print('开始处理表:t_md5_' + tableName)
            batchInsert(connection, connectionList, chunk.values.tolist(), tableName)
            j += 1
            print('处理表:t_md5_' + tableName + '完成,' + '耗时:' + str(datetime.now() - tableStartTime) )
            print('-----------------------------------------------------------------------')
        print('处理文件:' + fileName + ',结束时间', datetime.now())
        print('处理文件:' + fileName + ',耗时', datetime.now() - startTime)

3.4 查询每张表最后一条数据,检查插入是否准确

# 批量查询每张表最后一条的MD5值
def getLastMD5List(connectionList, tableNameList):
    lastMD5List = []
    for tableName in tableNameList:
        connection = getConnection(connectionList)
        lastMD5 = getLastMD5(connection, connectionList, tableName)
        lastMD5List.append(lastMD5)
    return lastMD5List

# 获取指定表的最后一条数据
def getLastMD5(connection, connectionList, tableName):
    cursor = connection.cursor()
    sql = "SELECT md5 FROM " + tableName + " ORDER BY md5 DESC LIMIT 1"
    try:
        cursor.execute(sql)
        result = cursor.fetchone()
        if result is None:
            print('表:' + tableName + '没有数据')
            return None
        else:
            return result[0]
    except Exception as e:
        print('查询表:' + tableName + '最后一条MD5失败')
        print(e)
    finally:
        releaseConnection(connection, connectionList)

# 这个检查其实没有多大意义 主要还是为了下一步的查询做准备
# 在能保证每张表都插入200W数据的前提下,直接看最后一张表的数据量就知道有没有漏了
if __name__ == '__main__':
    # 创建数据库连接池
	connectionList = []
    connectionPool(connectionList)
    # 获取所有表名
    tableNameList = generateTableNameList()
    # 根据自己的需求决定是否开启多线程、多进程
    # 默认不开启
    result = getLastMD5List(connectionList, tableNameList)
    print(result)

3.5 完整代码

数据库名为:t_md5

表名格式为:t_md5_x_y, x为016的16进制值, y为0163

import time
from datetime import datetime

from multiprocessing import Pool
import pandas as pd
import pymysql

pymysql.install_as_MySQLdb()

# 创建数据库连接池
def connectionPool(connectionList):
    for i in range(0, 4):
        connectionList.append(
            pymysql.connect(host="127.0.0.1",
                            port=3306,
                            user="root",
                            password="root",
                            database="t_md5",
                            autocommit=False))
    return connectionList


# 从连接池获取连接
def getConnection(connectionList):
    # 从连接池中获取一个连接 如果没有连接可用 线程等待
    while True:
        if len(connectionList) > 0:
            return connectionList.pop()
        else:
            # 等待一秒再重新获取
            time.sleep(1)


# 释放连接到连接池
def releaseConnection(connection, connectionList):
    # 释放连接并通知等待线程
    connectionList.append(connection)

    
# 生成表名列表
def generateTableNameList():
    tableNameList = []
    for i in range(0, 16):
        fileName = str(hex(i))[2:]
        # 经过计算 16个文件 每个文件有163个表 能够覆盖52亿数据
        for j in range(0, 163):
            tableName = 't_md5_' + fileName + '_' + str(j)
            tableNameList.append(tableName)
    return tableNameList


# 在数据库中创建表:
def createTableInDB(connectionList, tableName):
    connection = getConnection(connectionList)
    try:
        cursor = connection.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS " + tableName +
                       "(`md5` char(32) PRIMARY KEY NOT NULL, `data` char(11) NOT NULL)")
        connection.commit()
        print('创建表:' + tableName + '成功')
        cursor.close()
        releaseConnection(connection, connectionList)
    except Exception as e:
        print('创建表:' + tableName + '失败')
        print(e)
        releaseConnection(connection, connectionList)


def start():
    connectionList = []
    # 创建连接池
    connectionPool(connectionList)
    # 0~9,a~f 一共16个文件
    for i in range(0, 16):
        fileName = str(hex(i))[2:]
        startTime = datetime.now()
        print('开始处理文件:' + fileName + ',开始时间', startTime)
        # 批量读取 每次读取两百万条 存到一张表中
        df = pd.read_csv('E:/final/' + fileName + '.csv', chunksize=2000000)
        i = 0
        for chunk in df:
            connection = getConnection(connectionList)
            tableName = fileName + '_' + str(i)
            tableStartTime = datetime.now()
            print('开始处理表:' + tableName)
            batchInsert(connection, connectionList, chunk.values.tolist(), tableName)
            i += 1
            print('处理表:' + tableName + '完成,' + '耗时:' + str(datetime.now() - tableStartTime) )
            print('-----------------------------------------------------------------------')
        print('处理文件:' + fileName + ',结束时间', datetime.now())
        print('处理文件:' + fileName + ',耗时', datetime.now() - startTime)

        
if __name__ == '__main__':
    # 第一步创建表
    connectionList = []
    connectionPool(connectionList)
    tableNameList = generateTableNameList()
    pool = Pool(8)
    for tableName in tableNameList:
		# 开启多进程创建表
		pool.apply_async(createTableInDB, args=(connectionList, tableName))
    # 第二步 批量插入数据
   start()

4 运行耗时

博主采用的是分组对手机号段进行分片、排序,最终一起导入数据库的方案

由于写这篇博客时已经在导入数据库阶段了,所以就只截图了导入数据库耗时。但其他步骤耗时大约都在十几分钟左右一组(一组9个手机号段,固态硬盘的前提下)

导入数据库耗时:

⭐说明:博主数据库使用的是机械硬盘,所以插入速度慢(硬件瓶颈),硬盘写入速度在3m/s到9m/s,如果使用固态硬盘速度将大幅提升写入速度起码在200m/s+

image-20240902161941693

5 根据手机号MD5值查询手机号

略,提供思路。此时以及完成数据库的导入。数据库中有16个类型,每个类型163张表。

根据要查询的MD5首位,可以确定要查询的手机号在哪一个类型的163表中。此时查询范围缩小到163张表了。

那么如何在这163张表中找出需求数据所在表呢?还记得将数据入库的方式嘛?按照排序入库,每张表200万数据。

我们只需要将每张表的最后一条数据的MD5值提取出来存起来(list、set、数组都可以)。

一个简单的查询算法:二分查找,查询时将要查询的MD5值与提取出来的MD5值进行比较(equals),查找出目标MD5值在那两个值中间。