HttpFS is an application that ships with the HDFS project in Hadoop 2.x. Built on Tomcat and Jersey, it exposes a complete RESTful interface for HDFS operations, so no HDFS client has to be installed to exchange data with the cluster, e.g. to access files stored on HDFS from a Windows machine. Following the HttpFS documentation, this article implements an HttpFS client program in C++ based on libcurl and jsoncpp.
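For orientation, every HttpFS/WebHDFS operation is an ordinary HTTP request against a URL of the following general form (the host, port and user name below are placeholders; HttpFS typically listens on port 14000):

    http://<HOST>:<PORT>/webhdfs/v1/<PATH>?user.name=<USER>&op=<OPERATION>
    e.g.  GET http://namenode:14000/webhdfs/v1/tmp?user.name=hadoop&op=LISTSTATUS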
1. Preparation
1.1 Building jsoncpp
jsoncpp download: https://codeload.github.com/open-source-parsers/jsoncpp/zip/master
Open makefiles/msvc2010/jsoncpp.sln in the extracted jsoncpp folder with VS2010, select the lib_json project, and set its properties as follows: 1) under General, set the Configuration Type to static library (.lib) and the Character Set to Multi-Byte; 2) under C/C++ -> Code Generation, set the Runtime Library to /MD (Release) or /MDd (Debug). These settings must match the project that will link against the library.
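As a quick check that the library built correctly (this snippet is not part of the original post), the sketch below parses the JSON that WebHDFS/HttpFS returns for a LISTSTATUS call using jsoncpp's classic Json::Reader API. The sample response string is hand-written for illustration, and the include path assumes the jsoncpp headers are on the project's include path.

    #include <iostream>
    #include <string>
    #include "json/json.h"

    int main()
    {
        // A trimmed-down LISTSTATUS response in the WebHDFS JSON layout.
        std::string body =
            "{\"FileStatuses\":{\"FileStatus\":["
            "{\"pathSuffix\":\"test.txt\",\"type\":\"FILE\",\"length\":12,"
            "\"owner\":\"hadoop\",\"group\":\"supergroup\",\"permission\":\"644\","
            "\"replication\":3,\"blockSize\":134217728,"
            "\"accessTime\":1458000000000,\"modificationTime\":1458000000000}]}}";

        Json::Reader reader;
        Json::Value  root;
        if (!reader.parse(body, root))
        {
            std::cout << "parse failed" << std::endl;
            return 1;
        }

        // Walk the FileStatus array and print each entry.
        const Json::Value& files = root["FileStatuses"]["FileStatus"];
        for (Json::ArrayIndex i = 0; i < files.size(); ++i)
        {
            std::cout << files[i]["pathSuffix"].asString()
                      << " (" << files[i]["type"].asString()
                      << ", " << files[i]["length"].asInt64() << " bytes)"
                      << std::endl;
        }
        return 0;
    }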
1.2 Building libcurl
libcurl download: https://curl.haxx.se/download/curl-7.47.1.tar.gz
Open projects\Windows\VC10\curl-all.sln in the extracted curl directory and build the lib_debug and lib_release configurations. If static linking then fails in VS2010:
1) Add the dependent libraries: Project -> Properties -> Linker -> Input -> Additional Dependencies, and add libcurl.lib ws2_32.lib winmm.lib wldap32.lib (note: use libcurld.lib for the Debug configuration).
2) Add the preprocessor definitions: Project -> Properties -> C/C++ -> Preprocessor -> Preprocessor Definitions, and append ;BUILDING_LIBCURL;HTTP_ONLY (do not drop the leading semicolon).
This solution comes from an online post, "vc2010使用libcurl静态库 遇到连接失败的解决方案" (fixing link failures when using the libcurl static library with VC2010). A minimal link check is sketched below.
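The following smoke test (not part of the original post) can be used to confirm that the statically linked libcurl builds and runs; the HttpFS address and user name are placeholders and should be changed to match your cluster.

    #include <iostream>
    #include <string>
    #include "curl/curl.h"

    // libcurl write callback: append the HTTP response body to a std::string.
    static size_t write_to_string(void* ptr, size_t size, size_t nmemb, void* userdata)
    {
        std::string* out = static_cast<std::string*>(userdata);
        out->append(static_cast<char*>(ptr), size * nmemb);
        return size * nmemb;
    }

    int main()
    {
        curl_global_init(CURL_GLOBAL_ALL);
        CURL* curl = curl_easy_init();
        if (!curl)
            return 1;

        std::string body;
        // Placeholder URL: GETFILESTATUS on the HDFS root via HttpFS.
        curl_easy_setopt(curl, CURLOPT_URL,
            "http://192.168.0.113:14000/webhdfs/v1/?user.name=hadoop&op=GETFILESTATUS");
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_to_string);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &body);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L);

        CURLcode rc = curl_easy_perform(curl);
        if (rc == CURLE_OK)
            std::cout << body << std::endl;
        else
            std::cout << "curl error: " << curl_easy_strerror(rc) << std::endl;

        curl_easy_cleanup(curl);
        curl_global_cleanup();
        return 0;
    }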
1.3 Setting up header includes
Create an include directory under the project path, copy the contents of the include folders from libcurl and jsoncpp into it, and add that directory to the VC++ Include Directories.
2. Code implementation
HdfsClient.H
    #pragma once
    #include <string>
    #include <vector>
    using namespace std;

    typedef struct FileStatus
    {
        __int64 accessTime;
        __int64 blocksize;
        string  group;
        __int64 length;
        __int64 modificationTime;
        string  owner;
        string  pathSuffix;
        string  permission;
        int     replication;
        string  type;
    } FileStatus;

    class CHttpFSClient
    {
    private:
        string m_hostaddr;    // http://<HOST>:<PORT>/webhdfs/v1/
        string m_username;    // i.e. hadoop
        long   m_timeout;
        long   m_conntimeout;

    public:
        enum HTTP_TYPE { GET = 0, PUT, POST, DEL };

    public:
        CHttpFSClient(string& hostaddr, string& username);
        ~CHttpFSClient(void);

        bool create(string& local_file, string& rem_file, bool overwrite = false);
        bool append(string& local_file, string& rem_file);
        bool mkdirs(string& path);
        bool rename(string& src, string& dst);
        bool del(string& path, bool recursive = false);
        bool read(string& rem_file, string& local_file, long offset = 0, long length = 0);
        bool ls(string& rem_path, vector<FileStatus>& results);

    protected:
        static size_t fileread_callback(void *ptr, size_t size, size_t nmemb, void *stream);
        static size_t filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream);
        static size_t memwrite_data(const char *contents, size_t size, size_t nmemb, string *stream);
        static size_t header_callback(const char *ptr, size_t size, size_t nmemb, std::string *stream);

        void showFileStatus(vector<FileStatus>& results);
    };
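Note that the HdfsClient.cpp listing below is a Thrift-based HdfsClient rather than an implementation of the CHttpFSClient declared above. As an illustration only (not the original author's code), here is a minimal sketch of how one of the declared methods, mkdirs, could be written over the WebHDFS REST API with libcurl and jsoncpp: MKDIRS is an HTTP PUT to /webhdfs/v1/<path>?op=MKDIRS, and a successful call answers with {"boolean":true}. Member and callback names follow the header; the URL building and error handling are simplified assumptions.

    #include "curl/curl.h"
    #include "json/json.h"

    // Write callback that appends the response body to a std::string; this is the
    // role memwrite_data plays in the header declaration.
    size_t CHttpFSClient::memwrite_data(const char* contents, size_t size, size_t nmemb, string* stream)
    {
        stream->append(contents, size * nmemb);
        return size * nmemb;
    }

    bool CHttpFSClient::mkdirs(string& path)
    {
        // m_hostaddr is expected to look like http://<HOST>:<PORT>/webhdfs/v1/
        string url = m_hostaddr + path + "?user.name=" + m_username + "&op=MKDIRS";

        CURL* curl = curl_easy_init();
        if (curl == NULL)
            return false;

        string response;
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");          // MKDIRS is a PUT without a body
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, memwrite_data);  // callback shape as declared in the header
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);

        CURLcode rc = curl_easy_perform(curl);
        curl_easy_cleanup(curl);
        if (rc != CURLE_OK)
            return false;

        // HttpFS replies with {"boolean":true} on success.
        Json::Reader reader;
        Json::Value  root;
        if (!reader.parse(response, root))
            return false;
        return root.get("boolean", false).asBool();
    }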
HdfsClient.cpp
#include "StdAfx.h" #include "HdfsClient.h" HdfsClient::HdfsClient(void) { m_IsConn = false; } HdfsClient::~HdfsClient(void) { if(m_IsConn) shutdown(); } bool HdfsClient::connect(std::string server,int port) { m_Socket = shared_ptr<TTransport>(new TSocket(server,port)); m_Transport = shared_ptr<TBufferedTransport>(new TBufferedTransport(m_Socket)); m_Protocol = shared_ptr<TBinaryProtocol>(new TBinaryProtocol(m_Transport)); m_Client = shared_ptr<ThriftHadoopFileSystemClient>(new ThriftHadoopFileSystemClient(m_Protocol)); try { m_Transport->open(); // tell the HadoopThrift server to die after 60 minutes of inactivity m_Client->setInactivityTimeoutPeriod(3600); m_IsConn = true; } catch (const ThriftIOException& ex) { printf("ERROR: %s",ex.message.c_str()); return false; } return true; } bool HdfsClient::shutdown() { try { m_Transport->close(); m_IsConn = false; } catch (const ThriftIOException& ex) { printf("ERROR: %s",ex.message.c_str()); return false; } return true; } bool HdfsClient::put(std::string& localfile,std::string& rem_path) { Pathname ptname; ptname.pathname = rem_path; m_Client->create(m_Handler,ptname);//Create the specified file. Returns a handle to write data. if(m_Handler.id == 0)//error return false; else { FILE* fp = fopen(localfile.c_str(),"rb"); while(!feof(fp)) { fread(m_Buffer,sizeof(char),BufferSize,fp); m_Client->write(m_Handler,m_Buffer); } fclose(fp); return m_Client->close(m_Handler); } } bool HdfsClient::append(std::string& localfile,std::string& rem_path) { Pathname ptname; ptname.pathname = rem_path; m_Client->append(m_Handler,ptname); if(m_Handler.id == 0)//error return false; else { FILE* fp = fopen(localfile.c_str(),"rb"); while(!feof(fp)) { fread(m_Buffer,sizeof(char),BufferSize,fp); m_Client->write(m_Handler,m_Buffer); } fclose(fp); return m_Client->close(m_Handler); } } bool HdfsClient::get(std::string& rem_path,std::string& localfile) { Pathname ptname; ptname.pathname = rem_path; m_Client->open(m_Handler,ptname); if(m_Handler.id == 0)//error return false; else { FileStatus rfstat; m_Client->stat(rfstat,ptname); int64_t offset = 0; int64_t chunksize = 1 << 20;//1M std::string content; FILE* fp = fopen(localfile.c_str(),"wb"); while(offset < rfstat.length) { m_Client->read(content,m_Handler,offset,chunksize); if(content != "") { fwrite(content.c_str(),sizeof(char),content.length(),fp);//todo: can use multi thread to read and write } offset += chunksize; } fclose(fp); return m_Client->close(m_Handler); } } bool HdfsClient::rm(std::string& rem_path, const bool recursive) { Pathname ptname; ptname.pathname = rem_path; return m_Client->rm(ptname,recursive); } bool HdfsClient::mv(std::string& src_path,std::string& dst_path) { Pathname src_ptname,dst_ptname; src_ptname.pathname = src_path; dst_ptname.pathname = dst_path; return m_Client->rename(src_ptname,dst_ptname); } bool HdfsClient::mkdirs(std::string& rem_path) { Pathname ptname; ptname.pathname = rem_path; return m_Client->mkdirs(ptname); } bool HdfsClient::exists(std::string& rem_path) { Pathname ptname; ptname.pathname = rem_path; return m_Client->exists(ptname); } void HdfsClient::ls(std::vector<FileStatus> & result, std::string& path) { Pathname ptname; ptname.pathname = path; m_Client->listStatus(result,ptname); } void HdfsClient::chmod(std::string& path, const int16_t mode) { Pathname ptname; ptname.pathname = path; m_Client->chmod(ptname,mode); } void HdfsClient::chown(std::string& path, const std::string& owner) { Pathname ptname; ptname.pathname = path; FileStatus rfstat; 
m_Client->stat(rfstat,ptname); m_Client->chown(ptname,owner,rfstat.group); } void HdfsClient::setReplication(const std::string& path, const int16_t replication) { Pathname ptname; ptname.pathname = path; m_Client->setReplication(ptname,replication); } void HdfsClient::getFileBlockLocations(std::vector<BlockLocation> & result, const std::string& path, const int64_t start, const int64_t length) { Pathname ptname; ptname.pathname = path; m_Client->getFileBlockLocations(result,ptname,start,length); } int main() { std::string host = "192.168.0.113"; int port = 54573; HdfsClient hdfs; std::string local_file = "G:\\HDFSClient\\HDFSClient\\test.txt"; std::string rem_file = "/test.txt"; hdfs.connect(host,port); hdfs.get(rem_file,local_file); return 0; }
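For completeness, a hypothetical main() using the CHttpFSClient interface from HdfsClient.H might look like the following; the endpoint address, port (14000 is the usual HttpFS port), user name and paths are placeholder assumptions, and the constructor, ls and read implementations are not shown in the post.

    #include <iostream>
    #include <vector>
    #include "HdfsClient.h"

    int main()
    {
        // Placeholder HttpFS endpoint and user name.
        string host = "http://192.168.0.113:14000/webhdfs/v1/";
        string user = "hadoop";
        CHttpFSClient client(host, user);

        // List a directory and print each entry.
        string dir = "/tmp";
        vector<FileStatus> entries;
        if (client.ls(dir, entries))
        {
            for (size_t i = 0; i < entries.size(); ++i)
                std::cout << entries[i].pathSuffix << "\t"
                          << entries[i].type << "\t"
                          << entries[i].length << std::endl;
        }

        // Download a remote file to the local disk.
        string rem_file = "/test.txt";
        string local_file = "D:\\test.txt";
        client.read(rem_file, local_file);
        return 0;
    }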
3. Project code download
http://files.cnblogs.com/files/hikeepgoing/HttpfsClient.rar
Original post: http://www.cnblogs.com/hikeepgoing/p/5294276.html