420 lines
10 KiB
C
420 lines
10 KiB
C
/*
 * libhdfs engine
 *
 * this engine helps perform read/write operations on an hdfs cluster using
 * libhdfs. hdfs does not support modification of data once a file is created.
 *
 * so to mimic that, create many files of small size (e.g. 256k), and this
 * engine selects a file based on the offset generated by fio.
 *
 * thus, random reads and writes can also be achieved with this logic.
 *
 */
|
|
|
|
#include <inttypes.h>
#include <math.h>

#include <hdfs.h>

#include "../fio.h"
#include "../optgroup.h"
|
|
|
|
#define CHUNCK_NAME_LENGTH_MAX 80
|
|
#define CHUNCK_CREATION_BUFFER_SIZE 65536
|
|
|
|
/* Per-thread engine state. */
struct hdfsio_data {
	hdfsFS fs;		/* connection handle to the HDFS cluster */
	hdfsFile fp;		/* currently open chunk file; valid only when curr_file_id != -1 */
	uint64_t curr_file_id;	/* id of the open chunk, (uint64_t)-1 when none is open */
};
|
|
|
|
/* Option values parsed from the job file; see options[] below. */
struct hdfsio_options {
	void *pad;	/* needed because off1 can't be 0 for an option defined using offsetof */
	char *host;			/* namenode host */
	char *directory;		/* HDFS working directory for chunk files */
	unsigned int port;		/* namenode port */
	unsigned int chunck_size;	/* size of each individual chunk file */
	unsigned int single_instance;	/* reuse a single libhdfs instance */
	unsigned int use_direct;	/* use readDirect() instead of hdfsRead() */
};
|
|
|
|
/* Job-file options exposed by this engine. */
static struct fio_option options[] = {
	{
		.name	= "namenode",
		.lname	= "hfds namenode",
		.type	= FIO_OPT_STR_STORE,
		.off1	= offsetof(struct hdfsio_options, host),
		.def	= "localhost",
		.help	= "Namenode of the HDFS cluster",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		/* NOTE(review): identical to "namenode" (same off1);
		 * presumably kept as a legacy alias — confirm before removing */
		.name	= "hostname",
		.lname	= "hfds namenode",
		.type	= FIO_OPT_STR_STORE,
		.off1	= offsetof(struct hdfsio_options, host),
		.def	= "localhost",
		.help	= "Namenode of the HDFS cluster",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "port",
		.lname	= "hdfs namenode port",
		.type	= FIO_OPT_INT,
		.off1	= offsetof(struct hdfsio_options, port),
		.def	= "9000",
		.minval	= 1,
		.maxval	= 65535,
		.help	= "Port used by the HDFS cluster namenode",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "hdfsdirectory",
		.lname	= "hfds directory",
		.type	= FIO_OPT_STR_STORE,
		.off1	= offsetof(struct hdfsio_options, directory),
		.def	= "/",
		.help	= "The HDFS directory where fio will create chuncks",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "chunk_size",
		.alias	= "chunck_size",	/* historical misspelling kept as alias */
		.lname	= "Chunk size",
		.type	= FIO_OPT_INT,
		.off1	= offsetof(struct hdfsio_options, chunck_size),
		.def	= "1048576",
		.help	= "Size of individual chunck",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "single_instance",
		.lname	= "Single Instance",
		.type	= FIO_OPT_BOOL,
		.off1	= offsetof(struct hdfsio_options, single_instance),
		.def	= "1",
		.help	= "Use a single instance",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "hdfs_use_direct",
		.lname	= "HDFS Use Direct",
		.type	= FIO_OPT_BOOL,
		.off1	= offsetof(struct hdfsio_options, use_direct),
		.def	= "0",
		.help	= "Use readDirect instead of hdfsRead",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= NULL,		/* sentinel */
	},
};
|
|
|
|
|
|
static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
|
|
return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
|
|
}
|
|
|
|
static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
|
|
{
|
|
struct hdfsio_options *options = td->eo;
|
|
struct hdfsio_data *hd = td->io_ops_data;
|
|
unsigned long f_id;
|
|
char fname[CHUNCK_NAME_LENGTH_MAX];
|
|
int open_flags;
|
|
|
|
/* find out file id based on the offset generated by fio */
|
|
f_id = floor(io_u->offset / options-> chunck_size);
|
|
|
|
if (f_id == hd->curr_file_id) {
|
|
/* file is already open */
|
|
return 0;
|
|
}
|
|
|
|
if (hd->curr_file_id != -1) {
|
|
if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
|
|
log_err("hdfs: unable to close file: %s\n", strerror(errno));
|
|
return errno;
|
|
}
|
|
hd->curr_file_id = -1;
|
|
}
|
|
|
|
if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
|
|
open_flags = O_RDONLY;
|
|
} else if (io_u->ddir == DDIR_WRITE) {
|
|
open_flags = O_WRONLY;
|
|
} else {
|
|
log_err("hdfs: Invalid I/O Operation\n");
|
|
return 0;
|
|
}
|
|
|
|
get_chunck_name(fname, io_u->file->file_name, f_id);
|
|
hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
|
|
options->chunck_size);
|
|
if(hd->fp == NULL) {
|
|
log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
|
|
return errno;
|
|
}
|
|
hd->curr_file_id = f_id;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
|
|
{
|
|
struct hdfsio_data *hd = td->io_ops_data;
|
|
struct hdfsio_options *options = td->eo;
|
|
int ret;
|
|
unsigned long offset;
|
|
|
|
offset = io_u->offset % options->chunck_size;
|
|
|
|
if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) &&
|
|
hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
|
|
log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
|
|
io_u->error = errno;
|
|
return FIO_Q_COMPLETED;
|
|
};
|
|
|
|
// do the IO
|
|
if (io_u->ddir == DDIR_READ) {
|
|
if (options->use_direct) {
|
|
ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
|
|
} else {
|
|
ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
|
|
}
|
|
} else if (io_u->ddir == DDIR_WRITE) {
|
|
ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
|
|
io_u->xfer_buflen);
|
|
} else if (io_u->ddir == DDIR_SYNC) {
|
|
ret = hdfsFlush(hd->fs, hd->fp);
|
|
} else {
|
|
log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
|
|
ret = EINVAL;
|
|
}
|
|
|
|
// Check if the IO went fine, or is incomplete
|
|
if (ret != (int)io_u->xfer_buflen) {
|
|
if (ret >= 0) {
|
|
io_u->resid = io_u->xfer_buflen - ret;
|
|
io_u->error = 0;
|
|
return FIO_Q_COMPLETED;
|
|
} else {
|
|
io_u->error = errno;
|
|
}
|
|
}
|
|
|
|
if (io_u->error)
|
|
td_verror(td, io_u->error, "xfer");
|
|
|
|
return FIO_Q_COMPLETED;
|
|
}
|
|
|
|
int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
|
|
{
|
|
if (td->o.odirect) {
|
|
td->error = EINVAL;
|
|
return 0;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
|
|
{
|
|
struct hdfsio_data *hd = td->io_ops_data;
|
|
|
|
if (hd->curr_file_id != -1) {
|
|
if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
|
|
log_err("hdfs: unable to close file: %s\n", strerror(errno));
|
|
return errno;
|
|
}
|
|
hd->curr_file_id = -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int fio_hdfsio_init(struct thread_data *td)
|
|
{
|
|
struct hdfsio_options *options = td->eo;
|
|
struct hdfsio_data *hd = td->io_ops_data;
|
|
struct fio_file *f;
|
|
uint64_t j,k;
|
|
int i, failure = 0;
|
|
uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
|
|
uint64_t bytes_left;
|
|
char fname[CHUNCK_NAME_LENGTH_MAX];
|
|
hdfsFile fp;
|
|
hdfsFileInfo *fi;
|
|
tOffset fi_size;
|
|
|
|
for_each_file(td, f, i) {
|
|
k = 0;
|
|
for(j=0; j < f->real_file_size; j += options->chunck_size) {
|
|
get_chunck_name(fname, f->file_name, k++);
|
|
fi = hdfsGetPathInfo(hd->fs, fname);
|
|
fi_size = fi ? fi->mSize : 0;
|
|
// fill exist and is big enough, nothing to do
|
|
if( fi && fi_size >= options->chunck_size) {
|
|
continue;
|
|
}
|
|
fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
|
|
options->chunck_size);
|
|
if(fp == NULL) {
|
|
failure = errno;
|
|
log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
|
|
break;
|
|
}
|
|
bytes_left = options->chunck_size;
|
|
memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
|
|
while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
|
|
if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
|
|
!= CHUNCK_CREATION_BUFFER_SIZE) {
|
|
failure = errno;
|
|
log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
|
|
break;
|
|
};
|
|
bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
|
|
}
|
|
if(bytes_left > 0) {
|
|
if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
|
|
!= bytes_left) {
|
|
failure = errno;
|
|
break;
|
|
};
|
|
}
|
|
if( hdfsCloseFile(hd->fs, fp) != 0) {
|
|
failure = errno;
|
|
log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
|
|
break;
|
|
}
|
|
}
|
|
if(failure) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if( !failure ) {
|
|
fio_file_set_size_known(f);
|
|
}
|
|
|
|
return failure;
|
|
}
|
|
|
|
static int fio_hdfsio_setup(struct thread_data *td)
|
|
{
|
|
struct hdfsio_data *hd;
|
|
struct fio_file *f;
|
|
int i;
|
|
uint64_t file_size, total_file_size;
|
|
|
|
if (!td->io_ops_data) {
|
|
hd = malloc(sizeof(*hd));
|
|
memset(hd, 0, sizeof(*hd));
|
|
|
|
hd->curr_file_id = -1;
|
|
|
|
td->io_ops_data = hd;
|
|
}
|
|
|
|
total_file_size = 0;
|
|
file_size = 0;
|
|
|
|
for_each_file(td, f, i) {
|
|
if(!td->o.file_size_low) {
|
|
file_size = floor(td->o.size / td->o.nr_files);
|
|
total_file_size += file_size;
|
|
}
|
|
else if (td->o.file_size_low == td->o.file_size_high)
|
|
file_size = td->o.file_size_low;
|
|
else {
|
|
file_size = get_rand_file_size(td);
|
|
}
|
|
f->real_file_size = file_size;
|
|
}
|
|
/* If the size doesn't divide nicely with the chunck size,
|
|
* make the last files bigger.
|
|
* Used only if filesize was not explicitely given
|
|
*/
|
|
if (!td->o.file_size_low && total_file_size < td->o.size) {
|
|
f->real_file_size += (td->o.size - total_file_size);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
|
|
{
|
|
struct hdfsio_data *hd = td->io_ops_data;
|
|
struct hdfsio_options *options = td->eo;
|
|
int failure;
|
|
struct hdfsBuilder *bld;
|
|
|
|
if (options->host == NULL || options->port == 0) {
|
|
log_err("hdfs: server not defined\n");
|
|
return EINVAL;
|
|
}
|
|
|
|
bld = hdfsNewBuilder();
|
|
if (!bld) {
|
|
failure = errno;
|
|
log_err("hdfs: unable to allocate connect builder\n");
|
|
return failure;
|
|
}
|
|
hdfsBuilderSetNameNode(bld, options->host);
|
|
hdfsBuilderSetNameNodePort(bld, options->port);
|
|
if(! options->single_instance) {
|
|
hdfsBuilderSetForceNewInstance(bld);
|
|
}
|
|
hd->fs = hdfsBuilderConnect(bld);
|
|
|
|
/* hdfsSetWorkingDirectory succeed on non existend directory */
|
|
if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
|
|
failure = errno;
|
|
log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
|
|
return failure;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
 * Per-io_u teardown hook: drops the HDFS connection.
 *
 * NOTE(review): this runs once per io_u, not once per thread, so with
 * iodepth > 1 the shared hd->fs would be disconnected repeatedly —
 * presumably tolerable only because the engine is FIO_SYNCIO (iodepth 1);
 * verify before changing queue depth behavior.
 */
static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd = td->io_ops_data;

	if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
		log_err("hdfs: disconnect failed: %d\n", errno);
	}
}
|
|
|
|
/* Engine registration table wiring the hooks above into fio. */
static struct ioengine_ops ioengine_hdfs = {
	.name			= "libhdfs",
	.version		= FIO_IOOPS_VERSION,
	/* synchronous engine, no local disk, no disk-util accounting */
	.flags			= FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
	.setup			= fio_hdfsio_setup,
	.init			= fio_hdfsio_init,
	.prep			= fio_hdfsio_prep,
	.queue			= fio_hdfsio_queue,
	.open_file		= fio_hdfsio_open_file,
	.close_file		= fio_hdfsio_close_file,
	.io_u_init		= fio_hdfsio_io_u_init,
	.io_u_free		= fio_hdfsio_io_u_free,
	.option_struct_size	= sizeof(struct hdfsio_options),
	.options		= options,
};
|
|
|
|
|
|
/* Constructor: register the engine with fio at load time. */
static void fio_init fio_hdfsio_register(void)
{
	register_ioengine(&ioengine_hdfs);
}
|
|
|
|
/* Destructor: unregister the engine at unload time. */
static void fio_exit fio_hdfsio_unregister(void)
{
	unregister_ioengine(&ioengine_hdfs);
}
|