Press "Enter" to skip to content

随机化非常大的数据集

考虑一个数据集的问题,它太大以至于无法放入内存。本文描述了如何在Python中轻松且(相对)快速地进行这样的随机化操作。

现在,发现以吉字节或甚至太字节为单位衡量数据集的情况并不罕见。这么多的数据可以在训练过程中极大地帮助创建强大的机器学习模型。但是如何随机化这样大的数据集呢?

Photo by Jess Bailey on Unsplash

假设你有一个非常大的数据集,每行一个条目存储在文件中。对于我们的目标来说,数据的细节并不重要。数据集可以是以逗号分隔的值(CSV)或制表符分隔的值(TSV)文件的行,或者每行可以是JSON对象,或者是大型点云中点的X、Y、Z值的列表。我们只需要保证数据集的格式是每行一个条目。

对于包含较小数据集的文件,可以使用一个简单的Python函数在内存中随机化文件(称为“洗牌”):

import randomdef shuffle_in_memory(filename_in, filename_out):    # 按行洗牌文件    with open(filename_in) as fp:        lines = fp.readlines()    # 随机化它们:    random.shuffle(lines)    # 将新顺序写出:    with open(filename_out, "w") as fp:        fp.writelines(lines)

shuffle_in_memory()函数接受输入文件名和输出文件名,使用内置的random.shuffle()函数在内存中洗牌文件的行,并将随机化的数据写出。正如其名字所暗示的,此函数要求将文件的所有行一次性加载到内存中。

为了测试这个函数,让我们创建一些测试文件。函数make_file()接受你想要在测试文件中的行数作为参数。该函数将创建文件并返回文件名。

import osdef make_file(lines):    filename = "test-%s.txt" % lines    print("正在创建测试文件 '%s'..." % filename)    with open(filename, "w") as fp:        for i in range(lines):            fp.write(f"行 {i}\n")    print("完成!")    return filename

例如,要创建一个名为“test-1000.txt”的文件,其中包含100行,可以这样做:

filename_in = make_file(1000)

运行这个函数后,你应该在当前目录中找到名为“test-1000.txt”的文件,其中包含1,000行文本,如下所示:

行 0行 1行 2行 3行 4行 5行 6行 7行 8行 9...

为了测试我们的shuffle_in_memory()函数,我们将给输出文件命名,并将字符串保存在变量filename_out中,然后调用该函数:

filename_out = "test-randomized-1000.txt"shuffle_in_memory(filename_in, filename_out)

现在,你的目录中应该有第二个名为“test-randomized-1000.txt”的文件。它的大小应与“test-1000.txt”完全相同,行数也完全相同,只是顺序是随机的:

行 110行 592行 887行 366行 52行 22行 891行 83行 931行 408...

好了,现在是个大问题:如果我们有一个非常大的文件怎么办?让我们创建一个VoAGI大小的文件,比如说,有1千万行。(对于大多数计算机来说,这个大小仍然足够小,可以在内存中随机化,但是它足够大以进行实践。)与之前一样,我们使用调用make_file()来创建输入文件:

filename_in_big = make_file(10_000_000)

这将需要几秒钟的时间。之后,您的目录中应该有一个名为“test-10000000.txt”的文件。它应该与之前一样,但是将有1000万行。该文件大约为128 MB。

如何随机化它?如果我们不想使用全部的RAM,或者我们没有足够的RAM,我们可以使用硬盘。下面是一个基于类似问题(排序)的递归算法。下面的函数shuffle()基于归并排序算法。

首先,它检查文件是否足够小以在内存中进行洗牌(递归函数中的基本情况)。参数memory_limit以字节为单位给出。如果文件大小小于memory_limit,则将在内存中进行洗牌。如果太大,则将其随机分割为若干较小的文件,并递归进行洗牌。最后,将较小的洗牌文件的内容合并在一起。

下面是该函数:

