pandas使用小结及时间转换及多进程池及多线程池

1去重

1.1unique与drop_duplicates区别

1.unique只能应用于series或者list,或者dataframe中的一列,不能作用于一整个dataframe,返回的是去重后的数据

s = Series(['a','b','a','c','b'])
s.unique()

2.nunique用于统计dataframe中每列的不同值的个数,也可用于series,但不能用于list.返回的是每列不同值的个数.可与groupby结合,统计每个块的不同值的个数。

data.nunique()
data['time'].nunique()
all_user_repay = all_user_repay.groupby(['user_id'])['listing_id'].agg(['nunique']).reset_index()

3.drop_duplicates既可用于series也可用于dataframe

data.duplicated() # 返回一个布尔型,表示各行是否重复
s.drop_duplicates() # 会过滤掉重复的行
df = DataFrame({'水果':['苹果','草莓','苹果'],
           '价格':[3,9,3],
           '数量':[5,6,5]})
df.drop_duplicates()
data.drop_duplicates(['k1']) # 对dataframe根据k1列过滤重复项

2python中时间戳与dataframe的转换

# datetime转时间戳
time.mktime(time_data.timetuple())
# 时间戳转datetime
# 转utc时间
timeStamp = 1381419600
dateArray = datetime.datetime.utcfromtimestamp(timeStamp)
# 转本地时间
dateArray = datetime.datetime.fromtimestamp(timeStamp)
# datetime转字符串
now = datetime.datetime.now()
otherStyleTime = now.strftime("%Y-%m-%d %H:%M:%S")
# 字符串转datetime
datetime.datetime.strptime(otherStyleTime,"%Y-%m-%d %H:%M:%S")
# pandas中字符串时间转换为datetime类型
data['end_time'] = pd.to_datetime(data['end_time'], format="%Y-%m-%d %H:%M:%S")

