首页 > 编程语言 > 详细

基于Httpfs访问HDFS的C++实现

时间:2016-03-19 08:50:02      阅读:280      评论:0      收藏:0      [点我收藏+]

       Httpfs是hadoop2.x中hdfs项目的内置应用,基于tomcat和jersey,对外提供完备HDFS操作的RESTful接口,无需安装客户端,可方便实现数据交互,如从windows访问存储在hdfs上的文件。本文根据Httpfs说明文档,实现了一个基于libcurl和jsoncpp的httpfs客户端程序(C++)。

      1.准备工作

  1.1 编译jsoncpp

     jsoncpp下载地址:https://codeload.github.com/open-source-parsers/jsoncpp/zip/master

      使用VS2010打开jsoncpp解压文件夹/makefiles/msvc2010/jsoncpp.sln,选择lib_json,设置项目的属性。具体设置为:1)常规里设置配置类型为静态库(.lib),字符集使用多字节字符集;2)C/C++->代码生成中的运行库选择 /MD(release)或 /MDd(debug)。编译环境必须与我们开发的工程一致!!!


    1.2编译libcurl

     libcurl下载地址:https://curl.haxx.se/download/curl-7.47.1.tar.gz

      打开curl解压目录\projects\Windows\VC10\curl-all.sln,选择lib_debug和lib_release编译。VS2010引用静态库时链接失败的解决方法:

      1)给工程添加依赖的库:项目->属性->链接器->输入->附加依赖项,把libcurl.lib ws2_32.lib winmm.lib wldap32.lib添加进去(注意,debug配置用libcurld.lib).

      2)加入预处理器定义:项目->属性->C/C++->预处理器->预处理器定义,把;BUILDING_LIBCURL;HTTP_ONLY复制进去(注意不要丢了;)

     解决方案来自网络“vc2010使用libcurl静态库 遇到连接失败的解决方案”

 

    1.3设置头文件引用

     在工程路径下创建一个include目录,将libcurl和jsoncpp中的include文件夹下的文件复制到该include文件夹下,设置为vc++目录引用路径。

  

2.代码实现

 HdfsClient.H

#pragma once
#include <string>
#include <vector>
using namespace std;

// Metadata for one remote file or directory entry, as filled in by
// CHttpFSClient::ls().
//
// NOTE(review): the field names match the keys of the WebHDFS/HttpFS
// "FileStatus" JSON object; units and value ranges are whatever the server
// reports — confirm against the REST API documentation.
//
// The 64-bit fields are declared as `long long` rather than the MSVC-only
// `__int64` keyword: on MSVC both name the same type, and `long long` also
// compiles with other toolchains.
struct FileStatus {
    long long accessTime;        // last access time reported by the server
    long long blocksize;         // block size reported by the server
    std::string group;           // owning group
    long long length;            // length of the entry (64-bit so large files fit)
    long long modificationTime;  // last modification time reported by the server
    std::string owner;           // owning user
    std::string pathSuffix;      // path component relative to the listed path
    std::string permission;      // permission string as sent by the server
    int replication;             // replication factor
    std::string type;            // entry kind as sent by the server
};

// Client-side interface for a remote HttpFS/WebHDFS endpoint.
//
// NOTE(review): only declarations are visible in this header. The per-member
// comments below describe what the names and signatures suggest; confirm the
// exact behaviour against the implementation file.
class CHttpFSClient
{
public:
    // Request kinds; GET is 0 and the remaining values follow consecutively.
    enum HTTP_TYPE
    {
        GET = 0,
        PUT,
        POST,
        DEL
    };

    CHttpFSClient(std::string& hostaddr, std::string& username);
    ~CHttpFSClient();

    // File-system operations. Each returns bool; presumably true on success.
    bool create(std::string& local_file, std::string& rem_file, bool overwrite = false);
    bool append(std::string& local_file, std::string& rem_file);
    bool mkdirs(std::string& path);
    bool rename(std::string& src, std::string& dst);
    bool del(std::string& path, bool recursive = false);
    bool read(std::string& rem_file, std::string& local_file, long offset = 0, long length = 0);
    bool ls(std::string& rem_path, std::vector<FileStatus>& results);

protected:
    // Static transfer callbacks.
    // NOTE(review): the (ptr, size, nmemb, stream) shape looks like libcurl's
    // read/write/header callback convention — confirm where they are registered.
    static size_t fileread_callback(void* ptr, size_t size, size_t nmemb, void* stream);
    static size_t filewrite_data(const char* ptr, size_t size, size_t nmemb, void* stream);
    static size_t memwrite_data(const char* contents, size_t size, size_t nmemb, std::string* stream);
    static size_t header_callback(const char* ptr, size_t size, size_t nmemb, std::string* stream);