import tempfiledef shuffle(filename_in, filename_out, memory_limit, file_split_count,             depth=0, debug=False):    if os.path.getsize(filename_in) < memory_limit:        if debug: print(" " * depth, f"Level {depth + 1}",            "内存中洗牌...")        shuffle_in_memory(filename_in, filename_out)    else:        if debug: print(            " " * depth, f"Level {depth + 1}",            f"{os.path.getsize(filename_in)}太大;",            f"拆分为{file_split_count}个文件..."        )        # 将大文件拆分为较小的文件        temp_files = [tempfile.NamedTemporaryFile('w+', delete=False)                      for i in range(file_split_count)]        for line in open(filename_in):            random_index = random.randint(0, len(temp_files) - 1)            temp_files[random_index].write(line)        # 现在我们对每个较小的文件进行洗牌        for temp_file in temp_files:            temp_file.close()            shuffle(temp_file.name, temp_file.name, memory_limit,                     file_split_count, depth+1, debug)        # 并在原始文件的位置上合并        if debug: print(" " * depth, f"Level {depth + 1}",             "合并文件...")        merge_files(temp_files, filename_out)

如果这是一个排序算法,我们会以一种小心的方式将文件合并在一起,以创建一个排序的顺序。然而,对于洗牌,我们不关心以特定顺序合并它们,因为我们希望它们是随机的。因此,merge_files()函数如下所示:

def merge_files(temp_files, filename_out):    with open(filename_out, "w") as fp_out:        for temp_file in temp_files:            with open(temp_file.name) as fp:                line = fp.readline()                while line:                    fp_out.write(line)                    line = fp.readline()

请注意,我们小心地不要一次将文件的所有行都读入内存中。让我们通过将内存洗牌的限制设置为与文件大小完全相同来测试这个函数。由于文件大小不小于128,888,890,它将被分成若干较小的文件。对于这个例子,让我们将大文件分成2个文件,每个文件都足够小以在内存中洗牌:

filename_out_big = "test-randomized-10000000.txt"shuffle(filename_in_big, filename_out_big, 128_888_890, 2, debug=True)

此调用的结果如下:

 Level 1 128888890太大; 拆分为2个文件...  Level 2 内存中洗牌...  Level 2 内存中洗牌... Level 1 合并文件...

生成的“test-randomized-10000000.txt”文件的内容应该有1000万行,并且是随机的。一个更好的测试是将所需的内存减小到远小于要随机化的文件的大小,并将太大的文件分成超过2个的更多文件。假设我们只想使用约1 MB的RAM,并将文件分成20个较小的文件:

shuffle(filename_in_big, filename_out_big, 1_000_000, 20, debug=True)

这个例子将不超过1MB的RAM,并递归地分解大于该大小的子文件,每次处理20个。

该算法适用于任何大小的文件(好吧,你需要足够的磁盘空间!)。您在shuffle_in_memory()中分配的内存越多,它运行得越快。如果较小文件的数量太多,那么打开和关闭文件的时间将太长。您可以尝试不同的memory_limit值,但是我在20到200之间运行良好。初始文件越大,您可能需要更多的子文件。

还有其他算法可以使用。我对将所有行写入SQLite数据库,并以随机顺序SELECT它们抱有很大希望,但速度不比上述代码快。

import sqlite3def shuffle_sql(filename_in, filename_out, memory_limit, depth=0, debug=False):    if os.path.getsize(filename_in) < memory_limit:        if debug: print(" " * depth, f"Level {depth + 1}",            "在内存中洗牌...")        shuffle_in_memory(filename_in, filename_out)    else:        if debug: print(            " " * depth, f"Level {depth + 1}",            f"{os.path.getsize(filename_in)}太大;",            f"写入SQLite数据库..."        )        temp_db = tempfile.NamedTemporaryFile(delete=False)        connection = sqlite3.connect(temp_db.name)        cursor = connection.cursor()        cursor.execute("""            CREATE TABLE IF NOT EXISTS lines (                line TEXT            );        """)        with open(filename_in) as fp:            line = fp.readline()            while line:                cursor.execute("INSERT INTO lines (line) VALUES (?);", [line])                line = fp.readline()            connection.commit()        with open(filename_out, "w") as fp:          for line in cursor.execute("""              SELECT line FROM lines ORDER BY random();              """):              fp.write(line[0])

shuffle_sql(filename_in_big, filename_out_big, 1_000_000, debug=True)

你能在纯Python中击败递归洗牌算法吗?如果可以,我很乐意听到你的想法!

对人工智能、机器学习和数据科学感兴趣吗?考虑点赞和关注。让我知道你对什么感兴趣!

Leave a Reply

Your email address will not be published. Required fields are marked *