Using Pandas with Hive | MongoDB | Redis | HBase | Memcache | HDFS | Spark


Python support for driving Hadoop itself is fairly limited. For Hive there is also an older hiveserver package, but PyHive looks like the more usable choice.

Install the dependencies

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

Usage

from pyhive import hive
conn = hive.Connection(host='xxxx', port=10000, username='xxx', database='default')
cursor = conn.cursor()
cursor.execute('select * from url_log limit 10')
for result in cursor.fetchall():
    print(result)
## A real test on the internal network
from pyhive import hive
conn = hive.Connection(host='172.16.16.32', port=10000, username='zhuzheng',auth='LDAP',password="abc123." ,database='fkdb')
cursor=conn.cursor()
cursor.execute('select * from fkdb.tab_client_label limit 10')
for result in cursor.fetchall():
    print(result)
### If Hive requires a username and password, supply them; if Hive is not on the same machine, also give the IP and port.
### Pick the right auth mode (I use LDAP here) and make sure you connect to the correct database.
#### While experimenting I hit all kinds of errors: wrong credentials, wrong auth mode, missing database, thrift errors, and so on. Quite frustrating.

from impala.dbapi import connect
# Note that auth_mechanism is required here, but database is optional
conn = connect(host='127.0.0.1', port=10000, database='default', auth_mechanism='PLAIN')
cur = conn.cursor()

cur.execute('SHOW DATABASES')
print(cur.fetchall())

cur.execute('SHOW Tables')
print(cur.fetchall())

I have not yet managed to get the Impala Python client (impyla) to connect in my own tests.

References:
https://blog.csdn.net/Gamer_gyt/article/details/52564335
The thrift version that impyla depends on is usually the problem; pin the following versions:

thrift-sasl==0.2.1
pip uninstall thrift

pip uninstall impyla

pip install thrift==0.9.3

pip install impyla==0.13.8

https://blog.csdn.net/kkevinyang/article/details/79273106
https://github.com/cloudera/impyla/issues/268
https://github.com/cloudera/impyla/issues/234
http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Python-Error-TSaslClientTransport-object-has-no-attribute-trans/td-p/58033
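With the versions pinned as above, a minimal sketch of pulling a result set straight into pandas with impyla's as_pandas helper; the host, port, auth_mechanism and table name below are placeholders for your own HiveServer2:

from impala.dbapi import connect
from impala.util import as_pandas   # helper that builds a DataFrame from a DB-API cursor

conn = connect(host='127.0.0.1', port=10000, auth_mechanism='PLAIN')
cur = conn.cursor()
cur.execute('SELECT * FROM default.url_log LIMIT 10')
df = as_pandas(cur)                 # fetch the remaining rows into a pandas DataFrame
print(df.head())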


from pyhive import hive
conn = hive.Connection(host="YOUR_HIVE_HOST", port=PORT, username="YOU")

cursor = conn.cursor()
cursor.execute("SELECT cool_stuff FROM hive_table")
for result in cursor.fetchall():
    use_result(result)

import pandas as pd

df = pd.read_sql("SELECT cool_stuff FROM hive_table", conn)
from pyhive import hive
import pandas as pd

def getData():
    conn = hive.Connection(host="1.0.1.38", auth="CUSTOM", username='hive', password="pvXxHTsdqrt8", port=10000, database='tapro_atg')
    df = pd.read_sql("select * from sales_data_leisure_view", conn)
    records = df.head(n=100000)
    print(records.to_json(orient='records'))

getData()
import pandas as pd
from pyhive import hive

conn = hive.connect('192.168.72.135')
cursor = conn.cursor()
sql = "select * from t2 where city='Shanghai'"
cursor.execute(sql)
res = cursor.fetchall()
df = pd.DataFrame(res, columns=['id', 'name', 'year', 'city'])

df1 = pd.read_sql(sql, conn, chunksize=3)
for chunk in df1:
    print(chunk)

# -*- coding:utf-8 -*-
import pandas as pd
from pyhive import hive
import time
import datetime
import os


def rfail(s, file_path):
    with open(file_path, "a+") as f:
        f.write(s + "\n")


def read_query(sql):
    hive_line = '''hive -e "set hive.cli.print.header=true;set mapreduce.job.queuename=hl_report;%s";''' % (sql)
    data_buffer = os.popen(hive_line)
    data = pd.read_table(data_buffer, sep="\t", chunksize=10000)
    return data


def get_from_hive(query, mode, engine_hive):
    #engine_hive = hive.Connection(host="xxxxx", port=10000, username="xxxx")
    if mode == "pyhive":
        data = pd.read_sql(query, engine_hive)
        return data
    elif mode == "raw":
        data = read_query(query)
        return data
    else:
        print("mode: pyhive or raw")
        return None


def gen_date(bdate, days):
    day = datetime.timedelta(days=1)
    for i in range(days):
        s = bdate + day * i
        # print(type(s))
        yield s.strftime("%Y-%m-%d")


def get_date_list(start=None, end=None):
    if start is None or end is None:
        return []
    else:
        data = []
        for d in gen_date(start, (end - start).days):
            data.append(d)
        return data
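For reference, a hypothetical way to drive the helpers above; the connection details, table name and the dt partition column are assumptions:

if __name__ == "__main__":
    engine_hive = hive.Connection(host="xxxxx", port=10000, username="xxxx")
    # pull one day of data per iteration; the date range is illustrative
    for day in get_date_list(datetime.date(2018, 1, 1), datetime.date(2018, 1, 8)):
        sql = "select * from fkdb.tab_client_label where dt='%s'" % day
        df = get_from_hive(sql, mode="pyhive", engine_hive=engine_hive)
        if df is None or df.empty:
            rfail(day, "failed_days.log")   # log days that returned nothing
        else:
            print(day, df.shape)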

import pandas as pd 
from pyhive import presto

cursor = presto.connect(host='presto-master-lb.prod.hulu.com', port = 8080, 
                        catalog = 'hive', username = 'xiaomeng.yang@hulu.com').cursor()
cursor.execute('select * from zzz_emma_genre_16H1_2 limit 10')
result = cursor.fetchall()
result

df = pd.DataFrame(result, columns = ['userid','genre', 'rnk', 'dependency_ratio', 'cumu_ratio'])
import numpy
from pyhive import hive
import pandas as pd
conn = hive.Connection(host="localhost", port=10000, database ='project')
cursor = conn.cursor()
murderdf=pd.read_sql_query("select povertyrate from poverty ",conn)
murdertar=pd.read_sql_query("select murder from crime where district ='TOTAL' and year = 2011",conn)

a = murderdf.values
b = murdertar.values

print(a)
print(b)
#
# murder = [871, 970, 1095, 1171, 1238, 1244, 1437, 1438, 1631, 1721]
#
# a = [1523, 112]
# b = [10, 20]

print(numpy.corrcoef(a,b))

Alternatively, you can go through pyspark's HiveContext:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
        
conf = SparkConf().set("spark.executor.memory", "2g") \
  .set("spark.dynamicAllocation.initialExecutors", "2") \
  .set("spark.driver.memory", "2g") \
  .set("spark.kryoserializer.buffer.max", "1g") \
  .set("spark.driver.cores", "4") \
  .set("spark.yarn.queue", "ace") \
  .set("spark.dynamicAllocation.maxExecutors", "32")
  
  
sparkContext = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sparkContext)
hiveContext = HiveContext(sparkContext)


import os
import pandas as pd


def hdfs_to_csv(hdfs_table, csv_name):
  query = "SELECT * FROM prod_rwi_batch.%s" % hdfs_table
  query_df = hiveContext.sql(query)
  query_df.cache()
  df = query_df.toPandas()  

  # save it locally
  csv_file = "%s.csv" % csv_name
  df.to_csv(csv_file)

  # copy it over to BDF
  os.system("hdfs dfs -copyFromLocal %s /user/dhomola" % csv_file) 
  # this didn't work due to access right issues: 
  # hdfs dfs -copyFromLocal initial_pos.csv /production/ace/data/dpt/dhomola

  # delete locally
  os.system("rm %s" % csv_file)

hdfs_to_csv("initialcohort_cp01_1508281365499_1142", "initial_pos")
hdfs_to_csv("restrictivecohort_cp02_1508281365499_1142", "restricted_pos")
hdfs_to_csv("randomsamplecohort_cs03_1508281365499_1142", "random_sample_scoring")
hdfs_to_csv("negativecohort_cn01_1508281365499_1142", "initial_negative")
hdfs_to_csv("cohort_v01_1508281365499_1142", "v01")
hdfs_to_csv("cohort_v02_1508281365499_1142", "v02")

Using pandas with MongoDB

When working with Pandas we usually load data from CSV or Excel files, but sometimes the data sits in a database (not only MySQL but various NoSQL stores too) and there is no ready-made file. In that case the Pymongo library lets you read the data out of MongoDB and load it into Pandas in three simple steps.

Step 1: install the dependency and import the packages

pip3 install  pymongo
import pymongo
import pandas as pd

Step 2: set up the MongoDB connection

# MongoDB connection settings
client = pymongo.MongoClient('localhost',27017)
cn_78 = client['cn_78']
project_info = cn_78['project_info']

Step 3: load the data into pandas

data = pd.DataFrame(list(project_info.find()))
# drop MongoDB's _id field
del data['_id']
# pick the columns to keep
data = data[['aear','cate','subcate','name','maxmoney','minmoney','time']]
print(data)

Another example:

import pandas as pd
from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient('192.168.248.52',12000)
# Select the database; locateInfo is the database name
db = client.locateInfo
db.authenticate("me", "me")
# Select the collection
collection = db.dataCollect
# Fetch the data from MongoDB
gpsData = pd.DataFrame(list(collection.find({"deviceId":"05792"})))
# Drop the _id field
del gpsData['_id']
# Pick the columns to keep
gpsData = gpsData[['deviceId','lating','lnging','gnssTime','locateType']]
# Sort by time in ascending order
gpsData = gpsData.sort_values('gnssTime')
print(gpsData)

With that, reading data from MongoDB into Pandas for analysis is straightforward.
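Writing back is just as short. A sketch of inserting a DataFrame into a collection; the target collection name here is made up:

import pandas as pd
from pymongo import MongoClient

client = MongoClient('localhost', 27017)
collection = client['cn_78']['project_info_backup']   # hypothetical target collection

df = pd.DataFrame({'name': ['a', 'b'], 'maxmoney': [100, 200]})
# to_dict('records') turns the frame into a list of documents for insert_many
collection.insert_many(df.to_dict('records'))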

Loading Redis data into pandas

First of all, there is an impressive aggregation helper, pandas-redistrict:
https://github.com/correctiv/pandas-redistrict

There is also Redis's own Python client:
https://github.com/andymccurdy/redis-py

pandas has msgpack serialization, pd.read_msgpack()/to_msgpack(). It is still flagged as experimental, but it should be the most concise approach, and reading supports iterating over a sequence of frames.

import redis
import pandas as pd

redis_db = redis.StrictRedis(host="localhost", port=6379, db=0)
data = df.to_msgpack(compress='zlib')   # df is an existing DataFrame
# note: StrictRedis.setex takes (name, time, value)
redis_db.setex(key, expire_time, data)

cached_data = redis_db.get(key)
df = pd.read_msgpack(cached_data)

Another idea: see timeseries2redis, which can read tick or bar data out of Redis; the implementation is quite interesting.

Looking at its performance, though, it is not faster than pd.read_csv: pandas' CSV reader is implemented in C and can run in the tens of milliseconds. If you need it several times faster still, consider HDF5; see the pandas I/O performance comparison (performance-considerations).
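For comparison, the HDF5 route on the pandas side is just to_hdf/read_hdf (it needs the PyTables package); the file name and key below are arbitrary:

import pandas as pd

df = pd.DataFrame({'price': range(1000)})
# write the frame to a compressed HDF5 store under the key 'ticks'
df.to_hdf('cache.h5', key='ticks', mode='w', complib='zlib', complevel=5)
df2 = pd.read_hdf('cache.h5', key='ticks')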


This approach is fairly dependable:

set:

redisConn.set("key", df.to_msgpack(compress='zlib'))
get:

pd.read_msgpack(redisConn.get("key"))


The Redis + pickle approach from Stack Overflow:

from redis import StrictRedis
import pickle

# A subclass of StrictRedis that can pickle and unpickle complex objects;
# "pset" and "pget" replace StrictRedis's "set" and "get".

class PickledRedis(StrictRedis):

    def pset(self, key, value, ex=None, px=None, nx=False, xx=False):
        value_pickled = pickle.dumps(value, 2)
        return self.set(key, value_pickled, ex=ex, px=px, nx=nx, xx=xx)

    def pget(self, key):
        value_pickled = self.get(key)
        return pickle.loads(value_pickled)
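Usage is then symmetric; a sketch assuming df is an existing DataFrame and Redis runs on localhost:6379:

r = PickledRedis(host='localhost', port=6379, db=0)
r.pset('my_df', df, ex=3600)      # cache the pickled DataFrame for an hour
df_cached = r.pget('my_df')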

See also this post on using Redis from Python:
//www.greatytc.com/p/2639549bedc8

A write-up on using Memcache from Python is also worth a look:
https://docs.lvrui.io/2016/07/24/Python%E6%93%8D%E4%BD%9Cmemcache%E8%AF%A6%E8%A7%A3/
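Since the title mentions Memcache, here is a minimal sketch of caching a DataFrame in memcached with the pymemcache client (my choice of client, not the linked article's; the server address and key are placeholders), pickled in the same spirit as the Redis example:

import pickle
import pandas as pd
from pymemcache.client.base import Client

mc = Client(('localhost', 11211))                 # default memcached port
df = pd.DataFrame({'a': [1, 2, 3]})
mc.set('my_df', pickle.dumps(df), expire=3600)    # cache for an hour
df_cached = pickle.loads(mc.get('my_df'))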

Using pandas with HBase

There are two approaches:

  1. HBase ---> pyspark ---> pandas DataFrame
  2. HBase ---> msgpack ---> pandas DataFrame

A reliable route is to install happybase together with pdhbase.

Writing DataFrame to HBase

Establish an HBase connection using happybase and write the DataFrame.

import happybase
import numpy as np
import pandas as pd
import pdhbase as pdh

connection = None
try:
    connection = happybase.Connection('127.0.0.1')
    connection.open()
    df = pd.DataFrame(np.random.randn(10, 5), columns=['a', 'b', 'c', 'd', 'e'])
    df['f'] = 'hello world'
    pdh.to_hbase(df, connection, 'sample_table', 'df_key', cf='cf')
finally:
    if connection:
        connection.close()

Reading DataFrame from HBase

Establish an HBase connection using happybase and read the DataFrame back.

import happybase
import numpy as np
import pandas as pd
import pdhbase as pdh
connection = None
try:
    connection = happybase.Connection('127.0.0.1')
    connection.open()
    df = pdh.read_hbase(connection, 'sample_table', 'df_key', cf='cf')
    print(df)
finally:
    if connection:
        connection.close()
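If you would rather skip the pdhbase dependency, a plain happybase scan can be assembled into a DataFrame by hand. A sketch, with the table and column-family names purely illustrative:

import happybase
import pandas as pd

connection = happybase.Connection('127.0.0.1')
table = connection.table('sample_table')          # hypothetical table
rows = []
for key, data in table.scan(limit=1000):
    # data maps b'cf:qualifier' -> b'value'; strip the family prefix and decode
    row = {k.decode().split(':', 1)[1]: v.decode() for k, v in data.items()}
    row['row_key'] = key.decode()
    rows.append(row)
df = pd.DataFrame(rows)
connection.close()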

Using pandas with Spark

There is a ready-made, though fairly old, package: pyspark_pandas.

Beyond that, pyspark itself can exchange data with pandas DataFrames (spark.createDataFrame / DataFrame.toPandas).

import pyspark
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pyspark.sql.functions as F
# import seaborn as sns
# import matplotlib.pyplot as plt
import sys
import numpy as np
from surprise import AlgoBase, Dataset, evaluate
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# import alert_program as ap

spark = SparkSession.builder.getOrCreate()


ratings_df = pd.read_table("data/ratings.dat", delimiter = '::',
                                     names=["user", "movie", "rating",
                                            "timestamp"], engine = 'python')
spark_df = spark.createDataFrame(ratings_df)

spark_df = spark_df.drop("timestamp")
train, test = spark_df.randomSplit([0.8, 0.2], seed=427471138)

als = ALS(
          userCol="user",
          itemCol="movie",
          ratingCol="rating",
          nonnegative=False,
          regParam=0.1,
          rank=10
         )
model = als.fit(train)

predictions = model.transform(test)

pandas_df = predictions.toPandas()
pandas_df_clean=pandas_df.fillna(pandas_df.mean())
pandas_df_clean['RMSE']=np.power(pandas_df_clean['rating']-pandas_df_clean['prediction'],2)
RMSE = np.sqrt(sum(pandas_df_clean['RMSE']) / len(pandas_df_clean))

print (RMSE)
from pyspark.sql import SQLContext
from pandas import DataFrame, Series
import pandas
sqlContext = SQLContext(sc)

df = sqlContext.load(source="org.apache.phoenix.spark", zKUrl="localhost:2181:/hbase-unsecure", table="doctors")
pandas_df = df.toPandas()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
import pandas as pd
spark=SparkSession.builder.master("local").appName("my app").config(conf=SparkConf()).getOrCreate()
sc=SparkSession.builder.config(conf=SparkConf())
df=spark.read.format('json').load(['/home/xs/Documents/Weblog.1457006400155.gz','/home/xs/Documents/Weblog.1457006400158.gz',
'/home/xs/Documents/Weblog.1457006401774.gz'])
g1=df.groupBy("captcha_id",F.substring("request_time",1,19).alias("time")).count().filter(df.captcha_id!='')
pandas_df=g1.toPandas()  # convert to a pandas DataFrame
data_pivot=pandas_df.pivot_table(index=["captcha_id"],columns=["time"],values=["count"])
data_pivot.to_csv("/home/xs/Documents/data.csv",header=True)

from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark
from pyspark.sql.functions import *
import pandas as pd

import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

sc = pyspark.SparkContext()
sqlcontext = SQLContext(sc)


pandas_df = pd.read_csv('rank.csv')

# Note: pandas DataFrames have no randomSplit(); the train/test split is done below with a random mask.

pandas_df['split'] = np.random.randn(pandas_df.shape[0], 1)

msk = np.random.rand(len(pandas_df)) <= 0.7

train = pandas_df[msk]
test = pandas_df[~msk]


s_df = sqlcontext.createDataFrame(train)


trainingData=s_df.rdd.map(lambda x:(Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(trainingData)

dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

pipeline = Pipeline(stages=[featureIndexer, dt])

model = pipeline.fit(trainingData)


test_df = sqlcontext.createDataFrame(test)
testData=test_df.rdd.map(lambda x:(Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])
predictions = model.transform(testData)

predictions.select("prediction", "label", "features").show(5)

Using pandas with HDFS

This mostly goes through WebHDFS.

The most impressive one, a real hidden gem:
https://github.com/ibis-project/ibis

Ibis: Python data analysis framework for Hadoop and SQL engines. One tool for the whole pipeline.

pip install ibis-framework
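A rough sketch of what ibis usage looked like with its Impala backend at the time; the hosts and ports are placeholders, and the exact API differs between ibis releases, so treat this as orientation only:

import ibis

# WebHDFS client for the NameNode (50070 is the usual default port)
hdfs = ibis.hdfs_connect(host='namenode-host', port=50070)
con = ibis.impala.connect(host='impala-host', port=21050, hdfs_client=hdfs)

table = con.table('url_log', database='default')
df = table.limit(10).execute()    # execute() returns a pandas DataFrame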

Another excellent one:
https://github.com/RaRe-Technologies/smart_open

>>> # stream lines from an S3 object
>>> for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
...    print line

>>> # using a completely custom s3 server, like s3proxy:
>>> for line in smart_open.smart_open('s3u://user:secret@host:port@mybucket/mykey.txt'):
...    print line

>>> # you can also use a boto.s3.key.Key instance directly:
>>> key = boto.connect_s3().get_bucket("my_bucket").get_key("my_key")
>>> with smart_open.smart_open(key) as fin:
...     for line in fin:
...         print line

>>> # can use context managers too:
>>> with smart_open.smart_open('s3://mybucket/mykey.txt') as fin:
...     for line in fin:
...         print line
...     fin.seek(0)  # seek to the beginning
...     print fin.read(1000)  # read 1000 bytes

>>> # stream from HDFS
>>> for line in smart_open.smart_open('hdfs://user/hadoop/my_file.txt'):
...     print line

>>> # stream from HTTP
>>> for line in smart_open.smart_open('http://example.com/index.html'):
...     print line

>>> # stream from WebHDFS
>>> for line in smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt'):
...     print line

>>> # stream content *into* S3 (write mode):
>>> with smart_open.smart_open('s3://mybucket/mykey.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...          fout.write(line + '\n')

>>> # stream content *into* HDFS (write mode):
>>> with smart_open.smart_open('hdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...          fout.write(line + '\n')

>>> # stream content *into* WebHDFS (write mode):
>>> with smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...          fout.write(line + '\n')

>>> # stream from/to local compressed files:
>>> for line in smart_open.smart_open('./foo.txt.gz'):
...    print line

>>> with smart_open.smart_open('/home/radim/foo.txt.bz2', 'wb') as fout:
...    fout.write("some content\n")

Another very capable one:
https://github.com/spotify/snakebite

Snakebite is a python library that provides a pure python HDFS client and a wrapper around Hadoops minicluster. The client uses protobuf for communicating with the NameNode and comes in the form of a library and a command line interface. Currently, the snakebite client supports most actions that involve the Namenode and reading data from DataNodes.
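A tiny snakebite example (Python 2 only; the NameNode host and RPC port are placeholders):

from snakebite.client import Client

client = Client('namenode-host', 8020)      # NameNode RPC port, not WebHDFS
for entry in client.ls(['/user/hadoop']):   # ls yields one dict per entry
    print(entry['path'])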

This one is good as well:
https://github.com/crs4/pydoop
Official site: http://crs4.github.io/pydoop/

The following also works:

https://github.com/mtth/hdfs

Official docs:
https://hdfscli.readthedocs.io/en/latest/

This one is also very handy:

https://github.com/HariSekhon/pytools
The following one is newer:
https://github.com/dask/hdfs3
https://hdfs3.readthedocs.io/en/latest/

from hdfs.client import Client
client = Client("http://host6.cloud.sinocbd.com:50070/")  # 50070: default Hadoop NameNode (WebHDFS) port
dir(client)
# Methods used here:
# walk(): like os.walk, yields (path, dirnames, filenames) tuples, one per level.
# read(): like file.read; per the official docs, client.read must be used inside a with block.
# path = []
# for i in client.walk('/tempfiles/temp', depth=1):
#     for item in i:
#         path.append(item)
#         print(item)
# print(path)
with client.read('/tempfiles/1.csv', encoding='gbk') as fs:
    content = fs.read()
    print(content)
import webhdfspy
import pandas as pd

webHDFS = webhdfspy.WebHDFSClient("host6.cloud.sinocbd.com", 50070, username='root')
data = pd.DataFrame(webHDFS.listdir('/'))
print(data)
pathlist = data['pathSuffix']
for i in pathlist:
    path = "/" + i   # absolute path of each top-level entry
    # print(path)
    # print(webHDFS.listdir(path))
import os
import pickle

from pathlib import PurePath

import hdfs3
import pandas as pd

from ufuncs.storage.utils import check_abs_path


def hdfs_chmod_dirs(path, *, permission_code, raise_errors=False, hdfs3_conn=None):
    """Try to change permissions of each part of the path.
    Args:
        path (str): Path in HDFS
        permission_code (int/str): Permission to set on each part of the
            path (eg. 777, 775).
        hdfs3_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        IOError: If not possible to change permission of a path part.
    """
    check_abs_path(path)
    hdfs = hdfs3_conn if hdfs3_conn else hdfs3.HDFileSystem()
    # change permissions starting with top dir
    _path = '/'
    for d in PurePath(path).parts[1:]:
        _path = os.path.join(_path, d)
        try:
            hdfs.chmod(_path, int(str(permission_code), 8))
        except IOError as e:
            if raise_errors:
                raise e


def hdfs_put_object(obj, storage_path,
                    *, permission_code=755, overwrite=True, hdfs_conn=None):
    """Store a python object to HDFS in pickle file.
    Args:
        obj: Python object.
        storage_path (str): HDFS full path of the file to write to.
        permission_code (int/str): Permission to set on the pickle file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        FileExistsError: If file exists and overwrite is set to False.
    """
    # create connector if not exists
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    hdfs.mkdir(os.path.dirname(storage_path))
    with hdfs.open(storage_path, 'wb') as f:
        pickle.dump(obj, f)
    hdfs.chmod(storage_path, int(str(permission_code), 8))


def hdfs_get_object(storage_path, *, hdfs_conn=None):
    """Retrieve a python object from a pickle file in HDFS.
    Args:
        storage_path (str): HDFS full path of the file to write to.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Returns:
        The python object that was loaded from the pickle file.
    Raises:
        NameError: If the object's class is not defined in the namespace.
        FileNotFoundError: If file is not found.
    """
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    try:
        with hdfs.open(storage_path, 'rb') as f:
            obj = pickle.load(f)
    except IOError as _:
        raise FileNotFoundError("No such file or directory in HDFS: '{}'."
                                .format(storage_path))
    except AttributeError as _:
        raise NameError("Pickle file object class not found in the namespace.")
    return obj


def hdfs_df2csv(df, storage_path, *,
                permission_code=755, overwrite=True, hdfs_conn=None, **kwargs):
    """Save pandas dataframe to csv in HDFS.
    The kwargs are used to represent any argument from the known
    pandas.DataFrame.to_csv function.
    Args:
        df (pandas.DataFrame): Dataframe to write as csv
        permission_code (int/str): Permission to set on the pickle file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        TypeError: If df is not a pandas DataFrame
        FileExistsError: If file exists and overwrite is set to False.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Expected pandas Dataframe, got {}"
                        .format(type(df).__name__))
    check_abs_path(storage_path)
    # make hdfs connection
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    # check whether the file already exists when overwrite is not allowed
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    # make directories
    hdfs.mkdir(os.path.dirname(storage_path))
    # write csv bytes to HDFS file
    with hdfs.open(storage_path, 'wb') as f:
        f.write(df.to_csv(**kwargs))
    # change permission
    hdfs.chmod(storage_path, int(str(permission_code), 8))


def hdfs_csv2df(storage_path, *, hdfs_conn=None, **kwargs):
    """Read .csv from HDFS into a pandas dataframe.
    The kwargs are used to represent any argument from the known
    pandas.DataFrame.read_csv function.
    Args:
        storage_path (str): Location of .csv file in HDFS
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Returns:
        pd.DataFrame: Dataframe with .csv data
    """
    check_abs_path(storage_path)
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    with hdfs.open(storage_path, 'rb') as f:
        df = pd.read_csv(f, **kwargs)
    return df
import warnings
warnings.filterwarnings('ignore')

import sys
import random
import numpy as np

from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import pydoop.hdfs as hdfs

def read_csv_from_hdfs(path, cols, col_types=None):
  files = hdfs.ls(path)
  pieces = []
  for f in files:
    fhandle = hdfs.open(f)
    pieces.append(pd.read_csv(fhandle, names=cols, dtype=col_types))
    fhandle.close()
  return pd.concat(pieces, ignore_index=True)
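A hypothetical call, assuming the CSV files under the given HDFS directory share these columns:

cols = ['year', 'month', 'carrier', 'dep_delay']
flights = read_csv_from_hdfs('/user/hadoop/airline/', cols)
print(flights.shape)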
import os
import pytest

import fastparquet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pandas.util.testing as tm


def hdfs_test_client(driver='libhdfs'):
    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
    user = os.environ['ARROW_HDFS_TEST_USER']
    try:
        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500))
    except ValueError:
        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                         'an integer')

    return pa.HdfsClient(host, port, user, driver=driver)


def test_fastparquet_read_with_hdfs():
    fs = hdfs_test_client()

    df = tm.makeDataFrame()
    table = pa.Table.from_pandas(df)

    path = '/tmp/testing.parquet'
    with fs.open(path, 'wb') as f:
        pq.write_table(table, f)

    parquet_file = fastparquet.ParquetFile(path, open_with=fs.open)

    result = parquet_file.to_pandas()
    tm.assert_frame_equal(result, df)

Using Python with Flink

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/python.html
https://github.com/wdm0006/flink-python-examples
