TensorFlow自学第3篇——异步文件

没有经过实践检验的理论,不管它多么漂亮,都会失去分量,不会为人所承认;没有以有分量的理论作基础的实践一定会遭到失败(门捷列夫)。

看视频——记笔记——翻书查漏补缺,这种方式到目前看来,是经过实践检验的,正确的学习方式。独自学习不易,且学且行。

线程与队列

在使用TensorFlow进行异步计算时,队列是一种强大的机制。
一个简单的例子。先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为零。然后,构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。

TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常,QueueRunner类用来协调多个工作线程同时将多个张量推入同一个队列中。

tf.QueueRunner

QueueRunner类会创建一组线程, 这些线程可以重复的执行Enquene操作, 他们使用同一个Coordinator来处理线程同步终止。此外,一个QueueRunner会运行一个closer thread,当Coordinator收到异常报告时,这个closer thread会自动关闭队列。

您可以使用一个queue runner,来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本。增加处理样本并将样本推入队列中的操作。增加training操作来移除队列中的样本。

tf.Coordinator

Coordinator类用来帮助多个线程协同工作,多个线程同步终止。 其主要方法有:
should_stop():如果线程应该停止则返回True。
request_stop(): 请求该线程停止。
join():等待被指定的线程终止。
首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来。

举个栗子

"""
CPU负责TensorFlow的计算,IO负责读取文件
由于速度上的差异,通常做法是:主线程进行模型训练,子线程读取数据,二者通过队列进行数据传输
相当于主线程从队列读数据,子进程往队列放数据
"""

import tensorflow as tf
import os
# 忽略不必要的警告信息
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'


# 一、模拟同步,先处理数据,然后取数据训练

# 1、首先定义队列
Q1 = tf.FIFOQueue(3, tf.float32)
# 放入一些数据
# 此处不能直接传入[0.1,0.2,0.3],因为对于tf来说,接收的一切值都是tensor张量
# 但是此处需要传入的是列表,所以改为[[0.1, 0.2, 0.3], ]
enq_many1 = Q1.enqueue_many([[0.1, 0.2, 0.3], ])
# 2、定义一些处理数据的逻辑,取数据,+1,入队列
out_q1 = Q1.dequeue()
data1 = out_q1 + 1
en_q1 = Q1.enqueue(data1)

with tf.Session() as sess1:
    # 初始化队列
    sess1.run(enq_many1)
    # 处理数据
    for i in range(100):
        sess1.run(en_q1) # TensorFlow中,计算有依赖性
    # 训练数据
    for i in range(Q1.size().eval()):
        print(sess1.run(Q1.dequeue()))

# --------------------------------分割线---------------------------------

# 二、模拟异步,子线程存入样本,主线程读取样本

# 1、定义一个队列,1000
Q2 = tf.FIFOQueue(1000, tf.float32)
# 2、定义要做的事情,循环,+1,放队列
var2 = tf.Variable(0.0)
# 实现自增op
data2 = tf.assign_add(var2, tf.constant(1.0))
en_q2 = Q2.enqueue(data2)
# 3、定义队列管理器op,指定多少个子线程,子线程该干什么事
qr2 = tf.train.QueueRunner(Q2, enqueue_ops=[en_q2] * 2)
# 初始化变量op
init_op2 = tf.global_variables_initializer()

with tf.Session() as sess2:
    # 初始化变量
    sess2.run(init_op2)
    # 开启线程协调器
    coord = tf.train.Coordinator()
    # 开启子线程
    threads = qr2.create_threads(sess2, coord=coord, start=True)
    # 主线程读取数据,等待训练
    for i in range(300):
        print(sess2.run(Q2.dequeue()))

    # 回收线程
    coord.request_stop()
    coord.join(threads)


# --------------------------------分割线---------------------------------


# 三、文件读取

# 1、构造一个文件队列
# 2、构造文件阅读器,读取队列一个样本的内容,解码
# 3、批处理
# 4、主线程取样本数据训练
# TensorFlow默认一次读取一个样本,即对CSV文件只读一行,二进制位文件只读一个样本的字节数,图片文件读取一张

def csvread(filelist):
    """
    读取CSV文件
    :param filelist: 文件路径+文件名的列表
    :return: 读取的内容
    """
    # 1、构造文件队列
    file_queue = tf.train.string_input_producer(filelist)
    # 2、构造CSV阅读器,读取队列,默认以行读取
    reader = tf.TextLineReader()
    key, value = reader.read(file_queue)
    print(value)
    # 3、对每行内容解码
    records = [["None"],["None"]]
    # record_defaults指定每一个样本的每一列的类型,指定默认值[["None"],[4.0]]
    example, label = tf.decode_csv(value, record_defaults=records)
    print(example, label)
    # 读取多个数据,批处理
    example_batch, label_batch = tf.train.batch([example, label], batch_size=9, num_threads=1, capacity=9)
    return example_batch, label_batch


if __name__=="__main__":

    # 1、找到文件,放入列表
    filename = os.listdir("./csvdata/")
    print(filename)
    filelist = [os.path.join("./csvdata/", file) for file in filename]
    print(filelist)

    example_batch, label_batch = csvread(filelist)

    with tf.Session() as sess:
        # 定义线程协调器
        coord = tf.train.Coordinator()
        # 开启读取文件的线程
        threads = tf.train.start_queue_runners(sess, coord=coord)
        # 打印读取内容
        print(sess.run([example_batch, label_batch]))

        # 回收子线程
        coord.request_stop()
        coord.join(threads)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容