# Baseline: plain connection pool, one RPUSH per message
_TEST_RPUSH_POOL = None


def _get_test_rpush_pool():
    """Return the shared pool for the local Redis server, creating it on first use."""
    global _TEST_RPUSH_POOL
    if _TEST_RPUSH_POOL is None:
        _TEST_RPUSH_POOL = redis.ConnectionPool(
            host="127.0.0.1", port=6379, db=0, max_connections=10
        )
    return _TEST_RPUSH_POOL


def test_rpush_redis(msg):
    """Append one message to the tail of the ``log-msg`` Redis list.

    The connection pool is created once and reused across calls. Building a
    new ``ConnectionPool`` on every call means no connection is ever reused,
    so each push pays for a fresh connection to the server.

    Args:
        msg: Value to push onto the list.
    """
    client = redis.StrictRedis(connection_pool=_get_test_rpush_pool())
    client.rpush("log-msg", msg)
# Time ten consecutive single-message pushes and print each elapsed time in
# seconds. `msg` is not defined in this section; it must be bound earlier in
# the file - TODO confirm.
for _ in range(10):
    start_time = time.time()
    test_rpush_redis(msg)
    # The original line was missing its closing parenthesis (syntax error).
    print(time.time() - start_time)
# Measured average: 1 write per 4 s
# Alternative: a named signal plus batched writes
# Shared in-memory buffer of pending messages; is_list_max() checks its length
# and pops batches from it.
q = deque()
# Named signal used to hand a batch to its receivers: rpush_data is connected
# to it and is_list_max() sends on it.
# NOTE(review): `signal` looks like blinker's factory - confirm against the imports.
queue_signal = signal("queue_signal")
# Counter starting at 0; no active code visible in this section updates it -
# TODO confirm it is still needed.
cycle_num = 0
_LOG_MSG_POOL = None


def _get_log_msg_pool():
    """Return the shared pool for the local Redis server, creating it on first use."""
    global _LOG_MSG_POOL
    if _LOG_MSG_POOL is None:
        _LOG_MSG_POOL = redis.ConnectionPool(
            host="127.0.0.1", port=6379, db=0, max_connections=10
        )
    return _LOG_MSG_POOL


@queue_signal.connect
def rpush_data(data_list):
    """Append a whole batch of messages to the ``log-msg`` Redis list.

    Connected as a receiver of ``queue_signal``; the batch arrives as the
    single positional argument passed to ``queue_signal.send``.

    All values go out in one variadic RPUSH, which appends them in the order
    given, instead of one round trip per item. The connection pool is created
    once and reused rather than rebuilt on every call.

    Errors are logged through ``log`` and not re-raised, so a Redis failure
    never propagates to the code that sent the signal.

    Args:
        data_list: Iterable of values to append, in push order.
    """
    msg_key = "log-msg"
    try:
        if not data_list:
            # RPUSH needs at least one value; an empty batch is a no-op.
            return
        client = redis.StrictRedis(connection_pool=_get_log_msg_pool())
        client.rpush(msg_key, *data_list)
    except Exception as e:
        log('error', str(e))
def is_list_max(list_max=3000):
    """Flush one batch from the shared queue once it holds ``list_max`` items.

    If ``q`` contains fewer than ``list_max`` items nothing happens. Otherwise
    exactly ``list_max`` items are popped, sent to the receivers of
    ``queue_signal`` as one list, and the elapsed time in seconds is printed.

    The batch used to be ``list_max - 1`` items, one short of the threshold
    that triggers the flush. With ``list_max=1`` that meant an empty batch was
    sent and the queue was never drained.

    Args:
        list_max: Queue length that triggers a flush, and the batch size.
    """
    if len(q) < list_max:
        return

    start_time = time.time()
    # NOTE(review): pop() removes from the right end of the deque, so if
    # producers use append() each batch is newest-first. Confirm the producer
    # side before switching to popleft().
    data_list = [q.pop() for _ in range(list_max)]
    queue_signal.send(data_list)
    print(time.time() - start_time)