# 将字符型时间转日期型时间在转时间戳 然后对时间戳按每5分钟取整,然后时间戳转日期型时间
data['DataTime'] = data['DataTime'].map(lambda x: datetime.datetime.fromtimestamp
                           ((time.mktime(datetime.datetime.strptime
                           (x, '%Y-%m-%d %H:%M:%S').
                            timetuple())//300)*300)
                              + datetime.timedelta(minutes=5))

3面元划分及据类

3.1面元划分

# 定义生成dataframe的函数
def get_pd(num=10):
data_list = []
for _ in range(num):
    st_time = datetime.datetime(2019,7,randint(1,31), randint(0,23),randint(0,59),randint(0,59))
    en_time = st_time + datetime.timedelta(minutes=randint(10,59))
    data = [st_time,en_time,randint(100,500),randint(1000,2000)]
    data_list.append(data)
df = pd.DataFrame(data=data_list,columns=['start','end','lng','lat'])
return df

# 进行面元划分    
data = get_pd(100)
data = data.sort_values('start') # 排序
data.reset_index(drop=True,inplace=True) # 重设排序后的索引
# 将时间从开始到结束按每15分钟划分一个间隔
cu=pd.date_range(data['start'][0],data['start'][99],freq='15Min')
# 用间隔对数据的start列进行划分
qu = pd.cut(data.start, cu, right=False)
# 统计每列的个数,结果会自动倒序对个数进行排序
qu.value_counts()

pd.date_range(start, end, freq) 生成一个时间段
freq参数由英文(M D H Min 。。。)、英文数字结合。D表示一天,M表示一月如20D表示20天,5M表示5个月
pd.bdate_range(end,periods,freq) 根据end时间点开始,以freq为单位,向前生成周期为period的时间序列
pd.bdate_range(start,periods,freq) 根据start时间点开始,以freq为单位,向后生成周期为period的时间序列
pd.bdate_range(end='20180101',periods=5,freq='D') # 向前5天,之间间隔一天
pd.bdate_range(start='20180101',periods=5,freq='D')# 向后5天,之间间隔一天

3.2聚类

# datetime转时间戳
# a = time.mktime(data['start'][99].timetuple())
data['time'] = data['start'].map(lambda x:time.mktime(x.timetuple()))
# 利用转换后的时间戳对900整除(即间隔15分钟)
data['time'] = data['time'].map(lambda x:x//(60*15))
# 对整个数据按处理后的时间(每隔15分钟)进行分组
data.groupby(data['time']).count()
# 对数据的lng列按照处理后的时间进行分组并对统计结果进行倒序排序
data['lng'].groupby(data['time']).count().sort_values(ascending=False)

4多进程与多线程

4.1多线程

# 导入线程池
from multiprocessing.dummy import Pool as threadpool

# 创建20个线程池
pool = threadpool(20)
for detail_data in data:
     # 使用异步形式启动线程池,并传入一个函数和参数
    pool.apply_async(get_detail_json_file, (detail_header,))
# 关闭线程池
pool.close()
# 等待所有线程执行完毕,join必须位于close后
pool.join()

4.2多进程

def job(x):
    return x*x
def work(x):
    print('执行函数完毕')

# 使用进程池
from multiprocessing import Pool
pool = Pool(10)
for i in range(1,7):
    pool.apply_async(job,(i,))
pool.close()
pool.join()

# 有返回值的多进程使用
from multiprocessing import Pool
pool = Pool(10)
# res为返回值组成的列表
res = pool.map(job, range(10))

# 有返回值的多进程的使用
from multiprocessing import Pool
pool = Pool(os.cpu_count())
result = []
for i in range(3):
    result.append(pool.apply_async(job, (i, )))
pool.close()
pool.join()
data = [i.get() for i in result]  # i.get()提取返回值

多线程计算时,若不同线程直接进行数据交流,只需要把变量设置为全局即可。

利用进程池中的Queue完成多进程中的通讯

import multiprocessing
def write(q):
    print("write启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in "python":
        q.put(i)
def read(q):
    print("read启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("read从Queue获取到消息:%s" % q.get(True))

q = multiprocessing.Manager().Queue()
po = multiprocessing.Pool()
po.apply_async(write, args=(q,))
po.apply_async(read, args=(q,))
po.close()
po.join()

5条件筛选赋值

从DF1中选取第A列值为b的行,使他们的第C列的值为D
DF1.loc[DF1['A']=='b','C'] = 'D'

a.loc[a['id']==1595,'row'] 
a.loc[a['id']==1595,['row','col']]
# 以下两句查找结果相同
a[(a['row']==30)&(a['col']==1)]['id']
a.loc[(a['row']==30)&(a['col']==1), 'id']
# 以下写法会报错
a.loc[(a['bottom_lng']<=lng<a['top_lng'])&(a['bottom_lat']<=lat<a['top_lat']), 'id']
# 该写法正确,找出给定经纬度lng,lat在a这个dataframe中的哪个网格中
a.loc[(a['bottom_lng']<=lng)&(lng<a['top_lng'])&(a['bottom_lat']<=lat)& (lat<a['top_lat']), 'id']

5.1利用apply对多列进行函数操作(按行进行映射)

apply默认是对列进行操作,故默认是axis=0,如果要通过行对多列操作要设置axis=1
要对DataFrame的多个列同时进行运算,可以使用apply,例如col3 = col1 + 2 * col2

# 例一
df['col3'] = df.apply(lambda x: x['col1'] + 2 * x['col2'], axis=1)
# 例二
df = pd.DataFrame ({'a' : np.random.randn(6),
         'b' : ['foo', 'bar'] * 3,
         'c' : np.random.randn(6)})
def my_test(a, b):
    return a + b
df['Value'] = df.apply(lambda row: my_test(row['a'], row['c']), axis=1)

例三,通过映射第一个dataframe(data)中的每行,挑选出start_lng和start_lat两列,去另外一个dataframe(json_data)中查找出符合要求的行并返回这一行的id列的数据

data.apply(lambda x: json_data.loc[(json_data['bottom_lng']
    <=x['start_lng'])&(x['start_lng']<json_data['top_lng'])&
    (json_data['bottom_lat']<=x['start_lat'])&(x['start_lat']
    <json_data['top_lat']), 'id'], axis=1)

# 利用numpy创建一个全为0的矩阵
df = pd.DataFrame(data=np.zeros((743,8),dtype=np.int32), columns=['N','NE','E','SE','S','SW','W','NW'])

6从多个文件中获取数据并合并成一个文件

import datetime
import pandas as pd
path ='/home/glzt/桌面/滴滴数据/file'
def get_all_data():
    data = None
    for file in os.listdir('/home/glzt/桌面/滴滴数据/file'):
        data = read_file(data, file)
    data = data.sort_values(by=['start_time', 'end_time'])
    print(data)
    print(data.iloc[0]['start_time'], data.iloc[-1]['start_time'])
    print(data.iloc[0]['end_time'], data.iloc[-1]['end_time'])
    print(len(data))
    return data

def read_file(all_data, file_path):
    # file_path = '/home/glzt/桌面/滴滴数据/file/order_20161104'
    file_path = '/home/glzt/桌面/滴滴数据/file/' + file_path
    data = pd.read_csv(file_path)
    data.columns = ['order', 'start_time', 'end_time', 'start_lng', 'start_lat', 'end_lng', 'end_lat']
    del data['order']
    data['start_time'] = data['start_time'].map(lambda x:datetime.datetime.fromtimestamp(x))
    data['end_time'] = data['end_time'].map(lambda x: datetime.datetime.fromtimestamp(x))
    if all_data is not None:
        data = pd.concat([all_data, data])
    else:
        data = data
    return data

7loc数据筛选

grids = pd.read_sql(sql, engine)
points = pd.DataFrame(data=points_list, columns=['lng', 'lat'])
# 必须在后面接to_numpy(),否则结果是一个n*n的矩阵,而不是1*n的series
# to_numpy()后接不接[0]看情况,如果有可能不是每个都有值,就不能接,否则会报错,如果每个都有值,就可以接
points['id'] = points.apply(lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) & (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']), 'id'].to_numpy(), axis=1)
print(points)
points['id'] = points.apply(lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) & (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']), 'id'].to_numpy()[0], axis=1)
print(points)

8改列的名字及存为json文件

data.sort_values(by=['col', 'row'], inplace=True)
data['grid_id'] = data.apply(lambda x: 140*(x['col'] - 1) + x['row'], axis=1)
data.rename(columns={'col': 'column'}, inplace=True)
# orient选择records,则存为常见的列表套字典的数据形式
data.to_json(os.path.join(DATA_DIR, 'grids.json'), orient='records')
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,194评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,058评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,780评论 0 346
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,388评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,430评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,764评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,907评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,679评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,122评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,459评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,605评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,270评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,867评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,734评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,961评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,297评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,472评论 2 348

推荐阅读更多精彩内容