    void showFileStatus(std::vector<FileStatus>& results);

private:
    std::string m_hostaddr;  // http://<HOST>:<PORT>/webhdfs/v1/
    std::string m_username;  // i.e. hadoop
    long m_timeout;
    long m_conntimeout;
};

 

HdfsClient.cpp

#include "StdAfx.h"
#include "HdfsClient.h"

#include <exception>

// Creates a client in the disconnected state; m_IsConn only becomes true
// once connect() has opened the transport.
HdfsClient::HdfsClient(void)
    : m_IsConn(false)
{
}

// Closes the connection if one is still open.
//
// shutdown() only handles ThriftIOException itself, so any other exception
// raised while closing the transport would otherwise leave this destructor.
// An exception escaping a destructor during stack unwinding terminates the
// process, so teardown here is strictly best-effort.
HdfsClient::~HdfsClient(void)
{
    if(!m_IsConn)
        return;

    try
    {
        shutdown();
    }
    catch (...)
    {
        // Nothing useful can be done with a close failure at destruction time.
    }
}

bool HdfsClient::connect(std::string server,int port)
{
    m_Socket = shared_ptr<TTransport>(new TSocket(server,port));
    m_Transport = shared_ptr<TBufferedTransport>(new TBufferedTransport(m_Socket));
    m_Protocol = shared_ptr<TBinaryProtocol>(new TBinaryProtocol(m_Transport));
    m_Client = shared_ptr<ThriftHadoopFileSystemClient>(new ThriftHadoopFileSystemClient(m_Protocol));

    try
    {
        m_Transport->open();
        // tell the HadoopThrift server to die after 60 minutes of inactivity
        m_Client->setInactivityTimeoutPeriod(3600);
        m_IsConn = true;
    }
    catch (const ThriftIOException& ex)
    {
        printf("ERROR: %s",ex.message.c_str());
        return false;
    }
    return true;
}


bool HdfsClient::shutdown()
{
    try
    {
        m_Transport->close();
        m_IsConn = false;
    }
    catch (const ThriftIOException& ex)
    {
        printf("ERROR: %s",ex.message.c_str());
        return false;
    }
    return true;
}

// Uploads the local file `localfile` to a newly created remote file `rem_path`.
//
// Returns false when the local file cannot be opened, when the server returns
// a zero handle id, when reading the local file fails, or when closing the
// remote handle reports failure. Exceptions thrown by the Thrift client
// propagate to the caller after the local file has been closed.
bool HdfsClient::put(std::string& localfile,std::string& rem_path)
{
    // Open the source first so that a missing local file does not leave an
    // empty file behind on the remote side.
    FILE* fp = fopen(localfile.c_str(),"rb");
    if(!fp)
        return false;

    bool ok = false;
    try
    {
        Pathname ptname;
        ptname.pathname = rem_path;
        m_Client->create(m_Handler,ptname);//Create the specified file. Returns a handle to write data.

        if(m_Handler.id != 0)
        {
            // Send exactly the bytes fread() delivered. Passing the raw buffer
            // would treat it as a C string: truncated at the first NUL byte
            // and read past the end when no NUL is present.
            size_t got = 0;
            while((got = fread(m_Buffer,sizeof(char),BufferSize,fp)) > 0)
                m_Client->write(m_Handler,std::string(m_Buffer,got));

            const bool read_ok = (ferror(fp) == 0);
            // close() is the left operand so the handle is always released.
            ok = m_Client->close(m_Handler) && read_ok;
        }
    }
    catch (...)
    {
        fclose(fp);
        throw;
    }

    fclose(fp);
    return ok;
}

// Appends the contents of the local file `localfile` to the remote file
// `rem_path`.
//
// Returns false when the local file cannot be opened, when the server returns
// a zero handle id, when reading the local file fails, or when closing the
// remote handle reports failure. Exceptions thrown by the Thrift client
// propagate to the caller after the local file has been closed.
bool HdfsClient::append(std::string& localfile,std::string& rem_path)
{
    // Open the source first so no remote handle is acquired for a local file
    // that cannot be read.
    FILE* fp = fopen(localfile.c_str(),"rb");
    if(!fp)
        return false;

    bool ok = false;
    try
    {
        Pathname ptname;
        ptname.pathname = rem_path;
        m_Client->append(m_Handler,ptname);

        if(m_Handler.id != 0)
        {
            // Send exactly the bytes fread() delivered. Passing the raw buffer
            // would treat it as a C string: truncated at the first NUL byte
            // and read past the end when no NUL is present.
            size_t got = 0;
            while((got = fread(m_Buffer,sizeof(char),BufferSize,fp)) > 0)
                m_Client->write(m_Handler,std::string(m_Buffer,got));

            const bool read_ok = (ferror(fp) == 0);
            // close() is the left operand so the handle is always released.
            ok = m_Client->close(m_Handler) && read_ok;
        }
    }
    catch (...)
    {
        fclose(fp);
        throw;
    }

    fclose(fp);
    return ok;
}

// Downloads the remote file `rem_path` into the local file `localfile`.
//
// Returns false when the server returns a zero handle id, when the local file
// cannot be created or written, when the server stops delivering data before
// the length reported by stat() is reached, or when closing the remote handle
// reports failure. Exceptions thrown by the Thrift client propagate to the
// caller after the local file has been closed.
bool HdfsClient::get(std::string& rem_path,std::string& localfile)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    m_Client->open(m_Handler,ptname);

    if(m_Handler.id == 0)//error
        return false;

    FILE* fp = fopen(localfile.c_str(),"wb");
    if(!fp)
    {
        // Release the remote handle; there is nowhere to store the data.
        m_Client->close(m_Handler);
        return false;
    }

    bool copied = true;
    try
    {
        FileStatus rfstat;
        m_Client->stat(rfstat,ptname);

        const int64_t chunksize = 1 << 20;//1M
        int64_t offset = 0;
        std::string content;
        while(offset < rfstat.length)
        {
            m_Client->read(content,m_Handler,offset,chunksize);
            if(content.empty())
            {
                // No progress is possible; report the incomplete transfer
                // instead of silently skipping ahead.
                copied = false;
                break;
            }
            if(fwrite(content.data(),sizeof(char),content.length(),fp) != content.length())
            {
                copied = false;
                break;
            }
            // Advance by what was actually received: a read may return fewer
            // bytes than requested, and stepping by chunksize would drop the
            // remainder of that range.
            offset += static_cast<int64_t>(content.length());
        }
    }
    catch (...)
    {
        fclose(fp);
        throw;
    }

    // fclose() flushes buffered data, so its result is part of the outcome.
    if(fclose(fp) != 0)
        copied = false;

    const bool closed = m_Client->close(m_Handler);
    return copied && closed;
}

bool HdfsClient::rm(std::string& rem_path, const bool recursive)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->rm(ptname,recursive);
}

bool HdfsClient::mv(std::string& src_path,std::string& dst_path)
{
    Pathname src_ptname,dst_ptname;
    src_ptname.pathname = src_path;
    dst_ptname.pathname = dst_path;
    return m_Client->rename(src_ptname,dst_ptname);
}

bool HdfsClient::mkdirs(std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->mkdirs(ptname);
}

bool HdfsClient::exists(std::string& rem_path)
{
    Pathname ptname;
    ptname.pathname = rem_path;
    return m_Client->exists(ptname);
}

void HdfsClient::ls(std::vector<FileStatus> & result, std::string& path)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->listStatus(result,ptname);
}

void HdfsClient::chmod(std::string& path, const int16_t mode)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->chmod(ptname,mode);
}

void HdfsClient::chown(std::string& path, const std::string& owner)
{
    Pathname ptname;
    ptname.pathname = path;

    FileStatus rfstat;
    m_Client->stat(rfstat,ptname);
    m_Client->chown(ptname,owner,rfstat.group);
}

void HdfsClient::setReplication(const std::string& path, const int16_t replication)
{
    Pathname ptname;
    ptname.pathname = path;
    m_Client->setReplication(ptname,replication);
}

void HdfsClient::getFileBlockLocations(std::vector<BlockLocation> & result, const std::string& path, const int64_t start, const int64_t length)
{
    Pathname ptname;
    ptname.pathname = path;

    m_Client->getFileBlockLocations(result,ptname,start,length);
}

// Demo entry point: connects to the Thrift server and downloads one remote
// file to a local path.
//
// Returns 0 on success and 1 when either the connection or the download
// fails, so a failed connect() is never followed by calls on an unopened
// client.
int main()
{
    std::string host = "192.168.0.113";
    int port = 54573;
    HdfsClient hdfs;
    std::string local_file = "G:\\HDFSClient\\HDFSClient\\test.txt";
    std::string rem_file = "/test.txt";

    if(!hdfs.connect(host,port))
    {
        printf("ERROR: unable to connect to %s:%d\n",host.c_str(),port);
        return 1;
    }

    if(!hdfs.get(rem_file,local_file))
    {
        printf("ERROR: unable to download %s to %s\n",rem_file.c_str(),local_file.c_str());
        return 1;
    }
    return 0;
}

 

3.工程代码下载

 http://files.cnblogs.com/files/hikeepgoing/HttpfsClient.rar

基于Httpfs访问HDFS的C++实现

原文:http://www.cnblogs.com/hikeepgoing/p/5294276.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!